courgette/internal/runner/runner.go

149 lines
4.2 KiB
Go

package runner
import (
"context"
logger "courgette/internal/logging"
workflow "courgette/internal/workflow"
"errors"
"fmt"
"sync"
)
// Runner executes workflows by running each job's steps inside a container
// that is managed through a ContainerDriver.
type Runner struct {
// Labels maps a job's RunsOn label to the runner image used for that job.
Labels map[string]string
// Driver is the container backend used to pull images, start and stop
// containers, and execute commands inside them.
Driver ContainerDriver
// NOTE(review): Runs is neither read nor written in this file; presumably a
// counter of executed runs — confirm against callers.
Runs int
// Deferred tasks, in the order they were scheduled. Tasks are queued under a
// scope name and are run either for a single scope or all at once.
deferred *DeferredTaskManager
}
// NewRunner returns a Runner that uses driver as its container backend and
// labels to resolve a job's RunsOn label to a runner image. The runner starts
// out with a fresh, empty deferred-task manager.
func NewRunner(driver ContainerDriver, labels map[string]string) Runner {
	var created Runner
	created.Labels = labels
	created.Driver = driver
	created.deferred = NewDeferredTaskManager()
	return created
}
// RunWorkflow executes a workflow using the runner.
//
// This is the high-level call that sets up the container each job is
// executed in, runs the job's steps and tears the container down once it is
// no longer useful.
//
// Jobs are taken group by group from workflow.GetJobsAsGroups(): every job of
// a group is started in its own goroutine, and the next group only begins
// once all jobs of the current group have finished. For each job the runner
// image is resolved from r.Labels using the job's RunsOn label, the image is
// pulled, and the job is run in a container named "runner-<task id>".
//
// A job whose RunsOn label is unknown is marked "failed" on its tracker and
// skipped. A pull or run error is recorded on the job's tracker; unless the
// job sets ContinueOnError, the job is then marked "failed" and stops there.
//
// Once every group has completed, all remaining deferred tasks are run and a
// copy of the root task tracker is returned.
func (r *Runner) RunWorkflow(workflow workflow.Workflow) TaskTracker {
	logger.Info("Executing workflow: %s", workflow.SourcePath)

	rootTracker := NewTaskTracker(nil)
	// The string context keys are shared with RunJobInContainer, which reads
	// "workflow" and "currentJob" back out of the job context.
	workflowContext := context.WithValue(context.Background(), "workflow", workflow)

	for _, group := range workflow.GetJobsAsGroups() {
		var groupWait sync.WaitGroup

		for _, job := range group {
			// Per-iteration copy: the goroutine below reads job, and before
			// Go 1.22 the range variable is shared by every iteration.
			job := job

			jobTracker := NewTaskTracker(rootTracker)

			runnerImage, defined := r.Labels[job.RunsOn]
			if !defined {
				jobTracker.SetStatus("failed").SetError(fmt.Errorf("Unknown runner image label: %s", job.RunsOn))
				continue
			}

			jobContext := context.WithValue(workflowContext, "currentJob", job)
			jobContext = context.WithValue(jobContext, "runnerImageUri", runnerImage)

			// Register with the WaitGroup only once a goroutine is certain to
			// be started; registering before the label check would leave the
			// counter unbalanced and make Wait block forever.
			groupWait.Add(1)
			go func() {
				defer groupWait.Done()

				jobTracker.SetStatus("started")
				containerName := fmt.Sprintf("runner-%s", jobTracker.TaskId)
				logger.Info("Using image %s (label: %s)", runnerImage, job.RunsOn)

				if pullError := r.Driver.Pull(runnerImage); pullError != nil {
					jobTracker.SetError(pullError)
					if !job.ContinueOnError {
						jobTracker.SetStatus("failed")
						return
					}
				}

				if runError := r.RunJobInContainer(runnerImage, containerName, jobContext); runError != nil {
					jobTracker.SetError(runError)
					if !job.ContinueOnError {
						jobTracker.SetStatus("failed")
						return
					}
				}

				jobTracker.SetStatus("success")
				// Only reached when the job did not bail out above; tasks of
				// jobs that returned early are left for RunAllDeferredTasks.
				r.deferred.RunDeferredTasksInScope(fmt.Sprintf("job-%s", containerName))
			}()
		}

		groupWait.Wait()
	}

	r.deferred.RunAllDeferredTasks()

	return *rootTracker
}
// RunCommandInContainer executes command inside the container identified by
// containerId, handing stepCwd to the driver alongside it.
//
// The driver's own error is returned as-is when the execution reports one.
// When the command ran but exited with a non-zero code, an error carrying
// that exit code is returned. Otherwise the result is nil.
func (r *Runner) RunCommandInContainer(containerId string, command string, stepCwd string) error {
	execResult := r.Driver.Exec(containerId, command, stepCwd)

	switch {
	case execResult.Error != nil:
		return execResult.Error
	case execResult.ExitCode != 0:
		message := fmt.Sprintf("Command returned a non-zero exit code (%d).", execResult.ExitCode)
		return errors.New(message)
	}

	return nil
}
// RunJobInContainer executes a job within a container.
//
// jobContext must carry the job under the "currentJob" key and its workflow
// under the "workflow" key; an error is returned, before anything is started,
// when either value is missing or of an unexpected type.
//
// The container is started from imageUri under the name containerId before
// the job's steps are run, and a deferred task that stops it is queued in the
// "job-<containerId>" scope. Steps run in order. A failing step aborts the job
// and its error is returned, unless the step sets ContinueOnError, in which
// case a warning is logged and the next step runs. Steps with an empty Run
// command are logged but not executed.
func (r *Runner) RunJobInContainer(imageUri string, containerId string, jobContext context.Context) error {
	// Checked assertions: a context lacking these values yields an error
	// instead of a panic inside the job's goroutine.
	job, ok := jobContext.Value("currentJob").(workflow.Job)
	if !ok {
		return fmt.Errorf("job context for container %s carries no current job", containerId)
	}
	// Looked up once: the workflow is the same for every step of the job.
	currentWorkflow, ok := jobContext.Value("workflow").(workflow.Workflow)
	if !ok {
		return fmt.Errorf("job context for container %s carries no workflow", containerId)
	}

	// NOTE(review): whatever Start and Stop report is not inspected here —
	// confirm against the ContainerDriver definition whether they can fail.
	r.Driver.Start(imageUri, containerId)
	r.deferred.Queue(fmt.Sprintf("job-%s", containerId), func() {
		logger.Info("Started cleaning up %s", containerId)
		r.Driver.Stop(containerId)
	})

	logger.Info("Started %s", containerId)

	for stepIndex, step := range job.Steps {
		stepCwd := currentWorkflow.GetWorkingDirectory(job.Name, stepIndex)

		logger.Info("Run: %s", step.Run)
		logger.Info("Using working directory %s", stepCwd)

		if step.Run == "" {
			continue
		}

		stepError := r.RunCommandInContainer(containerId, step.Run, stepCwd)
		if stepError == nil {
			continue
		}
		if !step.ContinueOnError {
			return stepError
		}

		logger.Warn("Step errored but continued: %s", stepError)
	}

	return nil
}