广告
您当前的位置: 首页 >  技术 >  Python

Python 深入浅出:asyncio 任务调度与并发流量控制实战

作者:CoderWang 时间:2026-06-28 阅读数:5人阅读

在网络爬虫、API 网关以及分布式微服务开发中,我们经常使用 asyncio 来发起成百上千个并发请求。

然而,并发并不是越高越好。如果对并发流量不加控制,瞬间发出过多的请求,不仅会导致目标服务器将你的 IP 拉黑,还会使本地主机因为创建了过多的 Socket 连接而抛出 OSError: Too many open files(文件描述符耗尽)的系统级报错。

因此,在高并发异步编程中,并发流量控制是保障系统稳定性的必修课。

本文将带您深入剖析 asyncio 的任务调度机制,并重点介绍如何使用 Semaphore(信号量) 进行优雅的并发限流实战。


一、 异步任务调度的核心:create_task

在异步协程中,简单的 await 并不会开启并发,而是会进行同步等待:

# 这样会串行等待 3 秒(先等第一个,再等第二个,最后等第三个)
await task1()
await task2()
await task3()

要想实现真正的并发,必须将协程包装为 asyncio.Task 对象,并提交给事件循环调度运行。

  • asyncio.create_task(coro):将协程封装为任务,提交给事件循环在后台并发执行,并立即返回任务对象。
  • asyncio.gather(*tasks):将多个任务聚拢,并等待它们全部执行完毕,按顺序返回结果。
import asyncio

async def main():
    # 提交三个任务,它们开始在后台并发运行
    t1 = asyncio.create_task(task1())
    t2 = asyncio.create_task(task2())
    t3 = asyncio.create_task(task3())

    # 统一等待它们执行完毕
    results = await asyncio.gather(t1, t2, t3)

二、 异步并发控流神器:asyncio.Semaphore

当我们有 1000 个网络请求任务时,不能直接一次性执行 asyncio.gather(*1000_tasks)。我们需要对其进行“限流”,控制在任意时刻,并发运行的协程数量不超过指定的上限(如 10 个)

asyncio.Semaphore(信号量)就是为此设计的。它内部维护一个计数器,每次进入临界区时减 1,退出时加 1。当计数器归 0 时,后续的协程会被阻塞,直到有其他协程退出并释放信号量。

信号量的标准使用模板

import asyncio

# 创建一个信号量,限制最大并发数为 5
sem = asyncio.Semaphore(5)

async def worker(task_id):
    # 使用 async with 语法获取信号量
    async with sem:
        print(f"🔄 任务 {task_id} 进入临界区,当前并发运行中...")
        await asyncio.sleep(2)  # 模拟 IO 操作
        print(f"✅ 任务 {task_id} 执行结束,释放信号量。")

async def main():
    # 模拟创建 100 个任务
    tasks = [asyncio.create_task(worker(i)) for i in range(20)]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(main())

运行该代码,你会发现:无论总任务有多少个,屏幕上每次最多只会同时闪现 5 个任务的处理信息,完美地将最大并发控制在阈值以内。


三、 高阶对比:asyncio.gather vs. asyncio.as_completed

在等待并发任务返回结果时,我们有两种核心方案:

1. asyncio.gather:顺序收集

  • 特点:等待所有任务全部结束后统一返回。返回的结果列表顺序与传入的任务顺序完全一致
  • 适用场景:需要按特定顺序消费数据的场景。

2. asyncio.as_completed:先到先得

  • 特点:返回一个迭代器,每当有任何一个任务先执行完,就立即 yield 出它的结果。它不保证返回顺序,只保证效率。
  • 适用场景:网页爬取(谁先下完谁先入库)、流式处理等。
# 谁先执行完就先打印谁,极大地提升了数据的流式处理效率
for future in asyncio.as_completed(tasks):
    result = await future
    print("获取到就绪数据:", result)

四、 总结

  1. 并发限制是生产环境的底线:在进行任何外部 IO 调用、数据库写入时,都要加 Semaphore 保护,防止本地系统文件描述符崩溃或将目标服务器压挂。
  2. 灵活组合调度工具
  3. 需要严格的返回顺序,选 gather
  4. 需要先就绪先处理,选 as_completed
  5. 需要高级的队列缓存,首选 asyncio.Queue

熟练掌握这套限流并发组合拳,能让您的 Python 异步程序在高负载场景下依然稳如磐石!

本站所有文章、数据、图片均来自互联网,一切版权均归源网站或源作者所有。

如果侵犯了你的权益请来信告知我们删除。

评论交流 (0)

正在加载评论...
头像

CoderWang

当你还撑不起你的梦想时,就要去奋斗。如果缘分安排我们相遇,请不要让她擦肩和过。我们一起奋斗!

微信