Skip to content

Storage

Persistence backends for traces, results, and metrics.

SQLite Backend

agentprobe.storage.sqlite

SQLite storage backend for traces and test results.

Uses Python's stdlib sqlite3 with run_in_executor for async wrapping. Enables WAL mode for concurrent access.

SQLiteStorage

SQLite-based storage for traces and test results.

Uses WAL mode for concurrent read access and stores full serialized models in a data TEXT column for lossless round-tripping.

Attributes:

Name Type Description
db_path

Path to the SQLite database file.

Source code in src/agentprobe/storage/sqlite.py
class SQLiteStorage:
    """SQLite-based storage for traces and test results.

    Uses WAL mode for concurrent read access and stores full
    serialized models in a ``data`` TEXT column for lossless
    round-tripping.

    Attributes:
        db_path: Path to the SQLite database file.
    """

    def __init__(self, db_path: str | Path = ".agentprobe/traces.db") -> None:
        """Initialize the SQLite storage.

        Args:
            db_path: Path to the database file. Parent directories
                will be created if they don't exist.
        """
        # Connection is opened lazily in _get_conn(), not here.
        self._db_path = Path(db_path)
        self._conn: sqlite3.Connection | None = None

    def _get_conn(self) -> sqlite3.Connection:
        """Get or create the database connection.

        ``check_same_thread=False`` is needed because queries run on
        executor threads via :meth:`_run`; statements are not issued
        concurrently on the same connection by this class.
        """
        if self._conn is None:
            self._db_path.parent.mkdir(parents=True, exist_ok=True)
            self._conn = sqlite3.connect(str(self._db_path), check_same_thread=False)
            # WAL lets concurrent readers proceed while a writer is active.
            self._conn.execute("PRAGMA journal_mode=WAL")
            self._conn.execute("PRAGMA foreign_keys=ON")
            self._conn.row_factory = sqlite3.Row
        return self._conn

    async def _run(self, func: Callable[[], _T]) -> _T:
        """Run a sync function in the default executor."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, func)

    async def setup(self) -> None:
        """Create tables and indexes if they don't exist.

        Raises:
            StorageError: If schema initialization fails.
        """
        try:
            # No arguments to bind, so pass the bound method directly.
            await self._run(self._setup_sync)
            logger.info("SQLite storage initialized at %s", self._db_path)
        except StorageError:
            raise
        except Exception as exc:
            raise StorageError(f"Failed to initialize SQLite: {exc}") from exc

    def _setup_sync(self) -> None:
        """Apply the schema DDL (idempotent) and commit."""
        conn = self._get_conn()
        conn.executescript(_SCHEMA)
        conn.commit()

    async def save_trace(self, trace: Trace) -> None:
        """Persist a trace to SQLite.

        Args:
            trace: The trace to save.

        Raises:
            StorageError: If the save operation fails.
        """
        try:
            await self._run(partial(self._save_trace_sync, trace))
        except StorageError:
            raise
        except Exception as exc:
            raise StorageError(f"Failed to save trace: {exc}") from exc

    def _save_trace_sync(self, trace: Trace) -> None:
        """Upsert a trace row; full-row replace keeps extracted columns fresh."""
        conn = self._get_conn()
        data = trace.model_dump_json()
        tags_json = json.dumps(list(trace.tags))
        conn.execute(
            """INSERT OR REPLACE INTO traces
               (trace_id, agent_name, model, input_text, output_text,
                total_input_tokens, total_output_tokens, total_latency_ms,
                tags, data, created_at)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (
                trace.trace_id,
                trace.agent_name,
                trace.model,
                trace.input_text,
                trace.output_text,
                trace.total_input_tokens,
                trace.total_output_tokens,
                trace.total_latency_ms,
                tags_json,
                data,
                trace.created_at.isoformat(),
            ),
        )
        conn.commit()

    async def load_trace(self, trace_id: str) -> Trace | None:
        """Load a trace by ID.

        Args:
            trace_id: The unique identifier.

        Returns:
            The trace if found, otherwise None.

        Raises:
            StorageError: If the load operation fails.
        """
        try:
            return await self._run(partial(self._load_trace_sync, trace_id))
        except StorageError:
            # Already wrapped; don't double-wrap.
            raise
        except Exception as exc:
            raise StorageError(f"Failed to load trace: {exc}") from exc

    def _load_trace_sync(self, trace_id: str) -> Trace | None:
        """Fetch and deserialize one trace from its ``data`` column."""
        conn = self._get_conn()
        row = conn.execute("SELECT data FROM traces WHERE trace_id = ?", (trace_id,)).fetchone()
        if row is None:
            return None
        return Trace.model_validate_json(row["data"])

    async def list_traces(
        self,
        agent_name: str | None = None,
        limit: int = 100,
    ) -> Sequence[Trace]:
        """List traces with optional filtering.

        Args:
            agent_name: Filter by agent name.
            limit: Maximum results.

        Returns:
            A list of matching traces, newest first.

        Raises:
            StorageError: If the query fails.
        """
        try:
            return await self._run(partial(self._list_traces_sync, agent_name, limit))
        except StorageError:
            raise
        except Exception as exc:
            raise StorageError(f"Failed to list traces: {exc}") from exc

    def _list_traces_sync(self, agent_name: str | None, limit: int) -> list[Trace]:
        """Query trace rows (optionally filtered) and deserialize them."""
        conn = self._get_conn()
        if agent_name:
            rows = conn.execute(
                "SELECT data FROM traces WHERE agent_name = ? ORDER BY created_at DESC LIMIT ?",
                (agent_name, limit),
            ).fetchall()
        else:
            rows = conn.execute(
                "SELECT data FROM traces ORDER BY created_at DESC LIMIT ?",
                (limit,),
            ).fetchall()
        return [Trace.model_validate_json(row["data"]) for row in rows]

    async def save_result(self, result: TestResult) -> None:
        """Persist a test result.

        Args:
            result: The test result to save.

        Raises:
            StorageError: If the save operation fails.
        """
        try:
            await self._run(partial(self._save_result_sync, result))
        except StorageError:
            raise
        except Exception as exc:
            raise StorageError(f"Failed to save result: {exc}") from exc

    def _save_result_sync(self, result: TestResult) -> None:
        """Upsert a test-result row; full-row replace keeps columns fresh."""
        conn = self._get_conn()
        data = result.model_dump_json()
        conn.execute(
            """INSERT OR REPLACE INTO test_results
               (result_id, test_name, status, score, duration_ms, data, created_at)
               VALUES (?, ?, ?, ?, ?, ?, ?)""",
            (
                result.result_id,
                result.test_name,
                result.status.value,
                result.score,
                result.duration_ms,
                data,
                result.created_at.isoformat(),
            ),
        )
        conn.commit()

    async def load_results(
        self,
        test_name: str | None = None,
        limit: int = 100,
    ) -> Sequence[TestResult]:
        """Load test results with optional filtering.

        Args:
            test_name: Filter by test name.
            limit: Maximum results.

        Returns:
            A list of matching test results, newest first.

        Raises:
            StorageError: If the query fails.
        """
        try:
            return await self._run(partial(self._load_results_sync, test_name, limit))
        except StorageError:
            raise
        except Exception as exc:
            raise StorageError(f"Failed to load results: {exc}") from exc

    def _load_results_sync(self, test_name: str | None, limit: int) -> list[TestResult]:
        """Query result rows (optionally filtered) and deserialize them."""
        conn = self._get_conn()
        if test_name:
            rows = conn.execute(
                "SELECT data FROM test_results WHERE test_name = ? ORDER BY created_at DESC LIMIT ?",
                (test_name, limit),
            ).fetchall()
        else:
            rows = conn.execute(
                "SELECT data FROM test_results ORDER BY created_at DESC LIMIT ?",
                (limit,),
            ).fetchall()
        return [TestResult.model_validate_json(row["data"]) for row in rows]

    async def load_result(self, result_id: str) -> TestResult | None:
        """Load a single test result by ID.

        Args:
            result_id: The unique identifier.

        Returns:
            The test result if found, otherwise None.

        Raises:
            StorageError: If the load operation fails.
        """
        try:
            return await self._run(partial(self._load_result_sync, result_id))
        except StorageError:
            # Already wrapped; don't double-wrap.
            raise
        except Exception as exc:
            raise StorageError(f"Failed to load result: {exc}") from exc

    def _load_result_sync(self, result_id: str) -> TestResult | None:
        """Fetch and deserialize one test result from its ``data`` column."""
        conn = self._get_conn()
        row = conn.execute(
            "SELECT data FROM test_results WHERE result_id = ?", (result_id,)
        ).fetchone()
        if row is None:
            return None
        return TestResult.model_validate_json(row["data"])

    async def save_metrics(self, metrics: Sequence[MetricValue]) -> None:
        """Persist a batch of metric values.

        Args:
            metrics: The metric values to save.

        Raises:
            StorageError: If the save operation fails.
        """
        if not metrics:
            # Nothing to write; skip the executor round-trip.
            return
        try:
            await self._run(partial(self._save_metrics_sync, metrics))
        except StorageError:
            raise
        except Exception as exc:
            raise StorageError(f"Failed to save metrics: {exc}") from exc

    def _save_metrics_sync(self, metrics: Sequence[MetricValue]) -> None:
        """Insert all metric rows in one executemany call, then commit."""
        conn = self._get_conn()
        rows = [
            (
                mv.metric_name,
                mv.value,
                json.dumps(list(mv.tags)),
                json.dumps(mv.metadata),
                mv.timestamp.isoformat(),
            )
            for mv in metrics
        ]
        # executemany prepares the statement once for the whole batch.
        conn.executemany(
            """INSERT INTO metrics (metric_name, value, tags, metadata, timestamp)
               VALUES (?, ?, ?, ?, ?)""",
            rows,
        )
        conn.commit()

    async def load_metrics(
        self,
        metric_name: str | None = None,
        limit: int = 1000,
    ) -> Sequence[MetricValue]:
        """Load metric values with optional filtering.

        Args:
            metric_name: Filter by metric name.
            limit: Maximum values to return.

        Returns:
            A list of matching metric values, newest first.

        Raises:
            StorageError: If the query fails.
        """
        try:
            return await self._run(partial(self._load_metrics_sync, metric_name, limit))
        except StorageError:
            raise
        except Exception as exc:
            raise StorageError(f"Failed to load metrics: {exc}") from exc

    def _load_metrics_sync(self, metric_name: str | None, limit: int) -> list[MetricValue]:
        """Query metric rows (optionally filtered) and rebuild MetricValue objects."""
        conn = self._get_conn()
        if metric_name:
            rows = conn.execute(
                "SELECT metric_name, value, tags, metadata, timestamp "
                "FROM metrics WHERE metric_name = ? ORDER BY timestamp DESC LIMIT ?",
                (metric_name, limit),
            ).fetchall()
        else:
            rows = conn.execute(
                "SELECT metric_name, value, tags, metadata, timestamp "
                "FROM metrics ORDER BY timestamp DESC LIMIT ?",
                (limit,),
            ).fetchall()

        return [
            MetricValue(
                metric_name=row["metric_name"],
                value=row["value"],
                tags=tuple(json.loads(row["tags"])) if row["tags"] else (),
                metadata=json.loads(row["metadata"]) if row["metadata"] else {},
                timestamp=datetime.fromisoformat(row["timestamp"]),
            )
            for row in rows
        ]

    async def close(self) -> None:
        """Close the database connection.

        Safe to call multiple times; a later operation lazily reopens
        the connection via ``_get_conn``.
        """
        if self._conn is not None:
            self._conn.close()
            self._conn = None

