# Source code for port_boundary

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Raymond Hoang (raymond.hoang@water.ca.gov)
# 20220728

"""
Script to convert various data formats (from calsim, csv files) into
SCHISM flux, salt and temp time history (.th) files
"""
from pyhecdss import get_ts
from vtools.functions.unit_conversions import CFS2CMS, ec_psu_25c
from vtools.functions.interpolate import rhistinterp
from vtools.functions.filter import ts_gaussian_filter
from vtools.data.vtime import minutes
import matplotlib.pylab as plt
import pandas as pd
import tempfile
import string
import yaml
import os
import click
import json


def read_csv(file, var, name, dt, p=2.0, interp=True):
    """Read a csv file of monthly boundary values and expand it in time.

    Parameters
    ----------
    file : str
        Path to the csv file (first column parsed as dates).
    var : str
        Column to read; "NN%" entries are converted to fractions.
    name : str
        Column label used in the returned DataFrame.
    dt : offset
        Target time step for the output series.
    p : float
        Tension parameter forwarded to ``rhistinterp``.
    interp : bool
        True -> interpolate with ``rhistinterp``; False -> resample to
        *dt* with forward-fill.

    Returns
    -------
    pandas.DataFrame
        Single-column DataFrame labelled *name*.
    """
    monthly = pd.read_csv(file, index_col=0, header=0, parse_dates=True)
    monthly.index = monthly.index.to_period("M")

    col = monthly[var]
    if col.dtype == "object":
        # String column: strip whitespace, then turn "NN%" into a fraction.
        def to_number(text):
            return float(text.strip("%")) / 100.0 if "%" in text else float(text)

        monthly[var] = col.str.strip().apply(to_number)
    else:
        monthly[var] = col.astype("float")

    if interp:
        series = rhistinterp(monthly[var].astype("float"), dt, p=p)
    else:
        # Step-wise expansion: move period index to timestamps, then ffill.
        monthly.index = monthly.index.to_timestamp()
        series = monthly[var].resample(dt).ffill()

    out = pd.DataFrame()
    out[[name]] = pd.DataFrame({var: series})
    return out
def read_dss(file, pathname, sch_name=None, p=2.0):
    """
    Reads in a DSM2 dss file and interpolates.

    Outputs an interpolated DataFrame of that variable, labelled
    *sch_name* and reindexed to the 15-minute output range.

    NOTE(review): this function reads several names that are neither
    parameters nor module globals (``dir``, ``start_date``, ``end_date``,
    ``dss_e2_freq``, ``dt``, ``df_rng``, and ``b`` in the error message);
    they are locals of ``create_schism_bc``.  As written, ``dir`` resolves
    to the builtin function here — confirm the intended scoping/call
    pattern before relying on this standalone.
    """
    ts15min = pd.DataFrame()
    print(pathname)
    # get_ts yields one item per matched DSS path; each item's first
    # element is presumably the data frame — verify against pyhecdss docs.
    ts = get_ts(os.path.join(dir, file), pathname)
    for tsi in ts:
        # DSS pathname parts /A/B/C/D/E/F/ -> split index 5 is the E
        # (interval) part, used to pick the period frequency below.
        path_lst = (tsi[0].columns.values[0]).split("/")
        path_e = path_lst[5]
        tt = tsi[0][start_date:end_date]
        # Rebuild a PeriodIndex at the record frequency for rhistinterp.
        pidx = pd.period_range(start_date, tt.index[-1], freq=dss_e2_freq[path_e])
        ptt = pd.DataFrame(tt.values[:, 0], pidx)
        if p != 0:
            ts15min[[sch_name]] = rhistinterp(ptt, dt, p=p).reindex(df_rng)
        elif p == 0:
            ts15min[[sch_name]] = rhistinterp(ptt, dt).reindex(df_rng)
        else:
            # NOTE(review): unreachable — "p != 0" / "p == 0" cover all values.
            ts15min[[sch_name]] = tsi[0]
    if ts15min.empty:
        # NOTE(review): ``b`` is not defined in this function's scope.
        raise ValueError(f"Warning: DSS data not found for {b}")
    return ts15min


class SafeDict(dict):
    """Safe dictionary for string formatting.

    ``__missing__`` leaves unknown ``{key}`` placeholders intact instead
    of raising KeyError, so partially-populated templates survive.
    """

    def __missing__(self, key):
        print(f"Missing key: {key}")  # Debugging line
        return "{" + key + "}"


