import numpy as np
import pandas as pd
import numba
from vtools.functions.filter import cosine_lanczos
# Names exported by ``from <module> import *``. Helpers defined in this file
# but not listed here must be imported explicitly by name.
__all__ = [
    "get_tidal_hl",
    "get_tidal_amplitude",
    "get_tidal_hl_zerocrossing",
    "get_tidal_phase_diff",
]
[docs]
def get_smoothed_resampled(
    df, cutoff_period="2h", resample_period="1min", interpolate_method="pchip"
):
    """Resample a time-indexed frame to a regular step and low-pass filter it.

    The frame is resampled to ``resample_period`` and the new slots are filled
    with ``interpolate_method``. Slots that are still NaN after a plain
    back-fill of the resampled data (judged on the first column) are reset to
    NaN in the interpolated frame, and the result is passed through the
    cosine-Lanczos filter with ``cutoff_period``.

    Args:
        df (DataFrame): A single column dataframe indexed by datetime
        cutoff_period (str, optional): cutoff period for the cosine lanczos
            filter. Defaults to '2h'.
        resample_period (str, optional): regular period to resample to.
            Defaults to '1min'.
        interpolate_method (str, optional): interpolation method used when
            resampling. Defaults to 'pchip'.

    Returns:
        DataFrame: smoothed and resampled dataframe indexed by datetime
    """
    # Slots that a back-fill cannot populate are treated as gaps.
    gap_mask = df.resample(resample_period).bfill().iloc[:, 0].isna()
    regular = df.resample(resample_period).interpolate(method=interpolate_method)
    # Undo any interpolation that landed inside those gaps.
    regular[gap_mask] = np.nan
    return cosine_lanczos(regular, cutoff_period)
[docs]
@numba.jit(nopython=True)
def lmax(arr):
    """Local maximum: return the window maximum only when it sits at the centre.

    The centre is taken as position ``len(arr) / 2`` (true division), so only
    windows with an even number of samples can ever produce a value; for an
    odd-length window the comparison never matches and NaN is returned.

    Args:
        arr (ndarray): values of one rolling window

    Returns:
        float: ``arr`` maximum if it is located at the centre position,
        otherwise NaN
    """
    peak = np.argmax(arr)
    if peak != len(arr) / 2:
        return np.nan
    return arr[peak]
[docs]
@numba.jit(nopython=True)
def lmin(arr):
    """Local minimum: return the window minimum only when it sits at the centre.

    The centre is taken as position ``len(arr) / 2`` (true division), so only
    windows with an even number of samples can ever produce a value; for an
    odd-length window the comparison never matches and NaN is returned.

    Args:
        arr (ndarray): values of one rolling window

    Returns:
        float: ``arr`` minimum if it is located at the centre position,
        otherwise NaN
    """
    trough = np.argmin(arr)
    if trough != len(arr) / 2:
        return np.nan
    return arr[trough]
[docs]
def periods_per_window(moving_window_size: str, period_str: str) -> int:
    """Count how many whole periods fit in a moving window.

    Args:
        moving_window_size (str): moving window size as a string, e.g. '7h'
            for 7 hours
        period_str (str): sampling period as a frequency string, e.g. '1min'

    Returns:
        int: window length divided by the period, truncated toward zero
        (``int()`` is applied to the ratio, so a fractional part is dropped)
    """
    window = pd.Timedelta(moving_window_size)
    step = pd.to_timedelta(pd.tseries.frequencies.to_offset(period_str))
    return int(window / step)