__init__(db_path='.agentprobe/traces.db')

Initialize the SQLite storage.

Parameters:

Name Type Description Default
db_path str | Path

Path to the database file. Parent directories will be created if they don't exist.

'.agentprobe/traces.db'
Source code in src/agentprobe/storage/sqlite.py
def __init__(self, db_path: str | Path = ".agentprobe/traces.db") -> None:
    """Initialize the SQLite storage.

    Args:
        db_path: Path to the database file. Parent directories
            will be created if they don't exist.
    """
    # Connection is opened lazily by _get_conn(), not here.
    self._db_path = Path(db_path)
    self._conn: sqlite3.Connection | None = None

setup() async

Create tables and indexes if they don't exist.

Source code in src/agentprobe/storage/sqlite.py
async def setup(self) -> None:
    """Create tables and indexes if they don't exist.

    Raises:
        StorageError: If schema initialization fails.
    """
    try:
        # Schema DDL runs in the executor to avoid blocking the event loop.
        await self._run(partial(self._setup_sync))
        logger.info("SQLite storage initialized at %s", self._db_path)
    except Exception as exc:
        raise StorageError(f"Failed to initialize SQLite: {exc}") from exc

save_trace(trace) async

Persist a trace to SQLite.

Parameters:

Name Type Description Default
trace Trace