def fmt_string_file(fn_in, str_dict):
    """Format a file using a dictionary of strings.

    Reads *fn_in*, drops full-line comments, substitutes ``{key}``
    placeholders from *str_dict* (unknown keys left intact via SafeDict),
    writes the result to a temporary file, and returns that file's path.
    """
    # Create a temporary yaml filename with the same extension as the
    # input; delete=False so the file survives for the caller to read.
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=os.path.splitext(fn_in)[-1], delete=False
    ) as temp_file:
        temp_fn = temp_file.name
        print(f"\t Temporary yaml filename: {temp_fn}")
    with open(fn_in, "r") as f:
        fdata = f.read()
    # Remove commented lines
    fdata = "\n".join(
        line for line in fdata.splitlines() if not line.strip().startswith("#")
    )
    # Replace text with str_dict values
    fdata = string.Formatter().vformat(fdata, (), SafeDict((str_dict)))
    with open(temp_fn, "w") as fout:
        fout.write(fdata)
    return temp_fn


def clean_df(in_df):
    """Drop runs of repeated rows.

    Keeps row 0, every row whose values differ from the previous row,
    and the row immediately before each change (so step plots render
    with proper vertical transitions).
    """
    keep_rows = [0]
    for r in range(1, len(in_df.index)):
        if not (list(in_df.iloc[r, :]) == list(in_df.iloc[r - 1, :])):
            # keep the row where something changes and the row before (for plotting)
            if not r - 1 in keep_rows:
                keep_rows.append(r - 1)
            keep_rows.append(r)
    out_df = in_df.iloc[keep_rows, :]
    return out_df


def set_gate_fraction(dts, op_var="height", ubound=10, lbound=0):
    """Convert fractional monthly gate values into open/closed days.

    For each month whose *op_var* values are strictly between *lbound*
    and *ubound*, the first ``fraction`` of the month is set to *lbound*
    and the remainder to *ubound*, where ``fraction`` is the month's
    first value divided by *ubound*.  Mutates and returns *dts*.
    """
    # Filter rows where height is not 10 or 0
    fraction_mask = (dts[op_var] != ubound) & (dts[op_var] != lbound)
    # Apply the mask to filter the DataFrame
    filtered_dts = dts[fraction_mask]
    # Group the filtered DataFrame by month
    for month, group in filtered_dts.groupby(filtered_dts.index.to_period("M")):
        # Calculate the fraction
        fraction = group[op_var].iloc[0] / ubound  # Get the first fraction of the month
        fraction_point = int(len(group) * fraction)
        first_indices = group.index[:fraction_point]
        second_indices = group.index[fraction_point:]
        # Set the first half to lbound and the second half to ubound
        dts.loc[first_indices, op_var] = lbound
        dts.loc[second_indices, op_var] = ubound
    return dts


@click.command()
@click.argument("config_yaml", type=click.Path(exists=True))
@click.option(
    "--kwargs",
    type=str,
    default="{}",
    help="JSON string representing a dictionary of keyword arguments to populate format strings.",
)
@click.help_option("-h", "--help")
def port_boundary_cli(config_yaml, kwargs):
    """
    Command line interface for creating SCHISM boundary conditions.
    """
    # --kwargs arrives as a JSON string, e.g. '{"year": "2022"}'.
    kwargs_dict = json.loads(kwargs)
    create_schism_bc(config_yaml, kwargs_dict)


