from __future__ import annotations
import json
import sqlite3
import time
from pathlib import Path # noqa: TC003
from typing import TYPE_CHECKING, Any, cast
from uuid import uuid4
import zarr
if TYPE_CHECKING:
import numpy as np
from resonance.api.data.models import SampleMetadata
from resonance.api.data.schema import create_beamtime_schema
class RunWriter:
    """Manages a SQLite connection to a beamtime database and writes scan data.

    Parameters
    ----------
    db_path : Path
        Path to the beamtime SQLite database file. Created on first open if it
        does not exist.
    sample : SampleMetadata
        Sample to associate with runs written through this writer. If
        ``sample.id`` is None, the sample is upserted on ``open``.

    Attributes
    ----------
    _db_path : Path
        Resolved path to the SQLite file.
    _sample : SampleMetadata
        Sample metadata, with ``id`` populated after ``open``.
    _conn : sqlite3.Connection or None
        Active database connection, or None when closed.
    _run_uid : str
        Hex UID of the currently open run, or empty string.
    _stream_uid : str
        Hex UID of the currently open stream, or empty string.
    _seq_num : int
        Event sequence counter within the current stream.
    _zarr_store : zarr.Group or None
        Open Zarr group for image storage, or None when closed.
    """

    def __init__(self, db_path: Path, sample: SampleMetadata) -> None:
        self._db_path = db_path
        self._sample = sample
        self._conn: sqlite3.Connection | None = None
        self._run_uid: str = ""
        self._stream_uid: str = ""
        self._seq_num: int = 0
        self._zarr_store: zarr.Group | None = None

    def open(self) -> None:
        """Open the database connection and upsert the sample.

        Creates the beamtime schema if the database does not yet exist. If
        ``self._sample.id`` is None, the sample is looked up by name; if a
        matching row exists its id is loaded, otherwise a new row is inserted.
        A Zarr store is opened (or created) alongside the ``.db`` file at
        ``{db_path.stem}.zarr/``.

        Raises
        ------
        RuntimeError
            If the writer is already open (``self._conn`` is not None).
        sqlite3.DatabaseError
            If the database file is corrupt or unreadable.
        """
        if self._conn is not None:
            raise RuntimeError("RunWriter is already open")
        conn = sqlite3.connect(self._db_path)
        try:
            conn.execute("PRAGMA foreign_keys = ON")
            create_beamtime_schema(conn)
            if self._sample.id is None:
                self._upsert_sample(conn)
            zarr_path = self._db_path.with_suffix(".zarr")
            zarr_store = zarr.open_group(str(zarr_path), mode="a")
        except BaseException:
            # Don't leak the connection if schema creation, the sample
            # upsert, or the zarr open fails partway through setup.
            conn.close()
            raise
        # Assign only after all setup succeeded so the writer is never
        # left half-open.
        self._conn = conn
        self._zarr_store = zarr_store

    def _upsert_sample(self, conn: sqlite3.Connection) -> None:
        """Look up the sample by name, inserting it if absent.

        Populates ``self._sample.id`` from the existing or newly created row.

        Parameters
        ----------
        conn : sqlite3.Connection
            Open connection to the beamtime database.
        """
        row = conn.execute(
            "SELECT id FROM samples WHERE name = ?", (self._sample.name,)
        ).fetchone()
        if row is not None:
            self._sample.id = row[0]
            return
        cur = conn.execute(
            "INSERT INTO samples (name, formula, serial, tags, beamline_pos, extra) "
            "VALUES (?, ?, ?, ?, ?, ?)",
            (
                self._sample.name,
                self._sample.formula,
                self._sample.serial,
                json.dumps(self._sample.tags),
                self._sample.beamline_pos,
                json.dumps(self._sample.extra),
            ),
        )
        conn.commit()
        self._sample.id = cur.lastrowid

    def open_run(
        self, plan_name: str, *, metadata: dict[str, Any] | None = None
    ) -> str:
        """Insert a new run row and return its UID.

        Parameters
        ----------
        plan_name : str
            Name of the scan plan, e.g. "en_scan".
        metadata : dict[str, Any] or None, optional
            Arbitrary run-level metadata serialized to JSON. Defaults to an
            empty dict.

        Returns
        -------
        str
            Hex UUID of the newly created run.

        Raises
        ------
        RuntimeError
            If the writer is not open.
        """
        if self._conn is None:
            raise RuntimeError("RunWriter is not open")
        uid = uuid4().hex
        self._run_uid = uid
        self._seq_num = 0
        self._conn.execute(
            "INSERT INTO runs (uid, sample_id, plan_name, time_start, metadata) "
            "VALUES (?, ?, ?, ?, ?)",
            (uid, self._sample.id, plan_name, time.time(), json.dumps(metadata or {})),
        )
        self._conn.commit()
        return uid

    def open_stream(self, name: str, data_keys: dict[str, Any]) -> str:
        """Insert a new stream row and return its UID.

        Parameters
        ----------
        name : str
            Stream name, e.g. "primary".
        data_keys : dict[str, Any]
            Descriptor mapping field names to their metadata, serialized to JSON.

        Returns
        -------
        str
            Hex UUID of the newly created stream.

        Raises
        ------
        RuntimeError
            If the writer is not open or no run has been opened.
        """
        if self._conn is None:
            raise RuntimeError("RunWriter is not open")
        if not self._run_uid:
            raise RuntimeError("No open run")
        uid = uuid4().hex
        self._stream_uid = uid
        self._conn.execute(
            "INSERT INTO streams (uid, run_uid, name, data_keys, time_created) "
            "VALUES (?, ?, ?, ?, ?)",
            (uid, self._run_uid, name, json.dumps(data_keys), time.time()),
        )
        self._conn.commit()
        return uid

    def write_event(
        self,
        data: dict[str, float | int | str | bool],
        timestamps: dict[str, float] | None = None,
    ) -> str:
        """Insert an event row and return its UID.

        Events are not committed individually; the commit is deferred to
        ``close_run`` for performance.

        Parameters
        ----------
        data : dict[str, float | int | str | bool]
            Measured values keyed by field name.
        timestamps : dict[str, float] or None, optional
            Per-field acquisition timestamps. Defaults to an empty dict.

        Returns
        -------
        str
            Hex UUID of the newly inserted event.

        Raises
        ------
        RuntimeError
            If the writer is not open, no run has been opened, or no stream
            has been opened.
        """
        if self._conn is None:
            raise RuntimeError("RunWriter is not open")
        if not self._run_uid:
            raise RuntimeError("No open run")
        if not self._stream_uid:
            raise RuntimeError("No open stream")
        self._seq_num += 1
        uid = uuid4().hex
        self._conn.execute(
            "INSERT INTO events (uid, stream_uid, seq_num, time, data, timestamps) "
            "VALUES (?, ?, ?, ?, ?, ?)",
            (
                uid,
                self._stream_uid,
                self._seq_num,
                time.time(),
                json.dumps(data),
                json.dumps(timestamps or {}),
            ),
        )
        return uid

    def write_image(self, event_uid: str, field_name: str, image: np.ndarray) -> None:
        """Append a 2-D image frame to the Zarr store and record a reference in SQLite.

        Parameters
        ----------
        event_uid : str
            Hex UUID of the parent event row in the ``events`` table.
        field_name : str
            Detector field name, e.g. ``"ccd"``.
        image : np.ndarray
            2-D array of shape ``(height, width)`` to append.

        Raises
        ------
        RuntimeError
            If the writer is not open (``self._conn`` is None), the Zarr
            store is not open, no run has been opened (``self._run_uid`` is
            empty), or no stream has been opened (``self._stream_uid`` is
            empty).

        Notes
        -----
        The Zarr store lives at ``{db_path.stem}.zarr/`` alongside the ``.db``
        file. Frames are appended to the array at
        ``runs/{run_uid}/{field_name}`` inside the store. Each call grows the
        array by one frame along axis 0. The ``image_refs`` row stores the
        zarr group path and the zero-based frame index so the frame can be
        retrieved without scanning the full array. ``compression_codec``
        is recorded as ``"blosc"`` to document the Zarr default; actual
        compression is controlled by zarr's compressor setting.
        """
        if self._conn is None:
            raise RuntimeError("RunWriter is not open")
        if not self._run_uid:
            raise RuntimeError("No open run")
        if not self._stream_uid:
            raise RuntimeError("No open stream")
        # Explicit guard instead of an unchecked cast: a partially failed
        # open() must surface as a clear error, not AttributeError on None.
        store = self._zarr_store
        if store is None:
            raise RuntimeError("Zarr store is not open")
        zarr_group: str = f"runs/{self._run_uid}/{field_name}"
        if zarr_group in store:
            arr = cast("zarr.Array", store[zarr_group])
        else:
            # Array does not exist yet: create it empty along axis 0, one
            # frame per chunk, matching this frame's shape and dtype.
            arr = store.require_array(
                zarr_group,
                shape=(0, *image.shape),
                chunks=(1, *image.shape),
                dtype=image.dtype,
            )
        current_len = arr.shape[0]
        arr.resize((current_len + 1, *image.shape))
        arr[current_len] = image
        index_in_stack = current_len
        self._conn.execute(
            "INSERT INTO image_refs "
            "(event_uid, field_name, zarr_group, index_in_stack, shape_x, shape_y, dtype, compression_codec) "
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
            (
                event_uid,
                field_name,
                zarr_group,
                index_in_stack,
                image.shape[0],
                image.shape[1],
                str(image.dtype),
                "blosc",
            ),
        )

    def close_run(self, *, exit_status: str = "success") -> None:
        """Finalize the current run and commit all pending events.

        Parameters
        ----------
        exit_status : str, optional
            Final status string. Expected values are "success", "aborted", or
            "failed". Defaults to "success".

        Raises
        ------
        RuntimeError
            If the writer is not open or no run has been opened.
        """
        if self._conn is None:
            raise RuntimeError("RunWriter is not open")
        if not self._run_uid:
            raise RuntimeError("No open run")
        self._conn.execute(
            "UPDATE runs SET time_stop = ?, exit_status = ?, num_events = ? "
            "WHERE uid = ?",
            (time.time(), exit_status, self._seq_num, self._run_uid),
        )
        self._conn.commit()
        self._run_uid = ""
        self._stream_uid = ""

    def close(self) -> None:
        """Commit any remaining work and close the database connection.

        Raises
        ------
        RuntimeError
            If the writer is not open.
        """
        if self._conn is None:
            raise RuntimeError("RunWriter is not open")
        self._conn.commit()
        self._conn.close()
        self._conn = None
        self._zarr_store = None

    def __enter__(self) -> RunWriter:
        """Open the writer and return self.

        Returns
        -------
        RunWriter
            The opened writer instance.
        """
        self.open()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: object,
    ) -> None:
        """Close the run and connection, propagating any exception.

        Parameters
        ----------
        exc_type : type[BaseException] or None
            Exception type if an exception occurred, otherwise None.
        exc : BaseException or None
            Exception instance if an exception occurred, otherwise None.
        tb : object
            Traceback object if an exception occurred, otherwise None.
        """
        if exc_type is not None and self._run_uid:
            self.close_run(exit_status="failed")
        elif self._run_uid:
            self.close_run()
        self.close()
