From 56295fe89e0c6d40e18ff37536595ed6a32c044a Mon Sep 17 00:00:00 2001 From: Marc Cataford Date: Fri, 9 Aug 2024 00:17:29 -0400 Subject: [PATCH] feat: add job concurrency and basic support for needs job prop --- WORKFLOW_SUPPORT.md | 4 +- internal/runner/runner.go | 47 +++++++++++++++--------- internal/workflow/dependencies.go | 51 ++++++++++++++++++++++++++ internal/workflow/dependencies_test.go | 48 ++++++++++++++++++++++++ internal/workflow/models.go | 37 ++++++++++++++++++- 5 files changed, 166 insertions(+), 21 deletions(-) create mode 100644 internal/workflow/dependencies.go create mode 100644 internal/workflow/dependencies_test.go diff --git a/WORKFLOW_SUPPORT.md b/WORKFLOW_SUPPORT.md index 77a2de4..30a8555 100644 --- a/WORKFLOW_SUPPORT.md +++ b/WORKFLOW_SUPPORT.md @@ -14,7 +14,7 @@ syntax](https://docs.github.com/en/actions/writing-workflows/workflow-syntax-for - [x] jobs - [ ] jobs..name - [ ] jobs..permissions - - [ ] jobs..needs + - [x] jobs..needs (ordering, not success) - [ ] jobs..if - [x] jobs..runs-on - [ ] jobs..environment @@ -46,7 +46,7 @@ syntax](https://docs.github.com/en/actions/writing-workflows/workflow-syntax-for ## Behaviours -- [ ] Job concurrency +- [x] Job concurrency - [ ] Logging to files - [ ] Support Actions - [ ] Support built-in functions diff --git a/internal/runner/runner.go b/internal/runner/runner.go index 51b2575..d07c7f7 100644 --- a/internal/runner/runner.go +++ b/internal/runner/runner.go @@ -5,6 +5,7 @@ import ( workflow "courgette/internal/workflow" "errors" "fmt" + "sync" ) type Runner struct { @@ -61,27 +62,39 @@ func (r *Runner) RunWorkflow(workflow workflow.Workflow) Task { logger.Info("Executing workflow: %s", workflow.SourcePath) task := r.GetTask(r.AddTask()) - for _, job := range workflow.Jobs { - jobContext := task.GetJobContext(task.AddJob()) - jobContext.SetStatus("started") + for _, group := range workflow.GetJobsAsGroups() { + var groupWait sync.WaitGroup - runnerImage := r.GetImageUriByLabel(job.RunsOn) - containerName := r.GetContainerName(jobContext.Id) + for _, job := range group { + groupWait.Add(1) - logger.Info("Using image %s (label: %s)", runnerImage, job.RunsOn) + go func() { + defer groupWait.Done() + jobContext := task.GetJobContext(task.AddJob()) - if pullError := r.Driver.Pull(runnerImage); pullError != nil { - jobContext.SetStatus("failed").SetError(pullError) - continue + jobContext.SetStatus("started") + + runnerImage := r.GetImageUriByLabel(job.RunsOn) + containerName := r.GetContainerName(jobContext.Id) + + logger.Info("Using image %s (label: %s)", runnerImage, job.RunsOn) + + if pullError := r.Driver.Pull(runnerImage); pullError != nil { + jobContext.SetStatus("failed").SetError(pullError) + return + } + + if runError := r.RunJobInContainer(runnerImage, containerName, job); runError != nil { + jobContext.SetStatus("failed").SetError(runError) + return + } + + jobContext.SetStatus("success") + r.deferred.RunDeferredTasksInScope(fmt.Sprintf("job-%s", containerName)) + }() } - if runError := r.RunJobInContainer(runnerImage, containerName, job); runError != nil { - jobContext.SetStatus("failed").SetError(runError) - continue - } - - jobContext.SetStatus("success") - r.deferred.RunDeferredTasksInScope("job") + groupWait.Wait() } r.deferred.RunAllDeferredTasks() @@ -113,7 +126,7 @@ func (r *Runner) RunCommandInContainer(containerId string, command string) error func (r *Runner) RunJobInContainer(imageUri string, containerId string, job workflow.Job) error { r.Driver.Start(imageUri, containerId) - r.deferred.Queue("job", func() { + r.deferred.Queue(fmt.Sprintf("job-%s", containerId), func() { logger.Info("Started cleaning up %s", containerId) r.Driver.Stop(containerId) }) diff --git a/internal/workflow/dependencies.go b/internal/workflow/dependencies.go new file mode 100644 index 0000000..d2bc31b --- /dev/null +++ b/internal/workflow/dependencies.go @@ -0,0 +1,51 @@ +package workflow + +// Splits a flat map representing a graph's parent > children relationships +// into an ordered slice of levels. +func SplitFlatTreeIntoGroups(flatTree map[string][]string) [][]string { + type TreeNode struct { + name string + level int + } + + nodes := map[string]TreeNode{} + + deepestLevel := 0 + next := []string{""} + + for len(next) > 0 { + current := next[0] + next = next[1:] + children := flatTree[current] + + currentNode, found := nodes[current] + + // The negative value represents the graph root. + currentLevel := -1 + + if found { + currentLevel = currentNode.level + } + + for _, child := range children { + nodes[child] = TreeNode{ + name: child, + level: currentLevel + 1, + } + } + + if deepestLevel < currentLevel { + deepestLevel = currentLevel + } + + next = append(next, children...) + } + + levels := make([][]string, deepestLevel+1) + + for _, node := range nodes { + levels[node.level] = append(levels[node.level], node.name) + } + + return levels +} diff --git a/internal/workflow/dependencies_test.go b/internal/workflow/dependencies_test.go new file mode 100644 index 0000000..a10d255 --- /dev/null +++ b/internal/workflow/dependencies_test.go @@ -0,0 +1,48 @@ +package workflow + +import ( + "reflect" + "testing" +) + +func TestSplitFlatTreeIntoGroupsProducesListOfLevels(t *testing.T) { + flatTree := map[string][]string{ + "": []string{"a"}, + "a": []string{"b"}, + "b": []string{"c"}, + "c": []string{}, + } + + levels := SplitFlatTreeIntoGroups(flatTree) + + expectedLevels := [][]string{ + []string{"a"}, + []string{"b"}, + []string{"c"}, + } + + if !reflect.DeepEqual(levels, expectedLevels) { + t.Errorf("Expected levels to be %+v, got %+v instead.", expectedLevels, levels) + } +} + +func TestSplitFlatTreeIntoGroupsProducesListOfLevelsIfMultiparent(t *testing.T) { + flatTree := map[string][]string{ + "": []string{"a"}, + "a": []string{"b", "c"}, + "b": []string{"c"}, + "c": []string{}, + } + + levels := SplitFlatTreeIntoGroups(flatTree) + + expectedLevels := [][]string{ + []string{"a"}, + []string{"b"}, + []string{"c"}, + } + + if !reflect.DeepEqual(levels, expectedLevels) { + t.Errorf("Expected levels to be %+v, got %+v instead.", expectedLevels, levels) + } +} diff --git a/internal/workflow/models.go b/internal/workflow/models.go index a62463c..481dd13 100644 --- a/internal/workflow/models.go +++ b/internal/workflow/models.go @@ -5,8 +5,9 @@ import ( ) type Job struct { - RunsOn string `yaml:"runs-on"` - Steps []Step `yaml:"steps"` + RunsOn string `yaml:"runs-on"` + Steps []Step `yaml:"steps"` + Needs []string `yaml:"needs"` } func (j Job) Validate() []error { @@ -59,3 +60,35 @@ func (w Workflow) Validate() []error { return validationErrors } + +// Creates a deterministic, ordered collection of jobs that respects +// the jobs's dependencies. +func (w Workflow) GetJobsAsGroups() [][]Job { + dependenciesMap := map[string][]string{} + + for jobLabel, job := range w.Jobs { + if len(job.Needs) == 0 { + dependenciesMap[""] = append(dependenciesMap[""], jobLabel) + } + + for _, need := range job.Needs { + dependenciesMap[need] = append(dependenciesMap[need], jobLabel) + } + } + + levels := SplitFlatTreeIntoGroups(dependenciesMap) + + groups := [][]Job{} + + for _, jobLabels := range levels { + jobs := []Job{} + + for _, jobLabel := range jobLabels { + jobs = append(jobs, w.Jobs[jobLabel]) + } + + groups = append(groups, jobs) + } + + return groups +}