195 lines
5.5 KiB
Go
195 lines
5.5 KiB
Go
package runner
|
|
|
|
import (
|
|
"context"
|
|
actions "courgette/internal/actions"
|
|
cache "courgette/internal/cache"
|
|
driver "courgette/internal/driver"
|
|
logger "courgette/internal/logging"
|
|
workflows "courgette/internal/workflow"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
)
|
|
|
|
// Supervisor structure that handles running workflows, triggering
|
|
// side-effects along the lifecycle of each run.
|
|
type Runner struct {
|
|
// Image labels that can be used by workflows.
|
|
Labels map[string]string
|
|
// Handler for container operations.
|
|
Driver driver.ContainerDriver
|
|
// Runner's cache.
|
|
Cache *cache.CacheManager
|
|
// Deferred tasks that can be queued during runs.
|
|
Deferred *DeferredTaskManager
|
|
// Actions handling.
|
|
Actions *actions.ActionsManager
|
|
}
|
|
|
|
func NewRunner(driver driver.ContainerDriver, labels map[string]string, cacheRoot string) Runner {
|
|
cacheManager, _ := cache.NewCacheManager(cacheRoot)
|
|
|
|
return Runner{
|
|
Driver: driver,
|
|
Cache: cacheManager,
|
|
Labels: labels,
|
|
Deferred: NewDeferredTaskManager(),
|
|
Actions: actions.NewActionsManager(cacheManager.Root),
|
|
}
|
|
}
|
|
|
|
// Executes a workflow using the runner.
|
|
//
|
|
// This is the high-level call that will set up the container
|
|
// that the jobs will be executed in, run the jobs's steps and
|
|
// tear down the container once no longer useful.
|
|
func (r *Runner) RunWorkflow(workflow workflows.Workflow) TaskTracker {
|
|
logger.Info("Executing workflow: %s", workflow.SourcePath)
|
|
rootTracker := NewTaskTracker(nil)
|
|
|
|
workflowContext := context.WithValue(context.Background(), "workflow", workflow)
|
|
|
|
defer r.Deferred.RunAllDeferredTasks()
|
|
|
|
workflow.Walk(func(n interface{}) {
|
|
switch n.(type) {
|
|
case workflows.Step:
|
|
if useAction := n.(workflows.Step).Uses; useAction != "" {
|
|
r.Actions.PrefetchAction(useAction)
|
|
}
|
|
}
|
|
})
|
|
|
|
for _, group := range workflow.GetJobsAsGroups() {
|
|
var groupWait sync.WaitGroup
|
|
|
|
for _, job := range group {
|
|
jobTracker := NewTaskTracker(rootTracker)
|
|
runnerImage, defined := r.Labels[job.RunsOn]
|
|
if !defined {
|
|
jobTracker.SetStatus("failed").SetError(fmt.Errorf("Unknown runner image label: %s", job.RunsOn))
|
|
continue
|
|
}
|
|
|
|
jobContext := context.WithValue(workflowContext, "currentJob", job)
|
|
jobContext = context.WithValue(jobContext, "runnerImageUri", runnerImage)
|
|
|
|
groupWait.Add(1)
|
|
go r.runJob(jobContext, jobTracker, &groupWait)
|
|
}
|
|
|
|
groupWait.Wait()
|
|
}
|
|
|
|
return *rootTracker
|
|
}
|
|
|
|
// Runs a given job (provided a runner to run it on, its context, a task tracker for progress monitoring and
|
|
// a WaitGroup to coordinate concurrent tasks) and updates the tracker with results.
|
|
|
|
func (r Runner) runJob(jobContext context.Context, jobTracker *TaskTracker, jobWaitGroup *sync.WaitGroup) {
|
|
job := jobContext.Value("currentJob").(workflows.Job)
|
|
containerName := fmt.Sprintf("runner-%s", jobTracker.TaskId)
|
|
|
|
defer jobWaitGroup.Done()
|
|
defer r.Deferred.RunDeferredTasksInScope(fmt.Sprintf("job-%s", containerName))
|
|
|
|
jobTracker.SetStatus("started")
|
|
runnerImage := jobContext.Value("runnerImageUri").(string)
|
|
|
|
logger.Info("Using image %s (label: %s)", runnerImage, job.RunsOn)
|
|
|
|
if pullError := r.Driver.Pull(runnerImage); pullError != nil {
|
|
jobTracker.SetError(pullError)
|
|
|
|
if !job.ContinueOnError {
|
|
jobTracker.SetStatus("failed")
|
|
return
|
|
}
|
|
|
|
}
|
|
|
|
if runError := r.RunJobInContainer(runnerImage, containerName, jobContext); runError != nil {
|
|
jobTracker.SetError(runError)
|
|
if !job.ContinueOnError {
|
|
jobTracker.SetStatus("failed")
|
|
return
|
|
}
|
|
}
|
|
|
|
jobTracker.SetStatus("success")
|
|
|
|
}
|
|
|
|
// Executes a command within the given container.
|
|
//
|
|
// If the command raises an error while in the container or fails to run
|
|
// the command at all, an error is returned, otherwise nil.
|
|
func (r *Runner) RunCommandInContainer(containerId string, command string, options driver.CommandOptions) error {
|
|
result := r.Driver.Exec(containerId, command, options)
|
|
|
|
if result.Error != nil {
|
|
return result.Error
|
|
}
|
|
|
|
if result.ExitCode != 0 {
|
|
return errors.New(fmt.Sprintf("Command returned a non-zero exit code (%d).", result.ExitCode))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Executes a job within a container.
|
|
//
|
|
// The container is started before the job steps are run and cleaned up after.
|
|
func (r *Runner) RunJobInContainer(imageUri string, containerId string, jobContext context.Context) error {
|
|
r.Driver.Start(imageUri, containerId, r.Cache.Root)
|
|
|
|
r.Deferred.Queue(fmt.Sprintf("job-%s", containerId), func() {
|
|
logger.Info("Started cleaning up %s", containerId)
|
|
r.Driver.Stop(containerId)
|
|
})
|
|
|
|
job := jobContext.Value("currentJob").(workflows.Job)
|
|
|
|
logger.Info("Started %s", containerId)
|
|
for stepIndex, step := range job.Steps {
|
|
workflow := jobContext.Value("workflow").(workflows.Workflow)
|
|
stepCwd := workflow.GetWorkingDirectory(job.Name, stepIndex)
|
|
stepEnv := workflow.GetEnv(job.Name, stepIndex)
|
|
stepShell := workflow.GetShell(job.Name, stepIndex)
|
|
|
|
logger.Info("Run: %s", step.Run)
|
|
logger.Info("Using working directory %s", stepCwd)
|
|
var stepError error
|
|
|
|
if step.Run != "" {
|
|
commandOptions := driver.CommandOptions{
|
|
Cwd: stepCwd,
|
|
Env: stepEnv,
|
|
Shell: stepShell,
|
|
}
|
|
stepError = r.RunCommandInContainer(containerId, step.Run, commandOptions)
|
|
} else if step.Uses != "" {
|
|
for key, value := range step.With {
|
|
stepEnv[actions.FormatInputEnvKey(key)] = value
|
|
}
|
|
|
|
commandOptions := driver.CommandOptions{
|
|
Cwd: stepCwd,
|
|
Env: stepEnv,
|
|
Shell: stepShell,
|
|
}
|
|
stepError = r.Actions.UseAction(step.Uses, containerId, commandOptions)
|
|
}
|
|
|
|
if stepError != nil && !step.ContinueOnError {
|
|
return stepError
|
|
} else if stepError != nil {
|
|
logger.Warn("Step errored by continued: %s", stepError)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|