resonance.api.core.motors

resonance.api.core.motors

MotorError

Bases: RsoxsError

Motor operation failed

MotorTimeoutError

Bases: MotorError

Motor move did not complete in time

ScanAbortedError

Bases: RsoxsError

Scan was aborted by user

MotorState(position, goal, status, time) dataclass

Snapshot of a single motor's state from BCS.

Parameters:

Name Type Description Default
position float

Current encoder position reported by BCS.

required
goal float

Commanded target position.

required
status int

Raw BCS status code for the motor.

required
time float

Timestamp of the reading as reported by BCS.

required

MotorAccessor(conn)

High-level async interface for reading and commanding BCS motors.

Wraps the BCSz.BCSServer motor API with name validation, typed return values, and cooperative cancellation support.

Parameters:

Name Type Description Default
conn BCSServer

Active BCS server connection.

required

Examples:

>>> motors = MotorAccessor(server)
>>> state = await motors.read("Sample X", "Sample Y")
>>> await motors.set("Sample X", 10.0)
>>> await motors.wait(["Sample X"])
Source code in src/resonance/api/core/motors.py
def __init__(self, conn: BCSz.BCSServer) -> None:
    self._conn = conn

read(*names) async

Read the current state of one or more motors.

Parameters:

Name Type Description Default
*names str

One or more motor names to query.

()

Returns:

Type Description
dict[str, MotorState]

Mapping of motor name to its current MotorState snapshot.

Raises:

Type Description
KeyError

If any name is not in the valid motor list.

Examples:

>>> states = await accessor.read("Sample X", "Sample Y")
>>> print(states["Sample X"].position)
Source code in src/resonance/api/core/motors.py
async def read(self, *names: str) -> dict[str, MotorState]:
    """
    Read the current state of one or more motors.

    Parameters
    ----------
    *names : str
        One or more motor names to query.

    Returns
    -------
    dict[str, MotorState]
        Mapping of motor name to its current MotorState snapshot.

    Raises
    ------
    KeyError
        If any name is not in the valid motor list.

    Examples
    --------
    >>> states = await accessor.read("Sample X", "Sample Y")
    >>> print(states["Sample X"].position)
    """
    self._validate_motors(names)
    response = await self._conn.get_motor(motors=list(names))
    return {
        entry["motor"]: MotorState(
            position=entry["position"],
            goal=entry["goal"],
            status=entry["status"],
            time=entry["time"],
        )
        for entry in response["data"]
    }

set(name, value, *, command='Backlash Move') async

Command a single motor to a target position.

Parameters:

Name Type Description Default
name str

Motor name to move.

required
value float

Target position.

required
command Command

BCS command string, by default "Backlash Move".

'Backlash Move'

Returns:

Type Description
None

Raises:

Type Description
KeyError

If name is not a valid motor.

ValueError

If command is not a valid BCS command string.

Examples:

>>> await accessor.set("Sample X", 5.0)
>>> await accessor.set("Sample X", 5.0, command="Normal Move")
Source code in src/resonance/api/core/motors.py
async def set(
    self,
    name: str,
    value: float,
    *,
    command: Command = "Backlash Move",
) -> None:
    """
    Command a single motor to a target position.

    Parameters
    ----------
    name : str
        Motor name to move.
    value : float
        Target position.
    command : Command, optional
        BCS command string, by default "Backlash Move".

    Returns
    -------
    None

    Raises
    ------
    KeyError
        If `name` is not a valid motor.
    ValueError
        If `command` is not a valid BCS command string.

    Examples
    --------
    >>> await accessor.set("Sample X", 5.0)
    >>> await accessor.set("Sample X", 5.0, command="Normal Move")
    """
    self._validate_motors((name,))
    self._validate_command(command)
    await self._conn.command_motor(
        commands=[command],
        motors=[name],
        goals=[value],
    )

set_many(targets, *, command='Backlash Move') async

Command multiple motors to target positions simultaneously.

Parameters:

