Source code for ewoksxas.tasks.tests.test_read_sources

from typing import Any

import pytest
import silx.resources
from ewoksorange.tests.utils import execute_task

from ewoksxas.tasks.read_sources import ReadSources


[docs] @pytest.fixture def file_paths() -> list[str]: data_path = silx.resources.resource_filename("ewoksxas:data/Fe_foil_0001.h5") return [data_path]
[docs] def test_read_sources_no_filters(file_paths: list[str]) -> None: """No filters should return all scans.""" inputs: dict[str, Any] = {"file_paths": file_paths, "filters": []} outputs = execute_task(ReadSources, inputs=inputs) assert len(outputs["Data"]) == 20
[docs] def test_read_sources_number_range_with_stride(file_paths: list[str]) -> None: """Test scan number filter with stride.""" filters = [ { "class": "ScanNumberFilter", "name": "Scan Number", "value": "1-20:5", # Every 5th scan from 1 to 20. "path": "", "operator": None, } ] inputs: dict[str, Any] = {"file_paths": file_paths, "filters": filters} outputs = execute_task(ReadSources, inputs=inputs) # Should match scans: 1.1, 1.2, 6.1, 6.2 (4 scans). assert len(outputs["Data"]) == 4 scan_names = {str(row[1]) for row in outputs["Data"].metas} assert scan_names == {"1.1", "1.2", "6.1", "6.2"}
[docs] def test_read_sources_number_discontinuous_range(file_paths: list[str]) -> None: """Test scan number filter with range.""" filters = [ { "class": "ScanNumberFilter", "name": "Scan Number", "value": "1-2", # First 2 scan numbers. "path": "", "operator": None, } ] inputs: dict[str, Any] = {"file_paths": file_paths, "filters": filters} outputs = execute_task(ReadSources, inputs=inputs) assert len(outputs["Data"]) == 4 # 1.1, 1.2, 2.1, 2.2. scan_names = {str(row[1]) for row in outputs["Data"].metas} assert "1.1" in scan_names assert "2.2" in scan_names assert "3.1" not in scan_names
[docs] def test_read_sources_combined_and_filters(file_paths: list[str]) -> None: """Test combined AND filters.""" filters = [ { "class": "ScanNumberFilter", "name": "Scan Number", "value": "1-3", # Scans 1-3 "path": "", "operator": None, }, { "class": "RegexFilter", "name": "Regex", "value": r"mvirtual_energy", # Match mvirtual_energy in title. "path": "title", "operator": "and", }, ] inputs: dict[str, Any] = {"file_paths": file_paths, "filters": filters} outputs = execute_task(ReadSources, inputs=inputs) # Should match scans 1-3 that have 'mvirtual_energy' in title. scan_names = [str(row[1]) for row in outputs["Data"].metas] # Verify we only got scans from the 1-3 range. assert all(int(name.split(".")[0]) in range(1, 4) for name in scan_names) assert len(outputs["Data"]) > 0 # At least some matches.
[docs] def test_read_sources_regex_specific_positioner(file_paths: list[str]) -> None: """Test regex filter on nested path.""" filters = [ { "class": "RegexFilter", "name": "Regex", "value": "^cryo", # Positioners starting with 'cryo'. "path": "instrument/positioners", "operator": None, } ] inputs: dict[str, Any] = {"file_paths": file_paths, "filters": filters} outputs = execute_task(ReadSources, inputs=inputs) # All scans have cryo positioners (cryoty, cryotz, etc.). assert len(outputs["Data"]) == 20
[docs] def test_read_sources_no_matches(file_paths: list[str]) -> None: """Test filter that matches nothing.""" filters = [ { "class": "RegexFilter", "name": "Regex", "value": "nonexistent_pattern", "path": "title", "operator": None, } ] inputs: dict[str, Any] = {"file_paths": file_paths, "filters": filters} outputs = execute_task(ReadSources, inputs=inputs) # Should return empty table. assert len(outputs["Data"]) == 0
[docs] def test_read_sources_or_operator(file_paths: list[str]) -> None: """Test OR operator between filters.""" filters = [ { "class": "ScanNumberFilter", "name": "Scan Number", "value": "1", # Only scan 1. "path": "", "operator": None, }, { "class": "ScanNumberFilter", "name": "Scan Number", "value": "10", # Only scan 10. "path": "", "operator": "or", }, ] inputs: dict[str, Any] = {"file_paths": file_paths, "filters": filters} outputs = execute_task(ReadSources, inputs=inputs) # Should match scans 1.1, 1.2, 10.1, 10.2 (4 scans). assert len(outputs["Data"]) == 4 scan_names = {str(row[1]) for row in outputs["Data"].metas} assert scan_names == {"1.1", "1.2", "10.1", "10.2"}
[docs] def test_read_sources_regex_empty_path_matches_scan_name(file_paths: list[str]) -> None: """RegexFilter should match against scan name when path is empty.""" filters = [ { "class": "RegexFilter", "name": "Regex", "value": r"\.1$", "path": "", "operator": None, } ] inputs: dict[str, Any] = {"file_paths": file_paths, "filters": filters} outputs = execute_task(ReadSources, inputs=inputs) # Should match scans: 1.1, 2.1, ..., 10.1 (10 scans). assert len(outputs["Data"]) == 10 scan_names = {str(row[1]) for row in outputs["Data"].metas} assert all(name.endswith(".1") for name in scan_names)