Source code for resonance.api.data.schema

from __future__ import annotations

from typing import TYPE_CHECKING, Final

if TYPE_CHECKING:
    import sqlite3

BEAMTIME_SCHEMA_VERSION: Final[int] = 2

_DDL = """
PRAGMA foreign_keys = ON;

CREATE TABLE IF NOT EXISTS samples (
    id           INTEGER PRIMARY KEY AUTOINCREMENT,
    name         TEXT NOT NULL,
    formula      TEXT,
    serial       TEXT,
    tags         TEXT,
    beamline_pos TEXT,
    extra        TEXT
);

CREATE INDEX IF NOT EXISTS idx_samples_name ON samples(name);

CREATE TABLE IF NOT EXISTS runs (
    uid          TEXT PRIMARY KEY,
    sample_id    INTEGER REFERENCES samples(id) ON DELETE SET NULL,
    plan_name    TEXT NOT NULL,
    time_start   REAL NOT NULL,
    time_stop    REAL,
    exit_status  TEXT,
    num_events   INTEGER DEFAULT 0,
    operator     TEXT,
    metadata     TEXT
);

CREATE INDEX IF NOT EXISTS idx_runs_sample ON runs(sample_id);
CREATE INDEX IF NOT EXISTS idx_runs_plan   ON runs(plan_name);
CREATE INDEX IF NOT EXISTS idx_runs_tstart ON runs(time_start);

CREATE TABLE IF NOT EXISTS streams (
    uid          TEXT PRIMARY KEY,
    run_uid      TEXT NOT NULL REFERENCES runs(uid) ON DELETE CASCADE,
    name         TEXT NOT NULL,
    data_keys    TEXT NOT NULL,
    time_created REAL NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_streams_run ON streams(run_uid);
CREATE UNIQUE INDEX IF NOT EXISTS idx_streams_run_name ON streams(run_uid, name);

CREATE TABLE IF NOT EXISTS events (
    uid        TEXT PRIMARY KEY,
    stream_uid TEXT NOT NULL REFERENCES streams(uid) ON DELETE CASCADE,
    seq_num    INTEGER NOT NULL,
    time       REAL NOT NULL,
    data       TEXT NOT NULL,
    timestamps TEXT
);

CREATE INDEX IF NOT EXISTS idx_events_stream ON events(stream_uid);
CREATE UNIQUE INDEX IF NOT EXISTS idx_events_seq ON events(stream_uid, seq_num);

CREATE TABLE IF NOT EXISTS image_refs (
    id                INTEGER PRIMARY KEY AUTOINCREMENT,
    event_uid         TEXT NOT NULL REFERENCES events(uid) ON DELETE CASCADE,
    field_name        TEXT NOT NULL,
    zarr_group        TEXT NOT NULL,
    index_in_stack    INTEGER NOT NULL,
    shape_x           INTEGER NOT NULL,
    shape_y           INTEGER NOT NULL,
    dtype             TEXT NOT NULL,
    compression_codec TEXT
);

CREATE INDEX IF NOT EXISTS idx_image_refs_event ON image_refs(event_uid);
"""


