aiohttp - 基于 asyncio 的异步 HTTP 客户端与服务端
aiohttp 是一个基于 asyncio 的异步 HTTP 库,提供高性能的客户端与服务端实现,常用于高并发网络请求与异步爬虫。
aiohttp - 基于 asyncio 的异步 HTTP 客户端与服务端
📝 概述
aiohttp 是一个基于 asyncio 的异步 HTTP 库,提供客户端和服务端两部分功能。与 requests 等同步库不同,aiohttp 通过协程并发有效提升 IO 密集型任务效率,广泛用于高并发爬虫、接口调用、实时数据拉取等场景。
🎯 学习目标
- 了解 aiohttp 的安装方式与基本概念
- 掌握客户端常用 API(会话、请求、响应处理)
- 学会设置超时、并发控制等实用技巧
- 能够实现一个简单的异步爬虫实战
📋 前置知识
- Python 3.7+ 的 asyncio 协程基础
- 基本的 HTTP 请求/响应概念
🔍 详细内容
安装
# 使用 pip 安装
pip install aiohttp
# 使用 conda 安装(推荐在 Anaconda/Miniconda 环境中)
# 来自 conda-forge 源
conda install -c conda-forge aiohttp
同步 vs 异步对比
- 同步 requests:请求需要等待响应返回再发起下一次请求,吞吐低。
- 异步 aiohttp:通过协程并发,任务在等待 IO 时主动让出控制权,提高吞吐量。
# 同步请求示例(requests)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# 作者:钢铁知识库
import time
import datetime
import requests
# 同步请求
def main():
    """Call the delay endpoint five times in sequence and print the total elapsed time."""
    began_at = time.time()
    for _ in range(5):
        # Each call returns only after its response has arrived
        response = requests.get('http://httpbin.org/delay/2')
        now = datetime.datetime.now()
        print(f'当前时间:{now}, status_code = {response.status_code}')
    elapsed = time.time() - began_at
    print(f'requests同步耗时:{elapsed}')


if __name__ == '__main__':
    main()
# 异步请求示例(aiohttp + asyncio)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# 作者:钢铁知识库
import asyncio
import time
import datetime
import aiohttp
async def async_http():
    """Send one GET request through a short-lived session and print its status code."""
    async with aiohttp.ClientSession() as session:
        # Enter the response as a context manager so it is released as soon
        # as the block exits instead of lingering until the session closes
        async with session.get('http://httpbin.org/delay/2') as res:
            print(f'当前时间:{datetime.datetime.now()}, status_code = {res.status}')
async def main():
    """Run five async_http() coroutines concurrently and print the total elapsed time."""
    coros = tuple(async_http() for _ in range(5))
    began_at = time.time()
    await asyncio.gather(*coros)
    elapsed = time.time() - began_at
    print(f'aiohttp异步耗时:{elapsed}')


if __name__ == '__main__':
    # asyncio.run can be used directly on Python 3.7 and later
    asyncio.run(main())
客户端基础
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# 作者:钢铁知识库
import asyncio
import aiohttp
async def get_api(session, url):
    """GET *url* with the given session and return ``(body_text, status)``."""
    async with session.get(url) as response:
        # text() is a coroutine, so the body has to be awaited
        body = await response.text()
        status = response.status
    return body, status
async def main():
    """Fetch one page via get_api() and print a body preview plus the status."""
    async with aiohttp.ClientSession() as session:
        result = await get_api(session, 'http://httpbin.org/delay/2')
        html, status = result
        print(f'html: {html[:50]}')
        print(f'status : {status}')


if __name__ == '__main__':
    asyncio.run(main())
请求参数与多种方法
- URL 查询参数:通过 params 传入字典
- 常用方法:GET、POST、PUT、DELETE、HEAD、OPTIONS、PATCH
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import aiohttp
import asyncio
async def main():
    """Send a GET request with query parameters and print the decoded JSON reply."""
    query = {'name': '钢铁知识库', 'age': 23}
    async with aiohttp.ClientSession() as session:
        async with session.get('https://www.httpbin.org/get', params=query) as res:
            # json() is a coroutine and has to be awaited
            payload = await res.json()
            print(payload)


if __name__ == '__main__':
    asyncio.run(main())
