import pandas as pd
import numpy as np
from functools import reduce
from vtools.functions.colname_align import align_inputs_strict
from vtools.data.gap import gap_distance
from vtools.functions.merge import _reindex_to_continuous
__all__ = ["ts_blend"]
[docs]
def _distance_to_gap(hi_col: pd.Series, mode: str = "count") -> pd.Series:
"""
Distance to nearest gap (NaN) in hi_col.
Parameters
----------
hi_col : Series
Higher-priority series.
mode : {'count', 'freq'}
'count' -> distance in # of samples (0 at gaps).
'freq' -> distance as Timedelta, using hi_col.index.freq.
Returns
-------
Series
Same index as hi_col, distance to nearest NaN.
"""
idx = hi_col.index
n = len(idx)
mask = hi_col.isna().to_numpy()
# No gaps -> everything is effectively "far away"
if not mask.any():
dist = np.full(n, np.inf, dtype=float)
return pd.Series(dist, index=idx)
dist = np.full(n, np.inf, dtype=float)
# Forward pass: distance from the last gap
last_gap = None
for i in range(n):
if mask[i]:
dist[i] = 0.0
last_gap = i
elif last_gap is not None:
dist[i] = float(i - last_gap)
# Backward pass: distance from the next gap
last_gap = None
for i in range(n - 1, -1, -1):
if mask[i]:
last_gap = i
elif last_gap is not None:
dist[i] = min(dist[i], float(last_gap - i))
dist_s = pd.Series(dist, index=idx)
if mode == "count":
return dist_s
if mode == "freq":
freq = idx.freq
if freq is None:
raise ValueError("Time-based blending requires a regular index with .freq set.")
if isinstance(freq, str):
try:
freq_delta = pd.to_timedelta(1, unit=freq)
except ValueError as exc:
raise ValueError(
"Time-based blending requires a frequency that can be converted to a Timedelta."
) from exc
else:
try:
freq_delta = pd.Timedelta(freq)
except (TypeError, ValueError) as exc:
raise ValueError(
"Time-based blending requires a frequency that can be converted to a Timedelta."
) from exc
# counts * freq → Timedelta
return dist_s * freq_delta
raise ValueError("mode must be 'count' or 'freq'")
[docs]
def _normalize_blend_length(blend_length, index):
"""
Interpret blend_length as sample count or time span.
Returns
-------
(mode, L)
mode : {'count', 'freq'} or None
L : numeric (count) or Timedelta
"""
if blend_length is None:
return None, None
# Integer: number of samples
if isinstance(blend_length, (int, np.integer)):
if blend_length <= 0:
return None, None
return "count", float(blend_length)
# Timedelta-like: e.g. '2H', '30min'
td = pd.to_timedelta(blend_length)
if not isinstance(index, (pd.DatetimeIndex, pd.PeriodIndex)):
raise ValueError("Time-based blend_length requires a DatetimeIndex or PeriodIndex.")
if index.freq is None:
raise ValueError("Time-based blend_length requires a regular index with a .freq attribute.")
if td <= pd.Timedelta(0):
return None, None
return "freq", td
[docs]
def _blend_two(
aligned_hi: pd.DataFrame,
aligned_lo: pd.DataFrame,
blend_mode: str,
blend_L,
) -> pd.DataFrame:
"""
Blend a lower-priority DataFrame into a higher-priority DataFrame.
Parameters
----------
aligned_hi, aligned_lo : DataFrame
Same index. Higher priority is 'aligned_hi'.
blend_mode : {'count', 'freq'} or None
blend_L : float or Timedelta
Returns
-------
DataFrame
Blended result.
"""
# No blending requested → just do priority overlay
if blend_mode is None or blend_L is None:
return aligned_hi.combine_first(aligned_lo)
idx = aligned_hi.index
out = aligned_hi.copy()
cols = sorted(set(aligned_hi.columns) | set(aligned_lo.columns))
for col in cols:
hi_col = aligned_hi[col] if col in aligned_hi.columns else pd.Series(index=idx, dtype=float)
lo_col = aligned_lo[col] if col in aligned_lo.columns else pd.Series(index=idx, dtype=float)
hi_nan = hi_col.isna()
lo_nan = lo_col.isna()
# Priority baseline: hi where present, otherwise lo
merged = hi_col.copy()
fill_mask = hi_nan & (~lo_nan)
merged[fill_mask] = lo_col[fill_mask]
# Distance to nearest gap in the *high-priority* series
dist_to_gap = _distance_to_gap(
hi_col,
mode="count" if blend_mode == "count" else "freq",
)
# Candidate points for blending on the shoulders of gaps:
# - hi has data
# - lo has data
near_gap = (~hi_nan) & (~lo_nan)
if blend_mode == "count":
near_gap &= (dist_to_gap > 0) & (dist_to_gap <= blend_L)
if not near_gap.any():
out[col] = merged
continue
d = dist_to_gap[near_gap].astype(float)
t = (blend_L - d) / blend_L
else: # 'freq' mode (Timedelta)
near_gap &= (dist_to_gap > pd.Timedelta(0)) & (dist_to_gap <= blend_L)
if not near_gap.any():
out[col] = merged
continue
d = dist_to_gap[near_gap]
t = 1.0 - (d / blend_L)
t = t.clip(lower=0.0, upper=1.0)
# Kernel: lower-priority gets up to 0.5 weight at the gap edge,
# tapering to 0 at distance >= blend_L.
w_lo = 0.5 * t
w_hi = 1.0 - w_lo
hi_vals = hi_col[near_gap].astype(float)
lo_vals = lo_col[near_gap].astype(float)
blended_vals = (
w_hi.to_numpy() * hi_vals.to_numpy()
+ w_lo.to_numpy() * lo_vals.to_numpy()
)
# IMPORTANT: use .loc with a boolean mask, not .at, so we never hit
# DataFrame._set_value with a non-scalar index.
merged.loc[near_gap] = blended_vals
out[col] = merged
return out
[docs]
@align_inputs_strict(seq_arg=0, names_kw="names")
def ts_blend(
    series,
    names=None,
    blend_length=None,
):
    """
    Blend multiple time series together, using higher priority where possible,
    but ramping in lower-priority data near gaps in the higher-priority series.

    Parameters
    ----------
    series : sequence of pandas.Series or pandas.DataFrame
        Higher priority first. All indexes must be DatetimeIndex or PeriodIndex.
    names : None, str, or iterable of str, optional
        Same semantics as ts_merge / ts_splice.  Inside this function it only
        decides whether the column-compatibility checks run (they run when
        ``names`` is None).
    blend_length : int or Timedelta-like, optional
        Controls the width of the blending zone around gaps in the
        higher-priority series:

        - If an integer `N` is given, then up to `N` samples on each side of
          any gap in the higher-priority series will be blended using a kernel
          based on the distance to the gap edge (in sample counts).
        - If a Timedelta-like value (e.g. '2H', pd.Timedelta('30min')), then
          a regular DatetimeIndex with `.freq` is required, and distances are
          measured in time.

        If None or non-positive, ts_blend behaves like a hard-priority merge
        (equivalent to ts_merge with strict_priority=False).

    Returns
    -------
    pandas.Series or pandas.DataFrame
        A time series combining all inputs, with soft transitions near gaps.
        A Series is returned only when every input was a Series and the
        blended result has a single column.

    Raises
    ------
    ValueError
        If ``series`` is not a non-empty list/tuple, if any input lacks a
        DatetimeIndex/PeriodIndex, if columns are incompatible while ``names``
        is None, or if a time-based ``blend_length`` is used on an index
        without ``.freq``.
    """
    if not isinstance(series, (list, tuple)) or len(series) == 0:
        raise ValueError("`series` must be a non-empty tuple or list")

    if not all(
        isinstance(getattr(s, "index", None), (pd.DatetimeIndex, pd.PeriodIndex))
        for s in series
    ):
        raise ValueError("All input series must have a DatetimeIndex or PeriodIndex.")

    # Preserve first series freq (may be None)
    first_freq = getattr(series[0].index, "freq", None)

    # If any DataFrame is present, normalize all to DataFrame
    any_df = any(isinstance(s, pd.DataFrame) for s in series)
    if any_df:
        series = [s.to_frame(name=s.name) if isinstance(s, pd.Series) else s for s in series]

    all_df = all(isinstance(s, pd.DataFrame) for s in series)
    any_series = any(isinstance(s, pd.Series) for s in series)

    # Column compatibility checks similar to ts_merge
    if all_df:
        if names is None:
            cols0 = list(series[0].columns)
            for s in series[1:]:
                if list(s.columns) != cols0:
                    raise ValueError(
                        "All input DataFrames must have the same columns when `names` is None."
                    )
    elif any_df and any_series:
        # NOTE(review): unreachable as written -- the normalization above
        # converts every Series to a DataFrame whenever any DataFrame is
        # present, so mixed inputs are handled by the all-DataFrame branch.
        if names is None:
            df_cols = {c for s in series if isinstance(s, pd.DataFrame) for c in s.columns}
            for s in series:
                if isinstance(s, pd.Series) and s.name not in df_cols:
                    raise ValueError(
                        "Mixed Series and DataFrames require Series names to match DataFrame columns."
                    )
    # else: all Series -> no column checks needed

    # Build the union index, sorted in time order
    full_index = series[0].index
    for s in series[1:]:
        full_index = full_index.union(s.index, sort=False)
    full_index = full_index.sort_values()

    # Normalize blend_length against the union index
    blend_mode, blend_L = _normalize_blend_length(blend_length, full_index)

    # Align all to union index and normalize to DataFrames
    aligned = []
    for s in series:
        a = s.reindex(full_index)
        if isinstance(a, pd.Series):
            a = a.to_frame(name=a.name)
        aligned.append(a.copy())

    # Start from top priority and fold in lower priorities with blending
    blended = aligned[0]
    for lo in aligned[1:]:
        blended = _blend_two(blended, lo, blend_mode, blend_L)

    # If all inputs were univariate Series, return a Series
    all_series = all(isinstance(s, pd.Series) for s in series)
    if all_series:
        # Squeeze only the column axis: a bare squeeze() would also collapse
        # the row axis and turn a one-row, one-column result into a scalar.
        blended = blended.squeeze(axis="columns")
    elif isinstance(blended, pd.Series):
        blended = blended.to_frame()

    # Reindex to a continuous index using the first series' freq
    blended = _reindex_to_continuous(blended, first_freq)
    return blended