Skip to content

Journal API

JournalQuery

systemd_client.journal._query.JournalQuery dataclass

Parameters for querying the journal.

Source code in src/systemd_client/journal/_query.py
@dataclass(frozen=True, slots=True)
class JournalQuery:
    """Parameters for querying the journal."""

    unit: str | None = None
    lines: int | None = None
    since: str | None = None
    until: str | None = None
    priority: JournalPriority | None = None
    grep: str | None = None
    boot: str | None = None
    reverse: bool = False
    follow: bool = False
    identifiers: list[str] = field(default_factory=list)
    scope: SystemdScope | None = None

    def to_args(self) -> list[str]:
        """Build journalctl command-line arguments from this query."""
        args: list[str] = ["--output=json", "--no-pager"]

        # Scope flag: --user or --system (system is journalctl default)
        if self.scope is not None:
            args.append(f"--{self.scope.value}")
        else:
            args.append("--user")

        if self.unit:
            args.extend(["--unit", self.unit])
        if self.lines is not None:
            args.extend(["--lines", str(self.lines)])
        if self.since:
            args.extend(["--since", self.since])
        if self.until:
            args.extend(["--until", self.until])
        if self.priority is not None:
            args.extend(["--priority", self.priority.value])
        if self.grep:
            args.extend(["--grep", self.grep])
        if self.boot is not None:
            args.extend(["--boot", self.boot])
        if self.reverse:
            args.append("--reverse")
        if self.follow:
            args.append("--follow")
        for ident in self.identifiers:
            args.extend(["--identifier", ident])

        return args

to_args()

Build journalctl command-line arguments from this query.

Source code in src/systemd_client/journal/_query.py
def to_args(self) -> list[str]:
    """Build journalctl command-line arguments from this query.

    Returns:
        The option strings (without the ``journalctl`` program name). The
        list always starts with ``--output=json``, ``--no-pager`` and a
        scope flag, followed by the options for every field that is set.
    """
    args: list[str] = ["--output=json", "--no-pager"]

    # Scope flag: the scope's value is used as the flag name. A query
    # without an explicit scope falls back to --user.
    if self.scope is not None:
        args.append(f"--{self.scope.value}")
    else:
        args.append("--user")

    if self.unit:
        args.extend(["--unit", self.unit])
    if self.lines is not None:
        args.extend(["--lines", str(self.lines)])
    if self.since:
        args.extend(["--since", self.since])
    if self.until:
        args.extend(["--until", self.until])
    if self.priority is not None:
        # Coerce like ``lines`` so the result is always list[str], even
        # when the priority's value is not already a string.
        args.extend(["--priority", str(self.priority.value)])
    if self.grep:
        args.extend(["--grep", self.grep])
    if self.boot is not None:
        args.extend(["--boot", self.boot])
    if self.reverse:
        args.append("--reverse")
    if self.follow:
        args.append("--follow")
    for ident in self.identifiers:
        args.extend(["--identifier", ident])

    return args

AsyncJournalReader

systemd_client.journal._reader.AsyncJournalReader

Async journal reader using journalctl subprocess.

Source code in src/systemd_client/journal/_reader.py
class AsyncJournalReader:
    """Async journal reader using journalctl subprocess."""

    def __init__(self, scope: SystemdScope = SystemdScope.USER) -> None:
        """Store the scope applied to queries that do not specify one."""
        self._scope = scope

    async def query(self, q: JournalQuery) -> list[JournalEntry]:
        """Run a journal query and return all matching entries.

        The whole journalctl output is collected before parsing. Lines that
        ``parse_journal_line`` rejects with ``JournalError`` are skipped.

        Raises:
            SubprocessError: If journalctl exits with a non-zero status; the
                decoded stderr text is attached.
        """
        effective = self._with_scope(q)
        args = effective.to_args()
        cmd = ["journalctl", *args]

        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout_bytes, stderr_bytes = await proc.communicate()

        # A truthy return code is by definition non-zero.
        if proc.returncode:
            stderr = stderr_bytes.decode("utf-8", errors="replace").strip()
            raise SubprocessError(cmd, proc.returncode, stderr)

        stdout = stdout_bytes.decode("utf-8", errors="replace")
        entries: list[JournalEntry] = []
        for line in stdout.splitlines():
            line = line.strip()
            if not line:
                continue
            try:
                entries.append(parse_journal_line(line))
            except JournalError:
                # Best effort: an unparseable line does not fail the query.
                continue
        return entries

    async def follow(self, q: JournalQuery) -> AsyncIterator[JournalEntry]:
        """Follow journal output as an async generator.

        The query is re-issued with ``follow=True`` and ``reverse=False``
        regardless of what the caller set. Entries are yielded as journalctl
        prints them; lines that fail to parse are skipped. The subprocess is
        stopped when the generator finishes or is closed.
        """
        effective = self._with_scope(q)
        follow_query = JournalQuery(
            unit=effective.unit,
            lines=effective.lines,
            since=effective.since,
            until=effective.until,
            priority=effective.priority,
            grep=effective.grep,
            boot=effective.boot,
            reverse=False,
            follow=True,
            identifiers=effective.identifiers,
            scope=effective.scope,
        )
        args = follow_query.to_args()
        cmd = ["journalctl", *args]

        # stderr is never read here, so it is discarded instead of piped:
        # an undrained pipe could fill up and stall journalctl.
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.DEVNULL,
        )

        try:
            assert proc.stdout is not None
            async for raw_line in proc.stdout:
                line = raw_line.decode("utf-8", errors="replace").strip()
                if not line:
                    continue
                try:
                    yield parse_journal_line(line)
                except JournalError:
                    continue
        finally:
            # journalctl may already have exited on its own (stdout hit EOF);
            # terminating a finished process raises ProcessLookupError.
            if proc.returncode is None:
                try:
                    proc.terminate()
                except ProcessLookupError:
                    pass
            await proc.wait()

    def _with_scope(self, q: JournalQuery) -> JournalQuery:
        """Return query with scope set if not already specified.

        A query that already carries a scope is returned unchanged; otherwise
        a copy is built with this reader's scope filled in.
        """
        if q.scope is not None:
            return q
        return JournalQuery(
            unit=q.unit,
            lines=q.lines,
            since=q.since,
            until=q.until,
            priority=q.priority,
            grep=q.grep,
            boot=q.boot,
            reverse=q.reverse,
            follow=q.follow,
            identifiers=q.identifiers,
            scope=self._scope,
        )

