code-forge/runners.py

135 lines
4 KiB
Python
Raw Permalink Normal View History

#!/bin/python
from urllib.request import urlretrieve
import pathlib
import stat
import logging
import math
import subprocess
import sys
import json
import typing
import multiprocessing
import os
# Configure the root logger once, at import time, so INFO-level records are emitted.
logging.basicConfig(level=logging.INFO)
# Module-scoped logger; used for download progress reporting.
logger = logging.getLogger(__name__)
class RunnerInstance(typing.TypedDict):
    """Description of one runner instance, as listed under ``instances`` in the metadata."""

    # Path to the instance's configuration file; handed to the runner binary via ``--config``.
    config: str
    # Instance name; used to derive the ``runner__<name>`` working directory.
    name: str
class Metadata(typing.TypedDict):
    """Top-level shape of the runner metadata document loaded from JSON."""

    # Runner release version (no leading ``v``); selects which binary is downloaded.
    runner_version: str
    # Every runner instance that should be registered or started.
    instances: list[RunnerInstance]
def get_runner_url(version: str) -> str:
    """
    Build the download URL of the linux-amd64 runner binary for a given release.

    Args:
        version: Release version without the leading ``v`` (it is added to the tag segment).

    Returns:
        The release asset URL on code.forgejo.org.
    """
    release_url = f"https://code.forgejo.org/forgejo/runner/releases/download/v{version}/forgejo-runner-{version}-linux-amd64"
    return release_url
def get_runner_runtime(runner_root: pathlib.Path, version: str) -> pathlib.Path:
    """
    Return the path of the runner binary for ``version``, downloading it first if it is missing.

    The binary is stored as ``runner_<version>`` directly inside ``runner_root``. A missing
    binary is fetched from the URL given by ``get_runner_url``. The download is written to a
    sibling ``.part`` file and only renamed into place once it has finished, so an interrupted
    transfer is never mistaken for a usable binary on a later run.

    Args:
        runner_root: Directory in which runner binaries are kept; it is not created here.
        version: Runner release version, without the leading ``v``.

    Returns:
        Path to the binary, with the owner-execute permission bit set.

    Raises:
        RuntimeError: If the binary is still missing after the download step.
        Exception: Anything raised by ``urlretrieve`` propagates after the partial file
            has been removed.
    """
    runner_bin_path = runner_root.joinpath(f"runner_{version}")

    def report(chunk, chunk_size, total_size):
        # urlretrieve passes -1 as total_size when the server does not announce a length.
        total_chunks = "unknown"
        if total_size != -1:
            total_chunks = math.ceil(total_size / chunk_size)

        # Log every 100th chunk, plus the last one, to keep the output readable.
        if not chunk % 100 or chunk == total_chunks:
            logger.info("Chunk %s / %s downloaded.", chunk, total_chunks)

    if not runner_bin_path.exists():
        source_url = get_runner_url(version)
        partial_path = runner_bin_path.with_name(f"{runner_bin_path.name}.part")

        try:
            urlretrieve(source_url, partial_path, reporthook=report)
        except BaseException:
            # Covers Ctrl-C as well: never leave a truncated file behind.
            partial_path.unlink(missing_ok=True)
            raise

        # Publish the finished download under its final name in a single step.
        partial_path.replace(runner_bin_path)

        if not runner_bin_path.exists():
            raise RuntimeError(f"Failed to download runner from {source_url}.")

    # Add the owner-execute bit while keeping every other mode bit as it is.
    runner_bin_path.chmod(runner_bin_path.stat().st_mode | stat.S_IEXEC)

    return runner_bin_path
def get_runners_metadata(metadata_path: pathlib.Path) -> Metadata:
    """
    Load the runner metadata stored as JSON at ``metadata_path``.

    The parsed document is returned as-is; its structure is not validated here.

    Args:
        metadata_path: Location of the UTF-8 encoded JSON metadata file.

    Returns:
        The decoded JSON content.

    Raises:
        RuntimeError: If ``metadata_path`` does not exist.
    """
    if metadata_path.exists():
        with metadata_path.open(mode="r", encoding="utf8") as handle:
            return json.load(handle)

    raise RuntimeError("Failed to open metadata file.")