The trace to save.

required
Source code in src/agentprobe/storage/sqlite.py
async def save_trace(self, trace: Trace) -> None:
    """Persist a trace to SQLite.

    Args:
        trace: The trace to save.

    Raises:
        StorageError: If the save operation fails.
    """
    try:
        await self._run(partial(self._save_trace_sync, trace))
    except Exception as exc:
        raise StorageError(f"Failed to save trace: {exc}") from exc

load_trace(trace_id) async

Load a trace by ID.

Parameters:

Name Type Description Default
trace_id str

The unique identifier.

required

Returns:

Type Description
Trace | None

The trace if found, otherwise None.

Source code in src/agentprobe/storage/sqlite.py
async def load_trace(self, trace_id: str) -> Trace | None:
    """Load a trace by ID.

    Args:
        trace_id: The unique identifier.

    Returns:
        The trace if found, otherwise None.

    Raises:
        StorageError: If the load operation fails.
    """
    try:
        result = await self._run(partial(self._load_trace_sync, trace_id))
        return result
    except StorageError:
        # Already a StorageError; re-raise as-is to avoid double-wrapping.
        raise
    except Exception as exc:
        raise StorageError(f"Failed to load trace: {exc}") from exc

list_traces(agent_name=None, limit=100) async

List traces with optional filtering.

Parameters:

Name Type Description Default
agent_name str | None

Filter by agent name.

None
limit int

Maximum results.

100

Returns:

Type Description
Sequence[Trace]

A list of matching traces.

Source code in src/agentprobe/storage/sqlite.py
async def list_traces(
    self,
    agent_name: str | None = None,
    limit: int = 100,
) -> Sequence[Trace]:
    """List traces with optional filtering.

    Args:
        agent_name: Filter by agent name.
        limit: Maximum results.

    Returns:
        A list of matching traces.

    Raises:
        StorageError: If the query fails.
    """
    try:
        result = await self._run(partial(self._list_traces_sync, agent_name, limit))
        return result
    except Exception as exc:
        raise StorageError(f"Failed to list traces: {exc}") from exc

save_result(result) async

Persist a test result.

Parameters:

Name Type Description Default
result TestResult

The test result to save.

required
Source code in src/agentprobe/storage/sqlite.py
async def save_result(self, result: TestResult) -> None:
    """Persist a test result.

    Args:
        result: The test result to save.

    Raises:
        StorageError: If the save operation fails.
    """
    try:
        await self._run(partial(self._save_result_sync, result))
    except Exception as exc:
        raise StorageError(f"Failed to save result: {exc}") from exc

load_results(test_name=None, limit=100) async

Load test results with optional filtering.

Parameters:

Name Type Description Default
test_name str | None

Filter by test name.

None
limit int

Maximum results.

100

Returns:

Type Description
Sequence[TestResult]

A list of matching test results.

Source code in src/agentprobe/storage/sqlite.py
async def load_results(
    self,
    test_name: str | None = None,
    limit: int = 100,
) -> Sequence[TestResult]:
    """Load test results with optional filtering.

    Args:
        test_name: Filter by test name.
        limit: Maximum results.

    Returns:
        A list of matching test results.

    Raises:
        StorageError: If the query fails.
    """
    try:
        result = await self._run(partial(self._load_results_sync, test_name, limit))
        return result
    except Exception as exc:
        raise StorageError(f"Failed to load results: {exc}") from exc

load_result(result_id) async

Load a single test result by ID.

Parameters:

Name Type Description Default
result_id str

The unique identifier.

required

Returns:

Type Description
TestResult | None

The test result if found, otherwise None.

