opentelemetry-trace-export-.../tracer.py

128 lines
3.9 KiB
Python

import re
import sys
import logging
import requests
import pydantic
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
# Root logging is set to INFO so that the progress messages emitted by this script are shown.
logging.basicConfig(level=logging.INFO)
# Named logger shared by the functions of this module.
logger = logging.getLogger("opentelemetry-export")
class JobMetadata(pydantic.BaseModel, extra="ignore"):
    """
    The subset of a single job's metadata that this script uses.

    Any field of the input that is not declared here is ignored.
    """

    # Status of the job; copied as-is onto the job span.
    status: str
    # Duration as an "_h_m_s" string; see parse_duration_string_as_seconds.
    duration: str
    # Name of the job; also used as the name of the job span.
    name: str
class RunMetadata(pydantic.BaseModel, extra="ignore"):
    """
    The subset of an Action run's metadata that this script uses.

    Any field of the input that is not declared here is ignored.
    """

    # Jobs that are part of the run.
    jobs: list[JobMetadata]
    # Status of the run as a whole; copied as-is onto the run span.
    status: str
def configure_tracer(*, endpoint: str, service_name: str):
    """
    Installs the global tracer provider used to export spans.

    The provider's resource carries `service_name` as its `service.name` attribute, and spans
    are handed in batches to an OTLP (gRPC) exporter targeting `endpoint`.
    """
    provider = TracerProvider(
        resource=Resource(attributes={"service.name": service_name})
    )
    trace.set_tracer_provider(provider)

    exporter = OTLPSpanExporter(endpoint=endpoint)
    provider.add_span_processor(BatchSpanProcessor(exporter))
def parse_duration_string_as_seconds(duration: str) -> int:
    """
    Time is represented as "_h_m_s" strings where each blank is a numerical value. This
    converts those string values to seconds count.

    Each of the three components is optional (e.g. "45s", "2m", "1h0m5s"), and surrounding
    whitespace is ignored. An empty string yields 0.

    Raises ValueError if the string does not follow that format, rather than silently
    reporting a partial or zero duration.
    """
    pattern = re.compile(
        r"(?:(?P<hours>\d+)h)?(?:(?P<minutes>\d+)m)?(?:(?P<seconds>\d+)s)?"
    )

    # Every component is optional, so `search` would always succeed (possibly with an empty
    # match at position 0); `fullmatch` makes sure the whole string was understood.
    match = pattern.fullmatch(duration.strip())
    if match is None:
        raise ValueError(f"Unrecognised duration string: {duration!r}")

    seconds_per_unit = {"hours": 60 * 60, "minutes": 60, "seconds": 1}
    return sum(
        int(match[unit]) * factor
        for unit, factor in seconds_per_unit.items()
        if match[unit]
    )
def get_job_details(
    *, base_url: str, repo: str, token: str, run_id: int, timeout: float = 30.0
) -> RunMetadata:
    """
    Fetches the given Forgejo Action run metadata.

    Because the Actions API is not documented or properly exposed, it is used 'as the browser would',
    which implies that we have to preface our data fetch with a page fetch to get a session cookie
    matching the authentication token. This also generates the CSRF token needed since the Actions
    job API uses POSTs for data fetches.

    `timeout` is the number of seconds given to each of the two HTTP requests before giving up;
    without it, an unresponsive server would block the script forever.

    Raises RuntimeError if the page fetch did not yield a CSRF cookie, and a
    requests.RequestException subclass if a request times out or the data fetch returns an
    HTTP error status.
    """
    common_headers = {"Authorization": f"token {token}"}

    with requests.Session() as session:
        # This page is only fetched for its side effect: filling the session's cookie jar.
        page_response = session.get(
            f"{base_url}/{repo}", headers=common_headers, timeout=timeout
        )

        csrf_token = session.cookies.get_dict().get("_csrf")
        if csrf_token is None:
            raise RuntimeError(
                f"No CSRF cookie was set when fetching {base_url}/{repo} "
                f"(HTTP {page_response.status_code}); cannot query the run metadata"
            )

        run_response = session.post(
            f"{base_url}/{repo}/actions/runs/{run_id}/",
            headers={
                **common_headers,
                "x-csrf-token": csrf_token,
            },
            timeout=timeout,
        )
        # Fail with the actual HTTP error instead of an obscure JSON/KeyError further down.
        run_response.raise_for_status()

        return RunMetadata(**run_response.json()["state"]["run"])
# FIXME: Adjust span durations based on event + API data.
def create_span_from_run_metadata(run: RunMetadata) -> None:
    """
    Records the given run as a trace, using the globally configured tracer provider.

    A "run" span carrying the run status is opened, and one span per job (named after the job)
    is opened within it. Each job span carries the job's name, status and duration (in seconds)
    as attributes.

    The spans' own start and end times are those of this function's execution, not those of the
    run; the actual job duration is only available through the "duration" attribute.
    """
    tracer = trace.get_tracer(__name__)

    with tracer.start_as_current_span("run") as run_span:
        run_span.set_attribute("status", run.status)

        for job in run.jobs:
            # Opened while the run span is current, which makes it a child of the run span.
            with tracer.start_as_current_span(job.name) as job_span:
                duration = parse_duration_string_as_seconds(job.duration)
                job_span.set_attributes(
                    {
                        "name": job.name,
                        "status": job.status,
                        "duration": duration,
                    }
                )
def run():
    """
    Command-line entry point: exports the metadata of one Action run as a trace.

    Expects six positional arguments, in this order: the OTLP service name, the OTLP endpoint,
    the authentication token, the base URL of the forge, the repository and the run id.
    Additional arguments are ignored.

    Exits with a usage message if arguments are missing or if the run id is not an integer.
    """
    arguments = sys.argv[1:7]
    if len(arguments) < 6:
        sys.exit(
            f"usage: {sys.argv[0]} "
            "OTLP_SERVICE_NAME OTLP_ENDPOINT TOKEN BASE_URL REPO RUN_ID"
        )
    otlp_service_name, otlp_endpoint, token, base_url, repo, raw_run_id = arguments

    # get_job_details expects an integer run id; reject anything else before doing any work.
    try:
        run_id = int(raw_run_id)
    except ValueError:
        sys.exit(f"RUN_ID must be an integer, got {raw_run_id!r}")

    logger.info("Configuring tracer, target is %s", otlp_endpoint)
    configure_tracer(endpoint=otlp_endpoint, service_name=otlp_service_name)

    logger.info(
        "Getting run metadata for run #%s on %s (%s/%s)", run_id, repo, base_url, repo
    )
    metadata = get_job_details(
        token=token,
        base_url=base_url,
        repo=repo,
        run_id=run_id,
    )

    create_span_from_run_metadata(metadata)
    logger.info("Pushed trace to target!")
# Only perform the export when executed as a script, not when imported as a module.
if __name__ == "__main__":
    run()