Source code for resonance.api.core.scan

"""Scan orchestration and execution using core primitives."""

from __future__ import annotations

import asyncio
import time
from typing import TYPE_CHECKING, Any

try:
    from tqdm.asyncio import tqdm as _tqdm

    _TQDM_AVAILABLE = True
except ImportError:
    _TQDM_AVAILABLE = False

import numpy as np
import pandas as pd
from uncertainties import ufloat

from resonance.api.core.primitives import (
    AbortFlag,
    motor_move,
    shutter_control,
    wait_for_settle,
)
from resonance.api.header_map import normalize_header
from resonance.api.types import ScanAbortedError, ScanPoint, ScanResult
from resonance.api.validation import validate_scan_dataframe

if TYPE_CHECKING:
    from collections.abc import Iterator

    from bcs import BCSz
    from uncertainties import Variable

    from resonance.api.core.det import AreaDetector
    from resonance.api.data.writer import RunWriter


class ScanPlan:
    """
    Validated scan plan constructed from a list of points or a DataFrame.

    Parameters
    ----------
    points : list[ScanPoint]
        Ordered sequence of scan points.
    motor_names : list[str]
        Names of all motors referenced in the scan.
    ai_channels : list[str]
        Analog input channel names to acquire at each point.
    shutter : str
        DIO channel name of the light shutter.
    actuate_every : bool
        If True, the shutter is opened and closed per point.  If False, the
        shutter is opened once for the entire scan.

    Examples
    --------
    >>> df = pd.DataFrame({
    ...     "Sample X": [0, 10, 20],
    ...     "Sample Y": [0, 0, 0],
    ...     "exposure": [0.1, 0.1, 0.1],
    ... })
    >>> scan_plan = ScanPlan.from_dataframe(df, ai_channels=["Photodiode"])
    """

    def __init__(
        self,
        points: list[ScanPoint],
        motor_names: list[str],
        ai_channels: list[str],
        shutter: str = "Light Output",
        actuate_every: bool = False,
    ) -> None:
        self.points = points
        self.motor_names = motor_names
        self.ai_channels = ai_channels
        self.shutter = shutter
        self.actuate_every = actuate_every

    @classmethod
    def from_dataframe(
        cls,
        df: pd.DataFrame,
        ai_channels: list[str] | None = None,
        default_exposure: float = 1.0,
        default_delay: float = 0.2,
        shutter: str = "Light Output",
        actuate_every: bool = False,
    ) -> ScanPlan:
        """
        Build a validated scan plan from a DataFrame.

        Each row becomes one `ScanPoint`.  Motor columns are detected via
        `validate_scan_dataframe`; an optional exposure column is also
        detected.

        Parameters
        ----------
        df : pd.DataFrame
            DataFrame whose columns are motor names and optionally an
            exposure time column.
        ai_channels : list[str] or None
            Channels to acquire at each point.  If None, a beamline default
            set is used.  The list is copied, so later mutation of the
            caller's list does not affect the plan or its points.
        default_exposure : float
            Exposure time in seconds applied when the DataFrame has no
            exposure column.
        default_delay : float
            Settle delay in seconds after each motor move.
        shutter : str
            DIO channel name of the light shutter.
        actuate_every : bool
            Per-point shutter mode.

        Returns
        -------
        ScanPlan
            Fully validated scan plan.

        Raises
        ------
        ValidationError
            If the DataFrame fails structural or value validation.

        Notes
        -----
        The point index is the DataFrame index label converted with ``int``.
        When the label cannot be converted (e.g. string, timestamp or
        MultiIndex labels) the zero-based row position is used instead.
        """
        motor_cols, exposure_col = validate_scan_dataframe(df)

        # TODO: make default AI channels configurable at the Beamline level
        if ai_channels is None:
            channels = ["Photodiode", "TEY signal", "AI 3 Izero"]
        else:
            channels = list(ai_channels)

        points: list[ScanPoint] = []
        for position, (label, row) in enumerate(df.iterrows()):
            try:
                index = int(label)  # type: ignore[arg-type, call-overload]
            except (TypeError, ValueError, OverflowError):
                # Non-numeric index label: fall back to the row position.
                index = position
            motors = {col: float(row[col]) for col in motor_cols}
            exposure = float(row[exposure_col]) if exposure_col else default_exposure
            point = ScanPoint(
                index=index,
                motors=motors,
                exposure_time=exposure,
                # Each point owns its channel list so that editing one point
                # cannot silently change every other point.
                ai_channels=list(channels),
                delay_after_move=default_delay,
            )
            point.validate()
            points.append(point)

        return cls(
            points=points,
            motor_names=motor_cols,
            ai_channels=channels,
            shutter=shutter,
            actuate_every=actuate_every,
        )

    def estimated_duration_seconds(
        self,
        motor_time: float = 0.1,
        api_time: float = 0.5,
    ) -> float:
        """
        Compute estimated total scan duration.

        Per-point cost: motor_time + api_time + exposure_time +
        delay_after_move.

        Parameters
        ----------
        motor_time : float
            Expected motor-move overhead per point in seconds.
        api_time : float
            Expected API round-trip overhead per point in seconds.

        Returns
        -------
        float
            Total estimated duration in seconds; ``0.0`` for an empty plan.

        Notes
        -----
        Does not account for shutter actuation overhead when
        ``actuate_every=True``.
        """
        # TODO: account for shutter actuation time when actuate_every=True
        # The float start value keeps the return type a float for empty plans.
        return sum(
            (
                motor_time + api_time + p.exposure_time + p.delay_after_move
                for p in self.points
            ),
            0.0,
        )

    @staticmethod
    def _format_duration(duration: float) -> str:
        """Render a duration in seconds as hours, minutes or seconds."""
        minutes, hours = duration / 60.0, duration / 3600.0
        if hours >= 1:
            return f"{hours:.1f} h ({duration:.0f} s)"
        if minutes >= 1:
            return f"{minutes:.1f} min ({duration:.0f} s)"
        return f"{duration:.1f} s"

    def describe(
        self,
        motor_time: float = 0.1,
        api_time: float = 0.5,
    ) -> None:
        """
        Print a human-readable summary of the scan plan.

        Displays point count, the number of unique values per motor, for the
        exposure and for the delay, and the estimated duration.

        Parameters
        ----------
        motor_time : float
            Motor-move overhead per point in seconds used for the estimate.
        api_time : float
            API overhead per point in seconds used for the estimate.
        """
        lines: list[str] = [
            f"Scan plan: {len(self.points)} points, {len(self.motor_names)} motors",
            "",
            "Unique values:",
        ]
        for m in self.motor_names:
            n = len({p.motors[m] for p in self.points})
            lines.append(f" {m}: {n}")
        lines.append(f" exposure: {len({p.exposure_time for p in self.points})}")

        # The delay of the first point is shown as the representative value.
        delay_val = self.points[0].delay_after_move if self.points else 0.0
        lines.append(
            f" delay: {len({p.delay_after_move for p in self.points})} ({delay_val} s)"
        )
        lines.append("")

        duration = self.estimated_duration_seconds(
            motor_time=motor_time, api_time=api_time
        )
        lines.append(f"Estimated duration: {self._format_duration(duration)}")
        lines.append(
            f" per point: {motor_time} s (motor) + {api_time} s (api)"
            f" + exposure + {delay_val} s (delay)"
        )
        print("\n".join(lines))

    def __len__(self) -> int:
        """Return the number of scan points."""
        return len(self.points)

    def __iter__(self) -> Iterator[ScanPoint]:
        """Iterate over the scan points in execution order."""
        return iter(self.points)
