Source code for pastas.io.base

"""Import model."""

from importlib import import_module
from logging import getLogger
from os import path
from packaging import version
from numpy import log

import pastas as ps
from pandas import to_numeric

logger = getLogger(__name__)


[docs]def load(fname, **kwargs): """Method to load a Pastas Model from file. Parameters ---------- fname: str string with the name of the file to be imported including the file extension. kwargs: extension specific keyword arguments Returns ------- ml: pastas.model.Model Pastas Model instance. Examples -------- >>> import pastas as ps >>> ml = ps.io.load("model.pas") """ if not path.exists(fname): logger.error("File not found: %s", fname) # Dynamic import of the export module load_mod = import_module(f"pastas.io{path.splitext(fname)[1]}") # Get dicts for all data sources data = load_mod.load(fname, **kwargs) ml = _load_model(data) logger.info("Pastas Model from file %s successfully loaded. This file " "was created with Pastas %s. Your current version of Pastas " "is: %s", fname, data["file_info"]["pastas_version"], ps.__version__) return ml
def _load_model(data): """Internal method to create a model from a dictionary.""" # Create model oseries = ps.TimeSeries(**data["oseries"]) if "constant" in data.keys(): constant = data["constant"] else: constant = False if "metadata" in data.keys(): metadata = data["metadata"] else: metadata = None if "name" in data.keys(): name = data["name"] else: name = None if "noisemodel" in data.keys(): noise = True else: noise = False ml = ps.Model(oseries, constant=constant, noisemodel=noise, name=name, metadata=metadata) if "settings" in data.keys(): ml.settings.update(data["settings"]) if "file_info" in data.keys(): ml.file_info.update(data["file_info"]) # Add stressmodels for name, ts in data["stressmodels"].items(): # Deal with old StressModel2 files for version 0.22.0. Remove in 0.23.0. if ts["stressmodel"] == "StressModel2": logger.warning("StressModel2 is removed since Pastas 0.22.0 and " "is replaced by the RechargeModel using a Linear " "recharge model. Make sure to save this file " "again using Pastas version 0.22.0 as this file " "cannot be loaded in newer Pastas versions. This " "will automatically update your model to the newer " "RechargeModel stress model.") ts["stressmodel"] = "RechargeModel" ts["recharge"] = "Linear" ts["prec"] = ts["stress"][0] ts["evap"] = ts["stress"][1] ts.pop("stress") ts.pop("up") # Deal with old parameter value b in HantushWellModel: b_new = np.log(b_old) if ((ts["stressmodel"] == "WellModel") and (version.parse(data["file_info"]["pastas_version"]) < version.parse("0.22.0"))): logger.warning("The value of parameter 'b' in HantushWellModel" "was modified in 0.22.0: b_new = log(b_old). The value of " "'b' is automatically updated on load.") wnam = ts["name"] for pcol in ["initial", "optimal", "pmin", "pmax"]: if wnam + "_b" in data["parameters"].index: if data["parameters"].loc[wnam + "_b", pcol] > 0: data["parameters"].loc[wnam + "_b", pcol] = \ log(data["parameters"].loc[wnam + "_b", pcol]) stressmodel = getattr(ps.stressmodels, ts["stressmodel"]) ts.pop("stressmodel") if "rfunc" in ts.keys(): rfunc_kwargs = {} if "rfunc_kwargs" in ts: rfunc_kwargs = ts.pop("rfunc_kwargs") ts["rfunc"] = getattr(ps.rfunc, ts["rfunc"])(**rfunc_kwargs) if "recharge" in ts.keys(): recharge_kwargs = {} if 'recharge_kwargs' in ts: recharge_kwargs = ts.pop("recharge_kwargs") ts["recharge"] = getattr( ps.recharge, ts["recharge"])(**recharge_kwargs) if "stress" in ts.keys(): for i, stress in enumerate(ts["stress"]): ts["stress"][i] = ps.TimeSeries(**stress) if "prec" in ts.keys(): ts["prec"] = ps.TimeSeries(**ts["prec"]) if "evap" in ts.keys(): ts["evap"] = ps.TimeSeries(**ts["evap"]) if "temp" in ts.keys() and ts["temp"] is not None: ts["temp"] = ps.TimeSeries(**ts["temp"]) stressmodel = stressmodel(**ts) ml.add_stressmodel(stressmodel) # Add transform if "transform" in data.keys(): transform = getattr(ps.transform, data["transform"]["transform"]) data["transform"].pop("transform") transform = transform(**data["transform"]) ml.add_transform(transform) # Add noisemodel if present if "noisemodel" in data.keys(): n = getattr(ps.noisemodels, data["noisemodel"]["type"])() ml.add_noisemodel(n) # Add fit object to the model if "fit" in data.keys(): fit = getattr(ps.solver, data["fit"]["name"]) data["fit"].pop("name") ml.fit = fit(ml=ml, **data["fit"]) # Add parameters, use update to maintain correct order ml.parameters = ml.get_init_parameters(noise=ml.settings["noise"]) ml.parameters.update(data["parameters"]) ml.parameters = ml.parameters.apply(to_numeric, errors="ignore") # When initial values changed for param, value in ml.parameters.loc[:, "initial"].items(): ml.set_parameter(name=param, initial=value) return ml def dump(fname, data, **kwargs): """Method to save a pastas-model to a file. Parameters ---------- fname: str string with the name of the file, including a supported file-extension. Currently supported extension are: .pas. data: dict dictionary with the information to store. kwargs: extension specific keyword arguments can be provided using kwargs. Returns ------- message: Message if the file-saving was successful. Notes ----- The specific dump-module is automatically chosen based on the provided file extension. """ ext = path.splitext(fname)[1] dump_mod = import_module("pastas.io" + ext) return dump_mod.dump(fname, data, **kwargs)