fix: prevent timeout errors (#44714)

This commit is contained in:
Oliver Eyton-Williams
2022-01-08 07:12:16 +01:00
committed by GitHub
parent 0815f73bd2
commit 01a03f2961
2 changed files with 41 additions and 42 deletions

View File

@ -1,65 +1,59 @@
class WorkerExecutor {
constructor(
workerName,
{ location = '/js/', concurrency = 2, terminateWorker = false } = {}
{ location = '/js/', maxWorkers = 2, terminateWorker = false } = {}
) {
this._workerName = workerName;
this._workers = [];
this._queue = [];
this._running = 0;
this._concurrency = concurrency;
this._workerPool = [];
this._taskQueue = [];
this._workersInUse = 0;
this._maxWorkers = maxWorkers;
this._terminateWorker = terminateWorker;
this._location = location;
this._scriptURL = `${location}${workerName}.js`;
this._getWorker = this._getWorker.bind(this);
}
async _getWorker() {
let worker;
if (this._workers.length) {
worker = this._workers.shift();
} else {
worker = await new Promise((resolve, reject) => {
const worker = new Worker(`${this._location}${this._workerName}.js`);
worker.onmessage = e => {
if (e.data && e.data.type && e.data.type === 'contentLoaded') {
resolve(worker);
}
};
worker.onerror = err => reject(err);
});
}
return worker;
return this._workerPool.length
? this._workerPool.shift()
: this._createWorker();
}
_pushTask(task) {
this._queue.push(task);
this._next();
_createWorker() {
return new Promise((resolve, reject) => {
const newWorker = new Worker(this._scriptURL);
newWorker.onmessage = e => {
if (e.data?.type === 'contentLoaded') {
resolve(newWorker);
}
};
newWorker.onerror = err => reject(err);
});
}
_handleTaskEnd(task) {
return () => {
this._running--;
if (task._worker) {
const worker = task._worker;
this._workersInUse--;
const worker = task._worker;
if (worker) {
if (this._terminateWorker) {
worker.terminate();
} else {
worker.onmessage = null;
worker.onerror = null;
this._workers.push(worker);
this._workerPool.push(worker);
}
}
this._next();
this._processQueue();
};
}
_next() {
while (this._running < this._concurrency && this._queue.length) {
const task = this._queue.shift();
_processQueue() {
while (this._workersInUse < this._maxWorkers && this._taskQueue.length) {
const task = this._taskQueue.shift();
const handleTaskEnd = this._handleTaskEnd(task);
task._execute(this._getWorker).done.then(handleTaskEnd, handleTaskEnd);
this._running++;
this._workersInUse++;
}
}
@ -76,12 +70,15 @@ class WorkerExecutor {
}, timeout);
worker.onmessage = e => {
if (e.data && e.data.type) {
this.emit(e.data.type, e.data.data);
return;
}
clearTimeout(timeoutId);
this.emit('done', e.data);
// data.type is undefined when the message has been processed
// successfully and defined when something else has happened (e.g.
// an error occurred)
if (e.data?.type) {
this.emit(e.data.type, e.data.data);
} else {
this.emit('done', e.data);
}
};
worker.onerror = e => {
@ -102,11 +99,13 @@ class WorkerExecutor {
.once('error', err => reject(err.message));
});
this._pushTask(task);
this._taskQueue.push(task);
this._processQueue();
return task;
}
}
// Error and completion handling
const eventify = self => {
self._events = {};

View File

@ -124,7 +124,7 @@ it('Worker executor should successfully execute 3 tasks, use 3 workers and termi
// eslint-disable-next-line max-len
it('Worker executor should successfully execute 3 tasks in parallel and use 3 workers', async () => {
mockWorker();
const testWorker = createWorker('test', { concurrency: 3 });
const testWorker = createWorker('test', { maxWorkers: 3 });
const task1 = testWorker.execute('test1');
const task2 = testWorker.execute('test2');
@ -139,7 +139,7 @@ it('Worker executor should successfully execute 3 tasks in parallel and use 3 wo
// eslint-disable-next-line max-len
it('Worker executor should successfully execute 3 tasks and use 1 worker', async () => {
mockWorker();
const testWorker = createWorker('test', { concurrency: 1 });
const testWorker = createWorker('test', { maxWorkers: 1 });
const task1 = testWorker.execute('test1');
const task2 = testWorker.execute('test2');