# Examples of the other HTTP methods.
# NOTE: fragment only - `session` is not defined here. Elsewhere in this
# document such calls are awaited or entered with `async with` inside an
# open ClientSession; do the same when using these.
session.post('http://httpbin.org/post', data=b'data')
session.put('http://httpbin.org/put', data=b'data')
session.delete('http://httpbin.org/delete')
session.head('http://httpbin.org/get')
session.options('http://httpbin.org/get')
session.patch('http://httpbin.org/patch', data=b'data')
响应读取方式
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import aiohttp
import asyncio
async def main():
    """POST a dict as request data and print status, headers, text, bytes and JSON."""
    form = {'name': '钢铁知识库', 'age': 23}
    async with aiohttp.ClientSession() as session:
        async with session.post('https://www.httpbin.org/post', data=form) as response:
            print('status:', response.status)  # status code
            print('headers:', response.headers)  # response headers
            as_text = await response.text()
            print('body:', as_text)  # text
            as_bytes = await response.read()
            print('bytes:', as_bytes)  # raw bytes
            as_json = await response.json()
            print('json:', as_json)  # JSON


if __name__ == '__main__':
    asyncio.run(main())
超时设置
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import aiohttp
import asyncio
async def main():
    """Request a slow endpoint with a 1-second total timeout and report the outcome."""
    # Allow 1 second in total for the whole request
    timeout = aiohttp.ClientTimeout(total=1)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        try:
            async with session.get('https://www.httpbin.org/delay/2') as response:
                print('status:', response.status)
        except asyncio.TimeoutError:
            # The URL asks for a delay longer than the 1-second budget, so a
            # timeout is the expected outcome; report it instead of crashing
            print('超时!')


if __name__ == '__main__':
    asyncio.run(main())
并发限制(Semaphore)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# 作者:钢铁知识库
import asyncio
from datetime import datetime
import aiohttp
# Maximum number of requests allowed in flight at once
MAX_CONCURRENCY = 2


async def get_api(session, limiter=None):
    """Request the page while holding *limiter*, then keep the slot for 2 more seconds.

    Args:
        session: An open aiohttp.ClientSession.
        limiter: Semaphore shared by all concurrent calls. When omitted, a
            private one is created, so the call is not throttled against
            other calls.
    """
    if limiter is None:
        limiter = asyncio.Semaphore(1)
    async with limiter:
        print(f'scrapting...{datetime.now()}')
        async with session.get('https://www.baidu.com'):
            await asyncio.sleep(2)


async def main():
    """Start ten get_api() tasks that share one session and one semaphore."""
    # Create the semaphore inside the running loop: before Python 3.10 a
    # Semaphore built at import time is bound to a different event loop than
    # the one asyncio.run() starts, which fails as soon as a task has to wait.
    limiter = asyncio.Semaphore(MAX_CONCURRENCY)
    async with aiohttp.ClientSession() as session:
        tasks = [asyncio.create_task(get_api(session, limiter)) for _ in range(10)]
        await asyncio.gather(*tasks)


if __name__ == '__main__':
    asyncio.run(main())
💡 实际应用:异步小说爬虫
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# 作者:钢铁知识库
import asyncio
import json
import logging
import aiohttp
import requests
# Log format
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
# Chapter-catalog API; the doubled braces in the f-string emit literal { and }
# around the JSON payload
b_id = '4308080950'
url = f'https://dushu.baidu.com/api/pc/getCatalog?data={{"book_id":"{b_id}"}}'
# Request headers sent with every call in this script
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36"
}
# Maximum number of chapter requests allowed in flight at once
MAX_CONCURRENCY = 5


async def download(title, b_id, cid, session=None, limiter=None):
    """Fetch one chapter and log its title and content length.

    Args:
        title: Chapter title, used only in the log line.
        b_id: Book id.
        cid: Chapter id taken from the catalog.
        session: Shared aiohttp.ClientSession. When omitted, a temporary
            session is opened for this single call.
        limiter: Semaphore shared by concurrent downloads. When omitted, the
            call is not throttled against other downloads.
    """
    if session is None:
        # Standalone use: open a session just for this call
        async with aiohttp.ClientSession(headers=headers) as own_session:
            return await download(title, b_id, cid, own_session, limiter)
    if limiter is None:
        limiter = asyncio.Semaphore(1)
    # Build the chapter-detail URL
    data = json.dumps({"book_id": b_id, "cid": f"{b_id}|{cid}"})
    detail_url = f'https://dushu.baidu.com/api/pc/getChapterContent?data={data}'
    async with limiter:
        async with session.get(detail_url) as response:
            res = await response.json()
    # Writing to a database would go here (omitted)
    logging.info('title=%s, length=%s', title, len(res['data']['novel']['content']))


