2024-08-01 22:43:18 +00:00
|
|
|
package runner
|
|
|
|
|
|
|
|
import (
|
2024-08-02 20:38:18 +00:00
|
|
|
"errors"
|
2024-08-01 22:43:18 +00:00
|
|
|
"fmt"
|
|
|
|
"log"
|
|
|
|
workflow "runner/internal/workflow"
|
2024-08-02 23:30:37 +00:00
|
|
|
"sync"
|
2024-08-01 22:43:18 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
type Runner struct {
|
|
|
|
Labels map[string]string
|
|
|
|
Driver ContainerDriver
|
2024-08-02 02:48:54 +00:00
|
|
|
Tasks map[string]*Task
|
2024-08-01 22:43:18 +00:00
|
|
|
Runs int
|
2024-08-02 23:30:37 +00:00
|
|
|
// Deferred tasks, in order their were scheduled.
|
|
|
|
deferred []func()
|
2024-08-01 22:43:18 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func NewRunner(driver ContainerDriver, labels map[string]string) Runner {
|
|
|
|
return Runner{
|
|
|
|
Driver: driver,
|
|
|
|
Labels: labels,
|
2024-08-02 02:48:54 +00:00
|
|
|
Tasks: map[string]*Task{},
|
2024-08-01 22:43:18 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-08-02 23:30:37 +00:00
|
|
|
// Adds a task to deferral queue.
|
|
|
|
func (r *Runner) DeferTask(task func()) {
|
|
|
|
r.deferred = append(r.deferred, task)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Consumes deferred tasks, if any.
|
|
|
|
//
|
|
|
|
// Each task is executed within a go routine and the call will
|
|
|
|
// wait until all the tasks are completed before returning.
|
|
|
|
func (r *Runner) RunDeferredTasks() {
|
|
|
|
log.Printf("Running %d deferred tasks.", len(r.deferred))
|
|
|
|
|
|
|
|
var tracker sync.WaitGroup
|
|
|
|
|
|
|
|
for _, deferred := range r.deferred {
|
|
|
|
tracker.Add(1)
|
|
|
|
go func() {
|
|
|
|
defer tracker.Done()
|
|
|
|
deferred()
|
|
|
|
}()
|
|
|
|
r.deferred = nil
|
|
|
|
}
|
|
|
|
|
|
|
|
tracker.Wait()
|
|
|
|
}
|
|
|
|
|
2024-08-01 22:43:18 +00:00
|
|
|
func (r *Runner) GetImageUriByLabel(label string) string {
|
|
|
|
uri, exists := r.Labels[label]
|
|
|
|
|
|
|
|
if exists {
|
|
|
|
return uri
|
|
|
|
}
|
|
|
|
|
|
|
|
return "debian:latest"
|
|
|
|
}
|
|
|
|
|
2024-08-02 02:48:54 +00:00
|
|
|
func (r *Runner) GetContainerName(jobId string) string {
|
|
|
|
return fmt.Sprintf("runner-%s", jobId)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (r *Runner) AddTask() string {
|
|
|
|
task := NewTask()
|
|
|
|
r.Tasks[task.Id] = &task
|
|
|
|
|
|
|
|
return task.Id
|
|
|
|
}
|
|
|
|
|
|
|
|
func (r *Runner) GetTask(taskId string) *Task {
|
|
|
|
task, _ := r.Tasks[taskId]
|
|
|
|
|
|
|
|
return task
|
|
|
|
}
|
|
|
|
|
2024-08-01 22:43:18 +00:00
|
|
|
// Executes a workflow using the runner.
|
|
|
|
//
|
|
|
|
// This is the high-level call that will set up the container
|
|
|
|
// that the jobs will be executed in, run the jobs's steps and
|
|
|
|
// tear down the container once no longer useful.
|
2024-08-02 23:40:06 +00:00
|
|
|
func (r *Runner) RunWorkflow(workflow workflow.Workflow) Task {
|
2024-08-01 22:43:18 +00:00
|
|
|
log.Printf("Executing workflow: %s", workflow.SourcePath)
|
2024-08-02 02:48:54 +00:00
|
|
|
task := r.GetTask(r.AddTask())
|
2024-08-01 22:43:18 +00:00
|
|
|
|
2024-08-02 02:48:54 +00:00
|
|
|
for _, job := range workflow.Jobs {
|
|
|
|
jobContext := task.GetJobContext(task.AddJob())
|
|
|
|
jobContext.SetStatus("started")
|
2024-08-01 22:43:18 +00:00
|
|
|
|
|
|
|
runnerImage := r.GetImageUriByLabel(job.RunsOn)
|
2024-08-02 02:48:54 +00:00
|
|
|
containerName := r.GetContainerName(jobContext.Id)
|
2024-08-01 22:43:18 +00:00
|
|
|
|
|
|
|
log.Printf("Using image %s (label: %s)", runnerImage, job.RunsOn)
|
|
|
|
|
2024-08-02 23:34:25 +00:00
|
|
|
if pullError := r.Driver.Pull(runnerImage); pullError != nil {
|
2024-08-02 02:48:54 +00:00
|
|
|
jobContext.SetStatus("failed").SetError(pullError)
|
2024-08-01 22:43:18 +00:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
if runError := r.RunJobInContainer(runnerImage, containerName, job); runError != nil {
|
2024-08-02 02:48:54 +00:00
|
|
|
jobContext.SetStatus("failed").SetError(runError)
|
2024-08-01 22:43:18 +00:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2024-08-02 02:48:54 +00:00
|
|
|
jobContext.SetStatus("success")
|
|
|
|
}
|
2024-08-01 22:43:18 +00:00
|
|
|
|
2024-08-02 23:30:37 +00:00
|
|
|
r.RunDeferredTasks()
|
|
|
|
|
2024-08-02 02:48:54 +00:00
|
|
|
return *task
|
2024-08-01 22:43:18 +00:00
|
|
|
}
|
|
|
|
|
2024-08-02 23:40:06 +00:00
|
|
|
// Executes a command within the given container.
|
|
|
|
//
|
|
|
|
// If the command raises an error while in the container or fails to run
|
|
|
|
// the command at all, an error is returned, otherwise nil.
|
|
|
|
func (r *Runner) RunCommandInContainer(containerId string, command string) error {
|
|
|
|
result := r.Driver.Exec(containerId, command)
|
|
|
|
|
|
|
|
if result.Error != nil {
|
|
|
|
return result.Error
|
|
|
|
}
|
|
|
|
|
|
|
|
if result.ExitCode != 0 {
|
|
|
|
return errors.New(fmt.Sprintf("Command returned a non-zero exit code (%d).", result.ExitCode))
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2024-08-01 22:43:18 +00:00
|
|
|
// Executes a job within a container.
|
|
|
|
//
|
|
|
|
// The container is started before the job steps are run and cleaned up after.
|
|
|
|
func (r *Runner) RunJobInContainer(imageUri string, containerId string, job workflow.Job) error {
|
2024-08-02 23:30:37 +00:00
|
|
|
r.Driver.Start(imageUri, containerId)
|
2024-08-02 20:38:18 +00:00
|
|
|
|
2024-08-02 23:30:37 +00:00
|
|
|
r.DeferTask(func() {
|
|
|
|
log.Printf("Started cleaning up %s", containerId)
|
|
|
|
r.Driver.Stop(containerId)
|
|
|
|
})
|
2024-08-01 22:43:18 +00:00
|
|
|
|
|
|
|
log.Printf("Started %s", containerId)
|
|
|
|
for _, step := range job.Steps {
|
|
|
|
log.Printf("Run: %s", step.Run)
|
|
|
|
|
2024-08-02 20:38:18 +00:00
|
|
|
if err := r.RunCommandInContainer(containerId, step.Run); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
2024-08-01 22:43:18 +00:00
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|