Source code for imod.evaluate.head

import warnings

import numpy as np
import pandas as pd
import xarray as xr


[docs]def convert_pointwaterhead_freshwaterhead( pointwaterhead, density, elevation, density_fresh=1000.0 ): r"""Function to convert point water head (as outputted by seawat) into freshwater head, using Eq.3 from Guo, W., & Langevin, C. D. (2002): .. math:: h_{f}=\frac{\rho}{\rho_{f}}h-\frac{\rho-\rho_{f}}{\rho_{f}}Z An edge case arises when the head is below the cell centre, or entirely below the cell. Strictly applying Eq.3 would result in freshwater heads that are lower than the original point water head, which is physically impossible. This function then outputs the freshwaterhead for the uppermost underlying cell where the original point water head exceeds the cell centre. Parameters ---------- pointwaterhead : float or xr.DataArray of floats the point water head as outputted by SEAWAT, in m. density : float or xr.DataArray of floats the water density at the same locations as `pointwaterhead`. elevation : float or xr.DataArray of floats elevation at the same locations as `pointwaterhead`, in m. density_fresh : float, optional the density of freshwater (1000 kg/m3), or a different value if different units are used, or a different density reference is required. Returns ------- freshwaterhead : float or xr.DataArray of floats """ freshwaterhead = ( density / density_fresh * pointwaterhead - (density - density_fresh) / density_fresh * elevation ) # edge case: point water head below z # return freshwater head of top underlying cell where elevation < pointwaterhead # only for xr.DataArrays if isinstance(pointwaterhead, xr.DataArray) and "layer" in pointwaterhead.dims: freshwaterhead = freshwaterhead.where(pointwaterhead > elevation) freshwaterhead = freshwaterhead.bfill(dim="layer") return freshwaterhead
def _calculate_gxg( head_bimonthly: xr.DataArray, below_surfacelevel: bool = False ) -> xr.DataArray: import bottleneck as bn # Most efficient way of finding the three highest and three lowest is via a # partition. See: # https://bottleneck.readthedocs.io/en/latest/reference.html#bottleneck.partition def lowest3_mean(da: xr.DataArray): a = bn.partition(da.values, kth=2, axis=-1) with warnings.catch_warnings(): warnings.simplefilter("ignore", category=RuntimeWarning) result = np.nanmean(a[..., :3], axis=-1) template = da.isel(bimonth=0) return template.copy(data=result) def highest3_mean(da: xr.DataArray): a = bn.partition(-da.values, kth=2, axis=-1) with warnings.catch_warnings(): warnings.simplefilter("ignore", category=RuntimeWarning) result = np.nanmean(-a[..., :3], axis=-1) template = da.isel(bimonth=0) return template.copy(data=result) timesize = head_bimonthly["time"].size if timesize % 24 != 0: raise ValueError("head is not bimonthly for a full set of years") n_year = int(timesize / 24) # First and second date of March: 4, 5; first date of April: 6. month_index = np.array([4, 5, 6]) # Repeat this for every year in dataset, and increment by 24 per repetition. yearly_increments = (np.arange(n_year) * 24)[:, np.newaxis] # Broadcast to a full set gvg_index = xr.DataArray( data=(month_index + yearly_increments), dims=("hydroyear", "bimonth") ) gvg_data = head_bimonthly.isel(time=gvg_index) # Filters years without 3 available measurments. gvg_years = gvg_data.count("bimonth") == 3 gvg_data = gvg_data.where(gvg_years) # Hydrological years: running from 1 April to 1 April in the Netherlands. # Increment run from April (6th date) to April (30th date) for every year. # Broadcast to a full set newdims = ("hydroyear", "bimonth") gxg_index = xr.DataArray( data=(np.arange(6, 30) + yearly_increments[:-1]), dims=newdims, ) gxg_data = head_bimonthly.isel(time=gxg_index) dims = [dim for dim in gxg_data.dims if dim not in newdims] dims.extend(newdims) gxg_data = gxg_data.transpose(*dims) # Filter years without 24 measurements. gxg_years = gxg_data.count("bimonth") == 24 gxg_data = gxg_data.where(gxg_years) # First compute LG3 and HG3 per hydrological year, then compute the mean over the total. if gxg_data.chunks is not None: # If data is lazily loaded/chunked, process data of one year at a time. gxg_data = gxg_data.chunk({"hydroyear": 1}) lg3 = xr.map_blocks(lowest3_mean, gxg_data, template=gxg_data.isel(bimonth=0)) hg3 = xr.map_blocks(highest3_mean, gxg_data, template=gxg_data.isel(bimonth=0)) else: # Otherwise, just compute it in a single go. lg3 = lowest3_mean(gxg_data) hg3 = highest3_mean(gxg_data) gxg = xr.Dataset() gxg["gvg"] = gvg_data.mean(("hydroyear", "bimonth")) ghg = hg3.mean("hydroyear") glg = lg3.mean("hydroyear") if below_surfacelevel: gxg["glg"] = ghg gxg["ghg"] = glg else: gxg["glg"] = glg gxg["ghg"] = ghg # Add the numbers of years used in the calculation gxg["n_years_gvg"] = gvg_years.sum("hydroyear") gxg["n_years_gxg"] = gxg_years.sum("hydroyear") return gxg def calculate_gxg_points( df: pd.DataFrame, id: str = "id", time: str = "time", head: str = "head", below_surfacelevel: bool = False, tolerance: pd.Timedelta = pd.Timedelta(days=7), ) -> pd.DataFrame: """ Calculate GxG groundwater characteristics from head time series. GLG and GHG (average lowest and average highest groundwater level respectively) are calculated as the average of the three lowest (GLG) or highest (GHG) head values per Dutch hydrological year (april - april), for head values measured at a semi-monthly frequency (14th and 28th of every month). GVG (average spring groundwater level) is calculated as the average of groundwater level on 14th and 28th of March, and 14th of April. Supplied head values are resampled (nearest) to the 14/28 frequency. Hydrological years without all 24 14/28 dates present are discarded for glg and ghg. Years without the 3 dates for gvg are discarded. Parameters ---------- df: pd.DataFrame Dataframe containing the piezometer IDs, dates of measurement, and measured heads. id: str Column name of piezometer ID. time: str Column name of datetime. head: str Column name of head measurement. below_surfacelevel: bool False (default) if heads are relative to a datum (e.g. sea level). If True, heads are taken as m below surface level. tolerance: pd.Timedelta, default: 7 days. Maximum time window allowed when searching for dates around the 14th and 28th of every month. Returns ------- gxg : pd.DataFrame Dataframe containing ``glg``: average lowest head, ``ghg``: average highest head, ``gvg``: average spring head, ``n_years_gvg``: numbers of years used for gvg, ``n_years_gxg``: numbers of years used for glg and ghg. Examples -------- Read some IPF data and compute the GxG values, while specifying the (non-standard) column names: >>> import imod >>> df = imod.ipf.read("piezometers.ipf") >>> gxg = imod.evaluate.calculate_gxg_points( >>> df=df, >>> id="ID-column", >>> time="Date", >>> head="Piezometer head (m)", >>> ) """ def bimonthly(series: pd.Series, dates: pd.DatetimeIndex, tolerance: pd.Timedelta): series = series[~series.index.duplicated(keep="first")] return series.reindex(dates, method="nearest", tolerance=tolerance) if not isinstance(df, pd.DataFrame): raise TypeError(f"df must be a DataFrame, received: {type(df).__name__}") for name in (id, time, head): if name not in df: raise ValueError(f"Column {name} not present in dataframe") # Create a bi-monthly date range start = f"{df[time].min().year}-01-01" end = f"{df[time].max().year}-12-31" dates = pd.date_range( start=start, end=end, freq="SMS", name="time" ) + pd.DateOffset(days=13) # Convert for every location the time series to the same date range. This # forms a rectangular array which we can represent directly as a DataArray. bimonthly_series = ( df.set_index(time).groupby(id)[head].apply(bimonthly, dates, tolerance) ) head_bimonthly = bimonthly_series.to_xarray() # Calculate GXG values per location gxg = _calculate_gxg(head_bimonthly, below_surfacelevel) # Transform back to a DataFrame return gxg.to_dataframe()
[docs]def calculate_gxg( head: xr.DataArray, below_surfacelevel: bool = False, tolerance: pd.Timedelta = pd.Timedelta(days=7), ) -> xr.DataArray: """ Calculate GxG groundwater characteristics from head time series. GLG and GHG (average lowest and average highest groundwater level respectively) are calculated as the average of the three lowest (GLG) or highest (GHG) head values per Dutch hydrological year (april - april), for head values measured at a semi-monthly frequency (14th and 28th of every month). GVG (average spring groundwater level) is calculated as the average of groundwater level on 14th and 28th of March, and 14th of April. Supplied head values are resampled (nearest) to the 14/28 frequency. Hydrological years without all 24 14/28 dates present are discarded for glg and ghg. Years without the 3 dates for gvg are discarded. Parameters ---------- head : xr.DataArray of floats Head relative to sea level, in m, or m below surface level if `below_surfacelevel` is set to True. Must be of dimensions ``("time", "y", "x")``. below_surfacelevel : boolean, optional, default: False. False (default) if heads are relative to a datum (e.g. sea level). If True, heads are taken as m below surface level. tolerance: pd.Timedelta, default: 7 days. Maximum time window allowed when searching for dates around the 14th and 28th of every month. Returns ------- gxg : xr.Dataset Dataset containing ``glg``: average lowest head, ``ghg``: average highest head, ``gvg``: average spring head, ``n_years_gvg``: numbers of years used for gvg, ``n_years_gxg``: numbers of years used for glg and ghg. Examples -------- Load the heads, and calculate groundwater characteristics after the year 2000: >>> import imod >>> heads = imod.idf.open("head*.idf").sel(time=heads.time.dt.year >= 2000) >>> gxg = imod.evaluate.calculate_gxg(heads) Transform to meters below surface level by substracting from surface level: >>> surflevel = imod.idf.open("surfacelevel.idf") >>> gxg = surflevel - gxg Or calculate from groundwater level relative to surface level directly: >>> gwl = surflevel - heads >>> gxg = imod.evaluate.calculate_gxg(gwl, below_surfacelevel=True) """ if not head.dims == ("time", "y", "x"): raise ValueError('Dimensions must be ("time", "y", "x")') if not np.issubdtype(head["time"].dtype, np.datetime64): raise ValueError("Time must have dtype numpy datetime64") # Reindex to GxG frequency date_range: every 14th and 28th of the month. start = f"{int(head['time'][0].dt.year)}-01-01" end = f"{int(head['time'][-1].dt.year)}-12-31" dates = pd.date_range(start=start, end=end, freq="SMS") + pd.DateOffset(days=13) head_bimonthly = head.reindex(time=dates, method="nearest", tolerance=tolerance) gxg = _calculate_gxg(head_bimonthly, below_surfacelevel) return gxg