[docs] def create_beamtime_schema(conn: sqlite3.Connection) -> None: """ Create all per-beamtime tables, indexes, and pragmas. Parameters ---------- conn : sqlite3.Connection Open connection to the beamtime SQLite database file. Raises ------ sqlite3.DatabaseError If DDL execution fails due to a malformed database or I/O error. Notes ----- Each beamtime session is stored in its own SQLite `.db` file alongside a Zarr store directory. The `image_refs` table holds references into the Zarr array (group path and frame index) rather than binary BLOBs, keeping the SQLite file small and enabling memory-mapped array access. Foreign keys are enforced on every connection via ``PRAGMA foreign_keys = ON``, which must be re-applied per connection because SQLite resets it on open. """ conn.executescript(_DDL) conn.execute(f"PRAGMA user_version = {BEAMTIME_SCHEMA_VERSION}") conn.commit()
[docs] def migrate_beamtime_schema( conn: sqlite3.Connection, target_version: int = BEAMTIME_SCHEMA_VERSION ) -> None: """Apply pending schema migrations up to target_version. Parameters ---------- conn : sqlite3.Connection Open connection to the beamtime SQLite database file. target_version : int, optional Schema version to migrate to. Defaults to BEAMTIME_SCHEMA_VERSION. Raises ------ RuntimeError If target_version is less than the current schema version (downgrade not supported). """ current = conn.execute("PRAGMA user_version").fetchone()[0] if current == target_version: return if current > target_version: raise RuntimeError( f"Cannot downgrade schema from version {current} to {target_version}." ) if current < 1: create_beamtime_schema(conn) return if current == 1 and target_version >= 2: conn.execute("ALTER TABLE image_refs ADD COLUMN compression_codec TEXT") conn.execute("PRAGMA user_version = 2") conn.commit() current = 2 if current < target_version: raise NotImplementedError( f"No migration path from schema version {current} to {target_version}. " "Schema migrations have not yet been implemented." )
INDEX_SCHEMA_VERSION: Final[int] = 1 _INDEX_DDL = """ PRAGMA foreign_keys = ON; CREATE TABLE IF NOT EXISTS researchers ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, email TEXT, orcid TEXT, affiliation TEXT, root_path TEXT NOT NULL, extra TEXT ); CREATE UNIQUE INDEX IF NOT EXISTS idx_researchers_name ON researchers(name); CREATE TABLE IF NOT EXISTS beamtimes ( id INTEGER PRIMARY KEY AUTOINCREMENT, researcher_id INTEGER NOT NULL REFERENCES researchers(id) ON DELETE CASCADE, label TEXT NOT NULL, db_path TEXT NOT NULL, time_start REAL, time_stop REAL, description TEXT, extra TEXT ); CREATE INDEX IF NOT EXISTS idx_beamtimes_researcher ON beamtimes(researcher_id); CREATE INDEX IF NOT EXISTS idx_beamtimes_start ON beamtimes(time_start); CREATE UNIQUE INDEX IF NOT EXISTS idx_beamtimes_db_path ON beamtimes(db_path); CREATE TABLE IF NOT EXISTS runs_index ( uid TEXT PRIMARY KEY, researcher_id INTEGER NOT NULL REFERENCES researchers(id) ON DELETE CASCADE, beamtime_id INTEGER NOT NULL REFERENCES beamtimes(id) ON DELETE CASCADE, plan_name TEXT NOT NULL, sample_name TEXT, time_start REAL NOT NULL, tags TEXT ); CREATE INDEX IF NOT EXISTS idx_runs_index_plan ON runs_index(plan_name); CREATE INDEX IF NOT EXISTS idx_runs_index_sample ON runs_index(sample_name); CREATE INDEX IF NOT EXISTS idx_runs_index_time ON runs_index(time_start); """
[docs] def create_index_schema(conn: sqlite3.Connection) -> None: """ Create master index tables, indexes, and pragmas. Parameters ---------- conn : sqlite3.Connection Open connection to the master index SQLite database file. Raises ------ sqlite3.DatabaseError If DDL execution fails due to a malformed database or I/O error. Notes ----- The master index database aggregates metadata from all per-beamtime databases into three tables: ``researchers``, ``beamtimes``, and ``runs_index``. This allows cross-beamtime queries without opening individual session databases. The ``runs_index`` table mirrors key fields from the per-beamtime ``runs`` table, identified by the same ``uid``. Foreign keys are enforced via ``PRAGMA foreign_keys = ON``, which must be re-applied per connection. """ conn.executescript(_INDEX_DDL) conn.execute(f"PRAGMA user_version = {INDEX_SCHEMA_VERSION}") conn.commit()
[docs] def migrate_index_schema( conn: sqlite3.Connection, target_version: int = INDEX_SCHEMA_VERSION, ) -> None: """ Apply pending index schema migrations up to target_version. Parameters ---------- conn : sqlite3.Connection Open connection to the master index SQLite database file. target_version : int, optional Schema version to migrate to. Defaults to ``INDEX_SCHEMA_VERSION``. Raises ------ RuntimeError If the database's current ``user_version`` is greater than ``target_version``, indicating a downgrade attempt or a database written by a newer version of this software. NotImplementedError If migrations are required (current_version < target_version) but no migration path has been implemented yet. Notes ----- Migration steps are applied sequentially from current_version + 1 up to target_version. Version 1 is the initial schema; future versions will add individual ``ALTER TABLE`` or data-transform statements here. """ (current_version,) = conn.execute("PRAGMA user_version").fetchone() if current_version == target_version: return if current_version > target_version: raise RuntimeError( f"Index schema version {current_version} is newer than " f"target version {target_version}. Downgrade is not supported." ) raise NotImplementedError( f"No migration path from index schema version {current_version} to {target_version}. " "Schema migrations have not yet been implemented." )