/**
 * A test Runner that uses a {@link module:buffered-worker-pool}.
 * @module parallel-buffered-runner
 * @private
 */

'use strict';

const allSettled = require('@ungap/promise-all-settled').bind(Promise);
const Runner = require('../runner');
const {EVENT_RUN_BEGIN, EVENT_RUN_END} = Runner.constants;
const debug = require('debug')('mocha:parallel:parallel-buffered-runner');
const {BufferedWorkerPool} = require('./buffered-worker-pool');
const {setInterval, clearInterval} = global;
const {createMap, constants} = require('../utils');
const {MOCHA_ID_PROP_NAME} = constants;
const {createFatalError} = require('../errors');

const DEFAULT_WORKER_REPORTER = require.resolve(
  './reporters/parallel-buffered'
);

/**
 * List of options to _not_ serialize for transmission to workers
 */
const DENY_OPTIONS = [
  'globalSetup',
  'globalTeardown',
  'parallel',
  'p',
  'jobs',
  'j'
];

/**
 * Outputs a debug statement with worker stats
 * @param {BufferedWorkerPool} pool - Worker pool
 */
/* istanbul ignore next */
const debugStats = pool => {
  const {totalWorkers, busyWorkers, idleWorkers, pendingTasks} = pool.stats();
  debug(
    '%d/%d busy workers; %d idle; %d tasks queued',
    busyWorkers,
    totalWorkers,
    idleWorkers,
    pendingTasks
  );
};

/**
 * The interval at which we will display stats for worker processes in debug mode
 */
const DEBUG_STATS_INTERVAL = 5000;

const ABORTED = 'ABORTED';
const IDLE = 'IDLE';
const ABORTING = 'ABORTING';
const RUNNING = 'RUNNING';
const BAILING = 'BAILING';
const BAILED = 'BAILED';
const COMPLETE = 'COMPLETE';

/**
 * Allowed state transitions: each key is a state, and its value is the set of
 * states which may directly follow it. States mapped to an empty set are
 * terminal.
 */
const states = createMap({
  [IDLE]: new Set([RUNNING, ABORTING]),
  [RUNNING]: new Set([COMPLETE, BAILING, ABORTING]),
  [COMPLETE]: new Set(),
  [ABORTED]: new Set(),
  [ABORTING]: new Set([ABORTED]),
  [BAILING]: new Set([BAILED, ABORTING]),
  [BAILED]: new Set([COMPLETE, ABORTING])
});

/**
 * This `Runner` delegates tests runs to worker threads. Does not execute any
 * {@link Runnable}s by itself!
 * @public
 */
class ParallelBufferedRunner extends Runner {
  constructor(...args) {
    super(...args);

    // `_state` is backed by a closure variable so that every assignment is
    // validated against the `states` transition table.
    let state = IDLE;
    Object.defineProperty(this, '_state', {
      get() {
        return state;
      },
      set(newState) {
        if (states[state].has(newState)) {
          state = newState;
        } else {
          throw new Error(`invalid state transition: ${state} => ${newState}`);
        }
      }
    });

    this._workerReporter = DEFAULT_WORKER_REPORTER;
    this._linkPartialObjects = false;
    this._linkedObjectMap = new Map();

    this.once(EVENT_RUN_END, () => {
      this._state = COMPLETE;
    });
  }