Source code in src/agentprobe/storage/sqlite.py
async def load_result(self, result_id: str) -> TestResult | None:
    """Load a single test result by ID.

    Args:
        result_id: The unique identifier.

    Returns:
        The test result if found, otherwise None.

    Raises:
        StorageError: If the load operation fails.
    """
    try:
        return await self._run(partial(self._load_result_sync, result_id))
    except StorageError:
        # Already a StorageError; re-raise as-is to avoid double-wrapping.
        raise
    except Exception as exc:
        raise StorageError(f"Failed to load result: {exc}") from exc

save_metrics(metrics) async

Persist a batch of metric values.

Parameters:

Name Type Description Default
metrics Sequence[MetricValue]

The metric values to save.

required
Source code in src/agentprobe/storage/sqlite.py
async def save_metrics(self, metrics: Sequence[MetricValue]) -> None:
    """Persist a batch of metric values.

    Args:
        metrics: The metric values to save.

    Raises:
        StorageError: If the save operation fails.
    """
    if not metrics:
        # Empty batch: nothing to write, skip the executor round-trip.
        return
    try:
        await self._run(partial(self._save_metrics_sync, metrics))
    except Exception as exc:
        raise StorageError(f"Failed to save metrics: {exc}") from exc

load_metrics(metric_name=None, limit=1000) async

Load metric values with optional filtering.

Parameters:

Name Type Description Default
metric_name str | None

Filter by metric name.

None
limit int

Maximum values to return.

1000

Returns:

Type Description
Sequence[MetricValue]

A list of matching metric values.

Source code in src/agentprobe/storage/sqlite.py
async def load_metrics(
    self,
    metric_name: str | None = None,
    limit: int = 1000,
) -> Sequence[MetricValue]:
    """Load metric values with optional filtering.

    Args:
        metric_name: Filter by metric name.
        limit: Maximum values to return.

    Returns:
        A list of matching metric values.

    Raises:
        StorageError: If the query fails.
    """
    try:
        return await self._run(partial(self._load_metrics_sync, metric_name, limit))
    except Exception as exc:
        raise StorageError(f"Failed to load metrics: {exc}") from exc

close() async

Close the database connection.

Source code in src/agentprobe/storage/sqlite.py
async def close(self) -> None:
    """Close the database connection.

    Safe to call multiple times; a later operation lazily reopens
    the connection via ``_get_conn``.
    """
    if self._conn is not None:
        self._conn.close()
        self._conn = None

PostgreSQL Backend

agentprobe.storage.postgres

PostgreSQL storage backend for traces, test results, and metrics.

Uses asyncpg for async database access. The asyncpg dependency is lazy-loaded so users without PostgreSQL are not affected.

PostgreSQLStorage

PostgreSQL-based storage for traces, results, and metrics.

Uses asyncpg connection pool for concurrent access. Full model data is stored in a TEXT data column for lossless round-tripping, with extracted columns for indexing and filtering.

Attributes:

Name Type Description
dsn

PostgreSQL connection string.

