import re import sys import logging import requests import pydantic from opentelemetry import trace from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter logger = logging.getLogger("opentelemetry-export") logging.basicConfig(level=logging.INFO) class JobMetadata(pydantic.BaseModel, extra="ignore"): status: str duration: str name: str class RunMetadata(pydantic.BaseModel, extra="ignore"): jobs: list[JobMetadata] status: str def configure_tracer(*, endpoint: str, service_name: str): resource = Resource( attributes={ "service.name": service_name, } ) tracer_provider = TracerProvider(resource=resource) trace.set_tracer_provider(tracer_provider) span_processor = BatchSpanProcessor(OTLPSpanExporter(endpoint=endpoint)) tracer_provider.add_span_processor(span_processor) def parse_duration_string_as_seconds(duration: str) -> int: """ Time is represented as "_h_m_s" strings where each blank is a numerical value. This converts those string values to seconds count. """ pattern = re.compile(r"((?P\d+)h)?((?P\d+)m)?((?P\d+)s)?") matches = pattern.search(duration) total = 0 total += int(matches.group("seconds")) if matches.group("seconds") else 0 total += int(matches.group("minutes")) * 60 if matches.group("minutes") else 0 total += int(matches.group("hours")) * 60 * 60 if matches.group("hours") else 0 return total def get_job_details( *, base_url: str, repo: str, token: str, run_id: int ) -> RunMetadata: """ Fetches the given Forgejo Action run metadata. Because the Actions API is not documented or properly exposed, it is used 'as the browser would', which implies that we have to preface our data fetch with a page fetch to get a session cookie matching the authentication token. This also generates the CSRF token needed since the Actions job API uses POSTs for data fetches. """ common_headers = {"Authorization": f"token {token}"} with requests.Session() as session: session.get(f"{base_url}/{repo}", headers=common_headers) run_response = session.post( f"{base_url}/{repo}/actions/runs/{run_id}/", headers={ **common_headers, "x-csrf-token": session.cookies.get_dict()["_csrf"], }, ) return RunMetadata(**run_response.json()["state"]["run"]) # FIXME: Adjust span durations based on event + API data. def create_span_from_run_metadata(run: RunMetadata): tracer = trace.get_tracer(__name__) with tracer.start_as_current_span("run") as run_span: run_span.set_attribute("status", run.status) for job in run.jobs: with tracer.start_as_current_span(job.name) as job_span: duration = parse_duration_string_as_seconds(job.duration) job_span.set_attributes( { "name": job.name, "status": job.status, "duration": duration, } ) def run(): otlp_service_name, otlp_endpoint, token, base_url, repo, run_id = sys.argv[1:7] logger.info(f"Configuring tracer, target is {otlp_endpoint}") configure_tracer(endpoint=otlp_endpoint, service_name=otlp_service_name) logger.info(f"Getting run metadata for run #{run_id} on {repo} ({base_url}/{repo})") metadata = get_job_details( token=token, base_url=base_url, repo=repo, run_id=run_id, ) create_span_from_run_metadata(metadata) logger.info("Pushed trace to target!") if __name__ == "__main__": run()