refactor: expose workflow, job data to job routine to expose cwd
This commit is contained in:
parent
bf2c311eef
commit
ab3030ac7b
2 changed files with 22 additions and 6 deletions
|
@ -1,6 +1,7 @@
|
||||||
package runner
|
package runner
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
logger "courgette/internal/logging"
|
logger "courgette/internal/logging"
|
||||||
workflow "courgette/internal/workflow"
|
workflow "courgette/internal/workflow"
|
||||||
"errors"
|
"errors"
|
||||||
|
@ -62,14 +63,20 @@ func (r *Runner) RunWorkflow(workflow workflow.Workflow) Task {
|
||||||
logger.Info("Executing workflow: %s", workflow.SourcePath)
|
logger.Info("Executing workflow: %s", workflow.SourcePath)
|
||||||
task := r.GetTask(r.AddTask())
|
task := r.GetTask(r.AddTask())
|
||||||
|
|
||||||
|
workflowContext := context.WithValue(context.Background(), "workflow", workflow)
|
||||||
|
|
||||||
for _, group := range workflow.GetJobsAsGroups() {
|
for _, group := range workflow.GetJobsAsGroups() {
|
||||||
var groupWait sync.WaitGroup
|
var groupWait sync.WaitGroup
|
||||||
|
|
||||||
for _, job := range group {
|
for _, job := range group {
|
||||||
groupWait.Add(1)
|
groupWait.Add(1)
|
||||||
|
|
||||||
go func() {
|
jobContext := context.WithValue(workflowContext, "currentJob", job)
|
||||||
|
|
||||||
|
runJob := func(context context.Context) {
|
||||||
defer groupWait.Done()
|
defer groupWait.Done()
|
||||||
|
|
||||||
|
// FIXME: Disambiguate the usage of "context" as a term.
|
||||||
jobContext := task.GetJobContext(task.AddJob())
|
jobContext := task.GetJobContext(task.AddJob())
|
||||||
|
|
||||||
jobContext.SetStatus("started")
|
jobContext.SetStatus("started")
|
||||||
|
@ -84,14 +91,16 @@ func (r *Runner) RunWorkflow(workflow workflow.Workflow) Task {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if runError := r.RunJobInContainer(runnerImage, containerName, job); runError != nil {
|
if runError := r.RunJobInContainer(runnerImage, containerName, context); runError != nil {
|
||||||
jobContext.SetStatus("failed").SetError(runError)
|
jobContext.SetStatus("failed").SetError(runError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
jobContext.SetStatus("success")
|
jobContext.SetStatus("success")
|
||||||
r.deferred.RunDeferredTasksInScope(fmt.Sprintf("job-%s", containerName))
|
r.deferred.RunDeferredTasksInScope(fmt.Sprintf("job-%s", containerName))
|
||||||
}()
|
}
|
||||||
|
|
||||||
|
go runJob(jobContext)
|
||||||
}
|
}
|
||||||
|
|
||||||
groupWait.Wait()
|
groupWait.Wait()
|
||||||
|
@ -123,7 +132,7 @@ func (r *Runner) RunCommandInContainer(containerId string, command string) error
|
||||||
// Executes a job within a container.
|
// Executes a job within a container.
|
||||||
//
|
//
|
||||||
// The container is started before the job steps are run and cleaned up after.
|
// The container is started before the job steps are run and cleaned up after.
|
||||||
func (r *Runner) RunJobInContainer(imageUri string, containerId string, job workflow.Job) error {
|
func (r *Runner) RunJobInContainer(imageUri string, containerId string, jobContext context.Context) error {
|
||||||
r.Driver.Start(imageUri, containerId)
|
r.Driver.Start(imageUri, containerId)
|
||||||
|
|
||||||
r.deferred.Queue(fmt.Sprintf("job-%s", containerId), func() {
|
r.deferred.Queue(fmt.Sprintf("job-%s", containerId), func() {
|
||||||
|
@ -131,9 +140,12 @@ func (r *Runner) RunJobInContainer(imageUri string, containerId string, job work
|
||||||
r.Driver.Stop(containerId)
|
r.Driver.Stop(containerId)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
job := jobContext.Value("currentJob").(workflow.Job)
|
||||||
|
|
||||||
logger.Info("Started %s", containerId)
|
logger.Info("Started %s", containerId)
|
||||||
for _, step := range job.Steps {
|
for stepIndex, step := range job.Steps {
|
||||||
logger.Info("Run: %s", step.Run)
|
logger.Info("Run: %s", step.Run)
|
||||||
|
logger.Info("Using working directory %s", jobContext.Value("workflow").(workflow.Workflow).GetWorkingDirectory(job.Name, stepIndex))
|
||||||
|
|
||||||
if step.Run != "" {
|
if step.Run != "" {
|
||||||
return r.RunCommandInContainer(containerId, step.Run)
|
return r.RunCommandInContainer(containerId, step.Run)
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package runner
|
package runner
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
logger "courgette/internal/logging"
|
logger "courgette/internal/logging"
|
||||||
workflow "courgette/internal/workflow"
|
workflow "courgette/internal/workflow"
|
||||||
"errors"
|
"errors"
|
||||||
|
@ -107,7 +108,10 @@ func TestRunJobInContainerSchedulesStoppingContainers(t *testing.T) {
|
||||||
|
|
||||||
runner := NewRunner(&mockDriver, map[string]string{})
|
runner := NewRunner(&mockDriver, map[string]string{})
|
||||||
|
|
||||||
runner.RunJobInContainer("uri", "container", workflow.Job{})
|
jobCtx := context.WithValue(context.Background(), "currentJob", workflow.Job{})
|
||||||
|
jobCtx = context.WithValue(jobCtx, "workflow", workflow.Workflow{})
|
||||||
|
|
||||||
|
runner.RunJobInContainer("uri", "container", jobCtx)
|
||||||
|
|
||||||
if len(runner.deferred.GetAllTasks()) != 1 {
|
if len(runner.deferred.GetAllTasks()) != 1 {
|
||||||
t.Errorf("Expected 1 deferred task, found %d instead.", len(runner.deferred.GetAllTasks()))
|
t.Errorf("Expected 1 deferred task, found %d instead.", len(runner.deferred.GetAllTasks()))
|
||||||
|
|
Loading…
Reference in a new issue