广告
您当前的位置: 首页 >  技术 >  Python

Python 深入浅出:asyncio 异常处理与任务取消的硬核指南

作者:XiaoZhang 时间:2026-06-28 阅读数:10人阅读

在使用 asyncio 编写并发异步程序时,编写出正常运行的代码往往只是第一步。在生产环境中,网络抖动、数据库超时、客户端断开连接等异常情况是家常便饭。

如果在异步协程中对**异常处理(Exception Handling)任务取消(Task Cancellation)**缺乏严谨的逻辑控制,就极易引发资源泄漏、任务在后台静默崩溃,或者程序陷入死锁。

本文将带您深入剖析 asyncio 中的异常捕获艺术、多任务下的异常管理,以及如何安全地取消正在运行的任务。


一、 单个协程内的异常处理:经典的 try...except

在单个协程内部,异常捕获与传统的同步 Python 代码并无二致。您可以使用经典的 try...except...finally 结构,并且它能跨越 await 挂起线工作:

async def fetch_api(url):
    try:
        # 发起异步请求
        response = await httpx.get(url)
        return response.json()
    except httpx.HTTPStatusError as e:
        print(f"HTTP 错误: {e}")
        return None
    finally:
        print("API 请求尝试结束。")

二、 并发任务群的异常管理:asyncio.gather 的陷阱

当我们使用 asyncio.gather(*tasks) 并发运行多个任务时,如果其中某一个任务抛出了异常,默认会发生什么?

默认陷阱:一人犯错,全局隐蔽

默认情况下,gather 在遇到第一个任务抛出的异常时,会立即将其向上抛出给调用者。
然而,此时其他未执行完的任务并不会被取消,它们依然会在后台继续运行! 这往往会导致资源的隐蔽泄露或状态的不一致。

解决方案:使用 return_exceptions=True

如果您希望个别任务的失败不要中断整个流程,并且想一次性收集所有的成功结果与失败异常,应该开启 return_exceptions=True

import asyncio

async def division(a, b):
    await asyncio.sleep(0.5)
    return a / b

async def main():
    # 模拟其中一个任务会抛出 ZeroDivisionError 异常
    results = await asyncio.gather(
        division(10, 2),
        division(10, 0),  # 会抛出异常
        division(10, 5),
        return_exceptions=True  # 开启异常返回模式
    )
    
    # 结果列表中会混入 Exception 对象,而不会导致整个程序崩溃
    for res in results:
        if isinstance(res, Exception):
            print(f"❌ 任务发生错误: {res}")
        else:
            print(f"✅ 任务成功返回: {res}")

if __name__ == "__main__":
    asyncio.run(main())

三、 任务取消机制与 CancelledError

在 Web 服务中,如果用户在网页加载前关闭了浏览器,后端就应该立即取消所有尚未完成的异步数据库或 API 请求,释放服务器资源。

1. 如何取消一个任务?

调用 task.cancel() 会向该协程内部抛入一个特殊的 asyncio.CancelledError 异常。该异常会在协程下一次遇到 await 挂起时触发。

async def my_task():
    try:
        print("开始工作...")
        await asyncio.sleep(10)  # 在此等待时,如果被 cancel(),会直接抛出异常
        print("工作完成!")
    except asyncio.CancelledError:
        print("🧹 收到取消信号,开始执行资源清理...")
        raise  # ⚠️ 黄金法则:捕获了 CancelledError 必须重新向上抛出!

⚠️ 绝对不能吞掉 CancelledError
如果你在 except Exception: 中不加区分地吞掉了所有异常(没有重新 raise 抛出 CancelledError),事件循环将无法正常关闭该协程,可能导致程序卡死。


四、 优雅处理超时:asyncio.wait_for

我们不需要每次都手动调用 task.cancel()。对于有超时要求的操作,asyncio.wait_for(task, timeout) 会自动在超时到达时,帮我们对任务执行 cancel() 取消操作,并抛出 TimeoutError

async def fetch_heavy_data():
    await asyncio.sleep(5)
    return "Massive Data"

async def main():
    try:
        # 限制该任务必须在 2 秒内返回,否则自动取消它
        data = await asyncio.wait_for(fetch_heavy_data(), timeout=2.0)
    except asyncio.TimeoutError:
        print("⏰ 请求超时,后台任务已被自动取消并清理!")

五、 总结

  1. 并发异常隔离:使用 asyncio.gather 时,建议评估是否需要开启 return_exceptions=True,防止个别任务失败导致整个调用栈崩溃,同时避免失控任务在后台继续偷跑。
  2. 遵守取消协议:编写异步清理逻辑时,使用 try...finally 是最安全的选择;若捕获 CancelledError,务必记得重新 raise 抛出。
  3. 超时防护常态化:对于任何可能无限期挂起的网络请求、文件读写,都应该用 asyncio.wait_for 包裹,防范僵尸协程霸占内存。

防患于未然,是构建高可用 Python 异步并发系统的核心基石!

本站所有文章、数据、图片均来自互联网,一切版权均归源网站或源作者所有。

如果侵犯了你的权益请来信告知我们删除。

评论交流 (0)

正在加载评论...
头像

XiaoZhang

当你还撑不起你的梦想时,就要去奋斗。如果缘分安排我们相遇,请不要让她擦肩和过。我们一起奋斗!

微信