async def main():
    """Download every chapter listed in the book's catalog concurrently."""
    # Blocking call, but it runs before any concurrent task exists; the
    # timeout keeps an unresponsive server from hanging the script forever
    res = requests.get(url, headers=headers, timeout=10)
    items = res.json()['data']['novel']['items']  # catalog entries with cid
    # Create the semaphore inside the running loop: before Python 3.10 a
    # Semaphore built at import time is bound to a different event loop than
    # the one asyncio.run() starts.
    limiter = asyncio.Semaphore(MAX_CONCURRENCY)
    # One session for all chapters so connections are reused
    async with aiohttp.ClientSession(headers=headers) as session:
        tasks = [
            download(item['title'], b_id, item['cid'], session, limiter)
            for item in items
        ]
        await asyncio.gather(*tasks)


if __name__ == '__main__':
    asyncio.run(main())
⚠️ 注意事项
- Python 3.7+ 推荐使用 asyncio.run 启动主协程
- ClientSession 建议复用,频繁创建/销毁会有额外开销
- json()、text()、read() 等响应读取方法都是协程,调用时必须使用 await
- 设置合理超时,避免挂起导致资源占用
- 并发量需结合目标站点承载能力,避免过载
🔗 相关内容
📚 扩展阅读
🏷️ 标签
aiohttp asyncio 异步HTTP 客户端 服务器 爬虫
最后更新: 2024-01-15
作者: Python 技术文档工程师
版本: 1.0
进阶技巧与最佳实践(客户端)
更细粒度的超时配置
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# 更细粒度的超时:分别控制获取连接(含连接池排队)、socket 连接建立、socket 读取的超时
import asyncio
import aiohttp
async def main():
    """Issue a GET with per-phase timeouts and print the status or a timeout notice."""
    # total=None leaves the overall duration unbounded; only the phases are limited
    limits = aiohttp.ClientTimeout(total=None, connect=3, sock_connect=3, sock_read=5)
    async with aiohttp.ClientSession(timeout=limits) as session:
        try:
            async with session.get('https://httpbin.org/delay/2') as resp:
                status = resp.status
                print('status:', status)
        except asyncio.TimeoutError:
            # A timeout in any of the phases ends up here
            print('超时!')


if __name__ == '__main__':
    asyncio.run(main())
连接池与并发管理(TCPConnector)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# 通过连接器限制总并发与单主机并发,并启用 DNS 缓存
import asyncio
import aiohttp
async def fetch(session, url):
    """Return the body of a GET request to *url* as text."""
    async with session.get(url) as resp:
        body = await resp.text()
    return body
async def main():
    """Fetch the same URL 50 times through a session that uses a limited connector."""
    pool = aiohttp.TCPConnector(limit=100, limit_per_host=10, ttl_dns_cache=300)
    async with aiohttp.ClientSession(connector=pool) as session:
        target = 'https://httpbin.org/get'
        jobs = []
        for _ in range(50):
            jobs.append(fetch(session, target))
        await asyncio.gather(*jobs)


if __name__ == '__main__':
    asyncio.run(main())
代理与 SSL 证书
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# 代理与 SSL 要点:
# 1) 从系统环境变量读取代理(HTTP_PROXY/HTTPS_PROXY):trust_env=True
# 2) 在请求级别指定 proxy 参数
# 3) 关于 SSL:生产环境建议使用默认验证;如确需跳过验证,可传 ssl=False(不安全)
import asyncio
import aiohttp
async def main():
    """Make one request with trust_env enabled and one with SSL verification disabled."""
    async with aiohttp.ClientSession(trust_env=True) as session:
        # To use an explicit proxy, pass e.g. proxy='http://127.0.0.1:7890'
        async with session.get('https://httpbin.org/ip', proxy=None, ssl=True) as resp:
            ip_info = await resp.json()
            print(ip_info)
    # Skipping SSL verification is insecure; use it only when unavoidable
    async with aiohttp.ClientSession() as session:
        async with session.get('https://self-signed.badssl.com/', ssl=False) as resp:
            status = resp.status
            print('status:', status)


