[python] asyncio库常见问题与实践案例

本文详细介绍了在使用asyncio库编写异步程序时常见的错误和问题,并进一步通过实践案例进行分析和讨论,以便在项目中更有效地应用asyncio库。有关asyncio库的详细介绍,可参考:Python 异步编程库 asyncio 使用指北

1 asyncio程序的常见错误

本节展示了在使用asyncio模块时,开发人员常遇到的一些常见错误示例。以下是四个最常见的异步编程错误:

  1. 直接调用并运行协程。
  2. 主协程过早退出。
  3. 错误使用asyncio的低级API。
  4. 程序出现竞争条件或死锁问题。

1.1 试图直接调用并运行协程

协程通常通过async def定义,如下所示:

# 自定义协程 async def custom_coro():     print('hi there') 

若直接像函数一样调用该协程,通常不会执行预期的操作,而是创建一个协程对象。这种调用方式不会触发协程的执行:

# 错误:像函数一样调用协程 custom_coro()  # 这只是创建了一个协程对象,并不会执行 

此时,返回的是一个协程对象,而不是立即执行协程主体,这忽略协程必须在事件循环中运行。如果协程未被执行,系统将发出以下运行时警告:

sys:1: RuntimeWarning: coroutine 'custom_coro' was never awaited 

要正确执行协程,需要在asyncio事件循环中等待该对象。例如,使用asyncio.run()启动事件循环来执行协程:

# 正确:通过 asyncio.run() 运行协程 import asyncio  asyncio.run(custom_coro())  

另一种执行协程方法是通过await表达式在现有协程中挂起并调度其他协程。例如,定义一个新的协程,在其中调用 custom_coro()

# 正确:在协程中使用 await 调度另一个协程 async def main():     await custom_coro()   # 使用 asyncio.run 启动事件循环 asyncio.run(main())  

1.2 主协程过早退出

在异步编程中,任务的执行可能无法按预期及时完成。通过asyncio.create_task()可以并行运行多个协程,但如果主协程提前退出,这些任务可能会被强制中止。为确保所有任务能够在主协程退出前完成,主协程应在无其他活动时显式等待剩余任务的完成。可以使用asyncio.all_tasks()来获取当前事件循环中的所有任务,并在移除主协程本身后,通过asyncio.wait()等待其他任务的执行结果。如果不移除当前协程,asyncio.wait会等待所有任务完成,包括当前协程,从而导致程序不退出(死锁)。示例如下:

import asyncio  async def task_1():     print("任务 1 开始")     await asyncio.sleep(2)     print("任务 1 完成")  async def task_2():     print("任务 2 开始")     await asyncio.sleep(1)     print("任务 2 完成")  async def main():     # 创建多个任务     task1 = asyncio.create_task(task_1())     task2 = asyncio.create_task(task_2())          # 获取所有正在运行的任务的集合     all_tasks = asyncio.all_tasks()          # 获取当前任务(即主协程)     current_task = asyncio.current_task()          # 从所有任务列表中删除当前任务     all_tasks.remove(current_task)          # 暂停直到所有任务完成     await asyncio.wait(all_tasks)  # 运行主协程 asyncio.run(main()) 

代码运行结果为:

任务 1 开始 任务 2 开始 任务 2 完成 任务 1 完成 

1.3 错误使用asyncio的低级API

asyncio提供了两类API:一类是面向应用程序开发者的高级API,另一类是面向框架开发者的低级API。低级API主要为高级API提供底层支持,如事件循环、传输协议等内部结构。在大多数情况下,推荐优先使用高级API,特别是在学习阶段。只有在需要实现特定功能时,才应考虑使用低级API。尽管学习低级API具有一定的价值,但不应在刚开始时就使用。建议先通过高级API熟悉异步编程的基本概念,进行应用开发,掌握核心知识后,再深入探讨技术细节。例如:

import asyncio  # 高级API:推荐的用法 async def hello_world():     print("你好,世界!")  # 使用 asyncio.run 来启动事件循环 def run_hello_world():     asyncio.run(hello_world())  # 低级API:不推荐直接使用 async def low_level_example():     loop = asyncio.get_event_loop()  # 获取当前事件循环     task = loop.create_task(hello_world())  # 创建任务     await task  # 显式等待任务完成  # 运行高级 API 示例 print("使用 asyncio.run 运行:") run_hello_world()  # 运行低级 API 示例 print("n使用低级 API 运行:") asyncio.run(low_level_example()) 

1.4 程序出现竞争条件或死锁问题

竞争条件和死锁是并发编程中常见的错误。竞争条件发生在多个任务同时访问相同资源时,缺乏适当的控制可能导致数据错误或丢失。死锁则是指不同任务互相等待对方释放资源,最终导致所有任务无法继续执行。

许多Python开发者认为,使用asyncio协程可以避免这些问题,因为在任何时刻,事件循环中只有一个协程在执行。然而,协程在运行过程中可能会暂停和恢复,并且可能会访问共享资源。如果对这些资源没有适当的保护,就可能会引发竞争条件。此外,在协程同步资源时处理不当,也有可能导致死锁。因此,在编写asyncio程序时,确保协程的安全性至关重要。

1.4.1 竞争条件问题

以下示例代码模拟了两个异步任务并行增加共享变量counter,每个任务循环10000次对counter进行递增操作。通过awaitasyncio.sleep(0)来模拟上下文切换,确保两个任务能够交替执行。然而,由于未使用同步机制(如锁),会导致竞态条件。因此,最终的counter值可能小于预期的20000,而不是20000,因为两个任务可能在读取和更新counter的值时发生冲突,导致多个协程可能重复更新相同的数据:

import asyncio  # 共享资源 counter = 0  async def increment():     global counter     for _ in range(10000):         temp = counter         temp += 1         await asyncio.sleep(0)  # 让出控制权,模拟上下文切换         counter = temp  async def main():     tasks = [increment(), increment()]     await asyncio.gather(*tasks)     print("最终计数器的值:", counter)    # 运行 asyncio 程序 asyncio.run(main()) 

代码运行结果为:

最终计数器的值: 10000 

为了解决这个问题,可以使用 asyncio.Lock 来同步对共享资源 counter 的访问。然而,由于asyncio.Lockasyncio.run之间的事件循环可能不匹配,通常会在某些环境中(如特定的 IDE 或脚本运行环境)出现问题。原因在于asyncio.run 创建并管理一个新的事件循环,而锁 (asyncio.Lock) 可能会被不同的事件循环使用,从而导致不一致。为避免这种情况,可以显式创建并使用一个事件循环,如下所示:

