package runner import ( "context" actions "courgette/internal/actions" cache "courgette/internal/cache" driver "courgette/internal/driver" logger "courgette/internal/logging" workflows "courgette/internal/workflow" "errors" "fmt" "sync" ) type Runner struct { Labels map[string]string Driver driver.ContainerDriver Cache cache.CacheManager Runs int // Deferred tasks, in order their were scheduled. deferred *DeferredTaskManager } func NewRunner(driver driver.ContainerDriver, labels map[string]string, cacheRoot string) Runner { cacheManager, _ := cache.NewCacheManager(cacheRoot) return Runner{ Driver: driver, Cache: *cacheManager, Labels: labels, deferred: NewDeferredTaskManager(), } } // Executes a workflow using the runner. // // This is the high-level call that will set up the container // that the jobs will be executed in, run the jobs's steps and // tear down the container once no longer useful. func (r *Runner) RunWorkflow(workflow workflows.Workflow) TaskTracker { logger.Info("Executing workflow: %s", workflow.SourcePath) rootTracker := NewTaskTracker(nil) workflowContext := context.WithValue(context.Background(), "workflow", workflow) defer r.deferred.RunAllDeferredTasks() workflow.Walk(func(n interface{}) { switch n.(type) { case workflows.Step: if useAction := n.(workflows.Step).Use; useAction != "" { actions.PrefetchAction(actions.GitClient{}, useAction, r.Cache.Path(actions.GetActionKey(useAction))) } } }) for _, group := range workflow.GetJobsAsGroups() { var groupWait sync.WaitGroup for _, job := range group { jobTracker := NewTaskTracker(rootTracker) runnerImage, defined := r.Labels[job.RunsOn] if !defined { jobTracker.SetStatus("failed").SetError(fmt.Errorf("Unknown runner image label: %s", job.RunsOn)) continue } jobContext := context.WithValue(workflowContext, "currentJob", job) jobContext = context.WithValue(jobContext, "runnerImageUri", runnerImage) groupWait.Add(1) go r.runJob(jobContext, jobTracker, &groupWait) } groupWait.Wait() } return *rootTracker } // Runs a given job (provided a runner to run it on, its context, a task tracker for progress monitoring and // a WaitGroup to coordinate concurrent tasks) and updates the tracker with results. func (r Runner) runJob(jobContext context.Context, jobTracker *TaskTracker, jobWaitGroup *sync.WaitGroup) { job := jobContext.Value("currentJob").(workflows.Job) containerName := fmt.Sprintf("runner-%s", jobTracker.TaskId) defer jobWaitGroup.Done() defer r.deferred.RunDeferredTasksInScope(fmt.Sprintf("job-%s", containerName)) jobTracker.SetStatus("started") runnerImage := jobContext.Value("runnerImageUri").(string) logger.Info("Using image %s (label: %s)", runnerImage, job.RunsOn) if pullError := r.Driver.Pull(runnerImage); pullError != nil { jobTracker.SetError(pullError) if !job.ContinueOnError { jobTracker.SetStatus("failed") return } } if runError := r.RunJobInContainer(runnerImage, containerName, jobContext); runError != nil { jobTracker.SetError(runError) if !job.ContinueOnError { jobTracker.SetStatus("failed") return } } jobTracker.SetStatus("success") } // Executes a command within the given container. // // If the command raises an error while in the container or fails to run // the command at all, an error is returned, otherwise nil. func (r *Runner) RunCommandInContainer(containerId string, command string, options driver.CommandOptions) error { result := r.Driver.Exec(containerId, command, options) if result.Error != nil { return result.Error } if result.ExitCode != 0 { return errors.New(fmt.Sprintf("Command returned a non-zero exit code (%d).", result.ExitCode)) } return nil } // Executes a job within a container. // // The container is started before the job steps are run and cleaned up after. func (r *Runner) RunJobInContainer(imageUri string, containerId string, jobContext context.Context) error { r.Driver.Start(imageUri, containerId) r.deferred.Queue(fmt.Sprintf("job-%s", containerId), func() { logger.Info("Started cleaning up %s", containerId) r.Driver.Stop(containerId) }) job := jobContext.Value("currentJob").(workflows.Job) logger.Info("Started %s", containerId) for stepIndex, step := range job.Steps { workflow := jobContext.Value("workflow").(workflows.Workflow) stepCwd := workflow.GetWorkingDirectory(job.Name, stepIndex) stepEnv := workflow.GetEnv(job.Name, stepIndex) stepShell := workflow.GetShell(job.Name, stepIndex) logger.Info("Run: %s", step.Run) logger.Info("Using working directory %s", stepCwd) var stepError error if step.Run != "" { commandOptions := driver.CommandOptions{ Cwd: stepCwd, Env: stepEnv, Shell: stepShell, } stepError = r.RunCommandInContainer(containerId, step.Run, commandOptions) } if stepError != nil && !step.ContinueOnError { return stepError } else if stepError != nil { logger.Warn("Step errored by continued: %s", stepError) } } return nil }