API Reference

pxr_reduce

Reducing PRSoXR data from beamline 11.0.1.2

This package provides a loader for PRSoXR data from beamline 11.0.1.2 taken with the area detector camera.

PrsoxrLoader(files, AI_file=None, *, auto_load=False, energy_resolution=20, **kwargs)

Loader for PRSoXR data from beamline 11.0.1.2 taken with the CCD camera.

Parameters

files : list
    List of .fits files to be loaded. Include full filepaths.

>>> # Recommended Usage
>>> import pathlib
>>> path_s = pathlib.Path('../ALS/2020 Nov/MF114A/spol/250eV')
>>> files = list(path_s.glob('*.fits')) # All .fits in path_s

The newly created 'files' is now a list of filepaths to each reflectivity point.
AI_file : pathlib.Path, optional
    A filepath to a complementary .txt file that contains updated metadata. This is only used for data collected with the 'Beamline Scan Panel' at beamline 11.0.1.2.

auto_load : bool
    Should the images be loaded automatically upon creating the object?

**kwargs
    process_vars to be updated at the time of creation.
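
A minimal construction sketch using these arguments (the directory and AI-file names are hypothetical):

>>> import pathlib
>>> path_s = pathlib.Path('../ALS/2020 Nov/MF114A/spol/250eV')
>>> files = list(path_s.glob('*.fits'))
>>> loader = PrsoxrLoader(
...     files,
...     AI_file=path_s / 'MF114A-AI.txt',  # hypothetical companion metadata file
...     auto_load=True,                    # process images immediately
...     energy_resolution=20,              # round energies to the nearest 0.05 eV
... )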

Attributes

exposure_offset : float [s]
    Offset to add to the camera exposure time; the time it takes to physically open and close the shutter. This should be measured in advance and not changed often.
energy_resolution : float
    Energy will be normalized as np.round(self.data['energy'] * energy_resolution) / energy_resolution, which enables rounding to non-integer values. The default rounds to 0.05 eV.
sam_th_offset : float [th]
    Offset added to sam_th at the time of measurement. Default is None.
sam_th_correction : bool
    Default is True. Determines the sam_th_offset based on the initial measurement positions.
energy_offset : float [eV]
    Optional offset to the energy value. Defaults to 0.
det_pixel_size : float [mm/pixel]
    Size of a detector pixel. May change as detectors change.
roi_height : int
    Vertical size of the ROI used to integrate over the beam spot.
roi_width : int
    Horizontal size of the ROI used to integrate over the beam spot.
trim_x : int
    Number of pixels on the edge of the detector (horizontal) to remove from consideration.
trim_y : int
    Number of pixels on the edge of the detector (vertical) to remove from consideration.
stitch_cutoff : float ['ratio']
    Used to identify positions at which a 'stitch' has occurred between the data.
drop_failed_stitch : bool
    Whether to drop all datapoints that did not stitch correctly. Defaults to True; a warning will be given.
stitch_mark_tol : float
    Value used to verify whether or not a tracked motor for stitching has moved.
dark_pix_offset : int [pixels]
    Number of pixels to offset the region used for dark subtraction from the edge of the frame.
new_scan_marker : float [deg]
    How far the 'sam_th' motor needs to move to indicate a new 'scan' starting from 0.
drift_distance : int [pixels]
    Distance that the beam can drift from its nominal position.
mask_threshold : int [counts]
    Threshold used to identify where data-points are potentially located.
filter_size : int
    Filter size for the zinged image.
darkside : 'RHS' or 'LHS'
    Side of the image used to collect the dark image.
saturate_threshold : int [counts]
    Value used to indicate whether an image has been saturated. It checks how close the maximum intensity is to 2**16.
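
For concreteness, a minimal sketch of the energy_resolution rounding described above (the example energies are hypothetical):

import numpy as np

energy = np.array([249.987, 250.013, 250.049])  # hypothetical raw energies in eV
energy_resolution = 20  # default: rounds to 1/20 = 0.05 eV

rounded = np.round(energy * energy_resolution) / energy_resolution
print(rounded)  # [250.   250.   250.05]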

Notes

Print the loader to view variables that will be used in reduction. Update them using the attributes listed in this API.

>>> loader = PrsoxrLoader(files, name='MF114A_spol')
>>> print(loader)  # Default values
Sample Name - MF114A
Number of scans - 402
____ Reduction Variables ____
Shutter offset = 0.00389278
Sample Location = 0
Angle Offset = -0.0
Energy Offset = 0
SNR Cutoff = 1.01
____ Image Processing ____
Image X axis = 200
Image Y axis = 200
Image Edge Trim = (5, 5)
Dark Calc Location = LHS
Dizinger Threshold = 10
Dizinger Size = 3
>>> loader.shutter_offset = 0.004  # Update the shutter offset

mask : np.ndarray (Boolean)
    Array with dimensions equal to an image. Elements set to False will be excluded when finding the beam center.

>>> loader = PrsoxrLoader(files)
>>> mask = np.full_like(loader.images[0], True, dtype=bool)
>>> mask[:50, :50] = False  # block out some region
>>> loader.mask = mask

Once the process attributes have been set up by the user, the loader can be called to load the data. An ROI will need to be specified at the time of processing; use the self.check_spot() function to find appropriate dimensions.

refl = loader(h=40, w=30)

Data that has been loaded can be exported using the self.save_csv(path) and self.save_hdf5(path) functions.
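
A sketch of the full workflow described above, assuming check_spot() can be called without arguments; the output filenames are hypothetical:

loader = PrsoxrLoader(files)
loader.reprocess_images()             # load and process the raw images
loader.check_spot()                   # inspect the beam spot to choose ROI dimensions
refl = loader(h=40, w=30)             # reduce using a 40 x 30 pixel ROI
loader.save_csv('MF114A_spol.csv')    # export the reduced data (hypothetical filenames)
loader.save_hdf5('MF114A_spol.h5')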

Source code in src/pxr_reduce/loader.py
def __init__(
    self,
    files: Iterable[str | Path] | Path | str,
    AI_file: str | Path | None = None,
    *,
    auto_load: bool = False,
    energy_resolution=20,
    **kwargs,
):
    # Update the process variables with any initial conditions
    self.process_vars = _new_process_vars(**kwargs)
    # The explicit energy_resolution argument always sets the process variable
    self.process_vars["energy_resolution"] = energy_resolution

    # Assert breaking behaviour on incorrect IO inputs
    self.files: list[Path] = []
    match files:
        case Path():
            # If the input is a directory, glob recursively for .fits files
            if files.is_dir():
                path_list = list(files.rglob("*.fits"))
            else:
                from warnings import warn

                warn("A single file will not process correctly", stacklevel=2)
                path_list = [files]
        case list():
            path_list = [Path(file) for file in files]
        case _:
            raise TypeError(f"Invalid files input: {files}")
    try:
        re_name = name.infer_index_regex(
            [path.name for path in path_list], prefix_group="re_sample_name"
        )  # This will raise a ValueError if no naming convention matches
        msg = f"\n Naming convention successfully identified: {re_name}\n"
        print(msg)
    except ValueError as ve:
        msg = "Files do not conform to any known naming convention.\n"
        msg += f"Error details: {ve}"
        raise ValueError(msg) from ve

    for fp in path_list:
        file: Path = Path(fp)
        if not file.is_file():
            raise FileNotFoundError(f"{file} is not a valid file.")
        if file.suffix != ".fits":
            raise ValueError(f"{file} is not a FITS file.")
        self.files.append(file)

    # Check AI File
    if isinstance(AI_file, (str, pathlib.Path)):
        print("Loading AI-file to supplement FITS meta-data")
        AI_file = pathlib.Path(AI_file)
        if not AI_file.is_file():
            msg = f"{AI_file} is not a valid file."
            raise FileNotFoundError(msg)
        if AI_file.suffix != ".txt":
            msg = f"{AI_file} is not valid for provided data"
            raise ValueError(msg)
    else:
        # Look for a companion AI file in the parent directory of the first file
        AI_file = (
            self.files[0].parent / f"{self.files[0].stem.split('-')[0]}-AI.txt"
        )
        AI_file = AI_file.resolve() if AI_file.exists() else None

    # Load the files into the Loader
    tmp = []
    # Get information about the sample / path from the first fits file
    path0 = self.files[0]
    self.path = path0.parent
    m = re.search(re_name, path0.name)
    self.name = m.group("re_sample_name") if m else "Unknown Sample"
    print("")
    print(f"Sample name identified as: {self.name}")
    print("")

    for file in tqdm(self.files, "Loading .fits", total=len(self.files)):
        # Collect information about the filepath to save --
        fits_name = file.name  # The name of the current file
        # fits_index = int(re.search(r'[ _-](\d+)\.fits$', fits_name).group(1))
        # Index of the file (if it gets messed up for some reason)
        # fits_index = extract_index(fits_name)
        m = re.search(re_name, fits_name)
        fits_index = int(m.group("index")) if m else -1
        # Load the data
        df_fits = dict_load_fits(file)  # Load .fits files into a dictionary
        if (
            AI_file is not None
        ):  # Only run if the metadata needs to be reuploaded from the .txt file
            temp_meta = self.load_AI_meta(file=AI_file)
            df_fits = self.update_meta(
                df_fits, temp_meta.iloc[fits_index]
            )  # get the correct line item in the AI file --
        df_fits["fits_index"] = fits_index  # Save the index
        tmp.append(df_fits)  # save the file
    data_dict = {key: [d[key] for d in tmp] for key in tmp[0]}
    df = pd.DataFrame(data_dict)
    # Rename the files and only extract those that matter--
    self.data: pd.DataFrame = (
        df[list(header_names.keys())]
        .rename(columns=header_names)
        .round(header_resolutions)
    )
    self.data["energy"] = (
        np.round(self.data["energy"] * self.process_vars["energy_resolution"])
        / self.process_vars["energy_resolution"]
    )  # round energy to the nearest 1/energy_resolution eV (0.05 eV by default)

    # Sort the data if the files are not in order of the file index
    self.data = self.data.sort_values("fits_index", ignore_index=True)
    self.data.insert(
        1, "scan", 0
    )  # populate a new column that identifies the individual scans

    # Other useful columns
    self.data["mask"] = None
    self.data["beam_spot"] = None

    # Has the data been processed?
    self.data_processed = False
    if auto_load:
        self.reprocess_images()
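
As the match statement above shows, files may be either a directory (searched recursively for .fits files) or an explicit list of paths. A brief sketch with hypothetical paths:

import pathlib

data_dir = pathlib.Path('../ALS/2020 Nov/MF114A/spol/250eV')  # hypothetical path

# Option 1: pass the directory itself; .fits files are discovered recursively
loader = PrsoxrLoader(data_dir)

# Option 2: pass an explicit list of .fits files
files = sorted(data_dir.glob('*.fits'))
loader = PrsoxrLoader(files)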

calc_refl(drop_duplicates=True)

Function that performs a data reduction of PRSOXR data.
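
A brief usage sketch; the output filename is hypothetical and the returned columns are taken from the source below:

refl = loader.calc_refl(drop_duplicates=True)  # average duplicate (sam_th, energy, polarization) points
# refl contains: scan, energy, polarization, sam_th, q, R, R_err
refl.to_csv('MF114A_refl.csv', index=False)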

Source code in src/pxr_reduce/loader.py
def calc_refl(self, drop_duplicates=True):
    """
    Function that performs a data reduction of PRSOXR data.
    """

    # Verify that the data has been processed and the metadata does not need to be
    #  recalculated
    if not self.data_processed:
        print("Data has not processed. Please run 'loader.reprocess_images()'.")
        return 0
    elif self.process_vars["reprocess_vars"]:
        self.cleanup_metadata()
        # self.locate_beam_byscan()
        self.process_images()
        self.process_vars["reprocess_vars"] = False

    if self.data["is_saturated"].sum() > 0:
        warnings.warn(
            "The CCD was likely saturated. Stitching may be impacted",
            stacklevel=2,
        )

    _scans = (
        self.data["scan"].iloc[-1] + 1
    )  # How many total scans are included in each calculation? (starts at 0)
    self.data = (
        self.data.groupby("scan")
        .apply(self.normalize_scan, include_groups=False)
        .reset_index(level="scan")
    )
    self.data = (
        self.data.groupby("scan")
        .apply(self.find_stitch_points, include_groups=False)
        .reset_index(level="scan")
    )
    self.data = (
        self.data.groupby("scan")
        .apply(self.calc_scale_factors, include_groups=False)
        .reset_index(level="scan")
    )
    self.data["R"] = self.data.apply(lambda df: df["R"] / df["scale"], axis=1)
    # self.data['R_err'] = self.data.apply(
    #   lambda df: (
    #       df['R'] * ((df['R_err']/df['R'])**2
    # + (df['scale_err']/df['scale'])**2)**0.5) ,
    #   axis=1
    # )
    # Generate output mask
    mask = self.data["i0_mask"] < 1
    mask &= not self.data["is_saturated"]
    mask &= self.data["R"] > 0
    if self.process_vars["drop_failed_stitch"]:
        mask &= self.data["failed_stitch_mask"] < 1

    out = self.data[mask][
        ["scan", "energy", "polarization", "sam_th", "q", "R", "R_err"]
    ]
    if drop_duplicates:
        out = out.groupby(
            ["sam_th", "energy", "polarization"], as_index=False
        ).mean()

    return out