def create_schism_bc(config_yaml, kwargs={}):
    """Build SCHISM .th boundary inputs from the sources in a YAML config.

    Reads reference flux/salt/temp (and optionally DCC gate) .th files,
    substitutes per-boundary timeseries from CSV, DSS, or constant
    sources as listed in the source-map csv, converts units, and writes
    the updated .th files.

    NOTE(review): mutable default ``kwargs={}`` — never mutated here,
    but a ``None`` sentinel would be safer.
    NOTE(review): ``dir`` below shadows the builtin; ``read_dss`` appears
    to expect it (and other names defined here) to be visible in its own
    scope — confirm before refactoring.
    """
    with open(fmt_string_file(config_yaml, kwargs), "r") as f:
        config = yaml.safe_load(f)
    dir = config["dir"]
    # Overwrite if historical data is manipulated.
    # If forecast data is being applied, overwrite should be False
    overwrite = config["param"]["overwrite"]

    # Read in parameters from the YAML file
    source_map_file = os.path.join(dir, config["file"]["source_map_file"])
    schism_flux_file = os.path.join(dir, config["file"]["schism_flux_file"])
    schism_salt_file = os.path.join(dir, config["file"]["schism_salt_file"])
    schism_temp_file = os.path.join(dir, config["file"]["schism_temp_file"])
    schism_dcc_gate_file = None
    if "schism_dcc_gate_file" in config["file"]:
        schism_dcc_gate_file = os.path.join(dir, config["file"]["schism_dcc_gate_file"])
    # Output names are joined with ``dir`` only at write time below.
    out_file_flux = os.path.join(config["file"]["out_file_flux"])
    out_file_salt = os.path.join(config["file"]["out_file_salt"])
    out_file_temp = os.path.join(config["file"]["out_file_temp"])
    out_file_dcc_gate = None
    if "out_file_dcc_gate" in config["file"]:
        out_file_dcc_gate = os.path.join(config["file"]["out_file_dcc_gate"])

    boundary_kinds = config["param"]["boundary_kinds"]
    sd = config["param"]["start_date"]
    ed = config["param"]["end_date"]
    dt = minutes(15)  # SCHISM .th files are written on a 15-minute step
    start_date = pd.Timestamp(sd)
    end_date = pd.Timestamp(ed)
    df_rng = pd.date_range(start_date, end_date, freq=dt)

    # Read and process the source_map file
    source_map = pd.read_csv(fmt_string_file(source_map_file, kwargs), header=0)

    # Read in the reference SCHISM flux, salt and temperature files
    # to be used as a starting point and to substitute timeseries not
    # available from other data sources.
    flux = pd.read_csv(
        schism_flux_file,
        index_col=0,
        parse_dates=[0],
        sep="\\s+",
        header=0,
        comment="#",
    )
    salt = pd.read_csv(
        schism_salt_file,
        header=0,
        parse_dates=True,
        index_col=0,
        sep="\\s+",
        comment="#",
    )
    temp = pd.read_csv(
        schism_temp_file,
        header=0,
        parse_dates=True,
        index_col=0,
        sep="\\s+",
        comment="#",
    )
    dcc_gate = None
    if "dcc_gate" in boundary_kinds:
        dcc_gate = pd.read_csv(
            schism_dcc_gate_file,
            header=0,
            parse_dates=True,
            index_col=0,
            sep="\\s+",
            comment="#",
        )

    # Map DSS E-part interval names to pandas period frequencies.
    dss_e2_freq = {"1HOUR": "H", "1DAY": "D"}

    for boundary_kind in boundary_kinds:
        source_map_bc = source_map.loc[source_map["boundary_kind"] == boundary_kind]
        # Pick the reference frame and output filename for this kind.
        if boundary_kind == "flow":
            dd = flux.copy().reindex(df_rng)
            out_file = out_file_flux
        elif boundary_kind == "ec":
            dd = salt.copy().reindex(df_rng)
            out_file = out_file_salt
        elif boundary_kind == "temp":
            dd = temp.copy().reindex(df_rng)
            out_file = out_file_temp
        elif boundary_kind == "dcc_gate":
            dd = dcc_gate.copy().reindex(df_rng)
            # Take existing values from the reference file (might only work on DCC gate)
            dd.iloc[:, dd.columns != "height"] = dcc_gate.iloc[
                0, dcc_gate.columns != "height"
            ]
            # enforce integer type else get schism error
            dd["install"] = dd["install"].astype(int)
            dd["ndup"] = dd["ndup"].astype(int)
            out_file = out_file_dcc_gate
        else:
            raise ValueError(f"Unknown boundary kind: {boundary_kind}")

        for index, row in source_map_bc.iterrows():
            dfi = pd.DataFrame()
            name = row["schism_boundary"]
            source_kind = row["source_kind"]
            source_file = str(row["source_file"])
            # "derived"/"interp" csv cells may be any casing of true/false.
            derived = str(row["derived"]).capitalize() == "True"
            interp = str(row["interp"]).capitalize() == "True"
            var = row["var"]
            sign = row["sign"]
            convert = row["convert"]
            p = row["rhistinterp_p"]
            formula = row["formula"]
            print(f"processing {name}")
            if source_kind == "CSV":
                # Substitute in an interpolated monthly forecast
                if derived:
                    print(
                        f"Updating SCHISM {name} with derived timeseries\
 expression: {formula}"
                    )
                    # Build one column per ';'-separated variable; the
                    # formula is eval'd against this local ``csv`` frame.
                    # NOTE(review): eval on config-supplied text — safe
                    # only for trusted config files.
                    csv = pd.DataFrame()
                    vars = var.split(";")
                    for v in vars:
                        csv[[v]] = read_csv(
                            source_file, v, name, dt, p=p, interp=interp
                        )
                    dts = eval(formula).to_frame(name).reindex(df_rng)
                    if boundary_kind == "dcc_gate":
                        # Need to make any Percentage between 0 and 100 and convert them to days of month open
                        dts = set_gate_fraction(dts.ffill(), "height")
                    if interp:
                        dfi = ts_gaussian_filter(dts, sigma=100)
                    else:
                        dfi = dts
                else:
                    print(
                        f"Updating SCHISM {name} with interpolated monthly\
 forecast {var}"
                    )
                    dfi = read_csv(source_file, var, name, dt, p=p)
            elif source_kind == "DSS":
                # Substitute in CalSim value.
                if derived:
                    vars_lst = var.split(";")
                    print(
                        f"Updating SCHISM {name} with derived timeseries\
 expression: {formula}"
                    )
                    # ``dss`` (keyed by each pathname's B part) and
                    # ``flux_clipped`` below exist for the eval'd formula
                    # to reference.
                    dss = pd.DataFrame()
                    for pn in vars_lst:
                        b = pn.split("/")[2]
                        dss[[b]] = read_dss(
                            source_file, pathname=pn, sch_name=name, p=p
                        )
                    ## quick fix for to use last year pattern as formula
                    ## input
                    clip_1ybackward_start = start_date - pd.DateOffset(years=1)
                    clip_1ybackward_end = end_date - pd.DateOffset(years=1)
                    flux_clipped = flux[clip_1ybackward_start:clip_1ybackward_end]
                    ## reset clipped flux index to dss year
                    flux_clipped.index = flux_clipped.index.map(
                        lambda x: x.replace(year=start_date.year)
                    )
                    dts = eval(formula).to_frame(name).reindex(df_rng)
                    if interp:
                        dfi = ts_gaussian_filter(dts, sigma=100)
                    else:
                        dfi = dts
                else:
                    print(f"Updating SCHISM {name} with DSS variable {var}")
                    dfi = read_dss(source_file, pathname=var, sch_name=name, p=p)
            elif source_kind == "CONSTANT":
                # Simply fill with a constant specified.
                dd[name] = float(var)
                dfi["datetime"] = df_rng
                dfi = dfi.set_index("datetime")
                dfi[name] = [float(var)] * len(df_rng)
                print(f"Updating SCHISM {name} with constant value of {var}")
                # Maintain the rounding preference of the formula
                # NOTE(review): assumes a single-digit round argument,
                # e.g. "....round(1)" -> formula[-2] == "1".
                if isinstance(formula, str) and "round" in formula:
                    dfi = dfi.round(int(formula[-2]))
            if source_kind == "SCHISM":
                # Simplest case: use existing reference SCHISM data; do nothing
                print("Use existing SCHISM input")
            else:
                # Do conversions.  (Note: CONSTANT rows also pass through
                # here, so their dfi is converted and merged as well.)
                if convert == "CFS_CMS":
                    dfi = dfi * CFS2CMS * sign
                elif convert == "EC_PSU":
                    dfi = ec_psu_25c(dfi) * sign
                else:
                    dfi = dfi
                # Trim dfi so that it starts where schism input file ends, so that dfi doesn't
                # overwrite any historical data
                if not overwrite:
                    dfi = dfi[dfi.index >= dd.index[-1]]
                # Update the dataframe.
                dd.update(dfi, overwrite=True)
                # print(dfi)
                # print(dd)

        # Format the outputs.
        dd.index.name = "datetime"
        # Remove excess/repetitive rows
        dd = clean_df(dd.ffill())
        output_path = os.path.join(dir, out_file)
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        dd.to_csv(
            os.path.join(dir, out_file),
            header=True,
            date_format="%Y-%m-%dT%H:%M",
            float_format="%.4f",
            sep=" ",
        )
        dd.plot()
    print("Done")
    plt.show()


if __name__ == "__main__":
    port_boundary_cli()