Source code in src/agentprobe/storage/postgres.py
class PostgreSQLStorage:
    """PostgreSQL-based storage for traces, results, and metrics.

    Uses asyncpg connection pool for concurrent access. Full model data
    is stored in a TEXT ``data`` column for lossless round-tripping,
    with extracted columns for indexing and filtering.

    Attributes:
        dsn: PostgreSQL connection string.
    """

    def __init__(
        self,
        dsn: str = "postgresql://localhost/agentprobe",
        min_pool_size: int = 2,
        max_pool_size: int = 10,
    ) -> None:
        """Initialize the PostgreSQL storage.

        Args:
            dsn: PostgreSQL connection string.
            min_pool_size: Minimum pool connections.
            max_pool_size: Maximum pool connections.
        """
        self._dsn = dsn
        self._min_pool_size = min_pool_size
        self._max_pool_size = max_pool_size
        # The pool is created in setup(); typed Any so asyncpg stays optional.
        self._pool: Any = None
        self._migration = SchemaMigration()

    async def setup(self) -> None:  # pragma: no cover
        """Create the connection pool and run pending migrations.

        Raises:
            StorageError: If connection or migration fails.
        """
        try:
            # Lazy import so users without PostgreSQL never need asyncpg.
            import asyncpg  # type: ignore[import-not-found]  # noqa: PLC0415

            self._pool = await asyncpg.create_pool(
                self._dsn,
                min_size=self._min_pool_size,
                max_size=self._max_pool_size,
            )
            await self._run_migrations()
            logger.info("PostgreSQL storage initialized: %s", self._dsn)
        except Exception as exc:
            raise StorageError(f"Failed to initialize PostgreSQL: {exc}") from exc

    async def _run_migrations(self) -> None:  # pragma: no cover
        """Check current version and apply pending migrations."""
        async with self._pool.acquire() as conn:
            try:
                row = await conn.fetchrow(_SCHEMA_VERSION_QUERY)
                current = row["version"] if row else 0
            except Exception:
                # Version table missing on a fresh database: start from 0.
                current = 0

            async def _execute(sql: str) -> None:
                await conn.execute(sql)

            await self._migration.apply(current, _execute)

    async def save_trace(self, trace: Trace) -> None:
        """Persist a trace to PostgreSQL.

        Args:
            trace: The trace to save.

        Raises:
            StorageError: If the save operation fails.
        """
        try:
            data = trace.model_dump_json()
            tags_json = json.dumps(list(trace.tags))
            async with self._pool.acquire() as conn:
                # On conflict, update every column (not just data) so the
                # extracted/indexed columns stay in sync with the payload,
                # matching the SQLite backend's full-row replace.
                await conn.execute(
                    """INSERT INTO traces
                       (trace_id, agent_name, model, input_text, output_text,
                        total_input_tokens, total_output_tokens, total_latency_ms,
                        tags, data, created_at)
                       VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
                       ON CONFLICT (trace_id) DO UPDATE SET
                           agent_name = EXCLUDED.agent_name,
                           model = EXCLUDED.model,
                           input_text = EXCLUDED.input_text,
                           output_text = EXCLUDED.output_text,
                           total_input_tokens = EXCLUDED.total_input_tokens,
                           total_output_tokens = EXCLUDED.total_output_tokens,
                           total_latency_ms = EXCLUDED.total_latency_ms,
                           tags = EXCLUDED.tags,
                           data = EXCLUDED.data,
                           created_at = EXCLUDED.created_at""",
                    trace.trace_id,
                    trace.agent_name,
                    trace.model,
                    trace.input_text,
                    trace.output_text,
                    trace.total_input_tokens,
                    trace.total_output_tokens,
                    trace.total_latency_ms,
                    tags_json,
                    data,
                    trace.created_at,
                )
        except Exception as exc:
            raise StorageError(f"Failed to save trace: {exc}") from exc

    async def load_trace(self, trace_id: str) -> Trace | None:
        """Load a trace by ID.

        Args:
            trace_id: The unique identifier.

        Returns:
            The trace if found, otherwise None.

        Raises:
            StorageError: If the load operation fails.
        """
        try:
            async with self._pool.acquire() as conn:
                row = await conn.fetchrow("SELECT data FROM traces WHERE trace_id = $1", trace_id)
                if row is None:
                    return None
                return Trace.model_validate_json(row["data"])
        except StorageError:
            # Already wrapped; don't double-wrap.
            raise
        except Exception as exc:
            raise StorageError(f"Failed to load trace: {exc}") from exc

    async def list_traces(
        self,
        agent_name: str | None = None,
        limit: int = 100,
    ) -> Sequence[Trace]:
        """List traces with optional filtering.

        Args:
            agent_name: Filter by agent name.
            limit: Maximum results.

        Returns:
            A list of matching traces, newest first.

        Raises:
            StorageError: If the query fails.
        """
        try:
            async with self._pool.acquire() as conn:
                if agent_name:
                    rows = await conn.fetch(
                        "SELECT data FROM traces WHERE agent_name = $1 "
                        "ORDER BY created_at DESC LIMIT $2",
                        agent_name,
                        limit,
                    )
                else:
                    rows = await conn.fetch(
                        "SELECT data FROM traces ORDER BY created_at DESC LIMIT $1",
                        limit,
                    )
                return [Trace.model_validate_json(row["data"]) for row in rows]
        except Exception as exc:
            raise StorageError(f"Failed to list traces: {exc}") from exc

    async def save_result(self, result: TestResult) -> None:
        """Persist a test result.

        Args:
            result: The test result to save.

        Raises:
            StorageError: If the save operation fails.
        """
        try:
            data = result.model_dump_json()
            async with self._pool.acquire() as conn:
                # Update all columns on conflict so filter columns
                # (test_name, status, score, ...) never go stale.
                await conn.execute(
                    """INSERT INTO test_results
                       (result_id, test_name, status, score, duration_ms, data, created_at)
                       VALUES ($1, $2, $3, $4, $5, $6, $7)
                       ON CONFLICT (result_id) DO UPDATE SET
                           test_name = EXCLUDED.test_name,
                           status = EXCLUDED.status,
                           score = EXCLUDED.score,
                           duration_ms = EXCLUDED.duration_ms,
                           data = EXCLUDED.data,
                           created_at = EXCLUDED.created_at""",
                    result.result_id,
                    result.test_name,
                    result.status.value,
                    result.score,
                    result.duration_ms,
                    data,
                    result.created_at,
                )
        except Exception as exc:
            raise StorageError(f"Failed to save result: {exc}") from exc

    async def load_results(
        self,
        test_name: str | None = None,
        limit: int = 100,
    ) -> Sequence[TestResult]:
        """Load test results with optional filtering.

        Args:
            test_name: Filter by test name.
            limit: Maximum results.

        Returns:
            A list of matching test results, newest first.

        Raises:
            StorageError: If the query fails.
        """
        try:
            async with self._pool.acquire() as conn:
                if test_name:
                    rows = await conn.fetch(
                        "SELECT data FROM test_results WHERE test_name = $1 "
                        "ORDER BY created_at DESC LIMIT $2",
                        test_name,
                        limit,
                    )
                else:
                    rows = await conn.fetch(
                        "SELECT data FROM test_results ORDER BY created_at DESC LIMIT $1",
                        limit,
                    )
                return [TestResult.model_validate_json(row["data"]) for row in rows]
        except Exception as exc:
            raise StorageError(f"Failed to load results: {exc}") from exc

    async def load_result(self, result_id: str) -> TestResult | None:
        """Load a single test result by ID.

        Args:
            result_id: The unique identifier.

        Returns:
            The test result if found, otherwise None.

        Raises:
            StorageError: If the load operation fails.
        """
        try:
            async with self._pool.acquire() as conn:
                row = await conn.fetchrow(
                    "SELECT data FROM test_results WHERE result_id = $1", result_id
                )
                if row is None:
                    return None
                return TestResult.model_validate_json(row["data"])
        except StorageError:
            # Already wrapped; don't double-wrap.
            raise
        except Exception as exc:
            raise StorageError(f"Failed to load result: {exc}") from exc

    async def save_metrics(self, metrics: Sequence[MetricValue]) -> None:
        """Persist a batch of metric values.

        Args:
            metrics: The metric values to save.

        Raises:
            StorageError: If the save operation fails.
        """
        if not metrics:
            # Empty batch: nothing to write.
            return
        try:
            records = [
                (
                    mv.metric_name,
                    mv.value,
                    json.dumps(list(mv.tags)),
                    json.dumps(mv.metadata),
                    mv.timestamp,
                )
                for mv in metrics
            ]
            async with self._pool.acquire() as conn:
                # executemany prepares once and sends the batch in one go.
                await conn.executemany(
                    """INSERT INTO metrics (metric_name, value, tags, metadata, timestamp)
                       VALUES ($1, $2, $3, $4, $5)""",
                    records,
                )
        except Exception as exc:
            raise StorageError(f"Failed to save metrics: {exc}") from exc

    async def load_metrics(
        self,
        metric_name: str | None = None,
        limit: int = 1000,
    ) -> Sequence[MetricValue]:
        """Load metric values with optional filtering.

        Args:
            metric_name: Filter by metric name.
            limit: Maximum values to return.

        Returns:
            A sequence of matching metric values, newest first.

        Raises:
            StorageError: If the query fails.
        """
        try:
            async with self._pool.acquire() as conn:
                if metric_name:
                    rows = await conn.fetch(
                        "SELECT metric_name, value, tags, metadata, timestamp "
                        "FROM metrics WHERE metric_name = $1 "
                        "ORDER BY timestamp DESC LIMIT $2",
                        metric_name,
                        limit,
                    )
                else:
                    rows = await conn.fetch(
                        "SELECT metric_name, value, tags, metadata, timestamp "
                        "FROM metrics ORDER BY timestamp DESC LIMIT $1",
                        limit,
                    )
                return [
                    MetricValue(
                        metric_name=row["metric_name"],
                        value=row["value"],
                        tags=tuple(json.loads(row["tags"])) if row["tags"] else (),
                        metadata=json.loads(row["metadata"]) if row["metadata"] else {},
                        timestamp=row["timestamp"],
                    )
                    for row in rows
                ]
        except Exception as exc:
            raise StorageError(f"Failed to load metrics: {exc}") from exc

    async def close(self) -> None:  # pragma: no cover
        """Close the connection pool.

        Safe to call multiple times; subsequent calls are no-ops until
        setup() is run again.
        """
        if self._pool is not None:
            await self._pool.close()
            self._pool = None

