# Source code for etna.analysis.outliers.density_outliers

from collections import deque
from typing import TYPE_CHECKING
from typing import Callable
from typing import Dict
from typing import List

import numpy as np
import pandas as pd

if TYPE_CHECKING:
    from etna.datasets import TSDataset

def absolute_difference_distance(x: float, y: float) -> float:
    """Return the absolute difference between two values.

    This is the default distance used by :py:func:`get_anomalies_density`.

    Parameters
    ----------
    x:
        first value
    y:
        second value

    Returns
    -------
    result: float
        absolute value of ``x - y``
    """
    difference = x - y
    return abs(difference)
def get_segment_density_outliers_indices(
    series: np.ndarray,
    window_size: int = 7,
    distance_threshold: float = 10,
    n_neighbors: int = 3,
    distance_func: Callable[[float, float], float] = absolute_difference_distance,
) -> List[int]:
    """Get indices of outliers for one series.

    A point is kept (not an outlier) as soon as one window of ``window_size``
    consecutive elements that contains the point holds at least ``n_neighbors``
    other elements closer to it than ``distance_threshold``.

    Parameters
    ----------
    series:
        array to find outliers in
    window_size:
        size of window
    distance_threshold:
        if distance between two items in the window is less than threshold those items are supposed to be close
        to each other
    n_neighbors:
        min number of close items that item should have not to be outlier
    distance_func:
        distance function

    Returns
    -------
    :
        list of outliers' indices

    Notes
    -----
    If the series is shorter than ``window_size`` the whole series is used as the single window.
    The neighbour count subtracts one for the point itself, i.e. it assumes that
    ``distance_func(item, item) < distance_threshold``.
    """

    def is_close(item1: float, item2: float) -> int:
        """Return 1 if item1 is closer to item2 than distance_threshold according to distance_func, 0 otherwise."""
        return int(distance_func(item1, item2) < distance_threshold)

    series_len = len(series)
    outliers_indices = []
    for idx, item in enumerate(series):
        # Windows that contain ``idx`` start in [idx - window_size + 1, idx]; both bounds are
        # clipped so that every window lies inside the series.
        first_start = max(0, idx - window_size + 1)
        last_start = max(0, min(idx, series_len - window_size))

        closeness = None  # closeness flags of the elements of the current window, oldest first
        n_close = 0  # number of close elements in the current window, the point itself excluded
        is_outlier = True
        for start in range(first_start, last_start + 1):
            if closeness is None:
                # First window: compute all the flags once.
                window_stop = min(start + window_size, series_len)
                closeness = deque(is_close(item, series[j]) for j in range(start, window_stop))
                n_close = sum(closeness) - 1  # do not count the point as its own neighbour
            else:
                # Slide the window by one: drop the leftmost flag, add the flag of the entering element.
                n_close -= closeness.popleft()
                entering_is_close = is_close(item, series[start + window_size - 1])
                closeness.append(entering_is_close)
                n_close += entering_is_close
            if n_close >= n_neighbors:
                # One dense enough window is sufficient, the remaining windows need not be checked.
                is_outlier = False
                break
        if is_outlier:
            outliers_indices.append(idx)
    return outliers_indices
def get_anomalies_density(
    ts: "TSDataset",
    in_column: str = "target",
    window_size: int = 15,
    distance_coef: float = 3,
    n_neighbors: int = 3,
    distance_func: Callable[[float, float], float] = absolute_difference_distance,
) -> Dict[str, List[pd.Timestamp]]:
    """Find outliers in every segment using the density rule.

    For each point of a series all the windows of size ``window_size`` that contain the point are considered.
    The point is not an outlier if at least one of these windows holds ``n_neighbors`` or more elements
    that are closer to the point than ``distance_coef * std(series)`` according to ``distance_func``.

    Parameters
    ----------
    ts:
        TSDataset with timeseries data
    in_column:
        name of the column in which the anomalies are searched
    window_size:
        size of windows to build
    distance_coef:
        factor for standard deviation that forms distance threshold to determine points are close to each other
    n_neighbors:
        min number of close neighbors of point not to be outlier
    distance_func:
        distance function

    Returns
    -------
    :
        dict of outliers in format {segment: [outliers_timestamps]}

    Notes
    -----
    It is a variation of distance-based (index) outlier detection method adopted for timeseries.
    A segment whose standard deviation is zero gets an empty list of outliers.
    """
    outliers_per_segment: Dict[str, List[pd.Timestamp]] = {}
    for segment in ts.segments:
        # TODO: dropna() is currently responsible for removing NaNs both at the end of the sequence
        #  and in the middle of it; maybe an error or a warning should be raised in that case
        segment_df = ts[:, segment, :][segment].dropna().reset_index()
        values = segment_df[in_column].values
        timestamps = segment_df["timestamp"].values

        values_std = np.std(values)
        if not values_std:
            # Zero spread gives a zero distance threshold; report no outliers for this segment.
            outliers_per_segment[segment] = []
            continue

        outlier_positions = get_segment_density_outliers_indices(
            series=values,
            window_size=window_size,
            distance_threshold=distance_coef * values_std,
            n_neighbors=n_neighbors,
            distance_func=distance_func,
        )
        outliers_per_segment[segment] = [timestamps[position] for position in outlier_positions]
    return outliers_per_segment
# Names exported by ``from <module> import *``; the per-segment helper
# ``get_segment_density_outliers_indices`` is not part of this list.
__all__ = ["get_anomalies_density", "absolute_difference_distance"]