  /**
   * Returns a mapping function to enqueue a file in the worker pool and return results of its execution.
   * @param {BufferedWorkerPool} pool - Worker pool
   * @param {Options} options - Mocha options
   * @returns {FileRunner} Mapping function
   * @private
   */
  _createFileRunner(pool, options) {
    /**
     * Emits event and sets `BAILING` state, if necessary.
     * @param {Object} event - Event having `eventName`, maybe `data` and maybe `error`
     * @param {number} failureCount - Failure count
     */
    const emitEvent = (event, failureCount) => {
      this.emit(event.eventName, event.data, event.error);
      if (
        this._state !== BAILING &&
        event.data &&
        event.data._bail &&
        (failureCount || event.error)
      ) {
        debug('run(): nonzero failure count & found bail flag');
        // we need to let the events complete for this file, as the worker
        // should run any cleanup hooks
        this._state = BAILING;
      }
    };

    /**
     * Given an event, recursively find any objects in its data that have ID's, and create object references to already-seen objects.
     * Events without object `data` have nothing to link and are left untouched.
     * @param {Object} event - Event having `eventName`, maybe `data` and maybe `error`
     * @throws If an object in the event data has no ID
     */
    const linkEvent = event => {
      const stack = [{parent: event, prop: 'data'}];
      while (stack.length) {
        const {parent, prop} = stack.pop();
        const obj = parent[prop];
        if (!obj || typeof obj !== 'object') {
          // `data` is optional; without this guard `Object.keys(undefined)`
          // below would throw a `TypeError`.
          continue;
        }
        const id = obj[MOCHA_ID_PROP_NAME];
        if (!id) {
          throw createFatalError(
            'Object missing ID received in event data',
            obj
          );
        }
        // merge into the previously-seen object (if any) so that references
        // held by listeners stay valid and pick up the new properties
        const newObj = this._linkedObjectMap.has(id)
          ? Object.assign(this._linkedObjectMap.get(id), obj)
          : obj;
        this._linkedObjectMap.set(id, newObj);
        parent[prop] = newObj;

        Object.keys(newObj).forEach(key => {
          const value = obj[key];
          if (value && typeof value === 'object' && value[MOCHA_ID_PROP_NAME]) {
            stack.push({parent: newObj, prop: key});
          }
        });
      }
    };

    return async file => {
      debug('run(): enqueueing test file %s', file);
      try {
        const {failureCount, events} = await pool.run(file, options);

        if (this._state === BAILED) {
          // short-circuit after a graceful bail. if this happens,
          // some other worker has bailed.
          // TODO: determine if this is the desired behavior, or if we
          // should report the events of this run anyway.
          return;
        }
        debug(
          'run(): completed run of file %s; %d failures / %d events',
          file,
          failureCount,
          events.length
        );
        this.failures += failureCount; // can this ever be non-numeric?
        let event = events.shift();

        if (this._linkPartialObjects) {
          while (event) {
            linkEvent(event);
            emitEvent(event, failureCount);
            event = events.shift();
          }
        } else {
          while (event) {
            emitEvent(event, failureCount);
            event = events.shift();
          }
        }
        if (this._state === BAILING) {
          debug('run(): terminating pool due to "bail" flag');
          this._state = BAILED;
          await pool.terminate();
        }
      } catch (err) {
        if (this._state === BAILED || this._state === ABORTING) {
          debug(
            'run(): worker pool terminated with intent; skipping file %s',
            file
          );
        } else {
          // this is an uncaught exception
          debug('run(): encountered uncaught exception: %O', err);
          if (this.allowUncaught) {
            // still have to clean up
            this._state = ABORTING;
            await pool.terminate(true);
          }
          throw err;
        }
      } finally {
        debug('run(): done running file %s', file);
      }
    };
  }

  /**
   * Listen on `Process.SIGINT`; terminate pool if caught.
   * Returns the listener for later call to `process.removeListener()`.
   * @param {BufferedWorkerPool} pool - Worker pool
   * @returns {SigIntListener} Listener
   * @private
   */
  _bindSigIntListener(pool) {
    const sigIntListener = async () => {
      debug('run(): caught a SIGINT');
      this._state = ABORTING;

      try {
        debug('run(): force-terminating worker pool');
        await pool.terminate(true);
      } catch (err) {
        console.error(
          `Error while attempting to force-terminate worker pool: ${err}`
        );
        process.exitCode = 1;
      } finally {
        // the listener was registered with `once()`, so re-sending SIGINT to
        // ourselves is no longer intercepted by this handler
        process.nextTick(() => {
          debug('run(): imminent death');
          this._state = ABORTED;
          process.kill(process.pid, 'SIGINT');
        });
      }
    };

    process.once('SIGINT', sigIntListener);

    return sigIntListener;
  }

