courgette/internal/runner/runner.go

195 lines
5.5 KiB
Go

package runner
import (
"context"
actions "courgette/internal/actions"
cache "courgette/internal/cache"
driver "courgette/internal/driver"
logger "courgette/internal/logging"
workflows "courgette/internal/workflow"
"errors"
"fmt"
"sync"
)
// Supervisor structure that handles running workflows, triggering
// side-effects along the lifecycle of each run.
type Runner struct {
// Image labels that can be used by workflows.
Labels map[string]string
// Handler for container operations.
Driver driver.ContainerDriver
// Runner's cache.
Cache *cache.CacheManager
// Deferred tasks that can be queued during runs.
Deferred *DeferredTaskManager
// Actions handling.
Actions *actions.ActionsManager
}
func NewRunner(driver driver.ContainerDriver, labels map[string]string, cacheRoot string) Runner {
cacheManager, _ := cache.NewCacheManager(cacheRoot)
return Runner{
Driver: driver,
Cache: cacheManager,
Labels: labels,
Deferred: NewDeferredTaskManager(),
Actions: actions.NewActionsManager(cacheManager.Root),
}
}
// Executes a workflow using the runner.
//
// This is the high-level call that will set up the container
// that the jobs will be executed in, run the jobs's steps and
// tear down the container once no longer useful.
func (r *Runner) RunWorkflow(workflow workflows.Workflow) TaskTracker {
logger.Info("Executing workflow: %s", workflow.SourcePath)
rootTracker := NewTaskTracker(nil)
workflowContext := context.WithValue(context.Background(), "workflow", workflow)
defer r.Deferred.RunAllDeferredTasks()
workflow.Walk(func(n interface{}) {
switch n.(type) {
case workflows.Step:
if useAction := n.(workflows.Step).Uses; useAction != "" {
r.Actions.PrefetchAction(useAction)
}
}
})
for _, group := range workflow.GetJobsAsGroups() {
var groupWait sync.WaitGroup
for _, job := range group {
jobTracker := NewTaskTracker(rootTracker)
runnerImage, defined := r.Labels[job.RunsOn]
if !defined {
jobTracker.SetStatus("failed").SetError(fmt.Errorf("Unknown runner image label: %s", job.RunsOn))
continue
}
jobContext := context.WithValue(workflowContext, "currentJob", job)
jobContext = context.WithValue(jobContext, "runnerImageUri", runnerImage)
groupWait.Add(1)
go r.runJob(jobContext, jobTracker, &groupWait)
}
groupWait.Wait()
}
return *rootTracker
}
// Runs a given job (provided a runner to run it on, its context, a task tracker for progress monitoring and
// a WaitGroup to coordinate concurrent tasks) and updates the tracker with results.
func (r Runner) runJob(jobContext context.Context, jobTracker *TaskTracker, jobWaitGroup *sync.WaitGroup) {
job := jobContext.Value("currentJob").(workflows.Job)
containerName := fmt.Sprintf("runner-%s", jobTracker.TaskId)
defer jobWaitGroup.Done()
defer r.Deferred.RunDeferredTasksInScope(fmt.Sprintf("job-%s", containerName))
jobTracker.SetStatus("started")
runnerImage := jobContext.Value("runnerImageUri").(string)
logger.Info("Using image %s (label: %s)", runnerImage, job.RunsOn)
if pullError := r.Driver.Pull(runnerImage); pullError != nil {
jobTracker.SetError(pullError)
if !job.ContinueOnError {
jobTracker.SetStatus("failed")
return
}
}
if runError := r.RunJobInContainer(runnerImage, containerName, jobContext); runError != nil {
jobTracker.SetError(runError)
if !job.ContinueOnError {
jobTracker.SetStatus("failed")
return
}
}
jobTracker.SetStatus("success")
}
// Executes a command within the given container.
//
// If the command raises an error while in the container or fails to run
// the command at all, an error is returned, otherwise nil.
func (r *Runner) RunCommandInContainer(containerId string, command string, options driver.CommandOptions) error {
result := r.Driver.Exec(containerId, command, options)
if result.Error != nil {
return result.Error
}
if result.ExitCode != 0 {
return errors.New(fmt.Sprintf("Command returned a non-zero exit code (%d).", result.ExitCode))
}
return nil
}
// Executes a job within a container.
//
// The container is started before the job steps are run and cleaned up after.
func (r *Runner) RunJobInContainer(imageUri string, containerId string, jobContext context.Context) error {
r.Driver.Start(imageUri, containerId, r.Cache.Root)
r.Deferred.Queue(fmt.Sprintf("job-%s", containerId), func() {
logger.Info("Started cleaning up %s", containerId)
r.Driver.Stop(containerId)
})
job := jobContext.Value("currentJob").(workflows.Job)
logger.Info("Started %s", containerId)
for stepIndex, step := range job.Steps {
workflow := jobContext.Value("workflow").(workflows.Workflow)
stepCwd := workflow.GetWorkingDirectory(job.Name, stepIndex)
stepEnv := workflow.GetEnv(job.Name, stepIndex)
stepShell := workflow.GetShell(job.Name, stepIndex)
logger.Info("Run: %s", step.Run)
logger.Info("Using working directory %s", stepCwd)
var stepError error
if step.Run != "" {
commandOptions := driver.CommandOptions{
Cwd: stepCwd,
Env: stepEnv,
Shell: stepShell,
}
stepError = r.RunCommandInContainer(containerId, step.Run, commandOptions)
} else if step.Uses != "" {
for key, value := range step.With {
stepEnv[actions.FormatInputEnvKey(key)] = value
}
commandOptions := driver.CommandOptions{
Cwd: stepCwd,
Env: stepEnv,
Shell: stepShell,
}
stepError = r.Actions.UseAction(step.Uses, containerId, commandOptions)
}
if stepError != nil && !step.ContinueOnError {
return stepError
} else if stepError != nil {
logger.Warn("Step errored by continued: %s", stepError)
}
}
return nil
}