Source code for ewoksxas.tasks.read_sources
import logging
from pathlib import Path
from typing import Any

import h5py
import numpy as np
import silx.resources
from ewokscore import Task
from ewokscore.model import BaseInputModel, BaseOutputModel
from ewoksorange.tests.utils import execute_task
from Orange.data import Domain, StringVariable, Table

from ewoksxas.converters.orange import Converter
from ewoksxas.io.filters import Chain, Filter
[docs]
class Outputs(BaseOutputModel):
    """Output model declaring a single ``Data`` field holding an Orange table."""

    # Orange table exposed under the output name "Data".
    Data: Table
def _create_empty_table() -> Table:
    """Build a table from a domain that only has two string meta columns.

    The domain is created with empty attribute and class-variable lists; its
    metas are "Filename" and "Scan Name", in that order.
    """
    meta_columns = [StringVariable(label) for label in ("Filename", "Scan Name")]
    empty_domain = Domain([], [], meta_columns)
    return Table.from_domain(empty_domain)
[docs]
_logger = logging.getLogger(__name__)

# File-name suffixes (matched case-insensitively) that are opened with h5py.
_HDF5_SUFFIXES = (".h5", ".hdf5", ".nx")


class ReadSources(Task, input_model=Inputs, output_model=Outputs):  # type: ignore
    """Task to read scan names from files and optionally filter them.

    Inputs used by :meth:`run`:
        file_paths: paths of the files to inspect. Only paths ending in
            ``.h5``, ``.hdf5`` or ``.nx`` (case-insensitive) are opened.
        filters: optional list of filter configurations; each one is passed
            to ``Filter.from_config`` and added to a ``Chain``.

    Outputs:
        Data: table with "Filename" and "Scan Name" meta columns, one row per
            retained scan. When no scan is retained, the table built by
            ``_create_empty_table`` is returned instead.
    """

    def run(self) -> None:
        """Collect ``[file path, scan name]`` pairs and publish them as a table.

        The task is best-effort: invalid filter configurations, files that
        cannot be read and files with an unsupported suffix are skipped. Each
        skip is reported through a warning log instead of being dropped
        silently, so a misconfigured filter or a bad path can be diagnosed.
        """
        file_paths: list[str | Path] = self.inputs.file_paths
        # ``or []`` normalises a missing/empty ``filters`` input to a list.
        filter_configs: list[dict[str, Any]] = self.inputs.filters or []

        # Build the filter chain; an empty chain means "keep every scan".
        chain = Chain()
        for config in filter_configs:
            try:
                chain.add_filter(Filter.from_config(config))
            except (ValueError, TypeError) as exc:  # noqa: PERF203
                _logger.warning(
                    "Ignoring invalid filter configuration %r: %s", config, exc
                )

        file_path_scan_pairs: list[list[str]] = []
        for file_path in file_paths:
            path_str = str(file_path)
            if not path_str.lower().endswith(_HDF5_SUFFIXES):
                _logger.warning(
                    "Skipping %s: unsupported file extension.", path_str
                )
                continue
            try:
                with h5py.File(path_str, "r") as h5:
                    # Top-level HDF5 keys are taken as the scan names.
                    scan_names = list(h5.keys())
                    if chain.filters:
                        # The filters need the open file handle, so they are
                        # evaluated before leaving the ``with`` block.
                        scan_names = [
                            name
                            for name in scan_names
                            if chain._apply_on_scan(h5, {"name": name})
                        ]
            except (OSError, ValueError) as exc:
                # File not found, not a valid HDF5 file, or a filter failure.
                _logger.warning("Skipping unreadable file %s: %s", path_str, exc)
                continue
            file_path_scan_pairs.extend(
                [path_str, scan_name] for scan_name in scan_names
            )

        converter = Converter()
        if file_path_scan_pairs:
            # Column 0 holds the file paths, column 1 the scan names.
            pairs = np.array(file_path_scan_pairs)
            converter.add_meta(
                "Filename", pairs[:, 0], var_type=StringVariable
            ).add_meta("Scan Name", pairs[:, 1], var_type=StringVariable)
            self.outputs.Data = converter.to_table()
        else:
            self.outputs.Data = _create_empty_table()
[docs]
def main() -> None:
    """Run ``ReadSources`` on one resource file, first unfiltered then filtered.

    The file is resolved through ``silx.resources`` from
    ``ewoksxas:data/Fe2O3_Ka1Ka2_RIXS.h5``. Each run prints the number of rows
    in the resulting ``Data`` table followed by the table itself.
    """
    example_file = silx.resources.resource_filename(
        "ewoksxas:data/Fe2O3_Ka1Ka2_RIXS.h5"
    )
    file_paths = [example_file]

    # First pass: no filters.
    table = execute_task(ReadSources, {"file_paths": file_paths})["Data"]
    print(f"Loaded {len(table)} scans.")
    print(table)

    # Test with filters.
    scan_number_filter: dict[str, Any] = {
        "class": "ScanNumberFilter",
        "name": "Scan Number",
        "value": "11-15",
        "path": "",
    }
    filtered_table = execute_task(
        ReadSources,
        {"file_paths": file_paths, "filters": [scan_number_filter]},
    )["Data"]
    print(f"Filtered to {len(filtered_table)} scans.")
    print(filtered_table)
# Run the demonstration only when this module is executed as a script.
if __name__ == "__main__":
    main()