Python 深入浅出:asyncio 任务调度与并发流量控制实战
在网络爬虫、API 网关以及分布式微服务开发中,我们经常使用 asyncio 来发起成百上千个并发请求。
然而,并发并不是越高越好。如果对并发流量不加控制,瞬间发出过多的请求,不仅会导致目标服务器将你的 IP 拉黑,还会使本地主机因为创建了过多的 Socket 连接而抛出 OSError: Too many open files(文件描述符耗尽)的系统级报错。
因此,在高并发异步编程中,并发流量控制是保障系统稳定性的必修课。
本文将带您深入剖析 asyncio 的任务调度机制,并重点介绍如何使用 Semaphore(信号量) 进行优雅的并发限流实战。
一、 异步任务调度的核心:create_task
在异步协程中,简单的 await 并不会开启并发,而是会进行同步等待:
# 这样会串行等待 3 秒(先等第一个,再等第二个,最后等第三个)
await task1()
await task2()
await task3()
要想实现真正的并发,必须将协程包装为 asyncio.Task 对象,并提交给事件循环调度运行。
asyncio.create_task(coro):将协程封装为任务,提交给事件循环在后台并发执行,并立即返回任务对象。asyncio.gather(*tasks):将多个任务聚拢,并等待它们全部执行完毕,按顺序返回结果。
import asyncio
async def main():
# 提交三个任务,它们开始在后台并发运行
t1 = asyncio.create_task(task1())
t2 = asyncio.create_task(task2())
t3 = asyncio.create_task(task3())
# 统一等待它们执行完毕
results = await asyncio.gather(t1, t2, t3)
二、 异步并发控流神器:asyncio.Semaphore
当我们有 1000 个网络请求任务时,不能直接一次性执行 asyncio.gather(*1000_tasks)。我们需要对其进行“限流”,控制在任意时刻,并发运行的协程数量不超过指定的上限(如 10 个)。
asyncio.Semaphore(信号量)就是为此设计的。它内部维护一个计数器,每次进入临界区时减 1,退出时加 1。当计数器归 0 时,后续的协程会被阻塞,直到有其他协程退出并释放信号量。
信号量的标准使用模板
import asyncio
# 创建一个信号量,限制最大并发数为 5
sem = asyncio.Semaphore(5)
async def worker(task_id):
# 使用 async with 语法获取信号量
async with sem:
print(f"🔄 任务 {task_id} 进入临界区,当前并发运行中...")
await asyncio.sleep(2) # 模拟 IO 操作
print(f"✅ 任务 {task_id} 执行结束,释放信号量。")
async def main():
# 模拟创建 100 个任务
tasks = [asyncio.create_task(worker(i)) for i in range(20)]
await asyncio.gather(*tasks)
if __name__ == "__main__":
asyncio.run(main())
运行该代码,你会发现:无论总任务有多少个,屏幕上每次最多只会同时闪现 5 个任务的处理信息,完美地将最大并发控制在阈值以内。
三、 高阶对比:asyncio.gather vs. asyncio.as_completed
在等待并发任务返回结果时,我们有两种核心方案:
1. asyncio.gather:顺序收集
- 特点:等待所有任务全部结束后统一返回。返回的结果列表顺序与传入的任务顺序完全一致。
- 适用场景:需要按特定顺序消费数据的场景。
2. asyncio.as_completed:先到先得
- 特点:返回一个迭代器,每当有任何一个任务先执行完,就立即 yield 出它的结果。它不保证返回顺序,只保证效率。
- 适用场景:网页爬取(谁先下完谁先入库)、流式处理等。
# 谁先执行完就先打印谁,极大地提升了数据的流式处理效率
for future in asyncio.as_completed(tasks):
result = await future
print("获取到就绪数据:", result)
四、 总结
- 并发限制是生产环境的底线:在进行任何外部 IO 调用、数据库写入时,都要加
Semaphore保护,防止本地系统文件描述符崩溃或将目标服务器压挂。 - 灵活组合调度工具:
- 需要严格的返回顺序,选
gather; - 需要先就绪先处理,选
as_completed; - 需要高级的队列缓存,首选
asyncio.Queue。
熟练掌握这套限流并发组合拳,能让您的 Python 异步程序在高负载场景下依然稳如磐石!
本站所有文章、数据、图片均来自互联网,一切版权均归源网站或源作者所有。
如果侵犯了你的权益请来信告知我们删除。



暂无评论
还没有人评论过本文,快来发表你的高见吧!