Source code for resonance.api.utils

"""
Utility functions for beamline operations.

This module provides helper functions for common beamline tasks
like motor alignment, grid scan generation, and data analysis.
"""

from typing import Literal, cast

import numpy as np
import pandas as pd


def create_grid_scan(
    x_range: tuple[float, float, int],
    y_range: tuple[float, float, int],
    exposure_time: float = 1.0,
    x_motor: str = "Sample X",
    y_motor: str = "Sample Y",
) -> pd.DataFrame:
    """
    Build the scan table for a rectangular 2D raster.

    Every combination of the X and Y axis positions becomes one row.
    Rows are ordered with X varying fastest: the full X sweep is listed
    for the first Y position, then for the second, and so on.

    Parameters
    ----------
    x_range : tuple[float, float, int]
        X axis as (start, stop, num_points); both ends are included
    y_range : tuple[float, float, int]
        Y axis as (start, stop, num_points); both ends are included
    exposure_time : float, optional
        Exposure time assigned to every point (default: 1.0)
    x_motor : str, optional
        Column name used for the X motor (default: "Sample X")
    y_motor : str, optional
        Column name used for the Y motor (default: "Sample Y")

    Returns
    -------
    pd.DataFrame
        One row per grid point with the two motor columns and an
        "exposure" column

    Examples
    --------
    >>> grid = create_grid_scan(
    ...     x_range=(10, 12, 3),
    ...     y_range=(0, 2, 3),
    ...     exposure_time=1.0
    ... )
    >>> print(len(grid))  # 3 x 3 = 9 points
    9
    """
    x_axis = np.linspace(x_range[0], x_range[1], x_range[2])
    y_axis = np.linspace(y_range[0], y_range[1], y_range[2])

    # meshgrid yields (ny, nx) arrays; flattening in row-major order gives
    # the "X fastest" ordering described above.
    grid_x, grid_y = np.meshgrid(x_axis, y_axis)

    columns = {}
    columns[x_motor] = grid_x.ravel()
    columns[y_motor] = grid_y.ravel()
    columns["exposure"] = np.full(grid_x.size, exposure_time)
    return pd.DataFrame(columns)
def create_line_scan(
    motor: str,
    start: float,
    stop: float,
    num_points: int,
    exposure_time: float = 1.0,
) -> pd.DataFrame:
    """
    Build the scan table for a 1D sweep of a single motor.

    Parameters
    ----------
    motor : str
        Motor name, used as the position column name
    start : float
        First position of the sweep
    stop : float
        Last position of the sweep (included)
    num_points : int
        Number of evenly spaced positions
    exposure_time : float, optional
        Exposure time assigned to every point (default: 1.0)

    Returns
    -------
    pd.DataFrame
        One row per position with the motor column and an "exposure"
        column

    Examples
    --------
    >>> scan = create_line_scan(
    ...     motor="Sample X",
    ...     start=10,
    ...     stop=15,
    ...     num_points=11,
    ...     exposure_time=1.5
    ... )
    """
    columns = {motor: np.linspace(start, stop, num_points)}
    columns["exposure"] = np.full(num_points, exposure_time)
    return pd.DataFrame(columns)
[docs] def create_energy_scan( energies: np.ndarray, exposure_time: float | np.ndarray = 1.0, energy_motor: str = "Beamline Energy", ) -> pd.DataFrame: """ Create an energy scan DataFrame. Parameters ---------- energies : np.ndarray Array of photon energies exposure_time : float | np.ndarray, optional Exposure time(s) (default: 1.0) energy_motor : str, optional Energy motor name (default: "Beamline Energy") Returns ------- pd.DataFrame Scan definition ready for scan_from_dataframe Examples -------- >>> # Carbon K-edge >>> energies = np.linspace(280, 320, 200) >>> scan = create_energy_scan(energies, exposure_time=1.0) >>> >>> # Variable exposure times >>> energies = np.linspace(280, 320, 200) >>> exposure = np.ones(200) >>> exposure[100:150] = 2.0 # Longer at edge >>> scan = create_energy_scan(energies, exposure_time=exposure) """ if isinstance(exposure_time, (int, float)): exposure_time = np.full_like(energies, exposure_time, dtype=float) return pd.DataFrame({energy_motor: energies, "exposure": exposure_time})
def find_peak_position(
    scan_data: pd.DataFrame,
    motor_col: str,
    signal_col: str,
    use_mean: bool = True,
) -> tuple[float, float]:
    """
    Find peak position in 1D scan data.

    The peak is the row with the largest signal value; NaN samples are
    ignored. If several rows share the maximum, the first one wins.

    Parameters
    ----------
    scan_data : pd.DataFrame
        Scan results from scan_from_dataframe
    motor_col : str
        Motor position column name (e.g., "Sample X_position")
    signal_col : str
        Signal column name (e.g., "Photodiode_mean")
    use_mean : bool, optional
        If True and ``signal_col`` does not already end in "_mean", the
        "_mean" suffix is appended before lookup; if False the column is
        used as given (default: True)

    Returns
    -------
    tuple[float, float]
        (peak_position, peak_value)

    Raises
    ------
    KeyError
        If the motor or signal column is not present in ``scan_data``
    ValueError
        If the signal column is empty or contains only NaN

    Examples
    --------
    >>> results = await server.scan_from_dataframe(scan_df, ...)
    >>> peak_x, peak_signal = find_peak_position(
    ...     results,
    ...     motor_col="Sample X_position",
    ...     signal_col="Photodiode_mean"
    ... )
    >>> print(f"Peak at {peak_x:.2f} with signal {peak_signal:.3f}")
    """
    if use_mean and not signal_col.endswith("_mean"):
        signal_col = f"{signal_col}_mean"

    signal = scan_data[signal_col].to_numpy(dtype=float)
    if signal.size == 0 or np.isnan(signal).all():
        raise ValueError(f"No valid samples in {signal_col!r} to locate a peak")

    # Work with the row *position* rather than the index label: a label
    # lookup returns a Series instead of a scalar when the DataFrame index
    # contains duplicates (e.g. scans concatenated without ignore_index).
    peak_row = int(np.nanargmax(signal))
    peak_position = float(scan_data[motor_col].iloc[peak_row])
    peak_value = float(signal[peak_row])
    return peak_position, peak_value
