# Source code for ewoksxas.converters.orange

"""Utilities for working with Orange Tables for spectroscopic data.

This module provides classes to convert between Orange Tables and
spectroscopic data representations. While using Orange Tables for this
purpose is not ideal, it allows reusing existing Orange components for data
manipulation and analysis.

Data Representation
-------------------
The names used in the module are inherited from machine learning terminology:
- **features**: Variables that describe the data (x-axis: e.g., energy or wavenumber).
- **targets**: The desired predictions, typically not required for spectroscopy.
- **metas**: Additional variables that provide extra information about the data, but
  are not used for prediction (e.g., motor positions).

For spectroscopic data:
- The x-axis values (e.g., energy) are represented as feature names.
- The intensity values are stored in the feature data, the bulk of the table.
- Metas can store experiment-related information such as motor positions.

Example Table Structure
-----------------------
    | location | motor_1 | energy_1  | energy_2  | ...       | energy_n  |
    | -------- | ------- | --------- | --------- | --------- | --------- |
    | (meta)   | (meta)  | (feature) | (feature) | (feature) | (feature) |
    | root     | 12.65   | 1.2       | 0.5       | ...       | 0.4       |
    | leaf     | 15.34   | 0.8       | 1.1       | ...       | 1.6       |
    | stem     | 11.07   | 2.0       | 0.3       | ...       | 2.8       |
    | leaf     | 12.13   | 1.5       | 0.7       | ...       | 3.1       |
    | ...      | ...     | ...       | ...       | ...       | ...       |

Usage
-----
Creating a simple table with the builder pattern:
    >>> x = np.linspace(0, 10, 100)
    >>> spectra = np.array([...])  # shape: (n_samples, n_energies)
    >>> table = Converter().add_features(x, spectra).to_table()

Adding metadata and targets:
    >>> areas = np.trapezoid(spectra, x)
    >>> table = (
    ...     Converter()
    ...     .add_features(x, spectra)
    ...     .add_meta("Area", areas)
    ...     .add_meta("Location", locations)
    ...     .add_target("Quality", predictions)
    ...     .to_table()
    ... )

With custom variable types:
    >>> table = (
    ...     Converter()
    ...     .add_features(x, spectra)
    ...     .add_meta("Location", locations, var_type=StringVariable)
    ...     .add_target("Quality", predictions, var_type=ContinuousVariable)
    ...     .to_table()
    ... )

Round-trip conversion:
    >>> table = (
    ...     Converter()
    ...     .add_features(x, spectra)
    ...     .add_meta("Area", areas)
    ...     .to_table()
    ... )
    >>> converter = Converter.from_table(table)
    >>> feature_names, spectra = converter.features
    >>> metas = converter.metas
"""

from __future__ import annotations

from typing import Any

import numpy as np
import numpy.typing as npt
from Orange.data import (
    ContinuousVariable,
    DiscreteVariable,
    Domain,
    StringVariable,
    Table,
    Variable,
)


