Source code for imod.wq.wel

import jinja2
import numpy as np

import imod
from imod import util
from imod.wq import timeutil
from imod.wq.pkgbase import BoundaryCondition

[docs]class Well(BoundaryCondition): """ The Well package is used to simulate a specified flux to individual cells and specified in units of length3/time. Parameters ---------- id_name: str or list of str name of the well(s). x: float or list of floats x coordinate of the well(s). y: float or list of floats y coordinate of the well(s). rate: float or list of floats. pumping rate in the well(s). layer: "None" or int, optional layer from which the pumping takes place. time: "None" or listlike of np.datetime64, datetime.datetime, pd.Timestamp, cftime.datetime time during which the pumping takes place. Only need to specify if model is transient. save_budget: bool, optional is a flag indicating if the budget should be saved (IRIVCB). Default is False. """ _pkg_id = "wel" _template = jinja2.Template( " {%- for time, timedict in wels.items() -%}" " {%- for layer, value in timedict.items() %}\n" " wel_p{{time}}_s{{system_index}}_l{{layer}} = {{value}}" " {%- endfor -%}\n" " {%- endfor -%}" ) # TODO: implement well to concentration IDF and use ssm_template # Ignored for now, since wells are nearly always extracting
[docs] def __init__( self, id_name, x, y, rate, layer=None, time=None, concentration=None, save_budget=False, ): super().__init__() variables = { "id_name": id_name, "x": x, "y": y, "rate": rate, "layer": layer, "time": time, "concentration": concentration, } variables = {k: np.atleast_1d(v) for k, v in variables.items() if v is not None} length = max(map(len, variables.values())) index = np.arange(1, length + 1) self["index"] = index for k, v in variables.items(): if v.size == index.size: self[k] = ("index", v) elif v.size == 1: self[k] = ("index", np.full(length, v)) else: raise ValueError(f"Length of {k} does not match other arguments") self["save_budget"] = save_budget
def _max_active_n(self, varname, nlayer, nrow, ncol): """ Determine the maximum active number of cells that are active during a stress period. Parameters ---------- varname : str name of the variable to use to calculate the maximum number of active cells. Not used for well, here for polymorphism. nlayer, nrow, ncol : int """ nmax = np.unique(self["id_name"]).size if "layer" not in self.dataset.coords: # Then it applies to every layer nmax *= nlayer self._cellcount = nmax self._ssm_cellcount = nmax return nmax def _compose_values_layer(self, directory, nlayer, name, time=None, compress=True): values = {} d = {"directory": directory, "name": name, "extension": ".ipf"} if time is None: if "layer" in self.dataset: for layer in np.unique(self.dataset["layer"]): layer = int(layer) d["layer"] = layer values[layer] = self._compose(d) else: values["?"] = self._compose(d) else: d["time"] = time if "layer" in self.dataset: # Since the well data is in long table format, it's the only # input that has to be inspected. select = np.argwhere((self.dataset["time"] == time).values) for layer in np.unique(self.dataset["layer"].values[select]): d["layer"] = layer values[layer] = self._compose(d) else: values["?"] = self._compose(d) if "layer" in self.dataset: # Compose does not accept non-integers, so use 0, then replace d["layer"] = 0 if np.unique(self.dataset["layer"].values).size == nlayer: token_path = imod.util.compose(d).as_posix() token_path = token_path.replace("_l0", "_l$") values = {"$": token_path} elif compress: range_path = imod.util.compose(d).as_posix() range_path = range_path.replace("_l0", "_l:") # TODO: temporarily disable until imod-wq is fixed values = self._compress_idflayers(values, range_path) return values def _compose_values_time(self, directory, name, globaltimes, nlayer): # TODO: rename to _compose_values_timelayer? values = {} if "time" in self.dataset: self_times = np.unique(self.dataset["time"].values) if "stress_repeats" in self.dataset.attrs: stress_repeats_keys = np.array( list(self.dataset.attrs["stress_repeats"].keys()) ) stress_repeats_values = np.array( list(self.dataset.attrs["stress_repeats"].values()) ) package_times, inds = np.unique( np.concatenate([self_times, stress_repeats_keys]), return_index=True ) # Times to write in the runfile runfile_times = np.concatenate([self_times, stress_repeats_values])[ inds ] else: runfile_times = package_times = self_times starts_ends = timeutil.forcing_starts_ends(package_times, globaltimes) for time, start_end in zip(runfile_times, starts_ends): # Check whether any range occurs in the input. # If does does, compress should be False compress = not (":" in start_end) values[start_end] = self._compose_values_layer( directory, nlayer=nlayer, name=name, time=time, compress=compress ) else: # for all periods values["?"] = self._compose_values_layer( directory, nlayer=nlayer, name=name ) return values def _render(self, directory, globaltimes, system_index, nlayer): d = {"system_index": system_index} name = directory.stem d["wels"] = self._compose_values_time(directory, name, globaltimes, nlayer) return self._template.render(d) def _render_ssm(self, directory, globaltimes, nlayer): if "concentration" in self.dataset.data_vars: d = {"pkg_id": self._pkg_id} name = f"{directory.stem}-concentration" if "species" in self.dataset["concentration"].coords: concentration = {} for species in self.dataset["concentration"]["species"].values: concentration[species] = self._compose_values_time( directory, f"{name}_c{species}", globaltimes, nlayer=nlayer ) else: concentration = { 1: self._compose_values_time( directory, name, globaltimes, nlayer=nlayer ) } d["concentration"] = concentration return self._ssm_template.render(d) else: return "" def _save_layers(self, df, directory, time=None): d = {"directory": directory, "name": directory.stem, "extension": ".ipf"} d["directory"].mkdir(exist_ok=True, parents=True) if time is not None: d["time"] = time if "layer" in df: for layer, layerdf in df.groupby("layer"): d["layer"] = layer # Ensure right order outdf = layerdf[["x", "y", "rate", "id_name"]] path = self._compose(d) imod.ipf.write(path, outdf) else: outdf = df[["x", "y", "rate", "id_name"]] path = self._compose(d) imod.ipf.write(path, outdf) @staticmethod def _save_layers_concentration(df, directory, name, time=None): d = {"directory": directory, "name": name, "extension": ".ipf"} d["directory"].mkdir(exist_ok=True, parents=True) if time is not None: d["time"] = time if "layer" in df: for layer, layerdf in df.groupby("layer"): d["layer"] = layer # Ensure right order outdf = layerdf[["x", "y", "concentration", "id_name"]] path = util.compose(d) imod.ipf.write(path, outdf) else: outdf = df[["x", "y", "concentration", "id_name"]] path = util.compose(d) imod.ipf.write(path, outdf) def save(self, directory): all_species = [None] if "concentration" in self.dataset.data_vars: if "species" in self.dataset["concentration"].coords: all_species = self.dataset["concentration"]["species"].values # Loop over species if applicable for species in all_species: if species is not None: ds = self.dataset.sel(species=species) else: ds = self.dataset if "time" in ds: for time, timeda in ds.groupby("time"): timedf = timeda.to_dataframe() self._save_layers(timedf, directory, time=time) if "concentration" in ds.data_vars: name = f"{directory.stem}-concentration" if species is not None: name = f"{name}_c{species}" self._save_layers_concentration( timedf, directory, name, time=time ) else: self._save_layers(ds.to_dataframe(), directory) if "concentration" in ds.data_vars: name = f"{directory.stem}-concentration" if species is not None: name = f"{name}_c{species}" self._save_layers_concentration(timedf, directory, name) def _pkgcheck(self, ibound=None): # TODO: implement pass def repeat_stress(self, stress_repeats, use_cftime=False): # To ensure consistency, it isn't possible to use differing stress_repeatss # between rate and concentration: the number of points might change # between stress periods, and isn't especially easy to check. if "time" not in self.dataset: raise ValueError( "This Wel package does not have time, cannot add stress_repeats." ) # Replace both key and value by the right datetime type d = { timeutil.to_datetime(k, use_cftime): timeutil.to_datetime(v, use_cftime) for k, v in stress_repeats.items() } self.dataset.attrs["stress_repeats"] = d