diff --git a/internal/runner/deferred_task.go b/internal/runner/deferred_task.go new file mode 100644 index 0000000..5530f0e --- /dev/null +++ b/internal/runner/deferred_task.go @@ -0,0 +1,97 @@ +package runner + +import ( + logger "courgette/internal/logging" + "sync" +) + +type DeferredTask func() + +type DeferredTaskManager struct { + deferredTasks map[string][]DeferredTask +} + +func NewDeferredTaskManager() *DeferredTaskManager { + return &DeferredTaskManager{ + deferredTasks: map[string][]DeferredTask{}, + } +} + +func (m DeferredTaskManager) GetTasksByScope(scope string) []DeferredTask { + tasks, scopeExists := m.deferredTasks[scope] + + if scopeExists { + logger.Info("Collected %d tasks from scope %s.", len(tasks), scope) + return tasks + } + + logger.Info("Queried deferred tasks for unknown scope: %s", scope) + + return []DeferredTask{} +} + +func (m DeferredTaskManager) GetAllTasks() []DeferredTask { + allTasks := []DeferredTask{} + + for scope, tasks := range m.deferredTasks { + if len(tasks) == 0 { + continue + } + + allTasks = append(allTasks, tasks...) + + logger.Info("Collected %d deferred tasks from scope %s.", len(tasks), scope) + } + + return allTasks +} + +func (m *DeferredTaskManager) Queue(scope string, task DeferredTask) { + //if _, scopeExists := m.deferredTasks[scope]; !scopeExists { + // m.deferredTasks[scope] = []DeferredTask{} + //} + + m.deferredTasks[scope] = append(m.deferredTasks[scope], task) +} + +func (m DeferredTaskManager) RunDeferredTasksInScope(scope string) { + tasksInScope := m.GetTasksByScope(scope) + + logger.Info("Running %d deferred tasks.", len(tasksInScope)) + + var tracker sync.WaitGroup + + for _, deferred := range tasksInScope { + tracker.Add(1) + go func() { + defer tracker.Done() + deferred() + }() + } + + m.deferredTasks[scope] = nil + + tracker.Wait() +} + +func (m *DeferredTaskManager) RunAllDeferredTasks() error { + allTasks := m.GetAllTasks() + + logger.Info("Running %d deferred tasks.", len(allTasks)) + + var tracker sync.WaitGroup + + for _, deferred := range allTasks { + tracker.Add(1) + go func() { + defer tracker.Done() + deferred() + }() + } + + m.deferredTasks = map[string][]DeferredTask{} + + tracker.Wait() + + return nil +} diff --git a/internal/runner/deferred_task_test.go b/internal/runner/deferred_task_test.go new file mode 100644 index 0000000..7159e5f --- /dev/null +++ b/internal/runner/deferred_task_test.go @@ -0,0 +1,49 @@ +package runner + +import ( + logger "courgette/internal/logging" + "testing" +) + +func init() { + logger.ConfigureLogger() +} + +func TestQueueAddsTaskToTaskQueue(t *testing.T) { + manager := NewDeferredTaskManager() + + manager.Queue("scope", func() {}) + + if len(manager.deferredTasks["scope"]) != 1 { + t.Errorf("Expected a queue of 1 to exist, got %d items instead.", len(manager.deferredTasks["scope"])) + } +} + +func TestGetTasksByScopeReturnsTasksAssociatedWithScope(t *testing.T) { + manager := NewDeferredTaskManager() + + manager.Queue("scope", func() {}) + + if len(manager.GetTasksByScope("scope")) != 1 { + t.Errorf("Expected a queue of 1 to exist, got %d items instead.", len(manager.GetTasksByScope("scope"))) + } +} + +func TestGetTasksByScopeReturnsEmptySliceIfUnknownScope(t *testing.T) { + manager := NewDeferredTaskManager() + + if len(manager.GetTasksByScope("scope")) != 0 { + t.Errorf("Expected a queue of 0 to exist, got %d items instead.", len(manager.GetTasksByScope("scope"))) + } +} + +func TestGetAllTasksReturnsAllTasksRegardlessOfScope(t *testing.T) { + manager := NewDeferredTaskManager() + + manager.Queue("scope-1", func() {}) + manager.Queue("scope-2", func() {}) + + if len(manager.GetAllTasks()) != 2 { + t.Errorf("Expected a queue of 2 to exist, got %d items instead.", len(manager.GetAllTasks())) + } +} diff --git a/internal/runner/runner.go b/internal/runner/runner.go index cf3c204..51b2575 100644 --- a/internal/runner/runner.go +++ b/internal/runner/runner.go @@ -5,7 +5,6 @@ import ( workflow "courgette/internal/workflow" "errors" "fmt" - "sync" ) type Runner struct { @@ -14,43 +13,18 @@ type Runner struct { Tasks map[string]*Task Runs int // Deferred tasks, in order their were scheduled. - deferred []func() + deferred *DeferredTaskManager } func NewRunner(driver ContainerDriver, labels map[string]string) Runner { return Runner{ - Driver: driver, - Labels: labels, - Tasks: map[string]*Task{}, + Driver: driver, + Labels: labels, + Tasks: map[string]*Task{}, + deferred: NewDeferredTaskManager(), } } -// Adds a task to deferral queue. -func (r *Runner) DeferTask(task func()) { - r.deferred = append(r.deferred, task) -} - -// Consumes deferred tasks, if any. -// -// Each task is executed within a go routine and the call will -// wait until all the tasks are completed before returning. -func (r *Runner) RunDeferredTasks() { - logger.Info("Running %d deferred tasks.", len(r.deferred)) - - var tracker sync.WaitGroup - - for _, deferred := range r.deferred { - tracker.Add(1) - go func() { - defer tracker.Done() - deferred() - }() - r.deferred = nil - } - - tracker.Wait() -} - func (r *Runner) GetImageUriByLabel(label string) string { uri, exists := r.Labels[label] @@ -107,9 +81,10 @@ func (r *Runner) RunWorkflow(workflow workflow.Workflow) Task { } jobContext.SetStatus("success") + r.deferred.RunDeferredTasksInScope("job") } - r.RunDeferredTasks() + r.deferred.RunAllDeferredTasks() return *task } @@ -138,7 +113,7 @@ func (r *Runner) RunCommandInContainer(containerId string, command string) error func (r *Runner) RunJobInContainer(imageUri string, containerId string, job workflow.Job) error { r.Driver.Start(imageUri, containerId) - r.DeferTask(func() { + r.deferred.Queue("job", func() { logger.Info("Started cleaning up %s", containerId) r.Driver.Stop(containerId) }) diff --git a/internal/runner/runner_flow_test.go b/internal/runner/runner_flow_test.go index 05abf44..4cfc1f8 100644 --- a/internal/runner/runner_flow_test.go +++ b/internal/runner/runner_flow_test.go @@ -1,6 +1,7 @@ package runner import ( + logger "courgette/internal/logging" workflow "courgette/internal/workflow" "errors" "testing" @@ -16,6 +17,10 @@ type MockDriver struct { mockResult *CommandResult } +func init() { + logger.ConfigureLogger() +} + func NewMockDriver() MockDriver { return MockDriver{ calls: map[string][]MockCall{}, @@ -100,13 +105,11 @@ func TestRunJobInContainerSchedulesStoppingContainers(t *testing.T) { mockDriver := NewMockDriver() mockDriver.WithCommandResult(&CommandResult{ExitCode: 1, Error: nil}) - runner := Runner{ - Driver: &mockDriver, - } + runner := NewRunner(&mockDriver, map[string]string{}) runner.RunJobInContainer("uri", "container", workflow.Job{}) - if len(runner.deferred) != 1 { - t.Errorf("Expected 1 deferred task, found %d instead.", len(runner.deferred)) + if len(runner.deferred.GetAllTasks()) != 1 { + t.Errorf("Expected 1 deferred task, found %d instead.", len(runner.deferred.GetAllTasks())) } }