Async Client¶
The AsyncSystemdClient is the canonical implementation -- all methods are native coroutines. The sync SystemdClient is actually a thin wrapper around this.
If you're building an async application (FastAPI, aiohttp, or just plain asyncio), this is the client for you.
Create a Client¶
from systemd_client import AsyncSystemdClient, BackendType
async with AsyncSystemdClient() as client: # (1)!
units = await client.list_units()
- Use
async withfor proper resource cleanup. Same backend options as the sync client:BackendType.AUTO(default),BackendType.SUBPROCESS, orBackendType.DBUS. Addscope=SystemdScope.SYSTEMfor system services.
Basic Usage¶
Let's start with the fundamentals. Every method is a coroutine, so you'll await each call:
import asyncio
from systemd_client import AsyncSystemdClient
async def main():
client = AsyncSystemdClient()
# List services
units = await client.list_units(unit_type="service") # (1)!
for unit in units:
print(f"{unit.name}: {unit.active_state}")
status = await client.status("my-app.service") # (2)!
print(f"PID: {status.main_pid}")
await client.restart("my-app.service") # (3)!
is_active = await client.is_active("my-app.service")
print(f"Active: {is_active}")
asyncio.run(main())
- Returns the same
list[UnitInfo]as the sync client -- the data models are identical. - Returns a
UnitStatusfrozen dataclass. - All operations (
start,stop,restart,reload,enable,disable,mask,unmask) work the same way.
Info
The async client has the exact same API as the sync client -- just add await before each call. All the same exceptions, data models, and parameters apply.
Concurrent Operations¶
Here's where the async client really shines. You can run multiple operations in parallel with asyncio.gather:
import asyncio
from systemd_client import AsyncSystemdClient
async def main():
client = AsyncSystemdClient()
services = ["app.service", "worker.service", "scheduler.service"]
results = await asyncio.gather( # (1)!
*(client.is_active(svc) for svc in services)
)
for svc, active in zip(services, results):
print(f"{svc}: {'UP' if active else 'DOWN'}")
asyncio.run(main())
- All three
is_activecalls run simultaneously instead of one after another. With three services, this is roughly 3x faster than sequential calls.
Tip
Use asyncio.gather any time you need to check or operate on multiple units. The speedup grows linearly with the number of services.
Fetch Status and Journal at the Same Time¶
You can combine different operations in a single gather call too:
async def full_report(client, unit_name):
status, journal = await asyncio.gather( # (1)!
client.status(unit_name),
client.journal(unit_name, lines=10),
)
return status, journal
- The status fetch and journal read happen concurrently -- no waiting for one to finish before starting the other.
Follow the Journal (Async)¶
The async journal follow is a non-blocking async generator. It yields entries as they arrive without blocking your event loop:
async def follow_logs():
client = AsyncSystemdClient()
async for entry in client.journal_follow("my-app.service"):
print(f"[{entry.priority.name}] {entry.message}") # (1)!
try:
asyncio.run(follow_logs())
except KeyboardInterrupt:
pass # (2)!
- Each
JournalEntryhas.timestamp,.priority,.message,.pid, and many more fields. - Gracefully handle Ctrl+C -- the async generator cleans up automatically.
Follow with a Timeout¶
Need to watch logs for a limited time? Wrap it with asyncio.wait_for:
async def follow_with_timeout(unit: str, seconds: int = 30):
client = AsyncSystemdClient()
async def _follow():
async for entry in client.journal_follow(unit):
print(entry.message)
try:
await asyncio.wait_for(_follow(), timeout=seconds) # (1)!
except asyncio.TimeoutError:
print(f"Stopped after {seconds}s")
- After
secondselapses,asyncio.wait_forcancels the coroutine and raisesTimeoutError.
Integration with Async Frameworks¶
FastAPI¶
The async client fits naturally into FastAPI endpoints:
from fastapi import FastAPI
from systemd_client import AsyncSystemdClient
app = FastAPI()
client = AsyncSystemdClient() # (1)!
@app.get("/services")
async def list_services():
units = await client.list_units(unit_type="service") # (2)!
return [{"name": u.name, "state": u.active_state} for u in units]
@app.post("/services/{name}/restart")
async def restart_service(name: str):
await client.restart(name)
return {"status": "restarted"}
- Create the client once at module level -- it's lightweight and reusable.
- Since FastAPI is async-native, the
awaithere doesn't block any other requests.
Check
Your FastAPI app can now manage systemd services via HTTP endpoints.
aiohttp¶
Works just as well with aiohttp:
from aiohttp import web
from systemd_client import AsyncSystemdClient
client = AsyncSystemdClient()
async def health_check(request):
services = ["app.service", "db.service"]
results = await asyncio.gather(
*(client.is_active(s) for s in services)
)
all_ok = all(results)
return web.json_response(
{"healthy": all_ok},
status=200 if all_ok else 503, # (1)!
)
- Returns 503 Service Unavailable if any service is down -- useful for load balancer health checks.
Error Handling¶
Same exceptions as the sync client -- just use try / await:
from systemd_client import UnitNotFoundError, UnitOperationError
try:
await client.restart("my-app.service")
except UnitNotFoundError as e:
print(f"Unit {e.unit_name} not found") # (1)!
except UnitOperationError as e:
print(f"Operation {e.operation} failed: {e.detail}") # (2)!
- The unit doesn't exist at all.
- The unit exists, but the operation failed (e.g., the service crashed on start).
Note
For the full list of exceptions and their attributes, see Error Handling.