Python 深入浅出:asyncio 异常处理与任务取消的硬核指南
在使用 asyncio 编写并发异步程序时,编写出正常运行的代码往往只是第一步。在生产环境中,网络抖动、数据库超时、客户端断开连接等异常情况是家常便饭。
如果在异步协程中对**异常处理(Exception Handling)和任务取消(Task Cancellation)**缺乏严谨的逻辑控制,就极易引发资源泄漏、任务在后台静默崩溃,或者程序陷入死锁。
本文将带您深入剖析 asyncio 中的异常捕获艺术、多任务下的异常管理,以及如何安全地取消正在运行的任务。
一、 单个协程内的异常处理:经典的 try...except
在单个协程内部,异常捕获与传统的同步 Python 代码并无二致。您可以使用经典的 try...except...finally 结构,并且它能跨越 await 挂起线工作:
async def fetch_api(url):
try:
# 发起异步请求
response = await httpx.get(url)
return response.json()
except httpx.HTTPStatusError as e:
print(f"HTTP 错误: {e}")
return None
finally:
print("API 请求尝试结束。")
二、 并发任务群的异常管理:asyncio.gather 的陷阱
当我们使用 asyncio.gather(*tasks) 并发运行多个任务时,如果其中某一个任务抛出了异常,默认会发生什么?
默认陷阱:一人犯错,全局隐蔽
默认情况下,gather 在遇到第一个任务抛出的异常时,会立即将其向上抛出给调用者。
然而,此时其他未执行完的任务并不会被取消,它们依然会在后台继续运行! 这往往会导致资源的隐蔽泄露或状态的不一致。
解决方案:使用 return_exceptions=True
如果您希望个别任务的失败不要中断整个流程,并且想一次性收集所有的成功结果与失败异常,应该开启 return_exceptions=True:
import asyncio
async def division(a, b):
await asyncio.sleep(0.5)
return a / b
async def main():
# 模拟其中一个任务会抛出 ZeroDivisionError 异常
results = await asyncio.gather(
division(10, 2),
division(10, 0), # 会抛出异常
division(10, 5),
return_exceptions=True # 开启异常返回模式
)
# 结果列表中会混入 Exception 对象,而不会导致整个程序崩溃
for res in results:
if isinstance(res, Exception):
print(f"❌ 任务发生错误: {res}")
else:
print(f"✅ 任务成功返回: {res}")
if __name__ == "__main__":
asyncio.run(main())
三、 任务取消机制与 CancelledError
在 Web 服务中,如果用户在网页加载前关闭了浏览器,后端就应该立即取消所有尚未完成的异步数据库或 API 请求,释放服务器资源。
1. 如何取消一个任务?
调用 task.cancel() 会向该协程内部抛入一个特殊的 asyncio.CancelledError 异常。该异常会在协程下一次遇到 await 挂起时触发。
async def my_task():
try:
print("开始工作...")
await asyncio.sleep(10) # 在此等待时,如果被 cancel(),会直接抛出异常
print("工作完成!")
except asyncio.CancelledError:
print("🧹 收到取消信号,开始执行资源清理...")
raise # ⚠️ 黄金法则:捕获了 CancelledError 必须重新向上抛出!
⚠️ 绝对不能吞掉
CancelledError:
如果你在except Exception:中不加区分地吞掉了所有异常(没有重新raise抛出CancelledError),事件循环将无法正常关闭该协程,可能导致程序卡死。
四、 优雅处理超时:asyncio.wait_for
我们不需要每次都手动调用 task.cancel()。对于有超时要求的操作,asyncio.wait_for(task, timeout) 会自动在超时到达时,帮我们对任务执行 cancel() 取消操作,并抛出 TimeoutError。
async def fetch_heavy_data():
await asyncio.sleep(5)
return "Massive Data"
async def main():
try:
# 限制该任务必须在 2 秒内返回,否则自动取消它
data = await asyncio.wait_for(fetch_heavy_data(), timeout=2.0)
except asyncio.TimeoutError:
print("⏰ 请求超时,后台任务已被自动取消并清理!")
五、 总结
- 并发异常隔离:使用
asyncio.gather时,建议评估是否需要开启return_exceptions=True,防止个别任务失败导致整个调用栈崩溃,同时避免失控任务在后台继续偷跑。 - 遵守取消协议:编写异步清理逻辑时,使用
try...finally是最安全的选择;若捕获CancelledError,务必记得重新raise抛出。 - 超时防护常态化:对于任何可能无限期挂起的网络请求、文件读写,都应该用
asyncio.wait_for包裹,防范僵尸协程霸占内存。
防患于未然,是构建高可用 Python 异步并发系统的核心基石!
本站所有文章、数据、图片均来自互联网,一切版权均归源网站或源作者所有。
如果侵犯了你的权益请来信告知我们删除。



暂无评论
还没有人评论过本文,快来发表你的高见吧!