from __future__ import annotations
import io
from contextlib import redirect_stdout
from typing import TYPE_CHECKING
from bcs import BCSz
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
from resonance.api.core.ai import AIAccessor
from resonance.api.core.det import AreaDetector
from resonance.api.core.dio import DIOAccessor
from resonance.api.core.motors import MotorAccessor
from resonance.api.core.scan import ScanExecutor, ScanPlan
if TYPE_CHECKING:
import pandas as pd
from resonance.api.data.writer import RunWriter
[docs]
class Connection(BaseSettings):
"""
Connection settings loaded from environment variables.
Reads BCS_SERVER_ADDRESS and BCS_SERVER_PORT from the environment
or a .env file. Used by `Beamline.create()`.
Parameters
----------
addr : str
BCS server hostname or IP address (env: BCS_SERVER_ADDRESS).
Default: "localhost".
port : int
BCS server port (env: BCS_SERVER_PORT).
Default: 5577.
"""
addr: str = Field(default="localhost", alias="BCS_SERVER_ADDRESS")
port: int = Field(default=5577, alias="BCS_SERVER_PORT")
model_config = SettingsConfigDict(env_file=".env", extra="ignore")
[docs]
class Beamline:
"""
High-level interface for beamline hardware control.
Composes a BCSz server connection with typed accessors for motors,
analog inputs, and digital I/O, plus a scan executor for running
DataFrame-defined scans. Does not subclass BCSz.
Parameters
----------
conn : BCSz.BCSServer
Connected BCS server instance. Use `Beamline.create()` to
construct with automatic connection from environment variables.
Attributes
----------
ai : AIAccessor
Read and acquire analog input channels.
motors : MotorAccessor
Read, move, and wait for motors.
dio : DIOAccessor
Read and set digital I/O channels (e.g. shutter).
Examples
--------
>>> bl = await Beamline.create()
>>> data = await bl.ai.trigger_and_read(["Photodiode"], acquisition_time=1.0)
>>> await bl.motors.set("Sample X", 10.5)
>>> results = await bl.scan_from_dataframe(scan_df, ai_channels=["Photodiode"])
"""
def __init__(self, conn: BCSz.BCSServer) -> None:
self._conn = conn
self.ai = AIAccessor(conn)
self.motors = MotorAccessor(conn)
self.dio = DIOAccessor(conn)
self.detector = AreaDetector(conn)
self._executor = ScanExecutor(conn)
# TODO: add optional detector setup (e.g. CCD warm-up, status check) here
# TODO: add future EPICS/Bluesky adapter hook when migrating from BCSz
[docs]
@classmethod
async def create(cls) -> Beamline:
"""
Create and connect a Beamline from environment variables.
Reads BCS_SERVER_ADDRESS and BCS_SERVER_PORT from the environment or
a .env file, creates a BCSz server, and connects.
Returns
-------
Beamline
A connected, ready-to-use Beamline instance.
Raises
------
ConnectionError
If the BCS server is unreachable or connection fails.
"""
config = Connection()
server = BCSz.BCSServer()
buff = io.StringIO()
with redirect_stdout(buff):
await server.connect(**config.model_dump())
return cls(server)
[docs]
async def scan_from_dataframe(
self,
df: pd.DataFrame,
ai_channels: list[str] | None = None,
default_delay: float = 0.1,
shutter: str = "Shutter Output",
motor_timeout: float = 30.0,
progress: bool = True,
actuate_every: bool = False,
writer: RunWriter | None = None,
with_detector: bool = False,
) -> pd.DataFrame:
"""
Execute a scan defined by a DataFrame.
Each row defines one scan point: motor columns set motor positions,
and an optional exposure column sets per-point acquisition time.
Parameters
----------
df : pd.DataFrame
Scan definition. Motor columns must match valid motor names.
An optional "exposure" (or "exp", "count_time") column sets
per-point acquisition time.
ai_channels : list[str] or None, optional
AI channels to acquire at each point. If None, uses
["Photodiode", "TEY signal", "AI 3 Izero"].
default_delay : float, optional
Settle delay after each motor move in seconds (default: 0.1).
shutter : str, optional
DIO channel name for the shutter (default: "Shutter Output").
motor_timeout : float, optional
Maximum wait time for motor moves in seconds (default: 30.0).
progress : bool, optional
Show a tqdm progress bar (default: True).
actuate_every : bool, optional
If True, open/close the shutter per point. If False (default),
open the shutter once for the entire scan.
writer : RunWriter or None, optional
If provided, scan data are persisted to the beamtime SQLite database
via the writer. The caller is responsible for constructing and
opening the writer before passing it here.
with_detector : bool, optional
If True, acquire a 2D detector image at each scan point using the
beamline's AreaDetector. Requires a writer for image persistence.
Shutter actuation is hardware-driven (default: False).
Returns
-------
pd.DataFrame
Results with columns: motor_position, channel_mean, channel_std,
exposure, timestamp per row.
Notes
-----
To abort a running scan from another Jupyter cell, call
``await bl.abort_scan()`` while the scan task is running.
"""
scan_plan = ScanPlan.from_dataframe(
df,
ai_channels=ai_channels,
default_delay=default_delay,
shutter=shutter,
actuate_every=actuate_every,
)
return await self._executor.execute_scan(
scan_plan,
progress=progress,
writer=writer,
detector=self.detector if with_detector else None,
)
[docs]
async def abort_scan(self) -> None:
"""
Request an abort of the currently running scan.
Sets the internal abort flag. The scan stops after the current
point completes and returns partial results.
Notes
-----
Safe to call even when no scan is running (no-op).
From Jupyter: run the scan as an asyncio.Task and call this
method from another cell while it executes.
Programmatic: call from any async context.
"""
await self._executor.abort()
@property
def is_scanning(self) -> bool:
"""
Whether a scan is currently running.
Returns
-------
bool
True if a scan is in progress, False otherwise.
"""
return self._executor.current_scan is not None