2024-08-01 22:43:18 +00:00
|
|
|
package runner
|
|
|
|
|
|
|
|
import (
|
2024-08-15 23:13:33 +00:00
|
|
|
"context"
|
2024-08-31 03:09:31 +00:00
|
|
|
actions "courgette/internal/actions"
|
2024-08-31 02:20:56 +00:00
|
|
|
cache "courgette/internal/cache"
|
2024-08-23 03:11:00 +00:00
|
|
|
driver "courgette/internal/driver"
|
2024-08-03 17:10:01 +00:00
|
|
|
logger "courgette/internal/logging"
|
2024-08-31 03:09:31 +00:00
|
|
|
workflows "courgette/internal/workflow"
|
2024-08-02 20:38:18 +00:00
|
|
|
"errors"
|
2024-08-01 22:43:18 +00:00
|
|
|
"fmt"
|
2024-08-09 04:17:29 +00:00
|
|
|
"sync"
|
2024-08-01 22:43:18 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
type Runner struct {
|
|
|
|
Labels map[string]string
|
2024-08-23 03:11:00 +00:00
|
|
|
Driver driver.ContainerDriver
|
2024-08-31 02:20:56 +00:00
|
|
|
Cache cache.CacheManager
|
2024-08-01 22:43:18 +00:00
|
|
|
Runs int
|
2024-08-02 23:30:37 +00:00
|
|
|
// Deferred tasks, in order their were scheduled.
|
2024-08-03 17:47:51 +00:00
|
|
|
deferred *DeferredTaskManager
|
2024-08-01 22:43:18 +00:00
|
|
|
}
|
|
|
|
|
2024-08-31 02:20:56 +00:00
|
|
|
func NewRunner(driver driver.ContainerDriver, labels map[string]string, cacheRoot string) Runner {
|
|
|
|
cacheManager, _ := cache.NewCacheManager(cacheRoot)
|
|
|
|
|
2024-08-01 22:43:18 +00:00
|
|
|
return Runner{
|
2024-08-03 17:47:51 +00:00
|
|
|
Driver: driver,
|
2024-08-31 02:20:56 +00:00
|
|
|
Cache: *cacheManager,
|
2024-08-03 17:47:51 +00:00
|
|
|
Labels: labels,
|
|
|
|
deferred: NewDeferredTaskManager(),
|
2024-08-01 22:43:18 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Executes a workflow using the runner.
|
|
|
|
//
|
|
|
|
// This is the high-level call that will set up the container
|
|
|
|
// that the jobs will be executed in, run the jobs's steps and
|
|
|
|
// tear down the container once no longer useful.
|
2024-08-31 03:09:31 +00:00
|
|
|
func (r *Runner) RunWorkflow(workflow workflows.Workflow) TaskTracker {
|
2024-08-03 17:10:01 +00:00
|
|
|
logger.Info("Executing workflow: %s", workflow.SourcePath)
|
2024-08-17 04:18:53 +00:00
|
|
|
rootTracker := NewTaskTracker(nil)
|
2024-08-01 22:43:18 +00:00
|
|
|
|
2024-08-15 23:13:33 +00:00
|
|
|
workflowContext := context.WithValue(context.Background(), "workflow", workflow)
|
|
|
|
|
2024-08-20 03:44:36 +00:00
|
|
|
defer r.deferred.RunAllDeferredTasks()
|
|
|
|
|
2024-08-31 03:09:31 +00:00
|
|
|
workflow.Walk(func(n interface{}) {
|
|
|
|
switch n.(type) {
|
|
|
|
case workflows.Step:
|
|
|
|
if useAction := n.(workflows.Step).Use; useAction != "" {
|
2024-09-01 17:38:53 +00:00
|
|
|
actions.NewActionsManager().PrefetchAction(useAction, r.Cache.Path(actions.GetActionKey(useAction)))
|
2024-08-31 03:09:31 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
})
|
|
|
|
|
2024-08-09 04:17:29 +00:00
|
|
|
for _, group := range workflow.GetJobsAsGroups() {
|
|
|
|
var groupWait sync.WaitGroup
|
2024-08-01 22:43:18 +00:00
|
|
|
|
2024-08-09 04:17:29 +00:00
|
|
|
for _, job := range group {
|
2024-08-18 03:45:11 +00:00
|
|
|
jobTracker := NewTaskTracker(rootTracker)
|
|
|
|
runnerImage, defined := r.Labels[job.RunsOn]
|
|
|
|
if !defined {
|
|
|
|
jobTracker.SetStatus("failed").SetError(fmt.Errorf("Unknown runner image label: %s", job.RunsOn))
|
|
|
|
continue
|
|
|
|
}
|
2024-08-09 04:17:29 +00:00
|
|
|
|
2024-08-18 03:45:11 +00:00
|
|
|
jobContext := context.WithValue(workflowContext, "currentJob", job)
|
|
|
|
jobContext = context.WithValue(jobContext, "runnerImageUri", runnerImage)
|
2024-08-18 03:27:26 +00:00
|
|
|
|
2024-08-21 17:56:30 +00:00
|
|
|
groupWait.Add(1)
|
2024-08-20 04:18:14 +00:00
|
|
|
go r.runJob(jobContext, jobTracker, &groupWait)
|
|
|
|
}
|
2024-08-18 03:27:26 +00:00
|
|
|
|
2024-08-20 04:18:14 +00:00
|
|
|
groupWait.Wait()
|
|
|
|
}
|
2024-08-09 04:17:29 +00:00
|
|
|
|
2024-08-20 04:18:14 +00:00
|
|
|
return *rootTracker
|
|
|
|
}
|
2024-08-20 03:35:33 +00:00
|
|
|
|
2024-08-20 04:18:14 +00:00
|
|
|
// Runs a given job (provided a runner to run it on, its context, a task tracker for progress monitoring and
|
|
|
|
// a WaitGroup to coordinate concurrent tasks) and updates the tracker with results.
|
2024-08-20 03:35:33 +00:00
|
|
|
|
2024-08-20 04:18:14 +00:00
|
|
|
func (r Runner) runJob(jobContext context.Context, jobTracker *TaskTracker, jobWaitGroup *sync.WaitGroup) {
|
2024-08-31 03:09:31 +00:00
|
|
|
job := jobContext.Value("currentJob").(workflows.Job)
|
2024-08-20 04:18:14 +00:00
|
|
|
containerName := fmt.Sprintf("runner-%s", jobTracker.TaskId)
|
2024-08-21 17:56:30 +00:00
|
|
|
|
2024-08-20 04:18:14 +00:00
|
|
|
defer jobWaitGroup.Done()
|
2024-08-21 17:56:30 +00:00
|
|
|
defer r.deferred.RunDeferredTasksInScope(fmt.Sprintf("job-%s", containerName))
|
2024-08-09 04:17:29 +00:00
|
|
|
|
2024-08-20 04:18:14 +00:00
|
|
|
jobTracker.SetStatus("started")
|
|
|
|
runnerImage := jobContext.Value("runnerImageUri").(string)
|
2024-08-01 22:43:18 +00:00
|
|
|
|
2024-08-20 04:18:14 +00:00
|
|
|
logger.Info("Using image %s (label: %s)", runnerImage, job.RunsOn)
|
2024-08-20 03:44:36 +00:00
|
|
|
|
2024-08-20 04:18:14 +00:00
|
|
|
if pullError := r.Driver.Pull(runnerImage); pullError != nil {
|
|
|
|
jobTracker.SetError(pullError)
|
2024-08-15 23:13:33 +00:00
|
|
|
|
2024-08-20 04:18:14 +00:00
|
|
|
if !job.ContinueOnError {
|
|
|
|
jobTracker.SetStatus("failed")
|
|
|
|
return
|
2024-08-01 22:43:18 +00:00
|
|
|
}
|
|
|
|
|
2024-08-02 02:48:54 +00:00
|
|
|
}
|
2024-08-01 22:43:18 +00:00
|
|
|
|
2024-08-20 04:18:14 +00:00
|
|
|
if runError := r.RunJobInContainer(runnerImage, containerName, jobContext); runError != nil {
|
|
|
|
jobTracker.SetError(runError)
|
|
|
|
if !job.ContinueOnError {
|
|
|
|
jobTracker.SetStatus("failed")
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
jobTracker.SetStatus("success")
|
|
|
|
|
2024-08-01 22:43:18 +00:00
|
|
|
}
|
|
|
|
|
2024-08-02 23:40:06 +00:00
|
|
|
// Executes a command within the given container.
|
|
|
|
//
|
|
|
|
// If the command raises an error while in the container or fails to run
|
|
|
|
// the command at all, an error is returned, otherwise nil.
|
2024-08-23 03:38:32 +00:00
|
|
|
func (r *Runner) RunCommandInContainer(containerId string, command string, options driver.CommandOptions) error {
|
|
|
|
result := r.Driver.Exec(containerId, command, options)
|
2024-08-02 23:40:06 +00:00
|
|
|
|
|
|
|
if result.Error != nil {
|
|
|
|
return result.Error
|
|
|
|
}
|
|
|
|
|
|
|
|
if result.ExitCode != 0 {
|
|
|
|
return errors.New(fmt.Sprintf("Command returned a non-zero exit code (%d).", result.ExitCode))
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2024-08-01 22:43:18 +00:00
|
|
|
// Executes a job within a container.
|
|
|
|
//
|
|
|
|
// The container is started before the job steps are run and cleaned up after.
|
2024-08-15 23:13:33 +00:00
|
|
|
func (r *Runner) RunJobInContainer(imageUri string, containerId string, jobContext context.Context) error {
|
2024-08-02 23:30:37 +00:00
|
|
|
r.Driver.Start(imageUri, containerId)
|
2024-08-02 20:38:18 +00:00
|
|
|
|
2024-08-09 04:17:29 +00:00
|
|
|
r.deferred.Queue(fmt.Sprintf("job-%s", containerId), func() {
|
2024-08-03 17:10:01 +00:00
|
|
|
logger.Info("Started cleaning up %s", containerId)
|
2024-08-02 23:30:37 +00:00
|
|
|
r.Driver.Stop(containerId)
|
|
|
|
})
|
2024-08-01 22:43:18 +00:00
|
|
|
|
2024-08-31 03:09:31 +00:00
|
|
|
job := jobContext.Value("currentJob").(workflows.Job)
|
2024-08-15 23:13:33 +00:00
|
|
|
|
2024-08-03 17:10:01 +00:00
|
|
|
logger.Info("Started %s", containerId)
|
2024-08-15 23:13:33 +00:00
|
|
|
for stepIndex, step := range job.Steps {
|
2024-08-31 03:09:31 +00:00
|
|
|
workflow := jobContext.Value("workflow").(workflows.Workflow)
|
2024-08-23 03:38:32 +00:00
|
|
|
stepCwd := workflow.GetWorkingDirectory(job.Name, stepIndex)
|
|
|
|
stepEnv := workflow.GetEnv(job.Name, stepIndex)
|
|
|
|
stepShell := workflow.GetShell(job.Name, stepIndex)
|
2024-08-21 03:35:49 +00:00
|
|
|
|
2024-08-03 17:10:01 +00:00
|
|
|
logger.Info("Run: %s", step.Run)
|
2024-08-18 03:58:54 +00:00
|
|
|
logger.Info("Using working directory %s", stepCwd)
|
2024-08-17 04:18:53 +00:00
|
|
|
var stepError error
|
2024-08-01 22:43:18 +00:00
|
|
|
|
2024-08-15 22:37:00 +00:00
|
|
|
if step.Run != "" {
|
2024-08-23 03:38:32 +00:00
|
|
|
commandOptions := driver.CommandOptions{
|
|
|
|
Cwd: stepCwd,
|
|
|
|
Env: stepEnv,
|
|
|
|
Shell: stepShell,
|
|
|
|
}
|
|
|
|
stepError = r.RunCommandInContainer(containerId, step.Run, commandOptions)
|
2024-08-17 04:18:53 +00:00
|
|
|
}
|
|
|
|
|
2024-08-18 05:28:20 +00:00
|
|
|
if stepError != nil && !step.ContinueOnError {
|
2024-08-17 04:18:53 +00:00
|
|
|
return stepError
|
2024-08-18 05:28:20 +00:00
|
|
|
} else if stepError != nil {
|
|
|
|
logger.Warn("Step errored by continued: %s", stepError)
|
2024-08-02 20:38:18 +00:00
|
|
|
}
|
|
|
|
}
|
2024-08-01 22:43:18 +00:00
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|