fix(client): execute web workers concurrently

This commit is contained in:
Valeriy S
2019-03-14 12:08:15 +03:00
committed by mrugesh mohapatra
parent e8f5b54d63
commit 845b966bda
4 changed files with 134 additions and 91 deletions

View File

@ -95,7 +95,7 @@ async function transformSASS(element) {
await Promise.all( await Promise.all(
[].map.call(styleTags, async style => { [].map.call(styleTags, async style => {
style.type = 'text/css'; style.type = 'text/css';
style.innerHTML = await sassWorker.execute(style.innerHTML, 5000); style.innerHTML = await sassWorker.execute(style.innerHTML, 5000).done;
}) })
); );
} }

View File

@ -90,19 +90,12 @@ export function getTestRunner(buildData, proxyLogger, document) {
function getJSTestRunner({ build, sources }, proxyLogger) { function getJSTestRunner({ build, sources }, proxyLogger) {
const code = sources && 'index' in sources ? sources['index'] : ''; const code = sources && 'index' in sources ? sources['index'] : '';
const testWorker = createWorker('test-evaluator'); const testWorker = createWorker('test-evaluator', { terminateWorker: true });
return async (testString, testTimeout) => { return (testString, testTimeout) => {
try { return testWorker
testWorker.on('LOG', proxyLogger); .execute({ build, testString, code, sources }, testTimeout)
return await testWorker.execute( .on('LOG', proxyLogger).done;
{ build, testString, code, sources },
testTimeout
);
} finally {
testWorker.killWorker();
testWorker.remove('LOG', proxyLogger);
}
}; };
} }

View File

@ -1,101 +1,153 @@
class WorkerExecutor { class WorkerExecutor {
constructor(workerName, location) { constructor(
this.workerName = workerName; workerName,
this.worker = null; { location = '/js/', concurrency = 2, terminateWorker = false } = {}
this.observers = {}; ) {
this.location = location; this._workerName = workerName;
this._workers = [];
this._queue = [];
this._running = 0;
this._concurrency = concurrency;
this._terminateWorker = terminateWorker;
this._location = location;
this.execute = this.execute.bind(this); this._getWorker = this._getWorker.bind(this);
this.killWorker = this.killWorker.bind(this);
this.getWorker = this.getWorker.bind(this);
} }
async getWorker() { async _getWorker() {
if (this.worker === null) { let worker;
this.worker = await new Promise((resolve, reject) => { if (this._workers.length) {
const worker = new Worker(`${this.location}${this.workerName}.js`); worker = this._workers.shift();
} else {
worker = await new Promise((resolve, reject) => {
const worker = new Worker(`${this._location}${this._workerName}.js`);
worker.onmessage = e => { worker.onmessage = e => {
if (e.data && e.data.type && e.data.type === 'contentLoaded') { if (e.data && e.data.type && e.data.type === 'contentLoaded') {
resolve(worker); resolve(worker);
} }
}; };
worker.onerror = e => reject(e.message); worker.onerror = err => reject(err);
}); });
} }
return worker;
return this.worker;
} }
killWorker() { _pushTask(task) {
if (this.worker !== null) { this._queue.push(task);
this.worker.terminate(); this._next();
this.worker = null; }
_handleTaskEnd(task) {
return () => {
this._running--;
if (task._worker) {
const worker = task._worker;
if (this._terminateWorker) {
worker.terminate();
} else {
worker.onmessage = null;
worker.onerror = null;
this._workers.push(worker);
}
}
this._next();
};
}
_next() {
while (this._running < this._concurrency && this._queue.length) {
const task = this._queue.shift();
const handleTaskEnd = this._handleTaskEnd(task);
task._execute(this._getWorker).done.then(handleTaskEnd, handleTaskEnd);
this._running++;
} }
} }
async execute(data, timeout = 1000) { execute(data, timeout = 1000) {
const worker = await this.getWorker(); const task = eventify({});
return new Promise((resolve, reject) => { task._execute = function(getWorker) {
// Handle timeout getWorker().then(
worker => {
task._worker = worker;
const timeoutId = setTimeout(() => { const timeoutId = setTimeout(() => {
this.killWorker(); task._worker.terminate();
done('timeout'); task._worker = null;
this.emit('error', { message: 'timeout' });
}, timeout); }, timeout);
const done = (err, data) => {
clearTimeout(timeoutId);
this.remove('error', handleError);
if (err) {
reject(err);
} else {
resolve(data);
}
};
const handleError = e => {
done(e.message);
};
this.on('error', handleError);
worker.postMessage(data);
// Handle result
worker.onmessage = e => { worker.onmessage = e => {
if (e.data && e.data.type) { if (e.data && e.data.type) {
this.handleEvent(e.data.type, e.data.data); this.emit(e.data.type, e.data.data);
return; return;
} }
done(null, e.data); clearTimeout(timeoutId);
this.emit('done', e.data);
}; };
worker.onerror = e => { worker.onerror = e => {
this.handleEvent('error', { message: e.message }); clearTimeout(timeoutId);
this.emit('error', { message: e.message });
}; };
worker.postMessage(data);
},
err => this.emit('error', err)
);
return this;
};
task.done = new Promise((resolve, reject) => {
task
.once('done', data => resolve(data))
.once('error', err => reject(err.message));
});
this._pushTask(task);
return task;
}
}
const eventify = self => {
self._events = {};
self.on = (event, listener) => {
if (typeof self._events[event] === 'undefined') {
self._events[event] = [];
}
self._events[event].push(listener);
return self;
};
self.removeListener = (event, listener) => {
if (typeof self._events[event] !== 'undefined') {
const index = self._events[event].indexOf(listener);
if (index !== -1) {
self._events[event].splice(index, 1);
}
}
return self;
};
self.emit = (event, ...args) => {
if (typeof self._events[event] !== 'undefined') {
self._events[event].forEach(listener => {
listener.apply(self, args);
}); });
} }
return self;
};
handleEvent(type, data) { self.once = (event, listener) => {
const observers = this.observers[type] || []; self.on(event, function handler(...args) {
for (const observer of observers) { self.removeListener(handler);
observer(data); listener.apply(self, args);
} });
} return self;
};
on(type, callback) { return self;
const observers = this.observers[type] || []; };
observers.push(callback);
this.observers[type] = observers;
}
remove(type, callback) { export default function createWorkerExecutor(workerName, options) {
const observers = this.observers[type] || []; return new WorkerExecutor(workerName, options);
const index = observers.indexOf(callback);
if (index !== -1) {
observers.splice(index, 1);
}
}
}
export default function createWorkerExecutor(workerName, location = '/js/') {
return new WorkerExecutor(workerName, location);
} }

View File

@ -333,13 +333,13 @@ async function createTestRunnerForJSChallenge({ files }, solution) {
const { build, sources } = await buildJSChallenge({ files }); const { build, sources } = await buildJSChallenge({ files });
const code = sources && 'index' in sources ? sources['index'] : ''; const code = sources && 'index' in sources ? sources['index'] : '';
const testWorker = createWorker('test-evaluator'); const testWorker = createWorker('test-evaluator', { terminateWorker: true });
return async ({ text, testString }) => { return async ({ text, testString }) => {
try { try {
const { pass, err } = await testWorker.execute( const { pass, err } = await testWorker.execute(
{ testString, build, code, sources }, { testString, build, code, sources },
5000 5000
); ).done;
if (!pass) { if (!pass) {
throw new AssertionError(`${text}\n${err.message}`); throw new AssertionError(`${text}\n${err.message}`);
} }
@ -348,8 +348,6 @@ async function createTestRunnerForJSChallenge({ files }, solution) {
? `${text}\n${err}` ? `${text}\n${err}`
: (err.message = `${text} : (err.message = `${text}
${err.message}`); ${err.message}`);
} finally {
testWorker.killWorker();
} }
}; };
} }