import asyncio  # 共享资源 counter = 0 # 创建锁 lock = asyncio.Lock()  async def increment():     global counter     for _ in range(10000):         async with lock:  # 确保在修改 counter 时,只有一个任务可以访问             temp = counter             temp += 1             await asyncio.sleep(0)  # 让出控制权,模拟上下文切换             counter = temp  async def main():     tasks = [increment(), increment()]     await asyncio.gather(*tasks)     print("最终计数器的值:", counter)  # 显式创建事件循环并运行 loop = asyncio.get_event_loop() loop.run_until_complete(main()) 

代码运行结果为:

最终计数器的值: 20000 

1.4.2 死锁问题

死锁介绍

死锁(Deadlock)是并发编程中的一种常见问题,它发生在多个任务之间的资源争用中,导致所有任务都陷入无法继续执行的僵局。即使在Python中使用asyncio协程框架,资源竞争和同步问题也可能导致死锁的发生,尤其是在协程需要同步资源(如锁)时。如果同步机制设计不当,容易引发死锁。

死锁的特征如下:

  • 循环等待:多个任务之间相互等待对方释放资源,从而形成一个循环等待的关系。例如,任务1等待任务2释放资源,而任务2又在等待任务1释放资源,形成闭环。
  • 不可抢占:每个任务持有的资源(如锁)不能被其他任务强制抢占。只有在任务主动释放资源时,其他任务才能获取该资源。
  • 持有资源且等待:任务持有某些资源(如锁),同时又在等待其他资源的释放。由于任务在持有资源的情况下无法继续执行,导致系统中的任务无法前进。

以下代码中的死锁是典型的循环等待问题,所有相关任务陷入相互等待的死循环,无法继续执行:

import asyncio  # 创建两个共享锁 lock1 = asyncio.Lock() lock2 = asyncio.Lock()  async def task1():     print("任务1:尝试获取锁1")     await lock1.acquire()  # 获取锁1     print("任务1:已获取锁1,尝试获取锁2")     await asyncio.sleep(1)  # 模拟一些操作     await lock2.acquire()  # 获取锁2     print("任务1:已获取锁2")          # 释放锁     lock1.release()     lock2.release()  async def task2():     print("任务2:尝试获取锁2")     await lock2.acquire()  # 获取锁2     print("任务2:已获取锁2,尝试获取锁1")     await asyncio.sleep(1)  # 模拟一些操作     await lock1.acquire()  # 获取锁1     print("任务2:已获取锁1")          # 释放锁     lock1.release()     lock2.release()  async def main():     # 启动两个任务     await asyncio.gather(task1(), task2())  # 创建事件循环并运行 loop = asyncio.get_event_loop() loop.run_until_complete(main()) 

代码运行结果如下,由于两个任务都被挂起,程序无法退出,且永远不会打印出"任务1:已获取锁2"或"任务2:已获取锁1":

任务1:尝试获取锁1 任务1:已获取锁1,尝试获取锁2 任务2:尝试获取锁2 任务2:已获取锁2,尝试获取锁1 ... 

asyncio中死锁的避免

在使用asyncio时,为了避免死锁,可以采取以下几种方法:

  1. 锁的顺序管理:确保所有任务按照相同的顺序获取锁,以防止发生相互等待的情况。
  2. 尝试获取锁:使用asyncio.Lockacquire方法并设置超时时间,避免任务长时间处于等待锁的状态。
  3. 使用async with:通过async with语句来管理锁,这样可以确保在任务完成后自动释放锁,避免因忘记释放锁而引发问题。

根据这一思路,前面死锁的案例解决示例代码如下:

import asyncio  # 创建两个共享锁 lock1 = asyncio.Lock() lock2 = asyncio.Lock()  async def task1():     print("任务1:尝试获取锁1")     async with lock1:  # 使用async with获取锁,自动释放         print("任务1:已获取锁1,尝试获取锁2")         await asyncio.sleep(1)  # 模拟一些操作                  print("任务1:尝试获取锁2")         async with lock2:  # 使用async with获取锁,自动释放             print("任务1:已获取锁2")  async def task2():     print("任务2:尝试获取锁1")     async with lock1:  # 使用async with获取锁,自动释放         print("任务2:已获取锁1,尝试获取锁2")         await asyncio.sleep(1)  # 模拟一些操作                  print("任务2:尝试获取锁2")         async with lock2:  # 使用async with获取锁,自动释放             print("任务2:已获取锁2")  async def main():     # 启动两个任务     await asyncio.gather(task1(), task2())  # 创建事件循环并运行 loop = asyncio.get_event_loop() loop.run_until_complete(main())  

代码运行结果如下,可以看到两个任务避免了死锁:

任务1:尝试获取锁1 任务1:已获取锁1,尝试获取锁2 任务2:尝试获取锁1 任务1:尝试获取锁2 任务1:已获取锁2 任务2:已获取锁1,尝试获取锁2 任务2:尝试获取锁2 任务2:已获取锁2 

2 asyncio程序的常见问题

在使用asyncio编写异步程序时,开发者可能会遇到一系列常见问题,这些问题涉及到任务的管理、执行流程、性能优化等多个方面。以下是一些常见的问题和挑战:

  1. 任务的等待、停止、结果获取
  2. 如何在后台运行和等待任务
  3. 任务的延迟后运行和后续运行
  4. 如何显示运行任务的进度
  5. 如何在asyncio中执行阻塞I/O或CPU密集型函数
  6. Python协程:操作系统原生支持吗

2.1 任务的等待、停止、结果获取

2.1.1 如何等待任务

可以通过直接等待asyncio.Task对象来等待任务的完成:

# 等待任务完成 await task 

也同时创建并等待任务完成。例如:

# 创建并等待任务完成 await asyncio.create_task(custom_coro()) 

与协程不同,任务可以多次等待而不会引发错误。以下是一个演示如何多次等待同一任务的示例,在此例中,await task两次都能成功执行,因为task已经完成并保存了返回值:

import asyncio  async def other_coro():     await asyncio.sleep(1)     return "任务完成"  async def main():     # 将协程包装在任务中并安排其执行     task = asyncio.create_task(other_coro())          # 第一次等待任务并获取返回值     value1 = await task     print(value1)          # 再次等待任务(任务已经完成)     value2 = await task     print(value2)  # 运行主协程 asyncio.run(main()) 

2.1.2 何时停止任务

可以通过asyncio.Task对象的cancel()方法取消任务。若任务被成功取消,cancel()方法返回True,否则返回False。例如:

# 取消任务 was_cancelled = task.cancel() 

2.1.3 如何获取任务的返回值

在Python中创建一个asyncio任务后,有两种方法可以从 asyncio.Task 中检索返回值:

  1. 等待任务(使用 await)。
  2. 调用 result() 方法。

基于await函数,等待任务时,调用者会挂起,直到任务完成并返回结果。如果任务已完成,返回值会立即提供。以下代码展示了如何等待任务并获取其返回值:

