72 lines
1.7 KiB
Python
72 lines
1.7 KiB
Python
|
import json
|
||
|
import os
|
||
|
import subprocess
|
||
|
import typing
|
||
|
|
||
|
import pydantic
|
||
|
|
||
|
|
||
|
class ContainerMetadata(pydantic.BaseModel):
|
||
|
name: str
|
||
|
status: str
|
||
|
id: str
|
||
|
|
||
|
|
||
|
class ServiceMetadata(pydantic.BaseModel):
|
||
|
name: str
|
||
|
id: str
|
||
|
type: str
|
||
|
containers: list[ContainerMetadata]
|
||
|
status: str
|
||
|
|
||
|
|
||
|
class PodmanManager(pydantic.BaseModel):
|
||
|
def _get_pods(self) -> dict[str, typing.Any]:
|
||
|
pod_result = subprocess.run(
|
||
|
[
|
||
|
"podman",
|
||
|
"pod",
|
||
|
"ps",
|
||
|
"--format=json",
|
||
|
"--filter",
|
||
|
"status=running",
|
||
|
"--filter",
|
||
|
"status=degraded",
|
||
|
],
|
||
|
check=True,
|
||
|
capture_output=True,
|
||
|
env=os.environ,
|
||
|
)
|
||
|
|
||
|
return json.loads(pod_result.stdout)
|
||
|
|
||
|
def get_services(self) -> dict[str, ServiceMetadata]:
|
||
|
"""
|
||
|
Fetches metadata about all services currently running.
|
||
|
|
||
|
Services are either pods or pod-less containers.
|
||
|
"""
|
||
|
|
||
|
services = []
|
||
|
for pod_meta in self._get_pods():
|
||
|
containers = [
|
||
|
ContainerMetadata(
|
||
|
name=container_meta["Names"],
|
||
|
status=container_meta["Status"],
|
||
|
id=container_meta["Id"],
|
||
|
)
|
||
|
for container_meta in pod_meta["Containers"]
|
||
|
]
|
||
|
|
||
|
services.append(
|
||
|
ServiceMetadata(
|
||
|
name=pod_meta["Name"],
|
||
|
id=pod_meta["Id"],
|
||
|
type="pod",
|
||
|
status=pod_meta["Status"],
|
||
|
containers=containers,
|
||
|
)
|
||
|
)
|
||
|
|
||
|
return services
|