courgette/internal/runner/runner.go

package runner

import (
	"fmt"
	"sync"

	logger "courgette/internal/logging"
	workflow "courgette/internal/workflow"
)

type Runner struct {
	Labels map[string]string
	Driver ContainerDriver
	Tasks  map[string]*Task
	Runs   int
	// Deferred tasks, in the order they were scheduled.
	deferred *DeferredTaskManager
}
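
// NewRunner builds a Runner around a container driver and a mapping from
// runner labels to container image URIs.
//
// A minimal usage sketch; the driver constructor and label mapping are
// illustrative, not part of this package:
//
//	driver := somedriver.New() // hypothetical ContainerDriver implementation
//	runner := NewRunner(driver, map[string]string{"ubuntu-latest": "ubuntu:24.04"})
//	task := runner.RunWorkflow(wf) // wf: a parsed workflow.Workflow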
func NewRunner(driver ContainerDriver, labels map[string]string) Runner {
	return Runner{
		Driver:   driver,
		Labels:   labels,
		Tasks:    map[string]*Task{},
		deferred: NewDeferredTaskManager(),
	}
}
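
// GetImageUriByLabel resolves a runner label to a container image URI,
// falling back to debian:latest when the label has no mapping.
//
// For example, with the illustrative mapping above:
//
//	r.GetImageUriByLabel("ubuntu-latest") // "ubuntu:24.04"
//	r.GetImageUriByLabel("unknown-label") // "debian:latest"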
func (r *Runner) GetImageUriByLabel(label string) string {
	if uri, exists := r.Labels[label]; exists {
		return uri
	}
	return "debian:latest"
}

func (r *Runner) GetContainerName(jobId string) string {
	return fmt.Sprintf("runner-%s", jobId)
}
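
// AddTask registers a fresh Task with the runner and returns its id, which
// can later be passed to GetTask.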
func (r *Runner) AddTask() string {
	task := NewTask()
	r.Tasks[task.Id] = &task
	return task.Id
}

// GetTask returns the task registered under taskId, or nil if no such task
// exists.
func (r *Runner) GetTask(taskId string) *Task {
	return r.Tasks[taskId]
}

// Executes a workflow using the runner.
//
// This is the high-level call that will set up the containers
// that the jobs will be executed in, run the jobs' steps and
// tear down the containers once they are no longer useful.
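//
// Jobs within the same group run concurrently; groups run one after the
// other. A sketch of the expected call (wf is assumed to be a parsed
// workflow.Workflow):
//
//	task := runner.RunWorkflow(wf)
//	// Each job context in the returned task ends in status "success" or "failed".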
func (r *Runner) RunWorkflow(workflow workflow.Workflow) Task {
	logger.Info("Executing workflow: %s", workflow.SourcePath)

	task := r.GetTask(r.AddTask())

	for _, group := range workflow.GetJobsAsGroups() {
		var groupWait sync.WaitGroup

		for _, job := range group {
			groupWait.Add(1)
			job := job // capture the loop variable (required before Go 1.22)

			go func() {
				defer groupWait.Done()

				// Note: this assumes Task.AddJob and Task.GetJobContext are
				// safe for concurrent use across job goroutines.
				jobContext := task.GetJobContext(task.AddJob())
				jobContext.SetStatus("started")

				runnerImage := r.GetImageUriByLabel(job.RunsOn)
				containerName := r.GetContainerName(jobContext.Id)

				logger.Info("Using image %s (label: %s)", runnerImage, job.RunsOn)

				if pullError := r.Driver.Pull(runnerImage); pullError != nil {
					jobContext.SetStatus("failed").SetError(pullError)
					return
				}

				if runError := r.RunJobInContainer(runnerImage, containerName, job); runError != nil {
					jobContext.SetStatus("failed").SetError(runError)
					return
				}

				jobContext.SetStatus("success")
				r.deferred.RunDeferredTasksInScope(fmt.Sprintf("job-%s", containerName))
			}()
		}

		groupWait.Wait()
	}

	// Flush any cleanup still pending, e.g. for jobs that failed before
	// their scoped deferred tasks could run.
	r.deferred.RunAllDeferredTasks()

	return *task
}

// Executes a command within the given container.
//
// If the command raises an error in the container or fails to run at all,
// an error is returned; otherwise nil.
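//
// A minimal sketch of a call (container name and command are illustrative):
//
//	err := r.RunCommandInContainer("runner-abc123", "echo hello")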
func (r *Runner) RunCommandInContainer(containerId string, command string) error {
	result := r.Driver.Exec(containerId, command)

	if result.Error != nil {
		return result.Error
	}

	if result.ExitCode != 0 {
		return fmt.Errorf("command returned a non-zero exit code (%d)", result.ExitCode)
	}

	return nil
}

// Executes a job within a container.
//
// The container is started before the job's steps are run and is cleaned up
// afterwards.
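//
// Stopping the container is not immediate: it is queued on the deferred task
// manager under the scope "job-<containerId>" and runs when RunWorkflow
// flushes that scope or the whole queue.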
func (r *Runner) RunJobInContainer(imageUri string, containerId string, job workflow.Job) error {
	r.Driver.Start(imageUri, containerId)

	r.deferred.Queue(fmt.Sprintf("job-%s", containerId), func() {
		logger.Info("Started cleaning up %s", containerId)
		r.Driver.Stop(containerId)
	})

	logger.Info("Started %s", containerId)

	// Run every step in order, stopping at the first failing step.
	for _, step := range job.Steps {
		if step.Run == "" {
			continue
		}

		logger.Info("Run: %s", step.Run)

		if err := r.RunCommandInContainer(containerId, step.Run); err != nil {
			return err
		}
	}

	return nil
}