import asyncio  async def other_coro():     await asyncio.sleep(1)     return "任务完成"  async def main():     # 将协程包装在任务中并安排其执行     task = asyncio.create_task(other_coro())          # 等待任务完成并获取返回值     value = await task     print(value)  # 运行主协程 asyncio.run(main()) 

也可以通过调用 asyncio.Task 对象的 result() 方法获取任务的返回值。此时要求任务已完成。如果任务未完成,调用 result() 会引发 InvalidStateError 异常。如果任务被取消,则会引发 CancelledError 异常。以下是一个使用 result() 方法的例子:

import asyncio  async def other_coro():     await asyncio.sleep(1)     return "任务完成"  async def main():     task = asyncio.create_task(other_coro())          # 等待任务完成     await task          try:         # 获取任务的返回值         value = task.result()         print(value)     except asyncio.InvalidStateError:         print("任务尚未完成")     except asyncio.CancelledError:         print("任务已取消")  # 运行主协程 asyncio.run(main()) 

2.2 如何在后台运行和等待任务

2.2.1 如何在后台运行任务

通过 asyncio.create_task()可以将协程封装为Task对象,并在后台执行。创建的任务对象会立即返回,且不会阻塞调用者的执行。为了确保任务能够开始执行,可以使用 await asyncio.sleep(0) 暂停片刻。之所以使用 await asyncio.sleep(0),是因为新创建的任务并不会立刻开始执行。事件循环负责管理多个任务,它会根据调度策略决定哪个任务优先执行。通过 await asyncio.sleep(0) 暂时让出执行权,使得事件循环有机会调度并执行刚刚创建的任务。这样,await asyncio.sleep(0) 确保了任务在创建后能尽早开始执行,同时不会阻塞主协程的其他操作。示例代码如下:

import asyncio  async def other_coroutine():     print("开始执行 other_coroutine")     await asyncio.sleep(2)     print("other_coroutine 执行完毕")  async def main():     # 创建并调度任务     task = asyncio.create_task(other_coroutine())          # 暂停片刻以确保任务开始执行     await asyncio.sleep(0)          print("主协程正在执行")          # 等待任务完成     await task     print("任务执行完毕")  # 运行主协程 asyncio.run(main()) 

此外,后台任务可以在程序运行时执行,不会妨碍主程序的结束。如果主程序没有其他待执行的任务,而后台任务仍在进行中,那么需要确保程序在后台任务完成后才会完全退出。

2.2.2 如何等待所有后台任务

在使用asyncio时,可能需要等待多个独立的任务完成。比如,当多个任务同时运行时,有时想要等待所有任务完成,但又不想一直阻塞当前正在运行的任务。为了实现这个功能,可以通过以下步骤:

  1. 获取所有当前任务:使用asyncio.all_tasks()可以获取到当前事件循环中的所有任务。
  2. 排除当前任务:通过asyncio.current_task()获取当前正在运行的任务,并将其从任务集合中移除。这样可以避免等待当前任务自己。
  3. 等待所有剩余任务完成:使用asyncio.wait()来等待所有任务完成,直到它们都执行完毕。

示例代码如下:

import asyncio  async def example_coroutine(name):     # 这是一个模拟任务的协程,睡眠 1 秒钟     await asyncio.sleep(1)     print(f"任务 {name} 完成。")  async def main():     # 创建多个协程任务     tasks = [asyncio.create_task(example_coroutine(name = str(i))) for i in range(5)]          # 获取所有正在运行的任务     all_tasks = asyncio.all_tasks()          # 获取当前正在运行的任务(即 main 协程)     current_task = asyncio.current_task()          # 从任务集合中移除当前任务     all_tasks.remove(current_task)          # 等待所有其他任务完成     await asyncio.wait(all_tasks)  # 启动事件循环并执行主协程 asyncio.run(main()) 

2.3 任务的延迟后运行和后续运行

2.3.1 任务的延迟后运行

想要实现任务的延迟后运行,可以通过开发一个自定义的包装协程,使其在延迟指定时间后执行目标协程。该包装协程接受两个参数:目标协程和延迟时间(单位为秒)。它会先休眠指定的延迟时间,然后执行传入的目标协程。

以下代码展示了如何通过自定义包装协程 delay,在指定的延迟时间后执行目标协程。delay 协程通过 asyncio.sleep() 实现延时,随后再执行传入的目标协程。可以在不同场景中使用该方法,如直接挂起协程或将任务安排为独立执行:

import asyncio  # 延迟几秒后启动另一个协程的包装协程 async def delay(coro, seconds):     """     延迟指定时间(秒)后执行目标协程。      参数:     coro: 要执行的目标协程     seconds: 延迟时间,单位为秒     """     # 暂停指定时间(以秒为单位)     await asyncio.sleep(seconds)     # 执行目标协程     await coro  # 示例目标协程 async def my_coroutine():     print("目标协程开始执行")     # 模拟一些工作     await asyncio.sleep(2)     print("目标协程执行完成")  # 使用包装协程时,可以创建协程对象并直接等待,或将其作为任务独立执行  # 1. 调用者可以挂起并调度延迟后的协程 async def main():     print("延迟10秒后执行目标协程:")     await delay(my_coroutine(), 10)     print("目标协程已经完成执行")  # 2. 或者调用者可以安排延迟协程独立运行 async def schedule_task():     print("将目标协程安排为独立任务,延迟10秒后执行")     task = asyncio.create_task(delay(my_coroutine(), 10))     await task  # 等待任务完成     print("任务已完成")  # 运行示例 if __name__ == "__main__":     asyncio.run(main())  # 运行主协程      # 或者运行独立任务的调度     # asyncio.run(schedule_task()) 

2.3.2 任务的后续运行

在asyncio中,触发后续任务的方式主要有三种:

  1. 通过已完成的任务本身调度后续任务
  2. 通过任务发起方调度后续任务
  3. 使用回调函数自动调度后续任务

逐一分析这三种方式:

1. 通过已完成的任务本身调度后续任务

已完成的任务可以触发后续任务的调度,通常依赖于某些状态检查来决定是否应该发起后续任务。任务调度可以通过asyncio.create_task()来完成。示例代码展示了运行指定任务后直接调度后续任务:

import asyncio  async def task():     print("任务开始执行。")     await asyncio.sleep(2)  # 模拟任务执行     print("任务执行完成。")     await followup_task()  # 在任务完成后直接调度后续任务  async def followup_task():     print("正在执行后续任务。")     await asyncio.sleep(2)  # 模拟后续任务执行     print("后续任务执行完成。")  # 启动事件循环,执行任务 async def main():     await task()  asyncio.run(main()) 

2. 通过任务发起方调度后续任务

