Source code for ewoksxas.tasks.read_sources

import logging
from pathlib import Path
from typing import Any

import h5py
import numpy as np
import silx.resources
from ewokscore import Task
from ewokscore.model import BaseInputModel, BaseOutputModel
from ewoksorange.tests.utils import execute_task
from Orange.data import Domain, StringVariable, Table

from ewoksxas.converters.orange import Converter
from ewoksxas.io.filters import Chain, Filter


class Inputs(BaseInputModel):
    """Input model of the ``ReadSources`` task."""

    # Files to inspect; ``ReadSources.run`` converts each entry with ``str()``.
    file_paths: list[str | Path]

    # Optional filter configurations; ``ReadSources.run`` hands each dict to
    # ``Filter.from_config``. ``None`` (the default) is treated like an empty list.
    filters: list[dict[str, Any]] | None = None
class Outputs(BaseOutputModel):
    """Output model of the ``ReadSources`` task."""

    # Table with "Filename" and "Scan Name" meta columns, one row per scan.
    Data: Table
def _create_empty_table() -> Table:
    """Return a table built from a domain holding only two string metas.

    The domain has no attributes and no class variables; its meta columns
    are "Filename" and "Scan Name".
    """
    meta_columns = [StringVariable(label) for label in ("Filename", "Scan Name")]
    empty_domain = Domain([], [], meta_columns)
    return Table.from_domain(empty_domain)
_logger = logging.getLogger(__name__)

# File name suffixes (compared case-insensitively) that are opened with h5py.
_HDF5_SUFFIXES = (".h5", ".hdf5", ".nx")


def _build_chain(filter_configs: list[dict[str, Any]]) -> Chain:
    """Build a filter chain from the configurations that can be instantiated.

    A configuration for which ``Filter.from_config`` raises ``ValueError`` or
    ``TypeError`` is skipped with a warning instead of aborting the task.
    """
    chain = Chain()
    for filter_config in filter_configs:
        try:
            chain.add_filter(Filter.from_config(filter_config))
        except (ValueError, TypeError) as exc:  # noqa: PERF203
            _logger.warning(
                "Ignoring invalid filter configuration %r: %s", filter_config, exc
            )
    return chain


def _read_scan_names(path_str: str, chain: Chain) -> list[str]:
    """Return the top-level names of an HDF5 file that pass the filter chain.

    Paths whose suffix is not in ``_HDF5_SUFFIXES`` yield an empty list without
    being opened. Errors raised while opening or reading the file propagate to
    the caller.
    """
    if not path_str.lower().endswith(_HDF5_SUFFIXES):
        return []

    with h5py.File(path_str, "r") as h5:
        scan_names = list(h5.keys())
        if not chain.filters:
            return scan_names
        # The chain is evaluated while the file is still open because it
        # receives the file handle together with the scan description.
        # NOTE(review): ``_apply_on_scan`` is a private method of ``Chain``;
        # consider exposing a public equivalent.
        return [
            name for name in scan_names if chain._apply_on_scan(h5, {"name": name})
        ]


class ReadSources(Task, input_model=Inputs, output_model=Outputs):  # type: ignore
    """Task to read scan names from files and optionally filter them.

    Inputs:
        file_paths: Files to inspect. Only paths ending in ``.h5``, ``.hdf5``
            or ``.nx`` (case-insensitive) are opened.
        filters: Optional filter configurations applied to every scan.

    Outputs:
        Data: Table with "Filename" and "Scan Name" meta columns, one row per
            retained scan. The table has no rows when nothing was found.
    """

    def run(self) -> None:
        """Collect (file path, scan name) pairs and store them in ``Data``.

        Files that raise ``OSError`` or ``ValueError`` while being read are
        skipped with a warning, so a single bad file does not fail the task.
        """
        chain = _build_chain(self.inputs.filters or [])

        file_path_scan_pairs: list[list[str]] = []
        for file_path in self.inputs.file_paths:
            path_str = str(file_path)
            try:
                scan_names = _read_scan_names(path_str, chain)
            except (OSError, ValueError) as exc:
                # File not found or invalid format: keep going with the rest.
                _logger.warning("Skipping unreadable file %s: %s", path_str, exc)
                continue
            file_path_scan_pairs.extend(
                [path_str, scan_name] for scan_name in scan_names
            )

        if not file_path_scan_pairs:
            self.outputs.Data = _create_empty_table()
            return

        # Column 0 holds the file paths, column 1 the scan names.
        pairs = np.array(file_path_scan_pairs)
        converter = Converter()
        converter.add_meta("Filename", pairs[:, 0], var_type=StringVariable).add_meta(
            "Scan Name", pairs[:, 1], var_type=StringVariable
        )
        self.outputs.Data = converter.to_table()
def main() -> None:
    """Run ``ReadSources`` on an example file, first unfiltered, then filtered.

    For each run the number of rows in the resulting table and the table
    itself are printed to stdout.
    """
    example_path = silx.resources.resource_filename(
        "ewoksxas:data/Fe2O3_Ka1Ka2_RIXS.h5"
    )
    file_paths = [example_path]

    # First run: only the file list is supplied.
    plain_inputs: dict[str, Any] = {"file_paths": file_paths}
    plain_table = execute_task(ReadSources, plain_inputs)["Data"]
    print(f"Loaded {len(plain_table)} scans.")
    print(plain_table)

    # Test with filters.
    scan_number_filter: dict[str, Any] = {
        "class": "ScanNumberFilter",
        "name": "Scan Number",
        "value": "11-15",
        "path": "",
    }
    filtered_inputs: dict[str, Any] = {
        "file_paths": file_paths,
        "filters": [scan_number_filter],
    }
    filtered_table = execute_task(ReadSources, filtered_inputs)["Data"]
    print(f"Filtered to {len(filtered_table)} scans.")
    print(filtered_table)
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__": main()