Source code for etna.pipeline.hierarchical_pipeline

import pathlib
from copy import deepcopy
from typing import Dict
from typing import List
from typing import Optional
from typing import Sequence

import pandas as pd

from etna.datasets.tsdataset import TSDataset
from etna.datasets.utils import get_target_with_quantiles
from etna.loggers import tslogger
from etna.metrics import MAE
from etna.metrics import Metric
from etna.models.base import ModelType
from etna.pipeline.pipeline import Pipeline
from etna.reconciliation.base import BaseReconciliator
from etna.transforms.base import Transform


[docs]class HierarchicalPipeline(Pipeline): """Pipeline of transforms with a final estimator for hierarchical time series data. Notes ----- Aggregation of target quantiles and components is performed along with the target itself. It uses a provided hierarchical structure and a reconciliation method. """ def __init__( self, reconciliator: BaseReconciliator, model: ModelType, transforms: Sequence[Transform] = (), horizon: int = 1 ): """Create instance of HierarchicalPipeline with given parameters. Parameters ---------- reconciliator: Instance of reconciliation method model: Instance of the etna Model transforms: Sequence of the transforms horizon: Number of timestamps in the future for forecasting Warnings -------- Estimation of forecast intervals with `forecast(prediction_interval=True)` method and `BottomUpReconciliator` may be not reliable. """ super().__init__(model=model, transforms=transforms, horizon=horizon) self.reconciliator = reconciliator self._fit_ts: Optional[TSDataset] = None
[docs] def fit(self, ts: TSDataset) -> "HierarchicalPipeline": """Fit the HierarchicalPipeline. Fit and apply given transforms to the data, then fit the model on the transformed data. Provided hierarchical dataset will be aggregated to the source level before fitting pipeline. Parameters ---------- ts: Dataset with hierarchical timeseries data Returns ------- : Fitted HierarchicalPipeline instance """ self._fit_ts = deepcopy(ts) self.reconciliator.fit(ts=ts) ts = self.reconciliator.aggregate(ts=ts) super().fit(ts=ts) return self
[docs] def raw_forecast( self, ts: TSDataset, prediction_interval: bool = False, quantiles: Sequence[float] = (0.25, 0.75), n_folds: int = 3, return_components: bool = False, ) -> TSDataset: """Make a forecast of the next points of a dataset at the source level. The result of forecasting starts from the last point of ``ts``, not including it. Parameters ---------- ts: Dataset to forecast prediction_interval: If True returns prediction interval for forecast quantiles: Levels of prediction distribution. By default 2.5% and 97.5% taken to form a 95% prediction interval n_folds: Number of folds to use in the backtest for prediction interval estimation return_components: If True additionally returns forecast components Returns ------- : Dataset with predictions at the source level """ # handle `prediction_interval=True` separately source_ts = self.reconciliator.aggregate(ts=ts) forecast = super().forecast( ts=source_ts, prediction_interval=False, n_folds=n_folds, return_components=return_components ) if prediction_interval: forecast = self._forecast_prediction_interval( ts=ts, predictions=forecast, quantiles=quantiles, n_folds=n_folds ) target_columns = tuple(get_target_with_quantiles(columns=forecast.columns)) hierarchical_forecast = TSDataset( df=forecast[..., target_columns], freq=forecast.freq, df_exog=forecast.df_exog, known_future=forecast.known_future, hierarchical_structure=ts.hierarchical_structure, # type: ignore ) if return_components: hierarchical_forecast.add_target_components(target_components_df=forecast.get_target_components()) return hierarchical_forecast
[docs] def raw_predict( self, ts: TSDataset, start_timestamp: Optional[pd.Timestamp] = None, end_timestamp: Optional[pd.Timestamp] = None, prediction_interval: bool = False, quantiles: Sequence[float] = (0.025, 0.975), return_components: bool = False, ) -> TSDataset: """Make in-sample predictions on dataset at the source level in a given range. Parameters ---------- ts: Dataset to make predictions on. If not given, dataset given during :py:meth:``fit`` is used. start_timestamp: First timestamp of prediction range to return, should be >= than first timestamp in ``ts``; expected that beginning of each segment <= ``start_timestamp``; if isn't set the first timestamp where each segment began is taken. end_timestamp: Last timestamp of prediction range to return; if isn't set the last timestamp of ``ts`` is taken. Expected that value is less or equal to the last timestamp in ``ts``. prediction_interval: If True returns prediction interval for forecast. quantiles: Levels of prediction distribution. By default 2.5% and 97.5% taken to form a 95% prediction interval. return_components: If True additionally returns forecast components. Returns ------- : Dataset with predictions at the source level in ``[start_timestamp, end_timestamp]`` range. """ source_ts = self.reconciliator.aggregate(ts=ts) forecast = super().predict( ts=source_ts, start_timestamp=start_timestamp, end_timestamp=end_timestamp, prediction_interval=prediction_interval, quantiles=quantiles, return_components=return_components, ) target_columns = tuple(get_target_with_quantiles(columns=forecast.columns)) hierarchical_forecast = TSDataset( df=forecast[..., target_columns], freq=forecast.freq, df_exog=forecast.df_exog, known_future=forecast.known_future, hierarchical_structure=ts.hierarchical_structure, # type: ignore ) if return_components: hierarchical_forecast.add_target_components(target_components_df=forecast.get_target_components()) return hierarchical_forecast
[docs] def forecast( self, ts: Optional[TSDataset] = None, prediction_interval: bool = False, quantiles: Sequence[float] = (0.025, 0.975), n_folds: int = 3, return_components: bool = False, ) -> TSDataset: """Make a forecast of the next points of a dataset at a target level. The result of forecasting starts from the last point of ``ts``, not including it. Method makes a prediction for target at the source level of hierarchy and then makes reconciliation to target level. Parameters ---------- ts: Dataset to forecast. If not given, dataset given during :py:meth:``fit`` is used. prediction_interval: If True returns prediction interval for forecast quantiles: Levels of prediction distribution. By default 2.5% and 97.5% taken to form a 95% prediction interval n_folds: Number of folds to use in the backtest for prediction interval estimation return_components: If True additionally returns forecast components Returns ------- : Dataset with predictions at the target level of hierarchy. """ if ts is None: if self._fit_ts is None: raise ValueError( "There is no ts to forecast! Pass ts into forecast method or make sure that pipeline is loaded with ts." ) ts = self._fit_ts forecast = self.raw_forecast( ts=ts, prediction_interval=prediction_interval, quantiles=quantiles, n_folds=n_folds, return_components=return_components, ) forecast_reconciled = self.reconciliator.reconcile(forecast) return forecast_reconciled
[docs] def predict( self, ts: Optional[TSDataset] = None, start_timestamp: Optional[pd.Timestamp] = None, end_timestamp: Optional[pd.Timestamp] = None, prediction_interval: bool = False, quantiles: Sequence[float] = (0.025, 0.975), return_components: bool = False, ) -> TSDataset: """Make in-sample predictions on dataset at the target level in a given range. Method makes a prediction for target at the source level of hierarchy and then makes reconciliation to the target level. Currently, in situation when segments start with different timestamps we only guarantee to work with ``start_timestamp`` >= beginning of all segments. Parameters ---------- ts: Dataset to make predictions on. If not given, dataset given during :py:meth:``fit`` is used. start_timestamp: First timestamp of prediction range to return, should be >= than first timestamp in ``ts``; expected that beginning of each segment <= ``start_timestamp``; if isn't set the first timestamp where each segment began is taken. end_timestamp: Last timestamp of prediction range to return; if isn't set the last timestamp of ``ts`` is taken. Expected that value is less or equal to the last timestamp in ``ts``. prediction_interval: If True returns prediction interval for forecast. quantiles: Levels of prediction distribution. By default 2.5% and 97.5% taken to form a 95% prediction interval. return_components: If True additionally returns forecast components. Returns ------- : Dataset with predictions at the target level in ``[start_timestamp, end_timestamp]`` range. """ if ts is None: if self._fit_ts is None: raise ValueError( "There is no ts to predict! Pass ts into predict method or make sure that pipeline is loaded with ts." ) ts = self._fit_ts forecast = self.raw_predict( ts=ts, start_timestamp=start_timestamp, end_timestamp=end_timestamp, prediction_interval=prediction_interval, quantiles=quantiles, return_components=return_components, ) forecast_reconciled = self.reconciliator.reconcile(forecast) return forecast_reconciled
def _compute_metrics( self, metrics: List[Metric], y_true: TSDataset, y_pred: TSDataset ) -> Dict[str, Dict[str, float]]: """Compute metrics for given y_true, y_pred.""" if y_true.current_df_level != self.reconciliator.target_level: y_true = y_true.get_level_dataset(self.reconciliator.target_level) if y_pred.current_df_level == self.reconciliator.source_level: y_pred = self.reconciliator.reconcile(y_pred) metrics_values: Dict[str, Dict[str, float]] = {} for metric in metrics: metrics_values[metric.name] = metric(y_true=y_true, y_pred=y_pred) # type: ignore return metrics_values def _forecast_prediction_interval( self, ts: TSDataset, predictions: TSDataset, quantiles: Sequence[float], n_folds: int ) -> TSDataset: """Add prediction intervals to the forecasts.""" if self.ts is None: raise ValueError("Pipeline is not fitted! Fit the Pipeline before calling forecast method.") self.forecast, self.raw_forecast = self.raw_forecast, self.forecast # type: ignore try: # TODO: rework intervals estimation for `BottomUpReconciliator` with tslogger.disable(): _, forecasts, _ = self.backtest(ts=ts, metrics=[MAE()], n_folds=n_folds) source_ts = self.reconciliator.aggregate(ts=ts) self._add_forecast_borders( ts=source_ts, backtest_forecasts=forecasts, quantiles=quantiles, predictions=predictions ) return predictions finally: self.forecast, self.raw_forecast = self.raw_forecast, self.forecast # type: ignore
[docs] def save(self, path: pathlib.Path): """Save the object. Parameters ---------- path: Path to save object to. """ fit_ts = self._fit_ts try: # extract attributes we can't easily save delattr(self, "_fit_ts") # save the remaining part super().save(path=path) finally: self._fit_ts = fit_ts
[docs] @classmethod def load(cls, path: pathlib.Path, ts: Optional[TSDataset] = None) -> "HierarchicalPipeline": """Load an object. Parameters ---------- path: Path to load object from. ts: TSDataset to set into loaded pipeline. Returns ------- : Loaded object. """ obj = super().load(path=path) obj._fit_ts = deepcopy(ts) if ts is not None: obj.ts = obj.reconciliator.aggregate(ts=ts) else: obj.ts = None return obj