Source code for etna.clustering.distances.dtw_distance

from typing import TYPE_CHECKING
from typing import Callable
from typing import List
from typing import Tuple

import numba
import numpy as np
import pandas as pd

from etna.clustering.distances.base import Distance

    from etna.datasets import TSDataset

[docs]@numba.njit def simple_dist(x1: float, x2: float) -> float: """Get distance between two samples for dtw distance. Parameters ---------- x1: first value x2: second value Returns ------- float: distance between x1 and x2 """ return abs(x1 - x2)
[docs]class DTWDistance(Distance): """DTW distance handler.""" def __init__(self, points_distance: Callable[[float, float], float] = simple_dist, trim_series: bool = False): """Init DTWDistance. Parameters ---------- points_distance: function to be used for computation of distance between two series' points trim_series: True if it is necessary to trim series, default False. Notes ----- Specifying manual ``points_distance`` might slow down the clustering algorithm. """ super().__init__(trim_series=trim_series) self.points_distance = points_distance @staticmethod @numba.njit def _build_matrix(x1: np.ndarray, x2: np.ndarray, points_distance: Callable[[float, float], float]) -> np.ndarray: """Build dtw-distance matrix for series x1 and x2.""" x1_size, x2_size = len(x1), len(x2) matrix = np.empty(shape=(x1_size, x2_size)) matrix[0][0] = points_distance(x1[0], x2[0]) for i in range(1, x1_size): matrix[i][0] = points_distance(x1[i], x2[0]) + matrix[i - 1][0] for j in range(1, x2_size): matrix[0][j] = points_distance(x1[0], x2[j]) + matrix[0][j - 1] for i in range(1, x1_size): for j in range(1, x2_size): matrix[i][j] = points_distance(x1[i], x2[j]) + min( matrix[i - 1][j], matrix[i][j - 1], matrix[i - 1][j - 1] ) return matrix @staticmethod @numba.njit def _get_path(matrix: np.ndarray) -> List[Tuple[int, int]]: """Build a warping path with given matrix of dtw-distance.""" i, j = matrix.shape[0] - 1, matrix.shape[1] - 1 path = [(i, j)] while i and j: candidates = [(i - 1, j), (i, j - 1), (i - 1, j - 1)] costs = np.array([matrix[c] for c in candidates]) k = np.argmin(costs) i, j = candidates[k] path.append((i, j)) while i: i = i - 1 path.append((i, j)) while j: j = j - 1 path.append((i, j)) return path def _compute_distance(self, x1: np.ndarray, x2: np.ndarray) -> float: """Compute distance between x1 and x2.""" matrix = self._build_matrix(x1=x1, x2=x2, points_distance=self.points_distance) return matrix[-1][-1] def _dba_iteration(self, initial_centroid: np.ndarray, series_list: List[np.ndarray]) -> np.ndarray: """Run DBA iteration. * for each series from series list build a dtw matrix and warping path * update values of centroid with values from series according to path """ assoc_table = initial_centroid.copy() n_samples = np.ones(shape=(len(initial_centroid))) for series in series_list: mat = self._build_matrix(x1=initial_centroid, x2=series, points_distance=self.points_distance) path = self._get_path(matrix=mat) i, j = len(initial_centroid) - 1, len(series) - 1 while i and j: assoc_table[i] += series[j] n_samples[i] += 1 path.pop(0) i, j = path[0] centroid = assoc_table / n_samples return centroid @staticmethod def _get_longest_series(ts: "TSDataset") -> pd.Series: """Get the longest series from the list.""" series_list: List[pd.Series] = [] for segment in ts.segments: series = ts[:, segment, "target"].dropna() series_list.append(series) longest_series = max(series_list, key=len) return longest_series @staticmethod def _get_all_series(ts: "TSDataset") -> List[np.ndarray]: """Get series from the TSDataset.""" series_list = [] for segment in ts.segments: series = ts[:, segment, "target"].dropna().values series_list.append(series) return series_list def _get_average(self, ts: "TSDataset", n_iters: int = 10) -> pd.DataFrame: """Get series that minimizes squared distance to given ones according to the dtw distance. Parameters ---------- ts: TSDataset with series to be averaged n_iters: number of DBA iterations to adjust centroid with series Returns ------- pd.Dataframe: dataframe with columns "timestamp" and "target" that contains the series """ series_list = self._get_all_series(ts) initial_centroid = self._get_longest_series(ts) centroid = initial_centroid.values for _ in range(n_iters): new_centroid = self._dba_iteration(initial_centroid=centroid, series_list=series_list) centroid = new_centroid centroid = pd.DataFrame({"timestamp": initial_centroid.index.values, "target": centroid}) return centroid
__all__ = ["DTWDistance", "simple_dist"]