Source code for pastas.stats.core

"""The following methods may be used to calculate the crosscorrelation and
autocorrelation for a time series.

These methods are 'special' in the sense that they are able to deal with irregular
time steps often observed in hydrological time series.
"""

# Type Hinting
from logging import getLogger
from typing import Tuple, Union

from numba import prange
from numpy import (
    append,
    arange,
    array,
    average,
    corrcoef,
    diff,
    empty_like,
    exp,
    inf,
    nan,
    ndarray,
    ones,
    pi,
    sqrt,
)
from pandas import DataFrame, Index, Series, Timedelta, to_timedelta
from scipy.stats import norm

from pastas.typing import ArrayLike

from ..decorators import njit

logger = getLogger(__name__)


def acf(
    x: Series,
    lags: ArrayLike = 365,
    bin_method: str = "regular",
    bin_width: float = 0.5,
    max_gap: float = inf,
    min_obs: int = 50,
    full_output: bool = False,
    alpha: float = 0.05,
    fallback_bin_method: str = "gaussian",
) -> Union[Series, DataFrame]:
    """Calculate the autocorrelation function for irregular time steps.

    Parameters
    ----------
    x: pandas.Series
        Pandas Series containing the values to calculate the
        cross-correlation on. The index has to be a Pandas.DatetimeIndex.
    lags: array_like, optional
        numpy array containing the lags in days for which the
        cross-correlation is calculated. Default is all lags from 1 to 365
        days.
    bin_method: str, optional
        method to determine the type of bin. Options are "regular" for regular
        data (default), and "gaussian" and "rectangle" for irregular data.
    bin_width: float, optional
        number of days used as the width for the bin to calculate the
        correlation.
    max_gap: float, optional
        Maximum time step gap in the data. All time steps above this gap value
        are not used for calculating the average time step. This can be
        helpful when there is a large gap in the data that influences the
        average time step.
    min_obs: int, optional
        Minimum number of observations in a bin to determine the correlation.
    full_output: bool, optional
        If True, also estimated uncertainties are returned. Default is False.
    alpha: float, optional
        alpha level to compute the confidence interval (e.g., 1-alpha).
    fallback_bin_method: str, optional
        method to determine the type of bin used to compute the correlations
        if the data has irregular time steps between the measurements.
        Options are "gaussian" (default) and "rectangle".

    Returns
    -------
    result: pandas.Series or pandas.DataFrame
        If full_output=True, a DataFrame with columns "acf", "conf", and "n",
        containing the autocorrelation function, confidence intervals
        (depends on alpha), and the number of samples n used to compute these,
        respectively. If full_output=False, only the ACF is returned.

    Notes
    -----
    The ACF method primarily tries to estimate the autocorrelation using
    common techniques if the time step between the measurements is regular.
    If the time step is irregular, the method falls back to an alternative
    method to calculate the autocorrelation function for irregular timesteps
    based on the slotting technique :cite:t:`rehfeld_comparison_2011`.
    Different methods (kernels) to bin the data are available.

    Estimating the autocorrelation for irregular time steps can be
    challenging. Depending on the data and the binning method and settings
    used, the correlation can be above 1 or below -1. If this occurs, a
    warning is raised.

    Examples
    --------
    For example, to estimate the autocorrelation for every second lag up to
    lags of one year:

    >>> acf = ps.stats.acf(x, lags=np.arange(1.0, 366.0, 2.0))

    See Also
    --------
    pastas.stats.ccf
    statsmodels.api.tsa.acf
    """
    # The ACF is the CCF of a series with itself.
    c = ccf(
        x=x,
        y=x,
        lags=lags,
        bin_method=bin_method,
        bin_width=bin_width,
        max_gap=max_gap,
        min_obs=min_obs,
        full_output=full_output,
        alpha=alpha,
        fallback_bin_method=fallback_bin_method,
    )

    # drop value for lag=0 by default, unless explicitly included
    if c.index[0] == Timedelta(0) and isinstance(lags, int):
        c = c.drop(c.index[0])

    c.name = "ACF"
    if full_output:
        return c.rename(columns={"ccf": "acf"})
    else:
        return c
