From 2e31942b6d2198d829d863d39dc0b0c03a14c9d1 Mon Sep 17 00:00:00 2001 From: Marc Cataford Date: Fri, 2 Aug 2024 19:30:37 -0400 Subject: [PATCH] feat: group all cleanup tasks and defer to end of run --- internal/runner/runner.go | 51 ++++++++++++++++++++--------- internal/runner/runner_flow_test.go | 44 ++----------------------- 2 files changed, 39 insertions(+), 56 deletions(-) diff --git a/internal/runner/runner.go b/internal/runner/runner.go index 1cd235e..afa9874 100644 --- a/internal/runner/runner.go +++ b/internal/runner/runner.go @@ -5,6 +5,7 @@ import ( "fmt" "log" workflow "runner/internal/workflow" + "sync" ) type Runner struct { @@ -12,6 +13,8 @@ type Runner struct { Driver ContainerDriver Tasks map[string]*Task Runs int + // Deferred tasks, in order their were scheduled. + deferred []func() } func NewRunner(driver ContainerDriver, labels map[string]string) Runner { @@ -22,6 +25,32 @@ func NewRunner(driver ContainerDriver, labels map[string]string) Runner { } } +// Adds a task to deferral queue. +func (r *Runner) DeferTask(task func()) { + r.deferred = append(r.deferred, task) +} + +// Consumes deferred tasks, if any. +// +// Each task is executed within a go routine and the call will +// wait until all the tasks are completed before returning. +func (r *Runner) RunDeferredTasks() { + log.Printf("Running %d deferred tasks.", len(r.deferred)) + + var tracker sync.WaitGroup + + for _, deferred := range r.deferred { + tracker.Add(1) + go func() { + defer tracker.Done() + deferred() + }() + r.deferred = nil + } + + tracker.Wait() +} + func (r *Runner) GetImageUriByLabel(label string) string { uri, exists := r.Labels[label] @@ -80,6 +109,8 @@ func (r *Runner) Execute(workflow workflow.Workflow) Task { jobContext.SetStatus("success") } + r.RunDeferredTasks() + return *task } @@ -87,12 +118,12 @@ func (r *Runner) Execute(workflow workflow.Workflow) Task { // // The container is started before the job steps are run and cleaned up after. func (r *Runner) RunJobInContainer(imageUri string, containerId string, job workflow.Job) error { - r.StartContainer(imageUri, containerId) + r.Driver.Start(imageUri, containerId) - defer func() { - log.Printf("Cleaning up %s", containerId) - r.StopContainer(containerId) - }() + r.DeferTask(func() { + log.Printf("Started cleaning up %s", containerId) + r.Driver.Stop(containerId) + }) log.Printf("Started %s", containerId) for _, step := range job.Steps { @@ -111,11 +142,6 @@ func (r *Runner) PullContainer(uri string) error { return r.Driver.Pull(uri) } -// Starts a container from the given image uri with the given name. -func (r *Runner) StartContainer(uri string, containerName string) error { - return r.Driver.Start(uri, containerName) -} - // Executes a command within the given container. // // If the command raises an error while in the container or fails to run @@ -133,8 +159,3 @@ func (r *Runner) RunCommandInContainer(containerId string, command string) error return nil } - -// Stops the given container. -func (r *Runner) StopContainer(containerName string) { - r.Driver.Stop(containerName) -} diff --git a/internal/runner/runner_flow_test.go b/internal/runner/runner_flow_test.go index b8e376c..1596c34 100644 --- a/internal/runner/runner_flow_test.go +++ b/internal/runner/runner_flow_test.go @@ -97,7 +97,7 @@ func TestRunnerRunCommandInContainerReturnsErrorIfCommandExitCodeNonzero(t *test } } -func TestRunJobInContainerStopsContainersOnExit(t *testing.T) { +func TestRunJobInContainerSchedulesStoppingContainers(t *testing.T) { mockDriver := NewMockDriver() mockDriver.WithCommandResult(&CommandResult{ExitCode: 1, Error: nil}) @@ -107,8 +107,8 @@ func TestRunJobInContainerStopsContainersOnExit(t *testing.T) { runner.RunJobInContainer("uri", "container", workflow.Job{}) - if len(mockDriver.calls["Stop"]) != 1 { - t.Errorf("Expected 1 call to Stop, found calls: %#v", mockDriver.calls["Stop"]) + if len(runner.deferred) != 1 { + t.Errorf("Expected 1 deferred task, found %d instead.", len(runner.deferred)) } } @@ -130,41 +130,3 @@ func TestRunnerPullContainerCallsDriverPull(t *testing.T) { t.Errorf("Expected call args to be %#v, got %#v instead.", expectedArgs, actualArgs) } } - -func TestRunnerStartContainerCallsDriverStart(t *testing.T) { - mockDriver := NewMockDriver() - runner := Runner{ - Driver: &mockDriver, - } - - runner.StartContainer("test-image", "container") - - if len(mockDriver.calls) != 1 { - t.Error("Expected start to have been called.") - } - - expectedArgs := []string{"test-image", "container"} - actualArgs := mockDriver.calls["Start"][0].args - if !slices.Equal(actualArgs, expectedArgs) { - t.Errorf("Expected call args to be %#v, got %#v instead.", expectedArgs, actualArgs) - } -} - -func TestRunnerStopContainerCallsDriverStop(t *testing.T) { - mockDriver := NewMockDriver() - runner := Runner{ - Driver: &mockDriver, - } - - runner.StopContainer("container") - - if len(mockDriver.calls) != 1 { - t.Error("Expected stop to have been called.") - } - - expectedArgs := []string{"container"} - actualArgs := mockDriver.calls["Stop"][0].args - if !slices.Equal(actualArgs, expectedArgs) { - t.Errorf("Expected call args to be %#v, got %#v instead.", expectedArgs, actualArgs) - } -}