127 lines
3.8 KiB
Python
127 lines
3.8 KiB
Python
import re
|
|
import sys
|
|
import logging
|
|
|
|
import requests
|
|
import pydantic
|
|
from opentelemetry import trace
|
|
from opentelemetry.sdk.resources import Resource
|
|
from opentelemetry.sdk.trace import TracerProvider
|
|
from opentelemetry.sdk.trace.export import BatchSpanProcessor
|
|
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
|
|
|
|
# Configure root logging at INFO so this script's progress messages are shown.
logging.basicConfig(level=logging.INFO)

# Module-wide logger used for progress reporting.
logger = logging.getLogger("opentelemetry-export")
|
|
|
|
|
|
class JobMetadata(pydantic.BaseModel, extra="ignore"):
    """
    Metadata for a single job of an Actions run.

    Only the fields declared below are kept; any other key present in the
    input is ignored (extra="ignore").
    """

    # Job status, kept as the raw string from the payload.
    status: str
    # Duration string; create_span_from_run_metadata passes it to
    # parse_duration_string_as_seconds, which expects the "_h_m_s" form.
    duration: str
    # Job name; used both as the span name and as the "name" span attribute.
    name: str
|
|
|
|
|
|
class RunMetadata(pydantic.BaseModel, extra="ignore"):
    """
    Metadata for an Actions run.

    get_job_details builds this from the ["state"]["run"] object of the run
    endpoint's JSON response. Keys not declared below are ignored
    (extra="ignore").
    """

    # Jobs of the run; create_span_from_run_metadata opens one span per entry.
    jobs: list[JobMetadata]
    # Run status string; recorded as the "status" attribute of the "run" span.
    status: str
|
|
|
|
|
|
def configure_tracer(*, endpoint: str, service_name: str):
    """
    Installs a global tracer provider that exports spans to an OTLP endpoint.

    The provider's resource carries `service_name` as its "service.name"
    attribute, and spans are handed to an OTLP exporter targeting `endpoint`
    through a batching span processor.
    """

    provider = TracerProvider(
        resource=Resource(attributes={"service.name": service_name})
    )
    trace.set_tracer_provider(provider)

    exporter = OTLPSpanExporter(endpoint=endpoint)
    provider.add_span_processor(BatchSpanProcessor(exporter))
|
|
|
|
|
|
# Compiled once at import time. Every component is optional, but components
# must appear in hours/minutes/seconds order. Whitespace is tolerated before
# each component. The (?!s) lookahead stops the minutes group from consuming
# the "m" of a millisecond component such as "500ms".
_DURATION_PATTERN = re.compile(
    r"\s*(?:(?P<hours>\d+)h)?"
    r"\s*(?:(?P<minutes>\d+)m(?!s))?"
    r"\s*(?:(?P<seconds>\d+)s)?"
)

# Number of seconds represented by one unit of each named group above.
_SECONDS_PER_UNIT = {"hours": 60 * 60, "minutes": 60, "seconds": 1}


def parse_duration_string_as_seconds(duration: str) -> int:
    """
    Converts a "_h_m_s" duration string to a whole number of seconds.

    Each blank is a non-negative integer and every component is optional, so
    "1h2m3s", "2m0s", "45s" and "3h" are all accepted. Whitespace before or
    between components is ignored.

    Parsing is anchored at the start of the string: anything that does not
    begin with a recognised component (including the empty string and
    sub-second values such as "500ms") yields 0 rather than raising.
    """

    # The pattern can match the empty string, so match() never returns None.
    components = _DURATION_PATTERN.match(duration).groupdict()

    return sum(
        int(value) * _SECONDS_PER_UNIT[unit]
        for unit, value in components.items()
        if value is not None
    )
