feat: hoist deferred task management to own struct

This commit is contained in:
Marc 2024-08-03 13:47:51 -04:00
parent 1a5826149f
commit 5c029344bc
Signed by: marc
GPG key ID: 048E042F22B5DC79
4 changed files with 162 additions and 38 deletions

View file

@ -0,0 +1,97 @@
package runner
import (
logger "courgette/internal/logging"
"sync"
)
type DeferredTask func()
type DeferredTaskManager struct {
deferredTasks map[string][]DeferredTask
}
func NewDeferredTaskManager() *DeferredTaskManager {
return &DeferredTaskManager{
deferredTasks: map[string][]DeferredTask{},
}
}
func (m DeferredTaskManager) GetTasksByScope(scope string) []DeferredTask {
tasks, scopeExists := m.deferredTasks[scope]
if scopeExists {
logger.Info("Collected %d tasks from scope %s.", len(tasks), scope)
return tasks
}
logger.Info("Queried deferred tasks for unknown scope: %s", scope)
return []DeferredTask{}
}
func (m DeferredTaskManager) GetAllTasks() []DeferredTask {
allTasks := []DeferredTask{}
for scope, tasks := range m.deferredTasks {
if len(tasks) == 0 {
continue
}
allTasks = append(allTasks, tasks...)
logger.Info("Collected %d deferred tasks from scope %s.", len(tasks), scope)
}
return allTasks
}
func (m *DeferredTaskManager) Queue(scope string, task DeferredTask) {
//if _, scopeExists := m.deferredTasks[scope]; !scopeExists {
// m.deferredTasks[scope] = []DeferredTask{}
//}
m.deferredTasks[scope] = append(m.deferredTasks[scope], task)
}
func (m DeferredTaskManager) RunDeferredTasksInScope(scope string) {
tasksInScope := m.GetTasksByScope(scope)
logger.Info("Running %d deferred tasks.", len(tasksInScope))
var tracker sync.WaitGroup
for _, deferred := range tasksInScope {
tracker.Add(1)
go func() {
defer tracker.Done()
deferred()
}()
}
m.deferredTasks[scope] = nil
tracker.Wait()
}
func (m *DeferredTaskManager) RunAllDeferredTasks() error {
allTasks := m.GetAllTasks()
logger.Info("Running %d deferred tasks.", len(allTasks))
var tracker sync.WaitGroup
for _, deferred := range allTasks {
tracker.Add(1)
go func() {
defer tracker.Done()
deferred()
}()
}
m.deferredTasks = map[string][]DeferredTask{}
tracker.Wait()
return nil
}

View file

@ -0,0 +1,49 @@
package runner
import (
logger "courgette/internal/logging"
"testing"
)
func init() {
logger.ConfigureLogger()
}
func TestQueueAddsTaskToTaskQueue(t *testing.T) {
manager := NewDeferredTaskManager()
manager.Queue("scope", func() {})
if len(manager.deferredTasks["scope"]) != 1 {
t.Errorf("Expected a queue of 1 to exist, got %d items instead.", len(manager.deferredTasks["scope"]))
}
}
func TestGetTasksByScopeReturnsTasksAssociatedWithScope(t *testing.T) {
manager := NewDeferredTaskManager()
manager.Queue("scope", func() {})
if len(manager.GetTasksByScope("scope")) != 1 {
t.Errorf("Expected a queue of 1 to exist, got %d items instead.", len(manager.GetTasksByScope("scope")))
}
}
func TestGetTasksByScopeReturnsEmptySliceIfUnknownScope(t *testing.T) {
manager := NewDeferredTaskManager()
if len(manager.GetTasksByScope("scope")) != 0 {
t.Errorf("Expected a queue of 0 to exist, got %d items instead.", len(manager.GetTasksByScope("scope")))
}
}
func TestGetAllTasksReturnsAllTasksRegardlessOfScope(t *testing.T) {
manager := NewDeferredTaskManager()
manager.Queue("scope-1", func() {})
manager.Queue("scope-2", func() {})
if len(manager.GetAllTasks()) != 2 {
t.Errorf("Expected a queue of 2 to exist, got %d items instead.", len(manager.GetAllTasks()))
}
}

View file

