"""Import model."""
from importlib import import_module
from logging import getLogger
from os import path
from packaging import version
import pastas as ps
# Type Hinting
from pastas.typing import Model
logger = getLogger(__name__)
[docs]def load(fname: str, **kwargs) -> Model:
"""Method to load a Pastas Model from file.
Parameters
----------
fname: str
string with the name of the file to be imported including the file extension.
kwargs:
extension specific keyword arguments
Returns
-------
ml: pastas.model.Model
Pastas Model instance.
Examples
--------
>>> import pastas as ps
>>> ml = ps.io.load("model.pas")
"""
if not path.exists(fname):
msg = "File not found: %s"
logger.error(msg, fname)
raise FileNotFoundError(msg % fname)
# Dynamic import of the export module
load_mod = import_module(f"pastas.io{path.splitext(fname)[1]}")
# Get dicts for all data sources
data = load_mod.load(fname, **kwargs)
file_version = data["file_info"]["pastas_version"]
# A single catch for old pas-files, no longer supported
if version.parse(file_version) < version.parse("0.23.0"):
msg = (
"This file was created with a Pastas version prior to 0.23 "
"and cannot be loaded with Pastas >= 1.0. Please load and "
"save the file with Pastas 0.23 first to update the file "
"format."
)
logger.error(msg)
raise ValueError(msg)
ml = _load_model(data)
logger.info(
"Pastas Model from file %s successfully loaded. This file was created with "
"Pastas %s. Your current version of Pastas is: %s",
fname,
file_version,
ps.__version__,
)
return ml
def _load_model(data: dict) -> Model:
"""Internal method to create a model from a dictionary."""
# Create model
oseries = data["oseries"]["series"]
metadata = data["oseries"]["metadata"]
if "constant" in data.keys():
constant = data["constant"]
else:
constant = False
if "name" in data.keys():
name = data["name"]
else:
name = None
ml = ps.Model(
oseries=oseries,
constant=constant,
name=name,
metadata=metadata,
)
if "settings" in data.keys():
if "noise" in data["settings"]:
if not data["settings"]["noise"] and "noisemodel" in data:
# file is saved before pastas 1.5, and solved with ml.solve(noise=False)
# remove noisemodel from data
data.pop("noisemodel")
ml.settings.update(data["settings"])
if "file_info" in data.keys():
ml.file_info.update(data["file_info"])
# Add stressmodels
for name, smdata in data["stressmodels"].items():
sm = _load_stressmodel(smdata, data)
ml.add_stressmodel(sm)
# Add transform
if "transform" in data.keys():
transform = getattr(ps.transform, data["transform"].pop("class"))
transform = transform(**data["transform"])
ml.add_transform(transform)
# Add noisemodel if present
if "noisemodel" in data.keys():
# fixes to read pas-files from before pastas version 1.5
# TODO: uncomment in pastas 2.0.0
# if data["noisemodel"]["class"] == "NoiseModel":
# data["noisemodel"]["class"] = "ArNoiseModel"
# if data["noisemodel"]["class"] == "ArmaModel":
# data["noisemodel"]["class"] = "ArmaNoiseModel"
n = getattr(ps.noisemodels, data["noisemodel"].pop("class"))()
ml.add_noisemodel(n)
# Add solver object to the model from pas-files < 1.3.0 TODO Deprecate
if "fit" in data.keys():
logger.warning(
"The solver object is stored in the model.solver attribute since Pastas "
"1.3. Please update your pas-file to the new format by loading and saving "
"the file with Pastas 1.3."
)
solver = getattr(ps.solver, data["fit"].pop("class"))
ml.solver = solver(**data["fit"])
ml.solver.set_model(ml)
# Add solver object to the model
if "solver" in data.keys():
solver = getattr(ps.solver, data["solver"].pop("class"))
ml.solver = solver(**data["solver"])
ml.solver.set_model(ml)
# Add parameters, use update to maintain correct order
ml.parameters = ml.get_init_parameters(noise=ml.settings["noise"])
ml.parameters.update(data["parameters"])
# Convert parameters to numeric
ml.parameters = ml.parameters.infer_objects()
# When parameter initial values and bounds changed
for pname, pdata in ml.parameters.iterrows():
ml.set_parameter(
name=pname,
initial=pdata.at["initial"],
vary=pdata.at["vary"],
pmin=pdata.at["pmin"],
pmax=pdata.at["pmax"],
)
return ml
def _load_stressmodel(ts, data):
# Create and add stress model
stressmodel = getattr(ps.stressmodels, ts.pop("class"))
if "rfunc" in ts.keys():
rfunc_class = ts["rfunc"].pop("class") # Determine response class
rfunc_up = ts["rfunc"].pop("up", None) # get up value
rfunc_gsf = ts["rfunc"].pop("gain_scale_factor", None) # get gain_scale_factor
rfunc = getattr(ps.rfunc, rfunc_class)(**ts["rfunc"])
rfunc.update_rfunc_settings(up=rfunc_up, gain_scale_factor=rfunc_gsf)
ts["rfunc"] = rfunc
if "recharge" in ts.keys():
recharge_class = ts["recharge"].pop("class")
ts["recharge"] = getattr(ps.recharge, recharge_class)(**ts["recharge"])
metadata = []
settings = []
# Unpack the stress time series
if "stress" in ts.keys():
# Only in the case of the wellmodel stresses are a list
if isinstance(ts["stress"], list):
for i, stress in enumerate(ts["stress"]):
series, meta, setting = _unpack_series(stress)
ts["stress"][i] = series
metadata.append(meta)
settings.append(setting)
else:
series, meta, setting = _unpack_series(ts["stress"])
ts["stress"] = series
metadata.append(meta)
settings.append(setting)
if "prec" in ts.keys():
series, meta, setting = _unpack_series(ts["prec"])
ts["prec"] = series
metadata.append(meta)
settings.append(setting)
if "evap" in ts.keys():
series, meta, setting = _unpack_series(ts["evap"])
ts["evap"] = series
metadata.append(meta)
settings.append(setting)
if "temp" in ts.keys() and ts["temp"] is not None:
series, meta, setting = _unpack_series(ts["temp"])
ts["temp"] = series
metadata.append(meta)
settings.append(setting)
if metadata:
ts["metadata"] = metadata if len(metadata) > 1 else metadata[0]
if settings:
ts["settings"] = settings if len(settings) > 1 else settings[0]
sm = stressmodel(**ts)
return sm
def _unpack_series(data: dict):
"""
Parameters
----------
data: dict
Dictionary defining the TimeSeries
Returns
-------
series, metadata, settings: dict
"""
series = data["series"]
metadata = data["metadata"]
settings = data["settings"]
return series, metadata, settings
def dump(fname: str, data: dict, **kwargs):
"""Method to save a pastas-model to a file.
Parameters
----------
fname: str
string with the name of the file, including a supported file-extension.
Currently supported extension are: .pas.
data: dict
dictionary with the information to store.
kwargs:
extension specific keyword arguments can be provided using kwargs.
Returns
-------
message:
Message if the file-saving was successful.
Notes
-----
The specific dump-module is automatically chosen based on the provided file
extension.
"""
ext = path.splitext(fname)[1]
dump_mod = import_module("pastas.io" + ext)
return dump_mod.dump(fname, data, **kwargs)