class ScanExecutor:
    """
    Executes `ScanPlan` instances against a live BCS server.

    Supports two interrupt modes:

    1. Programmatic abort: call ``await executor.abort()`` from any async
       context to set the abort flag.  The next check inside `execute_point`
       or `wait_for_settle` raises `ScanAbortedError`.

    2. Jupyter / IPython interrupt: create the scan as an ``asyncio.Task``
       and call ``await bl.abort_scan()`` from another cell to set the abort
       flag and stop after the current point.

    ``asyncio.CancelledError`` is raised when the Task is cancelled via
    ``task.cancel()``.  ``execute_scan`` catches this, sets the abort flag,
    and returns any partial results already collected.  If no results have
    been collected the error is re-raised.

    Parameters
    ----------
    conn : BCSz.BCSServer
        Active BCS server connection.
    """

    def __init__(self, conn: BCSz.BCSServer) -> None:
        self._conn = conn
        self._abort_flag = AbortFlag()
        self._current_scan: ScanPlan | None = None

    @property
    def current_scan(self) -> ScanPlan | None:
        """
        Currently running scan plan.

        Returns
        -------
        ScanPlan or None
            The active `ScanPlan` during execution, or ``None`` when idle.
        """
        return self._current_scan

    async def abort(self) -> None:
        """
        Request abort of the running scan.

        Sets the internal `AbortFlag`.  The scan loop will raise
        `ScanAbortedError` at the next abort-check site (start of
        `execute_point`, inside `wait_for_settle`, or inside
        `wait_for_motors`).

        Notes
        -----
        Safe to call from a separate task or thread-pool executor while the
        scan is running.
        """
        await self._abort_flag.set()

    @staticmethod
    def _summarize_samples(samples: Any) -> tuple[float, float]:
        """
        Return ``(mean, standard_error)`` of the non-NaN samples.

        Parameters
        ----------
        samples : array-like of float
            Raw readings of one channel; NaN entries are ignored.

        Returns
        -------
        tuple[float, float]
            Mean and standard error of the mean (sample standard deviation
            with ``ddof=1`` divided by the square root of the number of
            non-NaN samples).  Both are NaN when no valid sample exists; the
            standard error is NaN when only one valid sample exists.
        """
        data = np.asarray(samples, dtype=float)
        valid = data[~np.isnan(data)]
        if valid.size == 0:
            return float("nan"), float("nan")
        mean = float(valid.mean())
        if valid.size < 2:
            # A single sample has no defined spread (ddof=1).
            return mean, float("nan")
        # Divide by the number of samples that actually entered the standard
        # deviation, not by the raw array length which may include NaNs.
        std_err = float(valid.std(ddof=1) / np.sqrt(valid.size))
        return mean, std_err

    async def execute_point(
        self,
        point: ScanPoint,
        motor_timeout: float = 30.0,
        restore_motors: bool = False,
        use_shutter: bool = True,
        detector: AreaDetector | None = None,
    ) -> ScanResult:
        """
        Execute a single scan point.

        Parameters
        ----------
        point : ScanPoint
            The point to execute.
        motor_timeout : float
            Maximum time in seconds to wait for motors to reach position.
        restore_motors : bool
            If True, motor positions are restored to their pre-move values
            after the point completes.
        use_shutter : bool
            If True, the shutter is opened and closed around acquisition.
            Pass False when the caller already holds the shutter open.
        detector : AreaDetector or None, optional
            If provided, a 2D detector image is acquired after AI
            acquisition using the point's exposure_time.  Shutter actuation
            is hardware-driven; no plan-level shutter wraps this call.

        Returns
        -------
        ScanResult
            Measured values with per-channel `ufloat` statistics (mean and
            standard error of the non-NaN samples).

        Raises
        ------
        ScanAbortedError
            If the abort flag is set before or during execution.
        MotorError
            If a motor move or restore fails.
        """
        if await self._abort_flag.is_set():
            raise ScanAbortedError("Scan aborted before point execution")

        # All acquisition happens inside the motor_move context so that the
        # motors are still at the requested position while data are taken,
        # even when restore_motors=True.
        async with motor_move(
            self._conn,
            point.motors,
            timeout=motor_timeout,
            restore_on_exit=restore_motors,
        ):
            await wait_for_settle(point.delay_after_move, self._abort_flag)

            # Point-level channels take precedence over the plan's channels.
            channels = point.ai_channels or (
                self._current_scan.ai_channels if self._current_scan else []
            )

            async def _acquire() -> dict[str, Any]:
                if await self._abort_flag.is_set():
                    raise ScanAbortedError("Scan aborted before acquisition")
                await self._conn.acquire_data(chans=channels, time=point.exposure_time)  # pyright: ignore[reportArgumentType]
                return await self._conn.get_acquired_array(chans=channels)

            if use_shutter:
                shutter_name = (
                    self._current_scan.shutter if self._current_scan else "Light Output"
                )
                async with shutter_control(self._conn, shutter=shutter_name):
                    result = await _acquire()
            else:
                result = await _acquire()

            ai_data: dict[str, Variable] = {}
            raw_data: dict[str, list[float]] = {}
            for chan_data in result["chans"]:
                name: str = chan_data["chan"]
                data = np.array(chan_data["data"], dtype=float)
                raw_data[name] = data.tolist()
                mean, std_err = self._summarize_samples(data)
                ai_data[name] = ufloat(mean, std_err)

            scan_result = ScanResult(
                index=point.index,
                motors=point.motors,
                ai_data=ai_data,
                exposure_time=point.exposure_time,
                timestamp=time.time(),
                raw_data=raw_data,
            )

            if detector is not None:
                scan_result.image = await detector.acquire(point.exposure_time)

            return scan_result

    @staticmethod
    def _build_data_keys(
        scan_plan: ScanPlan,
        detector: AreaDetector | None,
    ) -> dict[str, Any]:
        """Build the primary-stream data-key descriptors for a scan plan."""
        data_keys: dict[str, Any] = {
            **{
                f"{normalize_header(m)}_position": {
                    "dtype": "number",
                    "units": "mm",
                    "source": "motor",
                }
                for m in scan_plan.motor_names
            },
            **{
                normalize_header(ch): {
                    "dtype": "number",
                    "units": "V",
                    "source": "ai",
                }
                for ch in scan_plan.ai_channels
            },
            normalize_header("exposure"): {
                "dtype": "number",
                "units": "s",
                "source": "plan",
            },
        }
        if detector is not None:
            data_keys["detector_image"] = detector.describe()
        return data_keys

    @staticmethod
    def _record_result(writer: RunWriter, result: ScanResult) -> None:
        """Write one completed point (scalars and optional image) to the writer."""
        event_data: dict[str, float | int | str | bool] = {
            **{
                f"{normalize_header(m)}_position": v
                for m, v in result.motors.items()
            },
            **{
                normalize_header(ch): value.nominal_value
                for ch, value in result.ai_data.items()
            },
            normalize_header("exposure"): result.exposure_time,
        }
        # Every field of a point shares the point's acquisition timestamp.
        timestamps: dict[str, float] = dict.fromkeys(event_data, result.timestamp)
        event_uid = writer.write_event(event_data, timestamps)
        if result.image is not None:
            writer.write_image(event_uid, "detector_image", result.image)

    async def execute_scan(
        self,
        scan_plan: ScanPlan,
        progress: bool = True,
        writer: RunWriter | None = None,
        detector: AreaDetector | None = None,
    ) -> pd.DataFrame:
        """
        Execute a complete scan plan and return results as a DataFrame.

        The shutter behaviour depends on `scan_plan.actuate_every`:

        - ``False`` (default): the shutter is opened once before the first
          point and closed after the last point (or on abort).
        - ``True``: the shutter is actuated individually for every point via
          `execute_point`.

        Interrupt modes
        ---------------
        Programmatic: call ``await executor.abort()`` from any async context.
        The `AbortFlag` is checked at the start of each point;
        `ScanAbortedError` is raised and partial results are returned.

        Jupyter / IPython: create this coroutine as an ``asyncio.Task`` and
        call ``await bl.abort_scan()`` from another cell to set the abort
        flag and stop after the current point.

        ``asyncio.CancelledError``: raised when the Task is cancelled via
        ``task.cancel()``.  Partial results are returned if any points
        completed; otherwise the error is re-raised.

        Parameters
        ----------
        scan_plan : ScanPlan
            Validated scan plan to execute.
        progress : bool
            If True and tqdm is installed, show an async progress bar.
            Falls back to simple per-point print statements.
        writer : RunWriter or None, optional
            If provided, scalar scan data (motor positions, AI means,
            exposure, timestamps) are written to the open beamtime database.
            The writer must not have an open run before this method is
            called; it will call open_run, open_stream, write_event, and
            close_run internally.
        detector : AreaDetector or None, optional
            If provided, a 2D image is acquired at each scan point and
            written to the "detector_image" field in the primary stream.
            Requires writer to be set for persistence.  Shutter is
            hardware-driven.

        Returns
        -------
        pd.DataFrame
            One row per completed point with motor positions, per-channel
            mean/std columns, exposure time, and timestamp.

        Notes
        -----
        The run is closed with ``exit_status="success"`` only when every
        point completed.  Aborts, cancellations and any other exception close
        it with ``"aborted"``.  The executor is always returned to the idle
        state, and a run that was opened is always closed, even when setup
        (e.g. ``open_stream``) fails.

        # TODO: add Bluesky-compatible start/stop document emission once
        RunWriter is stable
        """
        self._current_scan = scan_plan
        results: list[ScanResult] = []
        run_opened = False
        # Pessimistic default: only a fully completed loop flips this to
        # "success", so unexpected errors are never recorded as success.
        # NOTE(review): "aborted" is the only non-success status visible in
        # this module; use a dedicated failure status if RunWriter has one.
        exit_status = "aborted"
        bar = None

        try:
            await self._abort_flag.clear()

            if writer is not None:
                data_keys = self._build_data_keys(scan_plan, detector)
                # TODO: propagate plan_name from ScanPlan once the attribute is added
                writer.open_run("scan")
                run_opened = True
                writer.open_stream("primary", data_keys)

            if progress and _TQDM_AVAILABLE:
                bar = _tqdm(scan_plan.points, desc="Scanning", unit="pt")
                iterator = bar
                text_progress = False
            else:
                if progress:
                    print("tqdm not available, showing simple progress")
                iterator = iter(scan_plan.points)
                text_progress = True

            async def _run_points() -> None:
                for i, point in enumerate(iterator):
                    if text_progress:
                        print(f"Point {i + 1}/{len(scan_plan.points)}", end="\r")
                    result = await self.execute_point(
                        point,
                        restore_motors=False,
                        use_shutter=scan_plan.actuate_every,
                        detector=detector,
                    )
                    results.append(result)
                    if writer is not None:
                        self._record_result(writer, result)

            try:
                if scan_plan.actuate_every:
                    await _run_points()
                else:
                    # Hold the shutter open for the whole scan.
                    async with shutter_control(self._conn, shutter=scan_plan.shutter):
                        await _run_points()
                exit_status = "success"
            except ScanAbortedError:
                print(f"\nScan aborted after {len(results)}/{len(scan_plan.points)} points")
                if not results:
                    raise
            except asyncio.CancelledError:
                await self._abort_flag.set()
                print(
                    f"\nScan interrupted after {len(results)}/{len(scan_plan.points)} points"
                )
                if not results:
                    raise
                # Intentionally not re-raising: returns partial results to the caller.
                # Task cancellation propagation is sacrificed for interactive usability.
            finally:
                if bar is not None:
                    # Release the progress bar even when the loop ended early.
                    bar.close()
                if text_progress:
                    # Terminate the "\r"-overwritten progress line.
                    print()
        finally:
            self._current_scan = None
            if writer is not None and run_opened:
                writer.close_run(exit_status=exit_status)

        return pd.DataFrame([r.to_series() for r in results])