"""This module contains methods to compute the groundwater signatures. Part of the
signatures selection is based on the work of :cite:t:`heudorfer_index-based_2019`."""
from logging import getLogger

# Type Hinting
from typing import Optional, Tuple, Union
from numpy import (
arctan,
array,
cos,
diff,
exp,
isclose,
isnan,
linspace,
log,
nan,
ndarray,
pi,
sin,
split,
sqrt,
where,
)
from packaging.version import parse as parse_version
from pandas import DataFrame, DatetimeIndex, Series, Timedelta, concat, cut, to_datetime
from pandas import __version__ as pd_version
from scipy.optimize import curve_fit
from scipy.stats import linregress
import pastas as ps
from pastas.stats.core import acf
pandas_version = parse_version(pd_version)
year_offset = "YE" if pandas_version >= parse_version("2.2.0") else "A"
month_offset = "ME" if pandas_version >= parse_version("2.2.0") else "M"
__all__ = [
"cv_period_mean",
"cv_date_min",
"cv_date_max",
"cv_fall_rate",
"cv_rise_rate",
"parde_seasonality",
"avg_seasonal_fluctuation",
"interannual_variation",
"low_pulse_count",
"high_pulse_count",
"low_pulse_duration",
"high_pulse_duration",
"bimodality_coefficient",
"mean_annual_maximum",
"rise_rate",
"fall_rate",
"reversals_avg",
"reversals_cv",
"colwell_contingency",
"colwell_constancy",
"recession_constant",
"recovery_constant",
"duration_curve_slope",
"duration_curve_ratio",
"richards_pathlength",
"baselevel_index",
"baselevel_stability",
"magnitude",
"autocorr_time",
"date_min",
"date_max",
]
logger = getLogger(__name__)
def _normalize(series: Series) -> Series:
"""Normalize the time series by subtracting the mean and dividing over the range.
Parameters
----------
series: pandas.Series
Pandas Series to be normalized.
Returns
-------
series: pandas.Series
Pandas Series scaled by subtracting the mean and dividing over the range of the
values. This results in a time series with values between zero and one.
"""
series = (series - series.min()) / (series.max() - series.min())
return series
def cv_period_mean(
series: Series, normalize: bool = False, freq: str = month_offset
) -> float:
"""Coefficient of variation of the mean head over a period (default monthly).
Parameters
----------
series: pandas.Series
Pandas Series with DatetimeIndex and head values.
normalize: bool, optional
normalize the time series to values between zero and one.
freq: str, optional
frequency to resample the series to by averaging.
Returns
-------
cv: float
Coefficient of variation of mean head resampled over a period (default monthly).
Notes
-----
Coefficient of variation of mean monthly heads, adapted after
:cite:t:`hughes_hydrological_1989`. The higher the coefficient of variation, the
more variable the mean monthly head is throughout the year, and vice versa. The
coefficient of variation is the standard deviation divided by the mean.
Examples
--------
>>> import pandas as pd
>>> from pastas.stats.signatures import cv_period_mean
>>> series = pd.Series([1, 2, 3, 4, 5, 6],
    ...     index=pd.date_range(start='2022-01-01', periods=6, freq='M'))
>>> cv = cv_period_mean(series)
>>> print(cv)
"""
if normalize:
series = _normalize(series)
series = series.resample(freq).mean()
    cv = series.std(ddof=1) / series.mean()  # ddof=1 => sample standard deviation
return cv
def _cv_date_min_max(series: Series, stat: str) -> float:
"""Method to compute the coefficient of variation of the date of annual
minimum or maximum head using circular statistics.
Parameters
----------
series : Series
Pandas Series with DatetimeIndex and head values.
stat : str
"min" or "max" to compute the cv of the date of the annual minimum or maximum
head.
Returns
-------
float:
Circular coefficient of variation of the date of annual minimum or maximum
head.
Notes
-----
Coefficient of variation of the date of annual minimum or maximum head computed
using circular statistics as described in :cite:t:`fisher_statistical_1995` (page
32). If there are multiple dates with the same minimum or maximum head, the first
date is chosen. The higher the coefficient of variation, the more variable the date
of the annual minimum or maximum head is, and vice versa.
"""
if stat == "min":
data = series.groupby(series.index.year).idxmin(skipna=True).dropna().values
elif stat == "max":
data = series.groupby(series.index.year).idxmax(skipna=True).dropna().values
doy = DatetimeIndex(data).dayofyear.to_numpy(float)
m = 365.25
two_pi = 2 * pi
thetas = array(doy) * two_pi / m
c = cos(thetas).sum()
s = sin(thetas).sum()
r = sqrt(c**2 + s**2) / doy.size
if (s > 0) & (c > 0):
mean_theta = arctan(s / c)
elif c < 0:
mean_theta = arctan(s / c) + pi
elif (s < 0) & (c > 0):
mean_theta = arctan(s / c) + two_pi
else:
# This should never happen
raise ValueError("Something went wrong in the circular statistics.")
mu = mean_theta * m / two_pi
std = sqrt(-2 * log(r)) * m / two_pi
return std / mu
def cv_date_min(series: Series) -> float:
"""Coefficient of variation of the date of annual minimum head.
Parameters
----------
series: pandas.Series
Pandas Series with DatetimeIndex and head values.
Returns
-------
cv: float
Coefficient of variation of the date of annual minimum head.
Notes
-----
Coefficient of variation of the date of annual minimum head computed using circular
statistics as described in :cite:t:`fisher_statistical_1995` (page 32). If there
are multiple dates with the same minimum head, the first date is chosen. The higher
the coefficient of variation, the more variable the date of the annual minimum head
is, and vice versa.
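    Examples
    --------
    A minimal, illustrative example with a synthetic daily head series (the values
    are arbitrary and the printed result depends on the data):

    >>> import numpy as np
    >>> import pandas as pd
    >>> from pastas.stats.signatures import cv_date_min
    >>> idx = pd.date_range("2015-01-01", "2019-12-31", freq="D")
    >>> values = 10 + np.sin(np.arange(idx.size) * 2 * np.pi / 365.25)
    >>> head = pd.Series(values, index=idx)
    >>> print(cv_date_min(head))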
"""
cv = _cv_date_min_max(series, stat="min")
return cv
def cv_date_max(series: Series) -> float:
"""Coefficient of variation of the date of annual maximum head.
Parameters
----------
series: pandas.Series
Pandas Series with DatetimeIndex and head values.
Returns
-------
cv: float
Coefficient of variation of the date of annual maximum head.
Notes
-----
Coefficient of variation of the date of annual maximum head computed using circular
statistics as described in :cite:t:`fisher_statistical_1995` (page 32). If there
are multiple dates with the same maximum head, the first date is chosen. The higher
the coefficient of variation, the more variable the date of the maximum head is,
and vice versa.
"""
cv = _cv_date_min_max(series, stat="max")
return cv
def parde_seasonality(series: Series, normalize: bool = True) -> float:
"""Parde seasonality according to :cite:t:`parde_fleuves_1933`, adapted for heads.
Parameters
----------
series: pandas.Series
Pandas Series with DatetimeIndex and head values.
normalize: bool, optional
normalize the time series to values between zero and one.
Returns
-------
float:
Parde seasonality.
Notes
-----
Pardé seasonality is the difference between the maximum and minimum Pardé
coefficient. A Pardé series consists of 12 Pardé coefficients, corresponding to
12 months. Pardé coefficient for, for example, January is its long-term monthly
mean head divided by the overall mean head. The higher the Pardé seasonality, the
more seasonal the head is, and vice versa.
"""
coefficients = _parde_coefficients(series=series, normalize=normalize)
return coefficients.max() - coefficients.min()
def _parde_coefficients(series: Series, normalize: bool = True) -> Series:
"""Parde coefficients for each month :cite:t:`parde_fleuves_1933`.
Parameters
----------
series: pandas.Series
Pandas Series with DatetimeIndex and head values.
normalize: bool, optional
normalize the time series to values between zero and one.
Returns
-------
coefficients: pandas.Series
Parde coefficients for each month.
Notes
-----
Pardé seasonality is the difference between the maximum and minimum Pardé
coefficient. A Pardé series consists of 12 Pardé coefficients, corresponding to
12 months. Pardé coefficient for, for example, January is its long-term monthly
mean head divided by the overall mean head.
Examples
--------
>>> import pandas as pd
    >>> from pastas.stats.signatures import _parde_coefficients
    >>> series = pd.Series([1, 2, 3, 4, 5, 6],
    ...     index=pd.date_range(start='2022-01-01', periods=6, freq='M'))
    >>> coefficients = _parde_coefficients(series)
>>> print(coefficients)
month
1 0.0
2 0.4
3 0.8
4 1.2
5 1.6
6 2.0
dtype: float64
"""
if normalize:
series = _normalize(series)
coefficients = series.groupby(series.index.month).mean() / series.mean()
coefficients.index.name = "month"
return coefficients
def _martens(series: Series, normalize: bool = False) -> Tuple[Series, Series]:
"""Function for the average seasonal fluctuation and interannual fluctuation.
Parameters
----------
series: pandas.Series
Pandas Series with DatetimeIndex and head values.
normalize: bool, optional
normalize the time series to values between zero and one.
Returns
-------
hl: pandas.Series
Average of the three lowest heads in a year.
hw: pandas.Series
Average of the three largest heads in a year.
Notes
-----
According to :cite:t:`martens_groundwater_2013`. The average of the three lowest
and three highest heads in three different months for each year is computed. The
average is then taken over all years.
"""
if normalize:
series = _normalize(series)
s = series.resample(month_offset)
s_min = s.min()
s_max = s.max()
hl = s_min.groupby(s_min.index.year).nsmallest(3).groupby(level=0).mean()
hw = s_max.groupby(s_max.index.year).nlargest(3).groupby(level=0).mean()
return hl, hw
def avg_seasonal_fluctuation(series: Series, normalize: bool = False) -> float:
"""Average seasonal fluctuation after :cite:t:`martens_groundwater_2013`.
Parameters
----------
series: pandas.Series
Pandas Series with DatetimeIndex and head values.
normalize: bool, optional
normalize the time series to values between zero and one.
Returns
-------
float:
Average seasonal fluctuation (s).
Notes
-----
Mean annual difference between the averaged 3 highest monthly heads
per year and the averaged 3 lowest monthly heads per year.
Average seasonal fluctuation (s):
s = MHW - MLW
A higher value of s indicates a more seasonal head, and vice versa.
    Warning
    -------
    In this formulation the water table is referenced to a certain datum and
    positive (i.e., not expressed as depth below the surface)!
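    Examples
    --------
    A minimal, illustrative example with a synthetic daily head series (the values
    are arbitrary and the result depends on the data):

    >>> import numpy as np
    >>> import pandas as pd
    >>> from pastas.stats.signatures import avg_seasonal_fluctuation
    >>> idx = pd.date_range("2018-01-01", "2021-12-31", freq="D")
    >>> values = 10 + np.sin(np.arange(idx.size) * 2 * np.pi / 365.25)
    >>> head = pd.Series(values, index=idx)
    >>> print(avg_seasonal_fluctuation(head))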
"""
hl, hw = _martens(series, normalize=normalize)
return (hw - hl).mean()
def interannual_variation(series: Series, normalize: bool = False) -> float:
"""Interannual variation after :cite:t:`martens_groundwater_2013`.
Parameters
----------
series: pandas.Series
Pandas Series with DatetimeIndex and head values.
normalize: bool, optional
normalize the time series to values between zero and one.
Returns
-------
float:
Interannual variation (s).
Notes
-----
The average between the range in annually averaged 3 highest monthly heads and the
range in annually averaged 3 lowest monthly heads.
Inter-yearly variation of high and low water table (s):
s = ((max_HW - min_HW) + (max_LW - min_LW)) / 2
A higher value of s indicates a more variable head, and vice versa.
    Warning
    -------
    In this formulation the water table is referenced to a certain datum and
    positive (i.e., not expressed as depth below the surface)!
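    Examples
    --------
    A minimal, illustrative example with a synthetic daily head series (a seasonal
    signal plus noise; the result depends on the data):

    >>> import numpy as np
    >>> import pandas as pd
    >>> from pastas.stats.signatures import interannual_variation
    >>> rng = np.random.default_rng(0)
    >>> idx = pd.date_range("2015-01-01", "2020-12-31", freq="D")
    >>> values = np.sin(np.arange(idx.size) * 2 * np.pi / 365.25)
    >>> head = pd.Series(values + rng.normal(0.0, 0.1, idx.size), index=idx)
    >>> print(interannual_variation(head))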
"""
hl, hw = _martens(series, normalize=normalize)
return ((hw.max() - hw.min()) + (hl.max() - hl.min())) / 2
def _colwell_components(
series: Series,
bins: int = 11,
freq: str = "W",
method: str = "mean",
normalize: bool = True,
) -> Tuple[float, float, float]:
"""Colwell's predictability, constant, and contingency
:cite:t:`colwell_predictability_1974`.
Parameters
----------
series: pandas.Series
Pandas Series with DatetimeIndex and head values.
bins: int
number of bins to determine the states of the groundwater.
freq: str, optional
frequency to resample the series to. Possible options are "D", "W", "M" or "ME".
method: str, optional
Method to use for resampling. Only "mean" is allowed now.
normalize: bool, optional
normalize the time series to values between zero and one.
Returns
-------
p, c, m: float, float, float
predictability, constancy, contingency
Notes
-----
The difference between the sum of entropy for each time step and possible state
of the seasonal cycle, and the overall entropy across all states and time steps,
divided by the logarithm of the absolute number of possible states. Entropy
according to definition in information theory, see reference for details.
"""
# Prepare data and pivot table
if normalize:
series = _normalize(series)
if method == "mean":
series = series.resample(freq).mean().dropna()
else:
raise NotImplementedError
series.name = "head"
binned = cut(
series, bins=bins, right=False, include_lowest=True, labels=range(bins)
)
df = DataFrame(binned, dtype=float)
if freq in ("M", "ME"):
df["time"] = df.index.isocalendar().month
elif freq == "W":
df["time"] = df.index.isocalendar().week
elif freq == "D":
df["time"] = df.index.isocalendar().day
else:
msg = "freq %s is not a supported option."
logger.error(msg, freq)
raise ValueError(msg % freq)
df["values"] = 1.0
df = df.pivot_table(columns="head", index="time", aggfunc="sum", values="values")
# Count of rows and column items
x = df.sum(axis=1) # Time
y = df.sum(axis=0) # Head
z = series.size # Total number of observations
hx = -(x / z * log(x / z)).sum()
hy = -(y / z * log(y / z)).sum()
hxy = -(df / z * log(df / z, where=df.values != 0)).sum().sum()
# Compute final components
p = 1 - (hxy - hx) / log(bins) # Predictability
c = 1 - hy / log(bins) # Constancy
m = (hx + hy - hxy) / log(bins) # Contingency
return p, c, m
def colwell_constancy(
series: Series,
bins: int = 11,
freq: str = "W",
method: str = "mean",
normalize: bool = True,
) -> float:
"""Colwells constancy index after :cite:t:`colwell_predictability_1974`.
Parameters
----------
series: pandas.Series
Pandas Series with DatetimeIndex and head values.
bins: int
number of bins to determine the states of the groundwater.
freq: str, optional
frequency to resample the series to.
method: str, optional
Method to use for resampling. Only "mean" is allowed now.
normalize: bool, optional
normalize the time series to values between zero and one.
Returns
-------
c: float
Colwell's constancy.
Notes
-----
One minus the sum of entropy with respect to state, divided by the logarithm of
the absolute number of possible states.
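    Examples
    --------
    A minimal, illustrative example with a synthetic daily head series (the values
    are arbitrary and the result depends on the data):

    >>> import numpy as np
    >>> import pandas as pd
    >>> from pastas.stats.signatures import colwell_constancy
    >>> idx = pd.date_range("2015-01-01", "2019-12-31", freq="D")
    >>> values = np.sin(np.arange(idx.size) * 2 * np.pi / 365.25)
    >>> head = pd.Series(values, index=idx)
    >>> print(colwell_constancy(head, bins=11, freq="W"))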
"""
return _colwell_components(
series=series, bins=bins, freq=freq, method=method, normalize=normalize
)[1]
def colwell_contingency(
series: Series,
bins: int = 11,
freq: str = "W",
method: str = "mean",
normalize: bool = True,
) -> float:
"""Colwell's contingency :cite:t:`colwell_predictability_1974`
Parameters
----------
series: pandas.Series
Pandas Series with DatetimeIndex and head values.
bins: int
number of bins to determine the states of the groundwater.
freq: str, optional
frequency to resample the series to.
method: str, optional
Method to use for resampling. Only "mean" is allowed now.
normalize: bool, optional
normalize the time series to values between zero and one.
Returns
-------
m: float
Colwell's contingency.
Notes
-----
The difference between the sum of entropy for each time step and possible state
of the seasonal cycle, and the overall entropy across all states and time steps,
divided by the logarithm of the absolute number of possible states. Entropy
according to definition in information theory, see reference for details.
"""
return _colwell_components(
series=series, bins=bins, freq=freq, method=method, normalize=normalize
)[2]
def low_pulse_count(
    series: Series, quantile: float = 0.2, rolling_window: Union[str, None] = "7D"
) -> float:
    """Average number of times the series drops below a certain threshold per year.
Parameters
----------
series: pandas.Series
Pandas Series with DatetimeIndex and head values.
quantile: float, optional
Quantile used as a threshold.
rolling_window: str, optional
Rolling window to use for smoothing the time series. Default is 7 days. Set to
None to disable. See the pandas documentation for more information.
Returns
-------
float:
        Average number of times the series drops below a certain threshold per year.
Notes
-----
Number of times during which the head drops below a certain threshold.
The threshold is defined as the 20th percentile of non-exceedance
:cite:t:`richter_method_1996`.
Warning
-------
    This method is sensitive to measurement noise, e.g., every change in sign of the
    differences is counted as a pulse. Therefore, it is recommended to smooth the time
    series first (which is also the default).
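    Examples
    --------
    A minimal, illustrative example with a smooth synthetic daily head series, which
    drops below the 20th percentile roughly once per year (illustrative only):

    >>> import numpy as np
    >>> import pandas as pd
    >>> from pastas.stats.signatures import low_pulse_count
    >>> idx = pd.date_range("2015-01-01", "2019-12-31", freq="D")
    >>> values = np.sin(np.arange(idx.size) * 2 * np.pi / 365.25)
    >>> head = pd.Series(values, index=idx)
    >>> print(low_pulse_count(head))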
"""
if rolling_window is not None:
series = series.rolling(rolling_window).mean()
h = series < series.quantile(quantile)
sel = h.astype(int).diff().replace(0.0, nan).shift(-1).dropna().index
# Deal with pulses in the beginning and end of the time series
if h.iat[0]:
sel = sel.append(series.index[:1]).sort_values()
if h.iat[-1]:
sel = sel.append(series.index[-1:]).sort_values()
return sel.size / 2 / series.index.year.unique().size
def high_pulse_count(
series: Series, quantile: float = 0.8, rolling_window: Union[str, None] = "7D"
) -> float:
"""Average number of times the series exceeds a certain threshold per year.
Parameters
----------
series: pandas.Series
Pandas Series with DatetimeIndex and head values.
quantile: float, optional
Quantile used as a threshold.
rolling_window: str, optional
Rolling window to use for smoothing the time series. Default is 7 days. Set to
None to disable. See the pandas documentation for more information.
Returns
-------
float:
Average number of times the series exceeds a certain threshold per year.
Notes
-----
Number of times during which the head exceeds a certain threshold. The threshold is
defined as the 80th percentile of non-exceedance.
Warning
-------
    This method is sensitive to measurement noise, e.g., every change in sign of the
    differences is counted as a pulse. Therefore, it is recommended to smooth the time
    series first (which is also the default).
"""
if rolling_window is not None:
series = series.rolling(rolling_window).mean()
h = series > series.quantile(quantile)
sel = h.astype(int).diff().replace(0.0, nan).shift(-1).dropna().index
if h.iat[0]:
sel = sel.append(series.index[:1]).sort_values()
if h.iat[-1]:
sel = sel.append(series.index[-1:]).sort_values()
return sel.size / 2 / series.index.year.unique().size
def low_pulse_duration(
series: Series, quantile: float = 0.2, rolling_window: Union[str, None] = "7D"
) -> float:
"""Average duration of pulses where the head is below a certain threshold.
Parameters
----------
series: pandas.Series
Pandas Series with DatetimeIndex and head values.
quantile: float, optional
Quantile used as a threshold.
rolling_window: str, optional
Rolling window to use for smoothing the time series. Default is 7 days. Set to
None to disable. See the pandas documentation for more information.
Returns
-------
float:
Average duration (in days) of pulses where the head drops below a certain
threshold.
Notes
-----
Average duration of pulses (in days) where the head drops below a certain threshold.
Warning
-------
    This method is sensitive to measurement noise, e.g., every change in sign of the
    differences is counted as a pulse. Therefore, it is recommended to smooth the time
    series first (which is also the default).
"""
if rolling_window is not None:
series = series.rolling(rolling_window).mean()
h = series < series.quantile(quantile)
sel = h.astype(int).diff().replace(0.0, nan).shift(-1).dropna().index
if h.iat[0]:
sel = sel.append(series.index[:1]).sort_values()
if h.iat[-1]:
sel = sel.append(series.index[-1:]).sort_values()
return (diff(sel.to_numpy()) / Timedelta("1D"))[::2].mean()
def high_pulse_duration(
series: Series, quantile: float = 0.8, rolling_window: Union[str, None] = "7D"
) -> float:
"""Average duration of pulses where the head exceeds a certain threshold.
Parameters
----------
series: pandas.Series
Pandas Series with DatetimeIndex and head values.
quantile: float, optional
Quantile used as a threshold.
rolling_window: str, optional
Rolling window to use for smoothing the time series. Default is 7 days. Set to
None to disable. See the pandas documentation for more information.
Returns
-------
float:
        Average duration (in days) of pulses where the head exceeds a certain
        threshold.
Notes
-----
    Average duration of pulses (in days) where the head exceeds a certain threshold.
    The threshold is defined as the 80th percentile of non-exceedance.
Warning
-------
    This method is sensitive to measurement noise, e.g., every change in sign of the
    differences is counted as a pulse. Therefore, it is recommended to smooth the time
    series first (which is also the default).
"""
if rolling_window is not None:
series = series.rolling(rolling_window).mean()
h = series > series.quantile(quantile)
sel = h.astype(int).diff().replace(0.0, nan).shift(-1).dropna().index
if h.iat[0]:
sel = sel.append(series.index[:1]).sort_values()
if h.iat[-1]:
sel = sel.append(series.index[-1:]).sort_values()
return (diff(sel.to_numpy()) / Timedelta("1D"))[::2].mean()
def _get_differences(series: Series, normalize: bool = False) -> Series:
"""Get the changes in the time series.
Parameters
----------
series: pandas.Series
Pandas Series with DatetimeIndex and head values.
normalize: bool, optional
normalize the time series to values between zero and one.
Returns
-------
differences: pandas.Series
Differences in the time series in L/day.
Notes
-----
Get the differences in the time series, and divide by the time step to get the rate
of change. If normalize is True, the time series is normalized to values between
zero and one.
"""
if normalize:
series = _normalize(series)
dt = diff(series.index.to_numpy()) / Timedelta("1D")
differences = series.diff().iloc[1:] / dt
return differences
def rise_rate(
series: Series, normalize: bool = False, rolling_window: Union[str, None] = "7D"
) -> float:
"""Mean of positive head changes from one day to the next.
Parameters
----------
series: pandas.Series
Pandas Series with DatetimeIndex and head values.
normalize: bool, optional
normalize the time series to values between zero and one.
rolling_window: str, optional
Rolling window to use for smoothing the time series. Default is 7 days. Set to
None to disable. See the pandas documentation for more information.
Returns
-------
float:
Mean of positive head changes from one day to the next. The units of the rise
rate are L/day (L defined by the input).
Notes
-----
Mean rate of positive changes in head from one day to the next.
"""
if rolling_window is not None:
series = series.rolling(rolling_window).mean()
differences = _get_differences(series, normalize=normalize)
rises = differences[differences > 0]
return rises.mean()
def fall_rate(
series: Series, normalize: bool = False, rolling_window: Union[str, None] = "7D"
) -> float:
"""Mean negative head changes from one day to the next.
Parameters
----------
series: pandas.Series
Pandas Series with DatetimeIndex and head values.
normalize: bool, optional
normalize the time series to values between zero and one.
rolling_window: str, optional
Rolling window to use for smoothing the time series. Default is 7 days. Set to
None to disable. See the pandas documentation for more information.
Returns
-------
float:
Mean of negative head changes from one day to the next. The units of the fall
rate are L/day (L defined by the input).
Notes
-----
Mean rate of negative changes in head from one day to the next, according to
:cite:t:`richter_method_1996`.
"""
if rolling_window is not None:
series = series.rolling(rolling_window).mean()
differences = _get_differences(series, normalize=normalize)
falls = differences.loc[differences < 0]
return falls.mean()
def cv_rise_rate(
series: Series, normalize: bool = True, rolling_window: Union[str, None] = "7D"
) -> float:
"""Coefficient of Variation in rise rate.
Parameters
----------
series: pandas.Series
Pandas Series with DatetimeIndex and head values.
normalize: bool, optional
normalize the time series to values between zero and one.
rolling_window: str, optional
Rolling window to use for smoothing the time series. Default is 7 days. Set to
None to disable. See the pandas documentation for more information.
Returns
-------
float:
Coefficient of Variation in rise rate.
Notes
-----
Coefficient of variation in rise rate :cite:p:`richter_method_1996`. The higher the
coefficient of variation, the more variable the rise rate is, and vice versa.
"""
if rolling_window is not None:
series = series.rolling(rolling_window).mean()
differences = _get_differences(series, normalize=normalize)
rises = differences[differences > 0]
return rises.std(ddof=1) / rises.mean()
def cv_fall_rate(
series: Series, normalize: bool = False, rolling_window: Union[str, None] = "7D"
) -> float:
"""Coefficient of Variation in fall rate.
Parameters
----------
series: pandas.Series
Pandas Series with DatetimeIndex and head values.
normalize: bool, optional
normalize the time series to values between zero and one.
rolling_window: str, optional
Rolling window to use for smoothing the time series. Default is 7 days. Set to
None to disable. See the pandas documentation for more information.
Returns
-------
float:
Coefficient of Variation in fall rate.
Notes
-----
Coefficient of Variation in fall rate :cite:p:`richter_method_1996`. The higher the
coefficient of variation, the more variable the fall rate is, and vice versa.
"""
if rolling_window is not None:
series = series.rolling(rolling_window).mean()
differences = _get_differences(series, normalize=normalize)
falls = differences[differences < 0]
return falls.std(ddof=1) / falls.mean()
def magnitude(series: Series) -> float:
    """Difference between the minimum and maximum heads, divided by the minimum head,
    adapted after :cite:t:`hannah_approach_2000`.
Parameters
----------
series: pandas.Series
Pandas Series with DatetimeIndex and head values.
Returns
-------
float:
Difference between the minimum and maximum heads, divided by the minimum head.
Notes
-----
    Difference between the minimum and maximum heads, divided by the minimum head:

    .. math:: (h_{max} - h_{min}) / h_{min}

The higher the magnitude, the more variable the head is, and vice versa.
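    Examples
    --------
    Illustrative example: for a synthetic head series that varies between roughly 9
    and 11, the magnitude is approximately (11 - 9) / 9 = 0.22:

    >>> import numpy as np
    >>> import pandas as pd
    >>> from pastas.stats.signatures import magnitude
    >>> idx = pd.date_range("2020-01-01", "2021-12-31", freq="D")
    >>> values = 10 + np.sin(np.arange(idx.size) * 2 * np.pi / 365.25)
    >>> head = pd.Series(values, index=idx)
    >>> print(magnitude(head))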
"""
return (series.max() - series.min()) / series.min()
def reversals_avg(series: Series) -> float:
"""Average annual number of rises and falls in daily head.
Parameters
----------
series: pandas.Series
Pandas Series with DatetimeIndex and head values.
Returns
-------
float:
Average number of rises and falls in daily head per year.
Notes
-----
Average annual number of rises and falls (i.e., change of sign) in daily head
:cite:p:`richter_method_1996`. The higher the number of reversals, the more
variable the head is, and vice versa. If the head data is not daily, a warning is
issued and nan is returned.
"""
# Get the time step in days
dt = diff(series.index.to_numpy()) / Timedelta("1D")
# Check if the time step is approximately daily
if not (dt > 0.9).all() & (dt < 1.1).all():
        msg = (
            "The time step is not approximately daily (not all time steps are "
            "within 10% of one day). Returning nan."
        )
logger.warning(msg)
return nan
else:
series_diff = series.diff()
reversals = (
(series_diff[series_diff != 0.0] > 0).astype(int).diff().replace(-1, 1)
)
return reversals.resample(year_offset).sum().mean()
def reversals_cv(series: Series) -> float:
"""Coefficient of Variation in annual number of rises and falls.
Parameters
----------
series: pandas.Series
Pandas Series with DatetimeIndex and head values.
Returns
-------
float:
Coefficient of Variation in annual number of rises and falls.
Notes
-----
Coefficient of Variation in annual number of rises and falls in daily head
:cite:p:`richter_method_1996`. If the coefficient of variation is high, the number
of reversals is highly variable, and vice versa. If the head data is not daily, a
warning is issued and nan is returned.
"""
# Get the time step in days
dt = diff(series.index.to_numpy()) / Timedelta("1D")
# Check if the time step is approximately daily
if not (dt > 0.9).all() & (dt < 1.1).all():
msg = (
"The time step is not approximately daily. "
"This may lead to incorrect results."
)
logger.warning(msg)
return nan
else:
series_diff = series.diff()
reversals = (
(series_diff[series_diff != 0.0] > 0).astype(int).diff().replace(-1, 1)
)
annual_sum = reversals.resample(year_offset).sum()
return annual_sum.std(ddof=1) / annual_sum.mean()
def mean_annual_maximum(series: Series, normalize: bool = True) -> float:
"""Mean of annual maximum head after :cite:t:`clausen_flow_2000`.
Parameters
----------
series: pandas.Series
Pandas Series with DatetimeIndex and head values.
normalize: bool, optional
normalize the time series to values between zero and one.
Returns
-------
float:
Mean of annual maximum head.
Notes
-----
Mean of annual maximum head :cite:p:`clausen_flow_2000`.
Warning
-------
This signatures is sensitive to the base level of the time series if normalize is
set to False.
"""
if normalize:
series = _normalize(series)
return series.resample(year_offset).max().mean()
def bimodality_coefficient(series: Series, normalize: bool = True) -> float:
"""Bimodality coefficient after :cite:t:`ellison_effect_1987`.
Parameters
----------
series: pandas.Series
Pandas Series with DatetimeIndex and head values.
normalize: bool, optional
normalize the time series to values between zero and one.
Returns
-------
float:
Bimodality coefficient.
Notes
-----
Squared product moment skewness (s) plus one, divided by product moment kurtosis
(k):
    .. math:: b = (s^2 + 1) / k

Adapted from the R 'modes' package. The higher the bimodality coefficient, the more
bimodal the head distribution is, and vice versa.
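    Examples
    --------
    A minimal, illustrative example with a synthetic sinusoidal head series, whose
    value distribution is roughly bimodal (the result depends on the data):

    >>> import numpy as np
    >>> import pandas as pd
    >>> from pastas.stats.signatures import bimodality_coefficient
    >>> idx = pd.date_range("2020-01-01", "2021-12-31", freq="D")
    >>> values = np.sin(np.arange(idx.size) * 2 * np.pi / 365.25)
    >>> head = pd.Series(values, index=idx)
    >>> print(bimodality_coefficient(head))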
"""
if normalize:
series = _normalize(series)
series = series.dropna()
n = series.size
series_mean_diff = series - series.mean()
# Compute the skew for a finite sample
skew = (
(1 / n)
* sum(series_mean_diff**3)
/ (((1 / n) * sum(series_mean_diff**2)) ** 1.5)
)
skew *= (sqrt(n * (n - 1))) / (n - 2)
# Compute the kurtosis for a finite sample
kurt = (1 / n) * sum(series_mean_diff**4) / (
((1 / n) * sum(series_mean_diff**2)) ** 2
) - 3
kurt = ((n - 1) * ((n + 1) * kurt - 3 * (n - 1)) / ((n - 2) * (n - 3))) + 3
return ((skew**2) + 1) / (kurt + ((3 * ((n - 1) ** 2)) / ((n - 2) * (n - 3))))
def _get_events_binned(
series: Series,
normalize: bool = False,
up: bool = True,
bins: int = 300,
min_event_length: int = 10,
min_n_events: int = 2,
) -> Series:
"""Get the recession or recovery events and bin them.
Parameters
----------
series : Series
Pandas Series with DatetimeIndex and head values.
normalize : bool, optional
normalize the time series to values between zero and one.
up : bool, optional
If True, get the recovery events, if False, get the recession events.
bins : int, optional
Number of bins to bin the data to.
min_event_length : int, optional
Minimum length of an event in days. Events shorter than this are discarded.
min_n_events : int, optional
Minimum number of events in a bin. Bins with less events are discarded.
Returns
-------
Series:
Binned events.
"""
if normalize:
series = _normalize(series)
series.name = "difference" # Name the series for the split function
# Get the negative differences
h = series.dropna().copy()
# Set the negative differences to nan if up is True, and the positive differences
# to nan if up is False (down).
if up:
h[h.diff() < 0] = nan
else:
h[h.diff() > 0] = nan
# Split the data into events
events = split(h, where(isnan(h.values))[0])
events = [ev[~isnan(ev.values)] for ev in events if not isinstance(ev, ndarray)]
events_new = []
for ev in events:
        # Drop empty events and events shorter than min_event_length.
if ev.empty or ev.index.size < 2:
pass
else:
ev.index = (ev.index - ev.index[0]).days
if ev.index[-1] > min_event_length:
events_new.append(ev)
if len(events_new) == 0:
return Series(dtype=float)
events = concat(events_new, axis=1)
# Subtract the absolute value of the first day of each event
data = events - events.iloc[0, :]
data = data.loc[:, data.sum() != 0.0] # Drop columns with only zeros (no events)
# Bin the data and compute the mean
binned = Series(dtype=float)
for g in data.groupby(
cut(data.index, bins=min(bins, data.index.max())), observed=False
):
        # Only use bins with more than min_n_events events
if g[1].dropna(axis=1).columns.size > min_n_events:
value = g[1].dropna(axis=1).mean(axis=1)
if not value.empty:
binned[g[0].mid] = value.iat[0]
binned = binned[binned != 0].dropna()
return binned
def recession_constant(
series: Series,
bins: int = 300,
normalize: bool = False,
min_event_length: int = 10,
min_n_events: int = 2,
) -> float:
"""Recession constant adapted after :cite:t:`kirchner_catchments_2009`.
Parameters
----------
series: pandas.Series
Pandas Series with DatetimeIndex and head values.
bins: int, optional
Number of bins to bin the data to.
normalize: bool, optional
normalize the time series to values between zero and one.
min_event_length: int, optional
Minimum length of an event in days. Events shorter than this are discarded.
min_n_events: int, optional
Minimum number of events in a bin. Bins with less events are discarded.
Returns
-------
float:
Recession constant in days.
Notes
-----
Recession constant adapted after :cite:t:`kirchner_catchments_2009`, which is the
decay constant of the exponential model fitted to the percentile-wise binned means
of the recession segments. The higher the recession constant, the slower the head
declines, and vice versa. The following function is fitted to the binned data
(similar to the Exponential response function in Pastas):
    .. math:: h(t) = -h_0 (1 - e^{-t / c})

    where h(t) is the head change since the start of the event at time t (in days),
    h_0 is the magnitude of the final head decline as t goes to infinity, and c is
    the recession constant (in days).
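    Examples
    --------
    A minimal, illustrative example with a smooth synthetic head series that
    contains one multi-month recession per year (the fitted constant depends
    entirely on the data):

    >>> import numpy as np
    >>> import pandas as pd
    >>> from pastas.stats.signatures import recession_constant
    >>> idx = pd.date_range("2015-01-01", "2019-12-31", freq="D")
    >>> values = np.sin(np.arange(idx.size) * 2 * np.pi / 365.25)
    >>> head = pd.Series(values, index=idx)
    >>> print(recession_constant(head))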
"""
binned = _get_events_binned(
series,
normalize=normalize,
up=False,
bins=bins,
min_event_length=min_event_length,
min_n_events=min_n_events,
)
# Deal with the case that binned is empty
if binned.empty:
return nan
# Fit an exponential model to the binned data and return the decay constant
def f(t, *p):
return -p[0] * (1 - exp(-t / p[1]))
popt, _ = curve_fit(
f, binned.index, binned.values, p0=[1, 100], bounds=(0, [100, 1e3])
)
# Return nan and raise warning if the decay constant is close to the boundary
if isclose(popt[1], 0.0) or isclose(popt[1], 1e3):
msg = (
"The estimated recession constant (%s) is close to the boundary. "
"This may lead to incorrect results."
)
logger.warning(msg, round(popt[1], 2))
return nan
else:
return popt[1]
def recovery_constant(
series: Series,
bins: int = 300,
normalize: bool = False,
min_event_length: int = 10,
min_n_events: int = 2,
) -> float:
"""Recovery constant after :cite:t:`kirchner_catchments_2009`.
Parameters
----------
series: pandas.Series
Pandas Series with DatetimeIndex and head values.
bins: int, optional
Number of bins to bin the data to.
normalize: bool, optional
normalize the time series to values between zero and one.
min_event_length: int, optional
Minimum length of an event in days. Events shorter than this are discarded.
min_n_events: int, optional
Minimum number of events in a bin. Bins with less events are discarded.
Returns
-------
float:
Recovery constant.
Notes
-----
Time constant of the exponential function fitted to percentile-wise binned means
of the recovery segments. The higher the recovery constant, the slower the head
recovers, and vice versa. The following function is fitted to the binned data
(similar to the Exponential response function in Pastas):
    .. math:: h(t) = h_0 (1 - e^{-t / c})

    where h(t) is the head change since the start of the event at time t (in days),
    h_0 is the final head rise as t goes to infinity, and c is the recovery constant
    (in days).
"""
    binned = _get_events_binned(
        series,
        normalize=normalize,
        up=True,
        bins=bins,
        min_event_length=min_event_length,
        min_n_events=min_n_events,
    )
# Deal with the case that binned is empty
if binned.empty:
return nan
# Fit an exponential model to the binned data and return the decay constant
    def f(t, *p):
        # recovery segments rise, so the fitted exponential is positive
        return p[0] * (1 - exp(-t / p[1]))
popt, _ = curve_fit(
f, binned.index, binned.values, p0=[1, 100], bounds=(0, [100, 1e3])
)
# Return nan and raise warning if the recovery constant is close to the boundary
if isclose(popt[1], 0.0) or isclose(popt[1], 1e3):
msg = (
"The estimated recovery constant (%s) is close to the boundary. "
"This may lead to incorrect results."
)
logger.warning(msg, round(popt[1], 2))
return nan
else:
return popt[1]
def duration_curve_slope(
series: Series,
l: float = 0.1, # noqa: E741
u: float = 0.9,
normalize: bool = False,
) -> float:
"""Slope of the head duration curve between percentile l and u after
:cite:t:`oudin_are_2010`.
Parameters
----------
series: pandas.Series
Pandas Series with DatetimeIndex and head values.
l: float, optional
lower percentile, a float between 0 and 1, lower than u.
u: float, optional
upper percentile, a float between 0 and 1, higher than l.
normalize: bool, optional
normalize the time series to values between zero and one.
Returns
-------
float:
Slope of the head duration curve between percentile l and u.
Notes
-----
Slope of the head duration curve between percentile l and u. The more negative the
slope, the more values are above or below the percentile l and u, and vice versa.
Note that the slope is negative, contrary to the flow duration curve commonly used
in surface water hydrology.
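    Examples
    --------
    A minimal, illustrative example with a synthetic daily head series (the values
    are arbitrary and the result depends on the data):

    >>> import numpy as np
    >>> import pandas as pd
    >>> from pastas.stats.signatures import duration_curve_slope
    >>> idx = pd.date_range("2018-01-01", "2020-12-31", freq="D")
    >>> values = np.sin(np.arange(idx.size) * 2 * np.pi / 365.25)
    >>> head = pd.Series(values, index=idx)
    >>> print(duration_curve_slope(head, l=0.1, u=0.9))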
"""
if normalize:
series = _normalize(series)
# Get the series between the percentiles
s = series[
(series > series.quantile(l)) & (series < series.quantile(u))
].sort_values(ascending=False)
# Deal with the case that s is empty
if s.empty:
return nan
s.index = linspace(0, 1, s.size)
return linregress(s.index, s.values).slope
def duration_curve_ratio(
series: Series,
l: float = 0.1, # noqa: E741
u: float = 0.9,
normalize: bool = True,
) -> float:
"""Ratio of the head duration curve between the percentile l and u after
:cite:t:`richards_measures_1990`.
Parameters
----------
series: pandas.Series
Pandas Series with DatetimeIndex and head values.
l: float
lower percentile, a float between 0 and 1, lower than u.
u: float, optional
upper percentile, a float between 0 and 1, higher than l.
normalize: bool, optional
normalize the time series to values between zero and one.
Returns
-------
float:
Ratio of the duration curve between the percentile l and u.
Notes
-----
Ratio of the duration curve between the percentile l and u. The higher the ratio,
the flatter the head duration curve, and vice versa.
"""
if normalize:
series = _normalize(series)
return series.quantile(l) / series.quantile(u)
def richards_pathlength(series: Series, normalize: bool = True) -> float:
"""The path length of the time series, standardized by time series length after
:cite:t:`baker_new_2004`.
Parameters
----------
series: pandas.Series
Pandas Series with DatetimeIndex and head values.
normalize: bool, optional
normalize the time series to values between zero and one.
Returns
-------
float:
The path length of the time series, standardized by time series length and
median.
Notes
-----
The path length of the time series, standardized by time series length and median.
"""
if normalize:
series = _normalize(series)
series = series.dropna()
dt = diff(series.index.to_numpy()) / Timedelta("1D")
dh = series.diff().dropna()
# sum(dt) is more fair with irregular time series
return sum(sqrt(dh**2 + dt**2)) / (sum(dt) * series.median())
def _baselevel(
series: Series, normalize: bool = True, period="30D"
) -> Tuple[Series, Series]:
"""Baselevel function for the baselevel index and stability.
Parameters
----------
series: pandas.Series
Pandas Series with DatetimeIndex and head values.
normalize: bool, optional
normalize the time series to values between zero and one.
period: str, optional
Period to resample the time series to in days (e.g., '10D' or '90D'). Default
is 30 days.
Returns
-------
    series: pandas.Series
        Pandas Series with the original heads, interpolated to daily values.
    ht: pandas.Series
        Pandas Series with the base head, interpolated to daily values.
"""
if normalize:
series = _normalize(series)
# A/B. Selecting minima hm over a period
hm = series.resample(period).min().dropna()
series = series.resample("D").interpolate()
# C. define the turning point ht (0.9 * head < adjacent heads)
ht = Series(index=[hm.index[0]], data=[hm.iat[0]], dtype=float)
for i, h in enumerate(hm.iloc[1:-1], start=1):
if (0.9 * h < hm.iat[i - 1]) & (0.9 * h < hm.iat[i + 1]):
ht[hm.index[i]] = h
ht[hm.index[-1]] = hm.iat[-1]
# ensure that index is a DatetimeIndex
ht.index = to_datetime(ht.index)
# D. Interpolate
ht = ht.resample("D").interpolate()
# E. Assign a base head to each day
ht[ht > series.loc[ht.index]] = series
return series, ht
def baselevel_index(series: Series, normalize: bool = True, period="30D") -> float:
"""Base level index (BLI) adapted after :cite:t:`organization_manual_2008`.
Parameters
----------
series: pandas.Series
Pandas Series with DatetimeIndex and head values.
normalize: bool, optional
normalize the time series to values between zero and one.
period: str, optional
Period to resample the time series to in days (e.g., '10D' or '90D'). Default
is 30 days.
Returns
-------
float:
Base level index.
Notes
-----
    Adapted analogously to its application in streamflow. Here, a baselevel time
    series is separated from the X-day minimum heads in a moving window. The BLI
    equals the total sum of heads of the baselevel time series divided by the total
    sum of heads of the original time series. Both time series are resampled to daily
    heads by interpolation for consistency.
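    Examples
    --------
    A minimal, illustrative example with a synthetic daily head series (the values
    are arbitrary and the result depends on the data):

    >>> import numpy as np
    >>> import pandas as pd
    >>> from pastas.stats.signatures import baselevel_index
    >>> idx = pd.date_range("2017-01-01", "2020-12-31", freq="D")
    >>> values = 10 + np.sin(np.arange(idx.size) * 2 * np.pi / 365.25)
    >>> head = pd.Series(values, index=idx)
    >>> print(baselevel_index(head, period="30D"))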
"""
series, ht = _baselevel(series, normalize=normalize, period=period)
return ht.sum() / series.sum()
def baselevel_stability(series: Series, normalize: bool = True, period="30D") -> float:
"""Baselevel stability after :cite:t:`heudorfer_index-based_2019`.
Parameters
----------
series: pandas.Series
Pandas Series with DatetimeIndex and head values.
normalize: bool, optional
normalize the time series to values between zero and one.
period: str, optional
Period to resample the time series to, in days (e.g., '10D' or '90D'). Default
is 30 days.
Returns
-------
float:
Base level stability.
Notes
-----
Originally developed for streamflow, here the Base Flow Index algorithm is
analogously adapted to groundwater time series as a filter to separate the slow
    component ("base level") of the time series. Then, the mean annual base level is
    calculated. The base level stability is the difference between the maximum and
    minimum annual mean base level.
"""
_, ht = _baselevel(series, normalize=normalize, period=period)
return ht.resample(year_offset).mean().max() - ht.resample(year_offset).mean().min()
def autocorr_time(series: Series, cutoff: float = 0.8, **kwargs) -> float:
    """Lag until which the autocorrelation function exceeds a cut-off value.
Parameters
----------
series: pandas.Series
Pandas Series with DatetimeIndex and head values.
cutoff: float, optional
        Cut-off value for the autocorrelation function. Default is 0.8.
kwargs: dict, optional
Additional keyword arguments are passed to the pastas acf method.
Returns
-------
float:
        Lag in days at which the autocorrelation function first drops below the
        cutoff value, or nan if it never does.
Notes
-----
    Lag in days at which the autocorrelation function drops below the cutoff value
    for the first time. The higher the lag, the more autocorrelated the time series
    is, and vice versa. In practical terms, higher values mean that the groundwater
    system has a longer memory and that the response to changes in the forcing
    remains visible in the head time series for longer.
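    Examples
    --------
    A minimal, illustrative example with a slowly varying synthetic daily head
    series (the result depends on the data and the chosen cutoff):

    >>> import numpy as np
    >>> import pandas as pd
    >>> from pastas.stats.signatures import autocorr_time
    >>> idx = pd.date_range("2015-01-01", "2019-12-31", freq="D")
    >>> values = np.sin(np.arange(idx.size) * 2 * np.pi / 365.25)
    >>> head = pd.Series(values, index=idx)
    >>> print(autocorr_time(head, cutoff=0.8))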
"""
c = acf(series.dropna(), **kwargs) # Compute the autocorrelation function
if c.min() > cutoff:
return nan
else:
return (c < cutoff).idxmax() / Timedelta("1D")
def _date_min_max(series: Series, stat: str) -> float:
"""Compute the average date of the minimum head value with circular statistics.
Parameters
----------
series: pandas.Series
Pandas Series with DatetimeIndex and head values.
stat: str
Either "min" or "max". If "min", the average date of the minimum head value is
computed. If "max", the average date of the maximum head value is computed.
Returns
-------
float:
Average date of the minimum or maximum head value.
Notes
-----
The average date is computed by taking the average of the day of the year of the
minimum head value for each year, using circular statistics. We refer to
:cite:t:`fisher_statistical_1995` (page 31) for more information on circular
statistics.
"""
# Get the day of the year of the minimum head value for each year
if stat == "min":
data = series.groupby(series.index.year).idxmin(skipna=True).dropna().values
elif stat == "max":
data = series.groupby(series.index.year).idxmax(skipna=True).dropna().values
doy = DatetimeIndex(data).dayofyear.to_numpy(float)
m = 365.25
two_pi = 2 * pi
thetas = array(doy) * two_pi / m
c = cos(thetas).sum()
s = sin(thetas).sum()
if (s > 0) & (c > 0):
mean_theta = arctan(s / c)
elif c < 0:
mean_theta = arctan(s / c) + pi
elif (s < 0) & (c > 0):
mean_theta = arctan(s / c) + two_pi
else:
# This should never happen
raise ValueError("Something went wrong in the circular statistics.")
return mean_theta * 365.25 / two_pi
def date_min(series: Series) -> float:
"""Compute the average date of the minimum head value with circular statistics.
Parameters
----------
series: pandas.Series
Pandas Series with DatetimeIndex and head values.
Returns
-------
float:
Average date of the minimum head value.
Notes
-----
Average date of the minimum head value. The higher the date, the later the minimum
head value occurs in the year, and vice versa.
The average date is computed by taking the average of the day of the year of the
minimum head value for each year, using circular statistics. We refer to
:cite:t:`fisher_statistical_1995` (page 31) for more information on circular
statistics.
"""
return _date_min_max(series, "min")
def date_max(series: Series) -> float:
"""Compute the average date of the maximum head value with circular statistics.
Parameters
----------
series: pandas.Series
Pandas Series with DatetimeIndex and head values.
Returns
-------
float:
Average date of the maximum head value.
Notes
-----
Average date of the maximum head value. The higher the date, the later the maximum
head value occurs in the year, and vice versa.
The average date is computed by taking the average of the day of the year of the
maximum head value for each year, using circular statistics. We refer to
:cite:t:`fisher_statistical_1995` (page 31) for more information on circular
statistics.
"""
return _date_min_max(series, "max")
def summary(
data: Union[DataFrame, Series], signatures: Optional[list] = None
) -> DataFrame:
"""Method to get many signatures for a time series.
Parameters
----------
data: Union[pandas.DataFrame, pandas.Series]
pandas DataFrame or Series with DatetimeIndex
signatures: list
list of signatures to return. By default all available signatures are returned.
Returns
-------
result: pandas.DataFrame
        Pandas DataFrame with one row per signature and one column per time series.
Examples
--------
>>> idx = date_range("2000", "2010")
>>> data = np.random.rand(len(idx), 3)
>>> df = DataFrame(index=idx, data=data, columns=[year_offset, "B", "C"], dtype=float)
>>> ps.stats.signatures.summary(df)
"""
if signatures is None:
signatures = __all__
if isinstance(data, DataFrame):
result = DataFrame(index=signatures, columns=data.columns, dtype=float)
elif isinstance(data, Series):
result = DataFrame(index=signatures, columns=[data.name], dtype=float)
else:
raise ValueError("Invalid data type. Expected DataFrame or Series.")
# Get the signatures
for signature in signatures:
# Check if the signature is valid
if signature not in __all__:
msg = "Signature %s is not a valid signature."
logger.error(msg, signature)
raise ValueError(msg % signature)
# Get the function and compute the signature for each column/series
func = getattr(ps.stats.signatures, signature)
if isinstance(data, DataFrame):
result.loc[signature] = data.apply(func)
elif isinstance(data, Series):
result.loc[signature] = func(data)
return result