import json
import os
import subprocess
import typing

import pydantic


class ContainerMetadata(pydantic.BaseModel):
    """Name, status and id of a single container belonging to a service."""

    name: str
    status: str
    id: str


class ServiceMetadata(pydantic.BaseModel):
    """Metadata for one service together with its containers."""

    name: str
    id: str
    type: str  # PodmanManager.get_services sets this to "pod"
    containers: list[ContainerMetadata]
    status: str


class PodmanManager(pydantic.BaseModel):
    """Collects service metadata by invoking the ``podman`` CLI."""

    def _get_pods(self) -> list[dict[str, typing.Any]]:
        """
        Runs ``podman pod ps`` and returns its decoded JSON output.

        Only pods whose status is ``running`` or ``degraded`` are requested.
        The output is expected to be a JSON array of pod objects; it is
        returned as decoded, without further validation.

        Raises:
            subprocess.CalledProcessError: if podman exits with a non-zero
                status (``check=True``).
            json.JSONDecodeError: if podman's stdout is not valid JSON.
        """
        pod_result = subprocess.run(
            [
                "podman",
                "pod",
                "ps",
                "--format=json",
                "--filter",
                "status=running",
                "--filter",
                "status=degraded",
            ],
            check=True,
            capture_output=True,
            env=os.environ,
        )
        return json.loads(pod_result.stdout)

    @staticmethod
    def _pod_to_service(pod_meta: dict[str, typing.Any]) -> ServiceMetadata:
        """Converts one pod entry from ``podman pod ps`` into a ServiceMetadata."""
        # A null container list is treated as "no containers"; a missing
        # "Containers" key still raises KeyError, as it always did.
        containers = [
            ContainerMetadata(
                name=container_meta["Names"],
                status=container_meta["Status"],
                id=container_meta["Id"],
            )
            for container_meta in pod_meta["Containers"] or []
        ]
        return ServiceMetadata(
            name=pod_meta["Name"],
            id=pod_meta["Id"],
            type="pod",
            status=pod_meta["Status"],
            containers=containers,
        )

    def get_services(self) -> list[ServiceMetadata]:
        """
        Fetches metadata about all services currently running.

        Services are either pods or pod-less containers. Only pods are
        collected by this method at present: each pod reported by
        ``_get_pods`` becomes one ``ServiceMetadata`` with ``type="pod"``.
        Pod-less containers are not queried here.

        Returns:
            A list with one entry per pod, in the order podman reported them.

        Raises:
            subprocess.CalledProcessError: if the podman invocation fails.
            KeyError: if a pod or container entry lacks an expected field.
        """
        return [self._pod_to_service(pod_meta) for pod_meta in self._get_pods()]