Python 中使用 aiohttp 创建自定义 HTTP 客户端并拦截请求
Python 中使用 aiohttp 创建自定义 HTTP 客户端并拦截请求
aiohttp 是 Python 中用于异步 HTTP 请求的核心库,支持高并发、非阻塞的请求处理。通过自定义 ClientSession 或中间件(Middleware),可以灵活拦截请求和响应,适用于 API 调试、Mock 测试或请求日志记录等场景。以下是详细实现方案:
1. 基础:自定义 ClientSession 拦截请求
通过继承 aiohttp.ClientSession 或直接使用 request 方法钩子,在请求发送前/后插入逻辑。
示例:拦截请求并修改 Headers
python
import aiohttp
import asyncio
class CustomClientSession(aiohttp.ClientSession):
async def request(self, method, url, **kwargs):
# 1. 请求拦截点:修改请求参数
print(f"⚠️ Intercepted request to {url}")
kwargs.setdefault('headers', {}).update({'X-Custom-Header': 'aiohttp-Interceptor'})
# 2. 调用父类方法发送请求
response = await super().request(method, url, **kwargs)
# 3. 响应拦截点:处理响应数据(示例:记录状态码)
print(f"✅ Response status: {response.status}")
return response
async def fetch_data():
async with CustomClientSess