#!/bin/python
"""Download, register and start Forgejo runner instances.

Usage: SCRIPT register|start

The script reads ``metadata.json`` from the current working directory,
fetches the runner binary of the requested version into ``RUNNER_ROOT``
(defaults to the current working directory) and then either registers or
starts every runner instance listed in the metadata. Each instance uses its
own working directory named ``runner__NAME`` located next to the runner
binary.
"""

import json
import logging
import math
import multiprocessing
import os
import pathlib
import stat
import subprocess
import sys
import typing
from urllib.request import urlretrieve

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class RunnerInstance(typing.TypedDict):
    """A single runner entry of the metadata file."""

    # Path of the configuration file handed to the runner via ``--config``.
    config: str
    # Instance name; used to derive the instance's working directory.
    name: str


class Metadata(typing.TypedDict):
    """Top-level structure of the metadata file."""

    # Runner release version to download (without the leading ``v``).
    runner_version: str
    # Runner instances to register or start.
    instances: list[RunnerInstance]


def get_runner_url(version: str) -> str:
    """Returns a formatted URL pointing to a specific runner release."""
    return f"https://code.forgejo.org/forgejo/runner/releases/download/v{version}/forgejo-runner-{version}-linux-amd64"


def _instance_root(runner_path: pathlib.Path, name: str) -> pathlib.Path:
    """Returns the working directory of the instance called ``name``."""
    return runner_path.parent.joinpath(f"runner__{name}")


def get_runner_runtime(runner_root: pathlib.Path, version: str) -> pathlib.Path:
    """
    Downloads the runner binary of the selected version if it doesn't exist
    already, makes it executable and returns its path.

    The download is written to a temporary ``.part`` file and only renamed to
    its final name once it completed, so an interrupted download is never
    mistaken for a usable binary on the next run.

    Raises:
        RuntimeError: if the binary is still missing after the download.
    """
    runner_bin_path = runner_root.joinpath(f"runner_{version}")
    source_url = get_runner_url(version)

    def report(chunk, chunk_size, total_size):
        # urlretrieve reports a total size of -1 when the size is unknown.
        total_chunks = "unknown"
        if total_size != -1:
            total_chunks = math.ceil(total_size / chunk_size)

        # Log every 100th chunk and the final one to keep the output short.
        if not chunk % 100 or chunk == total_chunks:
            logger.info("Chunk %s / %s downloaded.", chunk, total_chunks)

    if not runner_bin_path.exists():
        partial_path = runner_bin_path.with_name(f"{runner_bin_path.name}.part")
        try:
            urlretrieve(source_url, partial_path, reporthook=report)
            partial_path.replace(runner_bin_path)
        finally:
            # Only leaves something to delete when the download failed.
            partial_path.unlink(missing_ok=True)

    if not runner_bin_path.exists():
        raise RuntimeError(f"Failed to download runner from {source_url}.")

    runner_bin_path.chmod(runner_bin_path.stat().st_mode | stat.S_IEXEC)
    return runner_bin_path


def get_runners_metadata(metadata_path: pathlib.Path) -> Metadata:
    """
    Loads the JSON metadata file at ``metadata_path`` and returns its content.

    The content is returned as parsed; it is not validated against
    ``Metadata``.

    Raises:
        RuntimeError: if the metadata file does not exist.
    """
    try:
        with open(metadata_path, "r", encoding="utf8") as metadata_file:
            return json.load(metadata_file)
    except FileNotFoundError as error:
        raise RuntimeError("Failed to open metadata file.") from error


def start_runner(runner_path: pathlib.Path, meta: RunnerInstance):
    """
    Runs the runner daemon for a single instance and waits for it to exit.

    The daemon is executed inside the instance's working directory, which is
    created if necessary.

    Raises:
        RuntimeError: if the instance's config file does not exist or the
            daemon exits with a non-zero return code.
    """
    config_path = pathlib.Path(meta["config"])
    if not config_path.exists():
        raise RuntimeError(f"Runner config path does not exist: {meta['config']}")

    runner_root = _instance_root(runner_path, meta["name"])
    runner_root.mkdir(parents=True, exist_ok=True)

    # subprocess.run waits for the process, so returncode is always set here;
    # a bare Popen would report None and be mistaken for a failure.
    # Paths are resolved because the process runs with a different cwd.
    result = subprocess.run(
        [
            runner_path.resolve(),
            "--config",
            config_path.resolve(),
            "daemon",
        ],
        cwd=runner_root,
    )

    if result.returncode != 0:
        logger.error(
            "Runner %s exited with return code %s.", meta["name"], result.returncode
        )
        raise RuntimeError(f"Failed to start runner {meta['name']}.")


def start_runners(instances: list[RunnerInstance], runner_path: pathlib.Path):
    """
    Starts all given runner instances in parallel and waits for them.

    Raises:
        RuntimeError: if ``runner_path`` does not exist or any runner fails.
    """
    if not runner_path.exists():
        raise RuntimeError(f"Runner path does not exist: {runner_path}")

    if not instances:
        logger.warning("No runner instances to start.")
        return

    # Every daemon blocks its worker, so one worker per instance is needed for
    # all of them to run at the same time.
    with multiprocessing.Pool(len(instances)) as pool:
        pool.starmap(
            start_runner,
            [(runner_path, instance) for instance in instances],
        )


def register_runners(instances: list[RunnerInstance], runner_bin_path: pathlib.Path):
    """
    Runs the runner's ``register`` command once per instance, one after
    another, inside the instance's working directory.

    Raises:
        RuntimeError: if a registration exits with a non-zero return code.
    """
    # Resolved because the command runs with a different cwd.
    executable = runner_bin_path.resolve()

    for instance in instances:
        instance_root = _instance_root(runner_bin_path, instance["name"])
        instance_root.mkdir(exist_ok=True, parents=True)

        result = subprocess.run([executable, "register"], cwd=instance_root)

        if result.returncode != 0:
            raise RuntimeError("Failed to register runner.")


def main() -> None:
    """Entry point: parses the command line and runs the requested command."""
    argcount = len(sys.argv)

    if argcount < 2:
        raise RuntimeError("Insufficient arguments.")
    if argcount > 3:
        raise RuntimeError("Too many arguments.")

    runner_root = pathlib.Path(os.getenv("RUNNER_ROOT", pathlib.Path.cwd()))
    runner_root.mkdir(exist_ok=True, parents=True)

    metadata = get_runners_metadata(pathlib.Path.cwd().joinpath("metadata.json"))
    cmd = sys.argv[1]

    bin_path = get_runner_runtime(runner_root, metadata["runner_version"])

    if cmd == "register":
        register_runners(metadata["instances"], bin_path)
    elif cmd == "start":
        start_runners(metadata["instances"], bin_path)
    else:
        logger.warning("Unknown command: %s", cmd)


if __name__ == "__main__":
    main()