广告
您当前的位置: 首页 >  技术 >  Python

Python 深入浅出:asyncio 异步协作与同步原语设计模式

作者:admin 时间:2026-06-28 阅读数:8人阅读

在编写多进程或传统多线程程序时,为了防止发生数据抢占与竞态条件,我们经常使用互斥锁(Mutex)或事件(Event)进行同步控制。

很多开发者会产生一个误区:“既然 asyncio 是单线程运行的,在任何时刻都只有一个协程在执行,那我们是不是就完全不需要锁等同步机制了?”

答案是:绝对需要!

虽然单线程内没有 CPU 级别的指令抢占,但协程会在遇到 await 挂起时让出 CPU 控制权。如果多个协程异步读写同一个共享资源,并在操作中间夹杂了 await 挂起,依然会发生严重的竞态条件(Race Conditions)

本文将带您了解为什么单线程协程仍需同步,并重点剖析 asyncioLock(锁)Event(事件)Queue(队列) 的实战设计模式。


一、 协程的竞态条件:为什么单线程也需要锁?

让我们看一个经典的竞态条件案例:多个协程并发模拟银行开户并进行资金扣减,中间模拟网络延迟挂起:

import asyncio

shared_balance = 100  # 共享账户余额为 100 元

async def withdraw(amount):
    global shared_balance
    print(f"开始取款: {amount} 元")
    
    # 模拟查询数据库网络延迟(此时协程被挂起,让出 CPU)
    await asyncio.sleep(1)
    
    # 重新获得 CPU 后的扣减逻辑
    if shared_balance >= amount:
        shared_balance -= amount
        print(f"成功取出 {amount} 元,剩余余额: {shared_balance} 元")
    else:
        print("余额不足,取款失败!")

async def main():
    # 并发执行两个取款任务,总额度超出 100 元
    await asyncio.gather(withdraw(80), withdraw(80))

# 运行结果:
# 成功取出 80 元,剩余余额: 20 元
# 成功取出 80 元,剩余余额: -60 元 (账户透支了!发生了竞态错误)

竞态发生的原因:

当第一个取款协程在 await asyncio.sleep(1) 处挂起时,第二个取款协程被调度运行。由于此时余额尚未真正被扣减,第二个协程也通过了 shared_balance >= amount 的校验。当两者陆续被唤醒后,都执行了扣减,导致透支。


二, asyncio.Lock(异步锁)的解决方案

为了解决上述问题,我们可以使用 asyncio.Lock。它能保证在任意时刻,只有一个协程能够进入被锁保护的临界区代码段

import asyncio

shared_balance = 100
lock = asyncio.Lock()  # 创建异步锁

async def safe_withdraw(amount):
    global shared_balance
    # 使用 async with 语法自动获取和释放锁
    async with lock:
        print(f"开始安全取款: {amount} 元")
        await asyncio.sleep(1)  # 即使在此挂起,其他协程也无法进入临界区
        if shared_balance >= amount:
            shared_balance -= amount
            print(f"成功取出 {amount} 元,剩余余额: {shared_balance} 元")
        else:
            print("余额不足,取款失败!")

使用锁后,即使取款操作有网络延迟挂起,第二个协程也会在 async with lock 处排队等待,直到第一个协程执行完毕释放锁,从而保证了业务正确性。


三、 asyncio.Event(异步事件通知)

asyncio.Event 用于在协程之间进行一对多的单向广播通知。一个或多个协程可以通过 await event.wait() 处于暂停状态,等待某个信号;而另一个协程通过 event.set() 激发信号,瞬间唤醒所有等待的协程。

  • 经典场景:系统启动时,多个业务初始化协程需要等待“数据库连接成功”这一事件触发。
import asyncio

db_connected_event = asyncio.Event()

async def fetch_api_data(task_id):
    print(f"任务 {task_id}:等待数据库连接就绪...")
    await db_connected_event.wait()  # 挂起等待事件激活
    print(f"任务 {task_id}:数据库已就绪,开始读取数据。")

async def init_database():
    print("正在连接数据库并初始化数据...")
    await asyncio.sleep(3)  # 模拟初始化耗时
    print("数据库初始化完成!")
    db_connected_event.set()  # 发送广播信号,唤醒所有等待的协程

async def main():
    # 并发运行多个数据处理协程和初始化协程
    await asyncio.gather(
        fetch_api_data(1),
        fetch_api_data(2),
        init_database()
    )

四、 asyncio.Queue(异步队列)

asyncio.Queue 是实现生产者-消费者模式的标准工具。它提供了协程安全的 FIFO(先进先出)队列,当队列满时放入数据会被挂起,当队列空时获取数据会被挂起。

  • 经典场景:网络爬虫任务队列、消息消费中间件。
import asyncio

async def producer(queue):
    for i in range(3):
        await queue.put(f"Task_{i}")
        print(f"生产者:放入了 Task_{i}")
        await asyncio.sleep(0.5)

async def consumer(queue):
    while True:
        # 获取任务,如果队列为空则挂起等待
        task = await queue.get()
        print(f"消费者:正在处理 {task}")
        await asyncio.sleep(1)
        queue.task_done()  # 通知队列该任务已处理完毕

async def main():
    queue = asyncio.Queue()
    # 启动消费者(后台守护任务)
    consumer_task = asyncio.create_task(consumer(queue))
    
    # 运行生产者,直至其结束
    await producer(queue)
    
    # 等待队列中所有任务被处理完毕 (queue.join())
    await queue.join()
    consumer_task.cancel()  # 取消消费者任务

五、 总结

  1. 协程锁是必要的:任何夹杂了 await 操作且涉及共享变量读写的逻辑,都必须加锁保护。
  2. Event 用于一对多通知:它是实现解耦初始化等待和广播的利器。
  3. Queue 用于流量平滑:生产者-消费者模式能极大地平滑高并发请求,避免后端服务过载。

深入理解并灵活运用这三种协作同步原语,将让您的 Python 异步并发架构设计更加严谨、健壮与专业!

本站所有文章、数据、图片均来自互联网,一切版权均归源网站或源作者所有。

如果侵犯了你的权益请来信告知我们删除。

评论交流 (0)

正在加载评论...
头像

admin

当你还撑不起你的梦想时,就要去奋斗。如果缘分安排我们相遇,请不要让她擦肩和过。我们一起奋斗!

微信