__init__(dsn='postgresql://localhost/agentprobe', min_pool_size=2, max_pool_size=10)

Initialize the PostgreSQL storage.

Parameters:

Name Type Description Default
dsn str

PostgreSQL connection string.

'postgresql://localhost/agentprobe'
min_pool_size int

Minimum pool connections.

2
max_pool_size int

Maximum pool connections.

10
Source code in src/agentprobe/storage/postgres.py
def __init__(
    self,
    dsn: str = "postgresql://localhost/agentprobe",
    min_pool_size: int = 2,
    max_pool_size: int = 10,
) -> None:
    """Initialize the PostgreSQL storage.

    Args:
        dsn: PostgreSQL connection string.
        min_pool_size: Minimum pool connections.
        max_pool_size: Maximum pool connections.
    """
    self._dsn = dsn
    self._min_pool_size = min_pool_size
    self._max_pool_size = max_pool_size
    # Pool is created later in setup(); typed Any so asyncpg stays optional.
    self._pool: Any = None
    self._migration = SchemaMigration()

setup() async

Create the connection pool and run pending migrations.

Raises:

Type Description
StorageError

If connection or migration fails.

Source code in src/agentprobe/storage/postgres.py
async def setup(self) -> None:  # pragma: no cover
    """Create the connection pool and run pending migrations.

    Raises:
        StorageError: If connection or migration fails.
    """
    try:
        # Imported lazily so the module can load without asyncpg installed.
        import asyncpg  # type: ignore[import-not-found]  # noqa: PLC0415

        pool = await asyncpg.create_pool(
            self._dsn,
            min_size=self._min_pool_size,
            max_size=self._max_pool_size,
        )
        self._pool = pool
        await self._run_migrations()
        logger.info("PostgreSQL storage initialized: %s", self._dsn)
    except Exception as exc:
        raise StorageError(f"Failed to initialize PostgreSQL: {exc}") from exc

save_trace(trace) async

Persist a trace to PostgreSQL.

Parameters:

Name Type Description Default
trace Trace

The trace to save.

required

Raises:

Type Description
StorageError

If the save operation fails.

Source code in src/agentprobe/storage/postgres.py
async def save_trace(self, trace: Trace) -> None:
    """Persist a trace to PostgreSQL.

    Args:
        trace: The trace to save.

    Raises:
        StorageError: If the save operation fails.
    """
    try:
        data = trace.model_dump_json()
        tags_json = json.dumps(list(trace.tags))
        async with self._pool.acquire() as conn:
            await conn.execute(
                """INSERT INTO traces
                   (trace_id, agent_name, model, input_text, output_text,
                    total_input_tokens, total_output_tokens, total_latency_ms,
                    tags, data, created_at)
                   VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
                   ON CONFLICT (trace_id) DO UPDATE SET data = $10""",
                trace.trace_id,
                trace.agent_name,
                trace.model,
                trace.input_text,
                trace.output_text,
                trace.total_input_tokens,
                trace.total_output_tokens,
                trace.total_latency_ms,
                tags_json,
                data,
                trace.created_at,
            )
    except Exception as exc:
        raise StorageError(f"Failed to save trace: {exc}") from exc

load_trace(trace_id) async

Load a trace by ID.

Parameters:

Name Type Description Default
trace_id str

The unique identifier.

required

Returns:

Type Description
Trace | None

The trace if found, otherwise None.