|
|
|
|
|
|
def get_job_details(
    *, base_url: str, repo: str, token: str, run_id: int, timeout: float = 30.0
) -> RunMetadata:
    """
    Fetches the given Forgejo Action run metadata.

    Because the Actions API is not documented or properly exposed, it is used 'as the browser would',
    which implies that we have to preface our data fetch with a page fetch to get a session cookie
    matching the authentication token. This also generates the CSRF token needed since the Actions
    job API uses POSTs for data fetches.

    `base_url` is the root URL of the instance (a trailing slash is tolerated), `repo` is the
    repository path appended to it, `token` is sent as a "token" Authorization header and
    `run_id` identifies the run. `timeout` is the per-request timeout in seconds, so a stalled
    server cannot hang the export forever.

    Raises requests.HTTPError when either request returns an error status, and KeyError when the
    page fetch did not yield a "_csrf" cookie.
    """

    repo_url = base_url.rstrip("/") + f"/{repo}"

    with requests.Session() as session:
        # Same Authorization header on both requests.
        session.headers["Authorization"] = f"token {token}"

        # The page itself is discarded; it is fetched only for the cookies it sets.
        landing_response = session.get(repo_url, timeout=timeout)
        landing_response.raise_for_status()

        csrf_token = session.cookies.get_dict().get("_csrf")
        if csrf_token is None:
            raise KeyError(
                f"no '_csrf' cookie was set by {repo_url}; cannot query the Actions run"
            )

        run_response = session.post(
            f"{repo_url}/actions/runs/{run_id}/",
            headers={"x-csrf-token": csrf_token},
            timeout=timeout,
        )
        # Fail with the HTTP status instead of an opaque JSON/KeyError further down.
        run_response.raise_for_status()

        return RunMetadata(**run_response.json()["state"]["run"])
|
|
|
|
|
|
# FIXME: Adjust span durations based on event + API data.
|
|
def create_span_from_run_metadata(run: RunMetadata):
    """
    Records the run as a "run" span with one nested span per job.

    The run span carries the run's status. Each job span is named after the
    job and carries its name, status and duration (converted to seconds) as
    attributes.
    """

    tracer = trace.get_tracer(__name__)

    with tracer.start_as_current_span("run") as parent_span:
        parent_span.set_attribute("status", run.status)

        for job in run.jobs:
            with tracer.start_as_current_span(job.name) as child_span:
                job_attributes = {
                    "name": job.name,
                    "status": job.status,
                    "duration": parse_duration_string_as_seconds(job.duration),
                }
                child_span.set_attributes(job_attributes)
|
|
|
|
|
|
def run():
    """
    Command-line entry point: fetches one run's metadata and emits it as a trace.

    Expects six positional arguments, in order: OTLP service name, OTLP
    endpoint, access token, base URL, repository path and run id. Arguments
    past the sixth are ignored.

    Exits with a usage message (SystemExit) when fewer than six arguments are
    given or when the run id is not an integer, before any network access.
    """

    arguments = sys.argv[1:7]
    if len(arguments) != 6:
        program = sys.argv[0] if sys.argv else "opentelemetry-export"
        raise SystemExit(
            f"usage: {program} "
            "OTLP_SERVICE_NAME OTLP_ENDPOINT TOKEN BASE_URL REPO RUN_ID"
        )

    otlp_service_name, otlp_endpoint, token, base_url, repo, raw_run_id = arguments

    # get_job_details declares run_id as an int; validate it up front.
    try:
        run_id = int(raw_run_id)
    except ValueError:
        raise SystemExit(
            f"RUN_ID must be an integer, got {raw_run_id!r}"
        ) from None

    logger.info("Configuring tracer, target is %s", otlp_endpoint)

    configure_tracer(endpoint=otlp_endpoint, service_name=otlp_service_name)

    logger.info(
        "Getting run metadata for run #%s on %s (%s/%s)",
        run_id,
        repo,
        base_url,
        repo,
    )

    metadata = get_job_details(
        token=token,
        base_url=base_url,
        repo=repo,
        run_id=run_id,
    )

    create_span_from_run_metadata(metadata)

    # NOTE(review): spans go through a BatchSpanProcessor, so the actual export
    # presumably happens asynchronously / at interpreter shutdown - confirm.
    logger.info("Pushed trace to target!")
|
|
|
|
|
|
# Run the exporter only when this file is executed as a script, not on import.
if __name__ == "__main__":
    run()
|