本文详细介绍了在使用asyncio库编写异步程序时常见的错误和问题,并进一步通过实践案例进行分析和讨论,以便在项目中更有效地应用asyncio库。有关asyncio库的详细介绍,可参考:Python 异步编程库 asyncio 使用指北。
1 asyncio程序的常见错误
本节展示了在使用asyncio模块时,开发人员常遇到的一些常见错误示例。以下是四个最常见的异步编程错误:
- 直接调用并运行协程。
- 主协程过早退出。
- 错误使用asyncio的低级API。
- 程序出现竞争条件或死锁问题。
1.1 试图直接调用并运行协程
协程通常通过async def
定义,如下所示:
# 自定义协程 async def custom_coro(): print('hi there')
若直接像函数一样调用该协程,通常不会执行预期的操作,而是创建一个协程对象。这种调用方式不会触发协程的执行:
# 错误:像函数一样调用协程 custom_coro() # 这只是创建了一个协程对象,并不会执行
此时,返回的是一个协程对象,而不是立即执行协程主体,这忽略协程必须在事件循环中运行。如果协程未被执行,系统将发出以下运行时警告:
sys:1: RuntimeWarning: coroutine 'custom_coro' was never awaited
要正确执行协程,需要在asyncio
事件循环中等待该对象。例如,使用asyncio.run()
启动事件循环来执行协程:
# 正确:通过 asyncio.run() 运行协程 import asyncio asyncio.run(custom_coro())
另一种执行协程方法是通过await
表达式在现有协程中挂起并调度其他协程。例如,定义一个新的协程,在其中调用 custom_coro()
:
# 正确:在协程中使用 await 调度另一个协程 async def main(): await custom_coro() # 使用 asyncio.run 启动事件循环 asyncio.run(main())
1.2 主协程过早退出
在异步编程中,任务的执行可能无法按预期及时完成。通过asyncio.create_task()
可以并行运行多个协程,但如果主协程提前退出,这些任务可能会被强制中止。为确保所有任务能够在主协程退出前完成,主协程应在无其他活动时显式等待剩余任务的完成。可以使用asyncio.all_tasks()
来获取当前事件循环中的所有任务,并在移除主协程本身后,通过asyncio.wait()
等待其他任务的执行结果。如果不移除当前协程,asyncio.wait
会等待所有任务完成,包括当前协程,从而导致程序不退出(死锁)。示例如下:
import asyncio async def task_1(): print("任务 1 开始") await asyncio.sleep(2) print("任务 1 完成") async def task_2(): print("任务 2 开始") await asyncio.sleep(1) print("任务 2 完成") async def main(): # 创建多个任务 task1 = asyncio.create_task(task_1()) task2 = asyncio.create_task(task_2()) # 获取所有正在运行的任务的集合 all_tasks = asyncio.all_tasks() # 获取当前任务(即主协程) current_task = asyncio.current_task() # 从所有任务列表中删除当前任务 all_tasks.remove(current_task) # 暂停直到所有任务完成 await asyncio.wait(all_tasks) # 运行主协程 asyncio.run(main())
代码运行结果为:
任务 1 开始 任务 2 开始 任务 2 完成 任务 1 完成
1.3 错误使用asyncio的低级API
asyncio
提供了两类API:一类是面向应用程序开发者的高级API,另一类是面向框架开发者的低级API。低级API主要为高级API提供底层支持,如事件循环、传输协议等内部结构。在大多数情况下,推荐优先使用高级API,特别是在学习阶段。只有在需要实现特定功能时,才应考虑使用低级API。尽管学习低级API具有一定的价值,但不应在刚开始时就使用。建议先通过高级API熟悉异步编程的基本概念,进行应用开发,掌握核心知识后,再深入探讨技术细节。例如:
import asyncio # 高级API:推荐的用法 async def hello_world(): print("你好,世界!") # 使用 asyncio.run 来启动事件循环 def run_hello_world(): asyncio.run(hello_world()) # 低级API:不推荐直接使用 async def low_level_example(): loop = asyncio.get_event_loop() # 获取当前事件循环 task = loop.create_task(hello_world()) # 创建任务 await task # 显式等待任务完成 # 运行高级 API 示例 print("使用 asyncio.run 运行:") run_hello_world() # 运行低级 API 示例 print("n使用低级 API 运行:") asyncio.run(low_level_example())
1.4 程序出现竞争条件或死锁问题
竞争条件和死锁是并发编程中常见的错误。竞争条件发生在多个任务同时访问相同资源时,缺乏适当的控制可能导致数据错误或丢失。死锁则是指不同任务互相等待对方释放资源,最终导致所有任务无法继续执行。
许多Python开发者认为,使用asyncio
协程可以避免这些问题,因为在任何时刻,事件循环中只有一个协程在执行。然而,协程在运行过程中可能会暂停和恢复,并且可能会访问共享资源。如果对这些资源没有适当的保护,就可能会引发竞争条件。此外,在协程同步资源时处理不当,也有可能导致死锁。因此,在编写asyncio
程序时,确保协程的安全性至关重要。
1.4.1 竞争条件问题
以下示例代码模拟了两个异步任务并行增加共享变量counter
,每个任务循环10000次对counter
进行递增操作。通过awaitasyncio.sleep(0)
来模拟上下文切换,确保两个任务能够交替执行。然而,由于未使用同步机制(如锁),会导致竞态条件。因此,最终的counter
值可能小于预期的20000,而不是20000,因为两个任务可能在读取和更新counter
的值时发生冲突,导致多个协程可能重复更新相同的数据:
import asyncio # 共享资源 counter = 0 async def increment(): global counter for _ in range(10000): temp = counter temp += 1 await asyncio.sleep(0) # 让出控制权,模拟上下文切换 counter = temp async def main(): tasks = [increment(), increment()] await asyncio.gather(*tasks) print("最终计数器的值:", counter) # 运行 asyncio 程序 asyncio.run(main())
代码运行结果为:
最终计数器的值: 10000
为了解决这个问题,可以使用 asyncio.Lock
来同步对共享资源 counter
的访问。然而,由于asyncio.Lock
与asyncio.run
之间的事件循环可能不匹配,通常会在某些环境中(如特定的 IDE 或脚本运行环境)出现问题。原因在于asyncio.run
创建并管理一个新的事件循环,而锁 (asyncio.Lock
) 可能会被不同的事件循环使用,从而导致不一致。为避免这种情况,可以显式创建并使用一个事件循环,如下所示:
import asyncio # 共享资源 counter = 0 # 创建锁 lock = asyncio.Lock() async def increment(): global counter for _ in range(10000): async with lock: # 确保在修改 counter 时,只有一个任务可以访问 temp = counter temp += 1 await asyncio.sleep(0) # 让出控制权,模拟上下文切换 counter = temp async def main(): tasks = [increment(), increment()] await asyncio.gather(*tasks) print("最终计数器的值:", counter) # 显式创建事件循环并运行 loop = asyncio.get_event_loop() loop.run_until_complete(main())
代码运行结果为:
最终计数器的值: 20000
1.4.2 死锁问题
死锁介绍
死锁(Deadlock)是并发编程中的一种常见问题,它发生在多个任务之间的资源争用中,导致所有任务都陷入无法继续执行的僵局。即使在Python中使用asyncio
协程框架,资源竞争和同步问题也可能导致死锁的发生,尤其是在协程需要同步资源(如锁)时。如果同步机制设计不当,容易引发死锁。
死锁的特征如下:
- 循环等待:多个任务之间相互等待对方释放资源,从而形成一个循环等待的关系。例如,任务1等待任务2释放资源,而任务2又在等待任务1释放资源,形成闭环。
- 不可抢占:每个任务持有的资源(如锁)不能被其他任务强制抢占。只有在任务主动释放资源时,其他任务才能获取该资源。
- 持有资源且等待:任务持有某些资源(如锁),同时又在等待其他资源的释放。由于任务在持有资源的情况下无法继续执行,导致系统中的任务无法前进。
以下代码中的死锁是典型的循环等待问题,所有相关任务陷入相互等待的死循环,无法继续执行:
import asyncio # 创建两个共享锁 lock1 = asyncio.Lock() lock2 = asyncio.Lock() async def task1(): print("任务1:尝试获取锁1") await lock1.acquire() # 获取锁1 print("任务1:已获取锁1,尝试获取锁2") await asyncio.sleep(1) # 模拟一些操作 await lock2.acquire() # 获取锁2 print("任务1:已获取锁2") # 释放锁 lock1.release() lock2.release() async def task2(): print("任务2:尝试获取锁2") await lock2.acquire() # 获取锁2 print("任务2:已获取锁2,尝试获取锁1") await asyncio.sleep(1) # 模拟一些操作 await lock1.acquire() # 获取锁1 print("任务2:已获取锁1") # 释放锁 lock1.release() lock2.release() async def main(): # 启动两个任务 await asyncio.gather(task1(), task2()) # 创建事件循环并运行 loop = asyncio.get_event_loop() loop.run_until_complete(main())
代码运行结果如下,由于两个任务都被挂起,程序无法退出,且永远不会打印出"任务1:已获取锁2"或"任务2:已获取锁1":
任务1:尝试获取锁1 任务1:已获取锁1,尝试获取锁2 任务2:尝试获取锁2 任务2:已获取锁2,尝试获取锁1 ...
asyncio中死锁的避免
在使用asyncio
时,为了避免死锁,可以采取以下几种方法:
- 锁的顺序管理:确保所有任务按照相同的顺序获取锁,以防止发生相互等待的情况。
- 尝试获取锁:使用
asyncio.Lock
的acquire
方法并设置超时时间,避免任务长时间处于等待锁的状态。 - 使用
async with
:通过async with
语句来管理锁,这样可以确保在任务完成后自动释放锁,避免因忘记释放锁而引发问题。
根据这一思路,前面死锁的案例解决示例代码如下:
import asyncio # 创建两个共享锁 lock1 = asyncio.Lock() lock2 = asyncio.Lock() async def task1(): print("任务1:尝试获取锁1") async with lock1: # 使用async with获取锁,自动释放 print("任务1:已获取锁1,尝试获取锁2") await asyncio.sleep(1) # 模拟一些操作 print("任务1:尝试获取锁2") async with lock2: # 使用async with获取锁,自动释放 print("任务1:已获取锁2") async def task2(): print("任务2:尝试获取锁1") async with lock1: # 使用async with获取锁,自动释放 print("任务2:已获取锁1,尝试获取锁2") await asyncio.sleep(1) # 模拟一些操作 print("任务2:尝试获取锁2") async with lock2: # 使用async with获取锁,自动释放 print("任务2:已获取锁2") async def main(): # 启动两个任务 await asyncio.gather(task1(), task2()) # 创建事件循环并运行 loop = asyncio.get_event_loop() loop.run_until_complete(main())
代码运行结果如下,可以看到两个任务避免了死锁:
任务1:尝试获取锁1 任务1:已获取锁1,尝试获取锁2 任务2:尝试获取锁1 任务1:尝试获取锁2 任务1:已获取锁2 任务2:已获取锁1,尝试获取锁2 任务2:尝试获取锁2 任务2:已获取锁2
2 asyncio程序的常见问题
在使用asyncio编写异步程序时,开发者可能会遇到一系列常见问题,这些问题涉及到任务的管理、执行流程、性能优化等多个方面。以下是一些常见的问题和挑战:
- 任务的等待、停止、结果获取
- 如何在后台运行和等待任务
- 任务的延迟后运行和后续运行
- 如何显示运行任务的进度
- 如何在asyncio中执行阻塞I/O或CPU密集型函数
- Python协程:操作系统原生支持吗
2.1 任务的等待、停止、结果获取
2.1.1 如何等待任务
可以通过直接等待asyncio.Task
对象来等待任务的完成:
# 等待任务完成 await task
也同时创建并等待任务完成。例如:
# 创建并等待任务完成 await asyncio.create_task(custom_coro())
与协程不同,任务可以多次等待而不会引发错误。以下是一个演示如何多次等待同一任务的示例,在此例中,await task
两次都能成功执行,因为task
已经完成并保存了返回值:
import asyncio async def other_coro(): await asyncio.sleep(1) return "任务完成" async def main(): # 将协程包装在任务中并安排其执行 task = asyncio.create_task(other_coro()) # 第一次等待任务并获取返回值 value1 = await task print(value1) # 再次等待任务(任务已经完成) value2 = await task print(value2) # 运行主协程 asyncio.run(main())
2.1.2 何时停止任务
可以通过asyncio.Task
对象的cancel()
方法取消任务。若任务被成功取消,cancel()
方法返回True
,否则返回False
。例如:
# 取消任务 was_cancelled = task.cancel()
2.1.3 如何获取任务的返回值
在Python中创建一个asyncio
任务后,有两种方法可以从 asyncio.Task
中检索返回值:
- 等待任务(使用
await
)。 - 调用
result()
方法。
基于await
函数,等待任务时,调用者会挂起,直到任务完成并返回结果。如果任务已完成,返回值会立即提供。以下代码展示了如何等待任务并获取其返回值:
import asyncio async def other_coro(): await asyncio.sleep(1) return "任务完成" async def main(): # 将协程包装在任务中并安排其执行 task = asyncio.create_task(other_coro()) # 等待任务完成并获取返回值 value = await task print(value) # 运行主协程 asyncio.run(main())
也可以通过调用 asyncio.Task
对象的 result()
方法获取任务的返回值。此时要求任务已完成。如果任务未完成,调用 result()
会引发 InvalidStateError
异常。如果任务被取消,则会引发 CancelledError
异常。以下是一个使用 result()
方法的例子:
import asyncio async def other_coro(): await asyncio.sleep(1) return "任务完成" async def main(): task = asyncio.create_task(other_coro()) # 等待任务完成 await task try: # 获取任务的返回值 value = task.result() print(value) except asyncio.InvalidStateError: print("任务尚未完成") except asyncio.CancelledError: print("任务已取消") # 运行主协程 asyncio.run(main())
2.2 如何在后台运行和等待任务
2.2.1 如何在后台运行任务
通过 asyncio.create_task()
可以将协程封装为Task对象,并在后台执行。创建的任务对象会立即返回,且不会阻塞调用者的执行。为了确保任务能够开始执行,可以使用 await asyncio.sleep(0)
暂停片刻。之所以使用 await asyncio.sleep(0)
,是因为新创建的任务并不会立刻开始执行。事件循环负责管理多个任务,它会根据调度策略决定哪个任务优先执行。通过 await asyncio.sleep(0)
暂时让出执行权,使得事件循环有机会调度并执行刚刚创建的任务。这样,await asyncio.sleep(0)
确保了任务在创建后能尽早开始执行,同时不会阻塞主协程的其他操作。示例代码如下:
import asyncio async def other_coroutine(): print("开始执行 other_coroutine") await asyncio.sleep(2) print("other_coroutine 执行完毕") async def main(): # 创建并调度任务 task = asyncio.create_task(other_coroutine()) # 暂停片刻以确保任务开始执行 await asyncio.sleep(0) print("主协程正在执行") # 等待任务完成 await task print("任务执行完毕") # 运行主协程 asyncio.run(main())
此外,后台任务可以在程序运行时执行,不会妨碍主程序的结束。如果主程序没有其他待执行的任务,而后台任务仍在进行中,那么需要确保程序在后台任务完成后才会完全退出。
2.2.2 如何等待所有后台任务
在使用asyncio
时,可能需要等待多个独立的任务完成。比如,当多个任务同时运行时,有时想要等待所有任务完成,但又不想一直阻塞当前正在运行的任务。为了实现这个功能,可以通过以下步骤:
- 获取所有当前任务:使用
asyncio.all_tasks()
可以获取到当前事件循环中的所有任务。 - 排除当前任务:通过
asyncio.current_task()
获取当前正在运行的任务,并将其从任务集合中移除。这样可以避免等待当前任务自己。 - 等待所有剩余任务完成:使用
asyncio.wait()
来等待所有任务完成,直到它们都执行完毕。
示例代码如下:
import asyncio async def example_coroutine(name): # 这是一个模拟任务的协程,睡眠 1 秒钟 await asyncio.sleep(1) print(f"任务 {name} 完成。") async def main(): # 创建多个协程任务 tasks = [asyncio.create_task(example_coroutine(name = str(i))) for i in range(5)] # 获取所有正在运行的任务 all_tasks = asyncio.all_tasks() # 获取当前正在运行的任务(即 main 协程) current_task = asyncio.current_task() # 从任务集合中移除当前任务 all_tasks.remove(current_task) # 等待所有其他任务完成 await asyncio.wait(all_tasks) # 启动事件循环并执行主协程 asyncio.run(main())
2.3 任务的延迟后运行和后续运行
2.3.1 任务的延迟后运行
想要实现任务的延迟后运行,可以通过开发一个自定义的包装协程,使其在延迟指定时间后执行目标协程。该包装协程接受两个参数:目标协程和延迟时间(单位为秒)。它会先休眠指定的延迟时间,然后执行传入的目标协程。
以下代码展示了如何通过自定义包装协程 delay
,在指定的延迟时间后执行目标协程。delay
协程通过 asyncio.sleep()
实现延时,随后再执行传入的目标协程。可以在不同场景中使用该方法,如直接挂起协程或将任务安排为独立执行:
import asyncio # 延迟几秒后启动另一个协程的包装协程 async def delay(coro, seconds): """ 延迟指定时间(秒)后执行目标协程。 参数: coro: 要执行的目标协程 seconds: 延迟时间,单位为秒 """ # 暂停指定时间(以秒为单位) await asyncio.sleep(seconds) # 执行目标协程 await coro # 示例目标协程 async def my_coroutine(): print("目标协程开始执行") # 模拟一些工作 await asyncio.sleep(2) print("目标协程执行完成") # 使用包装协程时,可以创建协程对象并直接等待,或将其作为任务独立执行 # 1. 调用者可以挂起并调度延迟后的协程 async def main(): print("延迟10秒后执行目标协程:") await delay(my_coroutine(), 10) print("目标协程已经完成执行") # 2. 或者调用者可以安排延迟协程独立运行 async def schedule_task(): print("将目标协程安排为独立任务,延迟10秒后执行") task = asyncio.create_task(delay(my_coroutine(), 10)) await task # 等待任务完成 print("任务已完成") # 运行示例 if __name__ == "__main__": asyncio.run(main()) # 运行主协程 # 或者运行独立任务的调度 # asyncio.run(schedule_task())
2.3.2 任务的后续运行
在asyncio中,触发后续任务的方式主要有三种:
- 通过已完成的任务本身调度后续任务
- 通过任务发起方调度后续任务
- 使用回调函数自动调度后续任务
逐一分析这三种方式:
1. 通过已完成的任务本身调度后续任务
已完成的任务可以触发后续任务的调度,通常依赖于某些状态检查来决定是否应该发起后续任务。任务调度可以通过asyncio.create_task()
来完成。示例代码展示了运行指定任务后直接调度后续任务:
import asyncio async def task(): print("任务开始执行。") await asyncio.sleep(2) # 模拟任务执行 print("任务执行完成。") await followup_task() # 在任务完成后直接调度后续任务 async def followup_task(): print("正在执行后续任务。") await asyncio.sleep(2) # 模拟后续任务执行 print("后续任务执行完成。") # 启动事件循环,执行任务 async def main(): await task() asyncio.run(main())
2. 通过任务发起方调度后续任务
任务发起方可以根据实际需要决定是否继续启动后续任务。在启动第一个任务时,可以保留 asyncio.Task
对象,通过检查任务的结果或状态,来判断是否启动后续任务。任务发起方还可以选择等待后续任务完成,也可以选择不等待。示例代码如下:
import asyncio async def task(): # 模拟一个任务 await asyncio.sleep(1) return True # 假设任务成功完成,返回True async def followup_task(): # 模拟后续任务 await asyncio.sleep(1) print("后续任务执行") async def main(): # 发起并等待第一个任务 task_1 = asyncio.create_task(task()) # 等待第一个任务完成 result = await task_1 # 检查任务结果 if result: # 发起后续任务 await followup_task() # 运行主程序 asyncio.run(main())
3. 使用回调函数自动调度后续任务
在任务发起时,可以为其注册一个回调函数。该回调函数会在任务完成后自动执行。回调函数接收一个 asyncio.Task
对象作为参数,但它不会等待后续任务的执行。因为回调函数通常是普通的Python函数,无法进行异步操作。示例代码:
import asyncio # 定义回调函数 def callback(task): # 安排并启动后续任务 # 注意:这里不能直接使用 await,需通过 create_task 调度异步任务 asyncio.create_task(followup()) # 定义第一个异步任务 async def work(): print("工作任务正在执行...") await asyncio.sleep(2) # 模拟一些异步操作 print("工作任务完成!") # 定义后续异步任务 async def followup(): print("后续任务正在执行...") await asyncio.sleep(1) # 模拟一些异步操作 print("后续任务完成!") # 创建事件循环并运行任务 async def main(): # 发起任务并注册回调函数 task = asyncio.create_task(work()) task.add_done_callback(callback) # 等待任务完成 await task # 确保后续任务完成 await asyncio.sleep(1) # 等待回调任务完成的时间 # 执行事件循环 asyncio.run(main())
2.4 如何显示运行任务的进度
2.4.1 基于回调函数的任务进度显示
每个任务的回调函数可用于显示进度。asyncio.Task
对象支持注册回调函数,这些函数会在任务完成时被调用,无论是正常完成还是以异常结束。回调函数是普通函数而非协程,且接受与其关联的 asyncio.Task
对象作为参数。通过为所有任务注册相同的回调函数,可以统一报告任务进度:
import asyncio # 回调函数,用于显示任务完成的进度,区分任务 def progress(task): task_name = task.get_name() # 获取任务的名称 print(f"任务 {task_name} 完成。") async def example_task(n, task_name): """模拟一个异步任务,表示处理n秒的任务,并设置任务名称""" await asyncio.sleep(n) return task_name async def main(): # 定义多个异步任务并添加回调函数 tasks = [] for i in range(1, 6): task_name = f"Task-{i}" # 为每个任务分配一个唯一名称 task = asyncio.create_task(example_task(i, task_name)) # 创建任务,模拟不同的执行时间 task.set_name(task_name) # 设置任务名称 # 为任务添加回调函数,回调函数会在相应任务执行完毕时被调用 task.add_done_callback(progress) tasks.append(task) # 等待所有任务完成 await asyncio.gather(*tasks) # 运行主程序 asyncio.run(main())
2.4.2 基于tqdm库的任务进度显示
使用tqdm库显示任务总体进度
以下代码演示了如何结合tqdm
库和asyncio
库,来展示异步任务的总体执行进度:
import asyncio from tqdm.asyncio import tqdm async def example_task(n, task_name): """模拟一个异步任务,表示处理 n 秒的任务,并设置任务名称""" await asyncio.sleep(n) # 模拟任务处理时间 return task_name # 返回任务名称 async def main(): # 定义多个异步任务并使用 tqdm 显示进度 tasks = [] total_tasks = 5 # 总任务数 task_durations = [1, 2, 3, 4, 5] # 每个任务的持续时间(秒) # 使用 tqdm 创建进度条,`total` 为任务的数量 progress_bar = tqdm(total=total_tasks, desc="已完成任务数", ncols=100) # 创建任务 for i, n in enumerate(task_durations): task_name = f"Task-{i+1}" # 为每个任务分配一个唯一名称 task = asyncio.create_task(example_task(n, task_name)) # 创建任务,模拟不同的执行时间 tasks.append(task) # 等待任务完成并更新进度条 for task in asyncio.as_completed(tasks): await task # 等待每个任务完成 progress_bar.update(1) # 每完成一个任务,更新进度条 progress_bar.close() # 关闭进度条 # 运行主程序 asyncio.run(main())
使用tqdm库为多个任务设置单独进度条
以下示例代码演示了如何使用asyncio
并行执行多个异步任务,同时通过tqdm
库为每个任务单独显示进度条:
import asyncio from tqdm.asyncio import tqdm async def example_task(n, task_name, progress_bar): """模拟一个异步任务,表示处理 n 秒的任务,并设置任务名称""" for _ in range(n): # 每秒更新一次进度 await asyncio.sleep(1) # 模拟任务处理时间 progress_bar.update(1) # 更新当前任务的进度 return task_name # 返回任务名称 async def main(): # 定义多个异步任务并使用 tqdm 显示进度 tasks = [] total_tasks = 5 # 总任务数 task_durations = [1, 2, 3, 4, 5] # 每个任务的持续时间(秒) # 创建进度条并为每个任务单独设置 progress_bars = [] for i, n in enumerate(task_durations): task_name = f"Task-{i+1}" # 为每个任务分配一个唯一名称 progress_bar = tqdm(total=n, desc=task_name, ncols=100, position=i) # 创建任务对应的进度条 progress_bars.append(progress_bar) task = asyncio.create_task(example_task(n, task_name, progress_bar)) # 创建任务 tasks.append(task) # 等待任务完成 await asyncio.gather(*tasks) # 使用 asyncio.gather 同时等待所有任务完成 # 关闭所有进度条 for progress_bar in progress_bars: progress_bar.close() # 运行主程序 asyncio.run(main())
2.5 如何在asyncio中执行阻塞I/O或CPU密集型函数
在编程中,“阻塞调用”指的是某些操作(例如读取文件、等待网络请求或执行数据库查询等)需要一定时间才能完成。在执行这些操作时,程序会暂停,无法继续处理其他任务,这就是“阻塞”。另外,CPU密集型操作也可能会导致程序阻塞。因此,为了在异步环境中仍然能够处理阻塞调用,asyncio模块提供了两种方法来在异步程序中执行阻塞调用:
asyncio.to_thread()
:此方法简化了线程管理流程,特别适合处理大多数I/O密集型任务。它允许将阻塞调用委派给一个线程,从而避免阻塞主事件循环。loop.run_in_executor()
:此方法提供了更高的灵活性,支持使用自定义的执行器,比如线程池或进程池。这适用于需要精细控制执行环境的场景。
这两种方法均可有效地将阻塞调用转为异步任务,以下逐一分析这两种方式:
2.5.1 使用 asyncio.to_thread()
asyncio.to_thread()
是一个高级 API,适用于大多数应用场景。它会将指定的函数和参数提交到一个独立的线程中执行,并返回一个可等待的协程。这样,阻塞操作就可以在后台线程池中执行,而不会阻塞事件循环。需要注意的是,任务并不会立即执行,而是会等待事件循环空闲时再开始执行。由于 asyncio.to_thread()
会在后台创建一个 ThreadPoolExecutor
来处理阻塞任务,因此它特别适合 I/O 密集型的操作。示例代码如下:
import asyncio import time def blocking_task(task_id): # 模拟一个耗时的阻塞操作 time.sleep(2) return f"任务 {task_id} 完成" # 同步执行多个任务 def sync_main(): start_time = time.time() # 顺序执行多个阻塞任务 results = [blocking_task(i) for i in range(5)] end_time = time.time() for result in results: print(result) print(f"同步任务执行时间: {end_time - start_time:.4f} 秒") # 异步运行多个阻塞任务 async def async_main(): start_time = time.time() # 使用 asyncio.to_thread 来并发运行多个阻塞任务 tasks = [asyncio.to_thread(blocking_task, i) for i in range(5)] results = await asyncio.gather(*tasks) end_time = time.time() for result in results: print(result) print(f"异步任务执行时间: {end_time - start_time:.4f} 秒") # 执行同步任务 print("同步执行开始:") sync_main() # 执行异步任务 print("n异步执行开始:") asyncio.run(async_main())
以上代码展示了同步执行阻塞任务与异步执行阻塞任务的对比。通过使用asyncio.to_thread()
,I/O 密集型操作的处理被委托给独立的线程池,从而避免了阻塞事件循环,显著提升了异步任务的效率:
- 同步执行:在
sync_main()
中,多个阻塞任务按顺序逐一执行,每个任务需等待前一个任务完成后才能开始,整体执行时间为所有任务总时间(即 5 * 2 秒)。 - 异步执行:在
async_main()
中,多个阻塞任务并发执行。尽管每个任务仍然是阻塞的,但它们在后台线程中并行处理,因此总执行时间仅为单个任务的执行时间(即约 2 秒)。
代码运行结果如下:
同步执行开始: 任务 0 完成 任务 1 完成 任务 2 完成 任务 3 完成 任务 4 完成 同步任务执行时间: 10.0317 秒 异步执行开始: 任务 0 完成 任务 1 完成 任务 2 完成 任务 3 完成 任务 4 完成 异步任务执行时间: 2.0089 秒
2.5.2 使用 loop.run_in_executor()
loop.run_in_executor()
是asyncio
提供的低级API,需先获取事件循环(例如,使用asyncio.get_running_loop()
)。该函数允许指定执行器(默认是ThreadPoolExecutor
)以及要执行的函数。
与asyncio.to_thread()
相比,run_in_executor()
提供了更大的灵活性,支持使用自定义执行器,而不仅限于线程池。此外,调用该函数后,任务会立即开始执行,无需等待返回的可等待对象来触发任务的启动。
示例代码如下:
import asyncio import time # 定义一个需要执行的阻塞任务 def task(): print("任务开始") time.sleep(2) print("任务结束") # 在单独的线程中执行函数 async def main(): # 获取事件循环 loop = asyncio.get_running_loop() # 使用run_in_executor来将task函数异步执行在线程池中 # None 表示使用默认的线程池执行器 await loop.run_in_executor(None, task) # 执行主任务 asyncio.run(main())
如果希望使用进程池,可以创建一个自定义的执行器并传递给 run_in_executor()
。在这种情况下,调用者需要负责管理执行器的生命周期,使用完后要手动关闭。示例代码如下:
import asyncio from concurrent.futures import ProcessPoolExecutor import time # 定义一个耗时的任务 def task(name): print(f"任务 {name} 开始") time.sleep(2) # 模拟一个阻塞的操作 print(f"任务 {name} 完成") return f"来自 {name} 的结果" # 使用自定义的执行器来运行任务 async def main(): # 创建一个进程池 with ProcessPoolExecutor() as executor: # 获取当前的事件循环 loop = asyncio.get_running_loop() # 使用 run_in_executor 来在进程池中执行任务 results = await asyncio.gather( loop.run_in_executor(executor, task, "A"), loop.run_in_executor(executor, task, "B"), loop.run_in_executor(executor, task, "C") ) # 打印所有任务的结果 for result in results: print(result) # 启动 asyncio 事件循环并执行 main if __name__ == "__main__": asyncio.run(main())
2.6 Python协程:操作系统原生支持吗
异步编程和协程并不总是解决程序中所有并发问题的最佳方案。Python 中的协程是由软件管理的,它们通过asyncio事件循环来执行和调度。与操作系统提供的线程和进程不同,协程并不由操作系统直接支持,而是通过Python的软件框架来实现的。在这个意义上,Python中的协程并不是“原生”的。它们并不像线程或进程那样具有独立的执行上下文,反而是在同一个线程内通过协作式调度来切换任务。
此外,Python的GIL(全局解释器锁)用来保护解释器内部的状态,防止多个线程同时访问和修改解释器的数据。而asyncio的事件循环是单线程运行的,这意味着所有的协程都在同一个线程里执行。由于协程本身是通过事件循环调度的,而不是通过多线程或多进程并行执行,因此,尽管Python中的多线程模型受到GIL的限制,协程在处理 I/O 密集型任务时能够有效避免GIL的影响,从而提高并发性能。这也是为什么在处理大量I/O操作时,使用asyncio和协程能够带来较好的性能表现。
然而,协程并不适用于所有类型的并发任务。例如,对于计算密集型任务,使用线程或进程模型可能更为合适,因为协程并不会突破GIL的限制,计算密集型任务依然会在单个CPU核心上串行执行。因此,在选择是否使用协程时,需要根据任务的特性做出权衡。
3 应用实例
3.1 在基于线程的程序中调用asyncio代码
直接调用同步I/O代码
以下代码实现了一个简单的Tkinter应用,点击按钮后,程序会发起一个同步HTTP请求(GET 请求)。在每60毫秒的刷新周期中,程序会根据当前状态更新显示的文本。然而,当点击按钮时,request_remote
方法中的 requests.get
会发起一个同步请求,这会阻塞主线程,从而导致界面卡顿或无响应。如下代码,App.QUERYING_STATE
状态相关信息不会显示出来:
import tkinter as tk import requests class App(tk.Tk): INIT_STATE = 0 # 初始化状态 QUERYING_STATE = 1 # 请求中状态 RESULT_STATE = 2 # 请求结果状态 def __init__(self): super().__init__() self.status_code = 0 # HTTP请求返回的状态码 self._refresh_ms = 60 # 刷新间隔时间(毫秒) self.state = App.INIT_STATE # 初始状态 self._button = None # 按钮 self._label = None # 标签 self.render_elements() # 渲染界面元素 self.after(self._refresh_ms, self.refresh) # 设置定时刷新,定时调用refresh方法 def render_elements(self): """ 设置界面布局,渲染UI元素 """ self.geometry("400x200") # 设置窗口大小 self._button = tk.Button(self, text="请求状态码", command=self.request_remote) # 创建按钮,点击时调用request_remote方法 self._label = tk.Label(self, text="") # 创建标签,初始为空 self._button.pack() # 将按钮添加到窗口中 self._label.pack() # 将标签添加到窗口中 def request_remote(self): """ 发起同步HTTP请求 """ self.state = App.QUERYING_STATE # 设置状态为请求中 response = requests.get("https://www.example.com") # 发起GET请求,获取响应 self.status_code = response.status_code # 获取响应返回的状态码 self.state = App.RESULT_STATE # 设置状态为结果状态,表示请求已完成 def refresh(self): """ 每60毫秒刷新一次UI内容 """ self.update_label() # 更新标签内容 self.after(self._refresh_ms, self.refresh) # 设置下次刷新时间(每60毫秒刷新一次) def update_label(self): """ 根据应用状态更新标签内容 """ if self.state == App.INIT_STATE: self._label.config(text="这里将显示状态码。") # 初始状态下提示文字 elif self.state == App.QUERYING_STATE: self._label.config(text="正在查询远程...") # 请求中状态时显示提示文字 elif self.state == App.RESULT_STATE: self._label.config(text=f"返回的状态码是: {self.status_code}") # 请求结果状态时显示返回的状态码 def start(self): self.mainloop() # 启动Tkinter事件循环,进入GUI界面 def main(): app = App() # 创建应用实例 app.start() # 启动应用 if __name__ == "__main__": main()
I/O请求的异步调用
可以将requests包替换为aiohttp包,实现I/O请求的异步调用。aiohttp和requests都是Python中常用的HTTP客户端库,但requests适用于同步场景,简单易用,aiohttp则适用于异步并发的场景,能够处理大量并行请求。具体区别如下:
- 同步vs异步:
- requests是一个同步库,意味着每次发送请求时,程序会等待响应回来后才继续执行。适用于一些简单的、串行的HTTP请求场景。
- aiohttp是一个异步库,基于Python的asyncio模块,能够在发送HTTP请求时非阻塞地继续执行其他任务。适用于需要大量并发请求或长时间等待的异步场景。
- 性能:
- requests由于是同步的,处理大量请求时容易出现性能瓶颈,因为每个请求必须等待前一个请求完成。
- aiohttp通过异步I/O处理,可以在等待响应时同时发起其他请求,极大提高了并发性能,尤其在处理大量HTTP请求时。
- 用法:
- requests用法简单,适合初学者和一般同步的任务。
- aiohttp需要使用async和await,适合需要并发或异步操作的任务。
在上述示例代码中,为了替代requests模块的同步请求,可以创建一个继承自App
类的AppAsync
类,并利用aiohttp和asyncio库实现异步请求。通过async_request
方法异步发起HTTP请求:
import aiohttp import asyncio class AppAsync(App): async def async_request(self): """ 异步发起HTTP请求,使用aiohttp库来实现I/O请求的异步调用。 """ async with aiohttp.ClientSession() as session: # 创建一个aiohttp会话对象 async with session.get("https://www.example.com") as response: # 发起GET请求 self.status_code = response.status # 获取响应状态码 self.state = App.RESULT_STATE # 更新应用状态 def __int__(self): super().__init__() def request_remote(self): """ 使用asyncio.run来调用异步请求代码 """ self.state = self.QUERYING_STATE # 设置状态为请求中 asyncio.run(self.async_request()) # 异步发起HTTP请求 def main(): app = AppAsync() # 创建应用实例 app.start() # 启动应用 if __name__ == "__main__": main() # 运行主程序
然而AppAsync
类中的asyncio.run(self.async_request())
会阻塞Tkinter的主线程,因为asyncio.run()
会一直运行,直到异步任务完成。同时Tkinter自身有一个事件循环(mainloop()),与asyncio
需要的事件循环冲突。如果在Tkinter内创建新事件循环,可能会导致Tkinter关闭或中断后出现问题。
将asyncio与线程结合
为了解决asyncio事件循环阻塞的问题,可以使用一个单独的守护线程,并在守护线程中运行事件循环,这样asyncio的事件循环就不会阻塞主线程。重写AppAsync
类示例如下:
import aiohttp import asyncio import threading class AppAsync(App): def __init__(self): super().__init__() self._loop_thread = threading.Thread(target=self.run_asyncio_loop, daemon=True) self._loop_thread.start() # 启动事件循环线程 async def async_request(self): """ 异步发起HTTP请求,使用aiohttp库来实现I/O请求的异步调用。 """ async with aiohttp.ClientSession() as session: # 创建一个aiohttp会话对象 async with session.get("https://www.example.com") as response: # 发起GET请求 self.status_code = response.status # 获取响应状态码 self.state = App.RESULT_STATE # 更新应用状态 def request_remote(self): """ 使用异步请求,在事件循环中执行 """ self.state = App.QUERYING_STATE # 设置状态为请求中 asyncio.run_coroutine_threadsafe(self.async_request(), self._loop) # 调用异步请求并与当前事件循环进行交互 def run_asyncio_loop(self): """ 运行asyncio事件循环 """ self._loop = asyncio.new_event_loop() # 创建新的事件循环 asyncio.set_event_loop(self._loop) # 设置当前线程的事件循环 self._loop.run_forever() # 启动事件循环 def main(): app = AppAsync() # 创建应用实例 app.start() # 启动应用 if __name__ == "__main__": main() # 运行主程序
示例代码运行时,App.QUERYING_STATE
状态相关信息会显示出来,AppAsync
类主要的改动点如下:
AppAsync
类的构造函数:- 增加了一个新的线程来运行asyncio事件循环,避免在Tkinter线程中阻塞。
- 使用
threading.Thread
启动一个守护线程,执行run_asyncio_loop
方法,确保事件循环在后台运行。 - 在创建线程时设置为守护线程。这样即使主线程退出,守护线程也会自动结束。
run_asyncio_loop
方法:- 在一个单独的线程中启动新的asyncio事件循环。
- 使用
asyncio.set_event_loop
设置当前线程的事件循环,并调用loop.run_forever()
来保持事件循环持续运行。
request_remote
方法:- 使用
asyncio.run_coroutine_threadsafe
将异步请求任务提交给后台事件循环执行,用于在非主线程中安全地执行协程。
- 使用
3.2 基于asyncio实现多核异步处理
单核异步处理
asyncio的并发机制是基于协作式多任务(协程),它不会并行地使用多个CPU核心来加速计算,所有的任务都是在单个核心上轮流执行的。以下代码模拟了1000个爬虫任务,并使用单核异步来执行:
import random import asyncio import time # 模拟爬虫任务,执行时会有随机的延迟 async def fake_crawlers(): # 随机生成一个0.2到1.0秒之间的延迟,保留两位小数 io_delay = round(random.uniform(0.2, 1.0), 2) await asyncio.sleep(io_delay) result = 0 # 随机生成100,000到500,000之间的数字,用于模拟计算密集型任务 # 这段代码耗时大约0.2秒到0.5秒之间 for i in range(random.randint(100000, 500000)): result += i return result # 主程序入口,负责创建并执行多个爬虫任务 async def main(): # time.monotonic()是用于测量时间间隔的可靠方法,它不受系统时间更改的影响 start = time.monotonic() tasks = [asyncio.create_task(fake_crawlers()) for i in range(1000)] # 模拟创建1000个任务 await asyncio.gather(*tasks) # 等待所有任务完成 # 输出所有任务完成的时间 print(f"所有任务已完成,耗时 {time.monotonic() - start:.2f} 秒") # 启动程序 asyncio.run(main())
代码运行结果如下:
所有任务已完成,耗时 8.51 秒
多核异步处理
要实现多核异步处理,可以将异步编程和多进程池结合起来使用。具体来说,主程序会把任务分成多个批次,每个批次由不同的进程来处理。每个进程内部,多个任务又是通过异步方式并行执行的。这样一来,计算密集型的任务可以通过多进程并行处理,而每个进程内部的I/O操作则可以通过asyncio来异步管理,从而大幅提高整体效率。示例代码如下,代码将1000个任务分布到10个子进程中并行执行,每个子进程执行100个模拟的爬虫任务:
import random import asyncio # import time from concurrent.futures import ProcessPoolExecutor # 模拟爬虫任务,执行时会有随机的延迟 async def fake_crawlers(): # 随机生成一个0.2到1.0秒之间的延迟,保留两位小数 io_delay = round(random.uniform(0.2, 1.0), 2) await asyncio.sleep(io_delay) result = 0 # 随机生成100,000到500,000之间的数字,用于模拟阻塞任务 # 这段代码耗时大约0.2秒到0.5秒之间 for i in range(random.randint(100000, 500000)): result += i return result # 并发查询任务,通过起始和结束索引分配任务 async def query_concurrently(begin_idx: int, end_idx: int): """ 启动并发任务,通过起始和结束序列号 """ tasks = [] # 根据给定的索引范围(从 begin_idx 到 end_idx),创建并发任务 for _ in range(begin_idx, end_idx, 1): tasks.append(asyncio.create_task(fake_crawlers())) # 等待所有任务完成,并返回每个任务的结果 results = await asyncio.gather(*tasks) return results # 批量任务执行函数,使用子进程池并行执行任务 def run_batch_tasks(batch_idx: int, step: int): """ 在子进程中执行批量任务 """ # 计算当前批次任务的起始和结束索引 begin = batch_idx * step + 1 # 当前批次任务的起始索引 end = begin + step # 当前批次任务的结束索引 # 使用 asyncio.run() 启动异步任务并获取结果 results = [result for result in asyncio.run(query_concurrently(begin, end))] return results # 主函数,分批次将任务分配到子进程中执行 async def main(): """ 将任务分批次分配到子进程中执行 """ start = time.monotonic() loop = asyncio.get_running_loop() # 获取当前运行的事件循环 # 创建进程池执行器,用于将任务分配到多个子进程中执行 with ProcessPoolExecutor() as executor: # 启动多个批次任务,并行执行。每个批次执行 100个任务,共启动10个批次 tasks = [loop.run_in_executor(executor, run_batch_tasks, batch_idx, 100) for batch_idx in range(10)] # 等待所有子进程任务完成,并将结果汇总 results = [result for sub_list in await asyncio.gather(*tasks) for result in sub_list] # 输出所有任务完成的时间 print(f"所有任务已完成,耗时 {time.monotonic() - start:.2f} 秒") # 程序入口 if __name__ == "__main__": asyncio.run(main())
代码运行结果如下:
所有任务已完成,耗时 1.83 秒
3.3 图片下载器
若经常需要从互联网下载文件,可以使用aiohttp库来实现任务的自动化。下面提供了一个简单的脚本,用于从指定URL下载文件:
建立本地图片服务器
为了提供图片下载链接,以下代码展示了如何使用FastAPI框架创建一个简单的Web应用程序,用于上传、管理和访问图片:
import os from fastapi import FastAPI, File, UploadFile from fastapi.responses import FileResponse from fastapi.staticfiles import StaticFiles import uvicorn app = FastAPI() # 配置图片存储目录 UPLOAD_DIR = "./uploaded_images" if not os.path.exists(UPLOAD_DIR): os.makedirs(UPLOAD_DIR) # 将图片目录挂载为静态文件目录 app.mount("/images", StaticFiles(directory=UPLOAD_DIR), name="images") # 上传图片接口 @app.post("/upload/") async def upload_image(file: UploadFile = File(...)): try: # 定义图片保存路径 file_path = os.path.join(UPLOAD_DIR, file.filename) # 保存图片到本地 with open(file_path, "wb") as f: f.write(file.file.read()) # 返回图片的访问 URL image_url = f"http://127.0.0.1:8000/images/{file.filename}" return {"image_url": image_url} except Exception as e: return {"error": str(e)} # 获取所有上传图片的链接 @app.get("/images_list/") async def list_images(): try: # 获取目录下的所有文件 files = os.listdir(UPLOAD_DIR) image_urls = [f"http://127.0.0.1:8000/images/{file}" for file in files if os.path.isfile(os.path.join(UPLOAD_DIR, file))] return {"image_urls": image_urls} except Exception as e: return {"error": str(e)} # 获取单个图片 @app.get("/image/{image_name}") async def get_image(image_name: str): try: file_path = os.path.join(UPLOAD_DIR, image_name) if os.path.exists(file_path): return FileResponse(file_path) else: return {"error": "Image not found"} except Exception as e: return {"error": str(e)} # 启动 FastAPI 服务器 if __name__ == "__main__": uvicorn.run(app, host="127.0.0.1", port=8000)
该代码实现了一个图片上传和访问服务,包含以下三个主要接口:
- 服务器启动后,会监听本地地址
127.0.0.1
的8000端口。 - 客户端可以通过以下方式与服务器进行交互:
- 访问
http://127.0.0.1:8000/upload/
上传图片,并获取返回的图片 URL。 - 访问
http://127.0.0.1:8000/images_list/
查看所有已上传图片的 URL。 - 访问
http://127.0.0.1:8000/images/{image_name}
来查看特定图片。
- 访问
注意,所有上传和保存的图片都会保存在本地的uploaded_images
文件夹中。
图片下载
以下代码利用了aiohttp、asyncio和aiofiles库,通过异步方式从API获取图片URL列表,并将图片下载到指定目录。借助这些库的结合,代码能够高效地处理HTTP请求、文件下载和文件操作,同时确保主程序的执行不被阻塞:
import aiohttp # 导入 aiohttp 库,用于异步 HTTP 请求 import asyncio # 导入 asyncio 库,用于管理异步任务 import aiofiles # 导入 aiofiles 库,用于异步文件操作 import os # 获取图片 URL 列表的异步函数 async def get_image_urls(api_url): try: # 使用 aiohttp 启动一个异步 HTTP 会话 async with aiohttp.ClientSession() as session: # 异步发送 GET 请求以获取 API 返回的数据 async with session.get(api_url) as response: # 如果响应状态码是 200 (请求成功) if response.status == 200: # 将响应内容解析为 JSON 格式 data = await response.json() # 从 JSON 数据中提取图片 URL 列表,若没有则返回空列表 return data.get("image_urls", []) else: # 如果请求失败,打印错误信息 print(f"从 {api_url} 获取图片列表失败。状态码: {response.status}") return [] except Exception as e: # 如果发生任何异常,打印错误信息 print(f"获取图片列表时发生错误: {e}") return [] # 下载文件的异步函数 async def download_file(url, save_directory): try: # 使用 aiohttp 启动异步 HTTP 会话 async with aiohttp.ClientSession() as session: # 异步发送 GET 请求以获取文件内容 async with session.get(url) as response: # 如果响应状态码是 200 (请求成功) if response.status == 200: # 确保保存文件的目录存在,若不存在则创建 os.makedirs(save_directory, exist_ok=True) # 从 URL 中提取文件名 filename = os.path.join(save_directory, url.split('/')[-1]) # 异步打开文件以进行写入操作 async with aiofiles.open(filename, 'wb') as file: # 读取响应内容 content = await response.read() # 将内容写入本地文件 await file.write(content) print(f"已下载 {filename}") else: # 如果下载失败,打印错误信息 print(f"下载 {url} 失败。状态码: {response.status}") except Exception as e: # 如果发生任何异常,打印错误信息 print(f"下载 {url} 时发生错误: {e}") # 根据获取的图片 URL 列表进行下载的异步函数 async def download_images(api_url, save_directory): # 调用 get_image_urls 函数获取图片 URL 列表 image_urls = await get_image_urls(api_url) # 如果没有获取到图片 URL,则打印提示并返回 if not image_urls: print("没有找到需要下载的图片。") return # 为每个图片 URL 创建一个下载任务 tasks = [download_file(url, save_directory) for url in image_urls] # 使用 asyncio.gather 并行执行所有下载任务 await asyncio.gather(*tasks) # 启动事件循环,开始下载图片 if __name__ == "__main__": # API 地址,提供图片 URL 列表 api_url = "http://127.0.0.1:8000/images_list/" # 指定保存下载图片的目录 save_directory = "downloads" # 获取事件循环并运行下载任务 loop = asyncio.get_event_loop() loop.run_until_complete(download_images(api_url, save_directory))
3.4 生产者消费者模型
生产者-消费者模型(Producer-Consumer Model)是一种经典的并发编程模式,旨在解决多个任务之间生产和消费的协调问题,从而确保资源得到合理利用并保证数据按顺序处理。该模型通过生产者和消费者两个角色,模拟共享资源的生产和消费过程。以下代码实现了一个基本的生产者-消费者模型,采用了asyncio进行异步任务处理:
import asyncio from asyncio import Queue from typing import List # 生产者函数,负责将物品添加到队列 async def produce_items(queue: Queue, items: List[int], producer_name: str): for item in items: await queue.put(item) # 将物品放入队列 print(f"{producer_name} 添加物品:{item}") await asyncio.sleep(0.5) # 模拟生产过程中的等待时间 print(f"{producer_name} 完成所有物品的生产") # 消费者函数,负责从队列中取出并处理物品 async def consume_items(queue: Queue, consumer_name: str): while True: item = await queue.get() # 阻塞直到获取到一个物品 if item is None: # 使用None作为结束信号 queue.task_done() # 标记任务完成 break # 退出循环 print(f"{consumer_name} 处理物品:{item}") await asyncio.sleep(1) # 模拟处理物品的时间 queue.task_done() # 标记任务完成 # 主函数,负责启动多个生产者和消费者任务 async def main(): queue = Queue() # 创建一个队列 items_to_produce = ['A','B','C','D'] # 需要生产的物品列表 # 创建产者任务(例如3个生产者) producer_tasks = [ asyncio.create_task(produce_items(queue, items_to_produce, f"生产者_{i}")) for i in range(3) ] # 创建消费者任务(例如2个消费者) consumer_tasks = [ asyncio.create_task(consume_items(queue, f"消费者_{i}")) for i in range(2) ] # 等待所有生产者任务完成 await asyncio.gather(*producer_tasks) # 生产者完成后,发送 None 给消费者,通知它们退出 for _ in consumer_tasks: await queue.put(None) # 等待队列中的所有任务处理完成 await queue.join() # 等待所有消费者任务完成 await asyncio.gather(*consumer_tasks) if __name__ == '__main__': # 运行主函数 asyncio.run(main())