feat: split runner and workers, set up ipc (#8)

* feat: split runner and workers, set up ipc

* docs: document worker a bit

* wip: centralize runtime details to context obj

* wip: clean up context, batching

* wip: fail on failure

* wip: propagate failure

* refactor: move types

* refactor: split collector into separate process for split opportunities

* refactor: no need for the fs cache anymore
This commit is contained in:
Marc 2023-04-01 16:24:44 -04:00
parent a04a5b87ca
commit de5ff50c3b
Signed by: marc
GPG key ID: 048E042F22B5DC79
7 changed files with 176 additions and 46 deletions

34
src/collector.ts Normal file
View file

@ -0,0 +1,34 @@
import net from 'net'
import { type IContext } from './types'
import { getContext, exec, greenText } from './utils'
// TODO: What should be message protocol / format be?
function formatMessage(results: string, failed: boolean): string {
return JSON.stringify({ results, failed })
}
async function collectCases(context: IContext, collectedPaths: Array<string>): Promise<number> {
let collectedCount = 0
for await (const collectedPath of collectedPaths) {
const result = await exec(`COLLECT=1 ${context.nodeRuntime} ${collectedPath}`, {})
collectedCount += result.stdout.split('\n').filter((caseLabel) => caseLabel.length > 0).length
}
return collectedCount
}
/*
* Collector worker runtime
*/
async function work() {
const [, workerRuntime, ...assignedTestFiles] = process.argv
const context = getContext(workerRuntime)
const collectedCount = await collectCases(context, assignedTestFiles)
console.log(collectedCount)
}
work().catch((e) => {
console.log(e)
})

View file

@ -1,15 +0,0 @@
import { type TestCaseLabel, type TestFilePath, type IContext } from './types'
let runnerContext: IContext | null
function getContext(): IContext {
if (!runnerContext) {
runnerContext = {
collectedTests: new Map<TestFilePath, any>(),
}
}
return runnerContext
}
export default getContext()

View file

@ -1,8 +1,9 @@
import Context from './context' import { getContext, greenText, redText, exec, generateCachedCollectedPathFromActual, splitIntoBatches } from './utils'
import { greenText, redText, exec, generateCachedCollectedPathFromActual } from './utils' import { type IContext, type TestServer } from './types'
import { promises as fs, type Dirent, type PathLike } from 'fs' import { promises as fs } from 'fs'
import path from 'path' import path from 'path'
import net from 'net'
/* /*
* Collects test files recursively starting from the provided root * Collects test files recursively starting from the provided root
@ -32,46 +33,78 @@ async function collectTests(root: string): Promise<Array<string>> {
return collectedHere return collectedHere
} }
async function runTests(collectedPaths: Array<string>) { /*
for await (const collectedPath of collectedPaths) { * Splits the list of collected test files into `workerCount` batches and starts
// FIXME: This should just use `node` and transform if TS is present instead. * worker processes.
const result = await exec(`ts-node ${collectedPath}`, {}) */
console.log(result.stdout) async function assignTestsToWorkers(context: IContext, collectedPaths: Array<string>, workerCount: number = 1) {
} const batchedCollectedPaths = splitIntoBatches(collectedPaths, workerCount)
await Promise.all(
batchedCollectedPaths.map(async (batch) =>
exec(`${context.nodeRuntime} ${context.workerRuntime} ${batch.join(' ')}`, {}),
),
)
} }
async function collectCases(collectedPaths: Array<string>) { async function collectCases(context: IContext, collectedPaths: Array<string>, workerCount: number = 1) {
let collectedCount = 0 const batchedCollectedPaths = splitIntoBatches(collectedPaths, workerCount)
for await (const collectedPath of collectedPaths) { const batchResults = await Promise.all(
// FIXME: This should just use `node` and transform if TS is present instead. batchedCollectedPaths.map(async (batch) =>
const result = await exec(`COLLECT=1 ts-node ${collectedPath}`, {}) exec(`${context.nodeRuntime} ${context.collectorRuntime} ${batch.join(' ')}`, {}),
const collectedCases = await fs.readFile( ),
`.womm-cache/${generateCachedCollectedPathFromActual(path.resolve(collectedPath))}`, )
{ encoding: 'utf8' },
) const collectedCount = batchResults.reduce((total, batchResult) => {
collectedCount += collectedCases.split('\n').length return total + parseInt(batchResult.stdout)
} }, 0)
console.log(greenText(`Collected ${collectedCount} cases`)) console.log(greenText(`Collected ${collectedCount} cases`))
}
function setUpSocket(socketPath: string): TestServer {
const server: TestServer = net.createServer()
server.listen(socketPath, () => {
console.log('Listening for workers')
server.workersRegistered = 0
})
server.on('connection', (s) => {
const workerId = server.workersRegistered
server.workersRegistered = (server.workersRegistered ?? 0) + 1
console.log(`Worker ${workerId} registered.`)
s.on('data', (d) => {
const workerReport: any = JSON.parse(d.toString('utf8'))
console.log(workerReport.results)
if (workerReport.failed) server.failure = true
})
})
return server
} /* } /*
* Logic executed when running the test runner CLI. * Logic executed when running the test runner CLI.
*/ */
;(async () => { ;(async () => {
const [, , collectionRoot, ...omit] = process.argv const [, runnerPath, collectionRoot, ...omit] = process.argv
const context = getContext(runnerPath)
let server
try { try {
await fs.mkdir('.womm-cache') server = setUpSocket(context.runnerSocket)
const collectedTests = await collectTests(collectionRoot) const collectedTests = await collectTests(collectionRoot)
await collectCases(context, collectedTests)
await assignTestsToWorkers(context, collectedTests)
await collectCases(collectedTests) if (server.failure) throw new Error('test')
await runTests(collectedTests)
} catch (e) { } catch (e) {
console.group(redText('Test run failed')) console.group(redText('Test run failed'))
console.log(redText(String(e))) console.log(redText(String(e)))
console.groupEnd() console.groupEnd()
} finally { } finally {
await fs.rm('.womm-cache', { force: true, recursive: true }) server?.close()
} }
})().catch((e) => { })().catch((e) => {
throw e throw e

View file

@ -1,5 +1,3 @@
import Context from './context'
import { promises as fs } from 'fs' import { promises as fs } from 'fs'
import expect from './expect' import expect from './expect'
@ -19,7 +17,7 @@ function describe(label: TestCaseLabel, testGroup: TestCaseGroup) {
function test(label: TestCaseLabel, testCase: TestCaseFunction): void { function test(label: TestCaseLabel, testCase: TestCaseFunction): void {
if (process.env.COLLECT) { if (process.env.COLLECT) {
fs.appendFile(`.womm-cache/${generateCachedCollectedPathFromActual(process.argv[1])}`, `${label}\n`) console.log(label)
return return
} }

View file

@ -1,8 +1,19 @@
import { type Server } from 'net'
export type TestCaseLabel = string export type TestCaseLabel = string
export type TestFilePath = string export type TestFilePath = string
export type TestCaseFunction = () => void export type TestCaseFunction = () => void
export type TestCaseGroup = () => void export type TestCaseGroup = () => void
export interface IContext { export interface TestServer extends Server {
collectedTests: Map<TestFilePath, any> failure?: boolean
workersRegistered?: number
}
export interface IContext {
workerRuntime: string
runnerRuntime: string
collectorRuntime: string
nodeRuntime: 'ts-node' | 'node'
runnerSocket: string
} }

View file

@ -1,5 +1,7 @@
import util from 'util' import util from 'util'
import path from 'path'
import childProcess from 'child_process' import childProcess from 'child_process'
import { type IContext } from './types'
export const exec = util.promisify(childProcess.exec) export const exec = util.promisify(childProcess.exec)
@ -14,3 +16,35 @@ export function greenText(text: string): string {
export function redText(text: string): string { export function redText(text: string): string {
return `\x1b[31m${text}\x1b[0m` return `\x1b[31m${text}\x1b[0m`
} }
export function getContext(runnerPath: string): IContext {
const installDirectory = path.dirname(runnerPath)
const runnerExtension = path.extname(runnerPath)
// TODO: We probably don't need this if we transform TS to JS before execution.
const nodeRuntime = runnerExtension === '.ts' ? 'ts-node' : 'node'
return {
workerRuntime: path.join(installDirectory, `worker${runnerExtension}`),
runnerRuntime: runnerPath,
collectorRuntime: path.join(installDirectory, `collector${runnerExtension}`),
nodeRuntime,
runnerSocket: '/tmp/womm.sock',
}
}
export function splitIntoBatches<T>(data: Array<T>, desiredBatchCount: number = 1): Array<Array<T>> {
const desiredBatchSize = Math.max(data.length / desiredBatchCount, 1)
return data.reduce((acc, item: T) => {
if (acc.length === 0) acc.push([])
const lastBatch = acc[acc.length - 1]
if (lastBatch.length < desiredBatchSize) {
lastBatch.push(item)
} else {
acc.push([item])
}
return acc
}, [] as Array<Array<T>>)
}

35
src/worker.ts Normal file
View file

@ -0,0 +1,35 @@
import net from 'net'
import { getContext, exec } from './utils'
// TODO: What should be message protocol / format be?
function formatMessage(results: string, failed: boolean): string {
return JSON.stringify({ results, failed })
}
/*
* Worker runtime
*
* The worker executes the tests by called `node` on them. Since each test
* is an self-contained executable file, the worker can run each of them,
* collect output and relay it back to the runner process via IPC.
*
* Each worker process is responsible for as many test files as the runner
* decides to assign it and files assigned to the worker are only
* touched by the worker assigned to them.
*/
async function work() {
const [, workerRuntime, ...assignedTestFiles] = process.argv
const context = getContext(workerRuntime)
const socketConnection = net.createConnection(context.runnerSocket, async () => {
for await (const testFilePath of assignedTestFiles) {
const result = await exec(`${context.nodeRuntime} ${testFilePath}`, {})
socketConnection.write(formatMessage(result.stdout, result.stdout.includes('FAILED')))
}
socketConnection.destroy()
})
}
work().catch((e) => {
console.log(e)
})