From ab3030ac7b220da135b907934b28e534b3dd8b6e Mon Sep 17 00:00:00 2001 From: Marc Cataford Date: Thu, 15 Aug 2024 19:13:33 -0400 Subject: [PATCH] refactor: expose workflow, job data to job routine to expose cwd --- internal/runner/runner.go | 22 +++++++++++++++++----- internal/runner/runner_flow_test.go | 6 +++++- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/internal/runner/runner.go b/internal/runner/runner.go index 4c307bc..04e9254 100644 --- a/internal/runner/runner.go +++ b/internal/runner/runner.go @@ -1,6 +1,7 @@ package runner import ( + "context" logger "courgette/internal/logging" workflow "courgette/internal/workflow" "errors" @@ -62,14 +63,20 @@ func (r *Runner) RunWorkflow(workflow workflow.Workflow) Task { logger.Info("Executing workflow: %s", workflow.SourcePath) task := r.GetTask(r.AddTask()) + workflowContext := context.WithValue(context.Background(), "workflow", workflow) + for _, group := range workflow.GetJobsAsGroups() { var groupWait sync.WaitGroup for _, job := range group { groupWait.Add(1) - go func() { + jobContext := context.WithValue(workflowContext, "currentJob", job) + + runJob := func(context context.Context) { defer groupWait.Done() + + // FIXME: Disambiguate the usage of "context" as a term. jobContext := task.GetJobContext(task.AddJob()) jobContext.SetStatus("started") @@ -84,14 +91,16 @@ func (r *Runner) RunWorkflow(workflow workflow.Workflow) Task { return } - if runError := r.RunJobInContainer(runnerImage, containerName, job); runError != nil { + if runError := r.RunJobInContainer(runnerImage, containerName, context); runError != nil { jobContext.SetStatus("failed").SetError(runError) return } jobContext.SetStatus("success") r.deferred.RunDeferredTasksInScope(fmt.Sprintf("job-%s", containerName)) - }() + } + + go runJob(jobContext) } groupWait.Wait() @@ -123,7 +132,7 @@ func (r *Runner) RunCommandInContainer(containerId string, command string) error // Executes a job within a container. // // The container is started before the job steps are run and cleaned up after. -func (r *Runner) RunJobInContainer(imageUri string, containerId string, job workflow.Job) error { +func (r *Runner) RunJobInContainer(imageUri string, containerId string, jobContext context.Context) error { r.Driver.Start(imageUri, containerId) r.deferred.Queue(fmt.Sprintf("job-%s", containerId), func() { @@ -131,9 +140,12 @@ func (r *Runner) RunJobInContainer(imageUri string, containerId string, job work r.Driver.Stop(containerId) }) + job := jobContext.Value("currentJob").(workflow.Job) + logger.Info("Started %s", containerId) - for _, step := range job.Steps { + for stepIndex, step := range job.Steps { logger.Info("Run: %s", step.Run) + logger.Info("Using working directory %s", jobContext.Value("workflow").(workflow.Workflow).GetWorkingDirectory(job.Name, stepIndex)) if step.Run != "" { return r.RunCommandInContainer(containerId, step.Run) diff --git a/internal/runner/runner_flow_test.go b/internal/runner/runner_flow_test.go index 4cfc1f8..ec8d36d 100644 --- a/internal/runner/runner_flow_test.go +++ b/internal/runner/runner_flow_test.go @@ -1,6 +1,7 @@ package runner import ( + "context" logger "courgette/internal/logging" workflow "courgette/internal/workflow" "errors" @@ -107,7 +108,10 @@ func TestRunJobInContainerSchedulesStoppingContainers(t *testing.T) { runner := NewRunner(&mockDriver, map[string]string{}) - runner.RunJobInContainer("uri", "container", workflow.Job{}) + jobCtx := context.WithValue(context.Background(), "currentJob", workflow.Job{}) + jobCtx = context.WithValue(jobCtx, "workflow", workflow.Workflow{}) + + runner.RunJobInContainer("uri", "container", jobCtx) if len(runner.deferred.GetAllTasks()) != 1 { t.Errorf("Expected 1 deferred task, found %d instead.", len(runner.deferred.GetAllTasks()))