Source code for etna.transforms.math.sklearn

import warnings
from copy import deepcopy
from typing import Dict
from typing import List
from typing import Optional
from typing import Union
from typing import cast

import numpy as np
import pandas as pd
from sklearn.base import TransformerMixin

from etna.core import StringEnumWithRepr
from etna.datasets import TSDataset
from etna.datasets import set_columns_wide
from etna.distributions import BaseDistribution
from etna.distributions import CategoricalDistribution
from etna.transforms.base import ReversibleTransform
from etna.transforms.utils import check_new_segments
from etna.transforms.utils import match_target_quantiles


class TransformMode(StringEnumWithRepr):
    """Enum for different transformation modes over segments."""

    macro = "macro"
    per_segment = "per-segment"
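``TransformMode`` is a plain string enum, so the ``mode`` argument below may be passed either as an enum member or as its string value. A minimal sketch of the normalization:

# Minimal sketch: string construction resolves to the matching enum member;
# any other value raises ValueError.
from etna.transforms.math.sklearn import TransformMode

assert TransformMode("per-segment") is TransformMode.per_segment
assert TransformMode("macro") is TransformMode.macro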
class SklearnTransform(ReversibleTransform):
    """Base class for different sklearn transforms."""

    def __init__(
        self,
        in_column: Optional[Union[str, List[str]]],
        out_column: Optional[str],
        transformer: TransformerMixin,
        inplace: bool = True,
        mode: Union[TransformMode, str] = "per-segment",
    ):
        """
        Init SklearnTransform.

        Parameters
        ----------
        in_column:
            columns to be transformed, if None - all columns will be transformed.
        out_column:
            base for the names of generated columns, uses ``self.__repr__()`` if not given.
        transformer:
            :py:class:`sklearn.base.TransformerMixin` instance.
        inplace:
            if True, transformed features replace the original ones; otherwise, transformed
            features are added as new columns.
        mode:
            "macro" or "per-segment", way to transform features over segments.

            * If "macro", transforms features globally, gluing the corresponding ones for all segments.

            * If "per-segment", transforms features for each segment separately.

        Raises
        ------
        ValueError:
            if an incorrect mode is given
        """
        if isinstance(in_column, str):
            in_column = [in_column]

        required_features = sorted(in_column) if in_column is not None else "all"
        super().__init__(required_features=required_features)  # type: ignore

        if inplace and (out_column is not None):
            warnings.warn("Transformation will be applied inplace, out_column param will be ignored")

        self.in_column = in_column if in_column is None else sorted(in_column)
        self.transformer = transformer
        self.inplace = inplace
        self.mode = TransformMode(mode)
        self.out_column = out_column

        self.out_columns: Optional[List[str]] = None
        self.out_column_regressors: Optional[List[str]] = None
        self._fit_segments: Optional[List[str]] = None

    def _get_column_name(self, in_column: str) -> str:
        if self.out_column is None:
            new_transform = deepcopy(self)
            new_transform.in_column = [in_column]
            return repr(new_transform)
        else:
            return f"{self.out_column}_{in_column}"

    def _fit(self, df: pd.DataFrame) -> "SklearnTransform":
        """
        Fit transformer with data from df.

        Parameters
        ----------
        df:
            DataFrame to fit transformer.

        Returns
        -------
        :
        """
        df = df.sort_index(axis=1)

        if self.in_column is None:
            self.in_column = sorted(set(df.columns.get_level_values("feature")))

        if self.inplace:
            self.out_columns = self.in_column
        else:
            self.out_columns = [self._get_column_name(column) for column in self.in_column]

        self._fit_segments = df.columns.get_level_values("segment").unique().tolist()
        if self.mode == TransformMode.per_segment:
            x = df.loc[:, pd.IndexSlice[:, self.in_column]].values
        elif self.mode == TransformMode.macro:
            x = self._preprocess_macro(df)
        else:
            raise ValueError(f"'{self.mode}' is not a valid TransformMode.")

        self.transformer.fit(X=x)
        return self
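To make the column-naming behaviour concrete, here is a hedged construction sketch; ``StandardScaler`` stands in for any ``TransformerMixin``, and in practice etna's concrete subclasses (e.g. ``StandardScalerTransform``) are used rather than the base class itself:

# Sketch: wiring a plain sklearn scaler into the base transform.
from sklearn.preprocessing import StandardScaler

from etna.transforms.math.sklearn import SklearnTransform

transform = SklearnTransform(
    in_column="target",
    out_column="scaled",          # ignored (with a warning) when inplace=True
    transformer=StandardScaler(),
    inplace=False,
    mode="per-segment",
)
# With inplace=False, each generated column is named f"{out_column}_{in_column}",
# here: "scaled_target". With out_column=None, repr() of the transform is used.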
    def fit(self, ts: TSDataset) -> "SklearnTransform":
        """Fit the transform."""
        super().fit(ts)
        if self.in_column is None:
            raise ValueError("Something went wrong during the fit, can not recognize in_column!")
        self.out_column_regressors = [
            self._get_column_name(in_column) for in_column in self.in_column if in_column in ts.regressors
        ]
        return self
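A fitting sketch on a toy dataset, assuming the ``generate_ar_df`` and ``TSDataset.to_dataset`` helpers from ``etna.datasets``:

# Sketch: fit on a toy two-segment dataset.
from sklearn.preprocessing import StandardScaler

from etna.datasets import TSDataset, generate_ar_df
from etna.transforms.math.sklearn import SklearnTransform

df = generate_ar_df(periods=30, start_time="2021-01-01", n_segments=2)
ts = TSDataset(df=TSDataset.to_dataset(df), freq="D")

transform = SklearnTransform(
    in_column="target", out_column="scaled", transformer=StandardScaler(), inplace=False, mode="per-segment"
)
transform.fit(ts)
# "target" is not a regressor, so none of the generated columns are regressors:
print(transform.get_regressors_info())  # []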
    def _transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Transform given data with fitted transformer.

        Parameters
        ----------
        df:
            DataFrame to transform with transformer.

        Returns
        -------
        :
            transformed DataFrame.

        Raises
        ------
        ValueError:
            If transform isn't fitted.
        NotImplementedError:
            If there are segments that weren't present during training.
        """
        if self._fit_segments is None:
            raise ValueError("The transform isn't fitted!")
        else:
            self.in_column = cast(List[str], self.in_column)

        df = df.sort_index(axis=1)
        transformed = self._make_transform(df)

        if self.inplace:
            df.loc[:, pd.IndexSlice[:, self.in_column]] = transformed
        else:
            segments = sorted(set(df.columns.get_level_values("segment")))
            transformed_features = pd.DataFrame(
                transformed, columns=df.loc[:, pd.IndexSlice[:, self.in_column]].columns, index=df.index
            ).sort_index(axis=1)
            transformed_features.columns = pd.MultiIndex.from_product([segments, self.out_columns])
            df = pd.concat((df, transformed_features), axis=1)
            df = df.sort_index(axis=1)

        return df

    def _inverse_transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Apply inverse transformation to DataFrame.

        Parameters
        ----------
        df:
            DataFrame to apply inverse transform.

        Returns
        -------
        :
            transformed DataFrame.

        Raises
        ------
        ValueError:
            If transform isn't fitted.
        NotImplementedError:
            If there are segments that weren't present during training.
        """
        if self._fit_segments is None:
            raise ValueError("The transform isn't fitted!")
        else:
            self.in_column = cast(List[str], self.in_column)

        df = df.sort_index(axis=1)
        if "target" in self.in_column:
            quantiles = match_target_quantiles(set(df.columns.get_level_values("feature")))
        else:
            quantiles = set()

        if self.inplace:
            quantiles_arrays: Dict[str, pd.DataFrame] = dict()
            transformed = self._make_inverse_transform(df)

            # quantiles inverse transformation
            for quantile_column_nm in quantiles:
                df_slice_copy = df.loc[:, pd.IndexSlice[:, self.in_column]].copy()
                df_slice_copy = set_columns_wide(
                    df_slice_copy, df, features_left=["target"], features_right=[quantile_column_nm]
                )
                transformed_quantile = self._make_inverse_transform(df_slice_copy)
                df_slice_copy.loc[:, pd.IndexSlice[:, self.in_column]] = transformed_quantile
                quantiles_arrays[quantile_column_nm] = df_slice_copy.loc[:, pd.IndexSlice[:, "target"]].rename(
                    columns={"target": quantile_column_nm}
                )

            df.loc[:, pd.IndexSlice[:, self.in_column]] = transformed
            for quantile_column_nm in quantiles:
                df.loc[:, pd.IndexSlice[:, quantile_column_nm]] = quantiles_arrays[quantile_column_nm].values

        return df

    def _preprocess_macro(self, df: pd.DataFrame) -> np.ndarray:
        segments = sorted(set(df.columns.get_level_values("segment")))
        x = df.loc[:, pd.IndexSlice[:, self.in_column]]
        x = pd.concat([x[segment] for segment in segments]).values
        return x

    def _postprocess_macro(self, df: pd.DataFrame, transformed: np.ndarray) -> np.ndarray:
        time_period_len = len(df)
        n_segments = len(set(df.columns.get_level_values("segment")))
        transformed = np.concatenate(
            [transformed[i * time_period_len : (i + 1) * time_period_len, :] for i in range(n_segments)], axis=1
        )
        return transformed

    def _preprocess_per_segment(self, df: pd.DataFrame) -> np.ndarray:
        self._fit_segments = cast(List[str], self._fit_segments)
        transform_segments = df.columns.get_level_values("segment").unique().tolist()
        check_new_segments(transform_segments=transform_segments, fit_segments=self._fit_segments)

        df = df.loc[:, pd.IndexSlice[:, self.in_column]]
        to_add_segments = set(self._fit_segments) - set(transform_segments)
        df_to_add = pd.DataFrame(index=df.index, columns=pd.MultiIndex.from_product([to_add_segments, self.in_column]))
        df = pd.concat([df, df_to_add], axis=1)
        df = df.sort_index(axis=1)
        return df.values

    def _postprocess_per_segment(self, df: pd.DataFrame, transformed: np.ndarray) -> np.ndarray:
        self._fit_segments = cast(List[str], self._fit_segments)
        self.in_column = cast(List[str], self.in_column)
        num_features = len(self.in_column)
        transform_segments = set(df.columns.get_level_values("segment"))
        select_segments = [segment in transform_segments for segment in self._fit_segments]
        # make a mask for columns to select
        select_columns = np.repeat(select_segments, num_features)
        result = transformed[:, select_columns]
        return result

    def _make_transform(self, df: pd.DataFrame) -> np.ndarray:
        if self.mode == TransformMode.per_segment:
            x = self._preprocess_per_segment(df)
            transformed = self.transformer.transform(X=x)
            transformed = self._postprocess_per_segment(df, transformed)
        elif self.mode == TransformMode.macro:
            x = self._preprocess_macro(df)
            transformed = self.transformer.transform(X=x)
            transformed = self._postprocess_macro(df, transformed)
        else:
            raise ValueError(f"'{self.mode}' is not a valid TransformMode.")
        return transformed

    def _make_inverse_transform(self, df: pd.DataFrame) -> np.ndarray:
        if self.mode == TransformMode.per_segment:
            x = self._preprocess_per_segment(df)
            transformed = self.transformer.inverse_transform(X=x)
            transformed = self._postprocess_per_segment(df, transformed)
        elif self.mode == TransformMode.macro:
            x = self._preprocess_macro(df)
            transformed = self.transformer.inverse_transform(X=x)
            transformed = self._postprocess_macro(df, transformed)
        else:
            raise ValueError(f"'{self.mode}' is not a valid TransformMode.")
        return transformed
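The two preprocessing paths above reshape the wide DataFrame differently before it reaches the sklearn transformer. A small layout sketch (two segments, one feature, three timestamps), mirroring what ``_preprocess_per_segment`` and ``_preprocess_macro`` produce:

# per-segment: X has shape (3, 2) - one column per (segment, feature) pair,
#              so the transformer fits separate statistics per segment.
# macro:       X has shape (6, 1) - segments are stacked along the time axis,
#              so the transformer fits one global set of statistics.
import numpy as np
import pandas as pd

wide = pd.DataFrame(
    np.arange(6, dtype=float).reshape(3, 2),
    columns=pd.MultiIndex.from_product([["A", "B"], ["target"]], names=["segment", "feature"]),
)
per_segment_x = wide.values                         # shape (3, 2)
macro_x = pd.concat([wide["A"], wide["B"]]).values  # shape (6, 1)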
    def get_regressors_info(self) -> List[str]:
        """Return the list with regressors created by the transform."""
        if self.out_column_regressors is None:
            raise ValueError("Fit the transform to get the correct regressors info!")
        if self.inplace:
            return []
        return self.out_column_regressors
    def params_to_tune(self) -> Dict[str, BaseDistribution]:
        """Get default grid for tuning hyperparameters.

        This grid tunes ``mode`` parameter. Other parameters are expected to be set by the user.

        Returns
        -------
        :
            Grid to tune.
        """
        return {
            "mode": CategoricalDistribution(["per-segment", "macro"]),
        }
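Finally, a round-trip sketch through the reversible interface; ``fit_transform`` and ``inverse_transform`` are assumed here to be the public entry points inherited from ``ReversibleTransform``:

# Sketch: inplace scaling and its inverse. With mode="macro" a single set of
# statistics is fitted over all segments; inverse_transform restores the
# original "target" values (including quantile columns, per the code above).
from sklearn.preprocessing import StandardScaler

from etna.datasets import TSDataset, generate_ar_df
from etna.transforms.math.sklearn import SklearnTransform

df = generate_ar_df(periods=30, start_time="2021-01-01", n_segments=2)
ts = TSDataset(df=TSDataset.to_dataset(df), freq="D")

transform = SklearnTransform(
    in_column="target", out_column=None, transformer=StandardScaler(), inplace=True, mode="macro"
)
transform.fit_transform(ts)      # "target" now holds globally scaled values
transform.inverse_transform(ts)  # original values are restored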