[docs]
def tidal_highs(df, moving_window_size="7h"):
    """Tidal highs (could be up to two highs in a 25 hr period)

    A rolling window of ``moving_window_size`` is scanned with ``lmax``; a
    value is kept only where the window maximum is at the window centre.

    Args:
        df (DataFrame): a time series with a regular frequency (the index
            must carry a ``freqstr``)
        moving_window_size (str, optional): moving window size to look for
            highs within. Defaults to '7h'.

    Returns:
        DataFrame: an irregular time series of highs, in a single column
        named 'max', at the resolution of df.index
    """
    n_samples = periods_per_window(moving_window_size, df.index.freqstr)
    centred = (
        df.rolling(moving_window_size, min_periods=n_samples)
        .apply(lmax, raw=True)
        # Rolling results are labelled at the window's last sample; move each
        # value back to the centre sample it was detected at.
        .shift(periods=-(n_samples // 2 - 1))
        .dropna()
    )
    centred.columns = ["max"]
    return centred
[docs]
def tidal_lows(df, moving_window_size="7h"):
    """Tidal lows (could be up to two lows in a 25 hr period)

    A rolling window of ``moving_window_size`` is scanned with ``lmin``; a
    value is kept only where the window minimum is at the window centre.

    Args:
        df (DataFrame): a time series with a regular frequency (the index
            must carry a ``freqstr``)
        moving_window_size (str, optional): moving window size to look for
            lows within. Defaults to '7h'.

    Returns:
        DataFrame: an irregular time series of lows, in a single column
        named 'min', at the resolution of df.index
    """
    n_samples = periods_per_window(moving_window_size, df.index.freqstr)
    centred = (
        df.rolling(moving_window_size, min_periods=n_samples)
        .apply(lmin, raw=True)
        # Rolling results are labelled at the window's last sample; move each
        # value back to the centre sample it was detected at.
        .shift(periods=-(n_samples // 2 - 1))
        .dropna()
    )
    centred.columns = ["min"]
    return centred
[docs]
def get_tidal_hl(
    df,
    cutoff_period="2h",
    resample_period="1min",
    interpolate_method="pchip",
    moving_window_size="7h",
):
    """Get Tidal highs and lows

    The input is first resampled and smoothed with
    ``get_smoothed_resampled``; highs and lows are then located on the
    smoothed series with ``tidal_highs`` and ``tidal_lows``.

    Args:
        df (DataFrame): A single column dataframe indexed by datetime
        cutoff_period (str, optional): cutoff period for cosine lanczos
            filter. Defaults to '2h'.
        resample_period (str, optional): Resample to regular period.
            Defaults to '1min'.
        interpolate_method (str, optional): interpolation for resampling.
            Defaults to 'pchip'.
        moving_window_size (str, optional): moving window size to look for
            highs and lows within. Defaults to '7h'.

    Returns:
        tuple of DataFrame: Tidal high and tidal low time series
    """
    dfs = get_smoothed_resampled(df, cutoff_period, resample_period, interpolate_method)
    # Forward the caller's window size; it was previously accepted but
    # ignored, so the 7h default was always used.
    return tidal_highs(dfs, moving_window_size), tidal_lows(dfs, moving_window_size)


get_tidal_hl_rolling = get_tidal_hl  # for older refs. #FIXME
[docs]
def get_tidal_amplitude(dfh, dfl):
    """Tidal amplitude given tidal highs and lows

    Each low is paired with the most recent high at or before it, and the
    amplitude is that high minus the low. A low that has no earlier high
    yields NaN.

    Args:
        dfh (DataFrame): Tidal highs time series with a 'max' column
        dfl (DataFrame): Tidal lows time series with a 'min' column

    Returns:
        DataFrame: Amplitude timeseries (column 'amplitude'), at the times of
        the low following the high being used for amplitude calculation
    """
    # Outer-join highs and lows on time. Sort explicitly: the forward fill
    # below only means "most recent high" when rows are in time order, and
    # that ordering should not depend on how concat orders the joined index.
    combined = pd.concat([dfh, dfl], axis=1).sort_index()
    lows = combined[["min"]].dropna()
    last_high = combined[["max"]].ffill()
    paired = lows.join(last_high)
    return pd.DataFrame(paired["max"] - paired["min"], columns=["amplitude"])
[docs]
def get_tidal_amplitude_diff(dfamp1, dfamp2, percent_diff=False, tolerance="4h"):
    """Difference between two amplitude series matched on nearest time.

    Every row of ``dfamp1`` is matched to the nearest-in-time row of
    ``dfamp2`` that lies within +/- ``tolerance``; rows without a match give
    NaN.

    Args:
        dfamp1 (DataFrame): Amplitude time series
        dfamp2 (DataFrame): Amplitude time series
        percent_diff (bool, optional): If true do percent diff. Defaults to
            False.
        tolerance (str, optional): maximum time separation for two values to
            be paired. Defaults to '4h'.

    Returns:
        DataFrame: Difference dfamp1-dfamp2 or % Difference
        (dfamp1-dfamp2)/dfamp2*100 in a column named 'amplitude_diff',
        indexed like dfamp1
    """
    matched = pd.merge_asof(
        dfamp1,
        dfamp2,
        left_index=True,
        right_index=True,
        direction="nearest",
        tolerance=pd.Timedelta(tolerance),
    )
    first = matched.iloc[:, 0]
    second = matched.iloc[:, 1]
    delta = first - second
    if percent_diff:
        delta = 100.0 * delta / second
    return pd.DataFrame(delta, columns=["amplitude_diff"])
[docs]
def get_phase_diff(df1, df2, tolerance="4h"):
    """Time offset, in minutes, between matching events of two series.

    Every timestamp of ``df2`` is matched to the nearest timestamp of ``df1``
    that lies within +/- ``tolerance``. Only the indexes are used; the input
    frames are not modified.

    Args:
        df1 (DataFrame): time-indexed events (index must be sorted)
        df2 (DataFrame): time-indexed events (index must be sorted)
        tolerance (str, optional): maximum time separation for two events to
            be paired. Defaults to '4h'.

    Returns:
        Series: indexed like ``df2``; (time in df2) - (matched time in df1)
        in minutes, NaN where no event of df1 is within the tolerance
    """
    # Work on private frames holding only the timestamps so the caller's
    # frames do not gain a "time" column as a side effect.
    left = pd.DataFrame({"time": df2.index}, index=df2.index)
    right = pd.DataFrame({"time": df1.index}, index=df1.index)
    df21 = pd.merge_asof(
        left,
        right,
        left_index=True,
        right_index=True,
        direction="nearest",
        tolerance=pd.Timedelta(tolerance),
    )
    # merge_asof suffixes the clashing column: _x is the left frame (df2),
    # _y is the right frame (df1).
    return (df21["time_x"] - df21["time_y"]).apply(lambda x: x.total_seconds() / 60)
[docs]
def get_tidal_phase_diff(dfh2, dfl2, dfh1, dfl1, tolerance="4h"):
    """Calculates the phase difference between two sets of tidal highs and lows

    For every high in dfh1 (and every low in dfl1) the nearest high in dfh2
    (low in dfl2) within +/- ``tolerance`` is looked up, and the time offset
    between the two is reported at the time of the dfh1/dfl1 event.

    Args:
        dfh2 (DataFrame): Timeseries of tidal highs
        dfl2 (DataFrame): Timeseries of tidal lows
        dfh1 (DataFrame): Timeseries of tidal highs
        dfl1 (DataFrame): Timeseries of tidal lows
        tolerance (str, optional): maximum time separation for two events to
            be paired. Defaults to '4h'.

    Returns:
        Series: phase difference in minutes on the union of the dfh1 and dfl1
        times. As computed, the value is (time in dfh1/dfl1) minus (matched
        time in dfh2/dfl2), so it is positive when the series-1 event occurs
        later. Where a timestamp has both a high and a low value, the high
        value is used.
    """
    highs = pd.DataFrame(get_phase_diff(dfh2, dfh1, tolerance), index=dfh1.index)
    lows = pd.DataFrame(get_phase_diff(dfl2, dfl1, tolerance), index=dfl1.index)
    combined = highs.merge(lows, how="outer", left_index=True, right_index=True)
    high_col = combined.iloc[:, 0]
    low_col = combined.iloc[:, 1]
    # Prefer the high-tide offset; fall back to the low-tide offset.
    return high_col.fillna(low_col)
[docs]
def get_tidal_hl_zerocrossing(df, round_to="1min"):
    """
    Finds the tidal high and low times using zero crossings of the first derivative.
    This works for all situations but is not robust in the face of noise and perturbations in the signal

    Args:
        df (DataFrame): time-indexed series; the first column is analysed
        round_to (str, optional): frequency the crossing times are rounded
            to. Pass a falsy value to skip rounding. Defaults to '1min'.

    Returns:
        the zero-crossing times from ``zerocross``, rounded when ``round_to``
        is set
    """
    crossing_times, _ = zerocross(df)
    if not round_to:
        return crossing_times
    return pd.to_datetime(crossing_times).round(round_to)
[docs]
def zerocross(df):
    """
    Calculates the gradient of the time series and identifies locations where gradient changes sign

    The crossing time is located by linear interpolation of the gradient
    between the two samples that bracket the sign change. No rounding is
    applied here.

    Args:
        df (DataFrame): time-indexed series; the first column is analysed

    Returns:
        tuple: (zc, indi) where ``zc`` holds the interpolated zero-crossing
        times and ``indi`` the integer positions of the sample just before
        each crossing
    """
    times = df.index
    grad = np.gradient(df[df.columns[0]].values)
    # A crossing sits between sample i and i+1 when the gradient sign differs
    # and the gradient at i+1 is non-zero. NaN sign steps are excluded
    # explicitly rather than relying on implicit float-to-bool coercion.
    sign_step = np.diff(np.sign(grad))
    crossing = (sign_step != 0) & ~np.isnan(sign_step) & (grad[1:] != 0)
    indi = np.where(crossing)[0]
    # Find the zero crossing by linear interpolation
    dx = times[indi + 1] - times[indi]
    dy = grad[indi + 1] - grad[indi]
    zc = -grad[indi] * (dx / dy) + times[indi]
    return zc, indi
##---- FUNCTIONS CACHED BELOW THIS LINE PERHAPS TO USE LATER? ---#
[docs]
def where_changed(df):
    """Return the first row plus every row whose value differs from the row before.

    Only the first column is compared.

    Args:
        df (DataFrame): frame whose first column is scanned for changes

    Returns:
        DataFrame: the rows of ``df`` at position 0 and at every position
        where the first column changed; an empty frame if ``df`` has no rows
    """
    if len(df) == 0:
        return df.iloc[:0, :]
    diff = np.diff(df[df.columns[0]].values)
    # diff[i] compares rows i and i+1, so the changed row is i + 1.
    changed = np.where(diff != 0)[0] + 1
    # Prepend position 0 AFTER the +1 shift so the first row itself is kept
    # (shifting afterwards would turn it into row 1).
    positions = np.insert(changed, 0, 0)
    return df.iloc[positions, :]
[docs]
def where_same(dfg, df):
    """
    Return dfg only where its value is the same as df for the same time stamps,
    i.e. the intersection locations with df.

    The two inputs are aligned on their index and the first two columns of
    the aligned frame are compared; the first column is returned for the
    rows where they are equal.
    """
    aligned = pd.concat([dfg, df], axis=1)
    first = aligned.iloc[:, 0]
    second = aligned.iloc[:, 1]
    return first[first == second]
[docs]
def limit_to_indices(df, si, ei):
    """Return the rows of df whose index lies strictly between si and ei.

    Both bounds are exclusive.
    """
    after_start = df.index > si
    before_end = df.index < ei
    return df[after_start & before_end]
[docs]
def filter_where_na(df, dfb):
    """
    Remove values in df where dfb has na values.

    dfb is looked up at the index labels of df; the rows of df whose
    corresponding dfb row survives ``dropna`` are returned.
    """
    aligned = dfb.loc[df.index]
    keep = aligned.dropna().index
    return df.loc[keep, :]