def ccf(
    x: Series,
    y: Series,
    lags: ArrayLike = 365,
    bin_method: str = "regular",
    bin_width: float = 0.5,
    max_gap: float = inf,
    min_obs: int = 50,
    full_output: bool = False,
    alpha: float = 0.05,
    fallback_bin_method: str = "gaussian",
) -> Union[Series, DataFrame]:
    """Method to compute the cross-correlation for irregular time series.

    Parameters
    ----------
    x,y: pandas.Series
        Pandas Series containing the values to calculate the
        cross-correlation on. The index has to be a Pandas.DatetimeIndex.
    lags: array_like, optional
        numpy array containing the lags in days for which the
        cross-correlation is calculated. Default is all lags from 1 to 365
        days.
    bin_method: str, optional
        method to determine the type of bin. Options are "regular" for regular
        data (default), and "gaussian" and "rectangle" for irregular data.
    bin_width: float, optional
        number of days used as the width for the bin to calculate the
        correlation.
    max_gap: float, optional
        Maximum timestep gap in the data. All timesteps above this gap value
        are not used for calculating the average timestep. This can be
        helpful when there is a large gap in the data that influences the
        average timestep.
    min_obs: int, optional
        Minimum number of observations in a bin to determine the correlation.
    full_output: bool, optional
        If True, also estimated uncertainties are returned. Default is False.
    alpha: float
        alpha level to compute the confidence interval (e.g., 1-alpha).
    fallback_bin_method: str, optional
        method to determine the type of bin used to compute the correlations
        if the data has irregular time steps between the measurements.
        Options are "gaussian" (default) and "rectangle".

    Returns
    -------
    result: pandas.Series or pandas.DataFrame
        If full_output=True, a DataFrame with columns "ccf", "conf", and "n",
        containing the cross-correlation function, confidence intervals
        (depends on alpha), and the number of samples n used to compute these,
        respectively. If full_output=False, only the CCF is returned.

    Examples
    --------
    >>> ccf = ps.stats.ccf(x, y, bin_method="gaussian")

    Notes
    -----
    The CCF method primarily tries to estimate the correlation using common
    techniques if the time step between the measurements is regular. If the
    time step is irregular, the method falls back to an alternative method to
    calculate the correlation function for irregular timesteps based on the
    slotting technique :cite:t:`rehfeld_comparison_2011`. Different methods
    (kernels) to bin the data are available.

    Estimating the correlation for irregular time steps can be challenging.
    Depending on the data and the binning method and settings used, the
    correlation can be above 1 or below -1. If this occurs, a warning is
    raised.
    """
    # Check if the time series have regular time steps; if neither series
    # reports an inferred frequency, fall back to an irregular bin method.
    if (
        not x.index.inferred_freq
        and not y.index.inferred_freq
        and bin_method == "regular"
    ):
        # BUGFIX: the two f-string fragments were previously joined without a
        # space, producing e.g. "fallback_bin_method'gaussian'".
        msg = (
            "time series does not have regular time steps, the "
            f"fallback_bin_method '{fallback_bin_method}' is applied"
        )
        logger.warning(msg)
        bin_method = fallback_bin_method

    # prepare the time indices for x and y (normalized values, days since start)
    x, t_x, dt_x_mu = _preprocess(x, max_gap=max_gap)
    y, t_y, dt_y_mu = _preprocess(y, max_gap=max_gap)
    dt_mu = max(dt_x_mu, dt_y_mu)  # The mean time step from both series

    if isinstance(lags, int) and bin_method == "regular":
        lags = arange(0, lags + 1, int(dt_mu), dtype=float)
    elif isinstance(lags, int):
        lags = arange(0, lags + 1, dtype=float)
    elif isinstance(lags, list):
        lags = array(lags, dtype=float)
    elif isinstance(lags, ndarray):
        # ensure dtype float otherwise numba will
        # create integer arrays for the results
        lags = lags.astype(float)

    if bin_method == "rectangle":
        c, b = _compute_ccf_rectangle(lags, t_x, x, t_y, y, bin_width)
    elif bin_method == "gaussian":
        c, b = _compute_ccf_gaussian(lags, t_x, x, t_y, y, bin_width)
    elif bin_method == "regular":
        c, b = _compute_ccf_regular(lags, x, y)
    else:
        raise NotImplementedError(f"bin_method '{bin_method}' is not supported")

    # Two-sided confidence bound scaled by the (effective) sample size per lag.
    conf = norm.ppf(1 - alpha / 2.0) / sqrt(b)
    result = DataFrame(
        data={"ccf": c, "conf": conf, "n": b},
        dtype=float,
        index=Index(to_timedelta(lags, unit="D"), name="Lags"),
    )
    # Drop lags computed from too few observations.
    result = result.where(result.n > min_obs).dropna()

    # Raise a warning if the correlation is above 1 or below -1
    # NOTE: Using 1.01 to avoid excessive warnings when using binning methods for
    # irregular timesteps. In those cases correlations sometimes exceed 1 by a small
    # amount.
    if (result.ccf.abs() > 1.01).any():
        msg = (
            "The correlation is above 1 or below -1. This can occur due to the "
            "binning method used. Please check the data and the binning method and "
            "use the autocorrelation function with extreme care."
        )
        logger.warning(msg)

    if full_output:
        return result
    else:
        return result.ccf