class IndexWriter:
    """Writer for the master index database aggregating cross-beamtime metadata.

    Parameters
    ----------
    index_db_path : Path
        Path to the master index SQLite database file.

    Notes
    -----
    This class is a placeholder skeleton: every public method raises
    ``NotImplementedError`` until the master index feature lands.
    """

    # Shared message for all not-yet-implemented entry points.
    _NOT_IMPLEMENTED_MSG = "IndexWriter is not yet implemented"

    def __init__(self, index_db_path: Path) -> None:
        self._index_db_path = index_db_path

    def ensure_schema(self) -> None:
        """Create the index schema if it does not already exist.

        Raises
        ------
        NotImplementedError
            Always; not yet implemented.
        """
        raise NotImplementedError(self._NOT_IMPLEMENTED_MSG)

    def register_researcher(
        self,
        name: str,
        root_path: str,
        *,
        email: str | None = None,
        orcid: str | None = None,
        affiliation: str | None = None,
    ) -> int:
        """Insert or retrieve a researcher row and return its id.

        Parameters
        ----------
        name : str
            Full name of the researcher.
        root_path : str
            Filesystem root under which this researcher's data lives.
        email : str or None, optional
            Contact email address.
        orcid : str or None, optional
            ORCID identifier string.
        affiliation : str or None, optional
            Institutional affiliation.

        Returns
        -------
        int
            Primary key of the researcher row.

        Raises
        ------
        NotImplementedError
            Always; not yet implemented.
        """
        raise NotImplementedError(self._NOT_IMPLEMENTED_MSG)

    def register_beamtime(
        self,
        researcher_id: int,
        label: str,
        db_path: str,
        *,
        time_start: float | None = None,
    ) -> int:
        """Insert or retrieve a beamtime row and return its id.

        Parameters
        ----------
        researcher_id : int
            Foreign key into the ``researchers`` table.
        label : str
            Human-readable beamtime label, e.g. "2026-03-05".
        db_path : str
            Path to the per-beamtime SQLite file.
        time_start : float or None, optional
            Unix timestamp for the start of the beamtime session.

        Returns
        -------
        int
            Primary key of the beamtime row.

        Raises
        ------
        NotImplementedError
            Always; not yet implemented.
        """
        raise NotImplementedError(self._NOT_IMPLEMENTED_MSG)

    def index_run(
        self,
        uid: str,
        researcher_id: int,
        beamtime_id: int,
        plan_name: str,
        *,
        sample_name: str | None = None,
        time_start: float | None = None,
        tags: list[str] | None = None,
    ) -> None:
        """Insert a run summary row into the master index.

        Parameters
        ----------
        uid : str
            Hex UUID of the run, matching the per-beamtime ``runs.uid``.
        researcher_id : int
            Foreign key into the ``researchers`` table.
        beamtime_id : int
            Foreign key into the ``beamtimes`` table.
        plan_name : str
            Name of the scan plan.
        sample_name : str or None, optional
            Sample name denormalized from the per-beamtime database.
        time_start : float or None, optional
            Unix timestamp for the start of the run.
        tags : list[str] or None, optional
            Arbitrary string tags for filtering.

        Raises
        ------
        NotImplementedError
            Always; not yet implemented.
        """
        raise NotImplementedError(self._NOT_IMPLEMENTED_MSG)