本文写于 2022 年 5 月 13 日
在多线程语言中,我们通常不会随意的在需要启动线程的时候去启动,而是会选择创建一个线程池。
所谓线程池,本意其实就是(不止这些作用,其余作用可以自行查阅):
对于 JavaScript 来说,虽然不存在“启动线程”这种问题,但我们还是可以通过类似的思想,来限制我们做异步操作的数量。
首先我们需要一个数组,用它来存储尚未执行的任务,每个任务都是一个函数,这个函数必须要返回一个 Promise。
type Task = () => Promise<unknown>; const tasks: Task[] = [];
其次我们需要一个方法来进行任务的添加。
function addTask(task: Task): void;
最后我们需要一个函数来执行我们所有的 task。
而在这之前,我们还需要定义一个值,来定义同时执行的异步任务的最大数量。
function execTasks(): void;
根据我们的分析,我们可以写下基础的代码如下:
interface TaskPool { addTask(task: Task): void; } type Task = () => Promise<unknown>; function newTaskPool(max = 10): TaskPool { const tasks: Task[] = []; function addTask(task: Task): void {} function execTasks(): void {} }
新增任务非常简单,我们写出如下代码填充 addTask
。
function addTask(task: Task): void { tasks.push(task); }
接下来就是重头戏。如何实现 execTasks
方法来限制最大异步任务数量呢?
首先我们来明确一点,在下面这个场景中,如果 foo
函数是异步操作,那么是不会阻塞我们的代码执行的。
console.log("Before"); foo(); console.log("After");
那么我们可以这么操作:
execTasks
时,会选取当前任务数量和空闲任务数二者相比较小的一个;tasks
第一位的任务进行执行;execTasks
。let leisure = max; function execTasks(): void { if (tasks.length === 0) return; const execTaskNum = Math.min(tasks.length, leisure); for (let i = 0; i < execTaskNum; i++) { const task = tasks.shift(); if (!task) continue; leisure--; task().finally(() => { leisure++; execTasks(); }); } }
最后我们只剩下了一个问题了,我们如何在 addTask
后执行 execTasks
,但又不会让下面这种情况导致频繁执行 execTasks
:
for (let i = 0; i < 100; i++) addTask();
可以利用防抖 + setTimeout(() => {},0)
的特性来完成。
function addTask(task: Task) { tasks.push(task); execTasksAfterAdd(); } // 这里借用了 lodash 的 debounce 函数,具体实现不多说,可以看我以前的文章:防抖与节流 const execTasksAfterAdd = debounce(execTasks);
完整代码:
import { debounce } from "lodash"; interface TaskQueue { addTask: (task: () => Promise<any>) => void; } function newTaskQueue(maxTaskNum = 10): TaskQueue { let _leisure = maxTaskNum; const _tasks: Array<() => Promise<any>> = []; function addTask(task: () => Promise<any>) { _tasks.push(task); execAfterTask(); } const execAfterTask = debounce(execTasks); function execTasks() { if (_tasks.length === 0) return; const execTaskNum = Math.min(_tasks.length, _leisure); for (let i = 0; i < execTaskNum; i++) { const task = _tasks.shift(); if (!task) continue; _leisure--; task().finally(() => { _leisure++; execTasks(); }); } } return { addTask }; } const queue = newTaskQueue(5); for (let i = 0; i < 10; i++) { queue.addTask(function () { return new Promise<void>((resolve) => { setTimeout(() => resolve(), 800); }); }); }
其实这种做法的使用场景是比较少的。
绝大多数情况我们都不需要这么去做,除非碰到很极端的需求。
例如我们需要用 Node.js 去设计一个吞吐量极大的服务,那么同时发生大量的网络请求很可能把带宽直接打满,导致后续的请求无法打到该服务,此时就可以使用任务池来控制最大网络请求量。
(完)