From de5ff50c3b1d7a10bf17caffd84059677881cff4 Mon Sep 17 00:00:00 2001 From: Marc Cataford Date: Sat, 1 Apr 2023 16:24:44 -0400 Subject: [PATCH] feat: split runner and workers, set up ipc (#8) * feat: split runner and workers, set up ipc * docs: document worker a bit * wip: centralize runtime details to context obj * wip: clean up context, batching * wip: fail on failure * wip: propagate failure * refactor: move types * refactor: split collector into separate process for split opportunities * refactor: no need for the fs cache anymore --- src/collector.ts | 34 ++++++++++++++++++ src/context.ts | 15 -------- src/runner.ts | 85 ++++++++++++++++++++++++++++++-------------- src/testCaseUtils.ts | 4 +-- src/types.ts | 15 ++++++-- src/utils.ts | 34 ++++++++++++++++++ src/worker.ts | 35 ++++++++++++++++++ 7 files changed, 176 insertions(+), 46 deletions(-) create mode 100644 src/collector.ts delete mode 100644 src/context.ts create mode 100644 src/worker.ts diff --git a/src/collector.ts b/src/collector.ts new file mode 100644 index 0000000..4670aa1 --- /dev/null +++ b/src/collector.ts @@ -0,0 +1,34 @@ +import net from 'net' +import { type IContext } from './types' +import { getContext, exec, greenText } from './utils' + +// TODO: What should be message protocol / format be? +function formatMessage(results: string, failed: boolean): string { + return JSON.stringify({ results, failed }) +} + +async function collectCases(context: IContext, collectedPaths: Array): Promise { + let collectedCount = 0 + + for await (const collectedPath of collectedPaths) { + const result = await exec(`COLLECT=1 ${context.nodeRuntime} ${collectedPath}`, {}) + collectedCount += result.stdout.split('\n').filter((caseLabel) => caseLabel.length > 0).length + } + + return collectedCount +} + +/* + * Collector worker runtime + */ +async function work() { + const [, workerRuntime, ...assignedTestFiles] = process.argv + const context = getContext(workerRuntime) + const collectedCount = await collectCases(context, assignedTestFiles) + + console.log(collectedCount) +} + +work().catch((e) => { + console.log(e) +}) diff --git a/src/context.ts b/src/context.ts deleted file mode 100644 index 9febc95..0000000 --- a/src/context.ts +++ /dev/null @@ -1,15 +0,0 @@ -import { type TestCaseLabel, type TestFilePath, type IContext } from './types' - -let runnerContext: IContext | null - -function getContext(): IContext { - if (!runnerContext) { - runnerContext = { - collectedTests: new Map(), - } - } - - return runnerContext -} - -export default getContext() diff --git a/src/runner.ts b/src/runner.ts index b9a3f68..e346050 100644 --- a/src/runner.ts +++ b/src/runner.ts @@ -1,8 +1,9 @@ -import Context from './context' -import { greenText, redText, exec, generateCachedCollectedPathFromActual } from './utils' +import { getContext, greenText, redText, exec, generateCachedCollectedPathFromActual, splitIntoBatches } from './utils' +import { type IContext, type TestServer } from './types' -import { promises as fs, type Dirent, type PathLike } from 'fs' +import { promises as fs } from 'fs' import path from 'path' +import net from 'net' /* * Collects test files recursively starting from the provided root @@ -32,46 +33,78 @@ async function collectTests(root: string): Promise> { return collectedHere } -async function runTests(collectedPaths: Array) { - for await (const collectedPath of collectedPaths) { - // FIXME: This should just use `node` and transform if TS is present instead. - const result = await exec(`ts-node ${collectedPath}`, {}) - console.log(result.stdout) - } +/* + * Splits the list of collected test files into `workerCount` batches and starts + * worker processes. + */ +async function assignTestsToWorkers(context: IContext, collectedPaths: Array, workerCount: number = 1) { + const batchedCollectedPaths = splitIntoBatches(collectedPaths, workerCount) + + await Promise.all( + batchedCollectedPaths.map(async (batch) => + exec(`${context.nodeRuntime} ${context.workerRuntime} ${batch.join(' ')}`, {}), + ), + ) } -async function collectCases(collectedPaths: Array) { - let collectedCount = 0 +async function collectCases(context: IContext, collectedPaths: Array, workerCount: number = 1) { + const batchedCollectedPaths = splitIntoBatches(collectedPaths, workerCount) - for await (const collectedPath of collectedPaths) { - // FIXME: This should just use `node` and transform if TS is present instead. - const result = await exec(`COLLECT=1 ts-node ${collectedPath}`, {}) - const collectedCases = await fs.readFile( - `.womm-cache/${generateCachedCollectedPathFromActual(path.resolve(collectedPath))}`, - { encoding: 'utf8' }, - ) - collectedCount += collectedCases.split('\n').length - } + const batchResults = await Promise.all( + batchedCollectedPaths.map(async (batch) => + exec(`${context.nodeRuntime} ${context.collectorRuntime} ${batch.join(' ')}`, {}), + ), + ) + + const collectedCount = batchResults.reduce((total, batchResult) => { + return total + parseInt(batchResult.stdout) + }, 0) console.log(greenText(`Collected ${collectedCount} cases`)) +} + +function setUpSocket(socketPath: string): TestServer { + const server: TestServer = net.createServer() + server.listen(socketPath, () => { + console.log('Listening for workers') + server.workersRegistered = 0 + }) + + server.on('connection', (s) => { + const workerId = server.workersRegistered + server.workersRegistered = (server.workersRegistered ?? 0) + 1 + console.log(`Worker ${workerId} registered.`) + + s.on('data', (d) => { + const workerReport: any = JSON.parse(d.toString('utf8')) + console.log(workerReport.results) + + if (workerReport.failed) server.failure = true + }) + }) + + return server } /* * Logic executed when running the test runner CLI. */ ;(async () => { - const [, , collectionRoot, ...omit] = process.argv + const [, runnerPath, collectionRoot, ...omit] = process.argv + const context = getContext(runnerPath) + let server + try { - await fs.mkdir('.womm-cache') - + server = setUpSocket(context.runnerSocket) const collectedTests = await collectTests(collectionRoot) + await collectCases(context, collectedTests) + await assignTestsToWorkers(context, collectedTests) - await collectCases(collectedTests) - await runTests(collectedTests) + if (server.failure) throw new Error('test') } catch (e) { console.group(redText('Test run failed')) console.log(redText(String(e))) console.groupEnd() } finally { - await fs.rm('.womm-cache', { force: true, recursive: true }) + server?.close() } })().catch((e) => { throw e diff --git a/src/testCaseUtils.ts b/src/testCaseUtils.ts index e84e434..d7b3462 100644 --- a/src/testCaseUtils.ts +++ b/src/testCaseUtils.ts @@ -1,5 +1,3 @@ -import Context from './context' - import { promises as fs } from 'fs' import expect from './expect' @@ -19,7 +17,7 @@ function describe(label: TestCaseLabel, testGroup: TestCaseGroup) { function test(label: TestCaseLabel, testCase: TestCaseFunction): void { if (process.env.COLLECT) { - fs.appendFile(`.womm-cache/${generateCachedCollectedPathFromActual(process.argv[1])}`, `${label}\n`) + console.log(label) return } diff --git a/src/types.ts b/src/types.ts index 8e021d0..62fd072 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1,8 +1,19 @@ +import { type Server } from 'net' + export type TestCaseLabel = string export type TestFilePath = string export type TestCaseFunction = () => void export type TestCaseGroup = () => void -export interface IContext { - collectedTests: Map +export interface TestServer extends Server { + failure?: boolean + workersRegistered?: number +} + +export interface IContext { + workerRuntime: string + runnerRuntime: string + collectorRuntime: string + nodeRuntime: 'ts-node' | 'node' + runnerSocket: string } diff --git a/src/utils.ts b/src/utils.ts index ca35e30..4cd91b0 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1,5 +1,7 @@ import util from 'util' +import path from 'path' import childProcess from 'child_process' +import { type IContext } from './types' export const exec = util.promisify(childProcess.exec) @@ -14,3 +16,35 @@ export function greenText(text: string): string { export function redText(text: string): string { return `\x1b[31m${text}\x1b[0m` } + +export function getContext(runnerPath: string): IContext { + const installDirectory = path.dirname(runnerPath) + const runnerExtension = path.extname(runnerPath) + // TODO: We probably don't need this if we transform TS to JS before execution. + const nodeRuntime = runnerExtension === '.ts' ? 'ts-node' : 'node' + + return { + workerRuntime: path.join(installDirectory, `worker${runnerExtension}`), + runnerRuntime: runnerPath, + collectorRuntime: path.join(installDirectory, `collector${runnerExtension}`), + nodeRuntime, + runnerSocket: '/tmp/womm.sock', + } +} + +export function splitIntoBatches(data: Array, desiredBatchCount: number = 1): Array> { + const desiredBatchSize = Math.max(data.length / desiredBatchCount, 1) + return data.reduce((acc, item: T) => { + if (acc.length === 0) acc.push([]) + + const lastBatch = acc[acc.length - 1] + + if (lastBatch.length < desiredBatchSize) { + lastBatch.push(item) + } else { + acc.push([item]) + } + + return acc + }, [] as Array>) +} diff --git a/src/worker.ts b/src/worker.ts new file mode 100644 index 0000000..d215f53 --- /dev/null +++ b/src/worker.ts @@ -0,0 +1,35 @@ +import net from 'net' + +import { getContext, exec } from './utils' + +// TODO: What should be message protocol / format be? +function formatMessage(results: string, failed: boolean): string { + return JSON.stringify({ results, failed }) +} + +/* + * Worker runtime + * + * The worker executes the tests by called `node` on them. Since each test + * is an self-contained executable file, the worker can run each of them, + * collect output and relay it back to the runner process via IPC. + * + * Each worker process is responsible for as many test files as the runner + * decides to assign it and files assigned to the worker are only + * touched by the worker assigned to them. + */ +async function work() { + const [, workerRuntime, ...assignedTestFiles] = process.argv + const context = getContext(workerRuntime) + const socketConnection = net.createConnection(context.runnerSocket, async () => { + for await (const testFilePath of assignedTestFiles) { + const result = await exec(`${context.nodeRuntime} ${testFilePath}`, {}) + socketConnection.write(formatMessage(result.stdout, result.stdout.includes('FAILED'))) + } + socketConnection.destroy() + }) +} + +work().catch((e) => { + console.log(e) +})