feat: collect and prefetch actions specified by use as urls

This commit is contained in:
Marc 2024-08-30 23:09:31 -04:00
parent 5490bba12f
commit 5410c7f6c3
Signed by: marc
GPG key ID: 048E042F22B5DC79
5 changed files with 95 additions and 11 deletions

54
internal/actions/act.go Normal file
View file

@ -0,0 +1,54 @@
// Implementation for basic Action-related operations.
package actions
import (
logger "courgette/internal/logging"
"os"
"os/exec"
"strings"
)
type CliClient interface {
Clone(url string, destination string) error
Exec(args ...string) error
}
type GitClient struct{}
func (g GitClient) Exec(args ...string) error {
cmd := exec.Command("git", args...)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
return cmd.Run()
}
func (g GitClient) Clone(url string, destination string) error {
return g.Exec("clone", url, destination, "--depth", "1")
}
// Returns a key that can be used as a directory / cache-key string
// based on the provided Action repository url.
func GetActionKey(url string) string {
url = strings.ToLower(url)
if strings.HasPrefix(url, "http://") {
url = strings.TrimPrefix(url, "http://")
} else if strings.HasPrefix(url, "https://") {
url = strings.TrimPrefix(url, "https://")
}
parts := strings.Split(url, "/")
return strings.Join(parts, "__")
}
// Prefetches and caches the action defined by <actionName> at <destination>.
//
// <actionName> is expected to be the full url of the repository where
// the action lives.
func PrefetchAction(client CliClient, actionName string, destination string) error {
logger.Info("Prefetching action: %s to %s", actionName, destination)
return client.Clone(actionName, destination)
}

View file

@ -30,14 +30,14 @@ func (c CacheManager) Path(elems ...string) string {
// Checks whether a specific top-level items exists in the cache.
func (c CacheManager) Exists(key string) bool {
if _, err := os.Stat(c.Path(key)); errors.Is(err, os.ErrNotExist) {
if _, err := os.Stat(c.Path(key)); errors.Is(err, os.ErrNotExist) {
return false
}
return true
return true
}
// Deletes an element from the cache, if it exists.
func (c CacheManager) Evict(key string) {
os.RemoveAll(c.Path(key))
os.RemoveAll(c.Path(key))
}

View file

@ -19,7 +19,7 @@ func ExecuteWorkflow(configuration Configuration, workflowFile string) error {
runnerInstance := runner.NewRunner(
driver,
configuration.Runner.Labels,
configuration.Cache.Dir,
configuration.GetCacheDir(),
)
workflow, err := workflow.FromYamlFile(workflowFile)

View file

@ -2,10 +2,11 @@ package runner
import (
"context"
actions "courgette/internal/actions"
cache "courgette/internal/cache"
driver "courgette/internal/driver"
logger "courgette/internal/logging"
workflow "courgette/internal/workflow"
workflows "courgette/internal/workflow"
"errors"
"fmt"
"sync"
@ -36,7 +37,7 @@ func NewRunner(driver driver.ContainerDriver, labels map[string]string, cacheRoo
// This is the high-level call that will set up the container
// that the jobs will be executed in, run the jobs's steps and
// tear down the container once no longer useful.
func (r *Runner) RunWorkflow(workflow workflow.Workflow) TaskTracker {
func (r *Runner) RunWorkflow(workflow workflows.Workflow) TaskTracker {
logger.Info("Executing workflow: %s", workflow.SourcePath)
rootTracker := NewTaskTracker(nil)
@ -44,6 +45,15 @@ func (r *Runner) RunWorkflow(workflow workflow.Workflow) TaskTracker {
defer r.deferred.RunAllDeferredTasks()
workflow.Walk(func(n interface{}) {
switch n.(type) {
case workflows.Step:
if useAction := n.(workflows.Step).Use; useAction != "" {
actions.PrefetchAction(actions.GitClient{}, useAction, r.Cache.Path(actions.GetActionKey(useAction)))
}
}
})
for _, group := range workflow.GetJobsAsGroups() {
var groupWait sync.WaitGroup
@ -72,7 +82,7 @@ func (r *Runner) RunWorkflow(workflow workflow.Workflow) TaskTracker {
// a WaitGroup to coordinate concurrent tasks) and updates the tracker with results.
func (r Runner) runJob(jobContext context.Context, jobTracker *TaskTracker, jobWaitGroup *sync.WaitGroup) {
job := jobContext.Value("currentJob").(workflow.Job)
job := jobContext.Value("currentJob").(workflows.Job)
containerName := fmt.Sprintf("runner-%s", jobTracker.TaskId)
defer jobWaitGroup.Done()
@ -134,11 +144,11 @@ func (r *Runner) RunJobInContainer(imageUri string, containerId string, jobConte
r.Driver.Stop(containerId)
})
job := jobContext.Value("currentJob").(workflow.Job)
job := jobContext.Value("currentJob").(workflows.Job)
logger.Info("Started %s", containerId)
for stepIndex, step := range job.Steps {
workflow := jobContext.Value("workflow").(workflow.Workflow)
workflow := jobContext.Value("workflow").(workflows.Workflow)
stepCwd := workflow.GetWorkingDirectory(job.Name, stepIndex)
stepEnv := workflow.GetEnv(job.Name, stepIndex)
stepShell := workflow.GetShell(job.Name, stepIndex)

View file

@ -22,6 +22,13 @@ type Job struct {
} `yaml:"defaults"`
}
func (j Job) Walk(handler func(node interface{})) {
handler(j)
for _, step := range j.Steps {
step.Walk(handler)
}
}
func (j Job) Validate() []error {
validationErrors := []error{}
@ -46,13 +53,18 @@ type Step struct {
ContinueOnError bool `yaml:"continue-on-error"`
Env map[string]string `yaml:"env"`
Shell string `yaml:"shell"`
Use string `yaml:"use"`
}
func (s Step) Walk(handler func(node interface{})) {
handler(s)
}
func (s Step) Validate() []error {
validationErrors := []error{}
if s.Run == "" {
validationErrors = append(validationErrors, errors.New("Missing \"run\" field on step."))
if s.Run == "" && s.Use == "" {
validationErrors = append(validationErrors, errors.New("Must have a \"run\" or \"step\" clause."))
}
return validationErrors
@ -64,6 +76,14 @@ type Workflow struct {
Env map[string]string `yaml:"env"`
}
func (w Workflow) Walk(handler func(node interface{})) {
handler(w)
for _, job := range w.Jobs {
job.Walk(handler)
}
}
// Returns the given workflow's job+step working directory inside the container
// that runs the job.
//