Введение
В процессе разработки мы иногда сталкиваемся с необходимостью контролировать количество одновременно выполняемых задач.
Например, краулер может снизить частоту запросов, ограничив количество одновременных задач, тем самым избежав проблемы бана из-за слишком частых запросов.
Далее в этой статье описывается, как реализовать параллельный контроллер.
2. Примеры
const task = timeout => new Promise((resolve) => setTimeout(() => {
resolve(timeout);
}, timeout))
const taskList = [1000, 3000, 200, 1300, 800, 2000];
async function startNoConcurrentControl() {
console.time(NO_CONCURRENT_CONTROL_LOG);
await Promise.all(taskList.map(item => task(item)));
console.timeEnd(NO_CONCURRENT_CONTROL_LOG);
}
startNoConcurrentControl();
Приведенный выше пример кода использует метод Promise.all для имитации сценария, в котором 6 задач выполняются одновременно, а общее время, необходимое для выполнения всех задач, составляет 3000 миллисекунд.
Этот пример будет использоваться ниже для проверки правильности метода реализации.
3. Реализация
Поскольку количество одновременно выполняемых задач ограничено, для управления постоянно генерируемыми задачами необходима структура данных.
очередь"первым прибыл, первым обслужен"Функции могут гарантировать порядок одновременного выполнения задач, что можно сделать в JavaScript с помощью"Массивы для имитации очередей":
class Queue {
constructor() {
this._queue = [];
}
push(value) {
return this._queue.push(value);
}
shift() {
return this._queue.shift();
}
isEmpty() {
return this._queue.length === 0;
}
}
Для каждой задачи нужно управлять ее функцией выполнения и параметрами:
class DelayedTask {
constructor(resolve, fn, args) {
this.resolve = resolve;
this.fn = fn;
this.args = args;
}
}
Далее реализуем базовый класс TaskPool, который в основном используется для управления выполнением задач:
class TaskPool {
constructor(size) {
this.size = size;
this.queue = new Queue();
}
addTask(fn, args) {
return new Promise((resolve) => {
this.queue.push(new DelayedTask(resolve, fn, args));
if (this.size) {
this.size--;
const { resolve: taskResole, fn, args } = this.queue.shift();
taskResole(this.runTask(fn, args));
}
})
}
pullTask() {
if (this.queue.isEmpty()) {
return;
}
if (this.size === 0) {
return;
}
this.size++;
const { resolve, fn, args } = this.queue.shift();
resolve(this.runTask(fn, args));
}
runTask(fn, args) {
const result = Promise.resolve(fn(...args));
result.then(() => {
this.size--;
this.pullTask();
}).catch(() => {
this.size--;
this.pullTask();
})
return result;
}
}
TaskPool содержит три ключевых метода:
- addTask: поместить новую задачу в очередь и активировать определение состояния пула задач.Если текущий пул задач загружен не полностью, задача будет взята из очереди и помещена в пул задач для выполнения.
- runTask: Выполнить текущую задачу.После выполнения задачи обновить статус пула задач.В это время срабатывает механизм активного подтягивания новых задач.
- pullTask: если текущая очередь не пуста и пул задач не заполнен, он возьмет на себя инициативу по извлечению задач из очереди для выполнения.
Затем установите количество параллелизма в предыдущем примере на 2:
const cc = new ConcurrentControl(2);
async function startConcurrentControl() {
console.time(CONCURRENT_CONTROL_LOG);
await Promise.all(taskList.map(item => cc.addTask(task, [item])))
console.timeEnd(CONCURRENT_CONTROL_LOG);
}
startConcurrentControl();
Процесс выполнения следующий:
Общее время выполнения задачи составляет 5000 миллисекунд.
Четвертый, передача параметра оптимизации функции более высокого порядка
await Promise.all(taskList.map(item => cc.addTask(task, [item])))
Способ ручной передачи параметров каждой задачи очень громоздкий, тут можно пройти"Функция высокого порядка реализует автоматическую прозрачную передачу параметров":
addTask(fn) {
return (...args) => {
return new Promise((resolve) => {
this.queue.push(new DelayedTask(resolve, fn, args));
if (this.size) {
this.size--;
const { resolve: taskResole, fn: taskFn, args: taskArgs } = this.queue.shift();
taskResole(this.runTask(taskFn, taskArgs));
}
})
}
}
Код после преобразования намного проще:
await Promise.all(taskList.map(cc.addTask(task)))
Пять, оптимизировать работу очереди
Массивы обычно основаны на блоке"непрерывная память"Для сохранения при вызове метода сдвига массива сначала удаляется головной элемент (временная сложность O(1)), а затем неудаленные элементы необходимо сдвинуть на один бит влево (временная сложность O(n)) , поэтому операция сдвига Временная сложность равна O(n).
Из-за особенностей языка JavaScript V8 обеспечивает компромиссное решение по пространству и времени при реализации JSArray.В различных сценариях JSArray будет переключаться между режимами FixedArray и HashTable.
В режиме хеш-таблицы операция сдвига экономит временную сложность сдвига влево, а ее временная сложность может быть уменьшена до O (1), но даже в этом случае сдвиг по-прежнему является трудоемкой операцией.
В случае, когда элементов массива много и операцию сдвига нужно выполнять часто, можно передать"reverse + pop"способ оптимизации.
const Benchmark = require('benchmark');
const suite = new Benchmark.Suite;
suite.add('shift', function() {
let count = 10;
const arr = generateArray(count);
while (count--) {
arr.shift();
}
})
.add('reverse + pop', function() {
let count = 10;
const arr = generateArray(count);
arr.reverse();
while (count--) {
arr.pop();
}
})
.on('cycle', function(event) {
console.log(String(event.target));
})
.on('complete', function() {
console.log('Fastest is ' + this.filter('fastest').map('name'));
console.log('\n')
})
.run({
async: true
})
С помощью данных эталонных тестов, запущенных с помощью веб-сайта Benchmark.js, легко увидеть, какой метод более эффективен:
Оглядываясь назад на предыдущую реализацию класса Queue, поскольку существует только один массив для хранения задач, использование reverse + pop напрямую неизбежно повлияет на порядок выполнения задач.
Здесь нам нужно представить дизайн двойных массивов, один массив отвечает за операцию постановки в очередь, а другой массив отвечает за операцию удаления из очереди.
class HighPerformanceQueue {
constructor() {
this.q1 = []; // 用于 push 数据
this.q2 = []; // 用于 shift 数据
}
push(value) {
return this.q1.push(value);
}
shift() {
let q2 = this.q2;
if (q2.length === 0) {
const q1 = this.q1;
if (q1.length === 0) {
return;
}
this.q1 = q2; // 感谢 @shaonialife 同学指正
q2 = this.q2 = q1.reverse();
}
return q2.pop();
}
isEmpty() {
if (this.q1.length === 0 && this.q2.length === 0) {
return true;
}
return false;
}
}
Наконец, эффект оптимизации проверяется бенчмаркингом:
напиши в конце
наконец,"Если эта статья была вам полезна, вы можете подписаться (публичный аккаунт [рассказать о большом интерфейсе]), поставить лайк и переслать ε=ε=ε=┏(゜ロ゜;)┛."