Source code for etna.reconciliation.top_down

from enum import Enum

import bottleneck as bn
import pandas as pd
from scipy.sparse import lil_matrix

from etna.datasets import TSDataset
from etna.reconciliation.base import BaseReconciliator


[docs]class ReconciliationProportionsMethod(str, Enum): """Enum for different reconciliation proportions methods.""" AHP = "AHP" PHA = "PHA" @classmethod def _missing_(cls, method): raise ValueError( f"Unable to recognize reconciliation method '{method}'! " f"Supported methods: {', '.join(sorted(m for m in cls))}." )
[docs]class TopDownReconciliator(BaseReconciliator): """Top-down reconciliation methods. Notes ----- Top-down reconciliation methods support only non-negative data. """ def __init__(self, target_level: str, source_level: str, period: int, method: str): """Create top-down reconciliator from ``source_level`` to ``target_level``. Parameters ---------- target_level: Level to be reconciled from the forecasts. source_level: Level to be forecasted. period: Period length for calculation reconciliation proportions. method: Proportions calculation method. Selects last ``period`` timestamps for estimation. Currently supported options: * AHP - Average historical proportions * PHA - Proportions of the historical averages """ super().__init__(target_level=target_level, source_level=source_level) if period < 1: raise ValueError("Period length must be positive!") self.period = period self.method = method proportions_method = ReconciliationProportionsMethod(method) if proportions_method == ReconciliationProportionsMethod.AHP: self._proportions_method_func = self._estimate_ahp_proportion elif proportions_method == ReconciliationProportionsMethod.PHA: self._proportions_method_func = self._estimate_pha_proportion else: raise ValueError(f"Failed to initialize proportions calculation method with name '{method}'!")
[docs] def fit(self, ts: TSDataset) -> "TopDownReconciliator": """Fit the reconciliator parameters. Parameters ---------- ts: TSDataset on the level which is lower or equal to ``target_level``, ``source_level``. Returns ------- : Fitted instance of reconciliator. """ if ts.hierarchical_structure is None: raise ValueError(f"The method can be applied only to instances with a hierarchy!") current_level_index = ts.hierarchical_structure.get_level_depth(ts.current_df_level) # type: ignore source_level_index = ts.hierarchical_structure.get_level_depth(self.source_level) target_level_index = ts.hierarchical_structure.get_level_depth(self.target_level) if target_level_index < source_level_index: raise ValueError("Target level should be lower or equal in the hierarchy than the source level!") if current_level_index < target_level_index: raise ValueError("Current TSDataset level should be lower or equal in the hierarchy than the target level!") if (ts[..., "target"] < 0).values.any(): raise ValueError("Provided dataset should not contain any negative numbers!") source_level_ts = ts.get_level_dataset(self.source_level) target_level_ts = ts.get_level_dataset(self.target_level) if source_level_index < target_level_index: summing_matrix = target_level_ts.hierarchical_structure.get_summing_matrix( # type: ignore target_level=self.source_level, source_level=self.target_level ) source_level_segments = source_level_ts.hierarchical_structure.get_level_segments(self.source_level) # type: ignore target_level_segments = target_level_ts.hierarchical_structure.get_level_segments(self.target_level) # type: ignore self.mapping_matrix = lil_matrix((len(target_level_segments), len(source_level_segments))) for source_index, target_index in zip(*summing_matrix.nonzero()): source_segment = source_level_segments[source_index] target_segment = target_level_segments[target_index] self.mapping_matrix[target_index, source_index] = self._proportions_method_func( # type: ignore target_series=target_level_ts[:, target_segment, "target"], source_series=source_level_ts[:, source_segment, "target"], ) self.mapping_matrix = self.mapping_matrix.tocsr() else: self.mapping_matrix = target_level_ts.hierarchical_structure.get_summing_matrix( # type: ignore target_level=self.target_level, source_level=self.source_level ) return self
def _estimate_ahp_proportion(self, target_series: pd.Series, source_series: pd.Series) -> float: """Calculate reconciliation proportion with Average historical proportions method.""" data = pd.concat((target_series, source_series), axis=1).values data = data[-self.period :] return bn.nanmean(data[..., 0] / data[..., 1]) def _estimate_pha_proportion(self, target_series: pd.Series, source_series: pd.Series) -> float: """Calculate reconciliation proportion with Proportions of the historical averages method.""" target_data = target_series.values source_data = source_series.values return bn.nanmean(target_data[-self.period :]) / bn.nanmean(source_data[-self.period :])