class Converter:
    """Helper class to convert between spectroscopic data and Orange Tables.

    A Converter is populated through the chainable ``add_features()``,
    ``add_meta()`` and ``add_target()`` methods and turned into an Orange
    Table with ``to_table()``. ``from_table()`` performs the reverse
    conversion.
    """

    def __init__(self) -> None:
        """Initialize an empty Converter.

        Use add_features(), add_meta(), and add_target() to populate data.
        """
        # {"names": 1D float array, "data": 2D float array}; None until
        # add_features() has been called.
        self._features: dict[str, Any] | None = None
        # One {"name", "data", "type", "kwargs"} dict per stored column.
        self._metas: list[dict[str, Any]] = []
        self._targets: list[dict[str, Any]] = []

    def add_meta(
        self,
        name: str,
        data: npt.NDArray[Any],
        var_type: type[Variable] | None = None,
        **kwargs: Any,
    ) -> Converter:
        """Add a metadata variable.

        Args:
            name: Name of the metadata variable.
            data: Array of values, one for each sample. The length is not
                checked by this method.
            var_type: Optional Orange Variable type (ContinuousVariable or
                StringVariable). If None, inferred from data: numeric dtypes
                become ContinuousVariable, everything else StringVariable.
            **kwargs: Additional arguments for the Variable constructor.

        Returns:
            Self for chaining.

        Raises:
            ValueError: If the data cannot be converted to floats for a
                ContinuousVariable, or if DiscreteVariable is requested.
        """
        data = np.asarray(data)

        # Infer the variable type if not provided.
        if var_type is None:
            if data.dtype.kind in "iuf":
                var_type = ContinuousVariable
            else:
                var_type = StringVariable

        # Validate data compatibility with the variable type.
        if var_type is ContinuousVariable:
            try:
                # Only used as a convertibility check; the data is stored
                # with its original dtype.
                np.asarray(data, dtype=np.float64)
            except (ValueError, TypeError) as err:
                raise ValueError(
                    f"Data for ContinuousVariable '{name}' must be numeric."
                ) from err
        elif var_type is DiscreteVariable:
            raise ValueError(
                "DiscreteVariable not allowed for metas; "
                "use ContinuousVariable or StringVariable."
            )

        self._store_column(self._metas, name, data, var_type, kwargs)
        return self

    def add_target(  # noqa: C901
        self,
        name: str,
        data: npt.NDArray,
        var_type: type[Variable] | None = None,
        **kwargs: Any,
    ) -> Converter:
        """Add a target variable.

        Targets can only be of ContinuousVariable or DiscreteVariable type.

        Args:
            name: Name of the target variable.
            data: Array of values, one for each sample.
            var_type: Optional Orange Variable type (ContinuousVariable or
                DiscreteVariable). If None, inferred from data.
            **kwargs: Additional arguments for the Variable constructor. A
                DiscreteVariable requires ``values``; it is filled in
                automatically when the type is inferred.

        Returns:
            Self for chaining.

        Raises:
            ValueError: If incompatible data type or too many unique values
                for inference.
        """
        data = np.asarray(data)

        # Infer var_type if not provided. Numeric data is of
        # ContinuousVariable type. Anything else is of DiscreteVariable type
        # up to a small number of categories.
        CATEGORY_THRESHOLD = 10
        if var_type is None:
            if data.dtype.kind in "iuf":
                var_type = ContinuousVariable
            else:
                unique_vals = np.unique(data.astype(str))
                n_unique = len(unique_vals)
                if n_unique >= CATEGORY_THRESHOLD:
                    raise ValueError(
                        f"Too many unique values ({n_unique}) for target '{name}'. "
                        "Targets should have less than ten unique categories or "
                        "be numeric."
                    )
                var_type = DiscreteVariable
                kwargs["values"] = sorted(unique_vals)

        # Validate data compatibility with var_type.
        if var_type is DiscreteVariable:
            if "values" not in kwargs:
                raise ValueError(
                    f"DiscreteVariable for '{name}' requires 'values' in kwargs."
                )
            values = kwargs["values"]
            if data.dtype.kind in "OSU":
                value_to_index = {str(v): i for i, v in enumerate(values)}
                invalid = [x for x in np.unique(data) if str(x) not in value_to_index]
                if invalid:
                    raise ValueError(f"Data contains values not in 'values': {invalid}")
            # Check range (allows both int and float like 0.0, 1.0). Empty
            # data has nothing to check, and min()/max() would fail on it.
            elif data.size and (data.min() < 0 or data.max() >= len(values)):
                raise ValueError(
                    f"Data indices for DiscreteVariable '{name}' "
                    f"must be in range 0 to {len(values) - 1}."
                )
        elif var_type is ContinuousVariable:
            if data.dtype.kind not in "iuf":
                raise ValueError(
                    f"Data for ContinuousVariable '{name}' must be numeric."
                )
        elif var_type is StringVariable:
            raise ValueError(
                "StringVariable not allowed for targets; "
                "use ContinuousVariable or DiscreteVariable type."
            )

        self._store_column(self._targets, name, data, var_type, kwargs)
        return self

    def add_features(
        self,
        names: npt.NDArray[np.float64],
        data: npt.NDArray[np.float64],
        replace: bool = False,
    ) -> Converter:
        """Add or update feature data (spectral x-axis and y-values).

        Args:
            names: 1D array of x-axis values (e.g., energy, wavenumber).
            data: 2D array of spectral intensities with shape
                (n_samples, n_features). If 1D, treated as a single sample
                and reshaped to (1, n_features).
            replace: If True, replaces existing features. If False (default),
                appends rows to existing features (requires matching names).

        Returns:
            Self for method chaining.

        Raises:
            ValueError: If names is not 1D, if data has more than two
                dimensions, if the number of names differs from the number
                of data columns, or if appending with mismatched names.
        """
        # Normalize inputs: scalars and 1D data become a single-row 2D array.
        names = np.atleast_1d(np.asarray(names, dtype=np.float64))
        data = np.atleast_2d(np.asarray(data, dtype=np.float64))

        # Validate dimensionality before indexing into the shape.
        if names.ndim != 1:
            raise ValueError(
                f"Feature names must be a 1D array, got {names.ndim} dimensions."
            )
        if data.ndim != 2:
            raise ValueError(
                f"Feature data must be a 1D or 2D array, got {data.ndim} dimensions."
            )

        # Validate shape.
        if data.shape[1] != len(names):
            raise ValueError(
                f"Shape mismatch between names has {len(names)} points, "
                f"but data has {data.shape[1]} columns."
            )

        # First call or explicit replacement: store the new features as-is.
        if self._features is None or replace:
            self._features = {
                "names": names,
                "data": data,
            }
            return self

        # Append rows (extend). Compare shapes first: np.allclose would
        # otherwise broadcast (or fail to) on arrays of different length.
        existing_names = self._features["names"]
        if existing_names.shape != names.shape or not np.allclose(
            existing_names, names
        ):
            raise ValueError(
                "Cannot append features because names do not match existing names. "
                "Use replace=True to overwrite."
            )

        self._features["data"] = np.vstack([self._features["data"], data])
        return self

    def _store_column(
        self,
        var_storage: list[dict[str, Any]],
        name: str,
        data: npt.NDArray,
        var_type: type[Variable] | None,
        kwargs: dict[str, Any],
    ) -> None:
        """Append a column definition to ``var_storage``.

        For a DiscreteVariable with known ``values``, the data is stored as
        integer indices into ``values``.
        """
        if var_type is DiscreteVariable and "values" in kwargs:
            values = kwargs["values"]
            # If strings, map to indices.
            if isinstance(data, np.ndarray) and data.dtype.kind in "OSU":
                value_to_index = {str(v): i for i, v in enumerate(values)}
                data = np.array([value_to_index[str(x)] for x in data], dtype=int)
            # If numeric float-like, cast to int.
            elif not np.issubdtype(np.asarray(data).dtype, np.integer):
                data = np.asarray(data).astype(int)

        var_storage.append(
            {"name": name, "data": data, "type": var_type, "kwargs": kwargs}
        )

    def _n_samples(self) -> int:
        """Return the number of samples implied by the stored data.

        Features take precedence, then the first meta column, then the
        first target column. Returns 0 when nothing has been stored.
        """
        if self._features is not None:
            return self._features["data"].shape[0]
        if self._metas:
            return len(self._metas[0]["data"])
        if self._targets:
            return len(self._targets[0]["data"])
        return 0

    def _process_columns(
        self, columns: list[dict[str, Any]]
    ) -> tuple[list[Variable], npt.NDArray[Any]]:
        """Build the Orange variables and stacked data for stored columns.

        Returns:
            A tuple of the variable list and a 2D object array with one
            column per variable. Without columns, the array has shape
            (n_samples, 0).
        """
        if not columns:
            return [], np.empty((self._n_samples(), 0))

        vars_list: list[Variable] = []
        data_list: list[npt.NDArray] = []

        for column in columns:
            var_type = column["type"]
            kwargs = column["kwargs"]

            var = var_type(str(column["name"]), **kwargs)
            var.raw_name = column["name"]  # type: ignore
            vars_list.append(var)
            # Use object dtype to preserve original types when columns are stacked.
            data_list.append(column["data"].astype(object))

        return vars_list, np.column_stack(data_list)

    @property
    def features(self) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64]]:
        """Get the feature names and data.

        Returns:
            A tuple containing feature names and feature data.

        Raises:
            ValueError: If no features have been set.
        """
        if self._features is None:
            raise ValueError("Converter has no features data.")
        return self._features["names"], self._features["data"]

    @property
    def metas(self) -> list[dict[str, Any]]:
        """Get the list of metadata variable definitions.

        Returns:
            List of metadata variable definitions.
        """
        return self._metas

    def get_meta(self, name: str) -> Any:
        """Get the data of the first metadata variable called ``name``.

        Args:
            name: Name of the metadata variable.

        Returns:
            The stored data of the variable, or None if there is no
            metadata variable with that name.
        """
        for meta in self.metas:
            if meta.get("name") == name:
                return meta.get("data")
        return None

    @property
    def targets(self) -> list[dict[str, Any]]:
        """Get the list of target variable definitions.

        Returns:
            List of target variable definitions.
        """
        return self._targets

    def to_table(self) -> Table:
        """Construct and return the Orange Table.

        The table will have:
        - attributes: one ContinuousVariable per x-axis value, named after
          the value formatted with six decimals; the rows hold the spectral
          intensities.
        - metas: metadata variables (e.g., motor positions, filenames).
        - class variables: targets, if added via `add_target`; typically not
          used in spectroscopic data.

        Returns:
            An Orange.data.Table object.
        """
        feature_vars: list[Variable] = []
        if self._features is not None:
            for val in self._features["names"]:
                var = ContinuousVariable(f"{val:.6f}")
                # Store original value for recovery.
                var.raw_name = val  # type: ignore
                feature_vars.append(var)

        meta_vars, meta_data = self._process_columns(self._metas)
        target_vars, target_data = self._process_columns(self._targets)

        domain = Domain(feature_vars, target_vars, meta_vars)

        if self._features is not None:
            X = self._features["data"].astype(np.float64)
        else:
            # No features: an empty block with one row per sample.
            X = np.empty((self._n_samples(), 0))

        return Table.from_numpy(
            domain,
            X=X,
            Y=target_data if target_data.size > 0 else None,
            metas=meta_data if meta_data.size > 0 else None,
        )

    @classmethod
    def from_table(cls, table: Table) -> Converter:
        """Create a Converter instance from an existing Orange Table.

        Extracts features, targets, and metadata from the table. Feature
        names are recovered from the table domain attributes. A table
        without attributes yields a Converter without features.

        Args:
            table: The source Orange Table.

        Returns:
            A Converter instance containing data from the table.

        Raises:
            ValueError: If an attribute name cannot be interpreted as a
                number, or if a target or meta column is rejected by
                `add_target` or `add_meta`.
        """
        feature_names: list[float] = []
        for var in table.domain.attributes:
            # Use raw_name if available to get exact float value, else parse name.
            val = getattr(var, "raw_name", var.name)
            try:
                feature_names.append(float(val))
            except (ValueError, TypeError) as err:
                # Feature names are the x-axis; fail here with a clear
                # message rather than later inside the array conversion.
                raise ValueError(
                    f"Feature name {val!r} cannot be interpreted as a numeric "
                    "x-axis value."
                ) from err

        converter = cls()
        if feature_names:
            converter.add_features(np.array(feature_names), table.X)

        if table.domain.class_vars:
            y_data = table.Y
            # A single class variable is stored as a 1D array.
            if y_data.ndim == 1:
                y_data = y_data.reshape(-1, 1)

            for i, var in enumerate(table.domain.class_vars):
                if isinstance(var, DiscreteVariable):
                    converter.add_target(
                        var.name, y_data[:, i], var_type=type(var), values=var.values
                    )  # type: ignore
                else:
                    converter.add_target(var.name, y_data[:, i], var_type=type(var))  # type: ignore

        for i, var in enumerate(table.domain.metas):
            converter.add_meta(var.name, table.metas[:, i], var_type=type(var))  # type: ignore

        return converter
