refactor: tidy runner actions manager
This commit is contained in:
parent
e25735ae01
commit
09f2a1c4b2
3 changed files with 22 additions and 15 deletions
|
@ -14,8 +14,8 @@ type ActionsManager struct {
|
||||||
cacheRoot string
|
cacheRoot string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewActionsManager(cacheRoot string) ActionsManager {
|
func NewActionsManager(cacheRoot string) *ActionsManager {
|
||||||
return ActionsManager{
|
return &ActionsManager{
|
||||||
git: GitClient{},
|
git: GitClient{},
|
||||||
cacheRoot: cacheRoot,
|
cacheRoot: cacheRoot,
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,13 +12,19 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Supervisor structure that handles running workflows, triggering
|
||||||
|
// side-effects along the lifecycle of each run.
|
||||||
type Runner struct {
|
type Runner struct {
|
||||||
|
// Image labels that can be used by workflows.
|
||||||
Labels map[string]string
|
Labels map[string]string
|
||||||
|
// Handler for container operations.
|
||||||
Driver driver.ContainerDriver
|
Driver driver.ContainerDriver
|
||||||
Cache cache.CacheManager
|
// Runner's cache.
|
||||||
Runs int
|
Cache *cache.CacheManager
|
||||||
// Deferred tasks, in order their were scheduled.
|
// Deferred tasks that can be queued during runs.
|
||||||
deferred *DeferredTaskManager
|
Deferred *DeferredTaskManager
|
||||||
|
// Actions handling.
|
||||||
|
Actions *actions.ActionsManager
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRunner(driver driver.ContainerDriver, labels map[string]string, cacheRoot string) Runner {
|
func NewRunner(driver driver.ContainerDriver, labels map[string]string, cacheRoot string) Runner {
|
||||||
|
@ -26,9 +32,10 @@ func NewRunner(driver driver.ContainerDriver, labels map[string]string, cacheRoo
|
||||||
|
|
||||||
return Runner{
|
return Runner{
|
||||||
Driver: driver,
|
Driver: driver,
|
||||||
Cache: *cacheManager,
|
Cache: cacheManager,
|
||||||
Labels: labels,
|
Labels: labels,
|
||||||
deferred: NewDeferredTaskManager(),
|
Deferred: NewDeferredTaskManager(),
|
||||||
|
Actions: actions.NewActionsManager(cacheManager.Root),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -43,13 +50,13 @@ func (r *Runner) RunWorkflow(workflow workflows.Workflow) TaskTracker {
|
||||||
|
|
||||||
workflowContext := context.WithValue(context.Background(), "workflow", workflow)
|
workflowContext := context.WithValue(context.Background(), "workflow", workflow)
|
||||||
|
|
||||||
defer r.deferred.RunAllDeferredTasks()
|
defer r.Deferred.RunAllDeferredTasks()
|
||||||
|
|
||||||
workflow.Walk(func(n interface{}) {
|
workflow.Walk(func(n interface{}) {
|
||||||
switch n.(type) {
|
switch n.(type) {
|
||||||
case workflows.Step:
|
case workflows.Step:
|
||||||
if useAction := n.(workflows.Step).Uses; useAction != "" {
|
if useAction := n.(workflows.Step).Uses; useAction != "" {
|
||||||
actions.NewActionsManager(r.Cache.Root).PrefetchAction(useAction)
|
r.Actions.PrefetchAction(useAction)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
@ -86,7 +93,7 @@ func (r Runner) runJob(jobContext context.Context, jobTracker *TaskTracker, jobW
|
||||||
containerName := fmt.Sprintf("runner-%s", jobTracker.TaskId)
|
containerName := fmt.Sprintf("runner-%s", jobTracker.TaskId)
|
||||||
|
|
||||||
defer jobWaitGroup.Done()
|
defer jobWaitGroup.Done()
|
||||||
defer r.deferred.RunDeferredTasksInScope(fmt.Sprintf("job-%s", containerName))
|
defer r.Deferred.RunDeferredTasksInScope(fmt.Sprintf("job-%s", containerName))
|
||||||
|
|
||||||
jobTracker.SetStatus("started")
|
jobTracker.SetStatus("started")
|
||||||
runnerImage := jobContext.Value("runnerImageUri").(string)
|
runnerImage := jobContext.Value("runnerImageUri").(string)
|
||||||
|
@ -139,7 +146,7 @@ func (r *Runner) RunCommandInContainer(containerId string, command string, optio
|
||||||
func (r *Runner) RunJobInContainer(imageUri string, containerId string, jobContext context.Context) error {
|
func (r *Runner) RunJobInContainer(imageUri string, containerId string, jobContext context.Context) error {
|
||||||
r.Driver.Start(imageUri, containerId, r.Cache.Root)
|
r.Driver.Start(imageUri, containerId, r.Cache.Root)
|
||||||
|
|
||||||
r.deferred.Queue(fmt.Sprintf("job-%s", containerId), func() {
|
r.Deferred.Queue(fmt.Sprintf("job-%s", containerId), func() {
|
||||||
logger.Info("Started cleaning up %s", containerId)
|
logger.Info("Started cleaning up %s", containerId)
|
||||||
r.Driver.Stop(containerId)
|
r.Driver.Stop(containerId)
|
||||||
})
|
})
|
||||||
|
@ -174,7 +181,7 @@ func (r *Runner) RunJobInContainer(imageUri string, containerId string, jobConte
|
||||||
Env: stepEnv,
|
Env: stepEnv,
|
||||||
Shell: stepShell,
|
Shell: stepShell,
|
||||||
}
|
}
|
||||||
stepError = actions.NewActionsManager(r.Cache.Root).UseAction(step.Uses, containerId, commandOptions)
|
stepError = r.Actions.UseAction(step.Uses, containerId, commandOptions)
|
||||||
}
|
}
|
||||||
|
|
||||||
if stepError != nil && !step.ContinueOnError {
|
if stepError != nil && !step.ContinueOnError {
|
||||||
|
|
|
@ -55,7 +55,7 @@ func TestRunJobInContainerSchedulesStoppingContainers(t *testing.T) {
|
||||||
|
|
||||||
runner.RunJobInContainer("uri", "container", jobCtx)
|
runner.RunJobInContainer("uri", "container", jobCtx)
|
||||||
|
|
||||||
if len(runner.deferred.GetAllTasks()) != 1 {
|
if len(runner.Deferred.GetAllTasks()) != 1 {
|
||||||
t.Errorf("Expected 1 deferred task, found %d instead.", len(runner.deferred.GetAllTasks()))
|
t.Errorf("Expected 1 deferred task, found %d instead.", len(runner.Deferred.GetAllTasks()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue