From 4ef95277e84447381e8bd1866bf7fbc827271dd1 Mon Sep 17 00:00:00 2001 From: Marc Cataford Date: Sat, 17 Aug 2024 23:45:11 -0400 Subject: [PATCH] refactor: remove scope dependencies on routine running jobs --- internal/runner/runner.go | 32 ++++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/internal/runner/runner.go b/internal/runner/runner.go index 98461f6..db6384c 100644 --- a/internal/runner/runner.go +++ b/internal/runner/runner.go @@ -42,38 +42,42 @@ func (r *Runner) RunWorkflow(workflow workflow.Workflow) TaskTracker { for _, job := range group { groupWait.Add(1) + jobTracker := NewTaskTracker(rootTracker) + runnerImage, defined := r.Labels[job.RunsOn] + if !defined { + jobTracker.SetStatus("failed").SetError(fmt.Errorf("Unknown runner image label: %s", job.RunsOn)) + continue + } + jobContext := context.WithValue(workflowContext, "currentJob", job) + jobContext = context.WithValue(jobContext, "runnerImageUri", runnerImage) - runJob := func(context context.Context) { - defer groupWait.Done() - jobTracker := NewTaskTracker(rootTracker).SetStatus("started") - - runnerImage, defined := r.Labels[job.RunsOn] - - if !defined { - jobTracker.SetStatus("failed").SetError(fmt.Errorf("Unknown runner image label: %s", job.RunsOn)) - return - } + // Runs a given job (provided a runner to run it on, its context, a task tracker for progress monitoring and + // a WaitGroup to coordinate concurrent tasks) and updates the tracker with results. + runJob := func(runner *Runner, jobContext context.Context, jobTracker *TaskTracker, jobWaitGroup *sync.WaitGroup) { + defer jobWaitGroup.Done() + jobTracker.SetStatus("started") + runnerImage := jobContext.Value("runnerImageUri").(string) containerName := fmt.Sprintf("runner-%s", jobTracker.TaskId) logger.Info("Using image %s (label: %s)", runnerImage, job.RunsOn) - if pullError := r.Driver.Pull(runnerImage); pullError != nil { + if pullError := runner.Driver.Pull(runnerImage); pullError != nil { jobTracker.SetStatus("failed").SetError(pullError) return } - if runError := r.RunJobInContainer(runnerImage, containerName, context); runError != nil { + if runError := runner.RunJobInContainer(runnerImage, containerName, jobContext); runError != nil { jobTracker.SetStatus("failed").SetError(runError) return } jobTracker.SetStatus("success") - r.deferred.RunDeferredTasksInScope(fmt.Sprintf("job-%s", containerName)) + runner.deferred.RunDeferredTasksInScope(fmt.Sprintf("job-%s", containerName)) } - go runJob(jobContext) + go runJob(r, jobContext, jobTracker, &groupWait) } groupWait.Wait()