Source code for pastas.rfunc

# coding=utf-8
"""This module contains all the response functions available in Pastas."""

from logging import getLogger

import numpy as np
from numpy import pi
from pandas import DataFrame
from scipy.integrate import quad
from scipy.interpolate import interp1d
from scipy.special import (
    erfc,
    erfcinv,
    exp1,
    gamma,
    gammainc,
    gammaincinv,
    k0,
    k1,
    lambertw,
)

from .decorators import latexfun, njit
from .version import check_numba_scipy

try:
    from numba import prange
except ModuleNotFoundError:
    prange = range

# Type Hinting
from typing import Optional, Union

from pastas.typing import ArrayLike

logger = getLogger(__name__)

__all__ = [
    "Gamma",
    "Exponential",
    "Hantush",
    "Polder",
    "FourParam",
    "DoubleExponential",
    "One",
    "Edelman",
    "HantushWellModel",
    "Kraijenhoff",
    "Spline",
]


[docs]class RfuncBase: _name = "RfuncBase"
[docs] def __init__( self, cutoff: float = 0.999, **kwargs, ) -> None: self.cutoff = cutoff if "up" in kwargs: raise TypeError( "keyword argument 'up' is not supported in init. " "Set with update_rfunc_settings()." ) if "gain_scale_factor" in kwargs: raise TypeError( "keyword argument 'gain_scale_factor' is not supported in " "init. Set with update_rfunc_settings()." ) # initialize attributes, these can be set with update_rfunc_settings() self.up = None self.gain_scale_factor = 1.0
[docs] def update_rfunc_settings( self, up: Optional[bool] = "nochange", gain_scale_factor: Optional[float] = None, cutoff: Optional[float] = None, ) -> None: """Internal method to set the settings of the response function. Parameters ---------- up: bool or None, optional indicates whether a positive stress will cause the head to go up (True, default) or down (False), if None the head can go both ways. gain_scale_factor: float, optional the scale factor is used to set the initial value and the bounds of the gain parameter, computed as 1 / gain_scale_factor. cutoff: float, optional proportion after which the step function is cut off. Notes ----- Only change the settings if values are provided! """ if up != "nochange": self.up = up if gain_scale_factor is not None: if 1e-8 > gain_scale_factor > 0: gain_scale_factor = 1e-8 # arbitrary number to prevent division by zero elif gain_scale_factor < 0 and up is True: gain_scale_factor = gain_scale_factor * -1 self.gain_scale_factor = gain_scale_factor if cutoff is not None: self.cutoff = cutoff
[docs] def get_init_parameters(self, name: str) -> DataFrame: """Get initial parameters and bounds. It is called by the stressmodel. Parameters ---------- name: str Name of the stressmodel. Returns ------- parameters: pandas DataFrame The initial parameters and parameter bounds used by the solver. """
[docs] def get_tmax(self, p: ArrayLike, cutoff: Optional[float] = None) -> float: """Method to get the response time for a certain cutoff. Parameters ---------- p: array_like array_like object with the values as floats representing the model parameters. cutoff: float, optional proportion after which the step function is cut off. default is 0.999. Returns ------- tmax: float Number of days when 99.9% of the response has effectuated, when the cutoff is chosen at 0.999. """
[docs] def step( self, p: ArrayLike, dt: float = 1.0, cutoff: Optional[float] = None, maxtmax: Optional[int] = None, ) -> ArrayLike: """Method to return the step function. Parameters ---------- p: array_like array_like object with the values as floats representing the model parameters. dt: float timestep as a multiple of one day. cutoff: float, optional proportion after which the step function is cut off. default is 0.999. maxtmax: int, optional Maximum timestep to compute the block response for. Returns ------- s: array_like Array with the step response. """ return
[docs] def block( self, p: ArrayLike, dt: float = 1.0, cutoff: Optional[float] = None, maxtmax: Optional[int] = None, ) -> ArrayLike: """Method to return the block function. Parameters ---------- p: array_like array_like object with the values as floats representing the model parameters. dt: float timestep as a multiple of one day. cutoff: float, optional proportion after which the step function is cut off. maxtmax: int, optional Maximum timestep to compute the block response for. Returns ------- s: array_like Array with the block response. """ s = self.step(p=p, dt=dt, cutoff=cutoff, maxtmax=maxtmax) return np.append(s[0], np.subtract(s[1:], s[:-1]))
[docs] @staticmethod def impulse(t: ArrayLike, p: ArrayLike) -> ArrayLike: """Method to return the impulse response function. Parameters ---------- t: array_like array_like object with the times at which to evaluate the impulse response, can be obtained with get_t() method p: array_like array_like object with the values as floats representing the model parameters. Returns ------- s: array_like Array with the impulse response. Notes ----- The impulse response function for each response function class can be viewed on the Documentation website or using `latexify` by running the following code in a Jupyter notebook environment:: ps.RfuncName.impulse Only used for internal consistency checks """
[docs] def get_t( self, p: ArrayLike, dt: float, cutoff: float, maxtmax: Optional[int] = None ) -> ArrayLike: """Internal method to determine the times at which to evaluate the step response, from t=0. Parameters ---------- p: array_like array_like object with the values as floats representing the model parameters. dt: float timestep as a multiple of one day. cutoff: float proportion after which the step function is cut off. maxtmax: float, optional The maximum time of the response, usually set to the simulation length. Returns ------- t: array_like Array with the times. """ if isinstance(dt, np.ndarray): return dt else: tmax = self.get_tmax(p, cutoff) if maxtmax is not None: tmax = min(tmax, maxtmax) tmax = max(tmax, 3 * dt) return np.arange(dt, tmax, dt)
[docs] def to_dict(self): """Method to export the response function to a dictionary. Returns ------- data: dict dictionary with all necessary information to reconstruct the rfunc object. Notes ----- The exported dictionary should exactly match the input arguments of __init__. """ data = { "class": self._name, "up": self.up, "gain_scale_factor": self.gain_scale_factor, "cutoff": self.cutoff, } return data
[docs]class Gamma(RfuncBase): """Gamma response function with 3 parameters A, a, and n. Parameters ---------- up: bool or None, optional indicates whether a positive stress will cause the head to go up (True, default) or down (False), if None the head can go both ways. gain_scale_factor: float, optional mean value of the stress, used to set the initial value such that the final step times the mean stress equals 1. cutoff: float, optional proportion after which the step function is cut off. Notes ----- The impulse response function for this class can be viewed on the Documentation website or using `latexify` by running the following code in a Jupyter notebook environment:: ps.Gamma.impulse The Gamma function is equal to the Exponential function when n=1. """ _name = "Gamma"
[docs] def __init__( self, cutoff: float = 0.999, **kwargs, ) -> None: RfuncBase.__init__(self, cutoff=cutoff, **kwargs) self.nparam = 3
[docs] def get_init_parameters(self, name: str) -> DataFrame: parameters = DataFrame(columns=["initial", "pmin", "pmax", "vary", "name"]) if self.up: parameters.loc[name + "_A"] = ( 1 / self.gain_scale_factor, 1e-5, 100 / self.gain_scale_factor, True, name, ) elif self.up is False: parameters.loc[name + "_A"] = ( -1 / self.gain_scale_factor, -100 / self.gain_scale_factor, -1e-5, True, name, ) else: parameters.loc[name + "_A"] = ( 1 / self.gain_scale_factor, np.nan, np.nan, True, name, ) # if n is too small, the length of response function is close to zero parameters.loc[name + "_n"] = (1, 0.01, 100, True, name) parameters.loc[name + "_a"] = (10, 0.01, 1e4, True, name) return parameters
[docs] def get_tmax(self, p: ArrayLike, cutoff: Optional[float] = None) -> float: if cutoff is None: cutoff = self.cutoff return gammaincinv(p[1], cutoff) * p[2]
[docs] def gain(self, p: ArrayLike) -> float: return p[0]
[docs] def step( self, p: ArrayLike, dt: float = 1.0, cutoff: Optional[float] = None, maxtmax: Optional[int] = None, ) -> ArrayLike: t = self.get_t(p=p, dt=dt, cutoff=cutoff, maxtmax=maxtmax) s = p[0] * gammainc(p[1], t / p[2]) return s
[docs] @staticmethod @latexfun(identifiers={"impulse": "theta", "gamma": "Gamma"}) def impulse(t: ArrayLike, p: ArrayLike) -> ArrayLike: A, n, a = p return A * t ** (n - 1) * np.exp(-t / a) / (a**n * gamma(n))
[docs]class Exponential(RfuncBase): """Exponential response function with 2 parameters: A and a. Parameters ---------- up: bool or None, optional indicates whether a positive stress will cause the head to go up (True, default) or down (False), if None the head can go both ways. gain_scale_factor: float, optional the scale factor is used to set the initial value and the bounds of the gain parameter, computed as 1 / gain_scale_factor. cutoff: float, optional proportion after which the step function is cut off. Notes ----- The impulse response function for this class can be viewed on the Documentation website or using `latexify` by running the following code in a Jupyter notebook environment:: ps.Exponential.impulse """ _name = "Exponential"
[docs] def __init__( self, cutoff: float = 0.999, **kwargs, ) -> None: RfuncBase.__init__(self, cutoff=cutoff, **kwargs) self.nparam = 2
[docs] def get_init_parameters(self, name: str) -> DataFrame: parameters = DataFrame(columns=["initial", "pmin", "pmax", "vary", "name"]) if self.up: parameters.loc[name + "_A"] = ( 1 / self.gain_scale_factor, 1e-5, 100 / self.gain_scale_factor, True, name, ) elif self.up is False: parameters.loc[name + "_A"] = ( -1 / self.gain_scale_factor, -100 / self.gain_scale_factor, -1e-5, True, name, ) else: parameters.loc[name + "_A"] = ( 1 / self.gain_scale_factor, np.nan, np.nan, True, name, ) parameters.loc[name + "_a"] = (10, 0.01, 1000, True, name) return parameters
[docs] def get_tmax(self, p: ArrayLike, cutoff=None) -> float: if cutoff is None: cutoff = self.cutoff return -p[1] * np.log(1 - cutoff)
[docs] def gain(self, p: ArrayLike) -> float: return p[0]
[docs] def step( self, p: ArrayLike, dt: float = 1.0, cutoff: Optional[float] = None, maxtmax: Optional[float] = None, ) -> ArrayLike: t = self.get_t(p=p, dt=dt, cutoff=cutoff, maxtmax=maxtmax) s = p[0] * (1.0 - np.exp(-t / p[1])) return s
[docs] @staticmethod @latexfun(identifiers={"impulse": "theta"}) def impulse(t: ArrayLike, p: ArrayLike) -> ArrayLike: A, a = p return A / a * np.exp(-t / a)
[docs]class HantushWellModel(RfuncBase): """An implementation of the Hantush well function for multiple pumping wells. Parameters ---------- up: bool, optional indicates whether a positive stress will cause the head to go up (True, default) or down (False). gain_scale_factor: float, optional the scale factor is used to set the initial value and the bounds of the gain parameter, computed as 1 / gain_scale_factor. cutoff: float, optional proportion after which the step function is cut off. use_numba: bool, optional Use the method 'numba_step' to compute the step_response. quad: bool, optional Use the method 'numba_quad' to compute the step_response. Notes ----- The impulse response function for this class can be viewed on the Documentation website or using `latexify` by running the following code in a Jupyter notebook environment:: ps.HantushWellModel.impulse where r is the distance from the pumping well to the observation point and must be specified. A, a, and b are parameters, which are slightly different from the Hantush response function. The gain is defined as: :math:`\\text{gain} = A K_0 \\left( 2r \\sqrt(b) \\right)` The implementation used here is explained in :cite:t:`veling_hantush_2010`. """ _name = "HantushWellModel"
[docs] def __init__( self, cutoff: float = 0.999, use_numba: bool = False, quad: bool = False, **kwargs, ) -> None: RfuncBase.__init__(self, cutoff=cutoff, **kwargs) self.distances = None self.nparam = 3 self.use_numba = use_numba # requires numba_scipy for real speedups self.quad = quad # if quad=True, implicitly uses numba # check numba and numba_scipy installation if self.quad or self.use_numba: # turn off use_numba if numba_scipy is not available # or there is a version conflict if self.use_numba: self.use_numba = check_numba_scipy()
[docs] def set_distances(self, distances) -> None: self.distances = distances
[docs] def get_init_parameters(self, name: str) -> DataFrame: if self.distances is None: raise ( Exception( "distances is None. Set using method set_distances() or use " "Hantush." ) ) parameters = DataFrame(columns=["initial", "pmin", "pmax", "vary", "name"]) if self.up: # divide by k0(2) to get same initial value as ps.Hantush parameters.loc[name + "_A"] = ( 1 / (self.gain_scale_factor * k0(2)), 0, np.nan, True, name, ) elif self.up is False: # divide by k0(2) to get same initial value as ps.Hantush parameters.loc[name + "_A"] = ( -1 / (self.gain_scale_factor * k0(2)), np.nan, 0, True, name, ) else: parameters.loc[name + "_A"] = ( 1 / self.gain_scale_factor, np.nan, np.nan, True, name, ) parameters.loc[name + "_a"] = (100, 1e-3, 1e4, True, name) # set initial and bounds for b taking into account distances # note log transform to avoid tiny values for b binit = np.log(1.0 / np.mean(self.distances) ** 2) bmin = np.log(1e-6 / np.max(self.distances) ** 2) bmax = np.log(25.0 / np.min(self.distances) ** 2) parameters.loc[name + "_b"] = (binit, bmin, bmax, True, name) return parameters
@staticmethod def _get_distance_from_params(p: ArrayLike) -> float: if len(p) == 3: r = 1.0 logger.info("No distance passed to HantushWellModel, assuming r=1.0.") else: r = p[3] return r
[docs] def get_tmax(self, p: ArrayLike, cutoff: Optional[float] = None) -> float: r = self._get_distance_from_params(p) # approximate formula for tmax if cutoff is None: cutoff = self.cutoff a, b = p[1:3] rho = 2 * r * np.exp(b / 2) k0rho = k0(rho) if k0rho == 0.0: return 50 * 365.0 # 50 years, need to set some tmax if k0rho==0.0 else: return lambertw(1 / ((1 - cutoff) * k0rho)).real * a
[docs] def gain(self, p: ArrayLike, r: Optional[float] = None) -> float: if r is None: r = self._get_distance_from_params(p) rho = 2 * r * np.exp(p[2] / 2) return p[0] * k0(rho)
@staticmethod @njit def _integrand_hantush(y: float, b: float) -> float: return np.exp(-y - (b / y)) / y
[docs] @staticmethod @njit(parallel=True) def numba_step(A: float, a: float, b: float, r: float, t: ArrayLike) -> ArrayLike: rho = 2 * r * np.exp(b / 2) rhosq = rho**2 k0rho = k0(rho) tau = t / a w = (exp1(rho) - k0rho) / (exp1(rho) - exp1(rho / 2)) F = np.zeros((tau.size,), dtype=np.float64) for i in prange(tau.size): tau_i = tau[i] if tau_i < rho / 2: F[i] = w * exp1(rhosq / (4 * tau_i)) - (w - 1) * exp1( tau_i + rhosq / (4 * tau_i) ) elif tau_i >= rho / 2: F[i] = ( 2 * k0rho - w * exp1(tau_i) + (w - 1) * exp1(tau_i + rhosq / (4 * tau_i)) ) return A * F / 2
[docs] @staticmethod def numpy_step(A: float, a: float, b: float, r: float, t: ArrayLike) -> ArrayLike: rho = 2 * r * np.exp(b / 2) rhosq = rho**2 k0rho = k0(rho) tau = t / a tau1 = tau[tau < rho / 2] tau2 = tau[tau >= rho / 2] w = (exp1(rho) - k0rho) / (exp1(rho) - exp1(rho / 2)) F = np.zeros_like(tau) F[tau < rho / 2] = w * exp1(rhosq / (4 * tau1)) - (w - 1) * exp1( tau1 + rhosq / (4 * tau1) ) F[tau >= rho / 2] = ( 2 * k0rho - w * exp1(tau2) + (w - 1) * exp1(tau2 + rhosq / (4 * tau2)) ) return A * F / 2
[docs] def quad_step( self, A: float, a: float, b: float, r: float, t: ArrayLike ) -> ArrayLike: F = np.zeros_like(t) brsq = np.exp(b) * r**2 u = a * brsq / t for i in range(0, len(t)): F[i] = quad(self._integrand_hantush, u[i], np.inf, args=(brsq,))[0] return F * A / 2
[docs] def step( self, p: ArrayLike, dt: float = 1.0, cutoff: Optional[float] = None, maxtmax: Optional[int] = None, ) -> ArrayLike: A, a, b = p[:3] r = self._get_distance_from_params(p) t = self.get_t(p=p, dt=dt, cutoff=cutoff, maxtmax=maxtmax) if self.quad: return self.quad_step(A, a, b, r, t) else: # if numba_scipy is available and param a >= ~30, numba is faster if a >= 30.0 and self.use_numba: return self.numba_step(A, a, b, r, t) else: # otherwise numpy is faster return self.numpy_step(A, a, b, r, t)
[docs] @staticmethod def variance_gain( A: float, b: float, var_A: float, var_b: float, cov_Ab: float, r: float = 1.0, ) -> Union[float, ArrayLike]: """Calculate variance of the gain from parameters A and b. Variance of the gain is calculated based on propagation of uncertainty using optimal values, the variances of A and b and the covariance between A and b. Notes ----- Estimated variance can be biased for non-linear functions as it uses truncated series expansion. Parameters ---------- A : float optimal value of parameter A, (e.g. ml.parameters.optimal). b : float optimal value of parameter b, (e.g. ml.parameters.optimal). var_A : float variance of parameter A, can be obtained from the diagonal of the covariance matrix (e.g. ml.fit.pcov). var_b : float variance of parameter A, can be obtained from the diagonal of the covariance matrix (e.g. ml.fit.pcov). cov_Ab : float covariance between A and b, can be obtained from the covariance matrix ( e.g. ml.fit.pcov). r : float or array_like, optional distance(s) between observation well and stress(es), default value is 1.0. Returns ------- var_gain : float or array_like variance of the gain calculated based on propagation of uncertainty of parameters A and b. See Also -------- ps.WellModel.variance_gain """ var_gain = ( (k0(2 * r * np.exp(b / 2))) ** 2 * var_A + (A * r * k1(2 * r * np.exp(b / 2))) ** 2 * np.exp(b) * var_b - 2 * A * r * k0(2 * r * np.exp(b / 2)) * k1(2 * r * np.exp(b / 2)) * np.exp(b / 2) * cov_Ab ) return var_gain
[docs] def to_dict(self): """Method to export the response function to a dictionary. Returns ------- data: dict dictionary with all necessary information to reconstruct the rfunc object. Notes ----- The exported dictionary should exactly match the input arguments of __init__. """ data = { "class": self._name, "up": self.up, "gain_scale_factor": self.gain_scale_factor, "cutoff": self.cutoff, "use_numba": self.use_numba, "quad": self.quad, } return data
[docs]class Hantush(RfuncBase): """The Hantush well function, using the standard A, a, b parameters. Parameters ---------- up: bool or None, optional indicates whether a positive stress will cause the head to go up (True, default) or down (False), if None the head can go both ways. gain_scale_factor: float, optional the scale factor is used to set the initial value and the bounds of the gain parameter, computed as 1 / gain_scale_factor. cutoff: float, optional proportion after which the step function is cut off. use_numba: bool, optional Use the method 'numba_step' to compute the step_response. quad: bool, optional Use the method 'numba_quad' to compute the step_response. Notes ----- Notes ----- The impulse response function for this class can be viewed on the Documentation website or using `latexify` by running the following code in a Jupyter notebook environment:: ps.Hantush.impulse The implementation used here is explained in :cite:t:`veling_hantush_2010`. """ _name = "Hantush"
[docs] def __init__( self, cutoff: float = 0.999, use_numba: bool = False, quad: bool = False, **kwargs, ) -> None: RfuncBase.__init__(self, cutoff=cutoff, **kwargs) self.nparam = 3 self.use_numba = use_numba self.quad = quad # check numba and numba_scipy installation if self.quad or self.use_numba: # turn off use_numba if numba_scipy is not available # or there is a version conflict if self.use_numba: self.use_numba = check_numba_scipy()
[docs] def get_init_parameters(self, name: str) -> DataFrame: parameters = DataFrame(columns=["initial", "pmin", "pmax", "vary", "name"]) if self.up: parameters.loc[name + "_A"] = ( 1 / self.gain_scale_factor, 0, np.nan, True, name, ) elif self.up is False: parameters.loc[name + "_A"] = ( -1 / self.gain_scale_factor, np.nan, 0, True, name, ) else: parameters.loc[name + "_A"] = ( 1 / self.gain_scale_factor, np.nan, np.nan, True, name, ) parameters.loc[name + "_a"] = (100, 1e-3, 1e4, True, name) parameters.loc[name + "_b"] = (1, 1e-6, 25, True, name) return parameters
[docs] def get_tmax(self, p: ArrayLike, cutoff: Optional[float] = None) -> float: # approximate formula for tmax if cutoff is None: cutoff = self.cutoff a, b = p[1:] rho = 2 * np.sqrt(b) return lambertw(1 / ((1 - cutoff) * k0(rho))).real * a
[docs] @staticmethod def gain(p: ArrayLike) -> float: return p[0]
@staticmethod @njit def _integrand_hantush(y: float, b: float) -> float: return np.exp(-y - (b / y)) / y
[docs] @staticmethod @njit(parallel=True) def numba_step(A: float, a: float, b: float, t: ArrayLike) -> ArrayLike: rho = 2 * np.sqrt(b) rhosq = rho**2 k0rho = k0(rho) tau = t / a w = (exp1(rho) - k0rho) / (exp1(rho) - exp1(rho / 2)) F = np.zeros((tau.size,), dtype=np.float64) for i in prange(tau.size): tau_i = tau[i] if tau_i < rho / 2: F[i] = w * exp1(rhosq / (4 * tau_i)) - (w - 1) * exp1( tau_i + rhosq / (4 * tau_i) ) elif tau_i >= rho / 2: F[i] = ( 2 * k0rho - w * exp1(tau_i) + (w - 1) * exp1(tau_i + rhosq / (4 * tau_i)) ) return A * F / (2 * k0rho)
[docs] @staticmethod def numpy_step(A: float, a: float, b: float, t: ArrayLike) -> ArrayLike: rho = 2 * np.sqrt(b) rhosq = rho**2 k0rho = k0(rho) tau = t / a tau_mask = tau < rho / 2 tau1 = tau[tau_mask] tau2 = tau[~tau_mask] w = (exp1(rho) - k0rho) / (exp1(rho) - exp1(rho / 2)) F = np.zeros_like(tau) F[tau_mask] = w * exp1(rhosq / (4 * tau1)) - (w - 1) * exp1( tau1 + rhosq / (4 * tau1) ) F[~tau_mask] = ( 2 * k0rho - w * exp1(tau2) + (w - 1) * exp1(tau2 + rhosq / (4 * tau2)) ) return A * F / (2 * k0rho)
[docs] def quad_step(self, A: float, a: float, b: float, t: ArrayLike) -> ArrayLike: F = np.zeros_like(t) u = a * b / t for i in range(0, len(t)): F[i] = quad(self._integrand_hantush, u[i], np.inf, args=(b,))[0] return F * A / (2 * k0(2 * np.sqrt(b)))
[docs] def step( self, p: ArrayLike, dt: float = 1.0, cutoff: Optional[float] = None, maxtmax: Optional[int] = None, ) -> ArrayLike: A, a, b = p t = self.get_t(p=p, dt=dt, cutoff=cutoff, maxtmax=maxtmax) if self.quad: return self.quad_step(A, a, b, t) else: # if numba_scipy is available and param a >= ~30, numba is faster if a >= 30.0 and self.use_numba: return self.numba_step(A, a, b, t) else: # otherwise numpy is faster return self.numpy_step(A, a, b, t)
[docs] @staticmethod @latexfun(identifiers={"impulse": "theta", "k0": "K_0"}) def impulse(t: ArrayLike, p: ArrayLike) -> ArrayLike: A, a, b = p return A / (2 * t * k0(2 * np.sqrt(b))) * np.exp(-t / a - a * b / t)
[docs] def to_dict(self): """Method to export the response function to a dictionary. Returns ------- data: dict dictionary with all necessary information to reconstruct the rfunc object. Notes ----- The exported dictionary should exactly match the input arguments of __init__. """ data = { "class": self._name, "up": self.up, "gain_scale_factor": self.gain_scale_factor, "cutoff": self.cutoff, "use_numba": self.use_numba, "quad": self.quad, } return data
[docs]class Polder(RfuncBase): """The Polder function, using the standard A, a, b parameters. Parameters ---------- up: bool or None, optional indicates whether a positive stress will cause the head to go up (True, default) or down (False), if None the head can go both ways. gain_scale_factor: float, optional the scale factor is used to set the initial value and the bounds of the gain parameter, computed as 1 / gain_scale_factor. cutoff: float, optional proportion after which the step function is cut off. Notes ----- The impulse response function for this class can be viewed on the Documentation website or using `latexify` by running the following code in a Jupyter notebook environment:: ps.Polder.impulse The function is explained in Eq. 123.32 in:cite:t:`bruggeman_analytical_1999`. """ _name = "Polder"
[docs] def __init__( self, cutoff: float = 0.999, **kwargs, ) -> None: RfuncBase.__init__(self, cutoff=cutoff, **kwargs) self.nparam = 3
[docs] def get_init_parameters(self, name) -> DataFrame: parameters = DataFrame(columns=["initial", "pmin", "pmax", "vary", "name"]) parameters.loc[name + "_A"] = (1, 0, 2, True, name) parameters.loc[name + "_a"] = (10, 0.01, 1000, True, name) parameters.loc[name + "_b"] = (1, 1e-6, 25, True, name) return parameters
[docs] def get_tmax(self, p: ArrayLike, cutoff: Optional[float] = None) -> float: if cutoff is None: cutoff = self.cutoff _, a, b = p b = a * b x = np.sqrt(b / a) inverfc = erfcinv(2 * cutoff) y = (-inverfc + np.sqrt(inverfc**2 + 4 * x)) / 2 tmax = a * y**2 return tmax
[docs] def gain(self, p: ArrayLike) -> float: # the steady state solution of Mazure g = p[0] * np.exp(-np.sqrt(4 * p[2])) if not self.up: g = -g return g
[docs] def step( self, p: ArrayLike, dt: float = 1.0, cutoff: Optional[float] = None, maxtmax: Optional[int] = None, ) -> ArrayLike: t = self.get_t(p=p, dt=dt, cutoff=cutoff, maxtmax=maxtmax) A, a, b = p s = A * self.polder_function(np.sqrt(b), np.sqrt(t / a)) # / np.exp(-2 * np.sqrt(b)) if not self.up: s = -s return s
[docs] @staticmethod @latexfun(identifiers={"impulse": "theta"}) def impulse(t: ArrayLike, p: ArrayLike) -> ArrayLike: A, a, b = p return A * np.sqrt(a * b / pi) * t ** (-1.5) * np.exp(-t / a - a * b / t)
[docs] @staticmethod @latexfun(use_raw_function_name=True) def polder_function(x: float, y: float) -> float: return 0.5 * np.exp(2 * x) * erfc(x / y + y) + 0.5 * np.exp(-2 * x) * erfc( x / y - y )
[docs]class One(RfuncBase): """Instant response with no lag and one parameter d. Parameters ---------- up: bool or None, optional indicates whether a positive stress will cause the head to go up (True) or down (False), if None (default) the head can go both ways. gain_scale_factor: float, optional the scale factor is used to set the initial value and the bounds of the gain parameter, computed as 1 / gain_scale_factor. cutoff: float, optional proportion after which the step function is cut off. Has no influence for this response function. """ _name = "One"
[docs] def __init__( self, cutoff: float = 0.999, **kwargs, ) -> None: RfuncBase.__init__(self, cutoff=cutoff, **kwargs) self.nparam = 1
[docs] def get_init_parameters(self, name: str) -> DataFrame: parameters = DataFrame(columns=["initial", "pmin", "pmax", "vary", "name"]) if self.up: parameters.loc[name + "_d"] = ( self.gain_scale_factor, 0, np.nan, True, name, ) elif self.up is False: parameters.loc[name + "_d"] = ( -self.gain_scale_factor, np.nan, 0, True, name, ) else: parameters.loc[name + "_d"] = ( self.gain_scale_factor, np.nan, np.nan, True, name, ) return parameters
[docs] def get_tmax(self, p: ArrayLike, cutoff: Optional[float] = None) -> float: return 0.0
[docs] def gain(self, p: ArrayLike) -> float: return p[0]
[docs] def step( self, p: ArrayLike, dt: float = 1.0, cutoff: Optional[float] = None, maxtmax: Optional[int] = None, ) -> ArrayLike: if isinstance(dt, np.ndarray): return p[0] * np.ones(len(dt)) else: return p[0] * np.ones(1)
[docs] def block( self, p: ArrayLike, dt: float = 1.0, cutoff: Optional[float] = None, maxtmax: Optional[int] = None, ) -> ArrayLike: return p[0] * np.ones(1)
[docs]class FourParam(RfuncBase): """Four Parameter response function with 4 parameters A, a, b, and n. Parameters ---------- up: bool or None, optional indicates whether a positive stress will cause the head to go up (True, default) or down (False), if None the head can go both ways. gain_scale_factor: float, optional the scale factor is used to set the initial value and the bounds of the gain parameter, computed as 1 / gain_scale_factor. cutoff: float, optional proportion after which the step function is cut off. quad: bool, optional If true, use the 'quad' method from scipy.integrate to integrate the impulse response function. This may be more accurate but increases computation times. Notes ----- The impulse response function for this class can be viewed on the Documentation website or using `latexify` by running the following code in a Jupyter notebook environment:: ps.FourParam.impulse """ _name = "FourParam"
[docs] def __init__( self, cutoff: float = 0.999, quad: bool = False, **kwargs, ) -> None: RfuncBase.__init__(self, cutoff=cutoff, **kwargs) self.nparam = 4 self.quad = quad
[docs] def get_init_parameters(self, name: str) -> DataFrame: parameters = DataFrame(columns=["initial", "pmin", "pmax", "vary", "name"]) if self.up: parameters.loc[name + "_A"] = ( 1 / self.gain_scale_factor, 0, 100 / self.gain_scale_factor, True, name, ) elif self.up is False: parameters.loc[name + "_A"] = ( -1 / self.gain_scale_factor, -100 / self.gain_scale_factor, 0, True, name, ) else: parameters.loc[name + "_A"] = ( 1 / self.gain_scale_factor, np.nan, np.nan, True, name, ) parameters.loc[name + "_n"] = (1, -10, 10, True, name) parameters.loc[name + "_a"] = (10, 0.01, 5000, True, name) parameters.loc[name + "_b"] = (10, 1e-6, 25, True, name) return parameters
[docs] @staticmethod @latexfun(identifiers={"impulse": "theta"}) def impulse(t: ArrayLike, p: ArrayLike) -> ArrayLike: _, n, a, b = p return (t ** (n - 1)) * np.exp(-t / a - a * b / t)
[docs] def get_tmax(self, p: ArrayLike, cutoff: Optional[float] = None) -> float: if cutoff is None: cutoff = self.cutoff # Because Model.get_response_tmax() provides parameters for the stressmodel, # not only the response functions if len(p) > 4: p = p[:4] if self.quad: x = np.arange(1, 10000, 1) y = np.zeros_like(x) func = self.impulse(x, p) func_half = self.impulse(x[:-1] + 1 / 2, p) y[1:] = y[0] + np.cumsum(1 / 6 * (func[:-1] + 4 * func_half + func[1:])) y = y / quad(self.impulse, 0, np.inf, args=p)[0] return np.searchsorted(y, cutoff) else: t1 = -np.sqrt(3 / 5) t2 = 0 t3 = np.sqrt(3 / 5) w1 = 5 / 9 w2 = 8 / 9 w3 = 5 / 9 x = np.arange(1, 10000, 1) y = np.zeros_like(x) func = self.impulse(x, p) func_half = self.impulse(x[:-1] + 1 / 2, p) y[0] = 0.5 * ( w1 * self.impulse(0.5 * t1 + 0.5, p) + w2 * self.impulse(0.5 * t2 + 0.5, p) + w3 * self.impulse(0.5 * t3 + 0.5, p) ) y[1:] = y[0] + np.cumsum(1 / 6 * (func[:-1] + 4 * func_half + func[1:])) y = y / quad(self.impulse, 0, np.inf, args=p)[0] return np.searchsorted(y, cutoff)
[docs] @staticmethod def gain(p: ArrayLike) -> float: return p[0]
[docs] def step( self, p: ArrayLike, dt: float = 1.0, cutoff: Optional[float] = None, maxtmax: Optional[int] = None, ) -> ArrayLike: # Because Model.get_response_tmax() provides parameters for the stressmodel, # not only the response functions if len(p) > 4: p = p[:4] if self.quad: t = self.get_t(p=p, dt=dt, cutoff=cutoff, maxtmax=maxtmax) s = np.zeros_like(t) s[0] = quad(self.impulse, 0, dt, args=p)[0] for i in range(1, len(t)): s[i] = s[i - 1] + quad(self.impulse, t[i - 1], t[i], args=p)[0] s = s * (p[0] / (quad(self.impulse, 0, np.inf, args=p))[0]) return s else: t1 = -np.sqrt(3 / 5) t2 = 0 t3 = np.sqrt(3 / 5) w1 = 5 / 9 w2 = 8 / 9 w3 = 5 / 9 if dt > 0.1: step = 0.1 # step size for numerical integration tmax = max(self.get_tmax(p=p, cutoff=cutoff), 3 * dt) t = np.arange(step, tmax, step) s = np.zeros_like(t) # for interval [0,dt] : s[0] = (step / 2) * ( w1 * self.impulse((step / 2) * t1 + (step / 2), p) + w2 * self.impulse((step / 2) * t2 + (step / 2), p) + w3 * self.impulse((step / 2) * t3 + (step / 2), p) ) # for interval [dt,tmax]: func = self.impulse(t, p) func_half = self.impulse(t[:-1] + step / 2, p) s[1:] = s[0] + np.cumsum( step / 6 * (func[:-1] + 4 * func_half + func[1:]) ) s = s * (p[0] / quad(self.impulse, 0, np.inf, args=p)[0]) return s[int(dt / step - 1) :: int(dt / step)] else: t = self.get_t(p=p, dt=dt, cutoff=cutoff, maxtmax=maxtmax) s = np.zeros_like(t) # for interval [0,dt] Gaussian quadrate: s[0] = (dt / 2) * ( w1 * self.impulse((dt / 2) * t1 + (dt / 2), p) + w2 * self.impulse((dt / 2) * t2 + (dt / 2), p) + w3 * self.impulse((dt / 2) * t3 + (dt / 2), p) ) # for interval [dt,tmax] Simpson integration: func = self.impulse(t, p) func_half = self.impulse(t[:-1] + dt / 2, p) s[1:] = s[0] + np.cumsum( dt / 6 * (func[:-1] + 4 * func_half + func[1:]) ) s = s * (p[0] / quad(self.impulse, 0, np.inf, args=p)[0]) return s
[docs] def to_dict(self): """Method to export the response function to a dictionary. Returns ------- data: dict dictionary with all necessary information to reconstruct the rfunc object. Notes ----- The exported dictionary should exactly match the input arguments of __init__. """ data = { "class": self._name, "up": self.up, "gain_scale_factor": self.gain_scale_factor, "cutoff": self.cutoff, "quad": self.quad, } return data
[docs]class DoubleExponential(RfuncBase): """Double Exponential response function with 4 parameters A, alpha, a1 and a2. Parameters ---------- up: bool or None, optional indicates whether a positive stress will cause the head to go up (True, default) or down (False), if None the head can go both ways. gain_scale_factor: float, optional the scale factor is used to set the initial value and the bounds of the gain parameter, computed as 1 / gain_scale_factor. cutoff: float, optional proportion after which the step function is cut off. Notes ----- The impulse response function for this class can be viewed on the Documentation website or using `latexify` by running the following code in a Jupyter notebook environment:: ps.DoubleExponential.impulse """ _name = "DoubleExponential"
[docs] def __init__( self, cutoff: float = 0.999, **kwargs, ) -> None: RfuncBase.__init__(self, cutoff=cutoff, **kwargs) self.nparam = 4
[docs] def get_init_parameters(self, name: str) -> DataFrame: parameters = DataFrame(columns=["initial", "pmin", "pmax", "vary", "name"]) if self.up: parameters.loc[name + "_A"] = ( 1 / self.gain_scale_factor, 0, 100 / self.gain_scale_factor, True, name, ) elif self.up is False: parameters.loc[name + "_A"] = ( -1 / self.gain_scale_factor, -100 / self.gain_scale_factor, 0, True, name, ) else: parameters.loc[name + "_A"] = ( 1 / self.gain_scale_factor, np.nan, np.nan, True, name, ) parameters.loc[name + "_alpha"] = (0.1, 0.01, 0.99, True, name) parameters.loc[name + "_a1"] = (10, 0.01, 5000, True, name) parameters.loc[name + "_a2"] = (10, 0.01, 5000, True, name) return parameters
[docs] def get_tmax(self, p: ArrayLike, cutoff: Optional[float] = None) -> float: if cutoff is None: cutoff = self.cutoff if p[2] > p[3]: # a1 > a2 return -p[2] * np.log(1 - cutoff) else: # a1 < a2 return -p[3] * np.log(1 - cutoff)
[docs] def gain(self, p: ArrayLike) -> float: return p[0]
[docs] @staticmethod @latexfun(identifiers={"impulse": "theta"}) def impulse(t: ArrayLike, p: ArrayLike) -> ArrayLike: A, alpha, a_1, a_2 = p return A * ( (1 - alpha) / a_1 * np.exp(-t / a_1) + alpha / a_2 * np.exp(-t / a_2) )
[docs] def step( self, p: ArrayLike, dt: float = 1.0, cutoff: Optional[float] = None, maxtmax: Optional[int] = None, ) -> ArrayLike: t = self.get_t(p=p, dt=dt, cutoff=cutoff, maxtmax=maxtmax) s = p[0] * (1 - ((1 - p[1]) * np.exp(-t / p[2]) + p[1] * np.exp(-t / p[3]))) return s
[docs]class Edelman(RfuncBase): """The function of Edelman, describing the propagation of an instantaneous water level change into an adjacent half-infinite aquifer. Parameters ---------- up: bool or None, optional indicates whether a positive stress will cause the head to go up (True, default) or down (False), if None the head can go both ways. gain_scale_factor: float, optional the scale factor is used to set the initial value and the bounds of the gain parameter, computed as 1 / gain_scale_factor. cutoff: float, optional proportion after which the step function is cut off. Notes ----- The Edelman function is explained in :cite:t:`edelman_over_1947`. The impulse response function for this class can be viewed on the Documentation website or using `latexify` by running the following code in a Jupyter notebook environment:: ps.Edelman.impulse """ _name = "Edelman"
[docs] def __init__( self, cutoff: float = 0.999, **kwargs, ) -> None: RfuncBase.__init__(self, cutoff=cutoff, **kwargs) self.nparam = 1
[docs] def get_init_parameters(self, name: str) -> DataFrame: parameters = DataFrame(columns=["initial", "pmin", "pmax", "vary", "name"]) beta_init = 1.0 parameters.loc[name + "_beta"] = (beta_init, 0, 1000, True, name) return parameters
[docs] def get_tmax(self, p: ArrayLike, cutoff: Optional[float] = None) -> float: if cutoff is None: cutoff = self.cutoff return 1.0 / (p[0] * erfcinv(cutoff)) ** 2
[docs] @staticmethod def gain(p: ArrayLike) -> float: return 1.0
[docs] @staticmethod @latexfun(identifiers={"impulse": "theta"}) def impulse(t: ArrayLike, p: ArrayLike) -> ArrayLike: (a,) = p return 1 / (np.sqrt(pi) * a * t**1.5) * np.exp(-1 / (a**2 * t))
[docs] def step( self, p: ArrayLike, dt: float = 1.0, cutoff: Optional[float] = None, maxtmax: Optional[int] = None, ) -> ArrayLike: t = self.get_t(p=p, dt=dt, cutoff=cutoff, maxtmax=maxtmax) s = erfc(1 / (p[0] * np.sqrt(t))) return s
[docs]class Kraijenhoff(RfuncBase): """The response function of :cite:t:`van_de_leur_study_1958`. Parameters ---------- up: bool or None, optional indicates whether a positive stress will cause the head to go up (True, default) or down (False), if None the head can go both ways. gain_scale_factor: float, optional the scale factor is used to set the initial value and the bounds of the gain parameter, computed as 1 / gain_scale_factor. cutoff: float, optional proportion after which the step function is cut off. n_terms: int, optional Number of terms. Notes ----- The Kraijenhoff van de Leur function is explained in :cite:t:`van_de_leur_study_1958`. The impulse response function for this class can be viewed on the Documentation website or using `latexify` by running the following code in a Jupyter notebook environment:: ps.Kraijenhoff.impulse The function describes the response of a domain between two drainage channels. The function gives the same outcome as equation 133.15 in :cite:t:`bruggeman_analytical_1999`. This is the response that is actually calculated with this function. The response function has three parameters A, a and b: - A is the gain (scaled), - a is the reservoir coefficient (j in :cite:t:`van_de_leur_study_1958`), - b is the location in the domain with the origin in the middle. This means that b=0 is in the middle and b=1/2 is at the drainage channel. At b=1/4 the response function is most similar to the exponential response function. """ _name = "Kraijenhoff"
[docs] def __init__( self, cutoff: float = 0.999, n_terms: int = 10, **kwargs, ) -> None: RfuncBase.__init__(self, cutoff=cutoff, **kwargs) self.nparam = 3 self.n_terms = n_terms
[docs] def get_init_parameters(self, name: str) -> DataFrame: parameters = DataFrame(columns=["initial", "pmin", "pmax", "vary", "name"]) if self.up: parameters.loc[name + "_A"] = ( 1 / self.gain_scale_factor, 1e-5, 100 / self.gain_scale_factor, True, name, ) elif self.up is False: parameters.loc[name + "_A"] = ( -1 / self.gain_scale_factor, -100 / self.gain_scale_factor, -1e-5, True, name, ) else: parameters.loc[name + "_A"] = ( 1 / self.gain_scale_factor, np.nan, np.nan, True, name, ) parameters.loc[name + "_a"] = (1e2, 0.01, 1e5, True, name) parameters.loc[name + "_b"] = (0, 0, 0.499999, True, name) return parameters
[docs] def get_tmax(self, p: ArrayLike, cutoff: Optional[float] = None) -> float: if cutoff is None: cutoff = self.cutoff return -p[1] * np.log(1 - cutoff)
[docs] @staticmethod def gain(p: ArrayLike) -> float: return p[0]
[docs] def step( self, p: ArrayLike, dt: float = 1.0, cutoff: Optional[float] = None, maxtmax: Optional[int] = None, ) -> ArrayLike: t = self.get_t(p=p, dt=dt, cutoff=cutoff, maxtmax=maxtmax) h = 0 for n in range(self.n_terms): h += ( (-1) ** n / (2 * n + 1) ** 3 * np.cos((2 * n + 1) * pi * p[2]) * np.exp(-((2 * n + 1) ** 2) * t / p[1]) ) s = p[0] * (1 - (8 / (pi**3 * (1 / 4 - p[2] ** 2)) * h)) return s
[docs] @staticmethod @latexfun(identifiers={"impulse": "theta"}) def impulse(t: ArrayLike, p: ArrayLike) -> ArrayLike: A, a, b = p nterms = 10 return ( A * 8 / (pi**3 * ((1 / 4) - b**2)) * sum( (-1) ** n / (a * (2 * n + 1)) * np.cos((2 * n + 1) * pi * b) * np.exp(-((2 * n + 1) ** 2 * t) / a) for n in range(nterms) ) )
[docs] def to_dict(self): """Method to export the response function to a dictionary. Returns ------- data: dict dictionary with all necessary information to reconstruct the rfunc object. Notes ----- The exported dictionary should exactly match the input arguments of __init__. """ data = { "class": self._name, "up": self.up, "gain_scale_factor": self.gain_scale_factor, "cutoff": self.cutoff, "n_terms": self.n_terms, } return data
[docs]class Spline(RfuncBase): """Spline response function with parameters: A and a factor for every t. Parameters ---------- up: bool or None, optional indicates whether a positive stress will cause the head to go up (True, default) or down (False), if None the head can go both ways. gain_scale_factor: float, optional the scale factor is used to set the initial value and the bounds of the gain parameter, computed as 1 / gain_scale_factor. cutoff: float, optional proportion after which the step function is cut off. default is 0.999. this parameter has no influence for this response function. kind: string, optional see scipy.interpolate.interp1d. Most useful for a smooth response function are 'quadratic' and 'cubic'. t: list, optional times at which the response function is defined. Notes ----- The spline response function generates a response function from factors at t = 1, 2, 4, 8, 16, 32, 64, 128, 256, 512 and 1024 days by default. This response function is more data-driven than existing response functions and has no physical background. Therefore, it can primarily be used to compare to other more physical response functions, that probably describe the groundwater system better. """ _name = "Spline"
[docs] def __init__( self, cutoff: float = 0.999, kind: str = "quadratic", t: Optional[list] = None, **kwargs, ) -> None: RfuncBase.__init__(self, cutoff=cutoff, **kwargs) self.kind = kind if t is None: t = [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024] self.t = t self.nparam = len(t) + 1
[docs] def get_init_parameters(self, name: str) -> DataFrame: parameters = DataFrame(columns=["initial", "pmin", "pmax", "vary", "name"]) if self.up: parameters.loc[name + "_A"] = ( 1 / self.gain_scale_factor, 1e-5, 100 / self.gain_scale_factor, True, name, ) elif self.up is False: parameters.loc[name + "_A"] = ( -1 / self.gain_scale_factor, -100 / self.gain_scale_factor, -1e-5, True, name, ) else: parameters.loc[name + "_A"] = ( 1 / self.gain_scale_factor, np.nan, np.nan, True, name, ) initial = np.linspace(0.0, 1.0, len(self.t) + 1)[1:] for i in range(len(self.t)): index = name + "_" + str(self.t[i]) vary = True # fix the value of the factor at the last timestep to 1.0 if i == len(self.t) - 1: vary = False parameters.loc[index] = (initial[i], 0.0, 1.0, vary, name) return parameters
[docs] def get_tmax(self, p: ArrayLike, cutoff: Optional[float] = None) -> float: return self.t[-1]
[docs] def gain(self, p: ArrayLike) -> float: return p[0]
[docs] def step( self, p: ArrayLike, dt: float = 1.0, cutoff: Optional[float] = None, maxtmax: Optional[int] = None, ) -> ArrayLike: f = interp1d(self.t, p[1 : len(self.t) + 1], kind=self.kind) t = self.get_t(p=p, dt=dt, cutoff=cutoff, maxtmax=maxtmax) s = p[0] * f(t) return s
[docs] def to_dict(self): """Method to export the response function to a dictionary. Returns ------- data: dict dictionary with all necessary information to reconstruct the rfunc object. Notes ----- The exported dictionary should exactly match the input arguments of __init__. """ data = { "class": self._name, "up": self.up, "gain_scale_factor": self.gain_scale_factor, "cutoff": self.cutoff, "kind": self.kind, "t": self.t, } return data