任务发起方可以根据实际需要决定是否继续启动后续任务。在启动第一个任务时,可以保留 asyncio.Task 对象,通过检查任务的结果或状态,来判断是否启动后续任务。任务发起方还可以选择等待后续任务完成,也可以选择不等待。示例代码如下:

import asyncio  async def task():     # 模拟一个任务     await asyncio.sleep(1)     return True  # 假设任务成功完成,返回True  async def followup_task():     # 模拟后续任务     await asyncio.sleep(1)     print("后续任务执行")  async def main():     # 发起并等待第一个任务     task_1 = asyncio.create_task(task())          # 等待第一个任务完成     result = await task_1          # 检查任务结果     if result:         # 发起后续任务         await followup_task()  # 运行主程序 asyncio.run(main()) 

3. 使用回调函数自动调度后续任务

在任务发起时,可以为其注册一个回调函数。该回调函数会在任务完成后自动执行。回调函数接收一个 asyncio.Task 对象作为参数,但它不会等待后续任务的执行。因为回调函数通常是普通的Python函数,无法进行异步操作。示例代码:

import asyncio  # 定义回调函数 def callback(task):     # 安排并启动后续任务     # 注意:这里不能直接使用 await,需通过 create_task 调度异步任务     asyncio.create_task(followup())  # 定义第一个异步任务 async def work():     print("工作任务正在执行...")     await asyncio.sleep(2)  # 模拟一些异步操作     print("工作任务完成!")  # 定义后续异步任务 async def followup():     print("后续任务正在执行...")     await asyncio.sleep(1)  # 模拟一些异步操作     print("后续任务完成!")  # 创建事件循环并运行任务 async def main():     # 发起任务并注册回调函数     task = asyncio.create_task(work())     task.add_done_callback(callback)      # 等待任务完成     await task     # 确保后续任务完成     await asyncio.sleep(1)  # 等待回调任务完成的时间  # 执行事件循环 asyncio.run(main()) 

2.4 如何显示运行任务的进度

2.4.1 基于回调函数的任务进度显示

每个任务的回调函数可用于显示进度。asyncio.Task 对象支持注册回调函数,这些函数会在任务完成时被调用,无论是正常完成还是以异常结束。回调函数是普通函数而非协程,且接受与其关联的 asyncio.Task 对象作为参数。通过为所有任务注册相同的回调函数,可以统一报告任务进度:

import asyncio  # 回调函数,用于显示任务完成的进度,区分任务 def progress(task):     task_name = task.get_name()  # 获取任务的名称     print(f"任务 {task_name} 完成。")    async def example_task(n, task_name):     """模拟一个异步任务,表示处理n秒的任务,并设置任务名称"""     await asyncio.sleep(n)     return task_name  async def main():     # 定义多个异步任务并添加回调函数     tasks = []     for i in range(1, 6):         task_name = f"Task-{i}"  # 为每个任务分配一个唯一名称         task = asyncio.create_task(example_task(i, task_name))  # 创建任务,模拟不同的执行时间         task.set_name(task_name)  # 设置任务名称         # 为任务添加回调函数,回调函数会在相应任务执行完毕时被调用         task.add_done_callback(progress)           tasks.append(task)      # 等待所有任务完成     await asyncio.gather(*tasks)  # 运行主程序 asyncio.run(main()) 

2.4.2 基于tqdm库的任务进度显示

使用tqdm库显示任务总体进度

以下代码演示了如何结合tqdm库和asyncio库,来展示异步任务的总体执行进度:

import asyncio from tqdm.asyncio import tqdm  async def example_task(n, task_name):     """模拟一个异步任务,表示处理 n 秒的任务,并设置任务名称"""     await asyncio.sleep(n)  # 模拟任务处理时间     return task_name  # 返回任务名称  async def main():     # 定义多个异步任务并使用 tqdm 显示进度     tasks = []     total_tasks = 5  # 总任务数     task_durations = [1, 2, 3, 4, 5]  # 每个任务的持续时间(秒)      # 使用 tqdm 创建进度条,`total` 为任务的数量     progress_bar = tqdm(total=total_tasks, desc="已完成任务数", ncols=100)      # 创建任务     for i, n in enumerate(task_durations):         task_name = f"Task-{i+1}"  # 为每个任务分配一个唯一名称         task = asyncio.create_task(example_task(n, task_name))  # 创建任务,模拟不同的执行时间         tasks.append(task)      # 等待任务完成并更新进度条     for task in asyncio.as_completed(tasks):         await task  # 等待每个任务完成         progress_bar.update(1)  # 每完成一个任务,更新进度条      progress_bar.close()  # 关闭进度条  # 运行主程序 asyncio.run(main()) 

使用tqdm库为多个任务设置单独进度条

以下示例代码演示了如何使用asyncio并行执行多个异步任务,同时通过tqdm库为每个任务单独显示进度条:

import asyncio from tqdm.asyncio import tqdm   async def example_task(n, task_name, progress_bar):     """模拟一个异步任务,表示处理 n 秒的任务,并设置任务名称"""     for _ in range(n):  # 每秒更新一次进度         await asyncio.sleep(1)  # 模拟任务处理时间         progress_bar.update(1)  # 更新当前任务的进度     return task_name  # 返回任务名称  async def main():     # 定义多个异步任务并使用 tqdm 显示进度     tasks = []     total_tasks = 5  # 总任务数     task_durations = [1, 2, 3, 4, 5]  # 每个任务的持续时间(秒)      # 创建进度条并为每个任务单独设置     progress_bars = []     for i, n in enumerate(task_durations):         task_name = f"Task-{i+1}"  # 为每个任务分配一个唯一名称         progress_bar = tqdm(total=n, desc=task_name, ncols=100, position=i)  # 创建任务对应的进度条         progress_bars.append(progress_bar)         task = asyncio.create_task(example_task(n, task_name, progress_bar))  # 创建任务         tasks.append(task)      # 等待任务完成     await asyncio.gather(*tasks)  # 使用 asyncio.gather 同时等待所有任务完成      # 关闭所有进度条     for progress_bar in progress_bars:         progress_bar.close()  # 运行主程序 asyncio.run(main()) 

2.5 如何在asyncio中执行阻塞I/O或CPU密集型函数

在编程中,“阻塞调用”指的是某些操作(例如读取文件、等待网络请求或执行数据库查询等)需要一定时间才能完成。在执行这些操作时,程序会暂停,无法继续处理其他任务,这就是“阻塞”。另外,CPU密集型操作也可能会导致程序阻塞。因此,为了在异步环境中仍然能够处理阻塞调用,asyncio模块提供了两种方法来在异步程序中执行阻塞调用:

  • asyncio.to_thread() :此方法简化了线程管理流程,特别适合处理大多数I/O密集型任务。它允许将阻塞调用委派给一个线程,从而避免阻塞主事件循环。
  • loop.run_in_executor() :此方法提供了更高的灵活性,支持使用自定义的执行器,比如线程池或进程池。这适用于需要精细控制执行环境的场景。

这两种方法均可有效地将阻塞调用转为异步任务,以下逐一分析这两种方式:

2.5.1 使用 asyncio.to_thread()

asyncio.to_thread() 是一个高级 API,适用于大多数应用场景。它会将指定的函数和参数提交到一个独立的线程中执行,并返回一个可等待的协程。这样,阻塞操作就可以在后台线程池中执行,而不会阻塞事件循环。需要注意的是,任务并不会立即执行,而是会等待事件循环空闲时再开始执行。由于 asyncio.to_thread() 会在后台创建一个 ThreadPoolExecutor 来处理阻塞任务,因此它特别适合 I/O 密集型的操作。示例代码如下:

import asyncio import time  def blocking_task(task_id):     # 模拟一个耗时的阻塞操作     time.sleep(2)     return f"任务 {task_id} 完成"  # 同步执行多个任务 def sync_main():     start_time = time.time()          # 顺序执行多个阻塞任务     results = [blocking_task(i) for i in range(5)]          end_time = time.time()          for result in results:         print(result)          print(f"同步任务执行时间: {end_time - start_time:.4f} 秒")  # 异步运行多个阻塞任务 async def async_main():     start_time = time.time()          # 使用 asyncio.to_thread 来并发运行多个阻塞任务     tasks = [asyncio.to_thread(blocking_task, i) for i in range(5)]     results = await asyncio.gather(*tasks)          end_time = time.time()          for result in results:         print(result)          print(f"异步任务执行时间: {end_time - start_time:.4f} 秒")  # 执行同步任务 print("同步执行开始:") sync_main()  # 执行异步任务 print("n异步执行开始:") asyncio.run(async_main()) 

以上代码展示了同步执行阻塞任务与异步执行阻塞任务的对比。通过使用asyncio.to_thread(),I/O 密集型操作的处理被委托给独立的线程池,从而避免了阻塞事件循环,显著提升了异步任务的效率:

  • 同步执行:在 sync_main() 中,多个阻塞任务按顺序逐一执行,每个任务需等待前一个任务完成后才能开始,整体执行时间为所有任务总时间(即 5 * 2 秒)。
  • 异步执行:在 async_main() 中,多个阻塞任务并发执行。尽管每个任务仍然是阻塞的,但它们在后台线程中并行处理,因此总执行时间仅为单个任务的执行时间(即约 2 秒)。

代码运行结果如下:

同步执行开始: 任务 0 完成 任务 1 完成 任务 2 完成 任务 3 完成 任务 4 完成 同步任务执行时间: 10.0317 秒  异步执行开始: 任务 0 完成 任务 1 完成 任务 2 完成 任务 3 完成 任务 4 完成 异步任务执行时间: 2.0089 秒 

2.5.2 使用 loop.run_in_executor()

loop.run_in_executor()asyncio提供的低级API,需先获取事件循环(例如,使用asyncio.get_running_loop())。该函数允许指定执行器(默认是ThreadPoolExecutor)以及要执行的函数。

asyncio.to_thread()相比,run_in_executor()提供了更大的灵活性,支持使用自定义执行器,而不仅限于线程池。此外,调用该函数后,任务会立即开始执行,无需等待返回的可等待对象来触发任务的启动。

示例代码如下:

import asyncio import time  # 定义一个需要执行的阻塞任务 def task():     print("任务开始")     time.sleep(2)     print("任务结束")   # 在单独的线程中执行函数 async def main():     # 获取事件循环     loop = asyncio.get_running_loop()     # 使用run_in_executor来将task函数异步执行在线程池中     # None 表示使用默认的线程池执行器     await loop.run_in_executor(None, task)  # 执行主任务 asyncio.run(main()) 

如果希望使用进程池,可以创建一个自定义的执行器并传递给 run_in_executor()。在这种情况下,调用者需要负责管理执行器的生命周期,使用完后要手动关闭。示例代码如下:

import asyncio from concurrent.futures import ProcessPoolExecutor import time  # 定义一个耗时的任务 def task(name):     print(f"任务 {name} 开始")     time.sleep(2)  # 模拟一个阻塞的操作     print(f"任务 {name} 完成")     return f"来自 {name} 的结果"  # 使用自定义的执行器来运行任务 async def main():     # 创建一个进程池     with ProcessPoolExecutor() as executor:         # 获取当前的事件循环         loop = asyncio.get_running_loop()          # 使用 run_in_executor 来在进程池中执行任务         results = await asyncio.gather(             loop.run_in_executor(executor, task, "A"),             loop.run_in_executor(executor, task, "B"),             loop.run_in_executor(executor, task, "C")         )          # 打印所有任务的结果         for result in results:             print(result)  # 启动 asyncio 事件循环并执行 main if __name__ == "__main__":     asyncio.run(main()) 

2.6 Python协程:操作系统原生支持吗

异步编程和协程并不总是解决程序中所有并发问题的最佳方案。Python 中的协程是由软件管理的,它们通过asyncio事件循环来执行和调度。与操作系统提供的线程和进程不同,协程并不由操作系统直接支持,而是通过Python的软件框架来实现的。在这个意义上,Python中的协程并不是“原生”的。它们并不像线程或进程那样具有独立的执行上下文,反而是在同一个线程内通过协作式调度来切换任务。

此外,Python的GIL(全局解释器锁)用来保护解释器内部的状态,防止多个线程同时访问和修改解释器的数据。而asyncio的事件循环是单线程运行的,这意味着所有的协程都在同一个线程里执行。由于协程本身是通过事件循环调度的,而不是通过多线程或多进程并行执行,因此,尽管Python中的多线程模型受到GIL的限制,协程在处理 I/O 密集型任务时能够有效避免GIL的影响,从而提高并发性能。这也是为什么在处理大量I/O操作时,使用asyncio和协程能够带来较好的性能表现。

然而,协程并不适用于所有类型的并发任务。例如,对于计算密集型任务,使用线程或进程模型可能更为合适,因为协程并不会突破GIL的限制,计算密集型任务依然会在单个CPU核心上串行执行。因此,在选择是否使用协程时,需要根据任务的特性做出权衡。

[python] asyncio库常见问题与实践案例

3 应用实例

3.1 在基于线程的程序中调用asyncio代码

直接调用同步I/O代码

以下代码实现了一个简单的Tkinter应用,点击按钮后,程序会发起一个同步HTTP请求(GET 请求)。在每60毫秒的刷新周期中,程序会根据当前状态更新显示的文本。然而,当点击按钮时,request_remote方法中的 requests.get会发起一个同步请求,这会阻塞主线程,从而导致界面卡顿或无响应。如下代码,App.QUERYING_STATE状态相关信息不会显示出来:

import tkinter as tk import requests   class App(tk.Tk):     INIT_STATE = 0         # 初始化状态     QUERYING_STATE = 1     # 请求中状态     RESULT_STATE = 2       # 请求结果状态      def __init__(self):         super().__init__()         self.status_code = 0           # HTTP请求返回的状态码         self._refresh_ms = 60          # 刷新间隔时间(毫秒)         self.state = App.INIT_STATE    # 初始状态         self._button = None            # 按钮         self._label = None             # 标签         self.render_elements()         # 渲染界面元素         self.after(self._refresh_ms, self.refresh)  # 设置定时刷新,定时调用refresh方法      def render_elements(self):         """ 设置界面布局,渲染UI元素 """         self.geometry("400x200")  # 设置窗口大小         self._button = tk.Button(self, text="请求状态码", command=self.request_remote)  # 创建按钮,点击时调用request_remote方法         self._label = tk.Label(self, text="")  # 创建标签,初始为空          self._button.pack()  # 将按钮添加到窗口中         self._label.pack()   # 将标签添加到窗口中      def request_remote(self):         """ 发起同步HTTP请求 """         self.state = App.QUERYING_STATE  # 设置状态为请求中         response = requests.get("https://www.example.com")  # 发起GET请求,获取响应         self.status_code = response.status_code  # 获取响应返回的状态码         self.state = App.RESULT_STATE  # 设置状态为结果状态,表示请求已完成      def refresh(self):         """ 每60毫秒刷新一次UI内容 """         self.update_label()  # 更新标签内容         self.after(self._refresh_ms, self.refresh)  # 设置下次刷新时间(每60毫秒刷新一次)      def update_label(self):         """ 根据应用状态更新标签内容 """         if self.state == App.INIT_STATE:             self._label.config(text="这里将显示状态码。")  # 初始状态下提示文字         elif self.state == App.QUERYING_STATE:             self._label.config(text="正在查询远程...")  # 请求中状态时显示提示文字         elif self.state == App.RESULT_STATE:             self._label.config(text=f"返回的状态码是: {self.status_code}")  # 请求结果状态时显示返回的状态码      def start(self):         self.mainloop()  # 启动Tkinter事件循环,进入GUI界面  def main():     app = App()  # 创建应用实例     app.start()  # 启动应用  if __name__ == "__main__":     main()   

I/O请求的异步调用

可以将requests包替换为aiohttp包,实现I/O请求的异步调用。aiohttp和requests都是Python中常用的HTTP客户端库,但requests适用于同步场景,简单易用,aiohttp则适用于异步并发的场景,能够处理大量并行请求。具体区别如下:

  1. 同步vs异步:
  • requests是一个同步库,意味着每次发送请求时,程序会等待响应回来后才继续执行。适用于一些简单的、串行的HTTP请求场景。
  • aiohttp是一个异步库,基于Python的asyncio模块,能够在发送HTTP请求时非阻塞地继续执行其他任务。适用于需要大量并发请求或长时间等待的异步场景。
  1. 性能:
  • requests由于是同步的,处理大量请求时容易出现性能瓶颈,因为每个请求必须等待前一个请求完成。
  • aiohttp通过异步I/O处理,可以在等待响应时同时发起其他请求,极大提高了并发性能,尤其在处理大量HTTP请求时。
  1. 用法:
  • requests用法简单,适合初学者和一般同步的任务。
  • aiohttp需要使用async和await,适合需要并发或异步操作的任务。

在上述示例代码中,为了替代requests模块的同步请求,可以创建一个继承自App类的AppAsync类,并利用aiohttp和asyncio库实现异步请求。通过async_request方法异步发起HTTP请求:

import aiohttp import asyncio  class AppAsync(App):     async def async_request(self):         """          异步发起HTTP请求,使用aiohttp库来实现I/O请求的异步调用。         """         async with aiohttp.ClientSession() as session:  # 创建一个aiohttp会话对象             async with session.get("https://www.example.com") as response:  # 发起GET请求                 self.status_code = response.status  # 获取响应状态码                 self.state = App.RESULT_STATE  # 更新应用状态      def __int__(self):         super().__init__()      def request_remote(self):         """ 使用asyncio.run来调用异步请求代码 """         self.state = self.QUERYING_STATE  # 设置状态为请求中         asyncio.run(self.async_request())  # 异步发起HTTP请求  def main():     app = AppAsync()  # 创建应用实例     app.start()  # 启动应用  if __name__ == "__main__":     main()  # 运行主程序 

然而AppAsync类中的asyncio.run(self.async_request())会阻塞Tkinter的主线程,因为asyncio.run()会一直运行,直到异步任务完成。同时Tkinter自身有一个事件循环(mainloop()),与asyncio需要的事件循环冲突。如果在Tkinter内创建新事件循环,可能会导致Tkinter关闭或中断后出现问题。

将asyncio与线程结合

为了解决asyncio事件循环阻塞的问题,可以使用一个单独的守护线程,并在守护线程中运行事件循环,这样asyncio的事件循环就不会阻塞主线程。重写AppAsync类示例如下:

import aiohttp import asyncio import threading  class AppAsync(App):     def __init__(self):         super().__init__()         self._loop_thread = threading.Thread(target=self.run_asyncio_loop, daemon=True)         self._loop_thread.start()  # 启动事件循环线程      async def async_request(self):         """          异步发起HTTP请求,使用aiohttp库来实现I/O请求的异步调用。         """         async with aiohttp.ClientSession() as session:  # 创建一个aiohttp会话对象             async with session.get("https://www.example.com") as response:  # 发起GET请求                 self.status_code = response.status  # 获取响应状态码                 self.state = App.RESULT_STATE  # 更新应用状态      def request_remote(self):         """ 使用异步请求,在事件循环中执行 """         self.state = App.QUERYING_STATE  # 设置状态为请求中         asyncio.run_coroutine_threadsafe(self.async_request(), self._loop)  # 调用异步请求并与当前事件循环进行交互      def run_asyncio_loop(self):         """ 运行asyncio事件循环 """         self._loop = asyncio.new_event_loop()  # 创建新的事件循环         asyncio.set_event_loop(self._loop)  # 设置当前线程的事件循环         self._loop.run_forever()  # 启动事件循环  def main():     app = AppAsync()  # 创建应用实例     app.start()  # 启动应用  if __name__ == "__main__":     main()  # 运行主程序 

