refactor: simpler task tracker, remove ambiguous references to context
This commit is contained in:
parent
ab3030ac7b
commit
a970cb1f5d
4 changed files with 55 additions and 92 deletions
|
@ -42,13 +42,13 @@ func ExecuteWorkflow(configuration Configuration, workflowFile string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, job := range taskResult.Context.Jobs {
|
for _, job := range taskResult.Children {
|
||||||
if job.Status == "success" {
|
if job.Status == "success" {
|
||||||
logger.Info(logger.Green("Job %s: %s"), job.Id, job.Status)
|
logger.Info(logger.Green("Job %s: %s"), job.TaskId, job.Status)
|
||||||
} else if job.Status == "failed" {
|
} else if job.Status == "failed" {
|
||||||
logger.Error(logger.Red("Job %s: %s"), job.Id, job.Status)
|
logger.Error(logger.Red("Job %s: %s"), job.TaskId, job.Status)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return fmt.Errorf("Task %s failed with at least 1 error.", taskResult.Id)
|
return fmt.Errorf("Task %s failed with at least 1 error.", taskResult.TaskId)
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,7 +12,6 @@ import (
|
||||||
type Runner struct {
|
type Runner struct {
|
||||||
Labels map[string]string
|
Labels map[string]string
|
||||||
Driver ContainerDriver
|
Driver ContainerDriver
|
||||||
Tasks map[string]*Task
|
|
||||||
Runs int
|
Runs int
|
||||||
// Deferred tasks, in order their were scheduled.
|
// Deferred tasks, in order their were scheduled.
|
||||||
deferred *DeferredTaskManager
|
deferred *DeferredTaskManager
|
||||||
|
@ -22,7 +21,6 @@ func NewRunner(driver ContainerDriver, labels map[string]string) Runner {
|
||||||
return Runner{
|
return Runner{
|
||||||
Driver: driver,
|
Driver: driver,
|
||||||
Labels: labels,
|
Labels: labels,
|
||||||
Tasks: map[string]*Task{},
|
|
||||||
deferred: NewDeferredTaskManager(),
|
deferred: NewDeferredTaskManager(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -41,27 +39,14 @@ func (r *Runner) GetContainerName(jobId string) string {
|
||||||
return fmt.Sprintf("runner-%s", jobId)
|
return fmt.Sprintf("runner-%s", jobId)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Runner) AddTask() string {
|
|
||||||
task := NewTask()
|
|
||||||
r.Tasks[task.Id] = &task
|
|
||||||
|
|
||||||
return task.Id
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Runner) GetTask(taskId string) *Task {
|
|
||||||
task, _ := r.Tasks[taskId]
|
|
||||||
|
|
||||||
return task
|
|
||||||
}
|
|
||||||
|
|
||||||
// Executes a workflow using the runner.
|
// Executes a workflow using the runner.
|
||||||
//
|
//
|
||||||
// This is the high-level call that will set up the container
|
// This is the high-level call that will set up the container
|
||||||
// that the jobs will be executed in, run the jobs's steps and
|
// that the jobs will be executed in, run the jobs's steps and
|
||||||
// tear down the container once no longer useful.
|
// tear down the container once no longer useful.
|
||||||
func (r *Runner) RunWorkflow(workflow workflow.Workflow) Task {
|
func (r *Runner) RunWorkflow(workflow workflow.Workflow) TaskTracker {
|
||||||
logger.Info("Executing workflow: %s", workflow.SourcePath)
|
logger.Info("Executing workflow: %s", workflow.SourcePath)
|
||||||
task := r.GetTask(r.AddTask())
|
rootTracker := NewTaskTracker(nil)
|
||||||
|
|
||||||
workflowContext := context.WithValue(context.Background(), "workflow", workflow)
|
workflowContext := context.WithValue(context.Background(), "workflow", workflow)
|
||||||
|
|
||||||
|
@ -75,28 +60,24 @@ func (r *Runner) RunWorkflow(workflow workflow.Workflow) Task {
|
||||||
|
|
||||||
runJob := func(context context.Context) {
|
runJob := func(context context.Context) {
|
||||||
defer groupWait.Done()
|
defer groupWait.Done()
|
||||||
|
jobTracker := NewTaskTracker(rootTracker).SetStatus("started")
|
||||||
// FIXME: Disambiguate the usage of "context" as a term.
|
|
||||||
jobContext := task.GetJobContext(task.AddJob())
|
|
||||||
|
|
||||||
jobContext.SetStatus("started")
|
|
||||||
|
|
||||||
runnerImage := r.GetImageUriByLabel(job.RunsOn)
|
runnerImage := r.GetImageUriByLabel(job.RunsOn)
|
||||||
containerName := r.GetContainerName(jobContext.Id)
|
containerName := r.GetContainerName(jobTracker.TaskId)
|
||||||
|
|
||||||
logger.Info("Using image %s (label: %s)", runnerImage, job.RunsOn)
|
logger.Info("Using image %s (label: %s)", runnerImage, job.RunsOn)
|
||||||
|
|
||||||
if pullError := r.Driver.Pull(runnerImage); pullError != nil {
|
if pullError := r.Driver.Pull(runnerImage); pullError != nil {
|
||||||
jobContext.SetStatus("failed").SetError(pullError)
|
jobTracker.SetStatus("failed").SetError(pullError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if runError := r.RunJobInContainer(runnerImage, containerName, context); runError != nil {
|
if runError := r.RunJobInContainer(runnerImage, containerName, context); runError != nil {
|
||||||
jobContext.SetStatus("failed").SetError(runError)
|
jobTracker.SetStatus("failed").SetError(runError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
jobContext.SetStatus("success")
|
jobTracker.SetStatus("success")
|
||||||
r.deferred.RunDeferredTasksInScope(fmt.Sprintf("job-%s", containerName))
|
r.deferred.RunDeferredTasksInScope(fmt.Sprintf("job-%s", containerName))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -108,7 +89,7 @@ func (r *Runner) RunWorkflow(workflow workflow.Workflow) Task {
|
||||||
|
|
||||||
r.deferred.RunAllDeferredTasks()
|
r.deferred.RunAllDeferredTasks()
|
||||||
|
|
||||||
return *task
|
return *rootTracker
|
||||||
}
|
}
|
||||||
|
|
||||||
// Executes a command within the given container.
|
// Executes a command within the given container.
|
||||||
|
@ -146,9 +127,14 @@ func (r *Runner) RunJobInContainer(imageUri string, containerId string, jobConte
|
||||||
for stepIndex, step := range job.Steps {
|
for stepIndex, step := range job.Steps {
|
||||||
logger.Info("Run: %s", step.Run)
|
logger.Info("Run: %s", step.Run)
|
||||||
logger.Info("Using working directory %s", jobContext.Value("workflow").(workflow.Workflow).GetWorkingDirectory(job.Name, stepIndex))
|
logger.Info("Using working directory %s", jobContext.Value("workflow").(workflow.Workflow).GetWorkingDirectory(job.Name, stepIndex))
|
||||||
|
var stepError error
|
||||||
|
|
||||||
if step.Run != "" {
|
if step.Run != "" {
|
||||||
return r.RunCommandInContainer(containerId, step.Run)
|
stepError = r.RunCommandInContainer(containerId, step.Run)
|
||||||
|
}
|
||||||
|
|
||||||
|
if stepError != nil {
|
||||||
|
return stepError
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -8,70 +8,47 @@ import (
|
||||||
"github.com/gofrs/uuid"
|
"github.com/gofrs/uuid"
|
||||||
)
|
)
|
||||||
|
|
||||||
type JobContext struct {
|
type TaskTracker struct {
|
||||||
Id string
|
TaskId string
|
||||||
Status string
|
Status string
|
||||||
Error error
|
Error error
|
||||||
|
Parent *TaskTracker
|
||||||
|
Children []*TaskTracker
|
||||||
}
|
}
|
||||||
|
|
||||||
type TaskContext struct {
|
func NewTaskTracker(parent *TaskTracker) *TaskTracker {
|
||||||
Jobs map[string]*JobContext
|
newTask := &TaskTracker{TaskId: uuid.Must(uuid.NewV1()).String()}
|
||||||
}
|
|
||||||
|
|
||||||
type Task struct {
|
if parent != nil {
|
||||||
Id string
|
parent.Children = append(parent.Children, newTask)
|
||||||
Context TaskContext
|
newTask.Parent = parent
|
||||||
}
|
|
||||||
|
|
||||||
func NewTask() Task {
|
|
||||||
return Task{
|
|
||||||
Id: uuid.Must(uuid.NewV1()).String(),
|
|
||||||
Context: TaskContext{
|
|
||||||
Jobs: map[string]*JobContext{},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *Task) AddJob() string {
|
|
||||||
jobContext := NewJobContext()
|
|
||||||
t.Context.Jobs[jobContext.Id] = &jobContext
|
|
||||||
|
|
||||||
return jobContext.Id
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *Task) GetJobContext(jobId string) *JobContext {
|
|
||||||
ctx, exists := t.Context.Jobs[jobId]
|
|
||||||
|
|
||||||
if exists {
|
|
||||||
return ctx
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return newTask
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t Task) HasError() bool {
|
func (t *TaskTracker) SetStatus(status string) *TaskTracker {
|
||||||
for _, job := range t.Context.Jobs {
|
t.Status = status
|
||||||
if job.Error != nil {
|
|
||||||
|
return t
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TaskTracker) SetError(err error) *TaskTracker {
|
||||||
|
t.Error = err
|
||||||
|
|
||||||
|
return t
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t TaskTracker) HasError() bool {
|
||||||
|
if t.Error != nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, child := range t.Children {
|
||||||
|
if child.HasError() {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewJobContext() JobContext {
|
|
||||||
return JobContext{
|
|
||||||
Id: uuid.Must(uuid.NewV1()).String(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *JobContext) SetStatus(newStatus string) *JobContext {
|
|
||||||
c.Status = newStatus
|
|
||||||
|
|
||||||
return c
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *JobContext) SetError(err error) *JobContext {
|
|
||||||
c.Error = err
|
|
||||||
return c
|
|
||||||
}
|
|
||||||
|
|
|
@ -6,8 +6,8 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestTaskHasErrorReturnsFalseIfNoUnderlyingJobHaveErrors(t *testing.T) {
|
func TestTaskHasErrorReturnsFalseIfNoUnderlyingJobHaveErrors(t *testing.T) {
|
||||||
task := NewTask()
|
task := NewTaskTracker(nil)
|
||||||
task.AddJob()
|
NewTaskTracker(task)
|
||||||
|
|
||||||
if task.HasError() {
|
if task.HasError() {
|
||||||
t.Errorf("Expected false, got true.")
|
t.Errorf("Expected false, got true.")
|
||||||
|
@ -15,13 +15,13 @@ func TestTaskHasErrorReturnsFalseIfNoUnderlyingJobHaveErrors(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTaskHasErrorReturnsTrueIfAnyJobHasErrors(t *testing.T) {
|
func TestTaskHasErrorReturnsTrueIfAnyJobHasErrors(t *testing.T) {
|
||||||
task := NewTask()
|
task := NewTaskTracker(nil)
|
||||||
|
|
||||||
// Two jobs, one of which has an error.
|
// Two jobs, one of which has an error.
|
||||||
task.AddJob()
|
NewTaskTracker(task)
|
||||||
jobId := task.AddJob()
|
subTask := NewTaskTracker(task)
|
||||||
|
|
||||||
task.GetJobContext(jobId).SetError(errors.New("test"))
|
subTask.SetError(errors.New("test"))
|
||||||
|
|
||||||
if !task.HasError() {
|
if !task.HasError() {
|
||||||
t.Errorf("Expected true, got false.")
|
t.Errorf("Expected true, got false.")
|
||||||
|
|
Loading…
Reference in a new issue