@ -5,7 +5,6 @@ import (
workflow "courgette/internal/workflow" workflow "courgette/internal/workflow"
"errors" "errors"
"fmt" "fmt"
"sync"
) )
type Runner struct { type Runner struct {
@ -14,7 +13,7 @@ type Runner struct {
Tasks map[string]*Task Tasks map[string]*Task
Runs int Runs int
// Deferred tasks, in order their were scheduled. // Deferred tasks, in order their were scheduled.
deferred []func() deferred *DeferredTaskManager
} }
func NewRunner(driver ContainerDriver, labels map[string]string) Runner { func NewRunner(driver ContainerDriver, labels map[string]string) Runner {
@ -22,35 +21,10 @@ func NewRunner(driver ContainerDriver, labels map[string]string) Runner {
Driver: driver, Driver: driver,
Labels: labels, Labels: labels,
Tasks: map[string]*Task{}, Tasks: map[string]*Task{},
deferred: NewDeferredTaskManager(),
} }
} }
// Adds a task to deferral queue.
func (r *Runner) DeferTask(task func()) {
r.deferred = append(r.deferred, task)
}
// Consumes deferred tasks, if any.
//
// Each task is executed within a go routine and the call will
// wait until all the tasks are completed before returning.
func (r *Runner) RunDeferredTasks() {
logger.Info("Running %d deferred tasks.", len(r.deferred))
var tracker sync.WaitGroup
for _, deferred := range r.deferred {
tracker.Add(1)
go func() {
defer tracker.Done()
deferred()
}()
r.deferred = nil
}
tracker.Wait()
}
func (r *Runner) GetImageUriByLabel(label string) string { func (r *Runner) GetImageUriByLabel(label string) string {
uri, exists := r.Labels[label] uri, exists := r.Labels[label]
@ -107,9 +81,10 @@ func (r *Runner) RunWorkflow(workflow workflow.Workflow) Task {
} }
jobContext.SetStatus("success") jobContext.SetStatus("success")
r.deferred.RunDeferredTasksInScope("job")
} }
r.RunDeferredTasks() r.deferred.RunAllDeferredTasks()
return *task return *task
} }
@ -138,7 +113,7 @@ func (r *Runner) RunCommandInContainer(containerId string, command string) error
func (r *Runner) RunJobInContainer(imageUri string, containerId string, job workflow.Job) error { func (r *Runner) RunJobInContainer(imageUri string, containerId string, job workflow.Job) error {
r.Driver.Start(imageUri, containerId) r.Driver.Start(imageUri, containerId)
r.DeferTask(func() { r.deferred.Queue("job", func() {
logger.Info("Started cleaning up %s", containerId) logger.Info("Started cleaning up %s", containerId)
r.Driver.Stop(containerId) r.Driver.Stop(containerId)
}) })

View file

@ -1,6 +1,7 @@
package runner package runner
import ( import (
logger "courgette/internal/logging"
workflow "courgette/internal/workflow" workflow "courgette/internal/workflow"
"errors" "errors"
"testing" "testing"
@ -16,6 +17,10 @@ type MockDriver struct {
mockResult *CommandResult mockResult *CommandResult
} }
func init() {
logger.ConfigureLogger()
}
func NewMockDriver() MockDriver { func NewMockDriver() MockDriver {
return MockDriver{ return MockDriver{
calls: map[string][]MockCall{}, calls: map[string][]MockCall{},
@ -100,13 +105,11 @@ func TestRunJobInContainerSchedulesStoppingContainers(t *testing.T) {
mockDriver := NewMockDriver() mockDriver := NewMockDriver()
mockDriver.WithCommandResult(&CommandResult{ExitCode: 1, Error: nil}) mockDriver.WithCommandResult(&CommandResult{ExitCode: 1, Error: nil})
runner := Runner{ runner := NewRunner(&mockDriver, map[string]string{})
Driver: &mockDriver,
}
runner.RunJobInContainer("uri", "container", workflow.Job{}) runner.RunJobInContainer("uri", "container", workflow.Job{})
if len(runner.deferred) != 1 { if len(runner.deferred.GetAllTasks()) != 1 {
t.Errorf("Expected 1 deferred task, found %d instead.", len(runner.deferred)) t.Errorf("Expected 1 deferred task, found %d instead.", len(runner.deferred.GetAllTasks()))
} }
} }