Source code for ewoksxas.tasks.read_scans

from __future__ import annotations

from collections import defaultdict

import h5py
import numpy as np
from ewokscore import Task
from ewokscore.model import BaseInputModel, BaseOutputModel
from Orange.data import Table  # noqa: TC002

from ewoksxas.converters.orange import Converter
from ewoksxas.io.hdf5 import read_data_at_path


class Inputs(BaseInputModel):
    """Input parameters of the :class:`ReadScans` task."""

    # Table listing the scans to read; its "Filename" and "Scan Name" meta
    # columns identify each scan.
    Data: Table
    # Path of the x-axis dataset, relative to each scan.
    x: str
    # One dict per counter, each with a "name" and a "path" key.
    counters: list[dict[str, str]]
    # One dict per metadata entry, each with a "name" and a "path" key.
    metadata: list[dict[str, str]]
    # Optional ``[x_min, x_max, n_points]`` triple describing a common x grid.
    x_interp_grid: list[float] | None = None
class Outputs(BaseOutputModel):
    """Output of the :class:`ReadScans` task."""

    # Table produced by the task's converter.
    Data: Table
def _collect_scans(data: Table | None) -> list[tuple[str, str]]:
    """Return the ``(filename, scan name)`` pairs listed in the table metas.

    :param data: Input table, or ``None``.
    :returns: One pair per table row; an empty list when ``data`` is ``None``.
    :raises ValueError: If the "Filename" or "Scan Name" meta column is missing.
    """
    if data is None:
        return []

    meta_names = [meta.name for meta in data.domain.metas]
    missing = [name for name in ("Filename", "Scan Name") if name not in meta_names]
    if missing:
        # Report the actual cause instead of silently ending up with no scans.
        raise ValueError(
            "No scans to process: input data table is missing meta column(s) "
            f"{', '.join(map(repr, missing))}."
        )

    filename_index = meta_names.index("Filename")
    scan_name_index = meta_names.index("Scan Name")
    return [
        (str(row[filename_index]), str(row[scan_name_index])) for row in data.metas
    ]


def _build_interp_grid(x_interp_grid: list[float] | None) -> np.ndarray | None:
    """Build the custom x grid from ``[x_min, x_max, n_points]``.

    :returns: The grid, or ``None`` when no custom grid was requested.
    :raises ValueError: If the list does not contain exactly three values.
    """
    if not x_interp_grid:
        return None
    if len(x_interp_grid) != 3:
        raise ValueError(
            "x_interp_grid must contain exactly three values: "
            "[x_min, x_max, n_points]."
        )
    x_min, x_max, x_npoints = x_interp_grid
    return np.linspace(x_min, x_max, int(x_npoints))


def _scan_path(scan_name: str | None, path: str) -> str:
    """Return ``path`` prefixed by the scan name, or ``path`` alone if unnamed."""
    return f"{scan_name}/{path}" if scan_name else path


def _read_dataset(h5: h5py.File, path: str, group_error: str):
    """Read ``path`` from ``h5`` and reject groups.

    :returns: Whatever ``read_data_at_path`` returns, including ``None``.
    :raises ValueError: With ``group_error`` if a list is returned, which the
        callers treat as "the path points to a group".
    """
    value = read_data_at_path(h5, path)
    if isinstance(value, list):
        raise ValueError(group_error)
    return value


def _read_scan_metas(
    h5: h5py.File,
    file_path: str,
    scan_name: str | None,
    metadata: list[dict[str, str]],
) -> dict:
    """Collect the metadata shared by every counter row of one scan."""
    scan_metas: dict = {"Filename": file_path}
    if scan_name:
        scan_metas["Scan Name"] = scan_name

    for meta_info in metadata:
        name = meta_info["name"]
        path = meta_info["path"]
        value = _read_dataset(
            h5,
            _scan_path(scan_name, path),
            f"Metadata path '{path}' points to a group, not a dataset",
        )
        if value is not None:
            scan_metas[name] = value
    return scan_metas


def _read_scan_counters(
    h5: h5py.File,
    scan_name: str | None,
    counters: list[dict[str, str]],
    x_data: np.ndarray,
    common_x: np.ndarray,
) -> list[tuple[str, np.ndarray]]:
    """Read every counter of one scan, resampled onto ``common_x`` if needed.

    :returns: ``(counter name, values)`` pairs; counters whose dataset could
        not be read (``None``) are skipped.
    """
    # Whether to interpolate depends only on the scan's x axis, so decide it
    # (and sort x) once per scan instead of once per counter.
    needs_interp = not np.array_equal(x_data, common_x)
    if needs_interp:
        # np.interp requires x values in increasing order.
        order = np.argsort(x_data)
        x_sorted = x_data[order]

    rows: list[tuple[str, np.ndarray]] = []
    for counter_info in counters:
        counter_name = counter_info["name"]
        counter_path = counter_info["path"]
        counter_data = _read_dataset(
            h5,
            _scan_path(scan_name, counter_path),
            f"Counter '{counter_path}' points to a group, not a dataset",
        )
        if counter_data is None:
            continue

        counter_data = np.asarray(counter_data, dtype=np.float64)
        if needs_interp:
            # NOTE(review): counter and x lengths are not checked against
            # each other here -- confirm whether a mismatch should be an error.
            counter_data = np.interp(common_x, x_sorted, counter_data[order])
        rows.append((counter_name, counter_data))
    return rows


