feat: add job concurrency and basic support for needs job prop

This commit is contained in:
Marc 2024-08-09 00:17:29 -04:00
parent 072f40850d
commit 56295fe89e
Signed by: marc
GPG key ID: 048E042F22B5DC79
5 changed files with 166 additions and 21 deletions

View file

@ -14,7 +14,7 @@ syntax](https://docs.github.com/en/actions/writing-workflows/workflow-syntax-for
- [x] jobs
- [ ] jobs.<job_id>.name
- [ ] jobs.<job_id>.permissions
- [ ] jobs.<job_id>.needs
- [x] jobs.<job_id>.needs (ordering, not success)
- [ ] jobs.<job_id>.if
- [x] jobs.<job_id>.runs-on
- [ ] jobs.<job_id>.environment
@ -46,7 +46,7 @@ syntax](https://docs.github.com/en/actions/writing-workflows/workflow-syntax-for
## Behaviours
- [ ] Job concurrency
- [x] Job concurrency
- [ ] Logging to files
- [ ] Support Actions
- [ ] Support built-in functions

View file

@ -5,6 +5,7 @@ import (
workflow "courgette/internal/workflow"
"errors"
"fmt"
"sync"
)
type Runner struct {
@ -61,27 +62,39 @@ func (r *Runner) RunWorkflow(workflow workflow.Workflow) Task {
logger.Info("Executing workflow: %s", workflow.SourcePath)
task := r.GetTask(r.AddTask())
for _, job := range workflow.Jobs {
jobContext := task.GetJobContext(task.AddJob())
jobContext.SetStatus("started")
for _, group := range workflow.GetJobsAsGroups() {
var groupWait sync.WaitGroup
runnerImage := r.GetImageUriByLabel(job.RunsOn)
containerName := r.GetContainerName(jobContext.Id)
for _, job := range group {
groupWait.Add(1)
logger.Info("Using image %s (label: %s)", runnerImage, job.RunsOn)
go func() {
defer groupWait.Done()
jobContext := task.GetJobContext(task.AddJob())
if pullError := r.Driver.Pull(runnerImage); pullError != nil {
jobContext.SetStatus("failed").SetError(pullError)
continue
jobContext.SetStatus("started")
runnerImage := r.GetImageUriByLabel(job.RunsOn)
containerName := r.GetContainerName(jobContext.Id)
logger.Info("Using image %s (label: %s)", runnerImage, job.RunsOn)
if pullError := r.Driver.Pull(runnerImage); pullError != nil {
jobContext.SetStatus("failed").SetError(pullError)
return
}
if runError := r.RunJobInContainer(runnerImage, containerName, job); runError != nil {
jobContext.SetStatus("failed").SetError(runError)
return
}
jobContext.SetStatus("success")
r.deferred.RunDeferredTasksInScope(fmt.Sprintf("job-%s", containerName))
}()
}
if runError := r.RunJobInContainer(runnerImage, containerName, job); runError != nil {
jobContext.SetStatus("failed").SetError(runError)
continue
}
jobContext.SetStatus("success")
r.deferred.RunDeferredTasksInScope("job")
groupWait.Wait()
}
r.deferred.RunAllDeferredTasks()
@ -113,7 +126,7 @@ func (r *Runner) RunCommandInContainer(containerId string, command string) error
func (r *Runner) RunJobInContainer(imageUri string, containerId string, job workflow.Job) error {
r.Driver.Start(imageUri, containerId)
r.deferred.Queue("job", func() {
r.deferred.Queue(fmt.Sprintf("job-%s", containerId), func() {
logger.Info("Started cleaning up %s", containerId)
r.Driver.Stop(containerId)
})

View file

@ -0,0 +1,51 @@
package workflow
// Splits a flat map representing a graph's parent > children relationships
// into an ordered slice of levels.
func SplitFlatTreeIntoGroups(flatTree map[string][]string) [][]string {
type TreeNode struct {
name string
level int
}
nodes := map[string]TreeNode{}
deepestLevel := 0
next := []string{""}
for len(next) > 0 {
current := next[0]
next = next[1:]
children := flatTree[current]
currentNode, found := nodes[current]
// The negative value represents the graph root.
currentLevel := -1
if found {
currentLevel = currentNode.level
}
for _, child := range children {
nodes[child] = TreeNode{
name: child,
level: currentLevel + 1,
}
}
if deepestLevel < currentLevel {
deepestLevel = currentLevel
}
next = append(next, children...)
}
levels := make([][]string, deepestLevel+1)
for _, node := range nodes {
levels[node.level] = append(levels[node.level], node.name)
}
return levels
}

View file

@ -0,0 +1,48 @@
package workflow
import (
"reflect"
"testing"
)
func TestSplitFlatTreeIntoGroupsProducesListOfLevels(t *testing.T) {
flatTree := map[string][]string{
"": []string{"a"},
"a": []string{"b"},
"b": []string{"c"},
"c": []string{},
}
levels := SplitFlatTreeIntoGroups(flatTree)
expectedLevels := [][]string{
[]string{"a"},
[]string{"b"},
[]string{"c"},
}
if !reflect.DeepEqual(levels, expectedLevels) {
t.Errorf("Expected levels to be %+v, got %+v instead.", expectedLevels, levels)
}
}
func TestSplitFlatTreeIntoGroupsProducesListOfLevelsIfMultiparent(t *testing.T) {
flatTree := map[string][]string{
"": []string{"a"},
"a": []string{"b", "c"},
"b": []string{"c"},
"c": []string{},
}
levels := SplitFlatTreeIntoGroups(flatTree)
expectedLevels := [][]string{
[]string{"a"},
[]string{"b"},
[]string{"c"},
}
if !reflect.DeepEqual(levels, expectedLevels) {
t.Errorf("Expected levels to be %+v, got %+v instead.", expectedLevels, levels)
}
}

View file

@ -5,8 +5,9 @@ import (
)
type Job struct {
RunsOn string `yaml:"runs-on"`
Steps []Step `yaml:"steps"`
RunsOn string `yaml:"runs-on"`
Steps []Step `yaml:"steps"`
Needs []string `yaml:"needs"`
}
func (j Job) Validate() []error {
@ -59,3 +60,35 @@ func (w Workflow) Validate() []error {
return validationErrors
}
// Creates a deterministic, ordered collection of jobs that respects
// the jobs's dependencies.
func (w Workflow) GetJobsAsGroups() [][]Job {
dependenciesMap := map[string][]string{}
for jobLabel, job := range w.Jobs {
if len(job.Needs) == 0 {
dependenciesMap[""] = append(dependenciesMap[""], jobLabel)
}
for _, need := range job.Needs {
dependenciesMap[need] = append(dependenciesMap[need], jobLabel)
}
}
levels := SplitFlatTreeIntoGroups(dependenciesMap)
groups := [][]Job{}
for _, jobLabels := range levels {
jobs := []Job{}
for _, jobLabel := range jobLabels {
jobs = append(jobs, w.Jobs[jobLabel])
}
groups = append(groups, jobs)
}
return groups
}