Source code in src/agentprobe/storage/postgres.py
async def load_trace(self, trace_id: str) -> Trace | None:
    """Load a trace by ID.

    Args:
        trace_id: The unique identifier.

    Returns:
        The trace if found, otherwise None.
    """
    try:
        async with self._pool.acquire() as conn:
            row = await conn.fetchrow("SELECT data FROM traces WHERE trace_id = $1", trace_id)
            if row is None:
                return None
            return Trace.model_validate_json(row["data"])
    except StorageError:
        raise
    except Exception as exc:
        raise StorageError(f"Failed to load trace: {exc}") from exc

list_traces(agent_name=None, limit=100) async

List traces with optional filtering.

Parameters:

Name Type Description Default
agent_name str | None

Filter by agent name.

None
limit int

Maximum results.

100

Returns:

Type Description
Sequence[Trace]

A list of matching traces.

Source code in src/agentprobe/storage/postgres.py
async def list_traces(
    self,
    agent_name: str | None = None,
    limit: int = 100,
) -> Sequence[Trace]:
    """List traces with optional filtering.

    Args:
        agent_name: Filter by agent name.
        limit: Maximum results.

    Returns:
        A list of matching traces.
    """
    try:
        async with self._pool.acquire() as conn:
            if agent_name:
                rows = await conn.fetch(
                    "SELECT data FROM traces WHERE agent_name = $1 "
                    "ORDER BY created_at DESC LIMIT $2",
                    agent_name,
                    limit,
                )
            else:
                rows = await conn.fetch(
                    "SELECT data FROM traces ORDER BY created_at DESC LIMIT $1",
                    limit,
                )
            return [Trace.model_validate_json(row["data"]) for row in rows]
    except Exception as exc:
        raise StorageError(f"Failed to list traces: {exc}") from exc

save_result(result) async

Persist a test result.

Parameters:

Name Type Description Default
result TestResult

The test result to save.

required

Raises:

Type Description
StorageError

If the save operation fails.

Source code in src/agentprobe/storage/postgres.py
async def save_result(self, result: TestResult) -> None:
    """Persist a test result.

    Args:
        result: The test result to save.

    Raises:
        StorageError: If the save operation fails.
    """
    try:
        data = result.model_dump_json()
        async with self._pool.acquire() as conn:
            await conn.execute(
                """INSERT INTO test_results
                   (result_id, test_name, status, score, duration_ms, data, created_at)
                   VALUES ($1, $2, $3, $4, $5, $6, $7)
                   ON CONFLICT (result_id) DO UPDATE SET data = $6""",
                result.result_id,
                result.test_name,
                result.status.value,
                result.score,
                result.duration_ms,
                data,
                result.created_at,
            )
    except Exception as exc:
        raise StorageError(f"Failed to save result: {exc}") from exc

load_results(test_name=None, limit=100) async

Load test results with optional filtering.

Parameters:

Name Type Description Default
test_name str | None

Filter by test name.

None
limit int

Maximum results.

100

Returns:

Type Description
Sequence[TestResult]

A list of matching test results.

Source code in src/agentprobe/storage/postgres.py
async def load_results(
    self,
    test_name: str | None = None,
    limit: int = 100,
) -> Sequence[TestResult]:
    """Load test results with optional filtering.

    Args:
        test_name: Filter by test name.
        limit: Maximum results.

    Returns:
        A list of matching test results.
    """
    try:
        async with self._pool.acquire() as conn:
            if test_name:
                rows = await conn.fetch(
                    "SELECT data FROM test_results WHERE test_name = $1 "
                    "ORDER BY created_at DESC LIMIT $2",
                    test_name,
                    limit,
                )
            else:
                rows = await conn.fetch(
                    "SELECT data FROM test_results ORDER BY created_at DESC LIMIT $1",
                    limit,
                )
            return [TestResult.model_validate_json(row["data"]) for row in rows]
    except Exception as exc:
        raise StorageError(f"Failed to load results: {exc}") from exc

load_result(result_id) async

Load a single test result by ID.

Parameters:

Name Type Description Default
result_id str

The unique identifier.

required

Returns:

Type Description
TestResult | None

The test result if found, otherwise None.

Source code in src/agentprobe/storage/postgres.py
async def load_result(self, result_id: str) -> TestResult | None:
    """Load a single test result by ID.

    Args:
        result_id: The unique identifier.

    Returns:
        The test result if found, otherwise None.
    """
    try:
        async with self._pool.acquire() as conn:
            row = await conn.fetchrow(
                "SELECT data FROM test_results WHERE result_id = $1", result_id
            )
            if row is None:
                return None
            return TestResult.model_validate_json(row["data"])
    except StorageError:
        raise
    except Exception as exc:
        raise StorageError(f"Failed to load result: {exc}") from exc

save_metrics(metrics) async

Persist a batch of metric values.

Parameters:

Name Type Description Default
metrics Sequence[MetricValue]

The metric values to save.

required

Raises:

Type Description
StorageError

If the save operation fails.

Source code in src/agentprobe/storage/postgres.py
async def save_metrics(self, metrics: Sequence[MetricValue]) -> None:
    """Persist a batch of metric values.

    Args:
        metrics: The metric values to save.

    Raises:
        StorageError: If the save operation fails.
    """
    if not metrics:
        return
    try:
        async with self._pool.acquire() as conn:
            for mv in metrics:
                tags_json = json.dumps(list(mv.tags))
                meta_json = json.dumps(mv.metadata)
                await conn.execute(
                    """INSERT INTO metrics (metric_name, value, tags, metadata, timestamp)
                       VALUES ($1, $2, $3, $4, $5)""",
                    mv.metric_name,
                    mv.value,
                    tags_json,
                    meta_json,
                    mv.timestamp,
                )
    except Exception as exc:
        raise StorageError(f"Failed to save metrics: {exc}") from exc