def calculate_center_of_mass(
    scan_data: pd.DataFrame,
    motor_col: str,
    signal_col: str,
    use_mean: bool = True,
) -> float:
    """
    Calculate center of mass for alignment.

    The minimum signal value is subtracted as a baseline, and the
    remaining signal is used as the weight of each motor position.
    Rows whose position or signal is NaN/inf are ignored so that a
    single dropped reading does not turn the result into NaN.

    Parameters
    ----------
    scan_data : pd.DataFrame
        Scan results from scan_from_dataframe
    motor_col : str
        Motor position column name
    signal_col : str
        Signal column name
    use_mean : bool, optional
        If True and ``signal_col`` does not already end in "_mean", the
        "_mean" suffix is appended before lookup (default: True)

    Returns
    -------
    float
        Center of mass position

    Raises
    ------
    KeyError
        If the motor or signal column is not present in ``scan_data``
    ValueError
        If there are no finite samples, or if the signal is constant so
        that nothing remains after baseline subtraction

    Examples
    --------
    >>> results = await server.scan_from_dataframe(scan_df, ...)
    >>> com = calculate_center_of_mass(
    ...     results,
    ...     motor_col="Sample X_position",
    ...     signal_col="Photodiode_mean"
    ... )
    >>> # Move to center of mass
    >>> await server.motor.set("Sample X", com)
    """
    if use_mean and not signal_col.endswith("_mean"):
        signal_col = f"{signal_col}_mean"

    positions = scan_data[motor_col].to_numpy(dtype=float)
    signal = scan_data[signal_col].to_numpy(dtype=float)

    # Drop rows that cannot contribute a meaningful weight; otherwise one
    # NaN propagates through min()/sum() and the result is silently NaN.
    usable = np.isfinite(positions) & np.isfinite(signal)
    if not usable.any():
        raise ValueError("No finite samples available to compute center of mass")
    positions = positions[usable]
    signal = signal[usable]

    # Baseline subtraction makes every weight non-negative, so the total is
    # zero only when the signal is flat.
    weights = signal - signal.min()
    total = weights.sum()
    if total == 0:
        raise ValueError("Signal is zero or negative everywhere")

    return float(np.sum(positions * weights) / total)
