Source code for resonance.api.data.catalog

from __future__ import annotations

import json
import sqlite3
from pathlib import Path  # noqa: TC003
from typing import Any

import numpy as np
import pandas as pd
import zarr

from resonance.api.data.models import RunSummary, SampleMetadata

# Run-summary columns for the most recently started runs. The LEFT JOIN keeps
# runs that have no matching sample row (``sample_name`` is then NULL). Takes
# one bound parameter: the maximum number of rows to return.
_SQL_RECENT = """
SELECT r.uid, r.plan_name, r.time_start, r.time_stop, r.exit_status,
       s.name AS sample_name
FROM runs r
LEFT JOIN samples s ON r.sample_id = s.id
ORDER BY r.time_start DESC
LIMIT ?
"""

# Run-summary columns for every run linked to a sample with an exactly matching
# name, newest first. Uses an inner JOIN, so runs without a sample never match.
# Takes one bound parameter: the sample name.
_SQL_BY_SAMPLE = """
SELECT r.uid, r.plan_name, r.time_start, r.time_stop, r.exit_status,
       s.name AS sample_name
FROM runs r
JOIN samples s ON r.sample_id = s.id
WHERE s.name = ?
ORDER BY r.time_start DESC
"""

# All columns of the ``runs`` row with the given uid (one bound parameter).
_SQL_RUN_BY_UID = "SELECT * FROM runs WHERE uid = ?"

# All columns of the ``samples`` row with the given id (one bound parameter).
_SQL_SAMPLE_BY_ID = "SELECT * FROM samples WHERE id = ?"

# Events of one named stream of one run, ordered by sequence number.
# Bound parameters, in order: run uid, stream name.
_SQL_EVENTS = """
SELECT e.seq_num, e.time, e.data
FROM events e
JOIN streams s ON e.stream_uid = s.uid
WHERE s.run_uid = ? AND s.name = ?
ORDER BY e.seq_num
"""

# Image references of one field of one run, ordered by the sequence number of
# the event each reference belongs to. The stream is not filtered, so matching
# references from every stream of the run are returned.
# Bound parameters, in order: run uid, image field name.
_SQL_IMAGE_REFS = """
    SELECT ir.zarr_group, ir.index_in_stack, ir.shape_x, ir.shape_y, ir.dtype, e.seq_num
    FROM image_refs ir
    JOIN events e ON ir.event_uid = e.uid
    JOIN streams s ON e.stream_uid = s.uid
    WHERE s.run_uid = ? AND ir.field_name = ?
    ORDER BY e.seq_num
"""


def _row_to_run_summary(row: sqlite3.Row) -> RunSummary:
    """Build a ``RunSummary`` from one row of a run-summary query.

    Parameters
    ----------
    row : sqlite3.Row
        Row exposing the columns ``uid``, ``plan_name``, ``time_start``,
        ``time_stop``, ``exit_status`` and ``sample_name``.

    Returns
    -------
    RunSummary
        Summary whose fields are copied from the same-named columns.
    """
    columns = (
        "uid",
        "plan_name",
        "time_start",
        "time_stop",
        "exit_status",
        "sample_name",
    )
    # Every RunSummary field shares its name with the column it is read from.
    return RunSummary(**{column: row[column] for column in columns})


class Catalog:
    """Read-only catalog over a per-beamtime SQLite database.

    The catalog reads from a ``.db`` SQLite file and an adjacent ``.zarr``
    directory store that holds detector image arrays.

    Parameters
    ----------
    db_path : Path
        Path to the beamtime SQLite database file. The Zarr store is expected
        at ``db_path.with_suffix(".zarr")``.

    Attributes
    ----------
    _conn : sqlite3.Connection
        Read-only connection to the database.
    _db_path : Path
        Path passed at construction time, stored as given.
    _zarr_path : Path
        Path to the adjacent Zarr store directory.
    """

    def __init__(self, db_path: Path) -> None:
        self._db_path = db_path
        self._zarr_path = db_path.with_suffix(".zarr")
        # Build the URI through Path.as_uri() so that characters with a special
        # meaning in URIs ('#', '?', '%', spaces, Windows backslashes) are
        # percent-encoded. Interpolating the raw path would let e.g. a '#' in a
        # directory name truncate the filename and swallow the ``mode=ro``
        # query string. as_uri() needs an absolute path, hence resolve().
        uri = f"{db_path.resolve().as_uri()}?mode=ro"
        self._conn = sqlite3.connect(uri, uri=True)
        # Rows are addressed by column name throughout this module.
        self._conn.row_factory = sqlite3.Row

    def recent(self, n: int = 10) -> list[RunSummary]:
        """Return the n most recently started runs.

        Parameters
        ----------
        n : int, optional
            Maximum number of runs to return. Defaults to 10.

        Returns
        -------
        list[RunSummary]
            Run summaries ordered newest-first.
        """
        rows = self._conn.execute(_SQL_RECENT, (n,)).fetchall()
        return [_row_to_run_summary(r) for r in rows]

    def by_sample(self, name: str) -> list[RunSummary]:
        """Return all runs associated with a sample by name.

        Parameters
        ----------
        name : str
            Exact sample name as stored in the ``samples`` table.

        Returns
        -------
        list[RunSummary]
            Run summaries for the given sample, ordered newest-first.
        """
        rows = self._conn.execute(_SQL_BY_SAMPLE, (name,)).fetchall()
        return [_row_to_run_summary(r) for r in rows]

    def __getitem__(self, uid: str) -> Run:
        """Return the full Run object for the given UID.

        Parameters
        ----------
        uid : str
            UID of the run, matched exactly against ``runs.uid``.

        Returns
        -------
        Run
            Run accessor sharing this catalog's open connection.

        Raises
        ------
        KeyError
            If no run with the given UID exists in the database.
        """
        row = self._conn.execute(_SQL_RUN_BY_UID, (uid,)).fetchone()
        if row is None:
            raise KeyError(uid)
        return Run(self._conn, dict(row), zarr_path=self._zarr_path)

    def close(self) -> None:
        """Close the underlying database connection."""
        self._conn.close()

    def __enter__(self) -> Catalog:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: object,
    ) -> None:
        # Always release the connection; exceptions are not suppressed.
        self.close()
