courgette/internal/runner/runner.go

135 lines
3.5 KiB
Go
Raw Normal View History

package runner
import (
"context"
2024-08-03 17:10:01 +00:00
logger "courgette/internal/logging"
2024-08-02 23:49:56 +00:00
workflow "courgette/internal/workflow"
"errors"
"fmt"
"sync"
)
// Runner executes workflow jobs inside containers that it manages
// through a ContainerDriver.
type Runner struct {
	// Labels is looked up with a job's RunsOn value to find the image
	// the job's container is created from.
	Labels map[string]string
	// Driver is the container backend used to pull images, start and stop
	// containers and execute commands in them.
	Driver ContainerDriver
	// Runs is not referenced by the code visible here; presumably a count
	// of executed runs — TODO confirm.
	Runs int
	// deferred holds the deferred tasks, in the order they were scheduled.
	deferred *DeferredTaskManager
}
// NewRunner returns a Runner that uses the given driver and label map
// and starts out with a fresh deferred task manager.
func NewRunner(driver ContainerDriver, labels map[string]string) Runner {
	var created Runner
	created.Driver = driver
	created.Labels = labels
	created.deferred = NewDeferredTaskManager()
	return created
}
// Executes a workflow using the runner.
//
// This is the high-level call that will set up the container
// that the jobs will be executed in, run the jobs's steps and
// tear down the container once no longer useful.
func (r *Runner) RunWorkflow(workflow workflow.Workflow) TaskTracker {
2024-08-03 17:10:01 +00:00
logger.Info("Executing workflow: %s", workflow.SourcePath)
rootTracker := NewTaskTracker(nil)
workflowContext := context.WithValue(context.Background(), "workflow", workflow)
for _, group := range workflow.GetJobsAsGroups() {
var groupWait sync.WaitGroup
for _, job := range group {
groupWait.Add(1)
jobContext := context.WithValue(workflowContext, "currentJob", job)
runJob := func(context context.Context) {
defer groupWait.Done()
jobTracker := NewTaskTracker(rootTracker).SetStatus("started")
runnerImage, defined := r.Labels[job.RunsOn]
if !defined {
jobTracker.SetStatus("failed").SetError(fmt.Errorf("Unknown runner image label: %s", job.RunsOn))
return
}
containerName := fmt.Sprintf("runner-%s", jobTracker.TaskId)
logger.Info("Using image %s (label: %s)", runnerImage, job.RunsOn)
if pullError := r.Driver.Pull(runnerImage); pullError != nil {
jobTracker.SetStatus("failed").SetError(pullError)
return
}
if runError := r.RunJobInContainer(runnerImage, containerName, context); runError != nil {
jobTracker.SetStatus("failed").SetError(runError)
return
}
jobTracker.SetStatus("success")
r.deferred.RunDeferredTasksInScope(fmt.Sprintf("job-%s", containerName))
}
go runJob(jobContext)
}
groupWait.Wait()
}
r.deferred.RunAllDeferredTasks()
return *rootTracker
}
// RunCommandInContainer executes a command within the given container.
//
// The driver's own error is returned as-is when the command could not be
// executed. A command that ran but finished with a non-zero exit code
// yields an error mentioning that code. Otherwise nil is returned.
func (r *Runner) RunCommandInContainer(containerId string, command string) error {
	outcome := r.Driver.Exec(containerId, command)

	switch {
	case outcome.Error != nil:
		// The driver error takes precedence over the exit code.
		return outcome.Error
	case outcome.ExitCode != 0:
		message := fmt.Sprintf("Command returned a non-zero exit code (%d).", outcome.ExitCode)
		return errors.New(message)
	default:
		return nil
	}
}
// Executes a job within a container.
//
// The container is started before the job steps are run and cleaned up after.
func (r *Runner) RunJobInContainer(imageUri string, containerId string, jobContext context.Context) error {
r.Driver.Start(imageUri, containerId)
r.deferred.Queue(fmt.Sprintf("job-%s", containerId), func() {
2024-08-03 17:10:01 +00:00
logger.Info("Started cleaning up %s", containerId)
r.Driver.Stop(containerId)
})
job := jobContext.Value("currentJob").(workflow.Job)
2024-08-03 17:10:01 +00:00
logger.Info("Started %s", containerId)
for stepIndex, step := range job.Steps {
2024-08-03 17:10:01 +00:00
logger.Info("Run: %s", step.Run)
logger.Info("Using working directory %s", jobContext.Value("workflow").(workflow.Workflow).GetWorkingDirectory(job.Name, stepIndex))
var stepError error
if step.Run != "" {
stepError = r.RunCommandInContainer(containerId, step.Run)
}
if stepError != nil {
return stepError
}
}
return nil
}