[docs] def resample_scan_data( scan_data: pd.DataFrame, motor_col: str, num_points: int, columns_to_interpolate: list[str] | None = None, ) -> pd.DataFrame: """ Resample scan data to uniform grid. Useful for combining scans with different point densities. Parameters ---------- scan_data : pd.DataFrame Original scan data motor_col : str Motor position column for interpolation axis num_points : int Number of points in resampled data columns_to_interpolate : list[str] | None, optional Columns to interpolate (default: all numeric columns) Returns ------- pd.DataFrame Resampled data with uniform spacing Examples -------- >>> # Resample to 100 points >>> resampled = resample_scan_data( ... results, ... motor_col="Beamline Energy_position", ... num_points=100 ... ) """ from scipy.interpolate import interp1d if columns_to_interpolate is None: # Auto-detect numeric columns except motor position columns_to_interpolate = [ col for col in scan_data.select_dtypes(include=[np.number]).columns if col != motor_col ] # Create uniform grid motor_positions = scan_data[motor_col].to_numpy(dtype=float) new_positions = np.linspace( motor_positions.min(), motor_positions.max(), num_points ) # Interpolate each column resampled_data = {motor_col: new_positions} for col in columns_to_interpolate: interpolator = interp1d( motor_positions, scan_data[col].to_numpy(dtype=float), kind="linear", bounds_error=False, fill_value=cast("float", "extrapolate"), ) resampled_data[col] = interpolator(new_positions) return pd.DataFrame(resampled_data)
def merge_scans(
    scans: list[pd.DataFrame],
    motor_col: str,
    average_overlaps: bool = True,
) -> pd.DataFrame:
    """
    Combine several scans into one table ordered by motor position.

    The scans are stacked, sorted by ``motor_col`` and re-indexed from
    zero. With ``average_overlaps`` enabled, rows sharing exactly the
    same motor position are collapsed into one row holding the mean of
    the numeric columns; the averaged table is built from the numeric
    columns and the grouping column.

    Parameters
    ----------
    scans : list[pd.DataFrame]
        Scan DataFrames to merge; must not be empty
    motor_col : str
        Motor position column used for ordering and grouping
    average_overlaps : bool, optional
        Average rows with identical motor positions (default: True)

    Returns
    -------
    pd.DataFrame
        Merged scan data

    Raises
    ------
    ValueError
        If ``scans`` is empty

    Examples
    --------
    >>> # Combine multiple energy ranges
    >>> scan1 = await nexafs_scan(server, np.linspace(280, 290, 50))
    >>> scan2 = await nexafs_scan(server, np.linspace(288, 300, 100))
    >>> merged = merge_scans([scan1, scan2], motor_col="energy")
    """
    if not scans:
        raise ValueError("Need at least one scan to merge")

    stacked = pd.concat(scans, ignore_index=True)
    ordered = stacked.sort_values(motor_col).reset_index(drop=True)

    if not average_overlaps:
        return ordered

    # One output row per distinct motor position, averaging numeric data.
    numeric_names = ordered.select_dtypes(include=[np.number]).columns
    averaged = ordered.groupby(motor_col, as_index=False)[numeric_names].mean()
    return pd.DataFrame(averaged)
def knife_edge_analysis(
    scan_data: pd.DataFrame,
    motor_col: str,
    signal_col: str,
    threshold: float = 0.5,
    direct_beam: Literal["above", "below"] = "above",
    use_mean: bool = True,
) -> float:
    """
    Analyze knife-edge scan to find edge position.

    Useful for aligning samples to incident beam. The edge is reported
    as the motor position of the sample closest to the blocked side
    whose signal is still at or above ``threshold * max(signal)``:

    - ``direct_beam="above"``: the first row (in scan order) that
      reaches the threshold.
    - ``direct_beam="below"``: the last row (in scan order) that
      reaches the threshold, i.e. the mirror image of "above".

    Parameters
    ----------
    scan_data : pd.DataFrame
        Scan results from scan_from_dataframe
    motor_col : str
        Motor position column name
    signal_col : str
        Signal column name
    threshold : float, optional
        Fraction of max signal to define edge (default: 0.5). The
        fraction is applied to the raw maximum; no baseline is
        subtracted.
    direct_beam : Literal["above", "below"], optional
        Direction of direct beam relative to edge (default: "above").
        Any value other than "above" is treated as "below".
    use_mean : bool, optional
        If True and ``signal_col`` does not already end in "_mean", the
        "_mean" suffix is appended before lookup (default: True)

    Returns
    -------
    float
        Estimated edge position

    Raises
    ------
    KeyError
        If the motor or signal column is not present in ``scan_data``
    ValueError
        If the scan is empty or no sample reaches the threshold (for
        example when the signal contains NaN or its maximum is negative)

    Examples
    --------
    >>> results = await server.scan_from_dataframe(scan_df, ...)
    >>> edge_pos = knife_edge_analysis(
    ...     results,
    ...     motor_col="Sample Z_position",
    ...     signal_col="Photodiode_mean"
    ... )
    >>> print(f"Edge position at {edge_pos:.2f}")
    """
    if use_mean and not signal_col.endswith("_mean"):
        signal_col = f"{signal_col}_mean"

    positions = scan_data[motor_col].to_numpy(dtype=float)
    signal = scan_data[signal_col].to_numpy(dtype=float)

    threshold_value = threshold * float(np.max(signal))

    # Row indices (in the original, un-reversed order) where the beam is
    # considered unblocked. Indexing positions with these directly avoids
    # mixing reversed-array indices with forward-ordered positions.
    bright_rows = np.flatnonzero(signal >= threshold_value)
    if bright_rows.size == 0:
        raise ValueError(
            f"No sample in {signal_col!r} reaches the threshold "
            f"({threshold_value!r}); cannot locate the edge"
        )

    if direct_beam == "above":
        # Signal rises along the scan: the edge is where it first turns on.
        edge_idx = bright_rows[0]
    else:
        # Signal falls along the scan: the edge is where it last stays on.
        edge_idx = bright_rows[-1]

    return float(positions[edge_idx])