Source code for etna.analysis.forecast.utils

import warnings
from copy import deepcopy
from typing import TYPE_CHECKING
from typing import Dict
from typing import List
from typing import Optional
from typing import Set
from typing import Union

import pandas as pd

if TYPE_CHECKING:
    from etna.datasets import TSDataset


[docs]def get_residuals(forecast_df: pd.DataFrame, ts: "TSDataset") -> "TSDataset": """Get residuals for further analysis. Function keeps hierarchy, features in result dataset and removes target components. Parameters ---------- forecast_df: forecasted dataframe with timeseries data ts: dataset of timeseries that has answers to forecast Returns ------- new_ts: TSDataset with residuals in forecasts Raises ------ KeyError: if segments of ``forecast_df`` and ``ts`` aren't the same """ from etna.datasets import TSDataset # remove target components ts_copy = deepcopy(ts) ts_copy.drop_target_components() # find the residuals true_df = ts_copy[forecast_df.index, :, :] if set(ts_copy.segments) != set(forecast_df.columns.get_level_values("segment").unique()): raise KeyError("Segments of `ts` and `forecast_df` should be the same") true_df.loc[:, pd.IndexSlice[ts.segments, "target"]] -= forecast_df.loc[:, pd.IndexSlice[ts.segments, "target"]] # make TSDataset new_ts = TSDataset(df=true_df, freq=ts_copy.freq, hierarchical_structure=ts_copy.hierarchical_structure) new_ts.known_future = deepcopy(ts_copy.known_future) new_ts._regressors = deepcopy(ts_copy.regressors) if ts.df_exog is not None: new_ts.df_exog = ts.df_exog.copy(deep=True) return new_ts
[docs]def _get_existing_quantiles(ts: "TSDataset") -> Set[float]: """Get quantiles that are present inside the TSDataset.""" cols = [col for col in ts.columns.get_level_values("feature").unique().tolist() if col.startswith("target_0.")] existing_quantiles = {float(col[len("target_") :]) for col in cols} return existing_quantiles
[docs]def _select_quantiles(forecast_results: Dict[str, "TSDataset"], quantiles: Optional[List[float]]) -> List[float]: """Select quantiles from the forecast results. Selected quantiles exist in each forecast. """ intersection_quantiles_set = set.intersection( *[_get_existing_quantiles(forecast) for forecast in forecast_results.values()] ) intersection_quantiles = sorted(intersection_quantiles_set) if quantiles is None: selected_quantiles = intersection_quantiles else: selected_quantiles = sorted(set(quantiles) & intersection_quantiles_set) non_existent = set(quantiles) - intersection_quantiles_set if non_existent: warnings.warn(f"Quantiles {non_existent} do not exist in each forecast dataset. They will be dropped.") return selected_quantiles
[docs]def _prepare_forecast_results( forecast_ts: Union["TSDataset", List["TSDataset"], Dict[str, "TSDataset"]] ) -> Dict[str, "TSDataset"]: """Prepare dictionary with forecasts results.""" from etna.datasets import TSDataset if isinstance(forecast_ts, TSDataset): return {"1": forecast_ts} elif isinstance(forecast_ts, list) and len(forecast_ts) > 0: return {str(i + 1): forecast for i, forecast in enumerate(forecast_ts)} elif isinstance(forecast_ts, dict) and len(forecast_ts) > 0: return forecast_ts else: raise ValueError("Unknown type of `forecast_ts`")
[docs]def _validate_intersecting_segments(fold_numbers: pd.Series): """Validate if segments aren't intersecting.""" fold_info = [] for fold_number in fold_numbers.unique(): fold_start = fold_numbers[fold_numbers == fold_number].index.min() fold_end = fold_numbers[fold_numbers == fold_number].index.max() fold_info.append({"fold_start": fold_start, "fold_end": fold_end}) fold_info.sort(key=lambda x: x["fold_start"]) for fold_info_1, fold_info_2 in zip(fold_info[:-1], fold_info[1:]): if fold_info_2["fold_start"] <= fold_info_1["fold_end"]: raise ValueError("Folds are intersecting")