from __future__ import annotations
import asyncio
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Final
import numpy as np
if TYPE_CHECKING:
from bcs import BCSz
AXIS_PHOTONIQUE: Final[str] = "Axis Photonique"
async def get_acquired2d_string(conn: BCSz.BCSServer, name: str) -> dict[str, Any]:
"""
Get the acquired 2D string from the detector.
"""
return await conn.bcs_request("GetInstrumentAcquired2DString", dict(locals()))
[docs]
@dataclass
class ExposureQuality:
"""
Quality assessment of a single detector exposure.
Attributes
----------
overexposed : bool
True when the number of saturated pixels exceeds the configured threshold.
underexposed : bool
True when the number of dark pixels exceeds the configured threshold.
suggested_exposure_seconds : float or None
Recommended exposure time, or None when no suggestion is available.
"""
overexposed: bool
underexposed: bool
suggested_exposure_seconds: float | None
def _parse_acquired2d_string(payload: dict[str, Any]) -> np.ndarray:
"""
Parse the BCSz GetInstrumentAcquired2DString response into a 2-D array.
Parameters
----------
payload : dict[str, Any]
Response dict containing ``"Height"``, ``"Width"``, and ``"Data"`` keys.
``"Data"`` is a comma-separated string of integer pixel values.
Returns
-------
np.ndarray
Integer array of dtype ``int32`` and shape ``(height, width)``.
"""
height: int = int(payload["Height"])
width: int = int(payload["Width"])
tokens = [t for t in payload["Data"].split(",") if t.strip()]
return np.array(tokens, dtype=np.int32).reshape(height, width)
[docs]
class AreaDetector:
"""
Interface to the Axis Photonique 2-D area detector via BCSz.
Parameters
----------
conn : BCSz.BCSServer
Active BCSz server connection.
name : str, optional
Instrument name registered in BCSz, defaults to ``Axis Photonique``.
Examples
--------
>>> image = await bl.detector.acquire(exposure_seconds=0.1)
>>> print(image.shape)
(1024, 1024)
>>> quality = bl.detector.check_exposure(image)
>>> print(quality)
ExposureQuality(overexposed=False, underexposed=False, suggested_exposure_seconds=None)
>>> descriptor = bl.detector.describe()
>>> print(descriptor)
{'dtype': 'int32', 'source': 'detector', 'external': True, 'shape': [1024, 1024]}
"""
def __init__(self, conn: BCSz.BCSServer, *, name: str = AXIS_PHOTONIQUE) -> None:
self._conn = conn
self._name = name
self._last_shape: tuple[int, int] | None = None
[docs]
async def is_ready(self) -> bool:
"""
Return whether the instrument driver is running and the detector is ready.
Uses BCSz GetInstrumentDriverStatus for the configured instrument name.
Returns
-------
bool
True if the driver is running and the detector can accept acquisitions.
"""
res = await self._conn.get_instrument_driver_status(name=self._name)
return bool(res.get("running", False))
[docs]
async def setup(self, *, timeout: float = 30.0, poll_interval: float = 0.5) -> None:
"""
Ensure the detector driver is started and ready for acquisition.
Starts the instrument driver via BCSz if not already running, then
waits until is_ready() is True or timeout is reached.
Parameters
----------
timeout : float, optional
Maximum time in seconds to wait for the driver to become ready.
Default 30.0.
poll_interval : float, optional
Seconds between readiness checks. Default 0.5.
Raises
------
TimeoutError
If the driver did not report ready within timeout seconds.
"""
await self._conn.start_instrument_driver(name=self._name)
loop = asyncio.get_running_loop()
deadline = loop.time() + timeout
while True:
if await self.is_ready():
return
if loop.time() >= deadline:
raise TimeoutError(
f"Detector {self._name!r} did not become ready within {timeout} s"
)
await asyncio.sleep(poll_interval)
[docs]
async def acquire(self, exposure_seconds: float) -> np.ndarray | None:
"""
Trigger an exposure and return the acquired image.
Parameters
----------
exposure_seconds : float
Integration time in seconds.
Returns
-------
np.ndarray or None
2-D ``int32`` array of shape ``(height, width)``, or ``None`` if
the acquisition reported failure.
Notes
-----
The shutter is detector-driven; no plan-level shutter wrapping is
required around this call.
"""
res = await self._conn.start_instrument_acquire(
name=self._name,
run_type="Exposure",
acq_time_s=exposure_seconds, # pyright: ignore[reportArgumentType]
)
if not res.get("success"):
return None
raw = await get_acquired2d_string(self._conn, self._name)
image = _parse_acquired2d_string(raw)
self._last_shape = (image.shape[0], image.shape[1])
return image
[docs]
def check_exposure(
self,
image: np.ndarray,
*,
over_threshold: int = int(2e5),
over_pixel_count: int = 500,
under_threshold: int = 50,
under_pixel_count: int = 950_000,
) -> ExposureQuality:
"""
Assess whether an image is over- or under-exposed.
Parameters
----------
image : np.ndarray
2-D detector image.
over_threshold : int, optional
Pixel value above which a pixel is considered saturated.
over_pixel_count : int, optional
Minimum number of saturated pixels required to flag overexposure.
under_threshold : int, optional
Pixel value below which a pixel is considered dark.
under_pixel_count : int, optional
Minimum number of dark pixels required to flag underexposure.
Returns
-------
ExposureQuality
Dataclass with ``overexposed``, ``underexposed``, and
``suggested_exposure_seconds`` fields.
Notes
-----
The pixel-count heuristic mirrors the sst-rsoxs GreatEyes thresholds
used for automated exposure quality decisions.
"""
overexposed = int(np.sum(image >= over_threshold)) >= over_pixel_count
underexposed = int(np.sum(image < under_threshold)) >= under_pixel_count
return ExposureQuality(
overexposed=overexposed,
underexposed=underexposed,
suggested_exposure_seconds=None,
)
[docs]
def describe(self) -> dict[str, Any]:
"""
Return a data_keys-compatible descriptor dict for use in RunWriter.open_stream.
Returns
-------
dict[str, Any]
Dictionary with keys ``dtype``, ``source``, ``external``, and
``shape``. ``shape`` reflects the last successfully acquired
image dimensions, or an empty list when no image has been
acquired yet.
"""
return {
"dtype": "int32",
"source": "detector",
"external": True,
"shape": list(self._last_shape) if self._last_shape is not None else [],
}