def _read_scans(
    scans: list[tuple[str, str]],
    x: str,
    counters: list[dict[str, str]],
    metadata: list[dict[str, str]],
    common_x: np.ndarray | None,
) -> tuple[np.ndarray | None, list[np.ndarray], list[dict]]:
    """Read all scans and return the common x axis, data rows and row metas.

    When ``common_x`` is ``None`` the x axis of the first readable scan is used.
    Scans whose x dataset could not be read (``None``) are skipped.
    """
    # Group by filename so that each file is opened only once.
    by_file = defaultdict(list)
    for file_path, scan_name in scans:
        by_file[file_path].append(scan_name)

    rows: list[np.ndarray] = []
    row_metas: list[dict] = []
    for file_path, scan_names in by_file.items():
        with h5py.File(file_path, "r") as h5:
            for scan_name in scan_names:
                x_data = _read_dataset(
                    h5,
                    _scan_path(scan_name, x),
                    f"x '{x}' points to a group, not a dataset",
                )
                if x_data is None:
                    continue

                x_data = np.asarray(x_data, dtype=np.float64)
                if common_x is None:
                    common_x = x_data

                scan_metas = _read_scan_metas(h5, file_path, scan_name, metadata)
                for counter_name, counter_data in _read_scan_counters(
                    h5, scan_name, counters, x_data, common_x
                ):
                    rows.append(counter_data)
                    row_metas.append({**scan_metas, "Counter": counter_name})
    return common_x, rows, row_metas


def _meta_column(name: str, all_metas: list[dict]) -> np.ndarray:
    """Return the values of metadata ``name`` for every row (NaN if absent).

    :raises ValueError: If any value is not a scalar.
    """
    values = []
    for meta in all_metas:
        value = meta.get(name)
        values.append(np.nan if value is None else value)

    if not all(np.isscalar(value) for value in values):
        raise ValueError(
            f"Metadata '{name}' contains non-scalar values (e.g. arrays). "
            "Metadata must be single values (numbers or strings)."
        )
    return np.array(values)


def _build_table(
    common_x: np.ndarray,
    rows: list[np.ndarray],
    row_metas: list[dict],
    metadata: list[dict[str, str]],
) -> Table:
    """Assemble the output table from the data rows and their metas."""
    converter = Converter()
    converter.add_features(common_x, np.array(rows))

    # Standard metas.
    filenames = [meta["Filename"] for meta in row_metas]
    converter.add_meta("Filename", np.array(filenames, dtype=object))
    scan_names = [meta.get("Scan Name", "") for meta in row_metas]
    converter.add_meta("Scan Name", np.array(scan_names, dtype=object))
    counter_names = [meta["Counter"] for meta in row_metas]
    converter.add_meta("Counter", np.array(counter_names, dtype=object))

    # Custom metas from the metadata paths.
    for meta_info in metadata:
        name = meta_info["name"]
        converter.add_meta(name, _meta_column(name, row_metas))

    return converter.to_table()


class ReadScans(Task, input_model=Inputs, output_model=Outputs):  # type: ignore
    """Task to read scan data and metadata from files.

    Each row of the output table holds one counter of one scan; all rows share
    the same x axis (the custom grid if given, otherwise the x axis of the
    first readable scan).
    """

    def run(self):
        """Read the scans listed in the input table and build the output table.

        :raises ValueError: If the input table contains features, no counter
            is specified, no scan is listed, a path points to a group, a
            metadata value is not scalar, or no data could be read.
        """
        data = self.inputs.Data
        counters = self.inputs.counters
        metadata = self.inputs.metadata

        # Validate that input table has no features.
        if data is not None and len(data.domain.attributes) > 0:
            raise ValueError("Input data table should not contain features.")

        if not counters:
            raise ValueError("At least one counter must be specified.")

        scans_to_process = _collect_scans(data)
        if not scans_to_process:
            raise ValueError("No scans to process.")

        # Validate the custom grid before any file is opened.
        custom_grid = _build_interp_grid(self.inputs.x_interp_grid)

        common_x, all_counter_data, all_metas = _read_scans(
            scans_to_process, self.inputs.x, counters, metadata, custom_grid
        )
        if not all_counter_data or common_x is None:
            raise ValueError("No valid data found in the specified paths.")

        self.outputs.Data = _build_table(
            common_x, all_counter_data, all_metas, metadata
        )