"""This module contains the noise models available in Pastas.
A Noise model may be used to transform the residual series into a noise series that
better represents white noise.
Examples
--------
By default, a noise model is added to a Pastas model. It is possible to replace the
default model with different models as follows:
>>> n = ps.ArmaNoiseModel()
>>> ml.add_noisemodel(n)
or, to delete the noise model from the model:
>>> ml.del_noisemodel()
See Also
--------
pastas.model.Model.add_noisemodel
"""
from logging import getLogger
import numpy as np
from pandas import DataFrame, DatetimeIndex, Series, Timedelta
from pastas.typing import ArrayLike
from .decorators import PastasDeprecationWarning, njit, set_parameter
logger = getLogger(__name__)
__all__ = ["ArNoiseModel", "ArmaNoiseModel"]
[docs]class NoiseModelBase:
_name = "NoiseModelBase"
[docs] def __init__(self) -> None:
self.nparam = 1
self.name = "noise"
self.norm = None
self.parameters = DataFrame(
columns=["initial", "pmin", "pmax", "vary", "name", "dist"]
)
[docs] def set_init_parameters(self, oseries: Series | None = None) -> None:
if oseries is not None:
pinit = np.diff(oseries.index.to_numpy()) / Timedelta("1D")
pinit = np.median(pinit)
else:
pinit = 14.0
self.parameters.loc["noise_alpha"] = (
pinit,
1e-5,
5000.0,
True,
"noise",
"uniform",
)
@set_parameter
def _set_initial(self, name: str, value: float) -> None:
"""Internal method to set the initial parameter value.
Notes
-----
The preferred method for parameter setting is through the model.
"""
self.parameters.at[name, "initial"] = value
@set_parameter
def _set_pmin(self, name: str, value: float) -> None:
"""Internal method to set the minimum value of the noisemodel.
Notes
-----
The preferred method for parameter setting is through the model.
"""
self.parameters.at[name, "pmin"] = value
@set_parameter
def _set_pmax(self, name: str, value: float) -> None:
"""Internal method to set the maximum parameter values.
Notes
-----
The preferred method for parameter setting is through the model.
"""
self.parameters.at[name, "pmax"] = value
@set_parameter
def _set_vary(self, name: str, value: float) -> None:
"""Internal method to set if the parameter is varied.
Notes
-----
The preferred method for parameter setting is through the model.
"""
self.parameters.at[name, "vary"] = value
@set_parameter
def _set_dist(self, name: str, value: str) -> None:
"""Internal method to set distribution of prior of the parameter.
Notes
-----
The preferred method for parameter setting is through the model.
"""
self.parameters.at[name, "dist"] = str(value)
[docs] def to_dict(self) -> dict:
"""Method to return a dict to store the noise model"""
data = {"class": self._name, "norm": self.norm}
return data
[docs] @staticmethod
def weights(res, p) -> int:
return 1
[docs]class ArNoiseModel(NoiseModelBase):
"""Noise model with exponential decay of the residuals and weighting.
Parameters
----------
norm: boolean, optional
Boolean to indicate whether weights are normalized according to the Von
Asmuth and Bierkens (2005) paper. Default is True.
Notes
-----
Calculates the noise :cite:t:`von_asmuth_modeling_2005` according to:
.. math::
v(t_1) = r(t_1) - r(t_0) * \\exp(- \\Delta t / \\alpha)
Calculates the weights as
.. math::
w = 1 / \\sqrt{(1 - \\exp(-2 \\Delta t / \\alpha))}
The units of the alpha parameter is always in days. The first value of the noise
is the residual (:math:`v(t=0=r(t=0)`). First weight is 1 / sig_residuals (i.e.,
delt = infty). Normalization of weights as in :cite:t:`von_asmuth_modeling_2005`,
optional.
"""
_name = "ArNoiseModel"
[docs] def __init__(self, norm: bool = True) -> None:
NoiseModelBase.__init__(self)
self.norm = norm
self.nparam = 1
self.set_init_parameters()
[docs] @staticmethod
def simulate(res: Series, p: ArrayLike) -> Series:
"""Simulate noise from the residuals.
Parameters
----------
res: pandas.Series
The residual series.
p: array_like
array_like object with the values as floats representing the model
parameters. Here, Alpha parameter used by the noisemodel.
Returns
-------
noise: pandas.Series
Series of the noise.
"""
alpha = p[0]
odelt = np.diff(res.index.to_numpy()) / Timedelta("1D")
v = np.append(
res.values[0], res.values[1:] - np.exp(-odelt / alpha) * res.values[:-1]
)
return Series(data=v, index=res.index, name="Noise")
[docs] def weights(self, res: Series, p: ArrayLike) -> Series:
"""Method to calculate the weights for the noise.
Parameters
----------
res: pandas.Series
Pandas Series with the residuals to compute the weights for. The Series
index must be a DatetimeIndex.
p: array_like
NumPy array with the parameters used in the noise model.
Returns
-------
w: pandas.Series
Series of the weights.
Notes
-----
Weights are
.. math:: w = 1 / sqrt((1 - exp(-2 \\Delta t / \\alpha)))
which are then normalized so that sum(w) = len(res).
"""
alpha = p[0]
# large for first measurement
odelt = np.append(1e12, np.diff(res.index.to_numpy()) / Timedelta("1D"))
exp = np.exp(-2.0 / alpha * odelt) # Twice as fast as 2*odelt/alpha
w = 1 / np.sqrt(1.0 - exp) # weights of noise, not noise^2
if self.norm:
w *= np.exp(1.0 / (2.0 * odelt.size) * np.sum(np.log(1.0 - exp)))
return Series(data=w, index=res.index, name="noise_weights")
[docs] def get_correction(
self, res: Series, p: ArrayLike, tindex: DatetimeIndex
) -> Series:
"""Get the correction for a forecast using the noise model.
Parameters
----------
res : Series
The residual series.
p : ArrayLike
The parameters of the noise model.
tindex : DatetimeIndex
The index of the forecast.
Returns
-------
Series
The correction to the forecast.
Notes
-----
The correction is calculated as:
.. math::
correction = \\exp(-\\Delta t / \\alpha) * last_residual
where :math:`\\Delta t` is the time difference between the last observation
and the forecast, and :math:`\\alpha` is the noise parameter.
"""
alpha = p[0]
last_residual = res.iat[-1]
last_date = res.index[-1]
dt = (tindex - last_date).days
correction = Series(
index=tindex,
name="correction",
dtype=float,
data=np.exp(-dt / alpha) * last_residual,
)
return correction
[docs] def to_dict(self) -> dict:
"""Method to return a dict to store the noise model"""
data = {"class": self._name, "norm": self.norm}
return data
@PastasDeprecationWarning(
remove_version="2.0.0", reason="Please use `ps.ArNoiseModel` instead."
)
def NoiseModel(*args, **kwargs) -> ArNoiseModel:
n = ArNoiseModel(*args, **kwargs)
n._name = "NoiseModel"
return n
[docs]class ArmaNoiseModel(NoiseModelBase):
"""ARMA(1,1) Noise model to simulate the noise as defined in
:cite:t:`collenteur_estimation_2021`.
Notes
-----
Calculates the noise according to:
.. math::
\\upsilon_t = r_t - r_{t-1} e^{-\\Delta t/\\alpha} - \\upsilon_{t-1}
e^{-\\Delta t/\\beta}
The units of the alpha and beta parameters are always in days.
Warnings
--------
This model has only been tested on regular time steps and should not be used for
irregular time steps yet.
"""
_name = "ArmaNoiseModel"
[docs] def __init__(self) -> None:
NoiseModelBase.__init__(self)
self.nparam = 2
self.set_init_parameters()
[docs] def set_init_parameters(self, oseries: Series = None) -> None:
if oseries is not None:
pinit = np.diff(oseries.index.to_numpy()) / Timedelta("1D")
pinit = np.median(pinit)
else:
pinit = 14.0
self.parameters.loc["noise_alpha"] = (
pinit,
1e-9,
5000.0,
True,
"noise",
"uniform",
)
self.parameters.loc["noise_beta"] = (
1.0,
-np.inf,
np.inf,
True,
"noise",
"uniform",
)
[docs] def simulate(self, res: Series, p: ArrayLike) -> Series:
alpha = p[0]
beta = p[1]
# Calculate the time steps
odelt = np.diff(res.index.to_numpy()) / Timedelta("1D")
a = self.calculate_noise(res.values, odelt, alpha, beta)
return Series(index=res.index, data=a, name="Noise")
[docs] @staticmethod
@njit
def calculate_noise(
res: ArrayLike, odelt: ArrayLike, alpha: float, beta: float
) -> ArrayLike:
# Create an array to store the noise
a = np.zeros_like(res)
a[0] = res[0]
if beta == 0.0: # Prevent division by zero errors
beta = 1e-24
pm = beta / np.abs(beta)
# We have to loop through each value
for i in range(1, res.size):
a[i] = (
res[i]
- res[i - 1] * np.exp(-odelt[i - 1] / alpha)
- a[i - 1] * pm * np.exp(-odelt[i - 1] / np.abs(beta))
)
return a
@PastasDeprecationWarning(
remove_version="2.0.0", reason="Please use `ps.ArmaNoiseModel` instead."
)
def ArmaModel(*args, **kwargs) -> ArmaNoiseModel:
n = ArmaNoiseModel(*args, **kwargs)
n._name = "ArmaModel"
return n