Source code for ewoksxas.tasks.read_scans
from __future__ import annotations
from collections import defaultdict
import h5py
import numpy as np
from ewokscore import Task
from ewokscore.model import BaseInputModel, BaseOutputModel
from Orange.data import Table # noqa: TC002
from ewoksxas.converters.orange import Converter
from ewoksxas.io.hdf5 import read_data_at_path
[docs]
class Outputs(BaseOutputModel):
    """Output model of the :class:`ReadScans` task."""

    # Table assigned by ``ReadScans.run`` from ``Converter.to_table()``.
    Data: Table
[docs]
class ReadScans(Task, input_model=Inputs, output_model=Outputs):  # type: ignore
    """Task to read scan data and metadata from files."""

    def run(self):  # noqa: C901, PLR0912, PLR0915
        """Read counters and metadata for each scan listed in the input table.

        Inputs read from ``self.inputs``:

        * ``Data``: table whose metas hold a ``"Filename"`` and a
          ``"Scan Name"`` column; it must not contain feature columns.
        * ``x``: path, relative to each scan, of the x-axis dataset.
        * ``counters``: iterable of mappings with ``"name"`` and ``"path"``
          keys; one output row is produced per (scan, counter) pair.
        * ``metadata``: iterable of mappings with ``"name"`` and ``"path"``
          keys; each becomes an extra meta column.
        * ``x_interp_grid``: optional ``(x_min, x_max, x_npoints)`` triple
          defining the common x grid. When falsy, the x-axis of the first
          readable scan is used as the common grid.

        The resulting table is stored in ``self.outputs.Data``.

        Raises:
            ValueError: if the input table contains features, no counter is
                given, no scan can be extracted from the input table, a path
                points to a group instead of a dataset, a metadata value is
                not a scalar, or no data could be read at all.
        """
        data = self.inputs.Data
        x = self.inputs.x
        counters = self.inputs.counters
        metadata = self.inputs.metadata
        x_interp_grid = self.inputs.x_interp_grid

        # Validate that input table has no features. The table may be absent
        # (None); in that case the "No scans to process." error below is the
        # intended failure, not an AttributeError on ``data.domain``.
        if data is not None and len(data.domain.attributes) > 0:
            raise ValueError("Input data table should not contain features.")

        if not counters:
            raise ValueError("At least one counter must be specified.")

        scans_to_process: list[tuple[str, str | None]] = []
        if data is not None:
            meta_names = [meta.name for meta in data.domain.metas]
            try:
                filename_index = meta_names.index("Filename")
                scan_name_index = meta_names.index("Scan Name")
            except ValueError:
                # A required meta column is missing: leave the list empty so
                # the "No scans to process." error below is raised.
                pass
            else:
                scans_to_process.extend(
                    (str(row[filename_index]), str(row[scan_name_index]))
                    for row in data.metas
                )

        if not scans_to_process:
            raise ValueError("No scans to process.")

        all_counter_data = []
        all_metas: list[dict] = []
        common_x = None

        # Group by filename for efficiency (each file is opened only once).
        by_file = defaultdict(list)
        for f, s in scans_to_process:
            by_file[f].append(s)

        for file_path, scan_names in by_file.items():
            with h5py.File(file_path, "r") as h5:
                for scan_name in scan_names:
                    x_data = read_data_at_path(h5, f"{scan_name}/{x}")
                    if x_data is None:
                        # Nothing readable at the x path: skip this scan.
                        continue
                    # Raise error if x path points to a group instead of dataset.
                    if isinstance(x_data, list):
                        raise ValueError(f"x '{x}' points to a group, not a dataset")
                    x_data = np.asarray(x_data, dtype=np.float64)

                    if common_x is None:
                        # Use custom grid if specified, otherwise use first
                        # scan's grid.
                        if x_interp_grid:
                            x_min, x_max, x_npoints = x_interp_grid
                            common_x = np.linspace(x_min, x_max, int(x_npoints))
                        else:
                            common_x = x_data

                    # Read metadata once per scan.
                    scan_metas: dict = {"Filename": file_path}
                    if scan_name:
                        scan_metas["Scan Name"] = scan_name
                    for meta_info in metadata:
                        name = meta_info["name"]
                        path = meta_info["path"]
                        full_path = f"{scan_name}/{path}" if scan_name else path
                        value = read_data_at_path(h5, full_path)
                        if value is not None:
                            # Raise error if metadata path points to a group.
                            if isinstance(value, list):
                                raise ValueError(
                                    f"Metadata path '{path}' points to a group, "
                                    "not a dataset"
                                )
                            scan_metas[name] = value

                    # Create a row for each counter.
                    for counter_info in counters:
                        counter_name = counter_info["name"]
                        counter_path = counter_info["path"]
                        counter_data = read_data_at_path(
                            h5, f"{scan_name}/{counter_path}"
                        )
                        if counter_data is None:
                            # Counter missing in this scan: no row for it.
                            continue
                        # Raise error if counter path points to a group.
                        if isinstance(counter_data, list):
                            raise ValueError(
                                f"Counter '{counter_path}' points to a group, "
                                "not a dataset"
                            )
                        counter_data = np.asarray(counter_data, dtype=np.float64)

                        # Interpolate to common x-axis grid if needed.
                        if not np.array_equal(x_data, common_x):
                            # np.interp requires x values in increasing order.
                            # Sort x_data and apply same ordering to counter_data.
                            sort_indices = np.argsort(x_data)
                            x_data_sorted = x_data[sort_indices]
                            counter_data_sorted = counter_data[sort_indices]
                            counter_data = np.interp(
                                common_x, x_data_sorted, counter_data_sorted
                            )
                        all_counter_data.append(counter_data)

                        # Copy scan metadata and add counter name.
                        row_metas = scan_metas.copy()
                        row_metas["Counter"] = counter_name
                        all_metas.append(row_metas)

        if not all_counter_data or common_x is None:
            raise ValueError("No valid data found in the specified paths.")

        # Initialize converter and add features.
        converter = Converter()
        converter.add_features(common_x, np.array(all_counter_data))

        # Add standard metas.
        filenames = [meta["Filename"] for meta in all_metas]
        converter.add_meta("Filename", np.array(filenames, dtype=object))
        scan_names_list = [meta.get("Scan Name", "") for meta in all_metas]
        converter.add_meta("Scan Name", np.array(scan_names_list, dtype=object))
        counter_names = [meta["Counter"] for meta in all_metas]
        converter.add_meta("Counter", np.array(counter_names, dtype=object))

        # Add custom metas from the metadata paths.
        for meta_info in metadata:
            name = meta_info["name"]
            values = []
            for meta in all_metas:
                value = meta.get(name)
                if value is not None:
                    values.append(value)
                else:
                    # Rows whose scan lacked this metadata entry get NaN.
                    values.append(np.nan)
            for value in values:
                if not np.isscalar(value):
                    raise ValueError(
                        f"Metadata '{name}' contains non-scalar values (e.g. arrays). "
                        "Metadata must be single values (numbers or strings)."
                    )
            converter.add_meta(name, np.array(values))

        self.outputs.Data = converter.to_table()