98 lines
1.9 KiB
Go
98 lines
1.9 KiB
Go
|
package runner
|
||
|
|
||
|
import (
|
||
|
logger "courgette/internal/logging"
|
||
|
"sync"
|
||
|
)
|
||
|
|
||
|
// DeferredTask is a callback that takes no arguments and returns nothing. It
// is the unit of work stored by DeferredTaskManager and invoked when the
// manager runs its queued tasks.
type DeferredTask func()
|
||
|
|
||
|
type DeferredTaskManager struct {
|
||
|
deferredTasks map[string][]DeferredTask
|
||
|
}
|
||
|
|
||
|
func NewDeferredTaskManager() *DeferredTaskManager {
|
||
|
return &DeferredTaskManager{
|
||
|
deferredTasks: map[string][]DeferredTask{},
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (m DeferredTaskManager) GetTasksByScope(scope string) []DeferredTask {
|
||
|
tasks, scopeExists := m.deferredTasks[scope]
|
||
|
|
||
|
if scopeExists {
|
||
|
logger.Info("Collected %d tasks from scope %s.", len(tasks), scope)
|
||
|
return tasks
|
||
|
}
|
||
|
|
||
|
logger.Info("Queried deferred tasks for unknown scope: %s", scope)
|
||
|
|
||
|
return []DeferredTask{}
|
||
|
}
|
||
|
|
||
|
func (m DeferredTaskManager) GetAllTasks() []DeferredTask {
|
||
|
allTasks := []DeferredTask{}
|
||
|
|
||
|
for scope, tasks := range m.deferredTasks {
|
||
|
if len(tasks) == 0 {
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
allTasks = append(allTasks, tasks...)
|
||
|
|
||
|
logger.Info("Collected %d deferred tasks from scope %s.", len(tasks), scope)
|
||
|
}
|
||
|
|
||
|
return allTasks
|
||
|
}
|
||
|
|
||
|
func (m *DeferredTaskManager) Queue(scope string, task DeferredTask) {
|
||
|
//if _, scopeExists := m.deferredTasks[scope]; !scopeExists {
|
||
|
// m.deferredTasks[scope] = []DeferredTask{}
|
||
|
//}
|
||
|
|
||
|
m.deferredTasks[scope] = append(m.deferredTasks[scope], task)
|
||
|
}
|
||
|
|
||
|
func (m DeferredTaskManager) RunDeferredTasksInScope(scope string) {
|
||
|
tasksInScope := m.GetTasksByScope(scope)
|
||
|
|
||
|
logger.Info("Running %d deferred tasks.", len(tasksInScope))
|
||
|
|
||
|
var tracker sync.WaitGroup
|
||
|
|
||
|
for _, deferred := range tasksInScope {
|
||
|
tracker.Add(1)
|
||
|
go func() {
|
||
|
defer tracker.Done()
|
||
|
deferred()
|
||
|
}()
|
||
|
}
|
||
|
|
||
|
m.deferredTasks[scope] = nil
|
||
|
|
||
|
tracker.Wait()
|
||
|
}
|
||
|
|
||
|
func (m *DeferredTaskManager) RunAllDeferredTasks() error {
|
||
|
allTasks := m.GetAllTasks()
|
||
|
|
||
|
logger.Info("Running %d deferred tasks.", len(allTasks))
|
||
|
|
||
|
var tracker sync.WaitGroup
|
||
|
|
||
|
for _, deferred := range allTasks {
|
||
|
tracker.Add(1)
|
||
|
go func() {
|
||
|
defer tracker.Done()
|
||
|
deferred()
|
||
|
}()
|
||
|
}
|
||
|
|
||
|
m.deferredTasks = map[string][]DeferredTask{}
|
||
|
|
||
|
tracker.Wait()
|
||
|
|
||
|
return nil
|
||
|
}
|