Source code for vtools.functions.envelope

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.interpolate import PchipInterpolator
from scipy.signal import find_peaks, savgol_filter
from statsmodels.nonparametric.smoothers_lowess import lowess


__all__ = ["generate_simplified_mixed_tide", "tidal_envelope"]


[docs] def chunked_loess_smoothing(ts, window_hours=1.25, chunk_days=10, overlap_days=1): """ Apply LOESS smoothing in overlapping chunks to reduce computation time. Parameters ---------- ts : pd.Series Time series with datetime index and possible NaNs. window_hours : float LOESS smoothing window size in hours. chunk_days : int Core chunk size (e.g., 10 days). overlap_days : int Overlap added before and after each chunk to avoid edge effects. Returns ------- pd.Series Smoothed series, NaNs where input is NaN or unsupported. """ freq = ts.index.freq or pd.infer_freq(ts.index) assert freq is not None, "Time index must have a frequency." dt = pd.to_timedelta(freq) points_per_hour = pd.Timedelta("1h") / dt # frac = window_hours * points_per_hour / len(ts) result = pd.Series(index=ts.index, dtype=float) chunk_len = int(chunk_days * 24 * points_per_hour) overlap_len = int(overlap_days * 24 * points_per_hour) for start in range(0, len(ts), chunk_len): i0 = max(0, start - overlap_len) i1 = min(len(ts), start + chunk_len + overlap_len) sub = ts.iloc[i0:i1] mask = sub.notna() if mask.sum() < 2: continue chunk_len_pts = sub[mask].shape[0] frac = min(1.0, (window_hours * points_per_hour) / max(chunk_len_pts, 1)) smoothed = lowess( sub[mask], sub[mask].index.view("int64"), frac=frac, return_sorted=False ) sub_smoothed = pd.Series(index=sub[mask].index, data=smoothed) # keep only the central region j0 = start j1 = min(start + chunk_len, len(ts)) result.iloc[j0:j1] = sub_smoothed.reindex(ts.iloc[j0:j1].index) return result
[docs] def generate_pink_noise(n, seed=None, scale=1.0): """ Generate pink (1/f) noise using the Voss-McCartney algorithm. Parameters ---------- n : int Number of samples to generate. seed : int or None Random seed for reproducibility. scale : float Standard deviation scaling factor for the noise. Returns ------- np.ndarray Pink noise signal of length n. """ if seed is not None: np.random.seed(seed) n_rows = int(np.ceil(np.log2(n))) array = np.random.randn(n_rows, n) array = np.cumsum(array, axis=1) weight = 2 ** np.arange(n_rows)[:, None] pink = np.sum(array / weight, axis=0) pink -= pink.mean() pink /= pink.std() return scale * pink[:n]
[docs] def generate_simplified_mixed_tide( start_time="2022-01-01", ndays=40, freq="15min", A_M2=1.0, A_K1=0.5, A_O1=0.5, phase_D1=3.14159 / 2.0, noise_amplitude=0.08, return_components=False, ): """ Generate a simplified synthetic mixed semidiurnal/diurnal tide with explicit O1 and K1. Parameters ---------- start_time : str Start time for the series. ndays : int Number of days. freq : str Sampling interval. A_M2 : float Amplitude of M2. A_K1 : float Amplitude of K1. A_O1 : float Amplitude of O1. phase_D1 : float Common phase shift for O1 and K1. return_components : bool Whether to return individual components. Returns ------- pd.Series or pd.DataFrame Combined tide or components with time index. """ index = pd.date_range( start=start_time, periods=int(ndays * 24 * 60 / pd.Timedelta(freq).seconds * 60), freq=freq, ) t_hours = (index - index[0]) / pd.Timedelta(hours=1) M2 = A_M2 * np.sin(2 * np.pi * t_hours / 12.42) K1 = A_K1 * np.sin(2 * np.pi * t_hours / 23.93 + phase_D1) O1 = A_O1 * np.sin(2 * np.pi * t_hours / 25.82 + phase_D1) K1 = A_K1 * np.sin(2 * np.pi * t_hours / 23.93 + phase_D1) O1 = A_O1 * np.sin(2 * np.pi * t_hours / 25.82 + phase_D1) noise = generate_pink_noise(len(index), scale=noise_amplitude) + np.random.normal( scale=noise_amplitude / 4, size=len(index) ) tide = M2 + K1 + O1 + noise if return_components: return pd.DataFrame( {"M2": M2, "K1": K1, "O1": O1, "noise": noise, "tide": tide}, index=index ) else: return pd.Series(tide, index=index, name="tide")
[docs] def smooth_series(ts, window_hours=1.75): return chunked_loess_smoothing( ts, window_hours=window_hours, chunk_days=10, overlap_days=1 )
[docs] def smooth_series2(series, window_pts=25, method="lowess", **kwargs): """ Smooth a time series using the specified method. Currently supports 'lowess', 'moving_average', or 'savgol'. """ if method == "lowess": x = (series.index - series.index[0]).total_seconds() y = series.values frac = min(1.0, window_pts / len(series)) y_smooth = lowess(y, x, frac=frac, return_sorted=False) return pd.Series(y_smooth, index=series.index) elif method == "moving_average": return series.rolling(window=window_pts, min_periods=1, center=True).mean() elif method == "savgol": # window length must be odd and greater than polyorder polyorder = kwargs.get("polyorder", 2) window = window_pts | 1 # make it odd if window <= polyorder: window = polyorder + 3 if (polyorder % 2 == 0) else polyorder + 2 y = series.bfill().values y_smooth = savgol_filter(y, window_length=window, polyorder=polyorder) return pd.Series(y_smooth, index=series.index) else: raise ValueError(f"Unknown smoothing method: {method}")
[docs] def find_raw_extrema(smoothed, prominence=0.01): """ Find raw peaks and troughs using scipy.signal.find_peaks. Returns DataFrames for peaks and troughs. """ y = smoothed.values peaks, _ = find_peaks(y, prominence=prominence) troughs, _ = find_peaks(-y, prominence=prominence) df_peaks = pd.DataFrame( {"time": smoothed.index[peaks], "value": y[peaks], "type": "high"} ) df_troughs = pd.DataFrame( {"time": smoothed.index[troughs], "value": y[troughs], "type": "low"} ) return df_peaks, df_troughs
[docs] def filter_extrema_ngood( extrema_df, smoothed, series, loess_window_pts=25, n_good=3, sig_gap_minutes=45 ): """ Filter extrema based on local and contextual data quality criteria. Parameters ---------- extrema_df : pd.DataFrame DataFrame with columns 'time' and 'value' for candidate extrema. smoothed : pd.Series Smoothed version of the signal used for extrema detection. series : pd.Series Original time series (with gaps). loess_window_pts : int Number of points in the LOESS window. n_good : int Minimum number of non-NaN points required. sig_gap_minutes : float Threshold for detecting significant gaps (in minutes). Returns ------- pd.DataFrame Filtered extrema DataFrame. """ sig_gap = pd.Timedelta(minutes=sig_gap_minutes) idx = series.index not_nan = ~series.isna() valid_times = idx[not_nan] # Identify significant gaps time_diffs = valid_times.to_series().diff() gap_starts = valid_times[time_diffs > sig_gap].tolist() gap_ends = valid_times[time_diffs.shift(-1) > sig_gap].tolist() idx_map = {t: i for i, t in enumerate(smoothed.index)} keep = [] for t in extrema_df["time"]: i = idx_map.get(t, None) if i is None: keep.append(False) continue # 1. Local LOESS support hw = loess_window_pts // 2 lo = max(0, i - hw) hi = min(len(smoothed), i + hw + 1) window_values = smoothed.iloc[lo:hi] local_ok = window_values.notna().sum() >= n_good # 2. Contextual support: enough valid data between nearest gap or boundary prev_gap = max( [g for g in gap_ends + [series.index[0]] if g < t], default=series.index[0] ) next_gap = min( [g for g in gap_starts + [series.index[-1]] if g > t], default=series.index[-1], ) n_before = valid_times[(valid_times > prev_gap) & (valid_times < t)] n_after = valid_times[(valid_times > t) & (valid_times < next_gap)] context_ok = (len(n_before) >= n_good) and (len(n_after) >= n_good) keep.append(local_ok and context_ok) return extrema_df[keep].reset_index(drop=True)
[docs] def select_salient_extrema(extrema, typ, spacing_hours=14, envelope_type="outer"): """ Select salient extrema (HH/LL or HL/LH) using literal spacing-based OR logic. Parameters ---------- extrema : pd.DataFrame with columns ["time", "value"] Candidate extrema. typ : str Either "high" or "low" (for peak or trough selection). spacing_hours : float Time window for neighbor comparison. envelope_type : str Either "outer" (default) or "inner" to switch saliency logic. Returns ------- pd.DataFrame Extrema that passed the saliency test. """ extrema = extrema.copy().sort_values("time").reset_index(drop=True) spacing = pd.Timedelta(hours=spacing_hours) passed_left = [] passed_right = [] kept = [] for i, row in extrema.iterrows(): t, v = row["time"], row["value"] left = extrema[(extrema["time"] < t) & (extrema["time"] >= t - spacing)] right = extrema[(extrema["time"] > t) & (extrema["time"] <= t + spacing)] if typ == "high": if envelope_type == "outer": left_ok = (left["value"] < v).any() right_ok = (right["value"] < v).any() else: left_ok = (left["value"] > v).any() right_ok = (right["value"] > v).any() else: if envelope_type == "outer": left_ok = (left["value"] > v).any() right_ok = (right["value"] > v).any() else: left_ok = (left["value"] < v).any() right_ok = (right["value"] < v).any() passed_left.append(left_ok) passed_right.append(right_ok) kept.append(left_ok or right_ok) extrema["passed_left"] = passed_left extrema["passed_right"] = passed_right extrema["kept"] = kept return ( extrema[extrema["kept"]] .drop(columns=["passed_left", "passed_right", "kept"]) .reset_index(drop=True) )
[docs] def interpolate_envelope(anchor_df, series, max_anchor_gap_hours=36): """ Interpolate envelope using PCHIP, breaking if anchor points are too far apart. """ if anchor_df.empty: return pd.Series(np.nan, index=series.index) times = pd.to_datetime(anchor_df["time"]) values = anchor_df["value"].values t0 = series.index[0] x = (times - t0).dt.total_seconds().values xi = (series.index - t0).total_seconds() breaks = np.where(np.diff(x) > max_anchor_gap_hours * 3600)[0] env = np.full(len(series.index), np.nan) seg_starts = np.concatenate(([0], breaks + 1)) seg_ends = np.concatenate((breaks, [len(x) - 1])) for s, e in zip(seg_starts, seg_ends): if e - s < 1: continue pchip = PchipInterpolator(x[s : e + 1], values[s : e + 1]) mask = (xi >= x[s]) & (xi <= x[e]) env[mask] = pchip(xi[mask]) env_series = pd.Series(env, index=series.index) env_series[series.isna()] = np.nan return env_series
# --- Main envelope extraction pipeline ---
[docs] def tidal_envelope( series, smoothing_window_hours=2.5, n_good=3, peak_prominence=0.05, saliency_window_hours=14, max_anchor_gap_hours=36, envelope_type="outer", ): """ Compute the tidal envelope (high and low) of a time series using smoothing, extrema detection, and interpolation. This function processes a time series to extract its tidal envelope by smoothing the data, identifying significant peaks and troughs, filtering out unreliable extrema, selecting salient extrema within a specified window, and interpolating between anchor points to generate continuous envelope curves. Parameters ---------- series : pandas.Series Time-indexed series of water levels or similar data. smoothing_window_hours : float, optional Window size in hours for smoothing the input series (default is 2.5). n_good : int, optional Minimum number of good points required for an extremum to be considered valid (default is 3). peak_prominence : float, optional Minimum prominence of peaks/troughs to be considered as extrema (default is 0.05). saliency_window_hours : float, optional Window size in hours for selecting salient extrema (default is 14). max_anchor_gap_hours : float, optional Maximum allowed gap in hours between anchor points for interpolation (default is 36). envelope_type : str, optional Type of envelope to compute, e.g., "outer" (default is "outer"). Returns ------- env_high : pandas.Series Interpolated high (upper) envelope of the input series. env_low : pandas.Series Interpolated low (lower) envelope of the input series. anchor_highs : pandas.DataFrame DataFrame of selected anchor points for the high envelope. anchor_lows : pandas.DataFrame DataFrame of selected anchor points for the low envelope. smoothed : pandas.Series Smoothed version of the input series. Notes ----- This function assumes regular time intervals in the input series. If the frequency cannot be inferred, it is estimated from the first two timestamps. """ # Smoothing freq = pd.infer_freq(series.index) if freq is None: freq = pd.Timedelta(series.index[1] - series.index[0]) else: freq = pd.Timedelta(freq) window_pts = int(smoothing_window_hours * 60 / (freq.total_seconds() / 60)) # smoothed = smooth_series(series, window_hours=smoothing_window_hours) smoothed = smooth_series2( series, window_pts=window_pts, method="savgol", polyorder=2 ) # Raw extrema df_peaks, df_troughs = find_raw_extrema(smoothed, prominence=peak_prominence) # Eliminate one-sided extrema near gaps/boundaries df_peaks = filter_extrema_ngood( df_peaks, smoothed, series, loess_window_pts=window_pts, n_good=n_good ) df_troughs = filter_extrema_ngood( df_troughs, smoothed, series, loess_window_pts=window_pts, n_good=n_good ) # Saliency selection anchor_highs = select_salient_extrema( df_peaks, "high", saliency_window_hours, envelope_type ) anchor_lows = select_salient_extrema( df_troughs, "low", saliency_window_hours, envelope_type ) # Interpolation env_high = interpolate_envelope(anchor_highs, series, max_anchor_gap_hours) env_low = interpolate_envelope(anchor_lows, series, max_anchor_gap_hours) return env_high, env_low, anchor_highs, anchor_lows, smoothed
[docs] def main(): components = generate_simplified_mixed_tide(return_components=True) tide = components["tide"].copy() tide.iloc[500:600] = np.nan tide.iloc[2000:2100] = np.nan env_high, env_low, anchor_highs, anchor_lows, smooth = tidal_envelope( tide, envelope_type="outer" ) env_high_in, env_low_in, anchor_highs_in, anchor_lows_in, _ = tidal_envelope( tide, envelope_type="inner" ) plt.figure(figsize=(12, 6)) plt.plot(tide, label="Noisy Gappy Tide", alpha=0.5) plt.plot(smooth, label="LOESS Smooth", color="gray", linestyle="--") plt.plot(env_high, label="Outer Upper Envelope (HHW)", color="red") plt.plot(env_low, label="Outer Lower Envelope (LLW)", color="blue") plt.plot(env_high_in, label="Inner Upper Envelope (HLW)", color="orange") plt.plot(env_low_in, label="Inner Lower Envelope (LHW)", color="purple") plt.scatter( anchor_highs["time"], anchor_highs["value"], color="red", marker="^", label="HHW", ) plt.scatter( anchor_lows["time"], anchor_lows["value"], color="blue", marker="v", label="LLW" ) plt.scatter( anchor_highs_in["time"], anchor_highs_in["value"], color="orange", marker="^", label="HLW", ) plt.scatter( anchor_lows_in["time"], anchor_lows_in["value"], color="purple", marker="v", label="LHW", ) plt.legend() plt.title("Tidal Envelope Extraction: Outer vs Inner") plt.tight_layout() plt.show()
if __name__ == "__main__": main()