"""This module contains the noise models available in Pastas.
A Noise model may be used to transform the residual series into a noise series that
better represents white noise.
Examples
--------
By default, a noise model is added to a Pastas model. It is possible to replace the
default model with different models as follows:
>>> n = ps.ArmaNoiseModel()
>>> ml.add_noisemodel(n)
or, to delete the noise model from the model:
>>> ml.del_noisemodel()
See Also
--------
pastas.model.Model.add_noisemodel
"""
from logging import getLogger
from typing import Optional
import numpy as np
from pandas import DataFrame, DatetimeIndex, Series, Timedelta
from pastas.typing import ArrayLike
from .decorators import PastasDeprecationWarning, njit, set_parameter
logger = getLogger(__name__)
__all__ = ["ArNoiseModel", "ArmaNoiseModel"]
[docs]class NoiseModelBase:
_name = "NoiseModelBase"
[docs] def __init__(self) -> None:
self.nparam = 1
self.name = "noise"
self.norm = None
self.parameters = DataFrame(
columns=["initial", "pmin", "pmax", "vary", "name", "dist"]
)
[docs] def set_init_parameters(self, oseries: Optional[Series] = None) -> None:
if oseries is not None:
pinit = np.diff(oseries.index.to_numpy()) / Timedelta("1D")
pinit = np.median(pinit)
else:
pinit = 14.0
self.parameters.loc["noise_alpha"] = (
pinit,
1e-5,
5000.0,
True,
"noise",
"uniform",
)
@set_parameter
def _set_initial(self, name: str, value: float) -> None:
"""Internal method to set the initial parameter value.
Notes
-----
The preferred method for parameter setting is through the model.
"""
self.parameters.at[name, "initial"] = value
@set_parameter
def _set_pmin(self, name: str, value: float) -> None:
"""Internal method to set the minimum value of the noisemodel.
Notes
-----
The preferred method for parameter setting is through the model.
"""
self.parameters.at[name, "pmin"] = value
@set_parameter
def _set_pmax(self, name: str, value: float) -> None:
"""Internal method to set the maximum parameter values.
Notes
-----
The preferred method for parameter setting is through the model.
"""
self.parameters.at[name, "pmax"] = value
@set_parameter
def _set_vary(self, name: str, value: float) -> None:
"""Internal method to set if the parameter is varied.
Notes
-----
The preferred method for parameter setting is through the model.
"""
self.parameters.at[name, "vary"] = value
@set_parameter
def _set_dist(self, name: str, value: str) -> None:
"""Internal method to set distribution of prior of the parameter.
Notes
-----
The preferred method for parameter setting is through the model.
"""
self.parameters.at[name, "dist"] = str(value)
[docs] def to_dict(self) -> dict:
"""Method to return a dict to store the noise model"""
data = {"class": self._name, "norm": self.norm}
return data
[docs] @staticmethod
def weights(res, p) -> int:
return 1
[docs]class ArNoiseModel(NoiseModelBase):
"""Noise model with exponential decay of the residuals and weighting.
Parameters
----------
norm: boolean, optional
Boolean to indicate whether weights are normalized according to the Von
Asmuth and Bierkens (2005) paper. Default is True.
Notes
-----
Calculates the noise :cite:t:`von_asmuth_modeling_2005` according to:
.. math::
v(t_1) = r(t_1) - r(t_0) * \\exp(- \\Delta t / \\alpha)
Calculates the weights as
.. math::
w = 1 / \\sqrt{(1 - \\exp(-2 \\Delta t / \\alpha))}
The units of the alpha parameter is always in days. The first value of the noise
is the residual (:math:`v(t=0=r(t=0)`). First weight is 1 / sig_residuals (i.e.,
delt = infty). Normalization of weights as in :cite:t:`von_asmuth_modeling_2005`,
optional.
"""
_name = "ArNoiseModel"
[docs] def __init__(self, norm: bool = True) -> None:
NoiseModelBase.__init__(self)
self.norm = norm
self.nparam = 1
self.set_init_parameters()
[docs] @staticmethod
def simulate(res: Series, p: ArrayLike) -> Series:
"""Simulate noise from the residuals.
Parameters
----------
res: pandas.Series
The residual series.
p: array_like
array_like object with the values as floats representing the model
parameters. Here, Alpha parameter used by the noisemodel.
Returns
-------
noise: pandas.Series
Series of the noise.
"""
alpha = p[0]
odelt = np.diff(res.index.to_numpy()) / Timedelta("1D")
v = np.append(
res.values[0], res.values[1:] - np.exp(-odelt / alpha) * res.values[:-1]
)
return Series(data=v, index=res.index, name="Noise")
[docs] def weights(self, res: Series, p: ArrayLike) -> Series:
"""Method to calculate the weights for the noise.
Parameters
----------
res: pandas.Series
Pandas Series with the residuals to compute the weights for. The Series
index must be a DatetimeIndex.
p: array_like
NumPy array with the parameters used in the noise model.
Returns
-------
w: pandas.Series
Series of the weights.
Notes
-----
Weights are
.. math:: w = 1 / sqrt((1 - exp(-2 \\Delta t / \\alpha)))
which are then normalized so that sum(w) = len(res).
"""
alpha = p[0]
# large for first measurement
odelt = np.append(1e12, np.diff(res.index.to_numpy()) / Timedelta("1D"))
exp = np.exp(-2.0 / alpha * odelt) # Twice as fast as 2*odelt/alpha
w = 1 / np.sqrt(1.0 - exp) # weights of noise, not noise^2
if self.norm:
w *= np.exp(1.0 / (2.0 * odelt.size) * np.sum(np.log(1.0 - exp)))
return Series(data=w, index=res.index, name="noise_weights")
[docs] def get_correction(
self, res: Series, p: ArrayLike, tindex: DatetimeIndex
) -> Series:
"""Get the correction for a forecast using the noise model.
Parameters
----------
res : Series
The residual series.
p : ArrayLike
The parameters of the noise model.
tindex : DatetimeIndex
The index of the forecast.
Returns
-------
Series
The correction to the forecast.
Notes
-----
The correction is calculated as:
.. math::
correction = \\exp(-\\Delta t / \\alpha) * last_residual
where :math:`\\Delta t` is the time difference between the last observation
and the forecast, and :math:`\\alpha` is the noise parameter.
"""
alpha = p[0]
last_residual = res.iat[-1]
last_date = res.index[-1]
dt = (tindex - last_date).days
correction = Series(
index=tindex,
name="correction",
dtype=float,
data=np.exp(-dt / alpha) * last_residual,
)
return correction
[docs] def to_dict(self) -> dict:
"""Method to return a dict to store the noise model"""
data = {"class": self._name, "norm": self.norm}
return data
@PastasDeprecationWarning(
remove_version="2.0.0", reason="Please use `ps.ArNoiseModel` instead."
)
def NoiseModel(*args, **kwargs) -> ArNoiseModel:
n = ArNoiseModel(*args, **kwargs)
n._name = "NoiseModel"
return n
[docs]class ArmaNoiseModel(NoiseModelBase):
"""ARMA(1,1) Noise model to simulate the noise as defined in
:cite:t:`collenteur_estimation_2021`.
Notes
-----
Calculates the noise according to:
.. math::
\\upsilon_t = r_t - r_{t-1} e^{-\\Delta t/\\alpha} - \\upsilon_{t-1}
e^{-\\Delta t/\\beta}
The units of the alpha and beta parameters are always in days.
Warnings
--------
This model has only been tested on regular time steps and should not be used for
irregular time steps yet.
"""
_name = "ArmaNoiseModel"
[docs] def __init__(self) -> None:
NoiseModelBase.__init__(self)
self.nparam = 2
self.set_init_parameters()
[docs] def set_init_parameters(self, oseries: Series = None) -> None:
if oseries is not None:
pinit = np.diff(oseries.index.to_numpy()) / Timedelta("1D")
pinit = np.median(pinit)
else:
pinit = 14.0
self.parameters.loc["noise_alpha"] = (
pinit,
1e-9,
5000.0,
True,
"noise",
"uniform",
)
self.parameters.loc["noise_beta"] = (
1.0,
-np.inf,
np.inf,
True,
"noise",
"uniform",
)
[docs] def simulate(self, res: Series, p: ArrayLike) -> Series:
alpha = p[0]
beta = p[1]
# Calculate the time steps
odelt = np.diff(res.index.to_numpy()) / Timedelta("1D")
a = self.calculate_noise(res.values, odelt, alpha, beta)
return Series(index=res.index, data=a, name="Noise")
[docs] @staticmethod
@njit
def calculate_noise(
res: ArrayLike, odelt: ArrayLike, alpha: float, beta: float
) -> ArrayLike:
# Create an array to store the noise
a = np.zeros_like(res)
a[0] = res[0]
if beta == 0.0: # Prevent division by zero errors
beta = 1e-24
pm = beta / np.abs(beta)
# We have to loop through each value
for i in range(1, res.size):
a[i] = (
res[i]
- res[i - 1] * np.exp(-odelt[i - 1] / alpha)
- a[i - 1] * pm * np.exp(-odelt[i - 1] / np.abs(beta))
)
return a
@PastasDeprecationWarning(
remove_version="2.0.0", reason="Please use `ps.ArmaNoiseModel` instead."
)
def ArmaModel(*args, **kwargs) -> ArmaNoiseModel:
n = ArmaNoiseModel(*args, **kwargs)
n._name = "ArmaModel"
return n