def start_runner(runner_path: pathlib.Path, meta: RunnerInstance):
    """
    Run one runner instance in daemon mode and wait for it to exit.

    The binary is invoked as ``<runner_path> --config <config> daemon``. Its working directory
    is ``runner__<name>`` next to the binary, created if it does not exist yet. The call blocks
    for as long as the runner process is alive.

    Args:
        runner_path: Path to the runner binary.
        meta: Instance description; ``meta["config"]`` is the configuration file path and
            ``meta["name"]`` selects the working directory.

    Raises:
        RuntimeError: If the configuration file does not exist, or if the runner process
            exits with a non-zero status.
    """
    config_path = pathlib.Path(meta["config"])
    if not config_path.exists():
        raise RuntimeError(f"Runner config path does not exist: {meta['config']}")

    runner_root = runner_path.parent.joinpath(f"runner__{meta['name']}")
    runner_root.mkdir(parents=True, exist_ok=True)

    # Both paths are resolved because the child runs with a different working directory.
    command = [runner_path.resolve(), "--config", config_path.resolve(), "daemon"]

    # subprocess.run waits for the process, so returncode is a real exit status. A freshly
    # created Popen has returncode None, which never equals 0.
    result = subprocess.run(command, cwd=runner_root)

    if result.returncode != 0:
        print(result)
        raise RuntimeError(f"Failed to start runner {meta['name']}.")
def start_runners(instances: list[RunnerInstance], runner_path: pathlib.Path):
    """
    Start every runner instance in ``instances`` using the binary at ``runner_path``.

    Each instance is handed to ``start_runner`` in its own worker process. The pool has one
    worker per instance, so no instance has to wait for another one to finish.

    Args:
        instances: Runner instances to start; an empty list is a no-op.
        runner_path: Path to the runner binary shared by all instances.

    Raises:
        RuntimeError: If ``runner_path`` does not exist. Exceptions raised by
            ``start_runner`` in a worker are re-raised here by ``starmap``.
    """
    if not runner_path.exists():
        raise RuntimeError(f"Runner path does not exist: {runner_path}")

    if not instances:
        return

    # Use the arguments, not script-level globals, so this also works when imported.
    jobs = [(runner_path, instance) for instance in instances]

    with multiprocessing.Pool(len(jobs)) as pool:
        pool.starmap(start_runner, jobs)
def register_runners(instances: list[RunnerInstance], runner_bin_path: pathlib.Path):
    """
    Run ``<runner_bin_path> register`` once for every runner instance.

    Each registration runs inside the instance's ``runner__<name>`` directory next to the
    binary, created on demand. This is the same directory layout ``start_runner`` uses.
    Instances are processed in order, and the first failure aborts the remaining ones.

    Args:
        instances: Runner instances to register; an empty list is a no-op.
        runner_bin_path: Path to the runner binary.

    Raises:
        RuntimeError: If a registration command exits with a non-zero status.
    """
    # Resolve once: the child runs with a different cwd, which would break a relative path.
    runner_binary = runner_bin_path.resolve()

    for instance in instances:
        instance_root = runner_bin_path.parent.joinpath(f"runner__{instance['name']}")
        instance_root.mkdir(exist_ok=True, parents=True)

        result = subprocess.run([runner_binary, "register"], cwd=instance_root)

        if result.returncode != 0:
            raise RuntimeError("Failed to register runner.")
if __name__ == "__main__":
    # Usage: <script> <register|start> [extra]
    # A second argument is accepted by the count check but is not read here.
    argcount = len(sys.argv)

    # Written as a range check: a chained `2 > argcount > 3` can never be true.
    if not 2 <= argcount <= 3:
        raise RuntimeError("Insufficient arguments.")

    # Validate the command up front so a typo fails before any file or network access.
    cmd = sys.argv[1]
    if cmd not in ("register", "start"):
        raise RuntimeError(f"Unknown command: {cmd}")

    # RUNNER_ROOT overrides where binaries and per-instance directories live; default is cwd.
    runner_root = pathlib.Path(os.getenv("RUNNER_ROOT", pathlib.Path.cwd()))
    runner_root.mkdir(exist_ok=True, parents=True)

    # The metadata file is always read from the current working directory.
    metadata = get_runners_metadata(pathlib.Path.cwd().joinpath("metadata.json"))

    bin_path = get_runner_runtime(runner_root, metadata["runner_version"])

    if cmd == "register":
        register_runners(metadata["instances"], bin_path)
    elif cmd == "start":
        start_runners(metadata["instances"], bin_path)