def main():
    """Demonstrate the Converter on synthetic Gaussian spectra.

    Builds a table with features and metas, converts it back to a Converter
    and prints the recovered shapes and metas, then builds a second table
    that also carries a target and a string meta.
    """
    x = np.linspace(0, 10, 100)

    # (intensity, width, center) of each synthetic peak.
    SPECTRA_PARAMETERS = [
        (5.0, 0.5, 3.0),
        (2.0, 1.0, 5.0),
        (8.0, 0.8, 4.0),
        (1.5, 1.5, 6.0),
        (3.5, 0.6, 3.5),
    ]

    # Create spectra with Gaussian-like peaks of different intensities and
    # widths, one row per parameter set.
    spectra: npt.NDArray[np.float64] = np.array(
        [
            intensity * np.exp(-((x - center) ** 2) / (2 * width**2)) + 0.5
            for intensity, width, center in SPECTRA_PARAMETERS
        ]
    )

    areas = np.asarray(np.trapezoid(spectra, x))
    maxima = np.max(spectra, axis=1)

    table = (
        Converter()
        .add_features(x, spectra)
        .add_meta("Area", areas)
        .add_meta("Maximum", maxima)
        .to_table()
    )

    converter = Converter.from_table(table)
    if converter._features is not None:
        feature_names, feature_data = converter.features
        print(f"Feature names shape: {feature_names.shape}")
        print(f"Feature data shape: {feature_data.shape}")
    metas = converter.metas
    print(f"Metas: {metas}")

    quality_scores = np.random.default_rng().random(len(spectra))
    locations = np.asarray(["root", "leaf", "stem", "leaf", "root"])

    # Exercise the target and string-meta paths; the resulting table is not
    # used any further.
    (
        Converter()
        .add_features(x, spectra)
        .add_target("QualityScore", quality_scores)
        .add_meta("Area", areas)
        .add_meta("Location", locations, var_type=StringVariable)
        .to_table()
    )
if __name__ == "__main__":
    # Run the demonstration only when the module is executed as a script.
    main()