refactor: fork instead of socket (#17)

* refactor: use fork and child>parent messaging instead of socket

* feat: timing marks, standardize workers

* refactor: remove cruft from collector worker

* refactor: standardize worker,process spawn logic

* ci: drop support for node 14, eol

* refactor: needless double string convert

* refactor: no more log grouping

* fix: fail on failure
This commit is contained in:
Marc 2023-04-09 17:16:25 -04:00
parent 33c3bb6afa
commit 972d9e6edc
Signed by: marc
GPG key ID: 048E042F22B5DC79
9 changed files with 203 additions and 105 deletions

View file

@ -11,7 +11,7 @@ jobs:
strategy: strategy:
matrix: matrix:
node-version: [14, 16, 18] node-version: [16, 18]
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v3
@ -77,7 +77,7 @@ jobs:
needs: dependencies needs: dependencies
strategy: strategy:
matrix: matrix:
node-version: [14, 16, 18] node-version: [16, 18]
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v3

View file

@ -3,7 +3,7 @@
import helpText from './help' import helpText from './help'
import parseArgs from './argumentParser' import parseArgs from './argumentParser'
import { getContext, redText } from './utils' import { getContext, redText } from './utils'
import { setUpSocket, collectTests, collectCases, assignTestsToWorkers } from './runner' import run from './runner'
/* /*
* Logic executed when running the test runner CLI. * Logic executed when running the test runner CLI.
@ -17,21 +17,13 @@ import { setUpSocket, collectTests, collectCases, assignTestsToWorkers } from '.
} }
const context = getContext(args.runtimePath) const context = getContext(args.runtimePath)
let server
try { try {
server = setUpSocket(context.runnerSocket) run(args, context)
const collectedTests = await collectTests(args.targets)
await collectCases(context, collectedTests)
await assignTestsToWorkers(context, collectedTests, args.workers)
if (server.failure) throw new Error()
} catch (e) { } catch (e) {
console.log(redText('Test run failed')) console.log(redText('Test run failed'))
} finally { throw e
server?.close()
} }
})().catch((e) => { })().catch((e) => {
throw e process.exit(1)
}) })

View file

@ -1,21 +1,29 @@
import net from 'net' #!/usr/bin/env ts-node
import { type IContext } from './types'
import { getContext, exec, greenText } from './utils'
// TODO: What should be message protocol / format be? import { type Context } from './types'
function formatMessage(results: string, failed: boolean): string { import { getContext, spawnProcess } from './utils'
return JSON.stringify({ results, failed })
}
async function collectCases(context: IContext, collectedPaths: Array<string>): Promise<number> {
let collectedCount = 0
async function collectCases(context: Context, collectedPaths: Array<string>): Promise<number> {
let totalCases = 0
for await (const collectedPath of collectedPaths) { for await (const collectedPath of collectedPaths) {
const result = await exec(`COLLECT=1 ${context.nodeRuntime} ${collectedPath}`, {}) const collectedCount = await new Promise<number>((resolve, reject) => {
collectedCount += result.stdout.split('\n').filter((caseLabel) => caseLabel.length > 0).length let count = 0
spawnProcess(context.nodeRuntime, [collectedPath], {
extraEnv: { COLLECT: '1' },
onClose: (code) => {
resolve(count)
},
onStdoutData: (message) => {
count += message.split('\n').filter((caseLabel: string) => caseLabel.length > 0).length
},
})
})
totalCases += collectedCount
} }
return collectedCount return totalCases
} }
/* /*
@ -25,8 +33,7 @@ async function work() {
const [, workerRuntime, ...assignedTestFiles] = process.argv const [, workerRuntime, ...assignedTestFiles] = process.argv
const context = getContext(workerRuntime) const context = getContext(workerRuntime)
const collectedCount = await collectCases(context, assignedTestFiles) const collectedCount = await collectCases(context, assignedTestFiles)
if (process.send) process.send(JSON.stringify({ total: collectedCount }))
console.log(collectedCount)
} }
work().catch((e) => { work().catch((e) => {

View file

@ -1,18 +1,15 @@
import { greenText, redText, exec, splitIntoBatches } from './utils' import { forkWorker, greenText, redText, boldText, splitIntoBatches } from './utils'
import { type Args, type IContext, type TestServer } from './types' import { type Args, type Context, type WorkerReport, type CollectorReport } from './types'
import { type Buffer } from 'buffer'
import { promises as fs } from 'fs' import { promises as fs } from 'fs'
import path from 'path' import path from 'path'
import net from 'net' import { performance } from 'perf_hooks'
class UnknownArgumentError extends Error {}
/* /*
* Collects test files recursively starting from the provided root * Collects test files recursively starting from the provided root
* path. * path.
*/ */
export async function collectTests(roots: Array<string>): Promise<Array<string>> { async function collectTests(roots: Array<string>): Promise<Array<string>> {
const collectedHere = [] const collectedHere = []
for (const root of roots) { for (const root of roots) {
@ -38,55 +35,110 @@ export async function collectTests(roots: Array<string>): Promise<Array<string>>
return collectedHere return collectedHere
} }
/* async function collectCases(context: Context, collectedPaths: Array<string>, workerCount: number = 1): Promise<number> {
* Splits the list of collected test files into `workerCount` batches and starts
* worker processes.
*/
export async function assignTestsToWorkers(context: IContext, collectedPaths: Array<string>, workerCount: number = 1) {
const batchedCollectedPaths = splitIntoBatches(collectedPaths, workerCount)
await Promise.all(
batchedCollectedPaths.map(async (batch) =>
exec(`${context.nodeRuntime} ${context.workerRuntime} ${batch.join(' ')}`, {}),
),
)
}
export async function collectCases(context: IContext, collectedPaths: Array<string>, workerCount: number = 1) {
const batchedCollectedPaths = splitIntoBatches(collectedPaths, workerCount) const batchedCollectedPaths = splitIntoBatches(collectedPaths, workerCount)
const batchResults = await Promise.all( const batchResults = await Promise.all(
batchedCollectedPaths.map(async (batch) => batchedCollectedPaths.map(
exec(`${context.nodeRuntime} ${context.collectorRuntime} ${batch.join(' ')}`, {}), (batch): Promise<CollectorReport> =>
new Promise((resolve, reject) => {
const collectorReport: CollectorReport = { totalCases: 0 }
forkWorker(context.collectorRuntime, batch, {
onClose: (code) => {
resolve(collectorReport)
},
onMessage: (message: string) => {
collectorReport.totalCases += JSON.parse(message).total
},
})
}),
), ),
) )
const collectedCount = batchResults.reduce((total, batchResult) => { const collectedCount = batchResults.reduce((total, batchResult) => {
return total + parseInt(batchResult.stdout) return total + batchResult.totalCases
}, 0) }, 0)
console.log(greenText(`Collected ${collectedCount} cases`)) return collectedCount
} }
export function setUpSocket(socketPath: string): TestServer { /*
const server: TestServer = net.createServer() * Splits the list of collected test files into `workerCount` batches and starts
server.listen(socketPath, () => { * worker processes.
console.log('Listening for workers') */
server.workersRegistered = 0 async function assignTestsToWorkers(
}) context: Context,
collectedPaths: Array<string>,
workerCount: number = 1,
): Promise<{ [key: number]: WorkerReport }> {
const batchedCollectedPaths = splitIntoBatches(collectedPaths, workerCount)
server.on('connection', (s) => { const reports = await Promise.all(
const workerId = server.workersRegistered batchedCollectedPaths.map(
server.workersRegistered = (server.workersRegistered ?? 0) + 1 async (batch, index): Promise<WorkerReport> =>
console.log(`Worker ${workerId} registered.`) new Promise((resolve, reject) => {
performance.mark(`worker-${index}:start`)
const workerReport: WorkerReport = {
workerId: index,
pass: true,
returnCode: null,
runtime: null,
}
const workerProcess = forkWorker(context.workerRuntime, batch, {
onClose: (code) => {
performance.mark(`worker-${index}:end`)
const runtimePerf = performance.measure(
`worker-${index}:runtime`,
`worker-${index}:start`,
`worker-${index}:end`,
)
workerReport.returnCode = code
workerReport.runtime = runtimePerf.duration
resolve(workerReport)
},
onMessage: (message: string) => {
const workerMessage: { results: string; failed: boolean } = JSON.parse(message)
if (workerMessage.failed) workerReport.pass = false
s.on('data', (rawMessage: Buffer) => { console.log(workerMessage.results)
const workerReport: { results: string; failed: boolean } = JSON.parse(rawMessage.toString('utf8')) },
console.log(workerReport.results) })
}),
),
)
if (workerReport.failed) server.failure = true return reports.reduce((summary, report) => {
}) summary[report.workerId] = report
}) return summary
}, {} as { [key: number]: WorkerReport })
return server
} }
async function run(args: Args, context: Context) {
performance.mark('run:start')
performance.mark('test-collect:start')
const collectedTests = await collectTests(args.targets)
performance.mark('test-collect:end')
const testCollectTime = performance.measure('test-collect', 'test-collect:start', 'test-collect:end').duration
console.log(
`Collected ${boldText(collectedTests.length)} test files in ${boldText((testCollectTime / 1000).toFixed(3))}s`,
)
performance.mark('case-collect:start')
const collectedCaseCount = await collectCases(context, collectedTests)
performance.mark('case-collect:end')
const caseCollectTime = performance.measure('case-collect', 'case-collect:start', 'case-collect:end').duration
console.log(
`Collected ${boldText(collectedCaseCount)} test files in ${boldText((caseCollectTime / 1000).toFixed(3))}s`,
)
const summary = await assignTestsToWorkers(context, collectedTests, args.workers)
const hasFailed = Object.values(summary).filter((workerReport) => !workerReport.pass).length > 0
performance.mark('run:end')
const overallTime = performance.measure('run', 'run:start', 'run:end').duration
console.log(`Ran tests in ${boldText(overallTime / 1000)}s`)
if (hasFailed) throw new Error('Test run failed')
}
export default run