class Run:
    """Full accessor for a single run, including scalar event data and sample metadata.

    Parameters
    ----------
    conn : sqlite3.Connection
        Active connection to the beamtime database.
    row : dict[str, Any]
        Deserialized row from the ``runs`` table.
    zarr_path : Path
        Path to the adjacent Zarr store directory.

    Attributes
    ----------
    _conn : sqlite3.Connection
        Shared database connection from the parent Catalog.
    _row : dict[str, Any]
        Raw column values for this run.
    _zarr_path : Path
        Path to the Zarr store for detector images.
    """

    def __init__(
        self, conn: sqlite3.Connection, row: dict[str, Any], zarr_path: Path
    ) -> None:
        self._conn = conn
        self._row = row
        self._zarr_path = zarr_path

    @property
    def uid(self) -> str:
        """str: Hex UUID of the run."""
        return self._row["uid"]

    @property
    def plan_name(self) -> str:
        """str: Name of the scan plan."""
        return self._row["plan_name"]

    @property
    def time_start(self) -> float:
        """float: Unix timestamp of run start."""
        return self._row["time_start"]

    @property
    def time_stop(self) -> float | None:
        """float or None: Unix timestamp of run stop."""
        return self._row["time_stop"]

    @property
    def exit_status(self) -> str | None:
        """str or None: Final run status, e.g. 'success', 'failed'."""
        return self._row["exit_status"]

    @property
    def num_events(self) -> int:
        """int: Value of the ``num_events`` column of the run row."""
        return self._row["num_events"]

    @property
    def sample(self) -> SampleMetadata | None:
        """Return the associated sample metadata, or None if not set.

        Returns
        -------
        SampleMetadata or None
            Populated from the ``samples`` table row, with ``tags`` and
            ``extra`` deserialized from JSON (NULL/empty values become ``[]``
            and ``{}`` respectively). Returns None if ``sample_id`` is null or
            the referenced row does not exist.
        """
        sample_id = self._row.get("sample_id")
        if sample_id is None:
            return None
        row = self._conn.execute(_SQL_SAMPLE_BY_ID, (sample_id,)).fetchone()
        if row is None:
            return None
        return SampleMetadata(
            name=row["name"],
            formula=row["formula"],
            serial=row["serial"],
            tags=json.loads(row["tags"] or "[]"),
            beamline_pos=row["beamline_pos"],
            extra=json.loads(row["extra"] or "{}"),
            id=row["id"],
        )

    def table(self, stream: str = "primary") -> pd.DataFrame:
        """Load scalar event data from a named stream as a DataFrame.

        Parameters
        ----------
        stream : str, optional
            Name of the stream to load. Defaults to "primary".

        Returns
        -------
        pd.DataFrame
            Columns are ``seq_num``, ``time``, followed by all fields present
            in the event ``data`` JSON. Returns an empty DataFrame with columns
            ``["seq_num", "time"]`` if the stream has no events. An event whose
            ``data`` column is NULL or empty contributes only ``seq_num`` and
            ``time``.
        """
        rows = self._conn.execute(_SQL_EVENTS, (self._row["uid"], stream)).fetchall()
        if not rows:
            return pd.DataFrame(columns=pd.Index(["seq_num", "time"]))

        records: list[dict[str, Any]] = []
        for row in rows:
            record: dict[str, Any] = {"seq_num": row["seq_num"], "time": row["time"]}
            # Treat a NULL/empty payload as "no fields", mirroring how the
            # ``sample`` property guards its JSON columns, instead of letting
            # json.loads(None) raise TypeError.
            record.update(json.loads(row["data"] or "{}"))
            records.append(record)

        df = pd.DataFrame(records)
        # Pin the two bookkeeping columns to the front; payload columns keep
        # the order in which they were first seen.
        leading = ["seq_num", "time"]
        remaining = [c for c in df.columns if c not in leading]
        return pd.DataFrame(df[leading + remaining])

    def images(self, field: str = "detector_image") -> LazyImageSequence:
        """Return a lazy accessor for detector images in this run.

        Images are not loaded until explicitly indexed.

        Parameters
        ----------
        field : str, optional
            The image field name to load (default: "detector_image").

        Returns
        -------
        LazyImageSequence
            Lazy accessor with length equal to the number of images in this
            run.

        Notes
        -----
        Returns an empty LazyImageSequence if no images are stored for the
        given field or if the Zarr store does not exist.
        """
        refs: list[dict[str, Any]] = []
        # Without the store no frame could ever be loaded, so honour the
        # documented contract and expose an empty sequence rather than
        # references that would fail on first access.
        if self._zarr_path.exists():
            rows = self._conn.execute(
                _SQL_IMAGE_REFS, (self._row["uid"], field)
            ).fetchall()
            refs = [dict(r) for r in rows]
        return LazyImageSequence(self._conn, refs, zarr_store_path=self._zarr_path)
