Python 深入浅出:用 Python 3.11 现代化 TaskGroup 优雅管理并发
在 Python 3.11 之前,我们管理和并发等待多个异步任务的最常用工具是 asyncio.gather()。
然而,asyncio.gather() 在设计上存在明显的缺陷:如果其中一个任务执行失败抛出异常,gather() 会立即将异常抛出,但其余仍在运行的任务会被遗留在后台继续执行(默默吃内存),这违反了现代并发倡导的“结构化并发(Structured Concurrency)”原则。
为了提供更安全、更现代的并发任务管理方案,Python 3.11 正式引入了 asyncio.TaskGroup。
借助 TaskGroup 强大的上下文管理和全新的 ExceptionGroup(异常组)捕获语法,Python 异步并发开发迈上了一个全新的台阶。
本文将带您深入剖析 TaskGroup 的优势与硬核实战。
一、 什么是结构化并发(Structured Concurrency)?
简单来说,结构化并发要求:并发任务的生命周期必须被限定在特定的代码作用域内。当代码离开这个作用域时,所有派生出的子任务必须全部结束(正常结束或被取消)。
在 Python 3.11 之前,CPython 没有官方的结构化并发工具(只有第三方库 Trio 率先实现了该理念)。asyncio.TaskGroup 则是 Python 官方对结构化并发的正式落地实现。
二、 asyncio.TaskGroup 的运作机制
TaskGroup 采用 async with 上下文管理器来维护任务的生命周期:
async with asyncio.TaskGroup() as tg:
# 1. 向任务组中注册任务,它们会立即在后台并发运行
task1 = tg.create_task(coro1())
task2 = tg.create_task(coro2())
# 2. 只有当 task1 和 task2 全部执行完毕后,代码才会走出 async with 作用域
核心安全保障:自动级联取消
如果在运行期间,task1 发生了异常崩溃:
1. TaskGroup 会自动发送取消信号(cancel)给 task2(以及该任务组中所有其他仍在跑的任务)。
2. TaskGroup 会一直等待所有任务(包括被取消的任务)彻底退出,清理完资源。
3. 最后,将所有发生的异常打包成一个全新的 ExceptionGroup 向上抛出。
这完美地防范了后台僵尸协程的残留!
三、 实战:TaskGroup 的并发与异常捕获
下面我们编写一段实战代码,模拟并发请求三个 API。其中第二个请求故意抛错,展示 TaskGroup 是如何安全清理现场的:
import asyncio
async def fetch_api_success(api_id, delay):
try:
print(f"📡 接口 {api_id}:开始请求...")
await asyncio.sleep(delay)
print(f"✅ 接口 {api_id}:请求成功返回!")
return f"Data_{api_id}"
except asyncio.CancelledError:
print(f"🧹 接口 {api_id}:检测到级联取消,开始执行现场清理...")
raise
async def fetch_api_fail():
await asyncio.sleep(0.5)
print("❌ 接口 2:发生故障,抛出 RuntimeError!")
raise RuntimeError("API-2 连接超时")
async def main():
try:
# 使用 TaskGroup 启动并发任务组
async with asyncio.TaskGroup() as tg:
t1 = tg.create_task(fetch_api_success(1, 2.0))
t2 = tg.create_task(fetch_api_fail()) # 这个任务会在 0.5s 时失败
t3 = tg.create_task(fetch_api_success(3, 1.5))
# 只有在没有任务出错时,才会走到这里获取结果
print("所有数据:", t1.result(), t3.result())
except* RuntimeError as eg: # 💡 Python 3.11 全新的 Except* 语法,用于捕获异常组
print(f"捕获到异常组中的特定错误: {eg.exceptions}")
if __name__ == "__main__":
asyncio.run(main())
运行输出流程分析:
- 0.0 秒:接口 1、2、3 并发启动。
- 0.5 秒:接口 2 发生故障抛出
RuntimeError。 - 0.5 秒:
TaskGroup立即拦截该错误,并自动取消了还在运行的接口 1 和接口 3。 - 0.5 秒:接口 1 和接口 3 收到取消信号,执行
except CancelledError里的清理逻辑。 - 所有任务清理完毕后:退出
async with块,抛出ExceptionGroup,并被except* RuntimeError精准捕获。
四、 总结与最佳实践
- 废弃
asyncio.gather:在 Python 3.11+ 的新项目中,应全面使用asyncio.TaskGroup代替asyncio.gather进行任务并发调度。 - 掌握
except*语法:由于 TaskGroup 抛出的是ExceptionGroup(可能包含多个并发任务抛出的多个不同异常),必须使用 Python 3.11 的新语法except* ValueError as eg:来分门别类地捕获它们,传统的except ValueError:无法捕获异常组。 - 保证代码整洁度:TaskGroup 把并发任务阻断在上下文缩进块内,使得并发的边界极其清晰,代码可读性获得了质的飞跃。
拥抱结构化并发,用现代化的 TaskGroup 编写出更安全、更优雅的 Python 高并发系统吧!
本站所有文章、数据、图片均来自互联网,一切版权均归源网站或源作者所有。
如果侵犯了你的权益请来信告知我们删除。



暂无评论
还没有人评论过本文,快来发表你的高见吧!