package runner import ( "errors" "fmt" "log" workflow "runner/internal/workflow" "sync" ) type Runner struct { Labels map[string]string Driver ContainerDriver Tasks map[string]*Task Runs int // Deferred tasks, in order their were scheduled. deferred []func() } func NewRunner(driver ContainerDriver, labels map[string]string) Runner { return Runner{ Driver: driver, Labels: labels, Tasks: map[string]*Task{}, } } // Adds a task to deferral queue. func (r *Runner) DeferTask(task func()) { r.deferred = append(r.deferred, task) } // Consumes deferred tasks, if any. // // Each task is executed within a go routine and the call will // wait until all the tasks are completed before returning. func (r *Runner) RunDeferredTasks() { log.Printf("Running %d deferred tasks.", len(r.deferred)) var tracker sync.WaitGroup for _, deferred := range r.deferred { tracker.Add(1) go func() { defer tracker.Done() deferred() }() r.deferred = nil } tracker.Wait() } func (r *Runner) GetImageUriByLabel(label string) string { uri, exists := r.Labels[label] if exists { return uri } return "debian:latest" } func (r *Runner) GetContainerName(jobId string) string { return fmt.Sprintf("runner-%s", jobId) } func (r *Runner) AddTask() string { task := NewTask() r.Tasks[task.Id] = &task return task.Id } func (r *Runner) GetTask(taskId string) *Task { task, _ := r.Tasks[taskId] return task } // Executes a workflow using the runner. // // This is the high-level call that will set up the container // that the jobs will be executed in, run the jobs's steps and // tear down the container once no longer useful. func (r *Runner) Execute(workflow workflow.Workflow) Task { log.Printf("Executing workflow: %s", workflow.SourcePath) task := r.GetTask(r.AddTask()) for _, job := range workflow.Jobs { jobContext := task.GetJobContext(task.AddJob()) jobContext.SetStatus("started") runnerImage := r.GetImageUriByLabel(job.RunsOn) containerName := r.GetContainerName(jobContext.Id) log.Printf("Using image %s (label: %s)", runnerImage, job.RunsOn) if pullError := r.PullContainer(runnerImage); pullError != nil { jobContext.SetStatus("failed").SetError(pullError) continue } if runError := r.RunJobInContainer(runnerImage, containerName, job); runError != nil { jobContext.SetStatus("failed").SetError(runError) continue } jobContext.SetStatus("success") } r.RunDeferredTasks() return *task } // Executes a job within a container. // // The container is started before the job steps are run and cleaned up after. func (r *Runner) RunJobInContainer(imageUri string, containerId string, job workflow.Job) error { r.Driver.Start(imageUri, containerId) r.DeferTask(func() { log.Printf("Started cleaning up %s", containerId) r.Driver.Stop(containerId) }) log.Printf("Started %s", containerId) for _, step := range job.Steps { log.Printf("Run: %s", step.Run) if err := r.RunCommandInContainer(containerId, step.Run); err != nil { return err } } return nil } // Pulls the container from the registry provided its image uri. func (r *Runner) PullContainer(uri string) error { return r.Driver.Pull(uri) } // Executes a command within the given container. // // If the command raises an error while in the container or fails to run // the command at all, an error is returned, otherwise nil. func (r *Runner) RunCommandInContainer(containerId string, command string) error { result := r.Driver.Exec(containerId, command) if result.Error != nil { return result.Error } if result.ExitCode != 0 { return errors.New(fmt.Sprintf("Command returned a non-zero exit code (%d).", result.ExitCode)) } return nil }