refactor: remove scope dependencies on routine running jobs
This commit is contained in:
parent
ac2b95f8ac
commit
4ef95277e8
1 changed files with 18 additions and 14 deletions
|
@ -42,38 +42,42 @@ func (r *Runner) RunWorkflow(workflow workflow.Workflow) TaskTracker {
|
|||
for _, job := range group {
|
||||
groupWait.Add(1)
|
||||
|
||||
jobTracker := NewTaskTracker(rootTracker)
|
||||
runnerImage, defined := r.Labels[job.RunsOn]
|
||||
if !defined {
|
||||
jobTracker.SetStatus("failed").SetError(fmt.Errorf("Unknown runner image label: %s", job.RunsOn))
|
||||
continue
|
||||
}
|
||||
|
||||
jobContext := context.WithValue(workflowContext, "currentJob", job)
|
||||
jobContext = context.WithValue(jobContext, "runnerImageUri", runnerImage)
|
||||
|
||||
runJob := func(context context.Context) {
|
||||
defer groupWait.Done()
|
||||
jobTracker := NewTaskTracker(rootTracker).SetStatus("started")
|
||||
|
||||
runnerImage, defined := r.Labels[job.RunsOn]
|
||||
|
||||
if !defined {
|
||||
jobTracker.SetStatus("failed").SetError(fmt.Errorf("Unknown runner image label: %s", job.RunsOn))
|
||||
return
|
||||
}
|
||||
// Runs a given job (provided a runner to run it on, its context, a task tracker for progress monitoring and
|
||||
// a WaitGroup to coordinate concurrent tasks) and updates the tracker with results.
|
||||
runJob := func(runner *Runner, jobContext context.Context, jobTracker *TaskTracker, jobWaitGroup *sync.WaitGroup) {
|
||||
defer jobWaitGroup.Done()
|
||||
jobTracker.SetStatus("started")
|
||||
runnerImage := jobContext.Value("runnerImageUri").(string)
|
||||
|
||||
containerName := fmt.Sprintf("runner-%s", jobTracker.TaskId)
|
||||
|
||||
logger.Info("Using image %s (label: %s)", runnerImage, job.RunsOn)
|
||||
|
||||
if pullError := r.Driver.Pull(runnerImage); pullError != nil {
|
||||
if pullError := runner.Driver.Pull(runnerImage); pullError != nil {
|
||||
jobTracker.SetStatus("failed").SetError(pullError)
|
||||
return
|
||||
}
|
||||
|
||||
if runError := r.RunJobInContainer(runnerImage, containerName, context); runError != nil {
|
||||
if runError := runner.RunJobInContainer(runnerImage, containerName, jobContext); runError != nil {
|
||||
jobTracker.SetStatus("failed").SetError(runError)
|
||||
return
|
||||
}
|
||||
|
||||
jobTracker.SetStatus("success")
|
||||
r.deferred.RunDeferredTasksInScope(fmt.Sprintf("job-%s", containerName))
|
||||
runner.deferred.RunDeferredTasksInScope(fmt.Sprintf("job-%s", containerName))
|
||||
}
|
||||
|
||||
go runJob(jobContext)
|
||||
go runJob(r, jobContext, jobTracker, &groupWait)
|
||||
}
|
||||
|
||||
groupWait.Wait()
|
||||
|
|
Loading…
Reference in a new issue