if __name__ == '__main__':
    asyncio.run(main())
简单重试与指数退避
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# 使用简单的 for 循环实现重试与指数退避,避免瞬时网络抖动导致失败
import asyncio
import aiohttp
async def get_with_retry(session, url, retries=3, base_delay=0.5):
    """GET *url* and return the body text, retrying failures with exponential backoff.

    Args:
        session: Open aiohttp.ClientSession used for every attempt.
        url: Address to request.
        retries: Total number of attempts; must be at least 1.
        base_delay: Seconds to sleep after the first failure; the sleep
            doubles after each further failure.

    Returns:
        The response body as text.

    Raises:
        ValueError: If *retries* is less than 1.
        aiohttp.ClientError, asyncio.TimeoutError: The error of the final
            attempt once every attempt has failed.
    """
    if retries < 1:
        # Without this guard the loop body never runs and None is returned silently
        raise ValueError('retries must be at least 1')
    for attempt in range(retries):
        try:
            async with session.get(url) as resp:
                # Treat 5xx responses as retryable failures
                if 500 <= resp.status < 600:
                    raise aiohttp.ClientResponseError(
                        request_info=resp.request_info, history=resp.history,
                        status=resp.status, message='server error')
                return await resp.text()
        except (aiohttp.ClientError, asyncio.TimeoutError):
            if attempt == retries - 1:
                raise
            # Exponential backoff before the next attempt
            await asyncio.sleep(base_delay * (2 ** attempt))
async def main():
    """Run get_with_retry() against the status/500 endpoint and report the outcome."""
    timeout = aiohttp.ClientTimeout(total=5)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        try:
            html = await get_with_retry(session, 'https://httpbin.org/status/500')
        except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
            # The demo URL requests a 500 status, so the error of the final
            # attempt is the expected result; report it instead of crashing
            print(f'请求失败: {exc!r}')
        else:
            print(html[:80])


if __name__ == '__main__':
    asyncio.run(main())
流式下载(节省内存)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# 使用流式读取按块写入文件,避免一次性读入内存
import asyncio
import aiohttp
async def download(url, filepath):
    """Stream the body of *url* into *filepath* using iter_chunked(8192)."""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            # Raise before the output file is opened if the status is an error
            resp.raise_for_status()
            with open(filepath, 'wb') as out:
                stream = resp.content.iter_chunked(8192)
                async for piece in stream:
                    out.write(piece)
async def main():
    """Download the sample PNG to image.png."""
    source, target = 'https://httpbin.org/image/png', 'image.png'
    await download(source, target)


if __name__ == '__main__':
    asyncio.run(main())
服务端快速上手(aiohttp.web)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# 最小可运行的 aiohttp 服务端示例
# 访问: http://127.0.0.1:8080/hello
import asyncio
from aiohttp import web
async def hello(request):
    """Return a JSON greeting for the ``name`` query parameter (default ``world``)."""
    name = request.query.get('name', 'world')
    return web.json_response({'msg': f'Hello, {name}!'})
async def init_app():
    """Build the web application with the /hello GET route registered."""
    application = web.Application()
    application.router.add_get('/hello', hello)
    return application
def main():
    """Run the application from init_app() on 127.0.0.1:8080."""
    app_coro = init_app()
    web.run_app(app_coro, host='127.0.0.1', port=8080)


if __name__ == '__main__':
    # Can be run directly in a Windows/conda environment
    main()
# 简单客户端调用示例
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import asyncio
import aiohttp
async def main():
    """Call the local /hello endpoint with a name parameter and print the JSON reply."""
    endpoint = 'http://127.0.0.1:8080/hello'
    async with aiohttp.ClientSession() as session:
        async with session.get(endpoint, params={'name': 'Trae'}) as resp:
            reply = await resp.json()
            print(reply)


if __name__ == '__main__':
    asyncio.run(main())
讨论与反馈
欢迎在下方留言讨论,分享你的学习心得或提出问题。评论基于GitHub Issues,需要GitHub账号。