"""Script by Lily Tomkovic to take a net delta consumptive use and parse it into vsource and vsink"""
from vtools.functions.unit_conversions import CFS2CMS, CMS2CFS
from vtools.functions.interpolate import rhistinterp
from vtools.data.vtime import days
from schimpy.model_time import read_th, is_elapsed
from schimpy.th_calcs import calc_net_source_sink
from schimpy.util.yaml_load import yaml_from_file
from vtools.functions.read_dss import read_dss
from pathlib import Path
import pandas as pd
import numpy as np
import click
import os
bds_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../../")
def adjust_src_sink(src, sink, net_diff, sinkfrac=0.001, debug=False):
"""Distributes a perturbation to dataframe sc and sink
Parameters
----------
src : DataFrame
DataFrame of positive sources.
sink : DataFrame
DataFrame of sinks. Must be all of the same sign. If positive, the values are assumed to represent the magnitude of the (latent) sinks; if negative, they are the sink values themselves. The return value matches the convention of the input.
net_diff : Series
Series representing a perturbation to be distributed proportionally over src and sink. The sign convention agrees with that of src.
When net_diff is negative, it augments the sink.
When it is positive, it reduces the sink until the sinkfrac fraction of the sink is exhausted, after which the remainder augments the source.
sinkfrac : float
Fraction of sink that can be reduced in order to achieve the perturbation by reducing the sink. Remainder is applied by augmenting src. Has no effect on time steps when net_diff is negative.
debug : bool
If True, prints out debug information about the input src and sink DataFrames.
Returns
-------
src : DataFrame
DataFrame of adjusted sources, with the same sign convention as the input src.
sink : DataFrame
DataFrame of adjusted sinks, with the same sign convention as the input sink.
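Examples
--------
A minimal sketch with made-up numbers: two source columns, two sink
columns, and a one-step positive perturbation that is mostly absorbed
by reducing the sink (``sinkfrac=0.1``).

>>> import pandas as pd
>>> idx = pd.date_range("2020-01-01", periods=3, freq="D")
>>> src = pd.DataFrame({"a": [1.0] * 3, "b": [2.0] * 3}, index=idx)
>>> sink = pd.DataFrame({"c": [-3.0] * 3, "d": [-1.0] * 3}, index=idx)
>>> net_diff = pd.Series([0.5, -0.5, 0.0], index=idx)
>>> src2, sink2 = adjust_src_sink(src, sink, net_diff, sinkfrac=0.1)
>>> # Row totals shift by net_diff: the (src2 + sink2) row sums differ
>>> # from the (src + sink) row sums by [0.5, -0.5, 0.0]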
"""
# Normalize sign conventions so that src is positive and sink is negative
# (consumptive use treated as positive); the input conventions are
# restored at the end.
if (sink > 0).any(axis=None):
possink = True
haspos = sink.loc[(sink > 0).any(axis=1), :]
colswithpos = (haspos > 0).any(axis=0)
if debug:
haspos.loc[:, colswithpos].to_csv("test_sink.csv")
print("possink")
print(sink.loc[(sink > 0).any(axis=1), :])
print("Sink has positive values")
# convert to a DataFrame of negative sinks
sink = -sink
else:
possink = False
if debug:
print("Sink has all neg values")
if (src < 0).any(axis=None):
negsrc = True
hasneg = src.loc[(src < 0).any(axis=1), :]
colswithneg = (hasneg < 0).any(axis=0)
if debug:
hasneg.loc[:, colswithneg].to_csv("test_src.csv")
print("negsrc")
print(src.loc[(src < 0).any(axis=1), :])
print("Source has all neg values")
# SHOULD BE DataFrame of positive sources
src = -src
else:
negsrc = False
if debug:
print("Source has all pos values")
src_total = src.sum(axis=1)
sink_total = sink.sum(axis=1)
dcd_total = src_total + sink_total
# Ensure proper formatting of net_diff
if isinstance(net_diff, pd.DataFrame) and net_diff.shape[1] == 1:
net_diff = net_diff.iloc[:, 0]
if isinstance(net_diff.index, pd.PeriodIndex):
net_diff.index = net_diff.index.to_timestamp()
# Create perturbed dataframe for scaling and indexing
pert = net_diff.reindex(
dcd_total.index
) # using only the indices in dcd_total (from the src/sink inputs)
# pert
neg = pert <= 0.0
pos = ~neg
# For negative (e.g. more depletion) , achieve change by scaling all sinks
scaleneg = (
sink_total + pert
) / sink_total # all quantities negative, so scaleneg > 1
scaleneg = scaleneg.where(neg, 1.0) # Ensure No effect in pos case
# For positive adjustments, remove up to FRAC fraction of total sinks, then ...
adjustmag = pert.where(pos, 0.0) # total positive adjustment required
FRAC = sinkfrac
sinklimit = -FRAC * sink_total # max amount done by reducing sink (this is pos)
from_sink = adjustmag.clip(upper=sinklimit)
residual = (
adjustmag - from_sink
) # This should be nonnegative for pertinent cases where ~pos
residual = residual.where(pos, 0.0) # Just to clarify
resscale = (residual + src_total) / src_total
src = src.mul(resscale, axis=0)
# Now put together the adjustments to sinks under both cases, pos and ~pos
scalepos = (from_sink + sink_total) / sink_total # for positive (source increase)
scale = scalepos.where(pos, scaleneg) # choose case
sink = sink.mul(scale, axis=0)
if possink:
sink = -sink
# sink.loc[sink==0.] = -0.0
if negsrc:
src = -src
# sink.loc[sink==0.] = -0.0
# drop NaN's from interpolations etc.
sink = sink.dropna(axis=0, how="any")
src = src.dropna(axis=0, how="any")
return src, sink
def calc_net_diff(original_net, adjusted_net, cfs_to_cms=True):
"""
Calculate the difference in net CU between original and adjusted data in cms.
Parameters
----------
original_net: pd.DataFrame or pd.Series
DCD net source and sink data from the original timeseries, used to calculate the difference in net CU to apply to SCHISM inputs.
Entered as a single-column DataFrame or a Series, in units of cfs by default.
adjusted_net: pd.DataFrame or pd.Series
DCD net source and sink data from the adjusted timeseries, used to calculate the difference in net CU from the original to apply to SCHISM inputs.
Entered as a single-column DataFrame or a Series, in units of cfs by default.
cfs_to_cms: bool
Set to True if net data is in cfs and needs to be converted to cms for SCHISM.
Only set to False if net data is in cms.
Returns
-------
pd.Series
Series of the net difference in CU between original and adjusted data.
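Examples
--------
A small illustration with synthetic series (values in cfs).

>>> import pandas as pd
>>> idx = pd.date_range("2020-01-01", periods=3, freq="D")
>>> orig = pd.Series([100.0, 100.0, 100.0], index=idx)
>>> adj = pd.Series([110.0, 90.0, 100.0], index=idx)
>>> diff = calc_net_diff(orig, adj)  # (adj - orig) * CFS2CMS, in cms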
"""
# Squeeze single-column DataFrames to Series so the subtraction aligns
if isinstance(original_net, pd.DataFrame) and original_net.shape[1] == 1:
original_net = original_net.iloc[:, 0]
if isinstance(adjusted_net, pd.DataFrame) and adjusted_net.shape[1] == 1:
adjusted_net = adjusted_net.iloc[:, 0]
common_index = adjusted_net.index.intersection(original_net.index)
net_diff = adjusted_net.loc[common_index] - original_net.loc[common_index]
if cfs_to_cms:
net_diff = net_diff * CFS2CMS # convert to cms
return net_diff
def sch_dcd_net_diff(
net_diff,
schism_vsource,
schism_vsink,
out_dir,
version,
start_date=None,
end_date=None,
cfs_to_cms=True,
):
r"""
Convert adjusted DCD data from DataFrame into SCHISM-ready \*.th inputs.
Parameters
----------
net_diff: pd.DataFrame
DCD net differences from historical or baseline. Used to apply to SCHISM inputs.
schism_vsource: str|Path
Input SCHISM vsource data, needs to be "dated" and not "elapsed".
schism_vsink: str|Path
Input SCHISM vsink data, needs to be "dated" and not "elapsed".
out_dir: str|Path
Output directory to store the altered \*.th files.
version: str
Specifies the tag to put on the output files (e.g. vsource.VERSION.dated.th)
start_date: pd.Timestamp, optional
Start date for the timeseries.
end_date: pd.Timestamp, optional
End date for the timeseries.
cfs_to_cms: bool, optional
Set to True if net data is in cfs and needs to be converted to cms for SCHISM.
Only set to False if net data is in cms.
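Examples
--------
A hypothetical invocation; the paths and version tag are illustrative,
and ``net_diff`` is assumed to be a cms series from :func:`calc_net_diff`.

>>> sch_dcd_net_diff(
...     net_diff,
...     "./schism/vsource_dated.th",
...     "./schism/vsink_dated.th",
...     "./out_dir",
...     "v1",
...     cfs_to_cms=False,  # net_diff already in cms
... )  # doctest: +SKIP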
"""
# Normalize paths
schism_vsource = Path(schism_vsource)
schism_vsink = Path(schism_vsink)
out_dir = Path(out_dir)
# Check inputs first
if is_elapsed(schism_vsource):
raise ValueError(
f"SCHISM input is in elapsed format. Needs to be timestamped. File: {schism_vsource}"
)
if is_elapsed(schism_vsink):
raise ValueError(
f"SCHISM input is in elapsed format. Needs to be timestamped. File: {schism_vsink}"
)
if not os.path.exists(out_dir):
print(f"Making directory: {out_dir}")
os.makedirs(out_dir, exist_ok=True)
# Read in SCHISM data and apply net_diff.
sch_src = read_th(schism_vsource)
sch_sink = read_th(schism_vsink)
if cfs_to_cms:
net_diff = net_diff * CFS2CMS # convert net_diff to cms to match SCHISM units
sch_delta_src = sch_src.loc[:, sch_src.columns.str.contains("delta", case=False)]
sch_delta_sink = sch_sink.loc[:, sch_sink.columns.str.contains("delta", case=False)]
ssrc, ssink = adjust_src_sink(
sch_delta_src, sch_delta_sink, net_diff
) # create the adjusted source/sink values to be used for this version in SCHISM
# Add the adjusted columns to the original SCHISM data
src_out = sch_src.copy()
src_out.update(ssrc)
sink_out = sch_sink.copy()
sink_out.update(ssink)
fn_src = out_dir / f"vsource.{version}.dated.th"
fn_sink = out_dir / f"vsink.{version}.dated.th"
# Clip to desired daterange while the index is still datetime-like:
if start_date is None:
start_date = src_out.index[0]
if end_date is None:
end_date = src_out.index[-1]
src_out = src_out[start_date:end_date]
sink_out = sink_out[start_date:end_date]
# Format the index as SCHISM-style date strings for output
src_out.index = src_out.index.strftime("%Y-%m-%dT00:00")
sink_out.index = sink_out.index.strftime("%Y-%m-%dT00:00")
if (src_out.values < 0).any():
raise ValueError(
"There are negative values in the source dataframe! They should all be positive"
)
else:
print(f"Writing source to {fn_src}")
src_out.to_csv(fn_src, sep=" ", float_format="%.2f")
if (sink_out.values > 0).any():
raise ValueError(
"There are positive values in the sink dataframe! They should all be negative"
)
else:
print(f"Writing sink to {fn_sink}")
sink_out.to_csv(fn_sink, sep=" ", float_format="%.2f")
def calc_srcsnk_dsm2(
dcd_dss_file,
start_date=None,
end_date=None,
dt=days(1),
exclude_pathname="//TOTAL*/////",
):
"""Get net source and net sink timeseries from DCD DSS file.
Parameters
----------
dcd_dss_file: str|Path
Path to the DCD DSS file. Contains DRAIN-FLOW, SEEP-FLOW, and DIV-FLOW data.
start_date: pd.Timestamp, optional
Start date for the timeseries.
end_date: pd.Timestamp, optional
End date for the timeseries.
dt: float or pd.DateOffset, optional
Time step in days for interpolation.
exclude_pathname: str, optional
Pathname parts to exclude, ex: '//TOTAL*/////'
Returns
-------
src: pd.Series
Series of net source flow in cfs.
sink: pd.Series
Series of net sink flow in cfs.
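Examples
--------
A hypothetical call against a DCD DSS file (the path is illustrative).

>>> src, sink = calc_srcsnk_dsm2("./dsm2/DCD_hist_Lch5.dss")  # doctest: +SKIP
>>> net = src - sink  # net consumptive use in cfs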
"""
dcd_dss_file = Path(dcd_dss_file)
df_div_dcd = read_dss(
dcd_dss_file,
"///DIV-FLOW////",
start_date=start_date,
end_date=end_date,
dt=dt,
exclude_pathname=exclude_pathname,
)
df_seep_dcd = read_dss(
dcd_dss_file,
"///SEEP-FLOW////",
start_date=start_date,
end_date=end_date,
dt=dt,
exclude_pathname=exclude_pathname,
)
df_drain_dcd = read_dss(
dcd_dss_file,
"///DRAIN-FLOW////",
start_date=start_date,
end_date=end_date,
dt=dt,
exclude_pathname=exclude_pathname,
)
df_div_dcd = df_div_dcd.clip(lower=0.0)
df_seep_dcd = df_seep_dcd.clip(lower=0.0)
df_div_dcd.columns = [strip_dpart(col) for col in df_div_dcd.columns]
df_seep_dcd.columns = [strip_dpart(col) for col in df_seep_dcd.columns]
df_drain_dcd.columns = [strip_dpart(col) for col in df_drain_dcd.columns]
use_cols = [col for col in df_div_dcd.columns if "total" not in col.lower()]
df_div_dcd = df_div_dcd[use_cols]
use_cols = [col for col in df_seep_dcd.columns if "total" not in col.lower()]
df_seep_dcd = df_seep_dcd[use_cols]
use_cols = [col for col in df_drain_dcd.columns if "total" not in col.lower()]
df_drain_dcd = df_drain_dcd[use_cols]
src = df_drain_dcd.sum(axis=1)
sink = df_div_dcd.sum(axis=1) + df_seep_dcd.sum(axis=1)
return src, sink
def calc_net_dsm2(
dcd_dss_file,
start_date=None,
end_date=None,
dt=days(1),
exclude_pathname="//TOTAL*/////",
**kwargs,
):
"""Get net timeseries from DCD DSS file.
Parameters
----------
dcd_dss_file: str|Path
Path to the DCD DSS file. Contains DRAIN-FLOW, SEEP-FLOW, and DIV-FLOW data.
start_date: pd.Timestamp, optional
Start date for the timeseries.
end_date: pd.Timestamp, optional
End date for the timeseries.
dt: float or pd.DateOffset, optional
Time step in days for interpolation.
exclude_pathname: str, optional
Pathname parts to exclude, ex: '//TOTAL*/////'
kwargs: dict, optional
Additional keyword arguments to pass to the read_dss function. (unused in this function)
Returns
-------
pd.Series
Series of net consumptive use in cfs, calculated as source - sink.
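Examples
--------
A hypothetical call (the path and dates are illustrative).

>>> import pandas as pd
>>> net = calc_net_dsm2("./dsm2/DCD_hist_Lch5.dss",
...                     start_date=pd.Timestamp(2016, 1, 1),
...                     end_date=pd.Timestamp(2017, 1, 1))  # doctest: +SKIP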
"""
dcd_dss_file = Path(dcd_dss_file)
src, sink = calc_srcsnk_dsm2(
dcd_dss_file,
start_date=start_date,
end_date=end_date,
dt=dt,
exclude_pathname=exclude_pathname,
)
return src - sink
def calc_net_calsim(
dcd_dss_calsim, start_date=None, end_date=None, dt=days(1), **kwargs
):
"""Get net consumptive use timeseries from Calsim DSS file.
Parameters
----------
dcd_dss_calsim: str|Path
Path to the Calsim DSS file. Contains DICU_FLOW data.
start_date: pd.Timestamp, optional
Start date for the timeseries.
end_date: pd.Timestamp, optional
End date for the timeseries.
dt: float or pd.DateOffset, optional
Time step in days for interpolation.
kwargs: dict, optional
Additional keyword arguments to pass to the read_dss function. (unused in this function)
Returns
-------
pd.Series
Series of net consumptive use in cfs, read from the DICU_FLOW path in
the Calsim DSS file and sign-flipped to follow the source - sink convention.
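Examples
--------
A hypothetical call; the path is illustrative (it mirrors the CLI example).

>>> net = calc_net_calsim(
...     "./calsim/DCR2023_DV_9.3.1_v2a_Danube_Adj_v1.8.dss"
... )  # doctest: +SKIP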
"""
dcd_dss_calsim = Path(dcd_dss_calsim)
net = read_dss(
dcd_dss_calsim,
"///DICU_FLOW////",
start_date=start_date,
end_date=end_date,
dt=dt,
**kwargs,
)
net.columns = ["net"]
if isinstance(net, pd.DataFrame) and net.shape[1] == 1:
net = net.iloc[:, 0]
return -net
def calc_net_obs_schism(
schism_dir, start_date=None, end_date=None, dt=days(1), **kwargs
):
"""Use the repository to calculate net SCHISM DCU"""
schism_dir = os.path.abspath(os.path.join(bds_dir, "data/channel_depletion/"))
kwargs["vsource"] = "vsource_dated.th"
kwargs["vsink"] = "vsink_dated.th"
net = calc_net_schism(
schism_dir, start_date=start_date, end_date=end_date, dt=dt, **kwargs
)
return net
def calc_net_schism(schism_dir, start_date=None, end_date=None, dt=days(1), **kwargs):
"""Get net timeseries from SCHISM th files in cfs.
Parameters
----------
schism_dir: str|Path
SCHISM directory where vsource.th and vsink.th files are found.
start_date: pd.Timestamp, optional
Start date for the timeseries.
end_date: pd.Timestamp, optional
End date for the timeseries.
dt: float or pd.DateOffset, optional
Time step in days for interpolation.
kwargs: dict, optional
Additional keyword arguments. ``vsource`` and ``vsink`` give the file
names within ``schism_dir`` (defaulting to ``vsource.th`` and ``vsink.th``).
When calling through :func:`orig_pert_to_schism_dcd`, use _original or
_perturbed suffixes on these keys to distinguish the two datasets.
Returns
-------
pd.DataFrame
DataFrame with a single column named 'net' containing net consumptive use in cfs.
The data is calculated from the vsource and vsink files in the SCHISM directory.
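Examples
--------
A hypothetical call; the directory and file names are illustrative.

>>> net = calc_net_schism(
...     "./schism/", vsource="vsource_dated.th", vsink="vsink_dated.th"
... )  # doctest: +SKIP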
"""
schism_dir = Path(schism_dir)
vsource = schism_dir / kwargs.get("vsource")
vsink = schism_dir / kwargs.get("vsink")
net, src, sink = calc_net_source_sink(
vsource_file=vsource,
vsink_file=vsink,
search_term="delta",
start_date=start_date,
end_date=end_date,
)
if isinstance(net.index, pd.DatetimeIndex):
net.index = net.index.to_period()
net = rhistinterp(net, dt)
net.columns = ["net"]
net = net * CMS2CFS # convert to cfs
return net
def read_net_csv(csv_file, start_date=None, end_date=None, dt=days(1), **kwargs):
"""Read net timeseries from a CSV file.
Parameters
----------
csv_file: str|Path
Path to the CSV file containing net consumptive use data.
start_date: pd.Timestamp, optional
Start date for the timeseries.
end_date: pd.Timestamp, optional
End date for the timeseries.
dt: float or pd.DateOffset, optional
Time step in days for interpolation.
kwargs: dict, optional
Must include ``col``, the name of the column to read from the CSV.
May include ``cms_to_cfs=True`` to convert the data from cms to cfs.
Returns
-------
pd.DataFrame
DataFrame with a single column named 'net' containing net consumptive use in cfs.
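Examples
--------
A hypothetical CSV with a date index column and a ``net`` column of cfs
values (the file name is illustrative).

>>> net = read_net_csv("net_cu.csv", col="net")  # doctest: +SKIP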
"""
csv_file = Path(csv_file)
net = pd.read_csv(csv_file, index_col=0, parse_dates=[0])
col = kwargs["col"]
net = net[[col]]
# clip to time constraint
if start_date is None:
start_date = net.index[0]
if end_date is None:
end_date = net.index[-1]
if (net.index[0] > pd.to_datetime(end_date)) or (
net.index[-1] < pd.to_datetime(start_date)
):
raise ValueError(
f"File: {csv_file} does not cover the dates requested. \n\tRequested dates are: {start_date} to {end_date}, \n\tand the file covers {net.index[0]} to {net.index[-1]}"
)
net = net[start_date:end_date]
# Resample to daily frequency
net = net.resample("D").ffill()
# Convert DatetimeIndex to PeriodIndex if needed
if isinstance(net.index, pd.DatetimeIndex):
net.index = net.index.to_period("D")
# Convert to cfs if needed
if "cms_to_cfs" in kwargs.keys():
if kwargs["cms_to_cfs"]:
net = net * CMS2CFS # convert to cfs
net = rhistinterp(net, dt)
net.columns = ["net"]
return net
# Map type string to function
SRC_SNK_FUNC_MAP = {
"dsm2": calc_net_dsm2,
"calsim": calc_net_calsim,
"schism": calc_net_schism,
"obs_schism": calc_net_obs_schism,
"csv": read_net_csv,
# add more types as needed
}
def strip_dpart(colname):
"""Remove the 5th part of a DSS path, which is the D part.
This is used to strip the D part from the column names in the DCD data.
Parameters
----------
colname: str
The column name in the DSS path format.
Returns
-------
str
The column name with the D part removed.
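Examples
--------
With a made-up pathname:

>>> strip_dpart("/DCD/BBID/DIV-FLOW/01JAN2020/1DAY/L2020/")
'/DCD/BBID/DIV-FLOW//1DAY/L2020/'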
"""
parts = colname.split("/")
parts[4] = ""
return "/".join(parts)
# Main function
def orig_pert_to_schism_dcd(
original_type,
original_filename,
perturbed_type,
perturbed_filename,
schism_vsource,
schism_vsink,
out_dir,
version,
start_date=None,
end_date=None,
dt=days(1),
**kwargs,
):
r"""
Convert original and perturbed DCD data to SCHISM vsource and vsink inputs.
Parameters
----------
original_type: str
Type of the original DCD data (e.g. 'dsm2', 'calsim', 'csv', 'schism').
original_filename: str|Path
Path to the original DCD data file. If DSS then enter the filename.
If SCHISM then enter the directory where vsource.th and vsink.th are found.
perturbed_type: str
Type of the perturbed DCD data (e.g. 'dsm2', 'calsim', 'csv', 'schism').
perturbed_filename: str|Path
Path to the perturbed DCD data file. If DSS then enter the filename.
If SCHISM then enter the directory where vsource.th and vsink.th are found.
schism_vsource: str|Path
Input SCHISM vsource data, needs to be 'dated' and not 'elapsed'.
schism_vsink: str|Path
Input SCHISM vsink data, needs to be 'dated' and not 'elapsed'.
out_dir: str|Path
Output directory to store the altered \*.th files.
version: str
Specifies the tag to put on the output files (e.g. vsource.VERSION.dated.th).
start_date: pd.Timestamp, optional
Start date for the timeseries. If None, uses the start date from the original data.
end_date: pd.Timestamp, optional
End date for the timeseries. If None, uses the end date from the original data.
dt: float or pd.DateOffset, optional
Time step in days for interpolation. Default is 1 day.
**kwargs: dict
Additional keyword arguments to pass to the SCHISM net calculation functions.
For example, if the original data has a specific vsource file, you can pass
vsource_original='vsource_dated.th' and it will be used in the calculation.
You must use _original or _perturbed suffixes to distinguish between original and perturbed data.
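Examples
--------
A hypothetical invocation mirroring the CLI example (paths and version
tag are illustrative).

>>> orig_pert_to_schism_dcd(
...     "dsm2", "./dsm2/DCD_hist_Lch5.dss",
...     "calsim", "./calsim/DCR2023_DV_9.3.1_v2a_Danube_Adj_v1.8.dss",
...     "./schism/vsource_dated.th", "./schism/vsink_dated.th",
...     "./out_dir", "dsm2_calsim_v1",
... )  # doctest: +SKIP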
"""
original_filename = Path(original_filename)
perturbed_filename = Path(perturbed_filename)
schism_vsource = Path(schism_vsource)
schism_vsink = Path(schism_vsink)
out_dir = Path(out_dir)
# Read in DSS data and compute net difference in the original case
print(f"Reading Original DCD data from {original_type.upper()} inputs...")
original_fxn = SRC_SNK_FUNC_MAP[
original_type.lower()
] # original net flow for source/sink
# Look for '_original'-suffixed keys in kwargs and pass them to original_fxn
original_kwargs = {
k.replace("_original", ""): v for k, v in kwargs.items() if "original" in k
}
# Pop sign_flip so it is not forwarded to the reader function
sign_flip = original_kwargs.pop("sign_flip", False)
net_original = original_fxn(
original_filename,
start_date=start_date,
end_date=end_date,
dt=dt,
**original_kwargs,
)
if sign_flip:
net_original = -net_original
# Read in DSS data and compute net difference in the perturbed case
print(f"Reading Perturbed DCD data from {perturbed_type.upper()} inputs...")
perturbed_fxn = SRC_SNK_FUNC_MAP[
perturbed_type.lower()
] # perturbed net flow for source/sink
perturbed_kwargs = {
k.replace("_perturbed", ""): v for k, v in kwargs.items() if "perturbed" in k
}
sign_flip = perturbed_kwargs.pop("sign_flip", False) # not forwarded to the reader
net_perturbed = perturbed_fxn(
perturbed_filename,
start_date=start_date,
end_date=end_date,
dt=dt,
**perturbed_kwargs,
)
if sign_flip:
net_perturbed = -net_perturbed
# Calculate the net difference between original and perturbed
net_diff = calc_net_diff(
net_original,
net_perturbed,
cfs_to_cms=True,
) # net difference in cms
# Apply differences to SCHISM inputs
sch_dcd_net_diff(
net_diff,
schism_vsource,
schism_vsink,
out_dir,
version,
start_date=start_date,
end_date=end_date,
cfs_to_cms=False, # net_diff is already in cms from calc_net_diff
)
def orig_pert_to_schism_dcd_yaml(yaml_fn, envvar=None):
"""
Convert original and perturbed DCD data to SCHISM vsource and vsink inputs from yaml input file.
Parameters
----------
yaml_fn: str|Path
Path to the YAML file containing configuration.
envvar: dict or None, optional
Optional dictionary of environment variables or overrides. If None, no overrides are applied.
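Examples
--------
A minimal YAML sketch for the ``orig_pert_to_schism`` process (values
are illustrative)::

    cu:
      process: orig_pert_to_schism
      original_type: dsm2
      original_filename: ./dsm2/DCD_hist_Lch5.dss
      perturbed_type: calsim
      perturbed_filename: ./calsim/DCR2023_DV_9.3.1_v2a_Danube_Adj_v1.8.dss
      schism_vsource: ./schism/vsource_dated.th
      schism_vsink: ./schism/vsink_dated.th
      out_dir: ./out_dir
      version: dsm2_calsim_v1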
"""
yaml_content = yaml_from_file(yaml_fn, envvar=envvar)
cu_inputs = yaml_content["cu"]
# Build a dict of all arguments, with envvar overriding cu_inputs
all_args = {**cu_inputs, **(envvar or {})}
# Remove process key if present (not an argument for orig_pert_to_schism_dcd)
all_args.pop("process", None)
# Optionals from cu_inputs or defaults
start_date = cu_inputs.get("start_date")
end_date = cu_inputs.get("end_date")
dt = cu_inputs.get("dt", days(1))
cfs_to_cms = cu_inputs.get("cfs_to_cms", True)
# Ensure explicitly provided optionals are reflected in the argument dict
for k, v in (("start_date", start_date), ("end_date", end_date), ("dt", dt)):
if k in all_args and v is not None:
all_args[k] = v
if cu_inputs["process"] == "orig_pert_to_schism":
# Required arguments
required_keys = [
"original_type",
"original_filename",
"perturbed_type",
"perturbed_filename",
"schism_vsource",
"schism_vsink",
"out_dir",
"version",
]
missing = [k for k in required_keys if k not in cu_inputs]
if missing:
raise KeyError(
f"Missing required keys in cu_inputs: {missing}\n\tAll keys needed are: {required_keys}"
)
orig_pert_to_schism_dcd(**all_args)
elif cu_inputs["process"] == "net_diff_to_schism":
# Required arguments
required_keys = [
"net_diff_filename",
"schism_vsource",
"schism_vsink",
"out_dir",
"version",
]
missing = [k for k in required_keys if k not in cu_inputs]
if missing:
raise KeyError(
f"Missing required keys in cu_inputs: {missing}\n\tAll keys needed are: {required_keys}"
)
# Read net difference from the specified file, forwarding CSV-specific
# options (col, cms_to_cfs) from the YAML
net_diff = read_net_csv(
cu_inputs["net_diff_filename"],
start_date=start_date,
end_date=end_date,
dt=dt,
**{k: cu_inputs[k] for k in ("col", "cms_to_cfs") if k in cu_inputs},
)
sch_dcd_net_diff(
net_diff,
cu_inputs["schism_vsource"],
cu_inputs["schism_vsink"],
cu_inputs["out_dir"],
cu_inputs["version"],
cfs_to_cms=cfs_to_cms,
)
else:
raise ValueError(
f"Unknown process {cu_inputs['process']} in cu_inputs. "
"Expected 'orig_pert_to_schism' or 'net_diff_to_schism'."
)
@click.command(
help=(
"Parse consumptive use adjustments into SCHISM vsource.th and vsink.th "
"inputs using a YAML configuration file.\n\n"
"Arguments:\n"
" YAML Path to the YAML file, for instance parse_cu.yaml."
)
)
@click.argument("yaml_fn")
@click.help_option("-h", "--help")
@click.argument("extra", nargs=-1)
def parse_cu_yaml_cli(
yaml_fn: str,
extra=(),
):
"""
Command-line interface for transferring hotstart data from one grid to another.
Arguments
---------
yaml_fn Path to the YAML file (e.g., parse_cu.yaml).
"""
# Parse extra arguments into a dictionary (expects --key value pairs)
envvar = {}
key = None
for item in extra:
if item.startswith("--"):
key = item.lstrip("-")
elif key is not None:
envvar[key] = item
key = None
if key is not None:
raise ValueError(f"No value provided for extra argument: {key}")
orig_pert_to_schism_dcd_yaml(yaml_fn, envvar=envvar if envvar else None)
@click.command(
help=(
"Uses consumptive use adjustments to historical data "
"in order to determine what the adjusted values for vsource.th and vsink.th are"
"\n\nExample Usages:"
"\n\n\tbds parse_cu dsm2 ./dsm2/DCD_hist_Lch5.dss calsim ./calsim/DCR2023_DV_9.3.1_v2a_Danube_Adj_v1.8.dss ./schism/vsource_dated.th ./schism/vsink_dated.th ./out_dir dsm2_calsim_v1"
"\n"
"\n\tbds parse_cu schism ./schism/ dsm2 ./dsm2/DCD_hist_Lch5.dss ./schism/simulation/vsource_dated.th ./schism/simulation/vsink_dated.th ./out_dir schism_dsm2_v1 --param vsource_original vsource_2023.th vsink_original vsink_2023.th"
)
)
@click.help_option("-h", "--help")
@click.argument(
"original_type",
type=click.Choice(["dsm2", "calsim", "csv", "schism"], case_sensitive=False),
)
@click.argument(
"original_filename",
type=click.Path(exists=True),
)
@click.argument(
"perturbed_type",
type=click.Choice(["dsm2", "calsim", "csv", "schism"], case_sensitive=False),
)
@click.argument(
"perturbed_filename",
type=click.Path(exists=True),
)
@click.argument("schism_vsource", type=click.Path(exists=True))
@click.argument("schism_vsink", type=click.Path(exists=True))
@click.argument("out_dir", type=click.Path())
@click.argument("version", type=str)
@click.option(
"--start-date",
type=click.DateTime(formats=["%Y-%m-%d"]),
default=None,
help="Start date (YYYY-MM-DD)",
)
@click.option(
"--end-date",
type=click.DateTime(formats=["%Y-%m-%d"]),
default=None,
help="End date (YYYY-MM-DD)",
)
@click.option("--dt", type=int, default=1, help="Time step in days.")
@click.option(
"--param",
type=(str, str),
multiple=True,
help=(
"Extra key-value parameters to pass to the net calculation functions, "
"e.g. --param vsource_original vsource_dated.th vsource_perturbed vsource_2023.th"
),
)
def parse_cu_cli(
original_type,
original_filename,
perturbed_type,
perturbed_filename,
schism_vsource,
schism_vsink,
out_dir,
version,
start_date,
end_date,
dt,
param,
):
"""Main entry point for consumptive use - SCHISM commands.
Arguments
---------
original_type : str
Type of the original DCD data (e.g. 'dsm2', 'calsim', 'csv', 'schism').
original_filename : str
Path to the original DCD data file. If DSS then enter the filename.
If SCHISM then enter the directory where vsource.th and vsink.th are found.
perturbed_type : str
Type of the perturbed DCD data (e.g. 'dsm2', 'calsim', 'csv', 'schism').
perturbed_filename : str
Path to the perturbed DCD data file. If DSS then enter the filename.
If SCHISM then enter the directory where vsource.th and vsink.th are found.
schism_vsource : str
Input SCHISM vsource data, needs to be "dated" and not "elapsed".
schism_vsink : str
Input SCHISM vsink data, needs to be "dated" and not "elapsed".
out_dir : str
Output directory to store the altered *.th files.
version : str
Specifies the tag to put on the output files (e.g. vsource.VERSION.dated.th).
Options
-------
--start-date : str
Start date (YYYY-MM-DD).
--end-date : str
End date (YYYY-MM-DD).
--dt : int
Time step in days.
--param : (str, str)
Extra key-value parameters to pass to the net calculation functions.
e.g. --param vsource_original vsource_dated.th vsource_perturbed vsource_2023.th
"""
# Convert paths to Path objects
original_filename = Path(original_filename)
perturbed_filename = Path(perturbed_filename)
schism_vsource = Path(schism_vsource)
schism_vsink = Path(schism_vsink)
out_dir = Path(out_dir)
# Ensure out_dir exists
print(f"Creating output directory: {out_dir}")
out_dir.mkdir(parents=True, exist_ok=True)
# Convert param tuples to a dict
extra_kwargs = dict(param)
# Pass extra_kwargs to orig_pert_to_schism_dcd
orig_pert_to_schism_dcd(
original_type,
original_filename,
perturbed_type,
perturbed_filename,
schism_vsource,
schism_vsink,
out_dir,
version,
start_date=start_date,
end_date=end_date,
dt=days(dt), # convert integer days from the CLI to a DateOffset
**extra_kwargs,
)
if __name__ == "__main__":
parse_cu_cli()