示例代码运行时,App.QUERYING_STATE状态相关信息会显示出来,AppAsync类主要的改动点如下:

  1. AppAsync类的构造函数:
    • 增加了一个新的线程来运行asyncio事件循环,避免在Tkinter线程中阻塞。
    • 使用threading.Thread启动一个守护线程,执行run_asyncio_loop方法,确保事件循环在后台运行。
    • 在创建线程时设置为守护线程。这样即使主线程退出,守护线程也会自动结束。
  2. run_asyncio_loop方法:
    • 在一个单独的线程中启动新的asyncio事件循环。
    • 使用asyncio.set_event_loop设置当前线程的事件循环,并调用loop.run_forever()来保持事件循环持续运行。
  3. request_remote方法:
    • 使用asyncio.run_coroutine_threadsafe将异步请求任务提交给后台事件循环执行,用于在非主线程中安全地执行协程。

3.2 基于asyncio实现多核异步处理

单核异步处理

asyncio的并发机制是基于协作式多任务(协程),它不会并行地使用多个CPU核心来加速计算,所有的任务都是在单个核心上轮流执行的。以下代码模拟了1000个爬虫任务,并使用单核异步来执行:

import random import asyncio import time  # 模拟爬虫任务,执行时会有随机的延迟 async def fake_crawlers():     # 随机生成一个0.2到1.0秒之间的延迟,保留两位小数     io_delay = round(random.uniform(0.2, 1.0), 2)     await asyncio.sleep(io_delay)      result = 0     # 随机生成100,000到500,000之间的数字,用于模拟计算密集型任务     # 这段代码耗时大约0.2秒到0.5秒之间     for i in range(random.randint(100000, 500000)):         result += i     return result  # 主程序入口,负责创建并执行多个爬虫任务 async def main():     # time.monotonic()是用于测量时间间隔的可靠方法,它不受系统时间更改的影响     start = time.monotonic()     tasks = [asyncio.create_task(fake_crawlers()) for i in range(1000)]  # 模拟创建1000个任务      await asyncio.gather(*tasks)  # 等待所有任务完成     # 输出所有任务完成的时间     print(f"所有任务已完成,耗时 {time.monotonic() - start:.2f} 秒")      # 启动程序 asyncio.run(main()) 

代码运行结果如下:

所有任务已完成,耗时 8.51 秒 

多核异步处理

要实现多核异步处理,可以将异步编程和多进程池结合起来使用。具体来说,主程序会把任务分成多个批次,每个批次由不同的进程来处理。每个进程内部,多个任务又是通过异步方式并行执行的。这样一来,计算密集型的任务可以通过多进程并行处理,而每个进程内部的I/O操作则可以通过asyncio来异步管理,从而大幅提高整体效率。示例代码如下,代码将1000个任务分布到10个子进程中并行执行,每个子进程执行100个模拟的爬虫任务:

import random import asyncio  # import time   from concurrent.futures import ProcessPoolExecutor    # 模拟爬虫任务,执行时会有随机的延迟 async def fake_crawlers():     # 随机生成一个0.2到1.0秒之间的延迟,保留两位小数     io_delay = round(random.uniform(0.2, 1.0), 2)     await asyncio.sleep(io_delay)      result = 0     # 随机生成100,000到500,000之间的数字,用于模拟阻塞任务     # 这段代码耗时大约0.2秒到0.5秒之间     for i in range(random.randint(100000, 500000)):         result += i     return result  # 并发查询任务,通过起始和结束索引分配任务 async def query_concurrently(begin_idx: int, end_idx: int):     """ 启动并发任务,通过起始和结束序列号 """     tasks = []       # 根据给定的索引范围(从 begin_idx 到 end_idx),创建并发任务     for _ in range(begin_idx, end_idx, 1):         tasks.append(asyncio.create_task(fake_crawlers()))       # 等待所有任务完成,并返回每个任务的结果     results = await asyncio.gather(*tasks)     return results   # 批量任务执行函数,使用子进程池并行执行任务 def run_batch_tasks(batch_idx: int, step: int):     """ 在子进程中执行批量任务 """     # 计算当前批次任务的起始和结束索引     begin = batch_idx * step + 1  # 当前批次任务的起始索引     end = begin + step  # 当前批次任务的结束索引      # 使用 asyncio.run() 启动异步任务并获取结果     results = [result for result in asyncio.run(query_concurrently(begin, end))]     return results    # 主函数,分批次将任务分配到子进程中执行 async def main():     """ 将任务分批次分配到子进程中执行 """     start = time.monotonic()        loop = asyncio.get_running_loop()  # 获取当前运行的事件循环     # 创建进程池执行器,用于将任务分配到多个子进程中执行     with ProcessPoolExecutor() as executor:         # 启动多个批次任务,并行执行。每个批次执行 100个任务,共启动10个批次         tasks = [loop.run_in_executor(executor, run_batch_tasks, batch_idx, 100)                  for batch_idx in range(10)]       # 等待所有子进程任务完成,并将结果汇总     results = [result for sub_list in await asyncio.gather(*tasks) for result in sub_list]      # 输出所有任务完成的时间     print(f"所有任务已完成,耗时 {time.monotonic() - start:.2f} 秒")  # 程序入口 if __name__ == "__main__":     asyncio.run(main())   

代码运行结果如下:

所有任务已完成,耗时 1.83 秒 

3.3 图片下载器

若经常需要从互联网下载文件,可以使用aiohttp库来实现任务的自动化。下面提供了一个简单的脚本,用于从指定URL下载文件:

建立本地图片服务器

为了提供图片下载链接,以下代码展示了如何使用FastAPI框架创建一个简单的Web应用程序,用于上传、管理和访问图片:

import os from fastapi import FastAPI, File, UploadFile from fastapi.responses import FileResponse from fastapi.staticfiles import StaticFiles import uvicorn  app = FastAPI()  # 配置图片存储目录 UPLOAD_DIR = "./uploaded_images" if not os.path.exists(UPLOAD_DIR):     os.makedirs(UPLOAD_DIR)  # 将图片目录挂载为静态文件目录 app.mount("/images", StaticFiles(directory=UPLOAD_DIR), name="images")  # 上传图片接口 @app.post("/upload/") async def upload_image(file: UploadFile = File(...)):     try:         # 定义图片保存路径         file_path = os.path.join(UPLOAD_DIR, file.filename)                  # 保存图片到本地         with open(file_path, "wb") as f:             f.write(file.file.read())          # 返回图片的访问 URL         image_url = f"http://127.0.0.1:8000/images/{file.filename}"         return {"image_url": image_url}          except Exception as e:         return {"error": str(e)}  # 获取所有上传图片的链接 @app.get("/images_list/") async def list_images():     try:         # 获取目录下的所有文件         files = os.listdir(UPLOAD_DIR)         image_urls = [f"http://127.0.0.1:8000/images/{file}" for file in files if os.path.isfile(os.path.join(UPLOAD_DIR, file))]         return {"image_urls": image_urls}     except Exception as e:         return {"error": str(e)}  # 获取单个图片 @app.get("/image/{image_name}") async def get_image(image_name: str):     try:         file_path = os.path.join(UPLOAD_DIR, image_name)         if os.path.exists(file_path):             return FileResponse(file_path)         else:             return {"error": "Image not found"}     except Exception as e:         return {"error": str(e)}  # 启动 FastAPI 服务器 if __name__ == "__main__":     uvicorn.run(app, host="127.0.0.1", port=8000) 

