Source code for resonance.api.core.motors

from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING, get_args

from resonance.api.core.primitives import AbortFlag, wait_for_motors
from resonance.api.types import (
    Command,
    Motor,
    MotorError,
    MotorTimeoutError,
    ScanAbortedError,
)

if TYPE_CHECKING:
    from bcs import BCSz

__all__ = [
    "MotorState",
    "MotorAccessor",
    "MotorError",
    "MotorTimeoutError",
    "ScanAbortedError",
]

_valid_motors: tuple[str, ...] = get_args(Motor.__value__)
_valid_commands: tuple[str, ...] = get_args(Command.__value__)


[docs] @dataclass class MotorState: """ Snapshot of a single motor's state from BCS. Parameters ---------- position : float Current encoder position reported by BCS. goal : float Commanded target position. status : int Raw BCS status code for the motor. time : float Timestamp of the reading as reported by BCS. """ position: float goal: float status: int # TODO: replace with a MotorStatus enum once BCSz MotorStatus values are documented time: float
[docs] class MotorAccessor: """ High-level async interface for reading and commanding BCS motors. Wraps the BCSz.BCSServer motor API with name validation, typed return values, and cooperative cancellation support. Parameters ---------- conn : BCSz.BCSServer Active BCS server connection. Examples -------- >>> motors = MotorAccessor(server) >>> state = await motors.read("Sample X", "Sample Y") >>> await motors.set("Sample X", 10.0) >>> await motors.wait(["Sample X"]) """ def __init__(self, conn: BCSz.BCSServer) -> None: self._conn = conn def _validate_motors(self, names: tuple[str, ...] | list[str]) -> None: invalid = [n for n in names if n not in _valid_motors] if invalid: raise KeyError( f"Unknown motor name(s): {invalid}. " f"Valid motors are defined in resonance.api.types.Motor." ) def _validate_command(self, cmd: str) -> None: if cmd not in _valid_commands: raise ValueError( f"Unknown command: {cmd!r}. " f"Valid commands are defined in resonance.api.types.Command." )
[docs] async def read(self, *names: str) -> dict[str, MotorState]: """ Read the current state of one or more motors. Parameters ---------- *names : str One or more motor names to query. Returns ------- dict[str, MotorState] Mapping of motor name to its current MotorState snapshot. Raises ------ KeyError If any name is not in the valid motor list. Examples -------- >>> states = await accessor.read("Sample X", "Sample Y") >>> print(states["Sample X"].position) """ self._validate_motors(names) response = await self._conn.get_motor(motors=list(names)) return { entry["motor"]: MotorState( position=entry["position"], goal=entry["goal"], status=entry["status"], time=entry["time"], ) for entry in response["data"] }
[docs] async def set( self, name: str, value: float, *, command: Command = "Backlash Move", ) -> None: """ Command a single motor to a target position. Parameters ---------- name : str Motor name to move. value : float Target position. command : Command, optional BCS command string, by default "Backlash Move". Returns ------- None Raises ------ KeyError If `name` is not a valid motor. ValueError If `command` is not a valid BCS command string. Examples -------- >>> await accessor.set("Sample X", 5.0) >>> await accessor.set("Sample X", 5.0, command="Normal Move") """ self._validate_motors((name,)) self._validate_command(command) await self._conn.command_motor( commands=[command], motors=[name], goals=[value], )
[docs] async def set_many( self, targets: dict[str, float], *, command: Command = "Backlash Move", ) -> None: """ Command multiple motors to target positions simultaneously. Parameters ---------- targets : dict[str, float] Mapping of motor name to target position. command : Command, optional BCS command string applied to all motors, by default "Backlash Move". Returns ------- None Raises ------ KeyError If any motor name in `targets` is not valid. ValueError If `command` is not a valid BCS command string. Examples -------- >>> await accessor.set_many({"Sample X": 5.0, "Sample Y": -2.5}) """ names = list(targets.keys()) self._validate_motors(names) self._validate_command(command) await self._conn.command_motor( commands=[command] * len(names), motors=names, goals=list(targets.values()), )
[docs] async def wait( self, names: list[str], timeout: float = 30.0, abort: AbortFlag | None = None, ) -> None: """ Wait for one or more motors to reach their target positions. Parameters ---------- names : list[str] Motor names to wait on. timeout : float, optional Maximum seconds to wait before raising MotorTimeoutError, by default 30.0. abort : AbortFlag or None, optional If provided, the wait will raise ScanAbortedError when the flag is set. Returns ------- None Raises ------ KeyError If any name is not a valid motor. MotorTimeoutError If any motor does not reach its target within `timeout` seconds. ScanAbortedError If `abort` flag is set during the wait. Notes ----- Delegates to `wait_for_motors` from `resonance.api.core.primitives`. Pass an `AbortFlag` to enable cooperative cancellation from Jupyter or programmatic interfaces: set the flag from another task/cell to stop waiting and raise `ScanAbortedError`. Examples -------- >>> flag = AbortFlag() >>> await accessor.set_many({"Sample X": 10.0, "Sample Y": 5.0}) >>> await accessor.wait(["Sample X", "Sample Y"], timeout=60.0, abort=flag) """ # TODO: expose check_interval as a parameter for fine-grained polling control self._validate_motors(names) await wait_for_motors( server=self._conn, motors=names, timeout=timeout, abort_flag=abort, )