def _preprocess(x: Series, max_gap: float) -> Tuple[ArrayLike, ArrayLike, float]:
    """Internal method to preprocess the time series.

    Returns the standardized values of x, the cumulative time (in days) since
    the first observation, and the mean time step (gaps >= max_gap excluded).
    """
    dt = x.index.to_series().diff().dropna().values / Timedelta(1, "D")
    dt_mu = dt[dt < max_gap].mean()  # Deal with big gaps if present
    dt_mu = max(dt_mu, 1)  # Prevent division by zero error
    t = dt.cumsum()  # time of each observation in days relative to the first

    # Normalize the values and create numpy arrays
    x = (x.values - x.values.mean()) / x.values.std()

    return x, t, dt_mu


@njit(parallel=True, nogil=True, cache=True)
def _compute_ccf_rectangle(
    lags: ArrayLike,
    t_x: ArrayLike,
    x: ArrayLike,
    t_y: ArrayLike,
    y: ArrayLike,
    bin_width: float = 0.5,
) -> Tuple[ArrayLike, ArrayLike]:
    """Internal numba-optimized method to compute the ccf.

    Slotting with a rectangular kernel: every pair (i, j) whose time
    separation falls within bin_width of the lag contributes x[i] * y[j]
    with weight one. Returns the per-lag correlation estimates and the
    number of pairs per lag.
    """
    c = empty_like(lags)
    b = empty_like(lags)
    n = len(t_x)
    for k in prange(len(lags)):  # lags are independent -> parallel loop
        cl = 0.0
        b_sum = 0.0
        lag_k = lags[k]
        # traverse the lower diagonal of NxN matrix: np.dot(x.T, y)
        for j in range(n):
            yj = y[j]
            t_yj = t_y[j]
            for i in range(j, n):
                # NOTE(review): the absolute time difference is used here,
                # while the gaussian kernel uses the signed difference —
                # confirm this asymmetry is intended for the CCF case.
                d = abs(t_x[i] - t_yj) - lag_k
                if abs(d) <= bin_width:
                    cl += x[i] * yj
                    b_sum += 1.0
        if b_sum == 0.0:
            c[k] = nan
            b[k] = 1e-16  # Prevent division by zero error
        else:
            c[k] = cl / b_sum
            b[k] = b_sum
    return c, b


@njit(parallel=True, nogil=True, cache=True)
def _compute_ccf_gaussian(
    lags: ArrayLike,
    t_x: ArrayLike,
    x: ArrayLike,
    t_y: ArrayLike,
    y: ArrayLike,
    bin_width: float = 0.5,
) -> Tuple[ArrayLike, ArrayLike]:
    """Internal numba-optimized method to compute the ccf.

    Slotting with a Gaussian kernel: each pair (i, j) contributes
    x[i] * y[j] weighted by a Gaussian of the distance between its time
    separation and the lag. Returns the per-lag correlation estimates and
    the summed kernel weights per lag (effective sample size).
    """
    c = empty_like(lags)
    b = empty_like(lags)
    n = len(t_x)

    # NOTE(review): den2 = sqrt(2*pi*bin_width) rather than the usual
    # sqrt(2*pi)*bin_width; it cancels in c[k] = cl / b_sum, but scales the
    # returned weights b — confirm against the cited slotting reference.
    den1 = -2 * bin_width**2  # denominator 1
    den2 = sqrt(2 * pi * bin_width)  # denominator 2
    six_den2 = 6 * den2  # six std. dev.

    for k in prange(len(lags)):  # lags are independent -> parallel loop
        cl = 0.0
        b_sum = 0.0
        lag_k = lags[k]
        # traverse the lower diagonal of NxN matrix: np.dot(x.T, y)
        for j in range(n):
            t_yj = t_y[j]
            yj = y[j]
            for i in range(j, n):
                dtlag = t_x[i] - t_yj - lag_k
                # skip pairs far outside the kernel to avoid exp() calls
                if abs(dtlag) < six_den2:
                    d = exp(dtlag**2 / den1) / den2  # Gaussian weight
                    cl += x[i] * yj * d
                    b_sum += d
        if b_sum == 0.0:
            c[k] = nan
            b[k] = 1e-16  # Prevent division by zero error
        else:
            c[k] = cl / b_sum
            b[k] = b_sum
    return c, b


