resonance.api.core.ai

resonance.api.core.ai

AIAccessor(conn)

Accessor for analog input channels via the BCS server.

Parameters:

Name Type Description Default
conn BCSServer

Active connection to the BCS hardware server.

required
Notes

Wraps acquire_data and get_acquired_array for typed, validated access to AI channels defined in resonance.api.types.AI.

Examples:

>>> data = await bl.ai.read("Photodiode", "TEY signal", "AI 3 Izero")
>>> print(data)
{'Photodiode': [0.0], 'TEY signal': [0.0], 'AI 3 Izero': [0.0]}
Source code in src/resonance/api/core/ai.py
def __init__(self, conn: BCSz.BCSServer) -> None:
    """Bind the accessor to a BCS server connection.

    Parameters
    ----------
    conn : BCSz.BCSServer
        Active connection to the BCS hardware server.
    """
    # Stored privately; the accessor's methods issue their BCS requests
    # through this connection.
    self._conn = conn

read(*channels) async

Return the last-acquired raw array for each channel.

Parameters:

Name Type Description Default
*channels str

One or more AI channel names from resonance.api.types.AI.

()

Returns:

Type Description
dict[str, list[float]]

Mapping of channel name to raw sample array.

Raises:

Type Description
KeyError

If any channel name is not a valid AI channel.

AcquisitionError

If the BCS response contains an empty data array for a channel.

Notes

Does not trigger a new acquisition. Returns the most recent data buffered by BCS. Call trigger_and_read to acquire fresh data. Use read only when data was already acquired (e.g. after acquire_data was called externally).

Source code in src/resonance/api/core/ai.py
async def read(self, *channels: str) -> dict[str, list[float]]:
    """Fetch the most recently acquired raw samples for the given channels.

    Parameters
    ----------
    *channels : str
        One or more AI channel names from `resonance.api.types.AI`.

    Returns
    -------
    dict[str, list[float]]
        Raw sample array for each channel, keyed by the channel name
        reported in the BCS response.

    Raises
    ------
    KeyError
        If any requested name is not a known AI channel.
    AcquisitionError
        If the BCS response carries an empty data array for a channel.

    Notes
    -----
    No acquisition is started here; only the data already buffered by BCS
    is returned. Use `trigger_and_read` when fresh data is needed, and
    reserve `read` for cases where `acquire_data` was already called
    elsewhere.
    """
    # Reject unknown names up front so no request is sent for a bad call.
    if invalid := [c for c in channels if c not in _AI_CHANNELS]:
        raise KeyError(
            f"Invalid AI channel(s): {invalid}. Valid channels: {list(_AI_CHANNELS)}"
        )

    requested = list(channels)
    reply: dict = await self._conn.get_acquired_array(chans=requested)

    samples_by_channel: dict[str, list[float]] = {}
    for record in reply["chans"]:
        name, samples = record["chan"], record["data"]
        if not samples:
            # An empty buffer means nothing was acquired for this channel.
            raise AcquisitionError(f"Empty data returned for channel '{name}'")
        samples_by_channel[name] = samples
    return samples_by_channel

trigger_and_read(channels, acquisition_time=1.0) async

Trigger acquisition and return mean and standard error per channel.

Parameters:

Name Type Description Default
channels list[str]

AI channel names from resonance.api.types.AI.

required
acquisition_time float

Duration of acquisition in seconds. Must be positive. Default is 1.0.

1.0

Returns:

Type Description
dict[str, Variable]

Mapping of channel name to ufloat(mean, std_err).

Raises:

Type Description
KeyError

If any channel name is not a valid AI channel.

ValueError

If acquisition_time is not strictly positive.

AcquisitionError

If the BCS response contains an empty data array for a channel.

Notes

Performs a blocking acquisition of acquisition_time seconds. Call it at each scan point to get the mean and standard error for each channel. Standard error is computed as std_dev / sqrt(N). For N=1, std_err is 0.

Examples:

>>> data = await bl.ai.trigger_and_read(["Photodiode", "TEY signal"], acquisition_time=1.0)
>>> print(data["Photodiode"])  # ufloat(mean, std_err)
Source code in src/resonance/api/core/ai.py
async def trigger_and_read(
    self,
    channels: list[str],
    acquisition_time: float = 1.0,
) -> dict[str, Variable]:
    """Trigger acquisition and return mean and standard error per channel.

    Parameters
    ----------
    channels : list[str]
        AI channel names from `resonance.api.types.AI`.
    acquisition_time : float, optional
        Duration of acquisition in seconds. Must be positive. Default is 1.0.

    Returns
    -------
    dict[str, Variable]
        Mapping of channel name to `ufloat(mean, std_err)`.

    Raises
    ------
    KeyError
        If any channel name is not a valid AI channel.
    ValueError
        If `acquisition_time` is not strictly positive (this includes NaN).
    AcquisitionError
        If the BCS response contains an empty data array for a channel.

    Notes
    -----
    Performs a blocking acquisition of `acquisition_time` seconds.
    Call it at each scan point to get the mean and standard error for each
    channel.

    NaN samples are ignored. Standard error is computed as
    std_dev / sqrt(N), where std_dev is the sample standard deviation
    (ddof=1) and N is the number of non-NaN samples. For N=1, std_err is 0.
    If every sample of a channel is NaN, both mean and std_err are NaN.

    Examples
    --------
    >>> data = await bl.ai.trigger_and_read(["Photodiode", "TEY signal"], acquisition_time=1.0)
    >>> print(data["Photodiode"])  # ufloat(mean, std_err)
    """
    invalid = [c for c in channels if c not in _AI_CHANNELS]
    if invalid:
        raise KeyError(
            f"Invalid AI channel(s): {invalid}. Valid channels: {list(_AI_CHANNELS)}"
        )
    # Written as `not (> 0)` rather than `<= 0` so that NaN, which compares
    # False against everything, is rejected as well.
    if not (acquisition_time > 0):
        raise ValueError(
            f"acquisition_time must be positive, got {acquisition_time}"
        )

    await self._conn.acquire_data(chans=channels, time=acquisition_time)  # pyright: ignore[reportArgumentType]
    response: dict = await self._conn.get_acquired_array(chans=channels)

    # TODO: add optional return of raw arrays for debugging or downstream processing
    result: dict[str, Variable] = {}
    for entry in response["chans"]:
        name: str = entry["chan"]
        data: list[float] = entry["data"]
        if not data:
            raise AcquisitionError(f"Empty data returned for channel '{name}'")
        arr = np.asarray(data, dtype=float)

        # Compute the statistics on the non-NaN samples only, and use their
        # count as N, so the mean and its standard error describe the same
        # set of samples.
        valid = arr[~np.isnan(arr)]
        n_valid = int(valid.size)
        if n_valid == 0:
            # Nothing usable was measured; report NaN for both quantities.
            mean = std_err = float("nan")
        elif n_valid == 1:
            # A single sample has no spread estimate; documented as 0.
            mean, std_err = float(valid[0]), 0.0
        else:
            mean = float(valid.mean())
            std_err = float(valid.std(ddof=1) / np.sqrt(n_valid))
        result[name] = ufloat(mean, std_err)
    return result