View file

@ -20,9 +20,8 @@ function describe(label: TestCaseLabel, testGroup: TestCaseGroup) {
return return
} }
console.group(greenText(label)) console.log(greenText(label))
testGroup() testGroup()
console.groupEnd()
} }
Object.defineProperty(describe, 'each', { Object.defineProperty(describe, 'each', {

View file

@ -1,5 +1,5 @@
import { promises as fs } from 'fs' import { promises as fs } from 'fs'
import { performance } from 'perf_hooks'
import { greenText, redText } from '../utils' import { greenText, redText } from '../utils'
import { type TestCaseLabel, type TestCaseFunction, type TestCaseGroup } from '../types' import { type TestCaseLabel, type TestCaseFunction, type TestCaseGroup } from '../types'
@ -13,19 +13,24 @@ import { type TestCaseLabel, type TestCaseFunction, type TestCaseGroup } from '.
* ``` * ```
*/ */
function test(label: TestCaseLabel, testCase: TestCaseFunction): void { function test(label: TestCaseLabel, testCase: TestCaseFunction): void {
performance.mark(`test-${label}:start`)
if (process.env.COLLECT) { if (process.env.COLLECT) {
console.log(label) console.log(label)
return return
} }
let hasFailed = false
try { try {
testCase() testCase()
console.log(greenText(`[PASSED] ${label}`))
} catch (e) { } catch (e) {
console.group(redText(`[FAILED] ${label}`)) hasFailed = true
console.log(redText(String(e))) console.log(redText(String(e)))
console.groupEnd()
} }
performance.mark(`test-${label}:end`)
const testDuration = performance.measure(`test-${label}`, `test-${label}:start`, `test-${label}:end`).duration
if (hasFailed) console.log(redText(`❌ [FAILED] ${label} (${(testDuration / 1000).toFixed(3)}s)`))
else console.log(greenText(`✅ [PASS] ${label} (${(testDuration / 1000).toFixed(3)}s)`))
} }
Object.defineProperty(test, 'each', { Object.defineProperty(test, 'each', {

View file

@ -1,21 +1,13 @@
import { type Server } from 'net'
export type TestCaseLabel = string export type TestCaseLabel = string
export type TestFilePath = string export type TestFilePath = string
export type TestCaseFunction = (...args: Array<unknown>) => void export type TestCaseFunction = (...args: Array<unknown>) => void
export type TestCaseGroup = (...args: Array<unknown>) => void export type TestCaseGroup = (...args: Array<unknown>) => void
export interface TestServer extends Server { export interface Context {
failure?: boolean
workersRegistered?: number
}
export interface IContext {
workerRuntime: string workerRuntime: string
runnerRuntime: string runnerRuntime: string
collectorRuntime: string collectorRuntime: string
nodeRuntime: 'ts-node' | 'node' nodeRuntime: 'ts-node' | 'node'
runnerSocket: string
} }
export interface Args { export interface Args {
@ -56,3 +48,14 @@ interface FlagConfiguration {
export interface FlagConfigurationMap { export interface FlagConfigurationMap {
[key: string]: FlagConfiguration [key: string]: FlagConfiguration
} }
export interface WorkerReport {
workerId: number
pass: boolean
returnCode: number | null
runtime: number | null
}
export interface CollectorReport {
totalCases: number
}

View file

@ -1,22 +1,21 @@
import util from 'util' import util from 'util'
import path from 'path' import path from 'path'
import childProcess from 'child_process' import childProcess from 'child_process'
import { type IContext } from './types' import { type Context } from './types'
import { type Buffer } from 'buffer'
export const exec = util.promisify(childProcess.exec)
/* /*
* Terminal text style * Terminal text style
*/ */
export function boldText(text: string): string { export function boldText(text: string | number): string {
return `\x1b[1m${text}\x1b[0m` return `\x1b[1m${text}\x1b[0m`
} }
export function greenText(text: string): string { export function greenText(text: string | number): string {
return `\x1b[32m${text}\x1b[0m` return `\x1b[32m${text}\x1b[0m`
} }
export function redText(text: string): string { export function redText(text: string | number): string {
return `\x1b[31m${text}\x1b[0m` return `\x1b[31m${text}\x1b[0m`
} }
@ -26,7 +25,7 @@ export function redText(text: string): string {
* `process.argv[1]`, which will allow all the other paths * `process.argv[1]`, which will allow all the other paths
* to be set properly. * to be set properly.
*/ */
export function getContext(runnerPath: string): IContext { export function getContext(runnerPath: string): Context {
const installDirectory = path.dirname(runnerPath) const installDirectory = path.dirname(runnerPath)
const runnerExtension = path.extname(runnerPath) const runnerExtension = path.extname(runnerPath)
// TODO: We probably don't need this if we transform TS to JS before execution. // TODO: We probably don't need this if we transform TS to JS before execution.
@ -37,7 +36,6 @@ export function getContext(runnerPath: string): IContext {
runnerRuntime: runnerPath, runnerRuntime: runnerPath,
collectorRuntime: path.join(installDirectory, `collector${runnerExtension}`), collectorRuntime: path.join(installDirectory, `collector${runnerExtension}`),
nodeRuntime, nodeRuntime,
runnerSocket: '/tmp/womm.sock',
} }
} }
@ -61,3 +59,32 @@ export function splitIntoBatches<T>(data: Array<T>, desiredBatchCount: number =
return acc return acc
}, [] as Array<Array<T>>) }, [] as Array<Array<T>>)
} }
export function forkWorker(
runtime: string,
args: Array<string>,
{ onClose, onMessage }: { onClose: (code: number) => void; onMessage: (message: string) => void },
): childProcess.ChildProcess {
const workerProcess = childProcess.fork(runtime, args, {})
workerProcess.on('message', onMessage)
workerProcess.on('close', onClose)
return workerProcess
}
export function spawnProcess(
command: string,
args: Array<string>,
{
onStdoutData,
onClose,
extraEnv,
}: { onStdoutData: (message: string) => void; onClose: (code: number) => void; extraEnv?: { [key: string]: string } },
): childProcess.ChildProcess {
const spawnedProcess = childProcess.spawn(command, args, { env: { ...process.env, ...(extraEnv ?? {}) } })
spawnedProcess.stdout.on('data', (message: Buffer) => onStdoutData(message.toString('utf8')))
spawnedProcess.on('close', onClose)
return spawnedProcess
}

View file

@ -1,6 +1,8 @@
import net from 'net' #!/usr/bin/env ts-node
import { getContext, exec } from './utils' import path from 'path'
import { getContext, spawnProcess } from './utils'
// TODO: What should be message protocol / format be? // TODO: What should be message protocol / format be?
function formatMessage(results: string, failed: boolean): string { function formatMessage(results: string, failed: boolean): string {
@ -19,15 +21,26 @@ function formatMessage(results: string, failed: boolean): string {
* touched by the worker assigned to them. * touched by the worker assigned to them.
*/ */
async function work() { async function work() {
if (process?.send === undefined) throw Error('No process global found')
const [, workerRuntime, ...assignedTestFiles] = process.argv const [, workerRuntime, ...assignedTestFiles] = process.argv
const context = getContext(workerRuntime) const context = getContext(workerRuntime)
const socketConnection = net.createConnection(context.runnerSocket, async () => {
for await (const testFilePath of assignedTestFiles) { await Promise.all(
const result = await exec(`${context.nodeRuntime} ${testFilePath}`, {}) assignedTestFiles.map(
socketConnection.write(formatMessage(result.stdout, result.stdout.includes('FAILED'))) (testFilePath) =>
} new Promise((resolve, reject) => {
socketConnection.destroy() spawnProcess(context.nodeRuntime, [path.resolve(testFilePath)], {
}) onClose: (code) => {
resolve(code)
},
onStdoutData: (message) => {
process?.send?.(formatMessage(message.trim(), message.includes('FAILED')))
},
})
}),
),
)
} }
work().catch((e) => { work().catch((e) => {