load_metrics(metric_name=None, limit=1000) async

Load metric values with optional filtering.

Parameters:

Name Type Description Default
metric_name str | None

Filter by metric name.

None
limit int

Maximum values to return.

1000

Returns:

Type Description
Sequence[MetricValue]

A sequence of matching metric values.

Source code in src/agentprobe/storage/postgres.py
async def load_metrics(
    self,
    metric_name: str | None = None,
    limit: int = 1000,
) -> Sequence[MetricValue]:
    """Load metric values with optional filtering.

    Args:
        metric_name: Filter by metric name.
        limit: Maximum values to return.

    Returns:
        A sequence of matching metric values.
    """
    try:
        async with self._pool.acquire() as conn:
            if metric_name:
                rows = await conn.fetch(
                    "SELECT metric_name, value, tags, metadata, timestamp "
                    "FROM metrics WHERE metric_name = $1 "
                    "ORDER BY timestamp DESC LIMIT $2",
                    metric_name,
                    limit,
                )
            else:
                rows = await conn.fetch(
                    "SELECT metric_name, value, tags, metadata, timestamp "
                    "FROM metrics ORDER BY timestamp DESC LIMIT $1",
                    limit,
                )
            return [
                MetricValue(
                    metric_name=row["metric_name"],
                    value=row["value"],
                    tags=tuple(json.loads(row["tags"])) if row["tags"] else (),
                    metadata=json.loads(row["metadata"]) if row["metadata"] else {},
                    timestamp=row["timestamp"],
                )
                for row in rows
            ]
    except Exception as exc:
        raise StorageError(f"Failed to load metrics: {exc}") from exc

close() async

Close the connection pool.

Source code in src/agentprobe/storage/postgres.py
async def close(self) -> None:  # pragma: no cover
    """Close the connection pool.

    Idempotent: a no-op when the pool was never created.
    """
    pool = self._pool
    if pool is None:
        return
    await pool.close()
    self._pool = None

Migrations

agentprobe.storage.migrations

Schema migrations for storage backends.

Provides a linear version-based migration system for database schemas. Each migration is a pair of (version, SQL statements) applied in order.

SchemaMigration

Manages linear schema migrations for PostgreSQL.

Tracks the current schema version and applies any pending migrations in order.

Source code in src/agentprobe/storage/migrations.py
class SchemaMigration:
    """Manages linear schema migrations for PostgreSQL.

    Tracks the current schema version and applies any pending
    migrations in order.
    """

    def __init__(self) -> None:
        # Copy so later mutation of the module-level table cannot
        # affect an already-constructed instance.
        self._migrations = list(_MIGRATIONS)

    @property
    def latest_version(self) -> int:
        """Return the latest available schema version."""
        return self._migrations[-1][0] if self._migrations else 0

    def get_pending(self, current_version: int) -> list[tuple[int, str]]:
        """Get migrations that haven't been applied yet.

        Args:
            current_version: The currently applied schema version.

        Returns:
            List of (version, sql) tuples to apply.
        """
        pending: list[tuple[int, str]] = []
        for version, sql in self._migrations:
            if version > current_version:
                pending.append((version, sql))
        return pending

    async def apply(
        self,
        current_version: int,
        execute_fn: Any,
    ) -> int:
        """Apply pending migrations using the provided execution function.

        Args:
            current_version: The currently applied schema version.
            execute_fn: Async callable that executes SQL strings.

        Returns:
            The new schema version after applying migrations.
        """
        pending = self.get_pending(current_version)
        if not pending:
            logger.info("Schema is up to date at version %d", current_version)
            return current_version

        new_version = current_version
        for version, sql in pending:
            logger.info("Applying migration V%d", version)
            await execute_fn(sql)
            new_version = version

        logger.info("Schema migrated to version %d", new_version)
        return new_version

latest_version property

Return the latest available schema version.

get_pending(current_version)

Get migrations that haven't been applied yet.

Parameters:

Name Type Description Default
current_version int

The currently applied schema version.

required

Returns:

Type Description
list[tuple[int, str]]

List of (version, sql) tuples to apply.

Source code in src/agentprobe/storage/migrations.py
def get_pending(self, current_version: int) -> list[tuple[int, str]]:
    """Get migrations that haven't been applied yet.

    Args:
        current_version: The currently applied schema version.

    Returns:
        List of (version, sql) tuples to apply.
    """
    pending: list[tuple[int, str]] = []
    for version, sql in self._migrations:
        if version > current_version:
            pending.append((version, sql))
    return pending

apply(current_version, execute_fn) async

Apply pending migrations using the provided execution function.

Parameters:

Name Type Description Default
current_version int

The currently applied schema version.

required
execute_fn Any

Async callable that executes SQL strings.

required

Returns:

Type Description
int

The new schema version after applying migrations.

Source code in src/agentprobe/storage/migrations.py
async def apply(
    self,
    current_version: int,
    execute_fn: Any,
) -> int:
    """Apply pending migrations using the provided execution function.

    Args:
        current_version: The currently applied schema version.
        execute_fn: Async callable that executes SQL strings.

    Returns:
        The new schema version after applying migrations.
    """
    pending = self.get_pending(current_version)
    if not pending:
        # Nothing to apply; keep the current version.
        logger.info("Schema is up to date at version %d", current_version)
        return current_version

    new_version = current_version
    for version, sql in pending:
        logger.info("Applying migration V%d", version)
        await execute_fn(sql)
        new_version = version

    logger.info("Schema migrated to version %d", new_version)
    return new_version