Name Type Description Default
targets dict[str, float]

Mapping of motor name to target position.

required
command Command

BCS command string applied to all motors, by default "Backlash Move".

'Backlash Move'

Returns:

Type Description
None

Raises:

Type Description
KeyError

If any motor name in targets is not valid.

ValueError

If command is not a valid BCS command string.

Examples:

>>> await accessor.set_many({"Sample X": 5.0, "Sample Y": -2.5})
Source code in src/resonance/api/core/motors.py
async def set_many(
    self,
    targets: dict[str, float],
    *,
    command: Command = "Backlash Move",
) -> None:
    """
    Command multiple motors to target positions simultaneously.

    Parameters
    ----------
    targets : dict[str, float]
        Mapping of motor name to target position.
    command : Command, optional
        BCS command string applied to all motors, by default "Backlash Move".

    Returns
    -------
    None

    Raises
    ------
    KeyError
        If any motor name in `targets` is not valid.
    ValueError
        If `command` is not a valid BCS command string.

    Examples
    --------
    >>> await accessor.set_many({"Sample X": 5.0, "Sample Y": -2.5})
    """
    names = list(targets.keys())
    self._validate_motors(names)
    self._validate_command(command)
    await self._conn.command_motor(
        commands=[command] * len(names),
        motors=names,
        goals=list(targets.values()),
    )

wait(names, timeout=30.0, abort=None) async

Wait for one or more motors to reach their target positions.

Parameters:

Name Type Description Default
names list[str]

Motor names to wait on.

required
timeout float

Maximum seconds to wait before raising MotorTimeoutError, by default 30.0.

30.0
abort AbortFlag or None

If provided, the wait will raise ScanAbortedError when the flag is set.

None

Returns:

Type Description
None

Raises:

Type Description
KeyError

If any name is not a valid motor.

MotorTimeoutError

If any motor does not reach its target within timeout seconds.

ScanAbortedError

If abort flag is set during the wait.

Notes

Delegates to wait_for_motors from resonance.api.core.primitives. Pass an AbortFlag to enable cooperative cancellation from Jupyter or programmatic interfaces: set the flag from another task/cell to stop waiting and raise ScanAbortedError.

Examples:

>>> flag = AbortFlag()
>>> await accessor.set_many({"Sample X": 10.0, "Sample Y": 5.0})
>>> await accessor.wait(["Sample X", "Sample Y"], timeout=60.0, abort=flag)
Source code in src/resonance/api/core/motors.py
async def wait(
    self,
    names: list[str],
    timeout: float = 30.0,
    abort: AbortFlag | None = None,
) -> None:
    """
    Wait for one or more motors to reach their target positions.

    Parameters
    ----------
    names : list[str]
        Motor names to wait on.
    timeout : float, optional
        Maximum seconds to wait before raising MotorTimeoutError, by default 30.0.
    abort : AbortFlag or None, optional
        If provided, the wait will raise ScanAbortedError when the flag is set.

    Returns
    -------
    None

    Raises
    ------
    KeyError
        If any name is not a valid motor.
    MotorTimeoutError
        If any motor does not reach its target within `timeout` seconds.
    ScanAbortedError
        If `abort` flag is set during the wait.

    Notes
    -----
    Delegates to `wait_for_motors` from `resonance.api.core.primitives`.
    Pass an `AbortFlag` to enable cooperative cancellation from Jupyter
    or programmatic interfaces: set the flag from another task/cell to
    stop waiting and raise `ScanAbortedError`.

    Examples
    --------
    >>> flag = AbortFlag()
    >>> await accessor.set_many({"Sample X": 10.0, "Sample Y": 5.0})
    >>> await accessor.wait(["Sample X", "Sample Y"], timeout=60.0, abort=flag)
    """
    # TODO: expose check_interval as a parameter for fine-grained polling control
    self._validate_motors(names)
    await wait_for_motors(
        server=self._conn,
        motors=names,
        timeout=timeout,
        abort_flag=abort,
    )