"""This module contains utility functions for working with Pastas models.
"""
import logging
from datetime import datetime, timedelta
from logging import handlers
import numpy as np
from pandas import Series, to_datetime, Timedelta, Timestamp, date_range
from pandas.tseries.frequencies import to_offset
from scipy import interpolate
logger = logging.getLogger(__name__)
[docs]def frequency_is_supported(freq):
"""Method to determine if a frequency is supported for a Pastas model.
Parameters
----------
freq: str
Returns
-------
freq
String with the simulation frequency
Notes
-----
Possible frequency-offsets are listed in:
http://pandas.pydata.org/pandas-docs/stable/timeseries.html#offset-aliases
The frequency can be a multiple of these offsets, like '7D'. Because of the
use in convolution, only frequencies with an equidistant offset are
allowed. This means monthly ('M'), yearly ('Y') or even weekly ('W')
frequencies are not allowed. Use '7D' for a weekly simulation.
D calendar day frequency
H hourly frequency
T, min minutely frequency
S secondly frequency
L, ms milliseconds
U, us microseconds
N nanoseconds
TODO: Rename to get_frequency_string and change Returns-documentation
"""
offset = to_offset(freq)
if not hasattr(offset, 'delta'):
msg = "Frequency {} not supported.".format(freq)
logger.error(msg)
raise KeyError(msg)
else:
if offset.n == 1:
freq = offset.name
else:
freq = str(offset.n) + offset.name
return freq
def _get_stress_dt(freq):
"""Internal method to obtain a timestep in days from a frequency string.
Parameters
----------
freq: str
Returns
-------
dt: float
Approximate timestep in number of days.
Notes
-----
Used for comparison to determine if a time series needs to be up or
downsampled.
See http://pandas.pydata.org/pandas-docs/stable/timeseries.html#offset-aliases
for the offset_aliases supported by Pandas.
"""
# Get the frequency string and multiplier
offset = to_offset(freq)
if hasattr(offset, 'delta'):
dt = offset.delta / Timedelta(1, "D")
else:
num = offset.n
freq = offset.name
if freq in ['A', 'Y', 'AS', 'YS', 'BA', 'BY', 'BAS', 'BYS']:
# year
dt = num * 365
elif freq in ['BQ', 'BQS', 'Q', 'QS']:
# quarter
dt = num * 90
elif freq in ['BM', 'BMS', 'CBM', 'CBMS', 'M', 'MS']:
# month
dt = num * 30
elif freq in ['SM', 'SMS']:
# semi-month
dt = num * 15
elif freq in ['W']:
# week
dt = num * 7
elif freq in ['B', 'C']:
# day
dt = num
elif freq in ['BH', 'CBH']:
# hour
dt = num * 1 / 24
else:
raise (ValueError('freq of {} not supported'.format(freq)))
return dt
def _get_dt(freq):
"""Internal method to obtain a timestep in DAYS from a frequency string.
Parameters
----------
freq: str
Returns
-------
dt: float
Number of days
"""
# Get the frequency string and multiplier
dt = to_offset(freq).delta / Timedelta(1, "D")
return dt
def _get_time_offset(t, freq):
"""Internal method to calculate the time offset of a TimeStamp.
Parameters
----------
t: pandas.Timestamp
Timestamp to calculate the offset from the desired freq for.
freq: str
String with the desired frequency.
Returns
-------
offset: pandas.Timedelta
Timedelta with the offset for the timestamp t.
"""
if freq is None:
raise TypeError("frequency is None")
return t - t.floor(freq)
[docs]def get_sample(tindex, ref_tindex):
"""Sample the index so that the frequency is not higher than the frequency
of ref_tindex.
Parameters
----------
tindex: pandas.index
Pandas index object
ref_tindex: pandas.index
Pandas index object
Returns
-------
series: pandas.index
Notes
-----
Find the index closest to the ref_tindex, and then return a selection
of the index.
"""
if len(tindex) == 1:
return tindex
else:
f = interpolate.interp1d(tindex.asi8, np.arange(0, tindex.size),
kind='nearest', bounds_error=False,
fill_value='extrapolate')
ind = np.unique(f(ref_tindex.asi8).astype(int))
return tindex[ind]
[docs]def timestep_weighted_resample(series0, tindex):
"""Resample a timeseries to a new tindex, using an overlapping period
weighted average.
The original series and the new tindex do not have to be equidistant. Also,
the timestep-edges of the new tindex do not have to overlap with the
original series.
It is assumed the series consists of measurements that describe an
intensity at the end of the period for which they apply. Therefore, when
upsampling, the values are uniformly spread over the new timestep (like
bfill).
Compared to the reample methods in Pandas, this method is more accurate for
non-equidistanct series. It is much slower however.
Parameters
----------
series0 : pandas.Series
The original series to be resampled
tindex : pandas.index
The index to which to resample the series
Returns
-------
series : pandas.Series
The resampled series
"""
# determine some arrays for the input-series
t0e = np.array(series0.index)
dt0 = np.diff(t0e)
dt0 = np.hstack((dt0[0], dt0))
t0s = t0e - dt0
v0 = series0.values
# determine some arrays for the output-series
t1e = np.array(tindex)
dt1 = np.diff(t1e)
dt1 = np.hstack((dt1[0], dt1))
t1s = t1e - dt1
v1 = []
for t1si, t1ei in zip(t1s, t1e):
# determine which periods within the series are within the new tindex
mask = (t0e > t1si) & (t0s < t1ei)
if np.any(mask):
# cut by the timestep-edges
ts = t0s[mask]
te = t0e[mask]
ts[ts < t1si] = t1si
te[te > t1ei] = t1ei
# determine timestep
dt = (te - ts).astype(float)
# determine timestep-weighted value
v1.append(np.sum(dt * v0[mask]) / np.sum(dt))
# replace all values in the series
series = Series(v1, index=tindex)
return series
[docs]def timestep_weighted_resample_fast(series0, freq):
"""Resample a time series to a new frequency, using an overlapping period
weighted average.
The original series does not have to be equidistant.
It is assumed the series consists of measurements that describe an
intensity at the end of the period for which they apply. Therefore, when
upsampling, the values are uniformly spread over the new timestep (like
bfill).
Compared to the resample methods in Pandas, this method is more accurate
for non-equidistant series. It is slower than Pandas (but faster then the
original timestep_weighted_resample).
Parameters
----------
series0 : pandas.Series
original series to be resampled
freq : str
a Pandas frequency string
Returns
-------
series : pandas.Series
resampled series
"""
series = series0.copy()
# first mutiply by the timestep in the unit of freq
dt = np.diff(series0.index) / Timedelta(1, freq)
series[1:] = series[1:] * dt
# get a new index
index = date_range(series.index[0].floor(freq), series.index[-1],
freq=freq)
# calculate the cumulative sum
series = series.cumsum()
# add NaNs at none-existing values in series at index
series = series.combine_first(Series(np.NaN, index=index))
# interpolate these NaN's, only keep values at index
series = series.interpolate('time')[index]
# calculate the diff again (inverse of cumsum)
series[1:] = series.diff()[1:]
# drop nan's at the beginning
series = series[series.first_valid_index():]
return series
[docs]def to_daily_unit(series, method=True):
"""Experimental method, use wth caution!
Recalculate a timeseries of a stress with a non-daily unit (e/g.
m3/month) to a daily unit (e.g. m3/day). This method just changes the
values of the timeseries, and does not alter the frequency.
"""
if method is True or method == "divide":
dt = series.index.to_series().diff() / Timedelta(1, 'D')
dt[:-1] = dt[1:]
dt[-1] = np.NaN
if not ((dt == 1.0) | dt.isna()).all():
series = series / dt
return series
[docs]def excel2datetime(tindex, freq="D"):
"""Method to convert excel datetime to pandas timetime objects.
Parameters
----------
tindex: datetime index
can be a datetime object or a pandas datetime index.
freq: str
Returns
-------
datetimes: pandas.datetimeindex
"""
datetimes = to_datetime('1899-12-30') + Timedelta(tindex, freq)
return datetimes
[docs]def datenum_to_datetime(datenum):
"""Convert Matlab datenum into Python datetime.
Parameters
----------
datenum: float
date in datenum format
Returns
-------
datetime :
Datetime object corresponding to datenum.
"""
days = datenum % 1.
return datetime.fromordinal(int(datenum)) \
+ timedelta(days=days) - timedelta(days=366)
[docs]def datetime2matlab(tindex):
mdn = tindex + Timedelta(days=366)
frac = (tindex - tindex.round("D")).seconds / (24.0 * 60.0 * 60.0)
return mdn.toordinal() + frac
[docs]def get_stress_tmin_tmax(ml):
"""Get the minimum and maximum time that all of the stresses have data"""
from .model import Model
tmin = Timestamp.min
tmax = Timestamp.max
if isinstance(ml, Model):
for sm in ml.stressmodels:
for st in ml.stressmodels[sm].stress:
tmin = max((tmin, st.series_original.index.min()))
tmax = min((tmax, st.series_original.index.max()))
else:
raise (TypeError('Unknown type {}'.format(type(ml))))
return tmin, tmax
[docs]def initialize_logger(logger=None, level=logging.INFO):
"""Internal method to create a logger instance to log program output.
Parameters
-------
logger : logging.Logger
A Logger-instance. Use ps.logger to initialise the Logging instance
that handles all logging throughout pastas, including all sub modules
and packages.
"""
if logger is None:
logger = logging.getLogger('pastas')
logger.setLevel(level)
remove_file_handlers(logger)
set_console_handler(logger)
# add_file_handlers(logger)
[docs]def set_console_handler(logger=None, level=logging.INFO,
fmt="%(levelname)s: %(message)s"):
"""Method to add a console handler to the logger of Pastas.
Parameters
-------
logger : logging.Logger
A Logger-instance. Use ps.logger to initialise the Logging instance
that handles all logging throughout pastas, including all sub modules
and packages.
"""
if logger is None:
logger = logging.getLogger('pastas')
remove_console_handler(logger)
ch = logging.StreamHandler()
ch.setLevel(level)
formatter = logging.Formatter(fmt=fmt)
ch.setFormatter(formatter)
logger.addHandler(ch)
[docs]def set_log_level(level):
"""Set the log-level of the console. This method is just a wrapper around
set_console_handler.
Parameters
----------
level: str
String with the level to log messages to the screen for. Options
are: "INFO", "WARNING", and "ERROR".
Examples
--------
>>> import pandas as ps
>>> ps.set_log_level("ERROR")
"""
set_console_handler(level=level)
[docs]def remove_console_handler(logger=None):
"""Method to remove the console handler to the logger of Pastas.
Parameters
-------
logger : logging.Logger
A Logger-instance. Use ps.logger to initialise the Logging instance
that handles all logging throughout pastas, including all sub modules
and packages.
"""
if logger is None:
logger = logging.getLogger('pastas')
for handler in logger.handlers:
if isinstance(handler, logging.StreamHandler):
logger.removeHandler(handler)
[docs]def add_file_handlers(logger=None, filenames=('info.log', 'errors.log'),
levels=(logging.INFO, logging.ERROR), maxBytes=10485760,
backupCount=20, encoding='utf8',
fmt='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%y-%m-%d %H:%M'):
"""Method to add file handlers in the logger of Pastas
Parameters
-------
logger : logging.Logger
A Logger-instance. Use ps.logger to initialise the Logging instance
that handles all logging throughout pastas, including all sub modules
and packages.
"""
if logger is None:
logger = logging.getLogger('pastas')
# create formatter
formatter = logging.Formatter(fmt=fmt, datefmt=datefmt)
# create file handlers, set the level & formatter, and add it to the logger
for filename, level in zip(filenames, levels):
fh = handlers.RotatingFileHandler(filename, maxBytes=maxBytes,
backupCount=backupCount,
encoding=encoding)
fh.setLevel(level)
fh.setFormatter(formatter)
logger.addHandler(fh)
[docs]def remove_file_handlers(logger=None):
"""Method to remove any file handlers in the logger of Pastas.
Parameters
-------
logger : logging.Logger
A Logger-instance. Use ps.logger to initialise the Logging instance
that handles all logging throughout pastas, including all sub modules
and packages.
"""
if logger is None:
logger = logging.getLogger('pastas')
for handler in logger.handlers:
if isinstance(handler, handlers.RotatingFileHandler):
logger.removeHandler(handler)
[docs]def validate_name(name):
"""Method to check user-provided names and log a warning if wrong.
Parameters
----------
name: str
String with the name to check for 'illegal' characters.
Returns
-------
name: str
Unchanged name string
Notes
-----
Forbidden characters are: "/", "\", " ".
"""
name = str(name) # Make sure it is a string
for char in ["\\", "/", " "]:
if char in name:
logger.warning("User-provided name '%s' contains illegal "
"character %s", name, char)
return name
[docs]def show_versions(lmfit=False, numba=False):
"""Method to print the version of dependencies.
Parameters
----------
lmfit: bool, optional
Print the version of lmfit. Needs to be installed.
numba: bool, optional
Print the version of numba. Needs to be installed.
"""
from pastas import __version__ as ps_version
from pandas import __version__ as pd_version
from numpy import __version__ as np_version
from scipy import __version__ as sc_version
from matplotlib import __version__ as mpl_version
from sys import version as os_version
msg = (
f"Python version: {os_version}\n"
f"Numpy version: {np_version}\n"
f"Scipy version: {sc_version}\n"
f"Pandas version: {pd_version}\n"
f"Pastas version: {ps_version}\n"
f"Matplotlib version: {mpl_version}"
)
if lmfit:
from lmfit import __version__ as lm_version
msg = msg + f"\nlmfit version: {lm_version}"
if numba:
from numba import __version__ as nb_version
msg = msg + f"\nnumba version: {nb_version}"
return print(msg)