def _compute_ccf_regular(
    lags: ArrayLike, x: ArrayLike, y: ArrayLike
) -> Tuple[ArrayLike, ArrayLike]:
    """Internal method to compute the ccf for regular time steps.

    Uses the Pearson correlation (numpy.corrcoef) between the shifted
    series for each integer lag. Returns the correlations and the number
    of overlapping samples per lag.
    """
    c = empty_like(lags)
    n = len(x)
    for i in range(len(lags)):
        lag = int(lags[i])
        if lag < n:
            # flip x, y to match numpy/scipy correlate output order
            c[i] = corrcoef(y[: n - lag], x[lag:])[0, 1]
        else:
            # lag exceeds the series length: no overlapping samples
            c[i] = nan
    b = n - lags  # number of overlapping observations per lag
    b[b <= 0] = 1e-16  # Prevent division by zero error
    return c, b
def mean(x: Series, weighted: bool = True, max_gap: int = 30) -> ArrayLike:
    """Method to compute the (weighted) mean of a time series.

    Parameters
    ----------
    x: pandas.Series
        Series with the values and a DatetimeIndex as an index.
    weighted: bool, optional
        Weight the values by the normalized time step to account for
        irregular time series. Default is True.
    max_gap: int, optional
        maximum allowed gap period in days to use for the computation of the
        weights. All time steps larger than max_gap are clipped to max_gap.
        Default value is 30 days.

    Returns
    -------
    float
        The (weighted) mean of the time series values.

    Notes
    -----
    The (weighted) mean for a time series x is computed as:

    .. math:: \\bar{x} = \\sum_{i=1}^{N} w_i x_i

    where :math:`w_i` are the weights, taken as the time step between
    observations, normalized by the sum of all time steps.
    """
    # Weights follow from the time step before each observation.
    w = _get_weights(x, weighted=weighted, max_gap=max_gap)
    return average(x.to_numpy(), weights=w)
def var(x: Series, weighted: bool = True, max_gap: int = 30) -> ArrayLike:
    """Method to compute the (weighted) variance of a time series.

    Parameters
    ----------
    x: pandas.Series
        Series with the values and a DatetimeIndex as an index.
    weighted: bool, optional
        Weight the values by the normalized time step to account for
        irregular time series. Default is True.
    max_gap: int, optional
        maximum allowed gap period in days to use for the computation of the
        weights. All time steps larger than max_gap are clipped to max_gap.
        Default value is 30 days.

    Returns
    -------
    float
        The (weighted) variance of the time series values.

    Notes
    -----
    The (weighted) variance for a time series x is computed as:

    .. math:: \\sigma_x^2 = \\sum_{i=1}^{N} w_i (x_i - \\bar{x})^2

    where :math:`w_i` are the weights, taken as the time step between
    observations, normalized by the sum of all time steps. Note how the
    weighted mean (:math:`\\bar{x}`) is used in this formula.
    """
    w = _get_weights(x, weighted=weighted, max_gap=max_gap)
    mu = average(x.to_numpy(), weights=w)
    # N / (N - 1) applies a Bessel-style small-sample correction.
    sigma = (x.size / (x.size - 1) * w * (x.to_numpy() - mu) ** 2).sum()
    return sigma
def std(x: Series, weighted: bool = True, max_gap: int = 30) -> ArrayLike:
    """Method to compute the (weighted) standard deviation of a time series.

    Parameters
    ----------
    x: pandas.Series
        Series with the values and a DatetimeIndex as an index.
    weighted: bool, optional
        Weight the values by the normalized time step to account for
        irregular time series. Default is True.
    max_gap: int, optional
        maximum allowed gap period in days to use for the computation of the
        weights. All time steps larger than max_gap are clipped to max_gap.
        Default value is 30 days.

    Returns
    -------
    float
        The (weighted) standard deviation of the time series values.

    See Also
    --------
    ps.stats.mean, ps.stats.var
    """
    # Standard deviation is the square root of the (weighted) variance.
    return sqrt(var(x, weighted=weighted, max_gap=max_gap))
# Helper functions def _get_weights(x: Series, weighted: bool = True, max_gap: int = 30) -> ArrayLike: """Helper method to compute the weights as the time step between obs. Parameters ---------- x: pandas.Series Series with the values and a DatetimeIndex as an index. weighted: bool, optional Weight the values by the normalized time step to account for irregular time series. max_gap: int, optional maximum allowed gap period in days to use for the computation of the weights. All time steps larger than max_gap are replace with the mean weight. Default value is 30 days. """ if weighted: w = append(0.0, diff(x.index.to_numpy()) / Timedelta("1D")) w[w > max_gap] = max_gap else: w = ones(x.index.size) w /= w.sum() return w