该代码实现了一个图片上传和访问服务,包含以下三个主要接口:

  • 服务器启动后,会监听本地地址127.0.0.1的8000端口。
  • 客户端可以通过以下方式与服务器进行交互:
    • 访问http://127.0.0.1:8000/upload/上传图片,并获取返回的图片 URL。
    • 访问http://127.0.0.1:8000/images_list/查看所有已上传图片的 URL。
    • 访问http://127.0.0.1:8000/images/{image_name} 来查看特定图片。

注意,所有上传和保存的图片都会保存在本地的uploaded_images文件夹中。

图片下载

以下代码利用了aiohttp、asyncio和aiofiles库,通过异步方式从API获取图片URL列表,并将图片下载到指定目录。借助这些库的结合,代码能够高效地处理HTTP请求、文件下载和文件操作,同时确保主程序的执行不被阻塞:

import aiohttp  # 导入 aiohttp 库,用于异步 HTTP 请求 import asyncio  # 导入 asyncio 库,用于管理异步任务 import aiofiles  # 导入 aiofiles 库,用于异步文件操作 import os   # 获取图片 URL 列表的异步函数 async def get_image_urls(api_url):     try:         # 使用 aiohttp 启动一个异步 HTTP 会话         async with aiohttp.ClientSession() as session:             # 异步发送 GET 请求以获取 API 返回的数据             async with session.get(api_url) as response:                 # 如果响应状态码是 200 (请求成功)                 if response.status == 200:                     # 将响应内容解析为 JSON 格式                     data = await response.json()                     # 从 JSON 数据中提取图片 URL 列表,若没有则返回空列表                     return data.get("image_urls", [])                 else:                     # 如果请求失败,打印错误信息                     print(f"从 {api_url} 获取图片列表失败。状态码: {response.status}")                     return []     except Exception as e:         # 如果发生任何异常,打印错误信息         print(f"获取图片列表时发生错误: {e}")         return []  # 下载文件的异步函数 async def download_file(url, save_directory):     try:         # 使用 aiohttp 启动异步 HTTP 会话         async with aiohttp.ClientSession() as session:             # 异步发送 GET 请求以获取文件内容             async with session.get(url) as response:                 # 如果响应状态码是 200 (请求成功)                 if response.status == 200:                     # 确保保存文件的目录存在,若不存在则创建                     os.makedirs(save_directory, exist_ok=True)                                          # 从 URL 中提取文件名                     filename = os.path.join(save_directory, url.split('/')[-1])                     # 异步打开文件以进行写入操作                     async with aiofiles.open(filename, 'wb') as file:                         # 读取响应内容                         content = await response.read()                         # 将内容写入本地文件                         await file.write(content)                     print(f"已下载 {filename}")                 else:                     # 如果下载失败,打印错误信息                     print(f"下载 {url} 失败。状态码: {response.status}")     except Exception as e:         # 如果发生任何异常,打印错误信息         print(f"下载 {url} 时发生错误: {e}")  # 根据获取的图片 URL 列表进行下载的异步函数 async def download_images(api_url, save_directory):     # 调用 get_image_urls 函数获取图片 URL 列表     image_urls = await get_image_urls(api_url)          # 如果没有获取到图片 URL,则打印提示并返回     if not image_urls:         print("没有找到需要下载的图片。")         return          # 为每个图片 URL 创建一个下载任务     tasks = [download_file(url, save_directory) for url in image_urls]          # 使用 asyncio.gather 并行执行所有下载任务     await asyncio.gather(*tasks)  # 启动事件循环,开始下载图片 if __name__ == "__main__":     # API 地址,提供图片 URL 列表     api_url = "http://127.0.0.1:8000/images_list/"     # 指定保存下载图片的目录     save_directory = "downloads"        # 获取事件循环并运行下载任务     loop = asyncio.get_event_loop()     loop.run_until_complete(download_images(api_url, save_directory)) 

3.4 生产者消费者模型

生产者-消费者模型(Producer-Consumer Model)是一种经典的并发编程模式,旨在解决多个任务之间生产和消费的协调问题,从而确保资源得到合理利用并保证数据按顺序处理。该模型通过生产者和消费者两个角色,模拟共享资源的生产和消费过程。以下代码实现了一个基本的生产者-消费者模型,采用了asyncio进行异步任务处理:

import asyncio from asyncio import Queue from typing import List  # 生产者函数,负责将物品添加到队列 async def produce_items(queue: Queue, items: List[int], producer_name: str):     for item in items:         await queue.put(item)  # 将物品放入队列         print(f"{producer_name} 添加物品:{item}")         await asyncio.sleep(0.5)  # 模拟生产过程中的等待时间     print(f"{producer_name} 完成所有物品的生产")  # 消费者函数,负责从队列中取出并处理物品 async def consume_items(queue: Queue, consumer_name: str):     while True:         item = await queue.get()  # 阻塞直到获取到一个物品         if item is None:  # 使用None作为结束信号             queue.task_done()  # 标记任务完成             break  # 退出循环         print(f"{consumer_name} 处理物品:{item}")         await asyncio.sleep(1)  # 模拟处理物品的时间         queue.task_done()  # 标记任务完成  # 主函数,负责启动多个生产者和消费者任务 async def main():     queue = Queue()  # 创建一个队列     items_to_produce = ['A','B','C','D']  # 需要生产的物品列表          # 创建产者任务(例如3个生产者)     producer_tasks = [         asyncio.create_task(produce_items(queue, items_to_produce, f"生产者_{i}"))         for i in range(3)       ]          # 创建消费者任务(例如2个消费者)     consumer_tasks = [         asyncio.create_task(consume_items(queue, f"消费者_{i}"))         for i in range(2)     ]          # 等待所有生产者任务完成     await asyncio.gather(*producer_tasks)          # 生产者完成后,发送 None 给消费者,通知它们退出     for _ in consumer_tasks:         await queue.put(None)          # 等待队列中的所有任务处理完成     await queue.join()          # 等待所有消费者任务完成     await asyncio.gather(*consumer_tasks)  if __name__ == '__main__':     # 运行主函数     asyncio.run(main()) 

4 参考

发表评论

相关文章