Source code for resonance.api.core.ai
from __future__ import annotations
from typing import TYPE_CHECKING, get_args
import numpy as np
from uncertainties import Variable, ufloat
from resonance.api.types import AI, AcquisitionError
if TYPE_CHECKING:
from bcs import BCSz
_AI_CHANNELS: tuple[str, ...] = get_args(AI.__value__)
[docs]
class AIAccessor:
"""Accessor for analog input channels via the BCS server.
Parameters
----------
conn : BCSz.BCSServer
Active connection to the BCS hardware server.
Notes
-----
Wraps `acquire_data` and `get_acquired_array` for typed, validated
access to AI channels defined in `resonance.api.types.AI`.
Examples
--------
>>> data = await bl.ai.read("Photodiode", "TEY signal", "AI 3 Izero")
>>> print(data)
{'Photodiode': [0.0], 'TEY signal': [0.0], 'AI 3 Izero': [0.0]}
"""
def __init__(self, conn: BCSz.BCSServer) -> None:
self._conn = conn
[docs]
async def read(self, *channels: str) -> dict[str, list[float]]:
"""Return the last-acquired raw array for each channel.
Parameters
----------
*channels : str
One or more AI channel names from `resonance.api.types.AI`.
Returns
-------
dict[str, list[float]]
Mapping of channel name to raw sample array.
Raises
------
KeyError
If any channel name is not a valid AI channel.
AcquisitionError
If the BCS response contains an empty data array for a channel.
Notes
-----
Does not trigger a new acquisition. Returns the most recent data
buffered by BCS. Call `trigger_and_read` to acquire fresh data.
Use `read` only when data was already acquired (e.g. after `acquire_data`
was called externally).
"""
invalid = [c for c in channels if c not in _AI_CHANNELS]
if invalid:
raise KeyError(
f"Invalid AI channel(s): {invalid}. Valid channels: {list(_AI_CHANNELS)}"
)
response: dict = await self._conn.get_acquired_array(chans=list(channels))
result: dict[str, list[float]] = {}
for entry in response["chans"]:
name: str = entry["chan"]
data: list[float] = entry["data"]
if not data:
raise AcquisitionError(f"Empty data returned for channel '{name}'")
result[name] = data
return result
[docs]
async def trigger_and_read(
self,
channels: list[str],
acquisition_time: float = 1.0,
) -> dict[str, Variable]:
"""Trigger acquisition and return mean and standard error per channel.
Parameters
----------
channels : list[str]
AI channel names from `resonance.api.types.AI`.
acquisition_time : float, optional
Duration of acquisition in seconds. Must be positive. Default is 1.0.
Returns
-------
dict[str, Variable]
Mapping of channel name to `ufloat(mean, std_err)`.
Raises
------
KeyError
If any channel name is not a valid AI channel.
ValueError
If `acquisition_time` is not strictly positive.
AcquisitionError
If the BCS response contains an empty data array for a channel.
Notes
-----
Performs a blocking acquisition of `acquisition_time` seconds.
Use per scan point to get mean and standard error for each channel.
Standard error is computed as std_dev / sqrt(N). For N=1, std_err is 0.
Examples
--------
>>> data = await bl.ai.trigger_and_read(["Photodiode", "TEY signal"], acquisition_time=1.0)
>>> print(data["Photodiode"]) # ufloat(mean, std_err)
"""
invalid = [c for c in channels if c not in _AI_CHANNELS]
if invalid:
raise KeyError(
f"Invalid AI channel(s): {invalid}. Valid channels: {list(_AI_CHANNELS)}"
)
if acquisition_time <= 0:
raise ValueError(
f"acquisition_time must be positive, got {acquisition_time}"
)
await self._conn.acquire_data(chans=channels, time=acquisition_time) # pyright: ignore[reportArgumentType]
response: dict = await self._conn.get_acquired_array(chans=channels)
# TODO: add optional return of raw arrays for debugging or downstream processing
result: dict[str, Variable] = {}
for entry in response["chans"]:
name: str = entry["chan"]
data: list[float] = entry["data"]
if not data:
raise AcquisitionError(f"Empty data returned for channel '{name}'")
arr = np.asarray(data, dtype=float)
mean = float(np.nanmean(arr))
std_err = (
float(np.nanstd(arr, ddof=1) / np.sqrt(len(arr)))
if len(arr) > 1
else 0.0
)
result[name] = ufloat(mean, std_err)
return result