  /**
   * Runs Mocha tests by creating a thread pool, then delegating work to the
   * worker threads.
   *
   * Each worker receives one file, and as workers become available, they take a
   * file from the queue and run it. The worker thread execution is treated like
   * an RPC--it returns a `Promise` containing serialized information about the
   * run. The information is processed as it's received, and emitted to a
   * {@link Reporter}, which is likely listening for these events.
   *
   * @param {Function} callback - Called with an exit code corresponding to
   * number of test failures.
   * @param {{files: string[], options: Options}} opts - Files to run and
   * command-line options, respectively.
   * @returns {ParallelBufferedRunner} This instance (not a `Promise`)
   */
  run(callback, {files, options = {}} = {}) {
    /**
     * Listener on `Process.SIGINT` which tries to cleanly terminate the worker pool.
     */
    let sigIntListener;

    // assign the reporter the worker will use, which will be different than the
    // main process' reporter
    options = {...options, reporter: this._workerReporter};

    // This function should _not_ return a `Promise`; its parent (`Runner#run`)
    // returns this instance, so this should do the same. However, we want to make
    // use of `async`/`await`, so we use this IIFE.
    (async () => {
      /**
       * This is an interval that outputs stats about the worker pool every so often
       */
      let debugInterval;

      /**
       * @type {BufferedWorkerPool}
       */
      let pool;

      try {
        pool = BufferedWorkerPool.create({maxWorkers: options.jobs});

        sigIntListener = this._bindSigIntListener(pool);

        /* istanbul ignore next */
        debugInterval = setInterval(
          () => debugStats(pool),
          DEBUG_STATS_INTERVAL
        ).unref();

        // this is set for uncaught exception handling in `Runner#uncaught`
        // TODO: `Runner` should be using a state machine instead.
        this.started = true;
        this._state = RUNNING;

        this.emit(EVENT_RUN_BEGIN);

        options = {...options};
        DENY_OPTIONS.forEach(opt => {
          delete options[opt];
        });

        const results = await allSettled(
          files.map(this._createFileRunner(pool, options))
        );

        // note that pool may already be terminated due to --bail
        await pool.terminate();

        results
          .filter(({status}) => status === 'rejected')
          .forEach(({reason}) => {
            if (this.allowUncaught) {
              // yep, just the first one.
              throw reason;
            }
            // "rejected" will correspond to uncaught exceptions.
            // unlike the serial runner, the parallel runner can always recover.
            this.uncaught(reason);
          });

        if (this._state === ABORTING) {
          return;
        }

        this.emit(EVENT_RUN_END);
        debug('run(): completing with failure count %d', this.failures);
        callback(this.failures);
      } catch (err) {
        // this `nextTick` takes us out of the `Promise` scope, so the
        // exception will not be caught and returned as a rejected `Promise`,
        // which would lead to an `unhandledRejection` event.
        process.nextTick(() => {
          debug('run(): re-throwing uncaught exception');
          throw err;
        });
      } finally {
        clearInterval(debugInterval);
        // The listener is only bound once the pool was created; if creation
        // threw, passing `undefined` to `removeListener()` would itself throw
        // and surface as an unhandled rejection of this IIFE.
        if (sigIntListener) {
          process.removeListener('SIGINT', sigIntListener);
        }
      }
    })();
    return this;
  }

  /**
   * Toggle partial object linking behavior; used for building object references from
   * unique ID's.
   * @param {boolean} [value] - If `true`, enable partial object linking, otherwise disable
   * @returns {Runner}
   * @chainable
   * @public
   * @example
   * // this reporter needs proper object references when run in parallel mode
   * class MyReporter {
   *   constructor(runner) {
   *     runner.linkPartialObjects(true)
   *       .on(EVENT_SUITE_BEGIN, suite => {
   *         // this Suite may be the same object...
   *       })
   *       .on(EVENT_TEST_BEGIN, test => {
   *         // ...as the `test.parent` property
   *       });
   *   }
   * }
   */
  linkPartialObjects(value) {
    this._linkPartialObjects = Boolean(value);
    return super.linkPartialObjects(value);
  }

  /**
   * If this class is the `Runner` in use, then this is going to return `true`.
   *
   * For use by reporters.
   * @returns {true}
   * @public
   */
  isParallelMode() {
    return true;
  }

  /**
   * Configures an alternate reporter for worker processes to use; it replaces
   * the default worker reporter and is passed to workers as the `reporter`
   * option by {@link ParallelBufferedRunner#run}.
   * @public
   * @param {string} reporter - Absolute path to alternate reporter for worker processes to use
   * @returns {Runner}
   * @chainable
   */
  workerReporter(reporter) {
    this._workerReporter = reporter;
    return this;
  }
}

module.exports = ParallelBufferedRunner;

/**
 * Listener function intended to be bound to `Process.SIGINT` event
 * @private
 * @callback SigIntListener
 * @returns {Promise}
 */

/**
 * A function accepting a test file path and returning the results of a test run
 * @private
 * @callback FileRunner
 * @param {string} filename - File to run
 * @returns {Promise}
 */