follow(q) async

Follow journal output as an async generator.

Source code in src/systemd_client/journal/_reader.py
async def follow(self, q: JournalQuery) -> AsyncIterator[JournalEntry]:
    """Follow journal output as an async generator.

    The query is re-issued with ``follow=True`` and ``reverse=False``
    regardless of what the caller set. Entries are yielded as journalctl
    prints them; lines that fail to parse are skipped. The subprocess is
    stopped when the generator finishes or is closed.
    """
    effective = self._with_scope(q)
    follow_query = JournalQuery(
        unit=effective.unit,
        lines=effective.lines,
        since=effective.since,
        until=effective.until,
        priority=effective.priority,
        grep=effective.grep,
        boot=effective.boot,
        reverse=False,
        follow=True,
        identifiers=effective.identifiers,
        scope=effective.scope,
    )
    args = follow_query.to_args()
    cmd = ["journalctl", *args]

    # stderr is never read here, so it is discarded instead of piped:
    # an undrained pipe could fill up and stall journalctl.
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.DEVNULL,
    )

    try:
        assert proc.stdout is not None
        async for raw_line in proc.stdout:
            line = raw_line.decode("utf-8", errors="replace").strip()
            if not line:
                continue
            try:
                yield parse_journal_line(line)
            except JournalError:
                continue
    finally:
        # journalctl may already have exited on its own (stdout hit EOF);
        # terminating a finished process raises ProcessLookupError.
        if proc.returncode is None:
            try:
                proc.terminate()
            except ProcessLookupError:
                pass
        await proc.wait()

query(q) async

Run a journal query and return all matching entries.

Source code in src/systemd_client/journal/_reader.py
async def query(self, q: JournalQuery) -> list[JournalEntry]:
    """Execute the query once and collect every entry journalctl prints.

    Output lines that ``parse_journal_line`` rejects with ``JournalError``
    are dropped. A non-zero exit status raises ``SubprocessError`` carrying
    the command, the status and the decoded stderr text.
    """
    scoped = self._with_scope(q)
    cmd = ["journalctl", *scoped.to_args()]

    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    raw_out, raw_err = await proc.communicate()

    exit_status = proc.returncode
    if exit_status:
        message = raw_err.decode("utf-8", errors="replace").strip()
        raise SubprocessError(cmd, exit_status, message)

    collected: list[JournalEntry] = []
    text = raw_out.decode("utf-8", errors="replace")
    for candidate in map(str.strip, text.splitlines()):
        if candidate:
            try:
                collected.append(parse_journal_line(candidate))
            except JournalError:
                # Skip lines that are not valid journal records.
                pass
    return collected

JournalReader

systemd_client.journal._reader.JournalReader

Synchronous journal reader wrapping AsyncJournalReader.

Source code in src/systemd_client/journal/_reader.py
class JournalReader:
    """Blocking facade over ``AsyncJournalReader``."""

    def __init__(self, scope: SystemdScope = SystemdScope.USER) -> None:
        """Create the wrapped async reader with the given scope."""
        self._async_reader = AsyncJournalReader(scope=scope)

    def query(self, q: JournalQuery) -> list[JournalEntry]:
        """Execute the query via ``run_sync`` and return the matching entries."""
        pending = self._async_reader.query(q)
        return run_sync(pending)

    def follow(self, q: JournalQuery) -> Iterator[JournalEntry]:
        """Stream followed journal entries through ``sync_generator_bridge``."""

        def _open_stream():
            # Deferred so the async generator is created by the bridge itself.
            return self._async_reader.follow(q)

        return sync_generator_bridge(_open_stream)

follow(q)

Follow journal output as a synchronous iterator.

Source code in src/systemd_client/journal/_reader.py
def follow(self, q: JournalQuery) -> Iterator[JournalEntry]:
    """Stream followed journal entries through ``sync_generator_bridge``."""

    def _open_stream():
        # Deferred so the async generator is created by the bridge itself.
        return self._async_reader.follow(q)

    return sync_generator_bridge(_open_stream)

query(q)

Run a journal query and return all matching entries.

Source code in src/systemd_client/journal/_reader.py
def query(self, q: JournalQuery) -> list[JournalEntry]:
    """Execute the query via ``run_sync`` and return the matching entries."""
    pending = self._async_reader.query(q)
    return run_sync(pending)