Source code for etna.transforms.missing_values.imputation

from enum import Enum
from typing import Dict
from typing import List
from typing import Optional
from typing import cast

import numpy as np
import pandas as pd

from etna.distributions import BaseDistribution
from etna.distributions import CategoricalDistribution
from etna.distributions import IntDistribution
from etna.transforms.base import ReversibleTransform
from etna.transforms.utils import check_new_segments


[docs]class ImputerMode(str, Enum): """Enum for different imputation strategy.""" mean = "mean" running_mean = "running_mean" forward_fill = "forward_fill" seasonal = "seasonal" constant = "constant" @classmethod def _missing_(cls, value): raise NotImplementedError( f"{value} is not a valid {cls.__name__}. Supported strategies: {', '.join([repr(m.value) for m in cls])}" )
[docs]class TimeSeriesImputerTransform(ReversibleTransform): """Transform to fill NaNs in series of a given dataframe. - It is assumed that given series begins with first non NaN value. - This transform can't fill NaNs in the future, only on train data. - This transform can't fill NaNs if all values are NaNs. In this case exception is raised. Warning ------- This transform can suffer from look-ahead bias in 'mean' mode. For transforming data at some timestamp it uses information from the whole train part. """ def __init__( self, in_column: str = "target", strategy: str = ImputerMode.constant, window: int = -1, seasonality: int = 1, default_value: Optional[float] = None, constant_value: float = 0, ): """ Create instance of TimeSeriesImputerTransform. Parameters ---------- in_column: name of processed column strategy: filling value in missing timestamps: - If "mean", then replace missing dates using the mean in fit stage. - If "running_mean" then replace missing dates using mean of subset of data - If "forward_fill" then replace missing dates using last existing value - If "seasonal" then replace missing dates using seasonal moving average - If "constant" then replace missing dates using constant value. window: In case of moving average and seasonality. * If ``window=-1`` all previous dates are taken in account * Otherwise only window previous dates seasonality: the length of the seasonality default_value: value which will be used to impute the NaNs left after applying the imputer with the chosen strategy constant_value: value to fill gaps in "constant" strategy Raises ------ ValueError: if incorrect strategy given """ super().__init__(required_features=[in_column]) self.in_column = in_column self.strategy = strategy self.window = window self.seasonality = seasonality self.default_value = default_value self.constant_value = constant_value self._strategy = ImputerMode(strategy) self._fill_value: Optional[Dict[str, float]] = None self._nan_timestamps: Optional[Dict[str, List[pd.Timestamp]]] = None
[docs] def get_regressors_info(self) -> List[str]: """Return the list with regressors created by the transform.""" return []
def _fit(self, df: pd.DataFrame): """Fit the transform. Parameters ---------- df: Dataframe in etna wide format. """ segments = sorted(set(df.columns.get_level_values("segment"))) features = df.loc[:, pd.IndexSlice[segments, self.in_column]] if features.isna().all().any(): raise ValueError("Series hasn't non NaN values which means it is empty and can't be filled.") nan_timestamps = {} for segment in segments: series = features.loc[:, pd.IndexSlice[segment, self.in_column]] series = series[series.first_valid_index() :] nan_timestamps[segment] = series[series.isna()].index fill_value = {} if self._strategy is ImputerMode.mean: mean_values = features.mean().to_dict() # take only segment from multiindex key mean_values = {key[0]: value for key, value in mean_values.items()} fill_value = mean_values self._nan_timestamps = nan_timestamps self._fill_value = fill_value def _transform(self, df: pd.DataFrame) -> pd.DataFrame: """Transform dataframe. Parameters ---------- df: Dataframe in etna wide format. Returns ------- : Transformed Dataframe in etna wide format. """ if self._fill_value is None or self._nan_timestamps is None: raise ValueError("Transform is not fitted!") segments = sorted(set(df.columns.get_level_values("segment"))) check_new_segments(transform_segments=segments, fit_segments=self._nan_timestamps.keys()) cur_nans = {} for segment in segments: series = df.loc[:, pd.IndexSlice[segment, self.in_column]] cur_nans[segment] = series[series.isna()].index result_df = self._fill(df) # restore nans not in self.nan_timestamps for segment in segments: restore_nans = cur_nans[segment].difference(self._nan_timestamps[segment]) result_df.loc[restore_nans, pd.IndexSlice[segment, self.in_column]] = np.nan return result_df def _fill(self, df: pd.DataFrame) -> pd.DataFrame: """Fill the NaNs in a given Dataframe. Fills missed values for new dates according to ``self.strategy`` Parameters ---------- df: dataframe to fill Returns ------- : Filled Dataframe. """ self._fill_value = cast(Dict[str, float], self._fill_value) self._nan_timestamps = cast(Dict[str, List[pd.Timestamp]], self._nan_timestamps) segments = sorted(set(df.columns.get_level_values("segment"))) if self._strategy is ImputerMode.constant: new_values = df.loc[:, pd.IndexSlice[:, self.in_column]].fillna(value=self.constant_value) df.loc[:, pd.IndexSlice[:, self.in_column]] = new_values elif self._strategy is ImputerMode.forward_fill: new_values = df.loc[:, pd.IndexSlice[:, self.in_column]].fillna(method="ffill") df.loc[:, pd.IndexSlice[:, self.in_column]] = new_values elif self._strategy is ImputerMode.mean: for segment in segments: df.loc[:, pd.IndexSlice[segment, self.in_column]].fillna(value=self._fill_value[segment], inplace=True) elif self._strategy is ImputerMode.running_mean or self._strategy is ImputerMode.seasonal: timestamp_to_index = {timestamp: i for i, timestamp in enumerate(df.index)} for segment in segments: history = self.seasonality * self.window if self.window != -1 else len(df) for timestamp in self._nan_timestamps[segment]: i = timestamp_to_index[timestamp] indexes = np.arange(i - self.seasonality, i - self.seasonality - history, -self.seasonality) indexes = indexes[indexes >= 0] values = df.loc[df.index[indexes], pd.IndexSlice[segment, self.in_column]] df.loc[timestamp, pd.IndexSlice[segment, self.in_column]] = np.nanmean(values) if self.default_value is not None: df.fillna(value=self.default_value, inplace=True) return df def _inverse_transform(self, df: pd.DataFrame) -> pd.DataFrame: """Inverse transform dataframe. Parameters ---------- df: Dataframe to be inverse transformed. Returns ------- : Dataframe after applying inverse transformation. """ if self._fill_value is None or self._nan_timestamps is None: raise ValueError("Transform is not fitted!") segments = sorted(set(df.columns.get_level_values("segment"))) check_new_segments(transform_segments=segments, fit_segments=self._nan_timestamps.keys()) for segment in segments: index = df.index.intersection(self._nan_timestamps[segment]) df.loc[index, pd.IndexSlice[segment, self.in_column]] = np.NaN return df
[docs] def params_to_tune(self) -> Dict[str, BaseDistribution]: """Get default grid for tuning hyperparameters. This grid tunes parameters: ``strategy``, ``window``. Other parameters are expected to be set by the user. Strategy "seasonal" is suggested only if ``self.seasonality`` is set higher than 1. Returns ------- : Grid to tune. """ if self.seasonality > 1: return { "strategy": CategoricalDistribution(["constant", "mean", "running_mean", "forward_fill", "seasonal"]), "window": IntDistribution(low=1, high=20), } else: return { "strategy": CategoricalDistribution(["constant", "mean", "running_mean", "forward_fill"]), "window": IntDistribution(low=1, high=20), }
__all__ = ["TimeSeriesImputerTransform"]