diff --git a/flixopt/calculation.py b/flixopt/calculation.py index c7367cad2..03cf8b9a6 100644 --- a/flixopt/calculation.py +++ b/flixopt/calculation.py @@ -183,8 +183,8 @@ def solve(self, solver: _Solver, log_file: Optional[pathlib.Path] = None, log_ma def _activate_time_series(self): self.flow_system.transform_data() - self.flow_system.time_series_collection.activate_timesteps( - active_timesteps=self.active_timesteps, + self.flow_system.time_series_collection.set_selection( + timesteps=self.active_timesteps ) @@ -217,6 +217,8 @@ def __init__( list with indices, which should be used for calculation. If None, then all timesteps are used. folder: folder where results should be saved. If None, then the current working directory is used. """ + if flow_system.time_series_collection.scenarios is not None: + raise ValueError('Aggregation is not supported for scenarios yet. Please use FullCalculation instead.') super().__init__(name, flow_system, active_timesteps, folder=folder) self.aggregation_parameters = aggregation_parameters self.components_to_clusterize = components_to_clusterize @@ -272,9 +274,9 @@ def _perform_aggregation(self): # Aggregation - creation of aggregated timeseries: self.aggregation = Aggregation( - original_data=self.flow_system.time_series_collection.to_dataframe( - include_extra_timestep=False - ), # Exclude last row (NaN) + original_data=self.flow_system.time_series_collection.as_dataset( + with_extra_timestep=False, with_constants=False + ).to_dataframe(), hours_per_time_step=float(dt_min), hours_per_period=self.aggregation_parameters.hours_per_period, nr_of_periods=self.aggregation_parameters.nr_of_periods, @@ -286,9 +288,11 @@ def _perform_aggregation(self): self.aggregation.cluster() self.aggregation.plot(show=True, save=self.folder / 'aggregation.html') if self.aggregation_parameters.aggregate_data_and_fix_non_binary_vars: - self.flow_system.time_series_collection.insert_new_data( - self.aggregation.aggregated_data, include_extra_timestep=False - ) + for col in self.aggregation.aggregated_data.columns: + data = self.aggregation.aggregated_data[col].values + if col in self.flow_system.time_series_collection._has_extra_timestep: + data = np.append(data, data[-1]) + self.flow_system.time_series_collection.update_time_series(col, data) self.durations['aggregation'] = round(timeit.default_timer() - t_start_agg, 2) @@ -327,8 +331,8 @@ def __init__( self.nr_of_previous_values = nr_of_previous_values self.sub_calculations: List[FullCalculation] = [] - self.all_timesteps = self.flow_system.time_series_collection.all_timesteps - self.all_timesteps_extra = self.flow_system.time_series_collection.all_timesteps_extra + self.all_timesteps = self.flow_system.time_series_collection._full_timesteps + self.all_timesteps_extra = self.flow_system.time_series_collection._full_timesteps_extra self.segment_names = [ f'Segment_{i + 1}' for i in range(math.ceil(len(self.all_timesteps) / self.timesteps_per_segment)) diff --git a/flixopt/components.py b/flixopt/components.py index d5d1df12d..2a69c6165 100644 --- a/flixopt/components.py +++ b/flixopt/components.py @@ -194,12 +194,12 @@ def transform_data(self, flow_system: 'FlowSystem') -> None: self.relative_minimum_charge_state = flow_system.create_time_series( f'{self.label_full}|relative_minimum_charge_state', self.relative_minimum_charge_state, - needs_extra_timestep=True, + has_extra_timestep=True, ) self.relative_maximum_charge_state = flow_system.create_time_series( f'{self.label_full}|relative_maximum_charge_state', 
self.relative_maximum_charge_state, - needs_extra_timestep=True, + has_extra_timestep=True, ) self.eta_charge = flow_system.create_time_series(f'{self.label_full}|eta_charge', self.eta_charge) self.eta_discharge = flow_system.create_time_series(f'{self.label_full}|eta_discharge', self.eta_discharge) @@ -342,7 +342,7 @@ def __init__(self, model: SystemModel, element: Transmission): def do_modeling(self): """Initiates all FlowModels""" # Force On Variable if absolute losses are present - if (self.element.absolute_losses is not None) and np.any(self.element.absolute_losses.active_data != 0): + if (self.element.absolute_losses is not None) and np.any(self.element.absolute_losses.selected_data != 0): for flow in self.element.inputs + self.element.outputs: if flow.on_off_parameters is None: flow.on_off_parameters = OnOffParameters() @@ -379,14 +379,14 @@ def create_transmission_equation(self, name: str, in_flow: Flow, out_flow: Flow) # eq: out(t) + on(t)*loss_abs(t) = in(t)*(1 - loss_rel(t)) con_transmission = self.add( self._model.add_constraints( - out_flow.model.flow_rate == -in_flow.model.flow_rate * (self.element.relative_losses.active_data - 1), + out_flow.model.flow_rate == -in_flow.model.flow_rate * (self.element.relative_losses.selected_data - 1), name=f'{self.label_full}|{name}', ), name, ) if self.element.absolute_losses is not None: - con_transmission.lhs += in_flow.model.on_off.on * self.element.absolute_losses.active_data + con_transmission.lhs += in_flow.model.on_off.on * self.element.absolute_losses.selected_data return con_transmission @@ -413,8 +413,8 @@ def do_modeling(self): self.add( self._model.add_constraints( - sum([flow.model.flow_rate * conv_factors[flow.label].active_data for flow in used_inputs]) - == sum([flow.model.flow_rate * conv_factors[flow.label].active_data for flow in used_outputs]), + sum([flow.model.flow_rate * conv_factors[flow.label].selected_data for flow in used_inputs]) + == sum([flow.model.flow_rate * conv_factors[flow.label].selected_data for flow in used_outputs]), name=f'{self.label_full}|conversion_{i}', ) ) @@ -474,12 +474,12 @@ def do_modeling(self): ) charge_state = self.charge_state - rel_loss = self.element.relative_loss_per_hour.active_data + rel_loss = self.element.relative_loss_per_hour.selected_data hours_per_step = self._model.hours_per_step charge_rate = self.element.charging.model.flow_rate discharge_rate = self.element.discharging.model.flow_rate - eff_charge = self.element.eta_charge.active_data - eff_discharge = self.element.eta_discharge.active_data + eff_charge = self.element.eta_charge.selected_data + eff_discharge = self.element.eta_discharge.selected_data self.add( self._model.add_constraints( @@ -565,8 +565,8 @@ def absolute_charge_state_bounds(self) -> Tuple[NumericData, NumericData]: @property def relative_charge_state_bounds(self) -> Tuple[NumericData, NumericData]: return ( - self.element.relative_minimum_charge_state.active_data, - self.element.relative_maximum_charge_state.active_data, + self.element.relative_minimum_charge_state.selected_data, + self.element.relative_maximum_charge_state.selected_data, ) diff --git a/flixopt/core.py b/flixopt/core.py index 379828554..d2a8edd59 100644 --- a/flixopt/core.py +++ b/flixopt/core.py @@ -7,6 +7,7 @@ import json import logging import pathlib +import textwrap from collections import Counter from typing import Any, Dict, Iterator, List, Literal, Optional, Tuple, Union @@ -40,56 +41,563 @@ class ConversionError(Exception): class DataConverter: """ - Converts various data types 
into xarray.DataArray with a timesteps index. - - Supports: scalars, arrays, Series, DataFrames, and DataArrays. + Converts various data types into xarray.DataArray with timesteps and optional scenarios dimensions. + + Supports: + - Scalar values (broadcast to all timesteps/scenarios) + - 1D arrays (mapped to timesteps, broadcast to scenarios if provided) + - 2D arrays (mapped to scenarios × timesteps if dimensions match) + - Series with time index (broadcast to scenarios if provided) + - DataFrames with time index and a single column (broadcast to scenarios if provided) + - Series/DataFrames with MultiIndex (scenario, time) + - Existing DataArrays """ + #TODO: Allow DataFrame with scenarios as columns + @staticmethod - def as_dataarray(data: NumericData, timesteps: pd.DatetimeIndex) -> xr.DataArray: - """Convert data to xarray.DataArray with specified timesteps index.""" - if not isinstance(timesteps, pd.DatetimeIndex) or len(timesteps) == 0: - raise ValueError(f'Timesteps must be a non-empty DatetimeIndex, got {type(timesteps).__name__}') - if not timesteps.name == 'time': - raise ConversionError(f'DatetimeIndex is not named correctly. Must be named "time", got {timesteps.name=}') + def as_dataarray( + data: NumericData, timesteps: pd.DatetimeIndex, scenarios: Optional[pd.Index] = None + ) -> xr.DataArray: + """ + Convert data to xarray.DataArray with specified timesteps and optional scenarios dimensions. + + Args: + data: The data to convert (scalar, array, Series, DataFrame, or DataArray) + timesteps: DatetimeIndex representing the time dimension (must be named 'time') + scenarios: Optional Index representing scenarios (must be named 'scenario') + + Returns: + DataArray with the converted data + + Raises: + ValueError: If timesteps or scenarios are invalid + ConversionError: If the data cannot be converted to the expected dimensions + """ + # Validate inputs + DataConverter._validate_timesteps(timesteps) + if scenarios is not None: + DataConverter._validate_scenarios(scenarios) - coords = [timesteps] - dims = ['time'] - expected_shape = (len(timesteps),) + # Determine dimensions and coordinates + coords, dims, expected_shape = DataConverter._get_dimensions(timesteps, scenarios) try: + # Convert different data types using specialized methods if isinstance(data, (int, float, np.integer, np.floating)): - return xr.DataArray(data, coords=coords, dims=dims) + return DataConverter._convert_scalar(data, coords, dims) + elif isinstance(data, pd.DataFrame): - if not data.index.equals(timesteps): - raise ConversionError("DataFrame index doesn't match timesteps index") - if not len(data.columns) == 1: - raise ConversionError('DataFrame must have exactly one column') - return xr.DataArray(data.values.flatten(), coords=coords, dims=dims) + return DataConverter._convert_dataframe(data, timesteps, scenarios, coords, dims) + elif isinstance(data, pd.Series): - if not data.index.equals(timesteps): - raise ConversionError("Series index doesn't match timesteps index") - return xr.DataArray(data.values, coords=coords, dims=dims) + return DataConverter._convert_series(data, timesteps, scenarios, coords, dims) + elif isinstance(data, np.ndarray): - if data.ndim != 1: - raise ConversionError(f'Array must be 1-dimensional, got {data.ndim}') - elif data.shape[0] != expected_shape[0]: - raise ConversionError(f"Array shape {data.shape} doesn't match expected {expected_shape}") - return xr.DataArray(data, coords=coords, dims=dims) + return DataConverter._convert_ndarray(data, timesteps, scenarios, coords, dims, 
expected_shape) + elif isinstance(data, xr.DataArray): - if data.dims != tuple(dims): - raise ConversionError(f"DataArray dimensions {data.dims} don't match expected {dims}") - if data.sizes[dims[0]] != len(coords[0]): - raise ConversionError( - f"DataArray length {data.sizes[dims[0]]} doesn't match expected {len(coords[0])}" - ) - return data.copy(deep=True) + return DataConverter._convert_dataarray(data, timesteps, scenarios, coords, dims) + else: raise ConversionError(f'Unsupported type: {type(data).__name__}') + except Exception as e: if isinstance(e, ConversionError): raise - raise ConversionError(f'Converting data {type(data)} to xarray.Dataset raised an error: {str(e)}') from e + raise ConversionError(f'Converting {type(data)} to DataArray raised an error: {str(e)}') from e + + @staticmethod + def _validate_timesteps(timesteps: pd.DatetimeIndex) -> None: + """ + Validate that timesteps is a properly named non-empty DatetimeIndex. + + Args: + timesteps: The DatetimeIndex to validate + + Raises: + ValueError: If timesteps is not a non-empty DatetimeIndex + ConversionError: If timesteps is not named 'time' + """ + if not isinstance(timesteps, pd.DatetimeIndex) or len(timesteps) == 0: + raise ValueError(f'Timesteps must be a non-empty DatetimeIndex, got {type(timesteps).__name__}') + if timesteps.name != 'time': + raise ConversionError(f'DatetimeIndex must be named "time", got {timesteps.name=}') + + @staticmethod + def _validate_scenarios(scenarios: pd.Index) -> None: + """ + Validate that scenarios is a properly named non-empty Index. + + Args: + scenarios: The Index to validate + + Raises: + ValueError: If scenarios is not a non-empty Index + ConversionError: If scenarios is not named 'scenario' + """ + if not isinstance(scenarios, pd.Index) or len(scenarios) == 0: + raise ValueError(f'Scenarios must be a non-empty Index, got {type(scenarios).__name__}') + if scenarios.name != 'scenario': + raise ConversionError(f'Scenarios Index must be named "scenario", got {scenarios.name=}') + + @staticmethod + def _get_dimensions( + timesteps: pd.DatetimeIndex, scenarios: Optional[pd.Index] = None + ) -> Tuple[Dict[str, pd.Index], Tuple[str, ...], Tuple[int, ...]]: + """ + Create the coordinates, dimensions, and expected shape for the output DataArray. + + Args: + timesteps: The time index + scenarios: Optional scenario index + + Returns: + Tuple containing: + - Dict mapping dimension names to coordinate indexes + - Tuple of dimension names + - Tuple of expected shape + """ + if scenarios is not None: + coords = {'scenario': scenarios, 'time': timesteps} + dims = ('scenario', 'time') + expected_shape = (len(scenarios), len(timesteps)) + else: + coords = {'time': timesteps} + dims = ('time',) + expected_shape = (len(timesteps),) + + return coords, dims, expected_shape + + @staticmethod + def _convert_scalar( + data: Union[int, float, np.integer, np.floating], coords: Dict[str, pd.Index], dims: Tuple[str, ...] + ) -> xr.DataArray: + """ + Convert a scalar value to a DataArray. + + Args: + data: The scalar value to convert + coords: Dictionary mapping dimension names to coordinate indexes + dims: Tuple of dimension names + + Returns: + DataArray with the scalar value broadcast to all coordinates + """ + return xr.DataArray(data, coords=coords, dims=dims) + + @staticmethod + def _convert_dataframe( + df: pd.DataFrame, + timesteps: pd.DatetimeIndex, + scenarios: Optional[pd.Index], + coords: Dict[str, pd.Index], + dims: Tuple[str, ...], + ) -> xr.DataArray: + """ + Convert a DataFrame to a DataArray. 
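A minimal usage sketch of the new scenario-aware conversion dispatched above. This is illustrative only: it assumes `DataConverter` is importable from `flixopt.core` as in this diff, and that the index names `time`/`scenario` are set as the validators require.

```python
import numpy as np
import pandas as pd

from flixopt.core import DataConverter  # module path as in this diff

timesteps = pd.date_range('2024-01-01', periods=4, freq='h', name='time')
scenarios = pd.Index(['low', 'high'], name='scenario')

# Scalar: broadcast over all (scenario, time) coordinates.
da = DataConverter.as_dataarray(42.0, timesteps, scenarios)
assert da.dims == ('scenario', 'time') and da.shape == (2, 4)

# 1D array of length len(timesteps): tiled across scenarios.
da = DataConverter.as_dataarray(np.arange(4.0), timesteps, scenarios)
assert bool((da.sel(scenario='low') == da.sel(scenario='high')).all())

# 2D array: must already be shaped (len(scenarios), len(timesteps)).
da = DataConverter.as_dataarray(np.ones((2, 4)), timesteps, scenarios)
assert da.shape == (2, 4)
```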
+ + Args: + df: The DataFrame to convert + timesteps: The time index + scenarios: Optional scenario index + coords: Dictionary mapping dimension names to coordinate indexes + dims: Tuple of dimension names + + Returns: + DataArray created from the DataFrame + + Raises: + ConversionError: If the DataFrame cannot be converted to the expected dimensions + """ + # Case 1: DataFrame with MultiIndex (scenario, time) + if ( + isinstance(df.index, pd.MultiIndex) + and len(df.index.names) == 2 + and 'scenario' in df.index.names + and 'time' in df.index.names + and scenarios is not None + ): + return DataConverter._convert_multi_index_dataframe(df, timesteps, scenarios, coords, dims) + + # Case 2: Standard DataFrame with time index + elif not isinstance(df.index, pd.MultiIndex): + return DataConverter._convert_standard_dataframe(df, timesteps, scenarios, coords, dims) + + else: + raise ConversionError(f'Unsupported DataFrame index structure: {df}') + + @staticmethod + def _convert_multi_index_dataframe( + df: pd.DataFrame, + timesteps: pd.DatetimeIndex, + scenarios: pd.Index, + coords: Dict[str, pd.Index], + dims: Tuple[str, ...], + ) -> xr.DataArray: + """ + Convert a DataFrame with MultiIndex (scenario, time) to a DataArray. + + Args: + df: The DataFrame with MultiIndex to convert + timesteps: The time index + scenarios: The scenario index + coords: Dictionary mapping dimension names to coordinate indexes + dims: Tuple of dimension names + + Returns: + DataArray created from the MultiIndex DataFrame + + Raises: + ConversionError: If the DataFrame's index doesn't match expected or has multiple columns + """ + # Validate that the index contains the expected values + if not set(df.index.get_level_values('time')).issubset(set(timesteps)): + raise ConversionError("DataFrame time index doesn't match or isn't a subset of timesteps") + if not set(df.index.get_level_values('scenario')).issubset(set(scenarios)): + raise ConversionError("DataFrame scenario index doesn't match or isn't a subset of scenarios") + + # Ensure single column + if len(df.columns) != 1: + raise ConversionError('DataFrame must have exactly one column') + + # Reindex to ensure complete coverage and correct order + multi_idx = pd.MultiIndex.from_product([scenarios, timesteps], names=['scenario', 'time']) + reindexed = df.reindex(multi_idx).iloc[:, 0] + + # Reshape to 2D array + reshaped = reindexed.values.reshape(len(scenarios), len(timesteps)) + return xr.DataArray(reshaped, coords=coords, dims=dims) + + @staticmethod + def _convert_standard_dataframe( + df: pd.DataFrame, + timesteps: pd.DatetimeIndex, + scenarios: Optional[pd.Index], + coords: Dict[str, pd.Index], + dims: Tuple[str, ...], + ) -> xr.DataArray: + """ + Convert a standard DataFrame with time index to a DataArray. 
+ + Args: + df: The DataFrame to convert + timesteps: The time index + scenarios: Optional scenario index + coords: Dictionary mapping dimension names to coordinate indexes + dims: Tuple of dimension names + + Returns: + DataArray created from the DataFrame + + Raises: + ConversionError: If the DataFrame's index doesn't match timesteps or has multiple columns + """ + if not df.index.equals(timesteps): + raise ConversionError("DataFrame index doesn't match timesteps index") + if len(df.columns) != 1: + raise ConversionError('DataFrame must have exactly one column') + + # Get values + values = df.values.flatten() + + if scenarios is not None: + # Broadcast to scenarios dimension + values = np.tile(values, (len(scenarios), 1)) + + return xr.DataArray(values, coords=coords, dims=dims) + + @staticmethod + def _convert_series( + series: pd.Series, + timesteps: pd.DatetimeIndex, + scenarios: Optional[pd.Index], + coords: Dict[str, pd.Index], + dims: Tuple[str, ...], + ) -> xr.DataArray: + """ + Convert a Series to a DataArray. + + Args: + series: The Series to convert + timesteps: The time index + scenarios: Optional scenario index + coords: Dictionary mapping dimension names to coordinate indexes + dims: Tuple of dimension names + + Returns: + DataArray created from the Series + + Raises: + ConversionError: If the Series cannot be converted to the expected dimensions + """ + # Case 1: Series with MultiIndex (scenario, time) + if ( + isinstance(series.index, pd.MultiIndex) + and len(series.index.names) == 2 + and 'scenario' in series.index.names + and 'time' in series.index.names + and scenarios is not None + ): + return DataConverter._convert_multi_index_series(series, timesteps, scenarios, coords, dims) + + # Case 2: Standard Series with time index + elif not isinstance(series.index, pd.MultiIndex): + return DataConverter._convert_standard_series(series, timesteps, scenarios, coords, dims) + + else: + raise ConversionError('Unsupported Series index structure') + + @staticmethod + def _convert_multi_index_series( + series: pd.Series, + timesteps: pd.DatetimeIndex, + scenarios: pd.Index, + coords: Dict[str, pd.Index], + dims: Tuple[str, ...], + ) -> xr.DataArray: + """ + Convert a Series with MultiIndex (scenario, time) to a DataArray. 
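Continuing the sketch above, the MultiIndex path can be exercised like this: the `(scenario, time)` product order drives the `reindex` and the 2D reshape.

```python
# A Series indexed by (scenario, time) is reindexed to the full product
# and reshaped to the (scenario, time) layout.
idx = pd.MultiIndex.from_product([scenarios, timesteps], names=['scenario', 'time'])
series = pd.Series(np.arange(8.0), index=idx)

da = DataConverter.as_dataarray(series, timesteps, scenarios)
assert da.shape == (2, 4)
# Row order follows the scenarios index: 'low' holds 0..3, 'high' holds 4..7.
assert float(da.sel(scenario='high', time=timesteps[0])) == 4.0
```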
+ + Args: + series: The Series with MultiIndex to convert + timesteps: The time index + scenarios: The scenario index + coords: Dictionary mapping dimension names to coordinate indexes + dims: Tuple of dimension names + + Returns: + DataArray created from the MultiIndex Series + + Raises: + ConversionError: If the Series' index doesn't match expected + """ + # Validate that the index contains the expected values + if not set(series.index.get_level_values('time')).issubset(set(timesteps)): + raise ConversionError("Series time index doesn't match or isn't a subset of timesteps") + if not set(series.index.get_level_values('scenario')).issubset(set(scenarios)): + raise ConversionError("Series scenario index doesn't match or isn't a subset of scenarios") + + # Reindex to ensure complete coverage and correct order + multi_idx = pd.MultiIndex.from_product([scenarios, timesteps], names=['scenario', 'time']) + reindexed = series.reindex(multi_idx) + + # Reshape to 2D array + reshaped = reindexed.values.reshape(len(scenarios), len(timesteps)) + return xr.DataArray(reshaped, coords=coords, dims=dims) + + @staticmethod + def _convert_standard_series( + series: pd.Series, + timesteps: pd.DatetimeIndex, + scenarios: Optional[pd.Index], + coords: Dict[str, pd.Index], + dims: Tuple[str, ...], + ) -> xr.DataArray: + """ + Convert a standard Series with time index to a DataArray. + + Args: + series: The Series to convert + timesteps: The time index + scenarios: Optional scenario index + coords: Dictionary mapping dimension names to coordinate indexes + dims: Tuple of dimension names + + Returns: + DataArray created from the Series + + Raises: + ConversionError: If the Series' index doesn't match timesteps + """ + if not series.index.equals(timesteps): + raise ConversionError("Series index doesn't match timesteps index") + + # Get values + values = series.values + + if scenarios is not None: + # Broadcast to scenarios dimension + values = np.tile(values, (len(scenarios), 1)) + + return xr.DataArray(values, coords=coords, dims=dims) + + @staticmethod + def _convert_ndarray( + arr: np.ndarray, + timesteps: pd.DatetimeIndex, + scenarios: Optional[pd.Index], + coords: Dict[str, pd.Index], + dims: Tuple[str, ...], + expected_shape: Tuple[int, ...], + ) -> xr.DataArray: + """ + Convert a numpy array to a DataArray. + + Args: + arr: The numpy array to convert + timesteps: The time index + scenarios: Optional scenario index + coords: Dictionary mapping dimension names to coordinate indexes + dims: Tuple of dimension names + expected_shape: Expected shape of the resulting array + + Returns: + DataArray created from the numpy array + + Raises: + ConversionError: If the array cannot be converted to the expected dimensions + """ + # Case 1: With scenarios - array can be 1D or 2D + if scenarios is not None: + return DataConverter._convert_ndarray_with_scenarios( + arr, timesteps, scenarios, coords, dims, expected_shape + ) + + # Case 2: Without scenarios - array must be 1D + else: + return DataConverter._convert_ndarray_without_scenarios(arr, timesteps, coords, dims) + + @staticmethod + def _convert_ndarray_with_scenarios( + arr: np.ndarray, + timesteps: pd.DatetimeIndex, + scenarios: pd.Index, + coords: Dict[str, pd.Index], + dims: Tuple[str, ...], + expected_shape: Tuple[int, ...], + ) -> xr.DataArray: + """ + Convert a numpy array to a DataArray with scenarios dimension. 
+ + Args: + arr: The numpy array to convert + timesteps: The time index + scenarios: The scenario index + coords: Dictionary mapping dimension names to coordinate indexes + dims: Tuple of dimension names + expected_shape: Expected shape (scenarios, timesteps) + + Returns: + DataArray created from the numpy array + + Raises: + ConversionError: If the array dimensions don't match expected + """ + if arr.ndim == 1: + # 1D array should match timesteps and be broadcast to scenarios + if arr.shape[0] != len(timesteps): + raise ConversionError(f"1D array length {arr.shape[0]} doesn't match timesteps length {len(timesteps)}") + # Broadcast to scenarios + values = np.tile(arr, (len(scenarios), 1)) + return xr.DataArray(values, coords=coords, dims=dims) + + elif arr.ndim == 2: + # 2D array should match (scenarios, timesteps) + if arr.shape != expected_shape: + raise ConversionError(f"2D array shape {arr.shape} doesn't match expected shape {expected_shape}") + return xr.DataArray(arr, coords=coords, dims=dims) + + else: + raise ConversionError(f'Array must be 1D or 2D, got {arr.ndim}D') + + @staticmethod + def _convert_ndarray_without_scenarios( + arr: np.ndarray, timesteps: pd.DatetimeIndex, coords: Dict[str, pd.Index], dims: Tuple[str, ...] + ) -> xr.DataArray: + """ + Convert a numpy array to a DataArray without scenarios dimension. + + Args: + arr: The numpy array to convert + timesteps: The time index + coords: Dictionary mapping dimension names to coordinate indexes + dims: Tuple of dimension names + + Returns: + DataArray created from the numpy array + + Raises: + ConversionError: If the array isn't 1D or doesn't match timesteps length + """ + if arr.ndim != 1: + raise ConversionError(f'Without scenarios, array must be 1D, got {arr.ndim}D') + if arr.shape[0] != len(timesteps): + raise ConversionError(f"Array shape {arr.shape} doesn't match expected length {len(timesteps)}") + return xr.DataArray(arr, coords=coords, dims=dims) + + @staticmethod + def _convert_dataarray( + da: xr.DataArray, + timesteps: pd.DatetimeIndex, + scenarios: Optional[pd.Index], + coords: Dict[str, pd.Index], + dims: Tuple[str, ...], + ) -> xr.DataArray: + """ + Convert an existing DataArray to a new DataArray with the desired dimensions. 
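The DataArray path likewise broadcasts a time-only input, continuing the same sketch (this exercises the `_broadcast_time_only_dataarray` helper referenced above).

```python
import xarray as xr

# A DataArray with only a 'time' dimension is tiled to ('scenario', 'time').
da_time = xr.DataArray(np.arange(4.0), coords={'time': timesteps}, dims=('time',))
da = DataConverter.as_dataarray(da_time, timesteps, scenarios)
assert da.dims == ('scenario', 'time') and da.shape == (2, 4)
```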
+ + Args: + da: The DataArray to convert + timesteps: The time index + scenarios: Optional scenario index + coords: Dictionary mapping dimension names to coordinate indexes + dims: Tuple of dimension names + + Returns: + New DataArray with the specified coordinates and dimensions + + Raises: + ConversionError: If the DataArray dimensions don't match expected + """ + # Case 1: DataArray with only time dimension when scenarios are provided + if scenarios is not None and set(da.dims) == {'time'}: + return DataConverter._broadcast_time_only_dataarray(da, timesteps, scenarios, coords, dims) + + # Case 2: DataArray dimensions should match expected + elif set(da.dims) != set(dims): + raise ConversionError(f"DataArray dimensions {da.dims} don't match expected {dims}") + + # Validate dimensions sizes + for dim in dims: + if not np.array_equal(da.coords[dim].values, coords[dim].values): + raise ConversionError(f"DataArray dimension '{dim}' doesn't match expected {coords[dim]}") + + # Create a new DataArray with our coordinates to ensure consistency + result = xr.DataArray(da.values.copy(), coords=coords, dims=dims) + return result + + @staticmethod + def _broadcast_time_only_dataarray( + da: xr.DataArray, + timesteps: pd.DatetimeIndex, + scenarios: pd.Index, + coords: Dict[str, pd.Index], + dims: Tuple[str, ...], + ) -> xr.DataArray: + """ + Broadcast a time-only DataArray to include the scenarios dimension. + + Args: + da: The DataArray with only time dimension + timesteps: The time index + scenarios: The scenario index + coords: Dictionary mapping dimension names to coordinate indexes + dims: Tuple of dimension names + + Returns: + DataArray with the data broadcast to include scenarios dimension + + Raises: + ConversionError: If the DataArray time coordinates aren't compatible with timesteps + """ + # Ensure the time dimension is compatible + if not np.array_equal(da.coords['time'].values, timesteps.values): + raise ConversionError("DataArray time coordinates aren't compatible with timesteps") + + # Broadcast to scenarios + values = np.tile(da.values.copy(), (len(scenarios), 1)) + return xr.DataArray(values, coords=coords, dims=dims) class TimeSeriesData: @@ -146,18 +654,19 @@ class TimeSeries: name (str): The name of the time series aggregation_weight (Optional[float]): Weight used for aggregation aggregation_group (Optional[str]): Group name for shared aggregation weighting - needs_extra_timestep (bool): Whether this series needs an extra timestep + has_extra_timestep (bool): Whether this series needs an extra timestep """ @classmethod def from_datasource( cls, - data: NumericData, + data: NumericDataTS, name: str, timesteps: pd.DatetimeIndex, + scenarios: Optional[pd.Index] = None, aggregation_weight: Optional[float] = None, aggregation_group: Optional[str] = None, - needs_extra_timestep: bool = False, + has_extra_timestep: bool = False, ) -> 'TimeSeries': """ Initialize the TimeSeries from multiple data sources. 
@@ -166,19 +675,20 @@ def from_datasource( data: The time series data name: The name of the TimeSeries timesteps: The timesteps of the TimeSeries + scenarios: The scenarios of the TimeSeries aggregation_weight: The weight in aggregation calculations aggregation_group: Group this TimeSeries belongs to for aggregation weight sharing - needs_extra_timestep: Whether this series requires an extra timestep + has_extra_timestep: Whether this series requires an extra timestep Returns: A new TimeSeries instance """ return cls( - DataConverter.as_dataarray(data, timesteps), + DataConverter.as_dataarray(data, timesteps, scenarios), name, aggregation_weight, aggregation_group, - needs_extra_timestep, + has_extra_timestep, ) @classmethod @@ -212,7 +722,7 @@ def from_json(cls, data: Optional[Dict[str, Any]] = None, path: Optional[str] = name=data['name'], aggregation_weight=data['aggregation_weight'], aggregation_group=data['aggregation_group'], - needs_extra_timestep=data['needs_extra_timestep'], + has_extra_timestep=data['has_extra_timestep'], ) def __init__( @@ -221,7 +731,7 @@ def __init__( name: str, aggregation_weight: Optional[float] = None, aggregation_group: Optional[str] = None, - needs_extra_timestep: bool = False, + has_extra_timestep: bool = False, ): """ Initialize a TimeSeries with a DataArray. @@ -231,35 +741,42 @@ def __init__( name: The name of the TimeSeries aggregation_weight: The weight in aggregation calculations aggregation_group: Group this TimeSeries belongs to for weight sharing - needs_extra_timestep: Whether this series requires an extra timestep + has_extra_timestep: Whether this series requires an extra timestep Raises: - ValueError: If data doesn't have a 'time' index or has more than 1 dimension + ValueError: If data doesn't have a 'time' index or has unsupported dimensions """ if 'time' not in data.indexes: raise ValueError(f'DataArray must have a "time" index. Got {data.indexes}') - if data.ndim > 1: - raise ValueError(f'Number of dimensions of DataArray must be 1. Got {data.ndim}') + + allowed_dims = {'time', 'scenario'} + if not set(data.dims).issubset(allowed_dims): + raise ValueError(f'DataArray dimensions must be subset of {allowed_dims}. Got {data.dims}') self.name = name self.aggregation_weight = aggregation_weight self.aggregation_group = aggregation_group - self.needs_extra_timestep = needs_extra_timestep + self.has_extra_timestep = has_extra_timestep # Data management self._stored_data = data.copy(deep=True) self._backup = self._stored_data.copy(deep=True) - self._active_timesteps = self._stored_data.indexes['time'] - self._active_data = None - self._update_active_data() - def reset(self): + # Selection state + self._selected_timesteps: Optional[pd.DatetimeIndex] = None + self._selected_scenarios: Optional[pd.Index] = None + + # Flag for whether this series has scenarios + self._has_scenarios = 'scenario' in data.dims + + def reset(self) -> None: """ - Reset active timesteps to the full set of stored timesteps. + Reset selections to include all timesteps and scenarios. + This is equivalent to clearing all selections. """ - self.active_timesteps = None + self.clear_selection() - def restore_data(self): + def restore_data(self) -> None: """ Restore stored_data from the backup and reset active timesteps. 
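A sketch of creating a scenario-aware `TimeSeries` via the new `from_datasource` signature, reusing `timesteps`/`scenarios` from the sketch above; the name `'load'` is illustrative only.

```python
from flixopt.core import TimeSeries  # module path as in this diff

ts = TimeSeries.from_datasource(
    data=np.array([1.0, 2.0, 3.0, 4.0]),  # 1D input is tiled across scenarios
    name='load',
    timesteps=timesteps,
    scenarios=scenarios,
)
assert ts.selected_data.dims == ('scenario', 'time')
```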
""" @@ -280,8 +797,8 @@ def to_json(self, path: Optional[pathlib.Path] = None) -> Dict[str, Any]: 'name': self.name, 'aggregation_weight': self.aggregation_weight, 'aggregation_group': self.aggregation_group, - 'needs_extra_timestep': self.needs_extra_timestep, - 'data': self.active_data.to_dict(), + 'has_extra_timestep': self.has_extra_timestep, + 'data': self.selected_data.to_dict(), } # Convert datetime objects to ISO strings @@ -303,84 +820,100 @@ def stats(self) -> str: Returns: String representation of data statistics """ - return get_numeric_stats(self.active_data, padd=0) - - def _update_active_data(self): - """ - Update the active data based on active_timesteps. - """ - self._active_data = self._stored_data.sel(time=self.active_timesteps) + return get_numeric_stats(self.selected_data, padd=0, by_scenario=True) @property def all_equal(self) -> bool: """Check if all values in the series are equal.""" - return np.unique(self.active_data.values).size == 1 + return np.unique(self.selected_data.values).size == 1 @property - def active_timesteps(self) -> pd.DatetimeIndex: - """Get the current active timesteps.""" - return self._active_timesteps - - @active_timesteps.setter - def active_timesteps(self, timesteps: Optional[pd.DatetimeIndex]): + def selected_data(self) -> xr.DataArray: """ - Set active_timesteps and refresh active_data. - - Args: - timesteps: New timesteps to activate, or None to use all stored timesteps - - Raises: - TypeError: If timesteps is not a pandas DatetimeIndex or None + Get a view of stored_data based on current selections. + This computes the view dynamically based on the current selection state. """ - if timesteps is None: - self._active_timesteps = self.stored_data.indexes['time'] - elif isinstance(timesteps, pd.DatetimeIndex): - self._active_timesteps = timesteps - else: - raise TypeError('active_timesteps must be a pandas DatetimeIndex or None') + return self._stored_data.sel(**self._valid_selector) - self._update_active_data() + @property + def active_timesteps(self) -> pd.DatetimeIndex: + """Get the current active timesteps.""" + if self._selected_timesteps is None: + return self._stored_data.indexes['time'] + return self._selected_timesteps @property - def active_data(self) -> xr.DataArray: - """Get a view of stored_data based on active_timesteps.""" - return self._active_data + def active_scenarios(self) -> Optional[pd.Index]: + """Get the current active scenarios.""" + if not self._has_scenarios: + return None + if self._selected_scenarios is None: + return self._stored_data.indexes['scenario'] + return self._selected_scenarios @property def stored_data(self) -> xr.DataArray: """Get a copy of the full stored data.""" return self._stored_data.copy() - @stored_data.setter - def stored_data(self, value: NumericData): + def update_stored_data(self, value: xr.DataArray) -> None: """ - Update stored_data and refresh active_data. + Update stored_data and refresh selected_data. 
Args: value: New data to store """ - new_data = DataConverter.as_dataarray(value, timesteps=self.active_timesteps) + new_data = DataConverter.as_dataarray( + value, + timesteps=self.active_timesteps, + scenarios=self.active_scenarios if self._has_scenarios else None + ) # Skip if data is unchanged to avoid overwriting backup if new_data.equals(self._stored_data): return self._stored_data = new_data - self.active_timesteps = None # Reset to full timeline + self.clear_selection() # Reset selections to full dataset + + def clear_selection(self, timesteps: bool = True, scenarios: bool = True) -> None: + if timesteps: + self._selected_timesteps = None + if scenarios: + self._selected_scenarios = None + + def set_selection(self, timesteps: Optional[pd.DatetimeIndex] = None, scenarios: Optional[pd.Index] = None) -> None: + if timesteps is None: + self.clear_selection(timesteps=True, scenarios=False) + else: + self._selected_timesteps = timesteps + + if scenarios is None: + self.clear_selection(timesteps=False, scenarios=True) + else: + self._selected_scenarios = scenarios @property def sel(self): - return self.active_data.sel + """Direct access to the selected_data's sel method for convenience.""" + return self.selected_data.sel @property def isel(self): - return self.active_data.isel + """Direct access to the selected_data's isel method for convenience.""" + return self.selected_data.isel + + @property + def _valid_selector(self) -> Dict[str, pd.Index]: + """Get the current selection as a dictionary.""" + full_selection = {'time': self._selected_timesteps, 'scenario': self._selected_scenarios} + return {dim: sel for dim, sel in full_selection.items() if dim in self._stored_data.dims and sel is not None} def _apply_operation(self, other, op): """Apply an operation between this TimeSeries and another object.""" if isinstance(other, TimeSeries): - other = other.active_data - return op(self.active_data, other) + other = other.selected_data + return op(self.selected_data, other) def __add__(self, other): return self._apply_operation(other, lambda x, y: x + y) @@ -395,25 +928,25 @@ def __truediv__(self, other): return self._apply_operation(other, lambda x, y: x / y) def __radd__(self, other): - return other + self.active_data + return other + self.selected_data def __rsub__(self, other): - return other - self.active_data + return other - self.selected_data def __rmul__(self, other): - return other * self.active_data + return other * self.selected_data def __rtruediv__(self, other): - return other / self.active_data + return other / self.selected_data def __neg__(self) -> xr.DataArray: - return -self.active_data + return -self.selected_data def __pos__(self) -> xr.DataArray: - return +self.active_data + return +self.selected_data def __abs__(self) -> xr.DataArray: - return abs(self.active_data) + return abs(self.selected_data) def __gt__(self, other): """ @@ -426,7 +959,7 @@ def __gt__(self, other): True if all values in this TimeSeries are greater than other """ if isinstance(other, TimeSeries): - return (self.active_data > other.active_data).all().item() + return (self.selected_data > other.selected_data).all().item() return NotImplemented def __array_ufunc__(self, ufunc, method, *inputs, **kwargs): @@ -435,8 +968,8 @@ def __array_ufunc__(self, ufunc, method, *inputs, **kwargs): This allows NumPy functions to work with TimeSeries objects. 
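The new selection API on `TimeSeries` can be exercised as below, continuing the sketch: selections are views computed from `_valid_selector`, so the stored data is never modified, and `__array_ufunc__` routes NumPy calls to the selected view.

```python
ts.set_selection(
    timesteps=timesteps[:2],
    scenarios=pd.Index(['high'], name='scenario'),
)
assert ts.selected_data.shape == (1, 2)  # view only
assert ts.stored_data.shape == (2, 4)    # untouched

ts.clear_selection()  # back to the full dataset
assert ts.selected_data.shape == (2, 4)

# NumPy ufuncs dispatch through __array_ufunc__ on the selected view:
result = np.sqrt(ts)  # returns an xr.DataArray
```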
""" - # Convert any TimeSeries inputs to their active_data - inputs = [x.active_data if isinstance(x, TimeSeries) else x for x in inputs] + # Convert any TimeSeries inputs to their selected_data + inputs = [x.selected_data if isinstance(x, TimeSeries) else x for x in inputs] return getattr(ufunc, method)(*inputs, **kwargs) def __repr__(self): @@ -450,10 +983,10 @@ def __repr__(self): 'name': self.name, 'aggregation_weight': self.aggregation_weight, 'aggregation_group': self.aggregation_group, - 'needs_extra_timestep': self.needs_extra_timestep, - 'shape': self.active_data.shape, - 'time_range': f'{self.active_timesteps[0]} to {self.active_timesteps[-1]}', + 'has_extra_timestep': self.has_extra_timestep, + 'shape': self.selected_data.shape, } + attr_str = ', '.join(f'{k}={repr(v)}' for k, v in attrs.items()) return f'TimeSeries({attr_str})' @@ -464,281 +997,329 @@ def __str__(self): Returns: Descriptive string with statistics """ - return f"TimeSeries '{self.name}': {self.stats}" + return f'TimeSeries "{self.name}":\n{textwrap.indent(self.stats, " ")}' class TimeSeriesCollection: """ - Collection of TimeSeries objects with shared timestep management. + Simplified central manager for time series data with reference tracking. - TimeSeriesCollection handles multiple TimeSeries objects with synchronized - timesteps, provides operations on collections, and manages extra timesteps. + Provides a way to store time series data and work with subsets of dimensions + that automatically update all references when changed. """ - def __init__( self, timesteps: pd.DatetimeIndex, + scenarios: Optional[pd.Index] = None, hours_of_last_timestep: Optional[float] = None, hours_of_previous_timesteps: Optional[Union[float, np.ndarray]] = None, ): - """ - Args: - timesteps: The timesteps of the Collection. - hours_of_last_timestep: The duration of the last time step. Uses the last time interval if not specified - hours_of_previous_timesteps: The duration of previous timesteps. - If None, the first time increment of time_series is used. - This is needed to calculate previous durations (for example consecutive_on_hours). - If you use an array, take care that its long enough to cover all previous values! 
- """ - # Prepare and validate timesteps + """Initialize a TimeSeriesCollection.""" self._validate_timesteps(timesteps) self.hours_of_previous_timesteps = self._calculate_hours_of_previous_timesteps( timesteps, hours_of_previous_timesteps - ) + ) #TODO: Make dynamic - # Set up timesteps and hours - self.all_timesteps = timesteps - self.all_timesteps_extra = self._create_timesteps_with_extra(timesteps, hours_of_last_timestep) - self.all_hours_per_timestep = self.calculate_hours_per_timestep(self.all_timesteps_extra) + self._full_timesteps = timesteps + self._full_timesteps_extra = self._create_timesteps_with_extra(timesteps, hours_of_last_timestep) + self._full_hours_per_timestep = self.calculate_hours_per_timestep(self._full_timesteps_extra) - # Active timestep tracking - self._active_timesteps = None - self._active_timesteps_extra = None - self._active_hours_per_timestep = None + self._full_scenarios = scenarios - # Dictionary of time series by name - self.time_series_data: Dict[str, TimeSeries] = {} + # Series that need extra timestep + self._has_extra_timestep: set = set() - # Aggregation - self.group_weights: Dict[str, float] = {} - self.weights: Dict[str, float] = {} + # Storage for TimeSeries objects + self._time_series: Dict[str, TimeSeries] = {} - @classmethod - def with_uniform_timesteps( - cls, start_time: pd.Timestamp, periods: int, freq: str, hours_per_step: Optional[float] = None - ) -> 'TimeSeriesCollection': - """Create a collection with uniform timesteps.""" - timesteps = pd.date_range(start_time, periods=periods, freq=freq, name='time') - return cls(timesteps, hours_of_previous_timesteps=hours_per_step) - - def create_time_series( - self, data: Union[NumericData, TimeSeriesData], name: str, needs_extra_timestep: bool = False + # Active subset selectors + self._selected_timesteps: Optional[pd.DatetimeIndex] = None + self._selected_scenarios: Optional[pd.Index] = None + self._selected_timesteps_extra: Optional[pd.DatetimeIndex] = None + self._selected_hours_per_timestep: Optional[xr.DataArray] = None + + def add_time_series( + self, + name: str, + data: Union[NumericDataTS, TimeSeries], + aggregation_weight: Optional[float] = None, + aggregation_group: Optional[str] = None, + has_extra_timestep: bool = False, ) -> TimeSeries: """ - Creates a TimeSeries from the given data and adds it to the collection. + Add a new TimeSeries to the allocator. Args: - data: The data to create the TimeSeries from. - name: The name of the TimeSeries. - needs_extra_timestep: Whether to create an additional timestep at the end of the timesteps. - The data to create the TimeSeries from. + name: Name of the time series + data: Data for the time series (can be raw data or an existing TimeSeries) + aggregation_weight: Weight used for aggregation + aggregation_group: Group name for shared aggregation weighting + has_extra_timestep: Whether this series needs an extra timestep Returns: - The created TimeSeries. 
- + The created TimeSeries object """ - # Check for duplicate name - if name in self.time_series_data: - raise ValueError(f"TimeSeries '{name}' already exists in this collection") - - # Determine which timesteps to use - timesteps_to_use = self.timesteps_extra if needs_extra_timestep else self.timesteps - - # Create the time series - if isinstance(data, TimeSeriesData): - time_series = TimeSeries.from_datasource( + if name in self._time_series: + raise KeyError(f"TimeSeries '{name}' already exists in collection") + + # Choose which timesteps to use + target_timesteps = self.timesteps_extra if has_extra_timestep else self.timesteps + + # Create or adapt the TimeSeries object + if isinstance(data, TimeSeries): + # Use the existing TimeSeries but update its parameters + time_series = data + # Update the stored data to use our timesteps and scenarios + data_array = DataConverter.as_dataarray( + time_series.stored_data, timesteps=target_timesteps, scenarios=self.scenarios + ) + time_series = TimeSeries( + data=data_array, name=name, - data=data.data, - timesteps=timesteps_to_use, - aggregation_weight=data.agg_weight, - aggregation_group=data.agg_group, - needs_extra_timestep=needs_extra_timestep, + aggregation_weight=aggregation_weight or time_series.aggregation_weight, + aggregation_group=aggregation_group or time_series.aggregation_group, + has_extra_timestep=has_extra_timestep or time_series.has_extra_timestep, ) - # Connect the user time series to the created TimeSeries - data.label = name else: + # Create a new TimeSeries from raw data time_series = TimeSeries.from_datasource( - name=name, data=data, timesteps=timesteps_to_use, needs_extra_timestep=needs_extra_timestep + data=data, + name=name, + timesteps=target_timesteps, + scenarios=self.scenarios, + aggregation_weight=aggregation_weight, + aggregation_group=aggregation_group, + has_extra_timestep=has_extra_timestep, ) - # Add to the collection - self.add_time_series(time_series) + # Add to storage + self._time_series[name] = time_series - return time_series + # Track if it needs the extra timestep (use the merged flag, which also covers adapted TimeSeries) + if time_series.has_extra_timestep: + self._has_extra_timestep.add(name) - def calculate_aggregation_weights(self) -> Dict[str, float]: - """Calculate and return aggregation weights for all time series.""" - self.group_weights = self._calculate_group_weights() - self.weights = self._calculate_weights() - - if np.all(np.isclose(list(self.weights.values()), 1, atol=1e-6)): - logger.info('All Aggregation weights were set to 1') - - return self.weights + # Return the TimeSeries object + return time_series - def activate_timesteps(self, active_timesteps: Optional[pd.DatetimeIndex] = None): + def clear_selection(self, timesteps: bool = True, scenarios: bool = True) -> None: """ - Update active timesteps for the collection and all time series. - If no arguments are provided, the active timesteps are reset. + Clear selection for timesteps and/or scenarios. Args: - active_timesteps: The active timesteps of the model. - If None, the all timesteps of the TimeSeriesCollection are taken.
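A usage sketch for the collection API defined above, continuing the earlier setup (`timesteps`, `scenarios`); the series names are illustrative only.

```python
from flixopt.core import TimeSeriesCollection  # module path as in this diff

tsc = TimeSeriesCollection(timesteps, scenarios=scenarios)

price = tsc.add_time_series('price', 0.3)  # constant, broadcast to all coords
demand = tsc.add_time_series('demand', np.array([1.0, 2.0, 3.0, 4.0]))
charge_max = tsc.add_time_series('charge_max', 1.0, has_extra_timestep=True)

# Series flagged with has_extra_timestep live on timesteps_extra (one extra step).
assert charge_max.selected_data.sizes['time'] == len(timesteps) + 1
```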
+ timesteps: Whether to clear timesteps selection + scenarios: Whether to clear scenarios selection """ - if active_timesteps is None: - return self.reset() + if timesteps: + self._update_selected_timesteps(timesteps=None) + if scenarios: + self._selected_scenarios = None - if not np.all(np.isin(active_timesteps, self.all_timesteps)): - raise ValueError('active_timesteps must be a subset of the timesteps of the TimeSeriesCollection') + # Apply the selection to all TimeSeries objects + self._propagate_selection_to_time_series() - # Calculate derived timesteps - self._active_timesteps = active_timesteps - first_ts_index = np.where(self.all_timesteps == active_timesteps[0])[0][0] - last_ts_idx = np.where(self.all_timesteps == active_timesteps[-1])[0][0] - self._active_timesteps_extra = self.all_timesteps_extra[first_ts_index : last_ts_idx + 2] - self._active_hours_per_timestep = self.all_hours_per_timestep.isel(time=slice(first_ts_index, last_ts_idx + 1)) + def set_selection(self, timesteps: Optional[pd.DatetimeIndex] = None, scenarios: Optional[pd.Index] = None) -> None: + """ + Set active subset for timesteps and scenarios. - # Update all time series - self._update_time_series_timesteps() + Args: + timesteps: Timesteps to activate, or None to clear + scenarios: Scenarios to activate, or None to clear + """ + if timesteps is None: + self.clear_selection(timesteps=True, scenarios=False) + else: + self._update_selected_timesteps(timesteps) - def reset(self): - """Reset active timesteps to defaults for all time series.""" - self._active_timesteps = None - self._active_timesteps_extra = None - self._active_hours_per_timestep = None + if scenarios is None: + self.clear_selection(timesteps=False, scenarios=True) + else: + self._selected_scenarios = scenarios - for time_series in self.time_series_data.values(): - time_series.reset() + # Apply the selection to all TimeSeries objects + self._propagate_selection_to_time_series() - def restore_data(self): - """Restore original data for all time series.""" - for time_series in self.time_series_data.values(): - time_series.restore_data() + def _update_selected_timesteps(self, timesteps: Optional[pd.DatetimeIndex]) -> None: + """ + Updates the timestep and related metrics (timesteps_extra, hours_per_timestep) based on the current selection. + """ + if timesteps is None: + self._selected_timesteps = None + self._selected_timesteps_extra = None + self._selected_hours_per_timestep = None + return - def add_time_series(self, time_series: TimeSeries): - """Add an existing TimeSeries to the collection.""" - if time_series.name in self.time_series_data: - raise ValueError(f"TimeSeries '{time_series.name}' already exists in this collection") + self._validate_timesteps(timesteps, self._full_timesteps) - self.time_series_data[time_series.name] = time_series + self._selected_timesteps = timesteps + self._selected_hours_per_timestep = self._full_hours_per_timestep.sel(time=timesteps) + self._selected_timesteps_extra = self._create_timesteps_with_extra( + timesteps, self._selected_hours_per_timestep.isel(time=-1).max().item() + ) - def insert_new_data(self, data: pd.DataFrame, include_extra_timestep: bool = False): + def as_dataset(self, with_extra_timestep: bool = True, with_constants: bool = True) -> xr.Dataset: """ - Update time series with new data from a DataFrame. + Convert the TimeSeriesCollection to a xarray Dataset, containing the data of each TimeSeries. 
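Selection set on the collection propagates to every series, as sketched below; `as_dataset` (whose body follows just below) can then drop the extra step and the constant series on request.

```python
# A window selected on the collection propagates to every TimeSeries;
# series with an extra timestep receive one additional step.
tsc.set_selection(timesteps=timesteps[1:3])
assert len(tsc.timesteps) == 2 and len(tsc.timesteps_extra) == 3
assert demand.selected_data.sizes['time'] == 2
assert charge_max.selected_data.sizes['time'] == 3

tsc.clear_selection()

# Constant series ('price', 'charge_max') are skipped, the extra step is dropped:
ds = tsc.as_dataset(with_extra_timestep=False, with_constants=False)
assert list(ds.data_vars) == ['demand']
```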
Args: - data: DataFrame containing new data with timestamps as index - include_extra_timestep: Whether the provided data already includes the extra timestep, by default False + with_extra_timestep: Whether to exclude the extra timesteps. + Effectively, this removes the last timestep for certain TImeSeries, but mitigates the presence of NANs in others. + with_constants: Whether to exclude TimeSeries with a constant value from the dataset. """ - if not isinstance(data, pd.DataFrame): - raise TypeError(f'data must be a pandas DataFrame, got {type(data).__name__}') + if self.scenarios is None: + ds = xr.Dataset(coords={'time': self.timesteps_extra}) + else: + ds = xr.Dataset(coords={'scenario': self.scenarios, 'time': self.timesteps_extra}) + + for ts in self._time_series.values(): + if not with_constants and ts.all_equal: + continue + ds[ts.name] = ts.selected_data - # Check if the DataFrame index matches the expected timesteps - expected_timesteps = self.timesteps_extra if include_extra_timestep else self.timesteps - if not data.index.equals(expected_timesteps): - raise ValueError( - f'DataFrame index must match {"collection timesteps with extra timestep" if include_extra_timestep else "collection timesteps"}' + if not with_extra_timestep: + return ds.sel(time=self.timesteps) + + return ds + + @property + def timesteps(self) -> pd.DatetimeIndex: + """Get the current active timesteps.""" + if self._selected_timesteps is None: + return self._full_timesteps + return self._selected_timesteps + + @property + def timesteps_extra(self) -> pd.DatetimeIndex: + """Get the current active timesteps with extra timestep.""" + if self._selected_timesteps_extra is None: + return self._full_timesteps_extra + return self._selected_timesteps_extra + + @property + def hours_per_timestep(self) -> xr.DataArray: + """Get the current active hours per timestep.""" + if self._selected_hours_per_timestep is None: + return self._full_hours_per_timestep + return self._selected_hours_per_timestep + + @property + def scenarios(self) -> Optional[pd.Index]: + """Get the current active scenarios.""" + if self._selected_scenarios is None: + return self._full_scenarios + return self._selected_scenarios + + def _propagate_selection_to_time_series(self) -> None: + """Apply the current selection to all TimeSeries objects.""" + for ts_name, ts in self._time_series.items(): + timesteps = self._selected_timesteps_extra if ts_name in self._has_extra_timestep else self._selected_timesteps + ts.set_selection( + timesteps=timesteps, + scenarios=self._selected_scenarios ) - for name, ts in self.time_series_data.items(): - if name in data.columns: - if not ts.needs_extra_timestep: - # For time series without extra timestep - if include_extra_timestep: - # If data includes extra timestep but series doesn't need it, exclude the last point - ts.stored_data = data[name].iloc[:-1] - else: - # Use data as is - ts.stored_data = data[name] - else: - # For time series with extra timestep - if include_extra_timestep: - # Data already includes extra timestep - ts.stored_data = data[name] - else: - # Need to add extra timestep - extrapolate from the last value - extra_step_value = data[name].iloc[-1] - extra_step_index = pd.DatetimeIndex([self.timesteps_extra[-1]], name='time') - extra_step_series = pd.Series([extra_step_value], index=extra_step_index) - - # Combine the regular data with the extra timestep - ts.stored_data = pd.concat([data[name], extra_step_series]) - - logger.debug(f'Updated data for {name}') - - def to_dataframe( - self, filtered: 
Literal['all', 'constant', 'non_constant'] = 'non_constant', include_extra_timestep: bool = True - ) -> pd.DataFrame: - """ - Convert collection to DataFrame with optional filtering and timestep control. + def __getitem__(self, name: str) -> TimeSeries: + """ + Get a TimeSeries by name. Args: - filtered: Filter time series by variability, by default 'non_constant' - include_extra_timestep: Whether to include the extra timestep in the result, by default True + name: Name of the time series Returns: - DataFrame representation of the collection + The TimeSeries object (selection is handled internally) + + Raises: + ValueError: If no TimeSeries with the given name exists """ - include_constants = filtered != 'non_constant' - ds = self.to_dataset(include_constants=include_constants) - - if not include_extra_timestep: - ds = ds.isel(time=slice(None, -1)) + # First check if this is a TimeSeries + if name in self._time_series: + # Return the TimeSeries object (it will handle selection internally) + return self._time_series[name] + raise ValueError(f'No TimeSeries named "{name}" found') + + def __contains__(self, value) -> bool: + if isinstance(value, str): + return value in self._time_series + elif isinstance(value, TimeSeries): + return value.name in self._time_series + raise TypeError(f'Invalid type for __contains__ of {self.__class__.__name__}: {type(value)}') - df = ds.to_dataframe() - - # Apply filtering - if filtered == 'all': - return df - elif filtered == 'constant': - return df.loc[:, df.nunique() == 1] - elif filtered == 'non_constant': - return df.loc[:, df.nunique() > 1] - else: - raise ValueError("filtered must be one of: 'all', 'constant', 'non_constant'") + def __iter__(self) -> Iterator[TimeSeries]: + """Iterate over TimeSeries objects.""" + return iter(self._time_series.values()) - def to_dataset(self, include_constants: bool = True) -> xr.Dataset: + def update_time_series(self, name: str, data: NumericData) -> TimeSeries: """ - Combine all time series into a single Dataset with all timesteps. + Update an existing TimeSeries with new data. Args: - include_constants: Whether to include time series with constant values, by default True + name: Name of the TimeSeries to update + data: New data to assign Returns: - Dataset containing all selected time series with all timesteps - """ - # Determine which series to include - if include_constants: - series_to_include = self.time_series_data.values() - else: - series_to_include = self.non_constants + The updated TimeSeries + + Raises: + KeyError: If no TimeSeries with the given name exists + """ + if name not in self._time_series: + raise KeyError(f"No TimeSeries named '{name}' found") + + # Get the TimeSeries + ts = self._time_series[name] - # Create individual datasets and merge them - ds = xr.merge([ts.active_data.to_dataset(name=ts.name) for ts in series_to_include]) - - # Ensure the correct time coordinates - ds = ds.reindex(time=self.timesteps_extra) + # Convert data to proper format + data_array = DataConverter.as_dataarray( + data, + self.timesteps_extra if name in self._has_extra_timestep else self.timesteps, + self.scenarios ) - ds.attrs.update( - { - 'timesteps_extra': f'{self.timesteps_extra[0]} ... 
{self.timesteps_extra[-1]} | len={len(self.timesteps_extra)}', - 'hours_per_timestep': self._format_stats(self.hours_per_timestep), - } + # Convert data to proper format + data_array = DataConverter.as_dataarray( + data, + self.timesteps_extra if name in self._has_extra_timestep else self.timesteps, + self.scenarios ) - return ds + # Update the TimeSeries + ts.update_stored_data(data_array) + + return ts + + def calculate_aggregation_weights(self) -> Dict[str, float]: + """Calculate and return aggregation weights for all time series.""" + group_weights = self._calculate_group_weights() - def _update_time_series_timesteps(self): - """Update active timesteps for all time series.""" - for ts in self.time_series_data.values(): - if ts.needs_extra_timestep: - ts.active_timesteps = self.timesteps_extra + weights = {} + for name, ts in self._time_series.items(): + if ts.aggregation_group is not None: + # Use group weight + weights[name] = group_weights.get(ts.aggregation_group, 1) else: - ts.active_timesteps = self.timesteps + # Use individual weight or default to 1 + weights[name] = ts.aggregation_weight or 1 + + if np.all(np.isclose(list(weights.values()), 1, atol=1e-6)): + logger.info('All Aggregation weights were set to 1') + + return weights + + def _calculate_group_weights(self) -> Dict[str, float]: + """Calculate weights for aggregation groups.""" + # Count series in each group + groups = [ts.aggregation_group for ts in self._time_series.values() if ts.aggregation_group is not None] + group_counts = Counter(groups) + + # Calculate weight for each group (1/count) + return {group: 1 / count for group, count in group_counts.items()} @staticmethod - def _validate_timesteps(timesteps: pd.DatetimeIndex): - """Validate timesteps format and rename if needed.""" + def _validate_timesteps(timesteps: pd.DatetimeIndex, present_timesteps: Optional[pd.DatetimeIndex] = None): + """ + Validate timesteps format and rename if needed. 
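The weighting rule implemented above in a quick sketch, continuing the collection from earlier: members of an `aggregation_group` share weight `1/len(group)`, while ungrouped series fall back to their `aggregation_weight`, or 1.

```python
tsc.add_time_series('pv_a', np.ones(4), aggregation_group='pv')
tsc.add_time_series('pv_b', np.ones(4), aggregation_group='pv')

weights = tsc.calculate_aggregation_weights()
assert weights['pv_a'] == weights['pv_b'] == 0.5  # two members in group 'pv'
assert weights['price'] == 1                      # ungrouped, no explicit weight
```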
+        Args:
+            timesteps: The timesteps to validate
+            present_timesteps: The timesteps that are present in the dataset
+
+        Raises:
+            TypeError: If timesteps is not a pandas DatetimeIndex
+            ValueError: If timesteps contains fewer than 2 timestamps
+            ValueError: If timesteps is not sorted
+            ValueError: If timesteps contains duplicates
+            ValueError: If timesteps is not a subset of present_timesteps
+        """
         if not isinstance(timesteps, pd.DatetimeIndex):
             raise TypeError('timesteps must be a pandas DatetimeIndex')
@@ -750,6 +1331,18 @@ def _validate_timesteps(timesteps: pd.DatetimeIndex):
             logger.warning('Renamed timesteps to "time" (was "%s")', timesteps.name)
             timesteps.name = 'time'

+        # Ensure timesteps is sorted
+        if not timesteps.is_monotonic_increasing:
+            raise ValueError('timesteps must be sorted')
+
+        # Ensure timesteps has no duplicates
+        if len(timesteps) != len(timesteps.drop_duplicates()):
+            raise ValueError('timesteps must not contain duplicates')
+
+        # Ensure timesteps is a subset of present_timesteps
+        if present_timesteps is not None and not set(timesteps).issubset(set(present_timesteps)):
+            raise ValueError('timesteps must be a subset of present_timesteps')
+
     @staticmethod
     def _create_timesteps_with_extra(
         timesteps: pd.DatetimeIndex, hours_of_last_timestep: Optional[float]
@@ -787,128 +1380,49 @@ def calculate_hours_per_timestep(timesteps_extra: pd.DatetimeIndex) -> xr.DataAr
         data=hours_per_step, coords={'time': timesteps_extra[:-1]}, dims=('time',), name='hours_per_step'
     )

-    def _calculate_group_weights(self) -> Dict[str, float]:
-        """Calculate weights for aggregation groups."""
-        # Count series in each group
-        groups = [ts.aggregation_group for ts in self.time_series_data.values() if ts.aggregation_group is not None]
-        group_counts = Counter(groups)
-        # Calculate weight for each group (1/count)
-        return {group: 1 / count for group, count in group_counts.items()}
-
-    def _calculate_weights(self) -> Dict[str, float]:
-        """Calculate weights for all time series."""
-        # Calculate weight for each time series
-        weights = {}
-        for name, ts in self.time_series_data.items():
-            if ts.aggregation_group is not None:
-                # Use group weight
-                weights[name] = self.group_weights.get(ts.aggregation_group, 1)
-            else:
-                # Use individual weight or default to 1
-                weights[name] = ts.aggregation_weight or 1
-
-        return weights
-
-    def _format_stats(self, data) -> str:
-        """Format statistics for a data array."""
-        if hasattr(data, 'values'):
-            values = data.values
-        else:
-            values = np.asarray(data)
-
-        mean_val = np.mean(values)
-        min_val = np.min(values)
-        max_val = np.max(values)
-
-        return f'mean: {mean_val:.2f}, min: {min_val:.2f}, max: {max_val:.2f}'
-
-    def __getitem__(self, name: str) -> TimeSeries:
-        """Get a TimeSeries by name."""
-        try:
-            return self.time_series_data[name]
-        except KeyError as e:
-            raise KeyError(f'TimeSeries "{name}" not found in the TimeSeriesCollection') from e
-
-    def __iter__(self) -> Iterator[TimeSeries]:
-        """Iterate through all TimeSeries in the collection."""
-        return iter(self.time_series_data.values())
-
-    def __len__(self) -> int:
-        """Get the number of TimeSeries in the collection."""
-        return len(self.time_series_data)
-
-    def __contains__(self, item: Union[str, TimeSeries]) -> bool:
-        """Check if a TimeSeries exists in the collection."""
-        if isinstance(item, str):
-            return item in self.time_series_data
-        elif isinstance(item, TimeSeries):
-            return item in self.time_series_data.values()
-        return False
-
-
@property - def non_constants(self) -> List[TimeSeries]: - """Get time series with varying values.""" - return [ts for ts in self.time_series_data.values() if not ts.all_equal] - - @property - def constants(self) -> List[TimeSeries]: - """Get time series with constant values.""" - return [ts for ts in self.time_series_data.values() if ts.all_equal] - - @property - def timesteps(self) -> pd.DatetimeIndex: - """Get the active timesteps.""" - return self.all_timesteps if self._active_timesteps is None else self._active_timesteps - - @property - def timesteps_extra(self) -> pd.DatetimeIndex: - """Get the active timesteps with extra step.""" - return self.all_timesteps_extra if self._active_timesteps_extra is None else self._active_timesteps_extra - - @property - def hours_per_timestep(self) -> xr.DataArray: - """Get the duration of each active timestep.""" - return ( - self.all_hours_per_timestep if self._active_hours_per_timestep is None else self._active_hours_per_timestep - ) - - @property - def hours_of_last_timestep(self) -> float: - """Get the duration of the last timestep.""" - return float(self.hours_per_timestep[-1].item()) - - def __repr__(self): - return f'TimeSeriesCollection:\n{self.to_dataset()}' - - def __str__(self): - longest_name = max([time_series.name for time_series in self.time_series_data], key=len) - - stats_summary = '\n'.join( - [ - f' - {time_series.name:<{len(longest_name)}}: {get_numeric_stats(time_series.active_data)}' - for time_series in self.time_series_data - ] - ) - - return ( - f'TimeSeriesCollection with {len(self.time_series_data)} series\n' - f' Time Range: {self.timesteps[0]} → {self.timesteps[-1]}\n' - f' No. of timesteps: {len(self.timesteps)} + 1 extra\n' - f' Hours per timestep: {get_numeric_stats(self.hours_per_timestep)}\n' - f' Time Series Data:\n' - f'{stats_summary}' - ) +def get_numeric_stats(data: xr.DataArray, decimals: int = 2, padd: int = 10, by_scenario: bool = False) -> str: + """ + Calculates the mean, median, min, max, and standard deviation of a numeric DataArray. 
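+
+    With ``by_scenario=True`` and a ``scenario`` dimension holding more than one
+    entry, the statistics are computed and reported separately per scenario.
+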
+ Args: + data: The DataArray to analyze + decimals: Number of decimal places to show + padd: Padding for alignment + by_scenario: Whether to break down stats by scenario -def get_numeric_stats(data: xr.DataArray, decimals: int = 2, padd: int = 10) -> str: - """Calculates the mean, median, min, max, and standard deviation of a numeric DataArray.""" + Returns: + String representation of data statistics + """ format_spec = f'>{padd}.{decimals}f' if padd else f'.{decimals}f' + + # If by_scenario is True and there's a scenario dimension with multiple values + if by_scenario and 'scenario' in data.dims and data.sizes['scenario'] > 1: + results = [] + for scenario in data.coords['scenario'].values: + scenario_data = data.sel(scenario=scenario) + if np.unique(scenario_data).size == 1: + results.append(f' {scenario}: {scenario_data.max().item():{format_spec}} (constant)') + else: + mean = scenario_data.mean().item() + median = scenario_data.median().item() + min_val = scenario_data.min().item() + max_val = scenario_data.max().item() + std = scenario_data.std().item() + results.append( + f' {scenario}: {mean:{format_spec}} (mean), {median:{format_spec}} (median), ' + f'{min_val:{format_spec}} (min), {max_val:{format_spec}} (max), {std:{format_spec}} (std)' + ) + return '\n'.join(['By scenario:'] + results) + + # Standard logic for non-scenario data or aggregated stats if np.unique(data).size == 1: return f'{data.max().item():{format_spec}} (constant)' + mean = data.mean().item() median = data.median().item() min_val = data.min().item() max_val = data.max().item() std = data.std().item() + return f'{mean:{format_spec}} (mean), {median:{format_spec}} (median), {min_val:{format_spec}} (min), {max_val:{format_spec}} (max), {std:{format_spec}} (std)' diff --git a/flixopt/effects.py b/flixopt/effects.py index 82aa63a43..9b5ea41d6 100644 --- a/flixopt/effects.py +++ b/flixopt/effects.py @@ -137,10 +137,10 @@ def __init__(self, model: SystemModel, element: Effect): label_full=f'{self.label_full}(operation)', total_max=self.element.maximum_operation, total_min=self.element.minimum_operation, - min_per_hour=self.element.minimum_operation_per_hour.active_data + min_per_hour=self.element.minimum_operation_per_hour.selected_data if self.element.minimum_operation_per_hour is not None else None, - max_per_hour=self.element.maximum_operation_per_hour.active_data + max_per_hour=self.element.maximum_operation_per_hour.selected_data if self.element.maximum_operation_per_hour is not None else None, ) @@ -376,7 +376,7 @@ def _add_share_between_effects(self): for target_effect, time_series in origin_effect.specific_share_to_other_effects_operation.items(): self.effects[target_effect].model.operation.add_share( origin_effect.model.operation.label_full, - origin_effect.model.operation.total_per_timestep * time_series.active_data, + origin_effect.model.operation.total_per_timestep * time_series.selected_data, ) # 2. 
invest: -> here it is a scalar (share)
             for target_effect, factor in origin_effect.specific_share_to_other_effects_invest.items():
diff --git a/flixopt/elements.py b/flixopt/elements.py
index 05898d4e5..95536b910 100644
--- a/flixopt/elements.py
+++ b/flixopt/elements.py
@@ -374,7 +374,7 @@ def _create_shares(self):
             self._model.effects.add_share_to_effects(
                 name=self.label_full,  # Use the full label of the element
                 expressions={
-                    effect: self.flow_rate * self._model.hours_per_step * factor.active_data
+                    effect: self.flow_rate * self._model.hours_per_step * factor.selected_data
                     for effect, factor in self.element.effects_per_flow_hour.items()
                 },
                 target='operation',
@@ -429,8 +429,8 @@ def relative_flow_rate_bounds(self) -> Tuple[NumericData, NumericData]:
         """Returns relative flow rate bounds."""
         fixed_profile = self.element.fixed_relative_profile
         if fixed_profile is None:
-            return self.element.relative_minimum.active_data, self.element.relative_maximum.active_data
-        return fixed_profile.active_data, fixed_profile.active_data
+            return self.element.relative_minimum.selected_data, self.element.relative_maximum.selected_data
+        return fixed_profile.selected_data, fixed_profile.selected_data


 class BusModel(ElementModel):
@@ -451,7 +451,7 @@ def do_modeling(self) -> None:
         # Excess plus/minus:
         if self.element.with_excess:
             excess_penalty = np.multiply(
-                self._model.hours_per_step, self.element.excess_penalty_per_flow_hour.active_data
+                self._model.hours_per_step, self.element.excess_penalty_per_flow_hour.selected_data
             )
             self.excess_input = self.add(
                 self._model.add_variables(lower=0, coords=self._model.coords, name=f'{self.label_full}|excess_input'),
diff --git a/flixopt/features.py b/flixopt/features.py
index 92caf9dc2..32c382486 100644
--- a/flixopt/features.py
+++ b/flixopt/features.py
@@ -441,7 +441,7 @@ def _get_duration_in_hours(
         if previous_duration + self._model.hours_per_step[0] > first_step_max:
             logger.warning(
-                f'The maximum duration of "{variable_name}" is set to {maximum_duration.active_data}h, '
+                f'The maximum duration of "{variable_name}" is set to {maximum_duration.selected_data}h, '
                 f'but the consecutive_duration previous to this model is {previous_duration}h. '
                 f'This forces "{binary_variable.name} = 0" in the first time step '
                 f'(dt={self._model.hours_per_step[0]}h)!'
             )
@@ -450,7 +450,7 @@ def _get_duration_in_hours(
         duration_in_hours = self.add(
             self._model.add_variables(
                 lower=0,
-                upper=maximum_duration.active_data if maximum_duration is not None else mega,
+                upper=maximum_duration.selected_data if maximum_duration is not None else mega,
                 coords=self._model.coords,
                 name=f'{self.label_full}|{variable_name}',
             ),
diff --git a/flixopt/flow_system.py b/flixopt/flow_system.py
index 93720de60..e39d71e94 100644
--- a/flixopt/flow_system.py
+++ b/flixopt/flow_system.py
@@ -35,12 +35,14 @@ class FlowSystem:
     def __init__(
         self,
         timesteps: pd.DatetimeIndex,
+        scenarios: Optional[pd.Index] = None,
         hours_of_last_timestep: Optional[float] = None,
         hours_of_previous_timesteps: Optional[Union[int, float, np.ndarray]] = None,
     ):
         """
         Args:
             timesteps: The timesteps of the model.
+            scenarios: The scenarios of the model.
             hours_of_last_timestep: The duration of the last time step. Uses the last time interval if not specified
             hours_of_previous_timesteps: The duration of previous timesteps.
                 If None, the first time increment of time_series is used.
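
# Usage sketch (illustrative, not part of the patch): the new `scenarios`
# parameter in action. The import path follows this repository's layout; all
# other names below are assumptions.
import pandas as pd

from flixopt.flow_system import FlowSystem

timesteps = pd.date_range('2024-01-01', periods=24, freq='h', name='time')
scenarios = pd.Index(['baseline', 'high_demand'], name='scenario')
flow_system = FlowSystem(timesteps=timesteps, scenarios=scenarios)
# Scenario-specific inputs can then be supplied as 2D arrays of shape
# (n_scenarios, n_timesteps); plain 1D time series are broadcast to every
# scenario (see the DataConverter tests further below).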
@@ -49,6 +51,7 @@ def __init__(
         """
         self.time_series_collection = TimeSeriesCollection(
             timesteps=timesteps,
+            scenarios=scenarios,
             hours_of_last_timestep=hours_of_last_timestep,
             hours_of_previous_timesteps=hours_of_previous_timesteps,
         )
@@ -184,7 +187,7 @@ def as_dataset(self, constants_in_dataset: bool = False) -> xr.Dataset:
         Args:
             constants_in_dataset: If True, constants are included as Dataset variables.
         """
-        ds = self.time_series_collection.to_dataset(include_constants=constants_in_dataset)
+        ds = self.time_series_collection.as_dataset(with_constants=constants_in_dataset)
         ds.attrs = self.as_dict(data_mode='name')
         return ds
@@ -275,7 +278,7 @@ def create_time_series(
         self,
         name: str,
         data: Optional[Union[NumericData, TimeSeriesData, TimeSeries]],
-        needs_extra_timestep: bool = False,
+        has_extra_timestep: bool = False,
     ) -> Optional[TimeSeries]:
         """
         Tries to create a TimeSeries from NumericData and adds it to the time_series_collection
@@ -290,11 +293,20 @@ def create_time_series(
             data.restore_data()
             if data in self.time_series_collection:
                 return data
-            return self.time_series_collection.create_time_series(
-                data=data.active_data, name=name, needs_extra_timestep=needs_extra_timestep
+            return self.time_series_collection.add_time_series(
+                data=data.selected_data, name=name, has_extra_timestep=has_extra_timestep
             )
-        return self.time_series_collection.create_time_series(
-            data=data, name=name, needs_extra_timestep=needs_extra_timestep
+        elif isinstance(data, TimeSeriesData):
+            data.label = name
+            return self.time_series_collection.add_time_series(
+                data=data.data,
+                name=name,
+                has_extra_timestep=has_extra_timestep,
+                aggregation_weight=data.agg_weight,
+                aggregation_group=data.agg_group
+            )
+        return self.time_series_collection.add_time_series(
+            data=data, name=name, has_extra_timestep=has_extra_timestep
         )

     def create_effect_time_series(
diff --git a/flixopt/io.py b/flixopt/io.py
index 35d927136..5cc353836 100644
--- a/flixopt/io.py
+++ b/flixopt/io.py
@@ -23,7 +23,7 @@ def replace_timeseries(obj, mode: Literal['name', 'stats', 'data'] = 'name'):
         return [replace_timeseries(v, mode) for v in obj]
     elif isinstance(obj, TimeSeries):  # Adjust this based on the actual class
         if obj.all_equal:
-            return obj.active_data.values[0].item()
+            return obj.selected_data.values[0].item()
         elif mode == 'name':
             return f'::::{obj.name}'
         elif mode == 'stats':
diff --git a/flixopt/structure.py b/flixopt/structure.py
index e7f1c62a4..2e136c652 100644
--- a/flixopt/structure.py
+++ b/flixopt/structure.py
@@ -534,7 +534,7 @@ def copy_and_convert_datatypes(data: Any, use_numpy: bool = True, use_element_la
         return copy_and_convert_datatypes(data.tolist(), use_numpy, use_element_label)
     elif isinstance(data, TimeSeries):
-        return copy_and_convert_datatypes(data.active_data, use_numpy, use_element_label)
+        return copy_and_convert_datatypes(data.selected_data, use_numpy, use_element_label)
     elif isinstance(data, TimeSeriesData):
         return copy_and_convert_datatypes(data.data, use_numpy, use_element_label)
diff --git a/tests/test_dataconverter.py b/tests/test_dataconverter.py
index 49f1438e7..579de9c00 100644
--- a/tests/test_dataconverter.py
+++ b/tests/test_dataconverter.py
@@ -3,69 +3,848 @@
 import pytest
 import xarray as xr

-from flixopt.core import ConversionError, DataConverter  # Adjust this import to match your project structure
+from flixopt.core import (  # Adjust this import to match your project structure
+    ConversionError,
+    DataConverter,
+    TimeSeries,
+)


 @pytest.fixture
-def sample_time_index(request):
-    index =
pd.date_range('2024-01-01', periods=5, freq='D', name='time') - return index +def sample_time_index(): + return pd.date_range('2024-01-01', periods=5, freq='D', name='time') -def test_scalar_conversion(sample_time_index): - # Test scalar conversion - result = DataConverter.as_dataarray(42, sample_time_index) - assert isinstance(result, xr.DataArray) - assert result.shape == (len(sample_time_index),) - assert result.dims == ('time',) - assert np.all(result.values == 42) +@pytest.fixture +def sample_scenario_index(): + return pd.Index(['baseline', 'high_demand', 'low_price'], name='scenario') + + +@pytest.fixture +def multi_index(sample_time_index, sample_scenario_index): + """Create a sample MultiIndex combining scenarios and times.""" + return pd.MultiIndex.from_product([sample_scenario_index, sample_time_index], names=['scenario', 'time']) + + +class TestSingleDimensionConversion: + """Tests for converting data without scenarios (1D: time only).""" + + def test_scalar_conversion(self, sample_time_index): + """Test converting a scalar value.""" + # Test with integer + result = DataConverter.as_dataarray(42, sample_time_index) + assert isinstance(result, xr.DataArray) + assert result.shape == (len(sample_time_index),) + assert result.dims == ('time',) + assert np.all(result.values == 42) + + # Test with float + result = DataConverter.as_dataarray(42.5, sample_time_index) + assert np.all(result.values == 42.5) + + # Test with numpy scalar types + result = DataConverter.as_dataarray(np.int64(42), sample_time_index) + assert np.all(result.values == 42) + result = DataConverter.as_dataarray(np.float32(42.5), sample_time_index) + assert np.all(result.values == 42.5) + + def test_series_conversion(self, sample_time_index): + """Test converting a pandas Series.""" + # Test with integer values + series = pd.Series([1, 2, 3, 4, 5], index=sample_time_index) + result = DataConverter.as_dataarray(series, sample_time_index) + assert isinstance(result, xr.DataArray) + assert result.shape == (5,) + assert result.dims == ('time',) + assert np.array_equal(result.values, series.values) + + # Test with float values + series = pd.Series([1.1, 2.2, 3.3, 4.4, 5.5], index=sample_time_index) + result = DataConverter.as_dataarray(series, sample_time_index) + assert np.array_equal(result.values, series.values) + + # Test with mixed NA values + series = pd.Series([1, np.nan, 3, None, 5], index=sample_time_index) + result = DataConverter.as_dataarray(series, sample_time_index) + assert np.array_equal(np.isnan(result.values), np.isnan(series.values)) + assert np.array_equal(result.values[~np.isnan(result.values)], series.values[~np.isnan(series.values)]) + + def test_dataframe_conversion(self, sample_time_index): + """Test converting a pandas DataFrame.""" + # Test with a single-column DataFrame + df = pd.DataFrame({'A': [1, 2, 3, 4, 5]}, index=sample_time_index) + result = DataConverter.as_dataarray(df, sample_time_index) + assert isinstance(result, xr.DataArray) + assert result.shape == (5,) + assert result.dims == ('time',) + assert np.array_equal(result.values.flatten(), df['A'].values) + + # Test with float values + df = pd.DataFrame({'A': [1.1, 2.2, 3.3, 4.4, 5.5]}, index=sample_time_index) + result = DataConverter.as_dataarray(df, sample_time_index) + assert np.array_equal(result.values.flatten(), df['A'].values) + + # Test with NA values + df = pd.DataFrame({'A': [1, np.nan, 3, None, 5]}, index=sample_time_index) + result = DataConverter.as_dataarray(df, sample_time_index) + assert 
np.array_equal(np.isnan(result.values), np.isnan(df['A'].values)) + assert np.array_equal(result.values[~np.isnan(result.values)], df['A'].values[~np.isnan(df['A'].values)]) + + def test_ndarray_conversion(self, sample_time_index): + """Test converting a numpy ndarray.""" + # Test with integer 1D array + arr_1d = np.array([1, 2, 3, 4, 5]) + result = DataConverter.as_dataarray(arr_1d, sample_time_index) + assert result.shape == (5,) + assert result.dims == ('time',) + assert np.array_equal(result.values, arr_1d) + + # Test with float 1D array + arr_1d = np.array([1.1, 2.2, 3.3, 4.4, 5.5]) + result = DataConverter.as_dataarray(arr_1d, sample_time_index) + assert np.array_equal(result.values, arr_1d) + + # Test with array containing NaN + arr_1d = np.array([1, np.nan, 3, np.nan, 5]) + result = DataConverter.as_dataarray(arr_1d, sample_time_index) + assert np.array_equal(np.isnan(result.values), np.isnan(arr_1d)) + assert np.array_equal(result.values[~np.isnan(result.values)], arr_1d[~np.isnan(arr_1d)]) + + def test_dataarray_conversion(self, sample_time_index): + """Test converting an existing xarray DataArray.""" + # Create original DataArray + original = xr.DataArray(data=np.array([1, 2, 3, 4, 5]), coords={'time': sample_time_index}, dims=['time']) + + # Convert and check + result = DataConverter.as_dataarray(original, sample_time_index) + assert result.shape == (5,) + assert result.dims == ('time',) + assert np.array_equal(result.values, original.values) + + # Ensure it's a copy + result[0] = 999 + assert original[0].item() == 1 # Original should be unchanged + + # Test with different time coordinates but same length + different_times = pd.date_range('2025-01-01', periods=5, freq='D', name='time') + original = xr.DataArray(data=np.array([1, 2, 3, 4, 5]), coords={'time': different_times}, dims=['time']) + + # Should raise an error for mismatched time coordinates + with pytest.raises(ConversionError): + DataConverter.as_dataarray(original, sample_time_index) + + +class TestMultiDimensionConversion: + """Tests for converting data with scenarios (2D: scenario × time).""" + + def test_scalar_with_scenarios(self, sample_time_index, sample_scenario_index): + """Test converting scalar values with scenario dimension.""" + # Test with integer + result = DataConverter.as_dataarray(42, sample_time_index, sample_scenario_index) + + assert isinstance(result, xr.DataArray) + assert result.shape == (len(sample_scenario_index), len(sample_time_index)) + assert result.dims == ('scenario', 'time') + assert np.all(result.values == 42) + assert set(result.coords['scenario'].values) == set(sample_scenario_index.values) + assert set(result.coords['time'].values) == set(sample_time_index.values) + + # Test with float + result = DataConverter.as_dataarray(42.5, sample_time_index, sample_scenario_index) + assert np.all(result.values == 42.5) + + def test_series_with_scenarios(self, sample_time_index, sample_scenario_index): + """Test converting Series with scenario dimension.""" + # Create time series data + series = pd.Series([1, 2, 3, 4, 5], index=sample_time_index) + + # Convert with scenario dimension + result = DataConverter.as_dataarray(series, sample_time_index, sample_scenario_index) + + assert result.shape == (len(sample_scenario_index), len(sample_time_index)) + assert result.dims == ('scenario', 'time') + + # Values should be broadcast to all scenarios + for scenario in sample_scenario_index: + scenario_slice = result.sel(scenario=scenario) + assert np.array_equal(scenario_slice.values, series.values) + 
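+        # Note: a plain time-indexed Series carries no scenario information, so
+        # it is expected to be broadcast unchanged into every scenario slice
+        # (verified by the loop above).
+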
+ # Test with series containing NaN + series = pd.Series([1, np.nan, 3, np.nan, 5], index=sample_time_index) + result = DataConverter.as_dataarray(series, sample_time_index, sample_scenario_index) + + # Each scenario should have the same pattern of NaNs + for scenario in sample_scenario_index: + scenario_slice = result.sel(scenario=scenario) + assert np.array_equal(np.isnan(scenario_slice.values), np.isnan(series.values)) + assert np.array_equal( + scenario_slice.values[~np.isnan(scenario_slice.values)], series.values[~np.isnan(series.values)] + ) + + def test_multi_index_series(self, sample_time_index, sample_scenario_index, multi_index): + """Test converting a Series with MultiIndex (scenario, time).""" + # Create a MultiIndex Series with scenario-specific values + values = [ + # baseline scenario + 10, + 20, + 30, + 40, + 50, + # high_demand scenario + 15, + 25, + 35, + 45, + 55, + # low_price scenario + 5, + 15, + 25, + 35, + 45, + ] + series_multi = pd.Series(values, index=multi_index) + + # Convert the MultiIndex Series + result = DataConverter.as_dataarray(series_multi, sample_time_index, sample_scenario_index) + + assert result.shape == (len(sample_scenario_index), len(sample_time_index)) + assert result.dims == ('scenario', 'time') + + # Check values for each scenario + baseline_values = result.sel(scenario='baseline').values + assert np.array_equal(baseline_values, [10, 20, 30, 40, 50]) + + high_demand_values = result.sel(scenario='high_demand').values + assert np.array_equal(high_demand_values, [15, 25, 35, 45, 55]) + + low_price_values = result.sel(scenario='low_price').values + assert np.array_equal(low_price_values, [5, 15, 25, 35, 45]) + + # Test with some missing values in the MultiIndex + incomplete_index = multi_index[:-2] # Remove last two entries + incomplete_values = values[:-2] # Remove corresponding values + incomplete_series = pd.Series(incomplete_values, index=incomplete_index) + + result = DataConverter.as_dataarray(incomplete_series, sample_time_index, sample_scenario_index) + + # The last value of low_price scenario should be NaN + assert np.isnan(result.sel(scenario='low_price').values[-1]) + + def test_dataframe_with_scenarios(self, sample_time_index, sample_scenario_index): + """Test converting DataFrame with scenario dimension.""" + # Create a single-column DataFrame + df = pd.DataFrame({'A': [1, 2, 3, 4, 5]}, index=sample_time_index) + + # Convert with scenario dimension + result = DataConverter.as_dataarray(df, sample_time_index, sample_scenario_index) + + assert result.shape == (len(sample_scenario_index), len(sample_time_index)) + assert result.dims == ('scenario', 'time') + + # Values should be broadcast to all scenarios + for scenario in sample_scenario_index: + scenario_slice = result.sel(scenario=scenario) + assert np.array_equal(scenario_slice.values, df['A'].values) + + def test_multi_index_dataframe(self, sample_time_index, sample_scenario_index, multi_index): + """Test converting a DataFrame with MultiIndex (scenario, time).""" + # Create a MultiIndex DataFrame with scenario-specific values + values = [ + # baseline scenario + 10, + 20, + 30, + 40, + 50, + # high_demand scenario + 15, + 25, + 35, + 45, + 55, + # low_price scenario + 5, + 15, + 25, + 35, + 45, + ] + df_multi = pd.DataFrame({'A': values}, index=multi_index) + + # Convert the MultiIndex DataFrame + result = DataConverter.as_dataarray(df_multi, sample_time_index, sample_scenario_index) + + assert result.shape == (len(sample_scenario_index), len(sample_time_index)) + assert 
result.dims == ('scenario', 'time') + + # Check values for each scenario + baseline_values = result.sel(scenario='baseline').values + assert np.array_equal(baseline_values, [10, 20, 30, 40, 50]) + + high_demand_values = result.sel(scenario='high_demand').values + assert np.array_equal(high_demand_values, [15, 25, 35, 45, 55]) + + low_price_values = result.sel(scenario='low_price').values + assert np.array_equal(low_price_values, [5, 15, 25, 35, 45]) + + # Test with missing values + incomplete_index = multi_index[:-2] # Remove last two entries + incomplete_values = values[:-2] # Remove corresponding values + incomplete_df = pd.DataFrame({'A': incomplete_values}, index=incomplete_index) + + result = DataConverter.as_dataarray(incomplete_df, sample_time_index, sample_scenario_index) + + # The last value of low_price scenario should be NaN + assert np.isnan(result.sel(scenario='low_price').values[-1]) + + # Test with multiple columns (should raise error) + df_multi_col = pd.DataFrame({'A': values, 'B': [v * 2 for v in values]}, index=multi_index) + + with pytest.raises(ConversionError): + DataConverter.as_dataarray(df_multi_col, sample_time_index, sample_scenario_index) + + def test_1d_array_with_scenarios(self, sample_time_index, sample_scenario_index): + """Test converting 1D array with scenario dimension (broadcasting).""" + # Create 1D array matching timesteps length + arr_1d = np.array([1, 2, 3, 4, 5]) + + # Convert with scenarios + result = DataConverter.as_dataarray(arr_1d, sample_time_index, sample_scenario_index) + + assert result.shape == (len(sample_scenario_index), len(sample_time_index)) + assert result.dims == ('scenario', 'time') + + # Each scenario should have the same values (broadcasting) + for scenario in sample_scenario_index: + scenario_slice = result.sel(scenario=scenario) + assert np.array_equal(scenario_slice.values, arr_1d) + + def test_2d_array_with_scenarios(self, sample_time_index, sample_scenario_index): + """Test converting 2D array with scenario dimension.""" + # Create 2D array with different values per scenario + arr_2d = np.array( + [ + [1, 2, 3, 4, 5], # baseline scenario + [6, 7, 8, 9, 10], # high_demand scenario + [11, 12, 13, 14, 15], # low_price scenario + ] + ) + + # Convert to DataArray + result = DataConverter.as_dataarray(arr_2d, sample_time_index, sample_scenario_index) + + assert result.shape == (3, 5) + assert result.dims == ('scenario', 'time') + + # Check that each scenario has correct values + assert np.array_equal(result.sel(scenario='baseline').values, arr_2d[0]) + assert np.array_equal(result.sel(scenario='high_demand').values, arr_2d[1]) + assert np.array_equal(result.sel(scenario='low_price').values, arr_2d[2]) + + def test_dataarray_with_scenarios(self, sample_time_index, sample_scenario_index): + """Test converting an existing DataArray with scenarios.""" + # Create a multi-scenario DataArray + original = xr.DataArray( + data=np.array([[1, 2, 3, 4, 5], [6, 7, 8, 9, 10], [11, 12, 13, 14, 15]]), + coords={'scenario': sample_scenario_index, 'time': sample_time_index}, + dims=['scenario', 'time'], + ) + + # Test conversion + result = DataConverter.as_dataarray(original, sample_time_index, sample_scenario_index) + + assert result.shape == (3, 5) + assert result.dims == ('scenario', 'time') + assert np.array_equal(result.values, original.values) + + # Ensure it's a copy + result.loc['baseline'] = 999 + assert original.sel(scenario='baseline')[0].item() == 1 # Original should be unchanged + + def test_time_only_dataarray_with_scenarios(self, 
sample_time_index, sample_scenario_index): + """Test broadcasting a time-only DataArray to scenarios.""" + # Create a DataArray with only time dimension + time_only = xr.DataArray(data=np.array([1, 2, 3, 4, 5]), coords={'time': sample_time_index}, dims=['time']) + + # Convert with scenarios - should broadcast to all scenarios + result = DataConverter.as_dataarray(time_only, sample_time_index, sample_scenario_index) + + assert result.shape == (3, 5) + assert result.dims == ('scenario', 'time') + + # Each scenario should have same values + for scenario in sample_scenario_index: + assert np.array_equal(result.sel(scenario=scenario).values, time_only.values) + + +class TestInvalidInputs: + """Tests for invalid inputs and error handling.""" + + def test_time_index_validation(self): + """Test validation of time index.""" + # Test with unnamed index + unnamed_index = pd.date_range('2024-01-01', periods=5, freq='D') + with pytest.raises(ConversionError): + DataConverter.as_dataarray(42, unnamed_index) + + # Test with empty index + empty_index = pd.DatetimeIndex([], name='time') + with pytest.raises(ValueError): + DataConverter.as_dataarray(42, empty_index) + + # Test with non-DatetimeIndex + wrong_type_index = pd.Index([1, 2, 3, 4, 5], name='time') + with pytest.raises(ValueError): + DataConverter.as_dataarray(42, wrong_type_index) + + def test_scenario_index_validation(self, sample_time_index): + """Test validation of scenario index.""" + # Test with unnamed scenario index + unnamed_index = pd.Index(['baseline', 'high_demand']) + with pytest.raises(ConversionError): + DataConverter.as_dataarray(42, sample_time_index, unnamed_index) + # Test with empty scenario index + empty_index = pd.Index([], name='scenario') + with pytest.raises(ValueError): + DataConverter.as_dataarray(42, sample_time_index, empty_index) -def test_series_conversion(sample_time_index): - series = pd.Series([1, 2, 3, 4, 5], index=sample_time_index) + # Test with non-Index scenario + with pytest.raises(ValueError): + DataConverter.as_dataarray(42, sample_time_index, ['baseline', 'high_demand']) - # Test Series conversion - result = DataConverter.as_dataarray(series, sample_time_index) - assert isinstance(result, xr.DataArray) - assert result.shape == (5,) - assert result.dims == ('time',) - assert np.array_equal(result.values, series.values) + def test_invalid_data_types(self, sample_time_index, sample_scenario_index): + """Test handling of invalid data types.""" + # Test invalid input type (string) + with pytest.raises(ConversionError): + DataConverter.as_dataarray('invalid_string', sample_time_index) + # Test invalid input type with scenarios + with pytest.raises(ConversionError): + DataConverter.as_dataarray('invalid_string', sample_time_index, sample_scenario_index) -def test_dataframe_conversion(sample_time_index): - # Create a single-column DataFrame - df = pd.DataFrame({'A': [1, 2, 3, 4, 5]}, index=sample_time_index) + # Test unsupported complex object + with pytest.raises(ConversionError): + DataConverter.as_dataarray(object(), sample_time_index) - # Test DataFrame conversion - result = DataConverter.as_dataarray(df, sample_time_index) - assert isinstance(result, xr.DataArray) - assert result.shape == (5,) - assert result.dims == ('time',) - assert np.array_equal(result.values.flatten(), df['A'].values) + # Test None value + with pytest.raises(ConversionError): + DataConverter.as_dataarray(None, sample_time_index) + def test_mismatched_input_dimensions(self, sample_time_index, sample_scenario_index): + """Test handling 
of mismatched input dimensions.""" + # Test mismatched Series index + mismatched_series = pd.Series( + [1, 2, 3, 4, 5, 6], index=pd.date_range('2025-01-01', periods=6, freq='D', name='time') + ) + with pytest.raises(ConversionError): + DataConverter.as_dataarray(mismatched_series, sample_time_index) -def test_ndarray_conversion(sample_time_index): - # Test 1D array conversion - arr_1d = np.array([1, 2, 3, 4, 5]) - result = DataConverter.as_dataarray(arr_1d, sample_time_index) - assert result.shape == (5,) - assert result.dims == ('time',) - assert np.array_equal(result.values, arr_1d) + # Test DataFrame with multiple columns + df_multi_col = pd.DataFrame({'A': [1, 2, 3, 4, 5], 'B': [6, 7, 8, 9, 10]}, index=sample_time_index) + with pytest.raises(ConversionError): + DataConverter.as_dataarray(df_multi_col, sample_time_index) + # Test mismatched array shape for time-only + with pytest.raises(ConversionError): + DataConverter.as_dataarray(np.array([1, 2, 3]), sample_time_index) # Wrong length -def test_dataarray_conversion(sample_time_index): - # Create a DataArray - original = xr.DataArray(data=np.array([1, 2, 3, 4, 5]), coords={'time': sample_time_index}, dims=['time']) + # Test mismatched array shape for scenario × time + # Array shape should be (n_scenarios, n_timesteps) + wrong_shape_array = np.array( + [ + [1, 2, 3, 4], # Missing a timestep + [5, 6, 7, 8], + [9, 10, 11, 12], + ] + ) + with pytest.raises(ConversionError): + DataConverter.as_dataarray(wrong_shape_array, sample_time_index, sample_scenario_index) - # Test DataArray conversion - result = DataConverter.as_dataarray(original, sample_time_index) - assert result.shape == (5,) - assert result.dims == ('time',) - assert np.array_equal(result.values, original.values) + # Test array with too many dimensions + with pytest.raises(ConversionError): + # 3D array not allowed + DataConverter.as_dataarray(np.ones((3, 5, 2)), sample_time_index, sample_scenario_index) - # Ensure it's a copy - result[0] = 999 - assert original[0].item() == 1 # Original should be unchanged + def test_dataarray_dimension_mismatch(self, sample_time_index, sample_scenario_index): + """Test handling of mismatched DataArray dimensions.""" + # Create DataArray with wrong dimensions + wrong_dims = xr.DataArray(data=np.array([1, 2, 3, 4, 5]), coords={'wrong_dim': range(5)}, dims=['wrong_dim']) + with pytest.raises(ConversionError): + DataConverter.as_dataarray(wrong_dims, sample_time_index) + + # Create DataArray with scenario but no time + wrong_dims_2 = xr.DataArray(data=np.array([1, 2, 3]), coords={'scenario': ['a', 'b', 'c']}, dims=['scenario']) + with pytest.raises(ConversionError): + DataConverter.as_dataarray(wrong_dims_2, sample_time_index, sample_scenario_index) + + # Create DataArray with right dims but wrong length + wrong_length = xr.DataArray( + data=np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]]), + coords={ + 'scenario': sample_scenario_index, + 'time': pd.date_range('2024-01-01', periods=3, freq='D', name='time'), + }, + dims=['scenario', 'time'], + ) + with pytest.raises(ConversionError): + DataConverter.as_dataarray(wrong_length, sample_time_index, sample_scenario_index) + + +class TestEdgeCases: + """Tests for edge cases and special scenarios.""" + + def test_single_timestep(self, sample_scenario_index): + """Test with a single timestep.""" + # Test with only one timestep + single_timestep = pd.DatetimeIndex(['2024-01-01'], name='time') + + # Scalar conversion + result = DataConverter.as_dataarray(42, single_timestep) + assert result.shape == (1,) + 
assert result.dims == ('time',) + + # With scenarios + result_with_scenarios = DataConverter.as_dataarray(42, single_timestep, sample_scenario_index) + assert result_with_scenarios.shape == (len(sample_scenario_index), 1) + assert result_with_scenarios.dims == ('scenario', 'time') + + def test_single_scenario(self, sample_time_index): + """Test with a single scenario.""" + # Test with only one scenario + single_scenario = pd.Index(['baseline'], name='scenario') + + # Scalar conversion with single scenario + result = DataConverter.as_dataarray(42, sample_time_index, single_scenario) + assert result.shape == (1, len(sample_time_index)) + assert result.dims == ('scenario', 'time') + + # Array conversion with single scenario + arr = np.array([1, 2, 3, 4, 5]) + result_arr = DataConverter.as_dataarray(arr, sample_time_index, single_scenario) + assert result_arr.shape == (1, 5) + assert np.array_equal(result_arr.sel(scenario='baseline').values, arr) + + # 2D array with single scenario + arr_2d = np.array([[1, 2, 3, 4, 5]]) # Note the extra dimension + result_arr_2d = DataConverter.as_dataarray(arr_2d, sample_time_index, single_scenario) + assert result_arr_2d.shape == (1, 5) + assert np.array_equal(result_arr_2d.sel(scenario='baseline').values, arr_2d[0]) + + def test_different_scenario_order(self, sample_time_index): + """Test that scenario order is preserved.""" + # Test with different scenario orders + scenarios1 = pd.Index(['a', 'b', 'c'], name='scenario') + scenarios2 = pd.Index(['c', 'b', 'a'], name='scenario') + + # Create DataArray with first order + data = np.array( + [ + [1, 2, 3, 4, 5], # a + [6, 7, 8, 9, 10], # b + [11, 12, 13, 14, 15], # c + ] + ) + + result1 = DataConverter.as_dataarray(data, sample_time_index, scenarios1) + assert np.array_equal(result1.sel(scenario='a').values, [1, 2, 3, 4, 5]) + assert np.array_equal(result1.sel(scenario='c').values, [11, 12, 13, 14, 15]) + + # Create DataArray with second order + result2 = DataConverter.as_dataarray(data, sample_time_index, scenarios2) + # First row should match 'c' now + assert np.array_equal(result2.sel(scenario='c').values, [1, 2, 3, 4, 5]) + # Last row should match 'a' now + assert np.array_equal(result2.sel(scenario='a').values, [11, 12, 13, 14, 15]) + + def test_all_nan_data(self, sample_time_index, sample_scenario_index): + """Test handling of all-NaN data.""" + # Create array of all NaNs + all_nan_array = np.full(5, np.nan) + result = DataConverter.as_dataarray(all_nan_array, sample_time_index) + assert np.all(np.isnan(result.values)) + + # With scenarios + result = DataConverter.as_dataarray(all_nan_array, sample_time_index, sample_scenario_index) + assert result.shape == (len(sample_scenario_index), len(sample_time_index)) + assert np.all(np.isnan(result.values)) + + # Series of all NaNs + all_nan_series = pd.Series([np.nan, np.nan, np.nan, np.nan, np.nan], index=sample_time_index) + result = DataConverter.as_dataarray(all_nan_series, sample_time_index, sample_scenario_index) + assert np.all(np.isnan(result.values)) + + def test_subset_index_multiindex(self, sample_time_index, sample_scenario_index): + """Test handling of MultiIndex Series/DataFrames with subset of expected indices.""" + # Create a subset of the expected indexes + subset_time = sample_time_index[1:4] # Middle subset + subset_scenarios = sample_scenario_index[0:2] # First two scenarios + + # Create MultiIndex with subset + subset_multi_index = pd.MultiIndex.from_product([subset_scenarios, subset_time], names=['scenario', 'time']) + + # Create Series 
with subset of data + values = [ + # baseline (3 values) + 20, + 30, + 40, + # high_demand (3 values) + 25, + 35, + 45, + ] + subset_series = pd.Series(values, index=subset_multi_index) + + # Convert and test + result = DataConverter.as_dataarray(subset_series, sample_time_index, sample_scenario_index) + + # Shape should be full size + assert result.shape == (len(sample_scenario_index), len(sample_time_index)) + + # Check values - present values should match + assert result.sel(scenario='baseline', time=subset_time[0]).item() == 20 + assert result.sel(scenario='high_demand', time=subset_time[1]).item() == 35 + + # Missing values should be NaN + assert np.isnan(result.sel(scenario='baseline', time=sample_time_index[0]).item()) + assert np.isnan(result.sel(scenario='low_price', time=sample_time_index[2]).item()) + + def test_mixed_data_types(self, sample_time_index, sample_scenario_index): + """Test conversion of mixed integer and float data.""" + # Create array with mixed types + mixed_array = np.array([1, 2.5, 3, 4.5, 5]) + result = DataConverter.as_dataarray(mixed_array, sample_time_index) + + # Result should be float dtype + assert np.issubdtype(result.dtype, np.floating) + assert np.array_equal(result.values, mixed_array) + + # With scenarios + result = DataConverter.as_dataarray(mixed_array, sample_time_index, sample_scenario_index) + assert np.issubdtype(result.dtype, np.floating) + for scenario in sample_scenario_index: + assert np.array_equal(result.sel(scenario=scenario).values, mixed_array) + + +class TestFunctionalUseCase: + """Tests for realistic use cases combining multiple features.""" + + def test_multiindex_with_nans_and_partial_data(self, sample_time_index, sample_scenario_index): + """Test MultiIndex Series with partial data and NaN values.""" + # Create a MultiIndex Series with missing values and partial coverage + time_subset = sample_time_index[1:4] # Middle 3 timestamps only + + # Build index with holes + idx_tuples = [] + for scenario in sample_scenario_index: + for time in time_subset: + # Skip some combinations to create holes + if scenario == 'baseline' and time == time_subset[0]: + continue + if scenario == 'high_demand' and time == time_subset[2]: + continue + idx_tuples.append((scenario, time)) + + partial_idx = pd.MultiIndex.from_tuples(idx_tuples, names=['scenario', 'time']) + + # Create values with some NaNs + values = [ + # baseline (2 values, skipping first) + 30, + 40, + # high_demand (2 values, skipping last) + 25, + 35, + # low_price (3 values) + 15, + np.nan, + 35, + ] + + # Create Series + partial_series = pd.Series(values, index=partial_idx) + + # Convert and test + result = DataConverter.as_dataarray(partial_series, sample_time_index, sample_scenario_index) + + # Shape should be full size + assert result.shape == (len(sample_scenario_index), len(sample_time_index)) + + # Check specific values + assert result.sel(scenario='baseline', time=time_subset[1]).item() == 30 + assert result.sel(scenario='high_demand', time=time_subset[0]).item() == 25 + assert np.isnan(result.sel(scenario='low_price', time=time_subset[1]).item()) + + # All skipped combinations should be NaN + assert np.isnan(result.sel(scenario='baseline', time=time_subset[0]).item()) + assert np.isnan(result.sel(scenario='high_demand', time=time_subset[2]).item()) + + # First and last timestamps should all be NaN (not in original subset) + assert np.all(np.isnan(result.sel(time=sample_time_index[0]).values)) + assert np.all(np.isnan(result.sel(time=sample_time_index[-1]).values)) + + def 
test_scenario_broadcast_with_nan_values(self, sample_time_index, sample_scenario_index): + """Test broadcasting a Series with NaN values to scenarios.""" + # Create Series with some NaN values + series = pd.Series([1, np.nan, 3, np.nan, 5], index=sample_time_index) + + # Convert with scenario broadcasting + result = DataConverter.as_dataarray(series, sample_time_index, sample_scenario_index) + + # All scenarios should have the same pattern of NaN values + for scenario in sample_scenario_index: + scenario_data = result.sel(scenario=scenario) + assert np.isnan(scenario_data[1].item()) + assert np.isnan(scenario_data[3].item()) + assert scenario_data[0].item() == 1 + assert scenario_data[2].item() == 3 + assert scenario_data[4].item() == 5 + + def test_large_dataset(self, sample_scenario_index): + """Test with a larger dataset to ensure performance.""" + # Create a larger timestep array (e.g., hourly for a year) + large_timesteps = pd.date_range( + '2024-01-01', + periods=8760, # Hours in a year + freq='H', + name='time', + ) + + # Create large 2D array (3 scenarios × 8760 hours) + large_data = np.random.rand(len(sample_scenario_index), len(large_timesteps)) + + # Convert and check + result = DataConverter.as_dataarray(large_data, large_timesteps, sample_scenario_index) + + assert result.shape == (len(sample_scenario_index), len(large_timesteps)) + assert result.dims == ('scenario', 'time') + assert np.array_equal(result.values, large_data) + + +class TestMultiScenarioArrayConversion: + """Tests specifically focused on array conversion with scenarios.""" + + def test_1d_array_broadcasting(self, sample_time_index, sample_scenario_index): + """Test that 1D arrays are properly broadcast to all scenarios.""" + arr_1d = np.array([1, 2, 3, 4, 5]) + result = DataConverter.as_dataarray(arr_1d, sample_time_index, sample_scenario_index) + + assert result.shape == (len(sample_scenario_index), len(sample_time_index)) + + # Each scenario should have identical values + for i, scenario in enumerate(sample_scenario_index): + assert np.array_equal(result.sel(scenario=scenario).values, arr_1d) + + # Modify one scenario's values + result.loc[dict(scenario=scenario)] = np.ones(len(sample_time_index)) * i + + # Ensure modifications are isolated to each scenario + for i, scenario in enumerate(sample_scenario_index): + assert np.all(result.sel(scenario=scenario).values == i) + + def test_2d_array_different_shapes(self, sample_time_index): + """Test different scenario shapes with 2D arrays.""" + # Test with 1 scenario + single_scenario = pd.Index(['baseline'], name='scenario') + arr_1_scenario = np.array([[1, 2, 3, 4, 5]]) + + result = DataConverter.as_dataarray(arr_1_scenario, sample_time_index, single_scenario) + assert result.shape == (1, len(sample_time_index)) + + # Test with 2 scenarios + two_scenarios = pd.Index(['baseline', 'high_demand'], name='scenario') + arr_2_scenarios = np.array([[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]]) + + result = DataConverter.as_dataarray(arr_2_scenarios, sample_time_index, two_scenarios) + assert result.shape == (2, len(sample_time_index)) + assert np.array_equal(result.sel(scenario='baseline').values, arr_2_scenarios[0]) + assert np.array_equal(result.sel(scenario='high_demand').values, arr_2_scenarios[1]) + + # Test mismatched scenarios count + three_scenarios = pd.Index(['a', 'b', 'c'], name='scenario') + with pytest.raises(ConversionError): + DataConverter.as_dataarray(arr_2_scenarios, sample_time_index, three_scenarios) + + def test_array_handling_edge_cases(self, 
sample_time_index, sample_scenario_index): + """Test array edge cases.""" + # Test with boolean array + bool_array = np.array([True, False, True, False, True]) + result = DataConverter.as_dataarray(bool_array, sample_time_index, sample_scenario_index) + assert result.dtype == bool + assert result.shape == (len(sample_scenario_index), len(sample_time_index)) + + # Test with array containing infinite values + inf_array = np.array([1, np.inf, 3, -np.inf, 5]) + result = DataConverter.as_dataarray(inf_array, sample_time_index, sample_scenario_index) + for scenario in sample_scenario_index: + scenario_data = result.sel(scenario=scenario) + assert np.isinf(scenario_data[1].item()) + assert np.isinf(scenario_data[3].item()) + assert scenario_data[3].item() < 0 # Negative infinity + + +class TestScenarioReindexing: + """Tests for reindexing and coordinate preservation in DataConverter.""" + + def test_preserving_scenario_order(self, sample_time_index): + """Test that scenario order is preserved in converted DataArrays.""" + # Define scenarios in a specific order + scenarios = pd.Index(['scenario3', 'scenario1', 'scenario2'], name='scenario') + + # Create 2D array + data = np.array( + [ + [1, 2, 3, 4, 5], # scenario3 + [6, 7, 8, 9, 10], # scenario1 + [11, 12, 13, 14, 15], # scenario2 + ] + ) + + # Convert to DataArray + result = DataConverter.as_dataarray(data, sample_time_index, scenarios) + + # Verify order of scenarios is preserved + assert list(result.coords['scenario'].values) == list(scenarios) + + # Verify data for each scenario + assert np.array_equal(result.sel(scenario='scenario3').values, data[0]) + assert np.array_equal(result.sel(scenario='scenario1').values, data[1]) + assert np.array_equal(result.sel(scenario='scenario2').values, data[2]) + + def test_multiindex_reindexing(self, sample_time_index): + """Test reindexing of MultiIndex Series.""" + # Create scenarios with intentional different order + scenarios = pd.Index(['z_scenario', 'a_scenario', 'm_scenario'], name='scenario') + + # Create MultiIndex with different order than the target + source_scenarios = pd.Index(['a_scenario', 'm_scenario', 'z_scenario'], name='scenario') + multi_idx = pd.MultiIndex.from_product([source_scenarios, sample_time_index], names=['scenario', 'time']) + + # Create values - order should match the source index + values = [] + for i, _ in enumerate(source_scenarios): + values.extend([i * 10 + j for j in range(1, len(sample_time_index) + 1)]) + + # Create Series + series = pd.Series(values, index=multi_idx) + + # Convert using the target scenario order + result = DataConverter.as_dataarray(series, sample_time_index, scenarios) + + # Verify scenario order matches the target + assert list(result.coords['scenario'].values) == list(scenarios) + + # Verify values are correctly indexed + assert np.array_equal(result.sel(scenario='a_scenario').values, [1, 2, 3, 4, 5]) + assert np.array_equal(result.sel(scenario='m_scenario').values, [11, 12, 13, 14, 15]) + assert np.array_equal(result.sel(scenario='z_scenario').values, [21, 22, 23, 24, 25]) + + +if __name__ == '__main__': + pytest.main() def test_invalid_inputs(sample_time_index): diff --git a/tests/test_timeseries.py b/tests/test_timeseries.py index 48c7ab7b2..50136536b 100644 --- a/tests/test_timeseries.py +++ b/tests/test_timeseries.py @@ -8,7 +8,7 @@ import pytest import xarray as xr -from flixopt.core import ConversionError, DataConverter, TimeSeries, TimeSeriesCollection, TimeSeriesData +from flixopt.core import ConversionError, DataConverter, TimeSeries, 
TimeSeriesCollection @pytest.fixture @@ -44,13 +44,13 @@ def test_initialization(self, simple_dataarray): # Check data initialization assert isinstance(ts.stored_data, xr.DataArray) assert ts.stored_data.equals(simple_dataarray) - assert ts.active_data.equals(simple_dataarray) + assert ts.selected_data.equals(simple_dataarray) # Check backup was created assert ts._backup.equals(simple_dataarray) # Check active timesteps - assert ts.active_timesteps.equals(simple_dataarray.indexes['time']) + assert ts._valid_selector == {} # No selections initially def test_initialization_with_aggregation_params(self, simple_dataarray): """Test initialization with aggregation parameters.""" @@ -73,53 +73,51 @@ def test_initialization_validation(self, sample_timesteps): multi_dim_data = xr.DataArray( [[1, 2, 3], [4, 5, 6]], coords={'dim1': [0, 1], 'time': sample_timesteps[:3]}, dims=['dim1', 'time'] ) - with pytest.raises(ValueError, match='dimensions of DataArray must be 1'): + with pytest.raises(ValueError, match='DataArray dimensions must be subset of'): TimeSeries(multi_dim_data, name='Multi-dim Series') - def test_active_timesteps_getter_setter(self, sample_timeseries, sample_timesteps): - """Test active_timesteps getter and setter.""" - # Initial state should use all timesteps - assert sample_timeseries.active_timesteps.equals(sample_timesteps) + def test_selection_methods(self, sample_timeseries, sample_timesteps): + """Test selection methods.""" + # Initial state should have no selections + assert sample_timeseries._selected_timesteps is None + assert sample_timeseries._selected_scenarios is None # Set to a subset subset_index = sample_timesteps[1:3] - sample_timeseries.active_timesteps = subset_index - assert sample_timeseries.active_timesteps.equals(subset_index) + sample_timeseries.set_selection(timesteps=subset_index) + assert sample_timeseries._selected_timesteps.equals(subset_index) # Active data should reflect the subset - assert sample_timeseries.active_data.equals(sample_timeseries.stored_data.sel(time=subset_index)) + assert sample_timeseries.selected_data.equals(sample_timeseries.stored_data.sel(time=subset_index)) - # Reset to full index - sample_timeseries.active_timesteps = None - assert sample_timeseries.active_timesteps.equals(sample_timesteps) - - # Test invalid type - with pytest.raises(TypeError, match='must be a pandas DatetimeIndex'): - sample_timeseries.active_timesteps = 'invalid' + # Clear selection + sample_timeseries.clear_selection() + assert sample_timeseries._selected_timesteps is None + assert sample_timeseries.selected_data.equals(sample_timeseries.stored_data) def test_reset(self, sample_timeseries, sample_timesteps): """Test reset method.""" # Set to subset first subset_index = sample_timesteps[1:3] - sample_timeseries.active_timesteps = subset_index + sample_timeseries.set_selection(timesteps=subset_index) # Reset sample_timeseries.reset() - # Should be back to full index - assert sample_timeseries.active_timesteps.equals(sample_timesteps) - assert sample_timeseries.active_data.equals(sample_timeseries.stored_data) + # Should be back to full index (all selections cleared) + assert sample_timeseries._selected_timesteps is None + assert sample_timeseries.selected_data.equals(sample_timeseries.stored_data) def test_restore_data(self, sample_timeseries, simple_dataarray): """Test restore_data method.""" # Modify the stored data - new_data = xr.DataArray([1, 2, 3, 4, 5], coords={'time': sample_timeseries.active_timesteps}, dims=['time']) + new_data = xr.DataArray([1, 2, 
3, 4, 5], coords={'time': sample_timeseries.stored_data.coords['time']}, dims=['time']) # Store original data for comparison original_data = sample_timeseries.stored_data - # Set new data - sample_timeseries.stored_data = new_data + # Update data + sample_timeseries.update_stored_data(new_data) assert sample_timeseries.stored_data.equals(new_data) # Restore from backup @@ -127,42 +125,42 @@ def test_restore_data(self, sample_timeseries, simple_dataarray): # Should be back to original data assert sample_timeseries.stored_data.equals(original_data) - assert sample_timeseries.active_data.equals(original_data) + assert sample_timeseries.selected_data.equals(original_data) - def test_stored_data_setter(self, sample_timeseries, sample_timesteps): - """Test stored_data setter with different data types.""" + def test_update_stored_data(self, sample_timeseries, sample_timesteps): + """Test update_stored_data method with different data types.""" # Test with a Series series_data = pd.Series([5, 6, 7, 8, 9], index=sample_timesteps) - sample_timeseries.stored_data = series_data + sample_timeseries.update_stored_data(series_data) assert np.array_equal(sample_timeseries.stored_data.values, series_data.values) # Test with a single-column DataFrame df_data = pd.DataFrame({'col1': [15, 16, 17, 18, 19]}, index=sample_timesteps) - sample_timeseries.stored_data = df_data + sample_timeseries.update_stored_data(df_data) assert np.array_equal(sample_timeseries.stored_data.values, df_data['col1'].values) # Test with a NumPy array array_data = np.array([25, 26, 27, 28, 29]) - sample_timeseries.stored_data = array_data + sample_timeseries.update_stored_data(array_data) assert np.array_equal(sample_timeseries.stored_data.values, array_data) # Test with a scalar - sample_timeseries.stored_data = 42 + sample_timeseries.update_stored_data(42) assert np.all(sample_timeseries.stored_data.values == 42) # Test with another DataArray another_dataarray = xr.DataArray([30, 31, 32, 33, 34], coords={'time': sample_timesteps}, dims=['time']) - sample_timeseries.stored_data = another_dataarray + sample_timeseries.update_stored_data(another_dataarray) assert sample_timeseries.stored_data.equals(another_dataarray) def test_stored_data_setter_no_change(self, sample_timeseries): - """Test stored_data setter when data doesn't change.""" + """Test update_stored_data method when data doesn't change.""" # Get current data current_data = sample_timeseries.stored_data current_backup = sample_timeseries._backup # Set the same data - sample_timeseries.stored_data = current_data + sample_timeseries.update_stored_data(current_data) # Backup shouldn't change assert sample_timeseries._backup is current_backup # Should be the same object @@ -229,35 +227,35 @@ def test_all_equal(self, sample_timesteps): def test_arithmetic_operations(self, sample_timeseries): """Test arithmetic operations.""" # Create a second TimeSeries for testing - data2 = xr.DataArray([1, 2, 3, 4, 5], coords={'time': sample_timeseries.active_timesteps}, dims=['time']) + data2 = xr.DataArray([1, 2, 3, 4, 5], coords={'time': sample_timeseries.stored_data.coords['time']}, dims=['time']) ts2 = TimeSeries(data2, 'Second Series') # Test operations between two TimeSeries objects assert np.array_equal( - (sample_timeseries + ts2).values, sample_timeseries.active_data.values + ts2.active_data.values + (sample_timeseries + ts2).values, sample_timeseries.selected_data.values + ts2.selected_data.values ) assert np.array_equal( - (sample_timeseries - ts2).values, 
@@ -229,35 +227,35 @@ def test_all_equal(self, sample_timesteps):

     def test_arithmetic_operations(self, sample_timeseries):
         """Test arithmetic operations."""
         # Create a second TimeSeries for testing
-        data2 = xr.DataArray([1, 2, 3, 4, 5], coords={'time': sample_timeseries.active_timesteps}, dims=['time'])
+        data2 = xr.DataArray([1, 2, 3, 4, 5], coords={'time': sample_timeseries.stored_data.coords['time']}, dims=['time'])
         ts2 = TimeSeries(data2, 'Second Series')

         # Test operations between two TimeSeries objects
         assert np.array_equal(
-            (sample_timeseries + ts2).values, sample_timeseries.active_data.values + ts2.active_data.values
+            (sample_timeseries + ts2).values, sample_timeseries.selected_data.values + ts2.selected_data.values
         )
         assert np.array_equal(
-            (sample_timeseries - ts2).values, sample_timeseries.active_data.values - ts2.active_data.values
+            (sample_timeseries - ts2).values, sample_timeseries.selected_data.values - ts2.selected_data.values
         )
         assert np.array_equal(
-            (sample_timeseries * ts2).values, sample_timeseries.active_data.values * ts2.active_data.values
+            (sample_timeseries * ts2).values, sample_timeseries.selected_data.values * ts2.selected_data.values
         )
         assert np.array_equal(
-            (sample_timeseries / ts2).values, sample_timeseries.active_data.values / ts2.active_data.values
+            (sample_timeseries / ts2).values, sample_timeseries.selected_data.values / ts2.selected_data.values
         )

         # Test operations with DataArrays
-        assert np.array_equal((sample_timeseries + data2).values, sample_timeseries.active_data.values + data2.values)
-        assert np.array_equal((data2 + sample_timeseries).values, data2.values + sample_timeseries.active_data.values)
+        assert np.array_equal((sample_timeseries + data2).values, sample_timeseries.selected_data.values + data2.values)
+        assert np.array_equal((data2 + sample_timeseries).values, data2.values + sample_timeseries.selected_data.values)

         # Test operations with scalars
-        assert np.array_equal((sample_timeseries + 5).values, sample_timeseries.active_data.values + 5)
-        assert np.array_equal((5 + sample_timeseries).values, 5 + sample_timeseries.active_data.values)
+        assert np.array_equal((sample_timeseries + 5).values, sample_timeseries.selected_data.values + 5)
+        assert np.array_equal((5 + sample_timeseries).values, 5 + sample_timeseries.selected_data.values)

         # Test unary operations
-        assert np.array_equal((-sample_timeseries).values, -sample_timeseries.active_data.values)
-        assert np.array_equal((+sample_timeseries).values, +sample_timeseries.active_data.values)
-        assert np.array_equal((abs(sample_timeseries)).values, abs(sample_timeseries.active_data.values))
+        assert np.array_equal((-sample_timeseries).values, -sample_timeseries.selected_data.values)
+        assert np.array_equal((+sample_timeseries).values, +sample_timeseries.selected_data.values)
+        assert np.array_equal((abs(sample_timeseries)).values, abs(sample_timeseries.selected_data.values))

     def test_comparison_operations(self, sample_timesteps):
         """Test comparison operations."""
@@ -279,327 +277,473 @@ def test_comparison_operations(self, sample_timesteps):

     def test_numpy_ufunc(self, sample_timeseries):
         """Test numpy ufunc compatibility."""
         # Test basic numpy functions
-        assert np.array_equal(np.add(sample_timeseries, 5).values, np.add(sample_timeseries.active_data, 5).values)
+        assert np.array_equal(np.add(sample_timeseries, 5).values, np.add(sample_timeseries.selected_data, 5).values)
         assert np.array_equal(
-            np.multiply(sample_timeseries, 2).values, np.multiply(sample_timeseries.active_data, 2).values
+            np.multiply(sample_timeseries, 2).values, np.multiply(sample_timeseries.selected_data, 2).values
         )

         # Test with two TimeSeries objects
-        data2 = xr.DataArray([1, 2, 3, 4, 5], coords={'time': sample_timeseries.active_timesteps}, dims=['time'])
+        data2 = xr.DataArray([1, 2, 3, 4, 5], coords={'time': sample_timeseries.stored_data.coords['time']}, dims=['time'])
         ts2 = TimeSeries(data2, 'Second Series')

         assert np.array_equal(
-            np.add(sample_timeseries, ts2).values, np.add(sample_timeseries.active_data, ts2.active_data).values
+            np.add(sample_timeseries, ts2).values, np.add(sample_timeseries.selected_data, ts2.selected_data).values
         )

     def test_sel_and_isel_properties(self, sample_timeseries):
         """Test sel and isel properties."""
         # Test that sel property works
-        selected = sample_timeseries.sel(time=sample_timeseries.active_timesteps[0])
-        assert selected.item() == sample_timeseries.active_data.values[0]
+        selected = sample_timeseries.sel(time=sample_timeseries.stored_data.coords['time'][0])
+        assert selected.item() == sample_timeseries.selected_data.values[0]

         # Test that isel property works
         indexed = sample_timeseries.isel(time=0)
-        assert indexed.item() == sample_timeseries.active_data.values[0]
+        assert indexed.item() == sample_timeseries.selected_data.values[0]
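# Illustrative sketch (editorial, hedged): the selection workflow that replaces
# the old active_timesteps setter, as exercised by the tests above. Selections
# are views; stored_data is never modified by them. Import path assumed as above.
import pandas as pd
import xarray as xr
from flixopt.core import TimeSeries  # assumed import path

t = pd.date_range('2023-01-01', periods=5, freq='D', name='time')
ts = TimeSeries(xr.DataArray([10, 20, 30, 40, 50], coords={'time': t}, dims=['time']), name='demo')

ts.set_selection(timesteps=t[1:3])     # narrow the view to two timesteps
assert ts.selected_data.shape == (2,)  # selected_data reflects the selection
assert ts.stored_data.shape == (5,)    # stored_data stays complete

ts.clear_selection()                   # ts.reset() likewise clears all selections
assert ts.selected_data.equals(ts.stored_data)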
+
+
+@pytest.fixture
+def sample_scenario_index():
+    """Create a sample scenario index with the required 'scenario' name."""
+    return pd.Index(['baseline', 'high_demand', 'low_price'], name='scenario')
+
+
+@pytest.fixture
+def simple_scenario_dataarray(sample_timesteps, sample_scenario_index):
+    """Create a DataArray with both scenario and time dimensions."""
+    data = np.array([
+        [10, 20, 30, 40, 50],  # baseline
+        [15, 25, 35, 45, 55],  # high_demand
+        [5, 15, 25, 35, 45]    # low_price
+    ])
+    return xr.DataArray(
+        data=data,
+        coords={'scenario': sample_scenario_index, 'time': sample_timesteps},
+        dims=['scenario', 'time']
+    )


 @pytest.fixture
-def sample_collection(sample_timesteps):
+def sample_scenario_timeseries(simple_scenario_dataarray):
+    """Create a sample TimeSeries object with scenario dimension."""
+    return TimeSeries(simple_scenario_dataarray, name='Test Scenario Series')
+
+
+@pytest.fixture
+def sample_allocator(sample_timesteps):
     """Create a sample TimeSeriesCollection."""
     return TimeSeriesCollection(sample_timesteps)


 @pytest.fixture
-def populated_collection(sample_collection):
-    """Create a TimeSeriesCollection with test data."""
-    # Add a constant time series
-    sample_collection.create_time_series(42, 'constant_series')
-
-    # Add a varying time series
-    varying_data = np.array([10, 20, 30, 40, 50])
-    sample_collection.create_time_series(varying_data, 'varying_series')
-
-    # Add a time series with extra timestep
-    sample_collection.create_time_series(
-        np.array([1, 2, 3, 4, 5, 6]), 'extra_timestep_series', needs_extra_timestep=True
-    )
+def sample_scenario_allocator(sample_timesteps, sample_scenario_index):
+    """Create a sample TimeSeriesCollection with scenarios."""
+    return TimeSeriesCollection(sample_timesteps, scenarios=sample_scenario_index)

-    # Add series with aggregation settings
-    sample_collection.create_time_series(
-        TimeSeriesData(np.array([5, 5, 5, 5, 5]), agg_group='group1'), 'group1_series1'
-    )
-    sample_collection.create_time_series(
-        TimeSeriesData(np.array([6, 6, 6, 6, 6]), agg_group='group1'), 'group1_series2'
-    )
-    sample_collection.create_time_series(
-        TimeSeriesData(np.array([10, 10, 10, 10, 10]), agg_weight=0.5), 'weighted_series'
-    )

-    return sample_collection
+class TestTimeSeriesWithScenarios:
+    """Test suite for TimeSeries class with scenarios."""
+
+    def test_initialization_with_scenarios(self, simple_scenario_dataarray):
+        """Test initialization of TimeSeries with scenario dimension."""
+        ts = TimeSeries(simple_scenario_dataarray, name='Scenario Series')

+        # Check basic properties
+        assert ts.name == 'Scenario Series'
+        assert ts._has_scenarios is True
+        assert ts._selected_scenarios is None  # No selection initially
+
+        # Check data initialization
+        assert isinstance(ts.stored_data, xr.DataArray)
+        assert ts.stored_data.equals(simple_scenario_dataarray)
+        assert ts.selected_data.equals(simple_scenario_dataarray)
+
+        # Check backup was created
+        assert ts._backup.equals(simple_scenario_dataarray)
+
+    def test_reset_with_scenarios(self, sample_scenario_timeseries, simple_scenario_dataarray):
+        """Test reset method with scenarios."""
+        # Get original full indexes
+        full_timesteps = simple_scenario_dataarray.coords['time']
+        full_scenarios = simple_scenario_dataarray.coords['scenario']
+
+        # Set to subset timesteps and scenarios
+        subset_timesteps = full_timesteps[1:3]
+        subset_scenarios = full_scenarios[:2]

-class TestTimeSeriesCollection:
-    """Test suite for TimeSeriesCollection."""
+        sample_scenario_timeseries.set_selection(timesteps=subset_timesteps, scenarios=subset_scenarios)
+
+        # Verify subsets were set
+        assert sample_scenario_timeseries._selected_timesteps.equals(subset_timesteps)
+        assert sample_scenario_timeseries._selected_scenarios.equals(subset_scenarios)
+        assert sample_scenario_timeseries.selected_data.shape == (len(subset_scenarios), len(subset_timesteps))
+
+        # Reset
+        sample_scenario_timeseries.reset()
+
+        # Should be back to full indexes
+        assert sample_scenario_timeseries._selected_timesteps is None
+        assert sample_scenario_timeseries._selected_scenarios is None
+        assert sample_scenario_timeseries.selected_data.shape == (len(full_scenarios), len(full_timesteps))
+
+    def test_scenario_selection(self, sample_scenario_timeseries, sample_scenario_index):
+        """Test scenario selection."""
+        # Initial state should use all scenarios
+        assert sample_scenario_timeseries._selected_scenarios is None
+
+        # Set to a subset
+        subset_index = sample_scenario_index[:2]  # First two scenarios
+        sample_scenario_timeseries.set_selection(scenarios=subset_index)
+        assert sample_scenario_timeseries._selected_scenarios.equals(subset_index)
+
+        # Active data should reflect the subset
+        assert sample_scenario_timeseries.selected_data.equals(
+            sample_scenario_timeseries.stored_data.sel(scenario=subset_index)
+        )
+
+        # Clear selection
+        sample_scenario_timeseries.clear_selection(timesteps=False, scenarios=True)
+        assert sample_scenario_timeseries._selected_scenarios is None
+
+    def test_all_equal_with_scenarios(self, sample_timesteps, sample_scenario_index):
+        """Test all_equal property with scenarios."""
+        # All values equal across all scenarios
+        equal_data = np.full((3, 5), 5)  # All values are 5
+        equal_dataarray = xr.DataArray(
+            data=equal_data,
+            coords={'scenario': sample_scenario_index, 'time': sample_timesteps},
+            dims=['scenario', 'time']
+        )
+        ts_equal = TimeSeries(equal_dataarray, 'Equal Scenario Series')
+        assert ts_equal.all_equal is True
+
+        # Equal within each scenario but different between scenarios
+        per_scenario_equal = np.array([
+            [5, 5, 5, 5, 5],       # baseline - all 5
+            [10, 10, 10, 10, 10],  # high_demand - all 10
+            [15, 15, 15, 15, 15]   # low_price - all 15
+        ])
+        per_scenario_dataarray = xr.DataArray(
+            data=per_scenario_equal,
+            coords={'scenario': sample_scenario_index, 'time': sample_timesteps},
+            dims=['scenario', 'time']
+        )
+        ts_per_scenario = TimeSeries(per_scenario_dataarray, 'Per-Scenario Equal Series')
+        assert ts_per_scenario.all_equal is False
+
+    def test_arithmetic_with_scenarios(self, sample_scenario_timeseries, sample_timesteps, sample_scenario_index):
+        """Test arithmetic operations with scenarios."""
+        # Create a second TimeSeries with scenarios
+        data2 = np.ones((3, 5))  # All ones
+        second_dataarray = xr.DataArray(
+            data=data2,
+            coords={'scenario': sample_scenario_index, 'time': sample_timesteps},
+            dims=['scenario', 'time']
+        )
+        ts2 = TimeSeries(second_dataarray, 'Second Series')
+
+        # Test operations between two scenario TimeSeries objects
+        result = sample_scenario_timeseries + ts2
+        assert result.shape == (3, 5)
+        assert result.dims == ('scenario', 'time')
+
+        # First scenario values should be increased by 1
+        baseline_original = sample_scenario_timeseries.sel(scenario='baseline').values
+        baseline_result = result.sel(scenario='baseline').values
+        assert np.array_equal(baseline_result, baseline_original + 1)
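# Illustrative sketch (editorial, hedged): how the 'scenario' dimension
# composes with timestep selection, mirroring TestTimeSeriesWithScenarios.
# Index values are illustrative; import path assumed as above.
import numpy as np
import pandas as pd
import xarray as xr
from flixopt.core import TimeSeries  # assumed import path

t = pd.date_range('2023-01-01', periods=5, freq='D', name='time')
sc = pd.Index(['baseline', 'high_demand'], name='scenario')
ts = TimeSeries(
    xr.DataArray(np.ones((2, 5)), coords={'scenario': sc, 'time': t}, dims=['scenario', 'time']),
    name='scenario_demo',
)

ts.set_selection(timesteps=t[1:3], scenarios=sc[:1])
assert ts.selected_data.shape == (1, 2)              # (scenario, time)

ts.clear_selection(timesteps=False, scenarios=True)  # drop only the scenario filter
assert ts.selected_data.shape == (2, 2)              # timestep selection still applies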
+
+
+class TestTimeSeriesAllocator:
+    """Test suite for TimeSeriesCollection class."""

     def test_initialization(self, sample_timesteps):
         """Test basic initialization."""
-        collection = TimeSeriesCollection(sample_timesteps)
+        allocator = TimeSeriesCollection(sample_timesteps)

-        assert collection.all_timesteps.equals(sample_timesteps)
-        assert len(collection.all_timesteps_extra) == len(sample_timesteps) + 1
-        assert isinstance(collection.all_hours_per_timestep, xr.DataArray)
-        assert len(collection) == 0
+        assert allocator.timesteps.equals(sample_timesteps)
+        assert len(allocator.timesteps_extra) == len(sample_timesteps) + 1
+        assert isinstance(allocator.hours_per_timestep, xr.DataArray)
+        assert len(allocator._time_series) == 0

     def test_initialization_with_custom_hours(self, sample_timesteps):
         """Test initialization with custom hour settings."""
         # Test with last timestep duration
         last_timestep_hours = 12
-        collection = TimeSeriesCollection(sample_timesteps, hours_of_last_timestep=last_timestep_hours)
+        allocator = TimeSeriesCollection(sample_timesteps, hours_of_last_timestep=last_timestep_hours)

         # Verify the last timestep duration
-        extra_step_delta = collection.all_timesteps_extra[-1] - collection.all_timesteps_extra[-2]
+        extra_step_delta = allocator.timesteps_extra[-1] - allocator.timesteps_extra[-2]
         assert extra_step_delta == pd.Timedelta(hours=last_timestep_hours)

         # Test with previous timestep duration
         hours_per_step = 8
-        collection2 = TimeSeriesCollection(sample_timesteps, hours_of_previous_timesteps=hours_per_step)
+        allocator2 = TimeSeriesCollection(sample_timesteps, hours_of_previous_timesteps=hours_per_step)

-        assert collection2.hours_of_previous_timesteps == hours_per_step
+        assert allocator2.hours_of_previous_timesteps == hours_per_step

-    def test_create_time_series(self, sample_collection):
-        """Test creating time series."""
+    def test_add_time_series(self, sample_allocator, sample_timesteps):
+        """Test adding time series."""
         # Test scalar
-        ts1 = sample_collection.create_time_series(42, 'scalar_series')
+        ts1 = sample_allocator.add_time_series('scalar_series', 42)
         assert ts1.name == 'scalar_series'
-        assert np.all(ts1.active_data.values == 42)
+        assert np.all(ts1.selected_data.values == 42)

         # Test numpy array
         data = np.array([1, 2, 3, 4, 5])
-        ts2 = sample_collection.create_time_series(data, 'array_series')
-        assert np.array_equal(ts2.active_data.values, data)
+        ts2 = sample_allocator.add_time_series('array_series', data)
+        assert np.array_equal(ts2.selected_data.values, data)

-        # Test with TimeSeriesData
-        ts3 = sample_collection.create_time_series(TimeSeriesData(10, agg_weight=0.7), 'weighted_series')
-        assert ts3.aggregation_weight == 0.7
+        # Test with existing TimeSeries
+        existing_ts = TimeSeries.from_datasource(10, 'original_name', sample_timesteps, aggregation_weight=0.7)
+        ts3 = sample_allocator.add_time_series('weighted_series', existing_ts)
+        assert ts3.name == 'weighted_series'  # Name changed
+        assert ts3.aggregation_weight == 0.7  # Weight preserved

         # Test with extra timestep
-        ts4 = sample_collection.create_time_series(5, 'extra_series', needs_extra_timestep=True)
-        assert ts4.needs_extra_timestep
-        assert len(ts4.active_data) == len(sample_collection.timesteps_extra)
+        ts4 = sample_allocator.add_time_series('extra_series', 5, has_extra_timestep=True)
+        assert ts4.name == 'extra_series'
+        assert ts4.has_extra_timestep
+        assert len(ts4.selected_data) == len(sample_allocator.timesteps_extra)

         # Test duplicate name
-        with pytest.raises(ValueError, match='already exists'):
-            sample_collection.create_time_series(1, 'scalar_series')
+        with pytest.raises(KeyError, match='already exists'):
+            sample_allocator.add_time_series('scalar_series', 1)

-    def test_access_time_series(self, populated_collection):
+    def test_access_time_series(self, sample_allocator):
         """Test accessing time series."""
+        # Add a few time series
+        sample_allocator.add_time_series('series1', 42)
+        sample_allocator.add_time_series('series2', np.array([1, 2, 3, 4, 5]))
+
         # Test __getitem__
-        ts = populated_collection['varying_series']
-        assert ts.name == 'varying_series'
+        ts = sample_allocator['series1']
+        assert ts.name == 'series1'

         # Test __contains__ with string
-        assert 'constant_series' in populated_collection
-        assert 'nonexistent_series' not in populated_collection
+        assert 'series1' in sample_allocator
+        assert 'nonexistent_series' not in sample_allocator

         # Test __contains__ with TimeSeries object
-        assert populated_collection['varying_series'] in populated_collection
-
-        # Test __iter__
-        names = [ts.name for ts in populated_collection]
-        assert len(names) == 6
-        assert 'varying_series' in names
+        assert sample_allocator['series2'] in sample_allocator

         # Test access to non-existent series
-        with pytest.raises(KeyError):
-            populated_collection['nonexistent_series']
-
-    def test_constants_and_non_constants(self, populated_collection):
-        """Test constants and non_constants properties."""
-        # Test constants
-        constants = populated_collection.constants
-        assert len(constants) == 4  # constant_series, group1_series1, group1_series2, weighted_series
-        assert all(ts.all_equal for ts in constants)
-
-        # Test non_constants
-        non_constants = populated_collection.non_constants
-        assert len(non_constants) == 2  # varying_series, extra_timestep_series
-        assert all(not ts.all_equal for ts in non_constants)
-
-        # Test modifying a series changes the results
-        populated_collection['constant_series'].stored_data = np.array([1, 2, 3, 4, 5])
-        updated_constants = populated_collection.constants
-        assert len(updated_constants) == 3  # One less constant
-        assert 'constant_series' not in [ts.name for ts in updated_constants]
-
-    def test_timesteps_properties(self, populated_collection, sample_timesteps):
-        """Test timestep-related properties."""
-        # Test default (all) timesteps
-        assert populated_collection.timesteps.equals(sample_timesteps)
-        assert len(populated_collection.timesteps_extra) == len(sample_timesteps) + 1
-
-        # Test activating a subset
-        subset = sample_timesteps[1:3]
-        populated_collection.activate_timesteps(subset)
-
-        assert populated_collection.timesteps.equals(subset)
-        assert len(populated_collection.timesteps_extra) == len(subset) + 1
-
-        # Check that time series were updated
-        assert populated_collection['varying_series'].active_timesteps.equals(subset)
-        assert populated_collection['extra_timestep_series'].active_timesteps.equals(
-            populated_collection.timesteps_extra
-        )
-
-        # Test reset
-        populated_collection.reset()
-        assert populated_collection.timesteps.equals(sample_timesteps)
+        with pytest.raises(ValueError):
+            sample_allocator['nonexistent_series']

-    def test_to_dataframe_and_dataset(self, populated_collection):
-        """Test conversion to DataFrame and Dataset."""
-        # Test to_dataset
-        ds = populated_collection.to_dataset()
-        assert isinstance(ds, xr.Dataset)
-        assert len(ds.data_vars) == 6
+    def test_selection_propagation(self, sample_allocator, sample_timesteps):
+        """Test that selections propagate to TimeSeries."""
+        # Add a few time series
+        ts1 = sample_allocator.add_time_series('series1', 42)
+        ts2 = sample_allocator.add_time_series('series2', np.array([1, 2, 3, 4, 5]))
+        ts3 = sample_allocator.add_time_series('series3', 5, has_extra_timestep=True)

-        # Test to_dataframe with different filters
-        df_all = populated_collection.to_dataframe(filtered='all')
-        assert len(df_all.columns) == 6
+        # Initially no selections
+        assert ts1._selected_timesteps is None
+        assert ts2._selected_timesteps is None
+        assert ts3._selected_timesteps is None

-        df_constant = populated_collection.to_dataframe(filtered='constant')
-        assert len(df_constant.columns) == 4
+        # Apply selection
+        subset_timesteps = sample_timesteps[1:3]
+        sample_allocator.set_selection(timesteps=subset_timesteps)

-        df_non_constant = populated_collection.to_dataframe(filtered='non_constant')
-        assert len(df_non_constant.columns) == 2
+        # Check selection propagated to regular time series
+        assert ts1._selected_timesteps.equals(subset_timesteps)
+        assert ts2._selected_timesteps.equals(subset_timesteps)

-        # Test invalid filter
-        with pytest.raises(ValueError):
-            populated_collection.to_dataframe(filtered='invalid')
-
-    def test_calculate_aggregation_weights(self, populated_collection):
-        """Test aggregation weight calculation."""
-        weights = populated_collection.calculate_aggregation_weights()
-
-        # Group weights should be 0.5 each (1/2)
-        assert populated_collection.group_weights['group1'] == 0.5
-
-        # Series in group1 should have weight 0.5
-        assert weights['group1_series1'] == 0.5
-        assert weights['group1_series2'] == 0.5
-
-        # Series with explicit weight should have that weight
-        assert weights['weighted_series'] == 0.5
-
-        # Series without group or weight should have weight 1
-        assert weights['constant_series'] == 1
+        # Check selection with extra timestep
+        assert ts3._selected_timesteps is not None
+        assert len(ts3._selected_timesteps) == len(subset_timesteps) + 1

-    def test_insert_new_data(self, populated_collection, sample_timesteps):
-        """Test inserting new data."""
-        # Create new data
-        new_data = pd.DataFrame(
-            {
-                'constant_series': [100, 100, 100, 100, 100],
-                'varying_series': [5, 10, 15, 20, 25],
-                # extra_timestep_series is omitted to test partial updates
-            },
-            index=sample_timesteps,
-        )
+        # Clear selection
+        sample_allocator.clear_selection()

-        # Insert data
-        populated_collection.insert_new_data(new_data)
+        # Check selection cleared
+        assert ts1._selected_timesteps is None
+        assert ts2._selected_timesteps is None
+        assert ts3._selected_timesteps is None

-        # Verify updates
-        assert np.all(populated_collection['constant_series'].active_data.values == 100)
-        assert np.array_equal(populated_collection['varying_series'].active_data.values, np.array([5, 10, 15, 20, 25]))
-
-        # Series not in the DataFrame should be unchanged
-        assert np.array_equal(
-            populated_collection['extra_timestep_series'].active_data.values[:-1], np.array([1, 2, 3, 4, 5])
-        )
+    def test_update_time_series(self, sample_allocator):
+        """Test updating a time series."""
+        # Add a time series
+        ts = sample_allocator.add_time_series('series', 42)

-        # Test with mismatched index
-        bad_index = pd.date_range('2023-02-01', periods=5, freq='D', name='time')
-        bad_data = pd.DataFrame({'constant_series': [1, 1, 1, 1, 1]}, index=bad_index)
-
-        with pytest.raises(ValueError, match='must match collection timesteps'):
-            populated_collection.insert_new_data(bad_data)
+        # Update it
+        sample_allocator.update_time_series('series', np.array([1, 2, 3, 4, 5]))

-    def test_restore_data(self, populated_collection):
-        """Test restoring original data."""
-        # Capture original data
-        original_values = {name: ts.stored_data.copy() for name, ts in populated_collection.time_series_data.items()}
+        # Check update was applied
+        assert np.array_equal(ts.selected_data.values, np.array([1, 2, 3, 4, 5]))

-        # Modify data
-        new_data = pd.DataFrame(
-            {
-                name: np.ones(len(populated_collection.timesteps)) * 999
-                for name in populated_collection.time_series_data
-                if not populated_collection[name].needs_extra_timestep
-            },
-            index=populated_collection.timesteps,
-        )
+        # Test updating non-existent series
+        with pytest.raises(KeyError):
+            sample_allocator.update_time_series('nonexistent', 42)

-        populated_collection.insert_new_data(new_data)
+    def test_as_dataset(self, sample_allocator):
+        """Test as_dataset method."""
+        # Add some time series
+        sample_allocator.add_time_series('series1', 42)
+        sample_allocator.add_time_series('series2', np.array([1, 2, 3, 4, 5]))

-        # Verify data was changed
-        assert np.all(populated_collection['constant_series'].active_data.values == 999)
+        # Get dataset
+        ds = sample_allocator.as_dataset(with_extra_timestep=False)

-        # Restore data
-        populated_collection.restore_data()
+        # Check dataset contents
+        assert isinstance(ds, xr.Dataset)
+        assert 'series1' in ds
+        assert 'series2' in ds
+        assert np.all(ds['series1'].values == 42)
+        assert np.array_equal(ds['series2'].values, np.array([1, 2, 3, 4, 5]))
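# Illustrative sketch (editorial, hedged): the collection-level workflow from
# TestTimeSeriesAllocator. One set_selection() call fans out to every
# registered series, and as_dataset() bundles them into a single xr.Dataset.
# Import path assumed as above.
import numpy as np
import pandas as pd
from flixopt.core import TimeSeriesCollection  # assumed import path

t = pd.date_range('2023-01-01', periods=5, freq='D', name='time')
tsc = TimeSeriesCollection(t)
tsc.add_time_series('price', 42)                         # scalars are broadcast
tsc.add_time_series('demand', np.array([1, 2, 3, 4, 5]))

tsc.set_selection(timesteps=t[1:3])                      # propagates to both series
assert len(tsc['demand'].selected_data) == 2
tsc.clear_selection()

tsc.update_time_series('demand', np.array([5, 4, 3, 2, 1]))
ds = tsc.as_dataset(with_extra_timestep=False)           # xr.Dataset with both variables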
-        # Verify data was restored
-        for name, original in original_values.items():
-            restored = populated_collection[name].stored_data
-            assert np.array_equal(restored.values, original.values)
-
-    def test_class_method_with_uniform_timesteps(self):
-        """Test the with_uniform_timesteps class method."""
-        collection = TimeSeriesCollection.with_uniform_timesteps(
-            start_time=pd.Timestamp('2023-01-01'), periods=24, freq='H', hours_per_step=1
-        )
-
-        assert len(collection.timesteps) == 24
-        assert collection.hours_of_previous_timesteps == 1
-        assert (collection.timesteps[1] - collection.timesteps[0]) == pd.Timedelta(hours=1)
-
-    def test_hours_per_timestep(self, populated_collection):
-        """Test hours_per_timestep calculation."""
-        # Standard case - uniform timesteps
-        hours = populated_collection.hours_per_timestep.values
-        assert np.allclose(hours, 24)  # Default is daily timesteps
-
-        # Create non-uniform timesteps
-        non_uniform_times = pd.DatetimeIndex(
-            [
-                pd.Timestamp('2023-01-01'),
-                pd.Timestamp('2023-01-02'),
-                pd.Timestamp('2023-01-03 12:00:00'),  # 1.5 days from previous
-                pd.Timestamp('2023-01-04'),  # 0.5 days from previous
-                pd.Timestamp('2023-01-06'),  # 2 days from previous
-            ],
-            name='time',
-        )
-        collection = TimeSeriesCollection(non_uniform_times)
-        hours = collection.hours_per_timestep.values

+class TestTimeSeriesAllocatorWithScenarios:
+    """Test suite for TimeSeriesCollection with scenarios."""

-        # Expected hours between timestamps
-        expected = np.array([24, 36, 12, 48, 48])
-        assert np.allclose(hours, expected)

+    def test_initialization_with_scenarios(self, sample_timesteps, sample_scenario_index):
+        """Test initialization with scenarios."""
+        allocator = TimeSeriesCollection(sample_timesteps, scenarios=sample_scenario_index)

-    def test_validation_and_errors(self, sample_timesteps):
-        """Test validation and error handling."""
-        # Test non-DatetimeIndex
-        with pytest.raises(TypeError, match='must be a pandas DatetimeIndex'):
-            TimeSeriesCollection(pd.Index([1, 2, 3, 4, 5]))

+        assert allocator.timesteps.equals(sample_timesteps)
+        assert allocator.scenarios.equals(sample_scenario_index)
+        assert len(allocator._time_series) == 0

-        # Test too few timesteps
-        with pytest.raises(ValueError, match='must contain at least 2 timestamps'):
-            TimeSeriesCollection(pd.DatetimeIndex([pd.Timestamp('2023-01-01')], name='time'))

+    def test_add_time_series_with_scenarios(self, sample_scenario_allocator):
+        """Test creating time series with scenarios."""
+        # Test scalar (broadcasts to all scenarios)
+        ts1 = sample_scenario_allocator.add_time_series('scalar_series', 42)
+        assert ts1._has_scenarios
+        assert ts1.name == 'scalar_series'
+        assert ts1.selected_data.shape == (3, 5)  # 3 scenarios, 5 timesteps
+        assert np.all(ts1.selected_data.values == 42)

-        # Test invalid active_timesteps
-        collection = TimeSeriesCollection(sample_timesteps)
-        invalid_timesteps = pd.date_range('2024-01-01', periods=3, freq='D', name='time')

+        # Test 1D array (broadcasts to all scenarios)
+        data = np.array([1, 2, 3, 4, 5])
+        ts2 = sample_scenario_allocator.add_time_series('array_series', data)
+        assert ts2._has_scenarios
+        assert ts2.selected_data.shape == (3, 5)
+        # Each scenario should have the same values
+        for scenario in sample_scenario_allocator.scenarios:
+            assert np.array_equal(ts2.sel(scenario=scenario).values, data)
+
+        # Test 2D array (one row per scenario)
+        data_2d = np.array([
+            [10, 20, 30, 40, 50],
+            [15, 25, 35, 45, 55],
+            [5, 15, 25, 35, 45]
+        ])
+        ts3 = sample_scenario_allocator.add_time_series('scenario_specific_series', data_2d)
+        assert ts3._has_scenarios
+        assert ts3.selected_data.shape == (3, 5)
+        # Each scenario should have its own values
+        assert np.array_equal(ts3.sel(scenario='baseline').values, data_2d[0])
+        assert np.array_equal(ts3.sel(scenario='high_demand').values, data_2d[1])
+        assert np.array_equal(ts3.sel(scenario='low_price').values, data_2d[2])

+    def test_selection_propagation_with_scenarios(self, sample_scenario_allocator, sample_timesteps, sample_scenario_index):
+        """Test scenario selection propagation."""
+        # Add some time series
+        ts1 = sample_scenario_allocator.add_time_series('series1', 42)
+        ts2 = sample_scenario_allocator.add_time_series('series2', np.array([1, 2, 3, 4, 5]))
+
+        # Initial state - no selections
+        assert ts1._selected_scenarios is None
+        assert ts2._selected_scenarios is None
+
+        # Select scenarios
+        subset_scenarios = sample_scenario_index[:2]
+        sample_scenario_allocator.set_selection(scenarios=subset_scenarios)
+
+        # Check selections propagated
+        assert ts1._selected_scenarios.equals(subset_scenarios)
+        assert ts2._selected_scenarios.equals(subset_scenarios)
+
+        # Check data is filtered
+        assert ts1.selected_data.shape == (2, 5)  # 2 scenarios, 5 timesteps
+        assert ts2.selected_data.shape == (2, 5)
+
+        # Apply combined selection
+        subset_timesteps = sample_timesteps[1:3]
+        sample_scenario_allocator.set_selection(timesteps=subset_timesteps, scenarios=subset_scenarios)
+
+        # Check combined selection applied
+        assert ts1._selected_timesteps.equals(subset_timesteps)
+        assert ts1._selected_scenarios.equals(subset_scenarios)
+        assert ts1.selected_data.shape == (2, 2)  # 2 scenarios, 2 timesteps
+
+        # Clear selections
+        sample_scenario_allocator.clear_selection()
+        assert ts1._selected_timesteps is None
+        assert ts1._selected_scenarios is None
+        assert ts1.selected_data.shape == (3, 5)  # Back to full shape

+    def test_as_dataset_with_scenarios(self, sample_scenario_allocator):
+        """Test as_dataset method with scenarios."""
+        # Add some time series
+        sample_scenario_allocator.add_time_series('scalar_series', 42)
+        sample_scenario_allocator.add_time_series(
+            'varying_series',
+            np.array([
+                [10, 20, 30, 40, 50],
+                [15, 25, 35, 45, 55],
+                [5, 15, 25, 35, 45]
+            ])
+        )

-        with pytest.raises(ValueError, match='must be a subset'):
-            collection.activate_timesteps(invalid_timesteps)

+        # Get dataset
+        ds = sample_scenario_allocator.as_dataset(with_extra_timestep=False)
+
+        # Check dataset dimensions
+        assert 'scenario' in ds.dims
+        assert 'time' in ds.dims
+        assert ds.dims['scenario'] == 3
+        assert ds.dims['time'] == 5
+
+        # Check dataset variables
+        assert 'scalar_series' in ds
+        assert 'varying_series' in ds
+
+        # Check values
+        assert np.all(ds['scalar_series'].values == 42)
+        baseline_values = ds['varying_series'].sel(scenario='baseline').values
+        assert np.array_equal(baseline_values, np.array([10, 20, 30, 40, 50]))

+    def test_contains_and_iteration(self, sample_scenario_allocator):
+        """Test __contains__ and __iter__ methods."""
+        # Add some time series
+        ts1 = sample_scenario_allocator.add_time_series('series1', 42)
+        sample_scenario_allocator.add_time_series('series2', 10)
+
+        # Test __contains__
+        assert 'series1' in sample_scenario_allocator
+        assert ts1 in sample_scenario_allocator
+        assert 'nonexistent' not in sample_scenario_allocator
+
+        # Test behavior with invalid type
+        with pytest.raises(TypeError):
+            assert 42 in sample_scenario_allocator

+    def test_update_time_series_with_scenarios(self, sample_scenario_allocator, sample_scenario_index):
+        """Test updating a time series with scenarios."""
+        # Add a time series
+        ts = sample_scenario_allocator.add_time_series('series', 42)
+        assert ts._has_scenarios
+        assert np.all(ts.selected_data.values == 42)
+
+        # Update with scenario-specific data
+        new_data = np.array([
+            [1, 2, 3, 4, 5],
+            [6, 7, 8, 9, 10],
+            [11, 12, 13, 14, 15]
+        ])
+        sample_scenario_allocator.update_time_series('series', new_data)
+
+        # Check update was applied
+        assert np.array_equal(ts.selected_data.values, new_data)
+        assert ts._has_scenarios
+
+        # Check scenario-specific values
+        assert np.array_equal(ts.sel(scenario='baseline').values, new_data[0])
+        assert np.array_equal(ts.sel(scenario='high_demand').values, new_data[1])
+        assert np.array_equal(ts.sel(scenario='low_price').values, new_data[2])
+
+
+if __name__ == '__main__':
+    pytest.main()
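# Illustrative sketch (editorial, hedged): broadcast rules for scenario-aware
# collections, as exercised by TestTimeSeriesAllocatorWithScenarios. Scalars
# and 1D arrays are repeated for every scenario; 2D arrays are read as one row
# per scenario. Import path assumed as above.
import numpy as np
import pandas as pd
from flixopt.core import TimeSeriesCollection  # assumed import path

t = pd.date_range('2023-01-01', periods=5, freq='D', name='time')
sc = pd.Index(['baseline', 'high_demand', 'low_price'], name='scenario')
tsc = TimeSeriesCollection(t, scenarios=sc)

flat = tsc.add_time_series('flat', 42)                           # -> shape (3, 5)
rows = tsc.add_time_series('rows', np.arange(15).reshape(3, 5))  # one row per scenario

assert flat.selected_data.shape == (3, 5)
assert np.array_equal(rows.sel(scenario='baseline').values, np.arange(5))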