# coding=utf-8
"""This module contains all the response functions available in Pastas."""
import numpy as np
from pandas import DataFrame
from scipy.integrate import quad
from scipy.special import (erfc, erfcinv, exp1, gammainc, gammaincinv, k0, k1,
lambertw)
from scipy.interpolate import interp1d
__all__ = ["Gamma", "Exponential", "Hantush", "Polder", "FourParam",
"DoubleExponential", "One", "Edelman", "HantushWellModel",
"Kraijenhoff", "Spline"]
[docs]class RfuncBase:
_name = "RfuncBase"
[docs] def __init__(self, up, meanstress, cutoff):
self.up = up
# Completely arbitrary number to prevent division by zero
if 1e-8 > meanstress > 0:
meanstress = 1e-8
elif meanstress < 0 and up is True:
meanstress = meanstress * -1
self.meanstress = meanstress
self.cutoff = cutoff
[docs] def get_init_parameters(self, name):
"""Get initial parameters and bounds. It is called by the stressmodel.
Parameters
----------
name : str
Name of the stressmodel
Returns
-------
parameters : pandas DataFrame
The initial parameters and parameter bounds used by the solver
"""
pass
[docs] def get_tmax(self, p, cutoff=None):
"""Method to get the response time for a certain cutoff.
Parameters
----------
p: array_like
array_like object with the values as floats representing the
model parameters.
cutoff: float, optional
float between 0 and 1.
Returns
-------
tmax: float
Number of days when 99.9% of the response has effectuated, when the
cutoff is chosen at 0.999.
"""
pass
[docs] def step(self, p, dt=1, cutoff=None, maxtmax=None):
"""Method to return the step function.
Parameters
----------
p: array_like
array_like object with the values as floats representing the
model parameters.
dt: float
timestep as a multiple of of day.
cutoff: float, optional
float between 0 and 1.
maxtmax: int, optional
Maximum timestep to compute the block response for.
Returns
-------
s: numpy.array
Array with the step response.
"""
pass
[docs] def block(self, p, dt=1, cutoff=None, maxtmax=None):
"""Method to return the block function.
Parameters
----------
p: array_like
array_like object with the values as floats representing the
model parameters.
dt: float
timestep as a multiple of of day.
cutoff: float, optional
float between 0 and 1.
maxtmax: int, optional
Maximum timestep to compute the block response for.
Returns
-------
s: numpy.array
Array with the block response.
"""
s = self.step(p, dt, cutoff, maxtmax)
return np.append(s[0], np.subtract(s[1:], s[:-1]))
[docs] def get_t(self, p, dt, cutoff, maxtmax=None):
"""Internal method to determine the times at which to evaluate the
step-response, from t=0.
Parameters
----------
p: array_like
array_like object with the values as floats representing the
model parameters.
dt: float
timestep as a multiple of of day.
cutoff: float
float between 0 and 1, that determines which part of the step-
response is taken into account.
maxtmax: float, optional
The maximum time of the response, usually set to the simulation
length.
Returns
-------
t: numpy.array
Array with the times
"""
if isinstance(dt, np.ndarray):
return dt
else:
tmax = self.get_tmax(p, cutoff)
if maxtmax is not None:
tmax = min(tmax, maxtmax)
tmax = max(tmax, 3 * dt)
return np.arange(dt, tmax, dt)
[docs]class Gamma(RfuncBase):
"""Gamma response function with 3 parameters A, a, and n.
Parameters
----------
up: bool or None, optional
indicates whether a positive stress will cause the head to go up
(True, default) or down (False), if None the head can go both ways.
meanstress: float
mean value of the stress, used to set the initial value such that
the final step times the mean stress equals 1
cutoff: float
proportion after which the step function is cut off. default is 0.999.
Notes
-----
The impulse response function may be written as:
.. math:: \\theta(t) = At^{n-1} e^{-t/a}
"""
_name = "Gamma"
[docs] def __init__(self, up=True, meanstress=1, cutoff=0.999):
RfuncBase.__init__(self, up, meanstress, cutoff)
self.nparam = 3
[docs] def get_init_parameters(self, name):
parameters = DataFrame(
columns=['initial', 'pmin', 'pmax', 'vary', 'name'])
if self.up:
parameters.loc[name + '_A'] = (1 / self.meanstress, 1e-5,
100 / self.meanstress, True, name)
elif self.up is False:
parameters.loc[name + '_A'] = (-1 / self.meanstress,
-100 / self.meanstress,
-1e-5, True, name)
else:
parameters.loc[name + '_A'] = (1 / self.meanstress,
np.nan, np.nan, True, name)
# if n is too small, the length of response function is close to zero
parameters.loc[name + '_n'] = (1, 0.1, 100, True, name)
parameters.loc[name + '_a'] = (10, 0.01, 1e4, True, name)
return parameters
[docs] def get_tmax(self, p, cutoff=None):
if cutoff is None:
cutoff = self.cutoff
return gammaincinv(p[1], cutoff) * p[2]
[docs] def gain(self, p):
return p[0]
[docs] def step(self, p, dt=1, cutoff=None, maxtmax=None):
t = self.get_t(p, dt, cutoff, maxtmax)
s = p[0] * gammainc(p[1], t / p[2])
return s
[docs]class Exponential(RfuncBase):
"""Exponential response function with 2 parameters: A and a.
Parameters
----------
up: bool or None, optional
indicates whether a positive stress will cause the head to go up
(True, default) or down (False), if None the head can go both ways.
meanstress: float
mean value of the stress, used to set the initial value such that
the final step times the mean stress equals 1
cutoff: float
proportion after which the step function is cut off. default is 0.999.
Notes
-----
The impulse response function may be written as:
.. math:: \\theta(t) = A e^{-t/a}
"""
_name = "Exponential"
[docs] def __init__(self, up=True, meanstress=1, cutoff=0.999):
RfuncBase.__init__(self, up, meanstress, cutoff)
self.nparam = 2
[docs] def get_init_parameters(self, name):
parameters = DataFrame(
columns=['initial', 'pmin', 'pmax', 'vary', 'name'])
if self.up:
parameters.loc[name + '_A'] = (1 / self.meanstress, 1e-5,
100 / self.meanstress, True, name)
elif self.up is False:
parameters.loc[name + '_A'] = (-1 / self.meanstress,
-100 / self.meanstress,
-1e-5, True, name)
else:
parameters.loc[name + '_A'] = (1 / self.meanstress,
np.nan, np.nan, True, name)
parameters.loc[name + '_a'] = (10, 0.01, 1000, True, name)
return parameters
[docs] def get_tmax(self, p, cutoff=None):
if cutoff is None:
cutoff = self.cutoff
return -p[1] * np.log(1 - cutoff)
[docs] def gain(self, p):
return p[0]
[docs] def step(self, p, dt=1, cutoff=None, maxtmax=None):
t = self.get_t(p, dt, cutoff, maxtmax)
s = p[0] * (1.0 - np.exp(-t / p[1]))
return s
[docs]class HantushWellModel(RfuncBase):
"""A special implementation of the Hantush well function for multiple
wells.
Parameters
----------
up: bool, optional
indicates whether a positive stress will cause the head to go up
(True, default) or down (False)
meanstress: float
mean value of the stress, used to set the initial value such that
the final step times the mean stress equals 1
cutoff: float
proportion after which the step function is cut off. Default is 0.999.
Notes
-----
The Hantush well function is explained in [hantush_1955]_,
[veling_2010]_ and [asmuth_2008]_. The impulse response function may be
written as:
.. math:: \\theta(t) = \\frac{A}{t} K_0 \\left( \\frac{r^2}{4 \\lambda^2} \\right) \\exp(-t/a - ab/t)
.. math:: p[0] = A = \\frac{1}{4 \\pi T}
.. math:: p[1] = a = cS
.. math:: p[2] = b = 1^2 / (4 \\lambda^2)
.. math:: p[3] = r \\text{(not optimized)}
where :math:`\\lambda = \\sqrt{Tc}`
The parameter r (distance from the well to the observation point)
is passed as a known value, and is used to scale the response function.
The optimized parameters are slightly different from the original
Hantush implementation:
- A: in the original Hantush parameter A is the gain. Now the gain is
equal to :math:`\\text{gain} = A K_0 ( \\sqrt(4 r^2 b) )`
- a: is the same :math:`a = cS`
- b: is the same, but :math:`r` is set to 1 if passed separately,
:math:`b = 1^2 / (4 \\lambda^2)`
"""
_name = "HantushWellModel"
[docs] def __init__(self, up=False, meanstress=1, cutoff=0.999, distances=1.0):
RfuncBase.__init__(self, up, meanstress, cutoff)
self.nparam = 3
self.distances = distances
[docs] def get_init_parameters(self, name):
parameters = DataFrame(
columns=['initial', 'pmin', 'pmax', 'vary', 'name'])
if self.up:
# divide by k0(2) to get same initial value as ps.Hantush
parameters.loc[name + '_A'] = (1 / (self.meanstress * k0(2)),
0, np.nan, True, name)
elif self.up is False:
# divide by k0(2) to get same initial value as ps.Hantush
parameters.loc[name + '_A'] = (-1 / (self.meanstress * k0(2)),
np.nan, 0, True, name)
else:
parameters.loc[name + '_A'] = (1 / self.meanstress, np.nan,
np.nan, True, name)
parameters.loc[name + '_a'] = (100, 1e-3, 1e4, True, name)
# set initial and bounds for b taking into account distances
binit = 1.0 / np.mean(self.distances) ** 2
bmin = 1e-4 / np.max(self.distances) ** 2
bmax = 25. / np.min(self.distances) ** 2
parameters.loc[name + '_b'] = (binit, bmin, bmax, True, name)
return parameters
[docs] def get_tmax(self, p, cutoff=None):
r = 1.0 if len(p) == 3 else p[3]
# approximate formula for tmax
if cutoff is None:
cutoff = self.cutoff
cS = p[1]
rho = np.sqrt(4 * r ** 2 * p[2])
k0rho = k0(rho)
if k0rho == 0.0:
return 100 * 365. # 100 years
else:
return lambertw(1 / ((1 - cutoff) * k0rho)).real * cS
[docs] @staticmethod
def gain(p):
r = 1.0 if len(p) == 3 else p[3]
rho = np.sqrt(4 * r ** 2 * p[2])
return p[0] * k0(rho)
[docs] def step(self, p, dt=1, cutoff=None, maxtmax=None):
r = 1.0 if len(p) == 3 else p[3]
cS = p[1]
rho = np.sqrt(4 * r ** 2 * p[2])
k0rho = k0(rho)
t = self.get_t(p, dt, cutoff, maxtmax)
tau = t / cS
tau1 = tau[tau < rho / 2]
tau2 = tau[tau >= rho / 2]
w = (exp1(rho) - k0rho) / (exp1(rho) - exp1(rho / 2))
F = np.zeros_like(tau)
F[tau < rho / 2] = w * exp1(rho ** 2 / (4 * tau1)) - (w - 1) * exp1(
tau1 + rho ** 2 / (4 * tau1))
F[tau >= rho / 2] = 2 * k0rho - w * exp1(tau2) + (w - 1) * exp1(
tau2 + rho ** 2 / (4 * tau2))
return p[0] * F / 2
[docs] @staticmethod
def variance_gain(A, b, var_A, var_b, cov_Ab, r=1.0):
"""Calculate variance of the gain from parameters A and b.
Variance of the gain is calculated based on propagation of
uncertainty using optimal values, the variances of A and b
and the covariance between A and b.
Parameters
----------
A : float
optimal value of parameter A, (e.g. ml.parameters.optimal)
b : float
optimal value of parameter b, (e.g. ml.parameters.optimal)
var_A : float
variance of parameter A, can be obtained from the diagonal of
the covariance matrix (e.g. ml.fit.pcov)
var_b : float
variance of parameter A, can be obtained from the diagonal of
the covariance matrix (e.g. ml.fit.pcov)
cov_Ab : float
covariance between A and b, can be obtained from the covariance
matrix (e.g. ml.fit.pcov)
r : float or np.array, optional
distance(s) between observation well and stress(es),
default value is 1.0
Returns
-------
var_gain : float or np.array
variance of the gain calculated based on propagation of
uncertainty of parameters A and b.
"""
var_gain = (
(k0(2 * np.sqrt(r ** 2 * b))) ** 2 * var_A +
(-A * r * k1(2 * np.sqrt(r ** 2 * b)) / np.sqrt(
b)) ** 2 * var_b -
2 * A * r * k0(2 * np.sqrt(r ** 2 * b)) *
k1(2 * np.sqrt(r ** 2 * b)) / np.sqrt(b) * cov_Ab
)
return var_gain
[docs]class Hantush(RfuncBase):
"""The Hantush well function, using the standard A, a, b parameters.
Parameters
----------
up: bool or None, optional
indicates whether a positive stress will cause the head to go up
(True, default) or down (False), if None the head can go both ways.
meanstress: float
mean value of the stress, used to set the initial value such that
the final step times the mean stress equals 1
cutoff: float
proportion after which the step function is cut off. default is 0.999.
Notes
-----
The Hantush well function is explained in [hantush_1955]_, [veling_2010]_
and [asmuth_2008]_. The impulse response function may be written as:
.. math:: \\theta(t) = \\frac{A}{t} \\exp(-t/a - ab/t)
.. math:: p[0] = A = \\frac{1}{2 \\pi T}
.. math:: p[1] = a = cS
.. math:: p[2] = b = r^2 / (4 \\lambda^2)
where :math:`\\lambda = \\sqrt{Tc}`
References
----------
.. [hantush_1955] Hantush, M. S., & Jacob, C. E. (1955). Non‐steady
radial flow in an infinite leaky aquifer. Eos, Transactions American
Geophysical Union, 36(1), 95-100.
.. [veling_2010] Veling, E. J. M., & Maas, C. (2010). Hantush well function
revisited. Journal of hydrology, 393(3), 381-388.
.. [asmuth_2008] Von Asmuth, J. R., Maas, K., Bakker, M., & Petersen,
J. (2008). Modeling time series of ground water head fluctuations
subjected to multiple stresses. Ground Water, 46(1), 30-40.
"""
_name = "Hantush"
[docs] def __init__(self, up=False, meanstress=1, cutoff=0.999):
RfuncBase.__init__(self, up, meanstress, cutoff)
self.nparam = 3
[docs] def get_init_parameters(self, name):
parameters = DataFrame(
columns=['initial', 'pmin', 'pmax', 'vary', 'name'])
if self.up:
parameters.loc[name + '_A'] = (1 / self.meanstress,
0, np.nan, True, name)
elif self.up is False:
parameters.loc[name + '_A'] = (-1 / self.meanstress,
np.nan, 0, True, name)
else:
parameters.loc[name + '_A'] = (1 / self.meanstress,
np.nan, np.nan, True, name)
parameters.loc[name + '_a'] = (100, 1e-3, 1e4, True, name)
parameters.loc[name + '_b'] = (1, 1e-6, 25, True, name)
return parameters
[docs] def get_tmax(self, p, cutoff=None):
# approximate formula for tmax
if cutoff is None:
cutoff = self.cutoff
cS = p[1]
rho = np.sqrt(4 * p[2])
k0rho = k0(rho)
return lambertw(1 / ((1 - cutoff) * k0rho)).real * cS
[docs] @staticmethod
def gain(p):
return p[0]
[docs] def step(self, p, dt=1, cutoff=None, maxtmax=None):
cS = p[1]
rho = np.sqrt(4 * p[2])
k0rho = k0(rho)
t = self.get_t(p, dt, cutoff, maxtmax)
tau = t / cS
tau1 = tau[tau < rho / 2]
tau2 = tau[tau >= rho / 2]
w = (exp1(rho) - k0rho) / (exp1(rho) - exp1(rho / 2))
F = np.zeros_like(tau)
F[tau < rho / 2] = w * exp1(rho ** 2 / (4 * tau1)) - (w - 1) * exp1(
tau1 + rho ** 2 / (4 * tau1))
F[tau >= rho / 2] = 2 * k0rho - w * exp1(tau2) + (w - 1) * exp1(
tau2 + rho ** 2 / (4 * tau2))
return p[0] * F / (2 * k0rho)
[docs]class Polder(RfuncBase):
"""The Polder function, using the standard A, a, b parameters.
Notes
-----
The Polder function is explained in [polder]_. The impulse response
function may be written as:
.. math:: \\theta(t) = \\exp(-\\sqrt(4b)) \\frac{A}{t^{-3/2}}
\\exp(-t/a -b/t)
.. math:: p[0] = A = \\exp(-x/\\lambda)
.. math:: p[1] = a = \\sqrt{\\frac{1}{cS}}
.. math:: p[2] = b = x^2 / (4 \\lambda^2)
where :math:`\\lambda = \\sqrt{kDc}`
References
----------
.. [polder] G.A. Bruggeman (1999). Analytical solutions of
geohydrological problems. Elsevier Science. Amsterdam, Eq. 123.32
"""
_name = "Polder"
[docs] def __init__(self, up=True, meanstress=1, cutoff=0.999):
RfuncBase.__init__(self, up, meanstress, cutoff)
self.nparam = 3
[docs] def get_init_parameters(self, name):
parameters = DataFrame(
columns=['initial', 'pmin', 'pmax', 'vary', 'name'])
parameters.loc[name + '_A'] = (1, 0, 2, True, name)
parameters.loc[name + '_a'] = (10, 0.01, 1000, True, name)
parameters.loc[name + '_b'] = (1, 1e-6, 25, True, name)
return parameters
[docs] def get_tmax(self, p, cutoff=None):
if cutoff is None:
cutoff = self.cutoff
_, a, b = p
b = a * b
x = np.sqrt(b / a)
inverfc = erfcinv(2 * cutoff)
y = (-inverfc + np.sqrt(inverfc ** 2 + 4 * x)) / 2
tmax = a * y ** 2
return tmax
[docs] def gain(self, p):
# the steady state solution of Mazure
g = p[0] * np.exp(-np.sqrt(4 * p[2]))
if not self.up:
g = -g
return g
[docs] def step(self, p, dt=1, cutoff=None, maxtmax=None):
t = self.get_t(p, dt, cutoff, maxtmax)
A, a, b = p
s = A * self.polder_function(np.sqrt(b), np.sqrt(t / a))
# / np.exp(-2 * np.sqrt(b))
if not self.up:
s = -s
return s
[docs] @staticmethod
def polder_function(x, y):
s = 0.5 * np.exp(2 * x) * erfc(x / y + y) + \
0.5 * np.exp(-2 * x) * erfc(x / y - y)
return s
[docs]class One(RfuncBase):
"""Instant response with no lag and one parameter d.
Parameters
----------
up: bool or None, optional
indicates whether a positive stress will cause the head to go up
(True) or down (False), if None (default) the head can go both ways.
meanstress: float
mean value of the stress, used to set the initial value such that
the final step times the mean stress equals 1
cutoff: float
proportion after which the step function is cut off. default is 0.999.
"""
_name = "One"
[docs] def __init__(self, up=None, meanstress=1, cutoff=0.999):
RfuncBase.__init__(self, up, meanstress, cutoff)
self.nparam = 1
[docs] def get_init_parameters(self, name):
parameters = DataFrame(
columns=['initial', 'pmin', 'pmax', 'vary', 'name'])
if self.up:
parameters.loc[name + '_d'] = (
self.meanstress, 0, np.nan, True, name)
elif self.up is False:
parameters.loc[name + '_d'] = (
-self.meanstress, np.nan, 0, True, name)
else:
parameters.loc[name + '_d'] = (
self.meanstress, np.nan, np.nan, True, name)
return parameters
[docs] def gain(self, p):
return p[0]
[docs] def step(self, p, dt=1, cutoff=None, maxtmax=None):
if isinstance(dt, np.ndarray):
return p[0] * np.ones(len(dt))
else:
return p[0] * np.ones(1)
[docs] def block(self, p, dt=1, cutoff=None, maxtmax=None):
return p[0] * np.ones(1)
[docs]class FourParam(RfuncBase):
"""Four Parameter response function with 4 parameters A, a, b, and n.
Parameters
----------
up: bool or None, optional
indicates whether a positive stress will cause the head to go up
(True, default) or down (False), if None the head can go both ways.
meanstress: float
mean value of the stress, used to set the initial value such that
the final step times the mean stress equals 1
cutoff: float
proportion after which the step function is cut off. default is 0.999.
Notes
-----
The impulse response function may be written as:
.. math:: \\theta(t) = At^{n-1} e^{-t/a -ab/t}
If Fourparam.quad is set to True, this response function uses np.quad to
integrate the Four Parameter response function, which requires more
calculation time.
"""
_name = "FourParam"
[docs] def __init__(self, up=True, meanstress=1, cutoff=0.999):
RfuncBase.__init__(self, up, meanstress, cutoff)
self.nparam = 4
self.quad = False
[docs] def get_init_parameters(self, name):
parameters = DataFrame(
columns=['initial', 'pmin', 'pmax', 'vary', 'name'])
if self.up:
parameters.loc[name + '_A'] = (1 / self.meanstress, 0,
100 / self.meanstress, True, name)
elif self.up is False:
parameters.loc[name + '_A'] = (-1 / self.meanstress,
-100 / self.meanstress, 0, True,
name)
else:
parameters.loc[name + '_A'] = (1 / self.meanstress,
np.nan, np.nan, True, name)
parameters.loc[name + '_n'] = (1, -10, 10, True, name)
parameters.loc[name + '_a'] = (10, 0.01, 5000, True, name)
parameters.loc[name + '_b'] = (10, 1e-6, 25, True, name)
return parameters
[docs] @staticmethod
def function(t, p):
return (t ** (p[1] - 1)) * np.exp(-t / p[2] - p[2] * p[3] / t)
[docs] def get_tmax(self, p, cutoff=None):
if cutoff is None:
cutoff = self.cutoff
if self.quad:
x = np.arange(1, 10000, 1)
y = np.zeros_like(x)
func = self.function(x, p)
func_half = self.function(x[:-1] + 1 / 2, p)
y[1:] = y[0] + np.cumsum(1 / 6 *
(func[:-1] + 4 * func_half + func[1:]))
y = y / quad(self.function, 0, np.inf, args=p)[0]
return np.searchsorted(y, cutoff)
else:
t1 = -np.sqrt(3 / 5)
t2 = 0
t3 = np.sqrt(3 / 5)
w1 = 5 / 9
w2 = 8 / 9
w3 = 5 / 9
x = np.arange(1, 10000, 1)
y = np.zeros_like(x)
func = self.function(x, p)
func_half = self.function(x[:-1] + 1 / 2, p)
y[0] = 0.5 * (w1 * self.function(0.5 * t1 + 0.5, p) +
w2 * self.function(0.5 * t2 + 0.5, p) +
w3 * self.function(0.5 * t3 + 0.5, p))
y[1:] = y[0] + np.cumsum(1 / 6 *
(func[:-1] + 4 * func_half + func[1:]))
y = y / quad(self.function, 0, np.inf, args=p)[0]
return np.searchsorted(y, cutoff)
[docs] @staticmethod
def gain(p):
return p[0]
[docs] def step(self, p, dt=1, cutoff=None, maxtmax=None):
if self.quad:
t = self.get_t(p, dt, cutoff, maxtmax)
s = np.zeros_like(t)
s[0] = quad(self.function, 0, dt, args=p)[0]
for i in range(1, len(t)):
s[i] = s[i - 1] + quad(self.function, t[i - 1], t[i], args=p)[
0]
s = s * (p[0] / (quad(self.function, 0, np.inf, args=p))[0])
return s
else:
t1 = -np.sqrt(3 / 5)
t2 = 0
t3 = np.sqrt(3 / 5)
w1 = 5 / 9
w2 = 8 / 9
w3 = 5 / 9
if dt > 0.1:
step = 0.1 # step size for numerical integration
tmax = max(self.get_tmax(p, cutoff), 3 * dt)
t = np.arange(step, tmax, step)
s = np.zeros_like(t)
# for interval [0,dt] :
s[0] = (step / 2) * \
(w1 * self.function((step / 2) * t1 + (step / 2), p) +
w2 * self.function((step / 2) * t2 + (step / 2), p) +
w3 * self.function((step / 2) * t3 + (step / 2), p))
# for interval [dt,tmax]:
func = self.function(t, p)
func_half = self.function(t[:-1] + step / 2, p)
s[1:] = s[0] + np.cumsum(
step / 6 * (func[:-1] + 4 * func_half + func[1:]))
s = s * (p[0] / quad(self.function, 0, np.inf, args=p)[0])
return s[int(dt / step - 1)::int(dt / step)]
else:
t = self.get_t(p, dt, cutoff, maxtmax)
s = np.zeros_like(t)
# for interval [0,dt] Gaussian quadrate:
s[0] = (dt / 2) * \
(w1 * self.function((dt / 2) * t1 + (dt / 2), p) +
w2 * self.function((dt / 2) * t2 + (dt / 2), p) +
w3 * self.function((dt / 2) * t3 + (dt / 2), p))
# for interval [dt,tmax] Simpson integration:
func = self.function(t, p)
func_half = self.function(t[:-1] + dt / 2, p)
s[1:] = s[0] + np.cumsum(
dt / 6 * (func[:-1] + 4 * func_half + func[1:]))
s = s * (p[0] / quad(self.function, 0, np.inf, args=p)[0])
return s
[docs]class DoubleExponential(RfuncBase):
"""Double Exponential response function with 4 parameters A, alpha, a1 and
a2.
Parameters
----------
up: bool or None, optional
indicates whether a positive stress will cause the head to go up
(True, default) or down (False), if None the head can go both ways.
meanstress: float
mean value of the stress, used to set the initial value such that
the final step times the mean stress equals 1
cutoff: float
proportion after which the step function is cut off. default is 0.999.
Notes
-----
The impulse response function may be written as:
.. math:: \\theta(t) = A (1 - \\alpha) e^{-t/a_1} + A \\alpha e^{-t/a_2}
"""
_name = "DoubleExponential"
[docs] def __init__(self, up=True, meanstress=1, cutoff=0.999):
RfuncBase.__init__(self, up, meanstress, cutoff)
self.nparam = 4
[docs] def get_init_parameters(self, name):
parameters = DataFrame(
columns=['initial', 'pmin', 'pmax', 'vary', 'name'])
if self.up:
parameters.loc[name + '_A'] = (1 / self.meanstress, 0,
100 / self.meanstress, True, name)
elif self.up is False:
parameters.loc[name + '_A'] = (-1 / self.meanstress,
-100 / self.meanstress, 0, True,
name)
else:
parameters.loc[name + '_A'] = (1 / self.meanstress,
np.nan, np.nan, True, name)
parameters.loc[name + '_alpha'] = (0.1, 0.01, 0.99, True, name)
parameters.loc[name + '_a1'] = (10, 0.01, 5000, True, name)
parameters.loc[name + '_a2'] = (10, 0.01, 5000, True, name)
return parameters
[docs] def get_tmax(self, p, cutoff=None):
if cutoff is None:
cutoff = self.cutoff
if p[2] > p[3]: # a1 > a2
return -p[2] * np.log(1 - cutoff)
else: # a1 < a2
return -p[3] * np.log(1 - cutoff)
[docs] def gain(self, p):
return p[0]
[docs] def step(self, p, dt=1, cutoff=None, maxtmax=None):
t = self.get_t(p, dt, cutoff, maxtmax)
s = p[0] * (1 - ((1 - p[1]) * np.exp(-t / p[2]) +
p[1] * np.exp(-t / p[3])))
return s
[docs]class Edelman(RfuncBase):
"""The function of Edelman, describing the propagation of an instantaneous
water level change into an adjacent half-infinite aquifer.
Parameters
----------
up: bool or None, optional
indicates whether a positive stress will cause the head to go up
(True, default) or down (False), if None the head can go both ways.
meanstress: float
mean value of the stress, used to set the initial value such that
the final step times the mean stress equals 1
cutoff: float
proportion after which the step function is cut off. default is 0.999.
Notes
-----
The Edelman function is explained in [5]_. The impulse response function
may be written as:
.. math:: \\text{unknown}
It's parameters are:
.. math:: p[0] = \\beta = \\frac{\\sqrt{\\frac{4kD}{S}}}{x}
References
----------
.. [5] http://grondwaterformules.nl/index.php/formules/waterloop/peilverandering
"""
_name = "Edelman"
[docs] def __init__(self, up=True, meanstress=1, cutoff=0.999):
RfuncBase.__init__(self, up, meanstress, cutoff)
self.nparam = 1
[docs] def get_init_parameters(self, name):
parameters = DataFrame(
columns=['initial', 'pmin', 'pmax', 'vary', 'name'])
beta_init = 1.0
parameters.loc[name + '_beta'] = (beta_init, 0, 1000, True, name)
return parameters
[docs] def get_tmax(self, p, cutoff=None):
if cutoff is None:
cutoff = self.cutoff
return 1. / (p[0] * erfcinv(cutoff * erfc(0))) ** 2
[docs] @staticmethod
def gain(p):
return 1.
[docs] def step(self, p, dt=1, cutoff=None, maxtmax=None):
t = self.get_t(p, dt, cutoff, maxtmax)
s = erfc(1 / (p[0] * np.sqrt(t)))
return s
[docs]class Kraijenhoff(RfuncBase):
"""The response function of Kraijenhoff van de Leur (and Bruggeman 133.15)
Parameters
----------
up: bool or None, optional
indicates whether a positive stress will cause the head to go up
(True, default) or down (False), if None the head can go both ways.
meanstress: float
mean value of the stress, used to set the initial value such that
the final step times the mean stress equals 1
cutoff: float
proportion after which the step function is cut off. default is 0.999.
Notes
-----
The Kraijenhoff van de Leur function is explained in [Kraijenhoff]_.
The impulse response function may be written as:
.. math:: \\theta(t) = \\frac{4}{\pi S} \sum_{n=1,3,5...}^\infty \\frac{1}{n} e^{-n^2\\frac{t}{j}} \sin (\\frac{n\pi x}{L})
The function describes the response of a domain between two drainage
channels. The function gives the same outcome as Bruggeman equation 133.15.
Bruggeman 133.15 is the response that is actually calculated with this
function. [Bruggeman]_
The response function has three parameters: A, a and b.
A is the gain (scaled),
a is the reservoir coefficient (j in [Kraijenhoff]_),
b is the location in the domain with the origin in the middle. This means
that b=0 is in the middle and b=1/2 is at the drainage channel. At b=1/4
the response function is most similar to the exponential response function.
References
----------
.. [Kraijenhoff] Kraijenhoff van de Leur, D. A. (1958). A study of
non-steady groundwater flow with special reference to a reservoir
coefficient. De Ingenieur, 70(19), B87-B94. https://edepot.wur.nl/422032
.. [Bruggeman] G.A. Bruggeman (1999). Analytical solutions of
geohydrological problems. Elsevier Science. Amsterdam, Eq. 133.15
"""
_name = "Kraijenhoff"
[docs] def __init__(self, up=True, meanstress=1, cutoff=0.999, n_terms=10):
RfuncBase.__init__(self, up, meanstress, cutoff)
self.nparam = 3
self.n_terms = n_terms
[docs] def get_init_parameters(self, name):
parameters = DataFrame(
columns=['initial', 'pmin', 'pmax', 'vary', 'name'])
if self.up:
parameters.loc[name + '_A'] = (1 / self.meanstress, 1e-5,
100 / self.meanstress, True, name)
elif self.up is False:
parameters.loc[name + '_A'] = (-1 / self.meanstress,
-100 / self.meanstress,
-1e-5, True, name)
else:
parameters.loc[name + '_A'] = (1 / self.meanstress,
np.nan, np.nan, True, name)
parameters.loc[name + '_a'] = (1e2, 0.01, 1e5, True, name)
parameters.loc[name + '_b'] = (0, 0, 0.499999, True, name)
return parameters
[docs] def get_tmax(self, p, cutoff=None):
if cutoff is None:
cutoff = self.cutoff
return - p[1] * np.log(1 - cutoff)
[docs] @staticmethod
def gain(p):
return p[0]
[docs] def step(self, p, dt=1, cutoff=None, maxtmax=None):
t = self.get_t(p, dt, cutoff, maxtmax)
h = 0
for n in range(self.n_terms):
h += (-1) ** n / (2 * n + 1) ** 3 * \
np.cos((2 * n + 1) * np.pi * p[2]) * \
np.exp(-(2 * n + 1) ** 2 * t / p[1])
s = p[0] * (1 - (8 / (np.pi ** 3 * (1 / 4 - p[2] ** 2)) * h))
return s
[docs]class Spline(RfuncBase):
"""Spline response function with parameters: A and a factor for every t.
Parameters
----------
up: bool or None, optional
indicates whether a positive stress will cause the head to go up
(True, default) or down (False), if None the head can go both ways.
meanstress: float
mean value of the stress, used to set the initial value such that
the final step times the mean stress equals 1
cutoff: float
proportion after which the step function is cut off. default is 0.999.
this parameter is ignored by Points
t: list
times at which the response function is defined
kind: string
see scipy.interpolate.interp1d. Most useful for a smooth response
function are ‘quadratic’ and ‘cubic’.
Notes
-----
The spline response function generates a response function from factors at
t = 1, 2, 4, 8, 16, 32, 64, 128, 256, 512 and 1024 days by default. This
response function is more data-driven than existing response functions and
has no physical background. Therefore it can primarily be used to compare
to other more physical response functions, that probably describe the
groundwater system better.
"""
_name = "Spline"
[docs] def __init__(self, up=True, meanstress=1, cutoff=0.999, kind='quadratic',
t=[1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024]):
RfuncBase.__init__(self, up, meanstress, cutoff)
self.kind = kind
self.t = t
self.nparam = len(t) + 1
[docs] def get_init_parameters(self, name):
parameters = DataFrame(
columns=['initial', 'pmin', 'pmax', 'vary', 'name'])
if self.up:
parameters.loc[name + '_A'] = (1 / self.meanstress, 1e-5,
100 / self.meanstress, True, name)
elif self.up is False:
parameters.loc[name + '_A'] = (-1 / self.meanstress,
-100 / self.meanstress,
-1e-5, True, name)
else:
parameters.loc[name + '_A'] = (1 / self.meanstress,
np.nan, np.nan, True, name)
initial = np.linspace(0.0, 1.0, len(self.t) + 1)[1:]
for i in range(len(self.t)):
index = name + '_' + str(self.t[i])
vary = True
# fix the value of the factor at the last timestep to 1.0
if i == len(self.t) - 1:
vary = False
parameters.loc[index] = (initial[i], 0.0, 1.0, vary, name)
return parameters
[docs] def get_tmax(self, p, cutoff=None):
return self.t[-1]
[docs] def gain(self, p):
return p[0]
[docs] def step(self, p, dt=1, cutoff=None, maxtmax=None):
f = interp1d(self.t, p[1:len(self.t) + 1], kind=self.kind)
t = self.get_t(p, dt, cutoff, maxtmax)
s = p[0] * f(t)
return s