class LazyImageSequence:
    """Lazy accessor for detector images referenced via Zarr.

    Images are not loaded until explicitly indexed. This class holds the
    reference metadata for each frame and opens the Zarr store only when a
    frame is requested.

    Parameters
    ----------
    conn : sqlite3.Connection
        Active connection to the beamtime database.
    refs : list[dict[str, Any]]
        List of deserialized rows from the ``image_refs`` table.
    zarr_store_path : Path
        Path to the Zarr store directory containing detector arrays.

    Attributes
    ----------
    _conn : sqlite3.Connection
        Shared database connection from the parent Catalog.
    _refs : list[dict[str, Any]]
        Image reference metadata, one entry per frame.
    _zarr_store_path : Path
        Path to the Zarr store directory.
    """

    def __init__(
        self,
        conn: sqlite3.Connection,
        refs: list[dict[str, Any]],
        zarr_store_path: Path,
    ) -> None:
        self._conn = conn
        self._refs = refs
        self._zarr_store_path = zarr_store_path

    def __len__(self) -> int:
        return len(self._refs)

    @property
    def shape(self) -> tuple[int, int, int]:
        """tuple[int, int, int]: (n_frames, shape_x, shape_y).

        The frame dimensions are taken from the first reference; ``(0, 0, 0)``
        is returned when there are no references.
        """
        if not self._refs:
            return (0, 0, 0)
        return (len(self._refs), self._refs[0]["shape_x"], self._refs[0]["shape_y"])

    def __getitem__(self, idx: int | slice) -> np.ndarray:
        """Load one or more detector images from the Zarr store.

        Parameters
        ----------
        idx : int or slice
            Frame index (Python or NumPy integer, negative values count from
            the end) or slice.

        Returns
        -------
        np.ndarray
            A single frame for an integer index, or the selected frames
            stacked along a new leading axis for a slice. A slice that selects
            no frames yields an empty ``(0, shape_x, shape_y)`` array without
            touching the Zarr store.

        Raises
        ------
        IndexError
            If idx is out of range.
        TypeError
            If idx is not int or slice.
        """
        # NumPy integers are not ``int`` instances but are valid indices.
        if isinstance(idx, (int, np.integer)):
            position = int(idx)
            count = len(self._refs)
            if not -count <= position < count:
                raise IndexError(
                    f"index {position} out of range for LazyImageSequence of length {len(self._refs)}"
                )
            # List indexing already resolves negative positions.
            return self._read_frame(self._open_store(), self._refs[position])
        if isinstance(idx, slice):
            selected = self._refs[idx]
            if not selected:
                return self._empty_stack()
            # Open the store once for the whole slice instead of once per frame.
            store = self._open_store()
            return np.stack([self._read_frame(store, ref) for ref in selected])
        raise TypeError(f"indices must be int or slice, not {type(idx).__name__}")

    def _open_store(self) -> Any:
        """Open the Zarr store read-only and return its root group."""
        return zarr.open_group(str(self._zarr_store_path), mode="r")

    @staticmethod
    def _read_frame(store: Any, ref: dict[str, Any]) -> np.ndarray:
        """Read the frame described by ``ref`` from an opened store."""
        return np.asarray(store[ref["zarr_group"]][ref["index_in_stack"]])

    def _empty_stack(self) -> np.ndarray:
        """Return a zero-frame array shaped like an empty stack of this sequence."""
        if not self._refs:
            return np.empty((0, 0, 0), dtype=np.int32)
        first = self._refs[0]
        declared = first.get("dtype")
        try:
            dtype = np.dtype(declared) if declared is not None else np.dtype(np.int32)
        except (TypeError, ValueError):
            # NOTE(review): assumes the ``dtype`` column holds a NumPy dtype
            # string; fall back to int32 when it cannot be parsed.
            dtype = np.dtype(np.int32)
        return np.empty((0, first["shape_x"], first["shape_y"]), dtype=dtype)