resonance.api.core.beamline

Connection

Bases: BaseSettings

Connection settings loaded from environment variables.

Reads BCS_SERVER_ADDRESS and BCS_SERVER_PORT from the environment or a .env file. Used by Beamline.create().

Parameters:

Name Type Description Default
addr str

BCS server hostname or IP address (env: BCS_SERVER_ADDRESS). Default: "localhost".

'localhost'
port int

BCS server port (env: BCS_SERVER_PORT). Default: 5577.

5577
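
The mapping from environment variables to the addr and port fields can be illustrated with a standard-library sketch. ConnectionSketch and from_env are hypothetical names used only for illustration; the real Connection class is a BaseSettings subclass and also reads a .env file, which this sketch does not attempt.

```python
import os
from dataclasses import asdict, dataclass

# Defaults as documented for Connection.
DEFAULT_ADDR = "localhost"
DEFAULT_PORT = 5577


@dataclass(frozen=True)
class ConnectionSketch:
    """Hypothetical stand-in for Connection; not the library's class."""

    addr: str = DEFAULT_ADDR
    port: int = DEFAULT_PORT

    @classmethod
    def from_env(cls, env=None):
        # Use the process environment unless a mapping is supplied (handy for tests).
        env = os.environ if env is None else env
        return cls(
            addr=env.get("BCS_SERVER_ADDRESS", DEFAULT_ADDR),
            # Environment values are strings, so the port is converted explicitly.
            port=int(env.get("BCS_SERVER_PORT", DEFAULT_PORT)),
        )


cfg = ConnectionSketch.from_env({"BCS_SERVER_ADDRESS": "10.0.0.5", "BCS_SERVER_PORT": "6000"})
connect_kwargs = asdict(cfg)  # same keys that create() passes to server.connect()
```

With an empty environment, from_env falls back to "localhost" and 5577, matching the documented defaults.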

Beamline(conn)

High-level interface for beamline hardware control.

Composes a BCSz server connection with typed accessors for motors, analog inputs, and digital I/O, plus a scan executor for running DataFrame-defined scans. Does not subclass BCSz.

Parameters:

Name Type Description Default
conn BCSServer

Connected BCS server instance. Use Beamline.create() to construct with automatic connection from environment variables.

required

Attributes:

Name Type Description
ai AIAccessor

Read and acquire analog input channels.

motors MotorAccessor

Read, move, and wait for motors.

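dio DIOAccessor

Read and set digital I/O channels (e.g. shutter).

detector AreaDetector

Acquire 2D detector images; used by scan_from_dataframe when with_detector=True.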

Examples:

>>> bl = await Beamline.create()
>>> data = await bl.ai.trigger_and_read(["Photodiode"], acquisition_time=1.0)
>>> await bl.motors.set("Sample X", 10.5)
>>> results = await bl.scan_from_dataframe(scan_df, ai_channels=["Photodiode"])
Source code in src/resonance/api/core/beamline.py
def __init__(self, conn: BCSz.BCSServer) -> None:
    self._conn = conn
    self.ai = AIAccessor(conn)
    self.motors = MotorAccessor(conn)
    self.dio = DIOAccessor(conn)
    self.detector = AreaDetector(conn)
    self._executor = ScanExecutor(conn)

is_scanning property

Whether a scan is currently running.

Returns:

Type Description
bool

True if a scan is in progress, False otherwise.

create() async classmethod

Create and connect a Beamline from environment variables.

Reads BCS_SERVER_ADDRESS and BCS_SERVER_PORT from the environment or a .env file, creates a BCSz server, and connects.

Returns:

Type Description
Beamline

A connected, ready-to-use Beamline instance.

Raises:

Type Description
ConnectionError

If the BCS server is unreachable or connection fails.
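
Because create() raises ConnectionError when the server is unreachable, callers may want to retry before giving up. The helper below is a minimal sketch under that assumption: create_with_retry, attempts, and delay are hypothetical names, and the retry policy is not part of the library. In real use the factory argument would be Beamline.create.

```python
import asyncio


async def create_with_retry(factory, attempts=3, delay=1.0):
    """Await an async factory, retrying when it raises ConnectionError.

    `factory` is any zero-argument coroutine function, for example
    Beamline.create. The retry policy is an illustrative assumption.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return await factory()
        except ConnectionError:
            if attempt == attempts:
                raise  # out of retries: surface the original error
            await asyncio.sleep(delay)  # wait before the next attempt
```

Usage would look like `bl = await create_with_retry(Beamline.create, attempts=5, delay=2.0)`.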

Source code in src/resonance/api/core/beamline.py
@classmethod
async def create(cls) -> Beamline:
    """
    Create and connect a Beamline from environment variables.

    Reads BCS_SERVER_ADDRESS and BCS_SERVER_PORT from the environment or
    a .env file, creates a BCSz server, and connects.

    Returns
    -------
    Beamline
        A connected, ready-to-use Beamline instance.

    Raises
    ------
    ConnectionError
        If the BCS server is unreachable or connection fails.
    """
    config = Connection()
    server = BCSz.BCSServer()
    buff = io.StringIO()
    with redirect_stdout(buff):
        await server.connect(**config.model_dump())
    return cls(server)

scan_from_dataframe(df, ai_channels=None, default_delay=0.1, shutter='Shutter Output', motor_timeout=30.0, progress=True, actuate_every=False, writer=None, with_detector=False) async

Execute a scan defined by a DataFrame.

Each row defines one scan point: motor columns set motor positions, and an optional exposure column sets per-point acquisition time.
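
A scan definition of this shape can be built directly with pandas. The sketch below assumes "Sample X" and "Sample Y" are valid motor names on the target beamline (only "Sample X" appears elsewhere on this page) and that exposure values are in seconds.

```python
import pandas as pd

# One row per scan point. Motor columns are named after motors;
# "Sample Y" is an assumed motor name used only for illustration.
scan_df = pd.DataFrame(
    {
        "Sample X": [10.0, 10.5, 11.0],
        "Sample Y": [2.0, 2.0, 2.0],
        "exposure": [0.5, 0.5, 1.0],  # per-point acquisition time (assumed seconds)
    }
)
```

The resulting frame would then be passed as `await bl.scan_from_dataframe(scan_df, ai_channels=["Photodiode"])`.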

Parameters:

Name Type Description Default
df DataFrame

Scan definition. Motor columns must match valid motor names. An optional "exposure" (or "exp", "count_time") column sets per-point acquisition time.

required
ai_channels list[str] or None

AI channels to acquire at each point. If None, uses ["Photodiode", "TEY signal", "AI 3 Izero"].

None
default_delay float

Settle delay after each motor move in seconds (default: 0.1).

0.1
shutter str

DIO channel name for the shutter (default: "Shutter Output").

'Shutter Output'
motor_timeout float

Maximum wait time for motor moves in seconds (default: 30.0).

30.0
progress bool

Show a tqdm progress bar (default: True).

True
actuate_every bool

If True, open/close the shutter per point. If False (default), open the shutter once for the entire scan.

False
writer RunWriter or None

If provided, scan data are persisted to the beamtime SQLite database via the writer. The caller is responsible for constructing and opening the writer before passing it here.

None
with_detector bool

If True (default: False), acquire a 2D detector image at each scan point using the beamline's AreaDetector. Requires a writer for image persistence. Shutter actuation is hardware-driven.

False

Returns:

Type Description
DataFrame

Results with columns: motor_position, channel_mean, channel_std, exposure, timestamp per row.

Notes

To abort a running scan from another Jupyter cell, call await bl.abort_scan() while the scan task is running.

Source code in src/resonance/api/core/beamline.py
async def scan_from_dataframe(
    self,
    df: pd.DataFrame,
    ai_channels: list[str] | None = None,
    default_delay: float = 0.1,
    shutter: str = "Shutter Output",
    motor_timeout: float = 30.0,
    progress: bool = True,
    actuate_every: bool = False,
    writer: RunWriter | None = None,
    with_detector: bool = False,
) -> pd.DataFrame:
    """
    Execute a scan defined by a DataFrame.

    Each row defines one scan point: motor columns set motor positions,
    and an optional exposure column sets per-point acquisition time.

    Parameters
    ----------
    df : pd.DataFrame
        Scan definition. Motor columns must match valid motor names.
        An optional "exposure" (or "exp", "count_time") column sets
        per-point acquisition time.
    ai_channels : list[str] or None, optional
        AI channels to acquire at each point. If None, uses
        ["Photodiode", "TEY signal", "AI 3 Izero"].
    default_delay : float, optional
        Settle delay after each motor move in seconds (default: 0.1).
    shutter : str, optional
        DIO channel name for the shutter (default: "Shutter Output").
    motor_timeout : float, optional
        Maximum wait time for motor moves in seconds (default: 30.0).
    progress : bool, optional
        Show a tqdm progress bar (default: True).
    actuate_every : bool, optional
        If True, open/close the shutter per point. If False (default),
        open the shutter once for the entire scan.
    writer : RunWriter or None, optional
        If provided, scan data are persisted to the beamtime SQLite database
        via the writer. The caller is responsible for constructing and
        opening the writer before passing it here.
    with_detector : bool, optional
        If True (default: False), acquire a 2D detector image at each scan
        point using the beamline's AreaDetector. Requires a writer for
        image persistence. Shutter actuation is hardware-driven.

    Returns
    -------
    pd.DataFrame
        Results with columns: motor_position, channel_mean, channel_std,
        exposure, timestamp per row.

    Notes
    -----
    To abort a running scan from another Jupyter cell, call
    ``await bl.abort_scan()`` while the scan task is running.
    """
    scan_plan = ScanPlan.from_dataframe(
        df,
        ai_channels=ai_channels,
        default_delay=default_delay,
        shutter=shutter,
        actuate_every=actuate_every,
    )
    return await self._executor.execute_scan(
        scan_plan,
        progress=progress,
        writer=writer,
        detector=self.detector if with_detector else None,
    )

abort_scan() async

Request an abort of the currently running scan.

Sets the internal abort flag. The scan stops after the current point completes and returns partial results.

Notes

Safe to call even when no scan is running (no-op). From Jupyter: run the scan as an asyncio.Task and call this method from another cell while it executes. Programmatic: call from any async context.
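
The task-based abort pattern described in these notes can be sketched with a stand-in object. FakeScanner is hypothetical and only mimics the documented semantics (the flag is checked after the current point completes, and partial results are returned); it is not the library's ScanExecutor.

```python
import asyncio


class FakeScanner:
    """Hypothetical stand-in that mimics the documented abort semantics."""

    def __init__(self):
        self._abort = False

    async def scan(self, points, dwell=0.01):
        self._abort = False  # an abort requested before the scan is ignored
        results = []
        for point in points:
            await asyncio.sleep(dwell)  # stands in for motor move + acquisition
            results.append(point)
            if self._abort:  # checked only after the point has completed
                break
        return results

    async def abort_scan(self):
        self._abort = True


async def demo():
    scanner = FakeScanner()
    # Run the scan as a task so other coroutines can execute meanwhile.
    task = asyncio.create_task(scanner.scan(range(100)))
    await asyncio.sleep(0.035)  # let a few points finish
    await scanner.abort_scan()
    return await task  # partial results


partial = asyncio.run(demo())
```

In a notebook, the equivalent would be to start `task = asyncio.create_task(bl.scan_from_dataframe(scan_df))` in one cell, call `await bl.abort_scan()` in another, and then collect the partial results with `await task`.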

Source code in src/resonance/api/core/beamline.py
async def abort_scan(self) -> None:
    """
    Request an abort of the currently running scan.

    Sets the internal abort flag. The scan stops after the current
    point completes and returns partial results.

    Notes
    -----
    Safe to call even when no scan is running (no-op).
    From Jupyter: run the scan as an asyncio.Task and call this
    method from another cell while it executes.
    Programmatic: call from any async context.
    """
    await self._executor.abort()