#!/usr/bin/env python
# -*- coding: utf-8 -*-
__all__ = ["ts_merge", "ts_splice"]
import pandas as pd
import numpy as np
from functools import reduce
################ Helpers
# [docs]  (Sphinx HTML artifact — commented out; as a bare expression it raises NameError at import)
def _apply_names(result, names):
    """Rename and/or subset `result` according to `names`.

    A Series gets `names` assigned as its name. A DataFrame is either
    renamed (when `names` is a single string, applied to its first column)
    or reduced/reordered to the given columns (when `names` is an iterable
    of strings). A falsy `names` leaves `result` untouched.
    """
    if not names:
        return result
    if isinstance(result, pd.Series):
        # Series: whatever was passed becomes the name.
        result.name = names
        return result
    if isinstance(names, str):
        # DataFrame + single name: rename the first column.
        return result.rename(columns={result.columns[0]: names})
    if hasattr(names, "__iter__"):
        # DataFrame + iterable of names: select/reorder those columns.
        return result[names]
    return result
# [docs]  (Sphinx HTML artifact — commented out; as a bare expression it raises NameError at import)
def _reindex_to_continuous(result, first_freq):
    """
    Reindex the given result (DataFrame or Series) to a continuous index spanning
    from the minimum to maximum timestamp of the current index using the provided frequency.

    Parameters
    ----------
    result : pandas.DataFrame or pandas.Series
        The merged or spliced result whose index will be reindexed.
    first_freq : frequency
        The frequency (e.g. 'D' for daily) to use for the continuous index, taken from the first input series.

    Returns
    -------
    result : pandas.DataFrame or pandas.Series
        The result reindexed to a continuous index. Any gaps in the timeline will be filled with NaN.
        If the index is a PeriodIndex, the index is rebuilt with the proper frequency.
        An empty input (or ``first_freq is None``) is returned unchanged.
    """
    if first_freq is None:
        return result
    # Guard: an empty result has no min/max timestamp (NaT), and
    # date_range/period_range would raise on NaT bounds.
    if len(result.index) == 0:
        return result
    start = result.index.min()
    end = result.index.max()
    if isinstance(result.index, pd.DatetimeIndex):
        continuous_index = pd.date_range(start=start, end=end, freq=first_freq)
    elif isinstance(result.index, pd.PeriodIndex):
        continuous_index = pd.period_range(start=start, end=end, freq=first_freq)
    else:
        continuous_index = result.index  # For other index types, leave unchanged
    result = result.reindex(continuous_index)
    if isinstance(result.index, pd.PeriodIndex):
        # For PeriodIndex, rebuild the index because .freq is read-only.
        result.index = pd.PeriodIndex(result.index, freq=first_freq)
    else:
        try:
            result.index.freq = first_freq
        except ValueError:
            # Incompatible frequency (e.g. irregular spacing) — leave freq unset.
            result.index.freq = None
    return result
#####################
# [docs]  (Sphinx HTML artifact — commented out; as a bare expression it raises NameError at import)
def ts_merge(series,
             names=None,
             strict_priority=False):
    """
    Merge multiple time series together, prioritizing series in order.

    Parameters
    ----------
    series : sequence of pandas.Series or pandas.DataFrame
        Higher priority first. All indexes must be DatetimeIndex.
    names : None, str, or iterable of str, optional
        - If `None` (default), inputs must share compatible column names.
        - If `str`, the output is univariate and will be named accordingly.
        - If iterable, it is used as a subset/ordering of columns.
    strict_priority : bool, default False
        If False (default): lower-priority data may fill NaNs in higher-priority
        series anywhere (traditional merge/overlay).
        If True: for each column, within the window
        [first_valid_index, last_valid_index] of any higher-priority series,
        lower-priority data are masked out — even if the higher-priority value is NaN.
        Outside those windows, behavior is unchanged.

    Returns
    -------
    pandas.Series or pandas.DataFrame

    Raises
    ------
    ValueError
        If `series` is not a non-empty list/tuple, an index is not a
        DatetimeIndex, or column names are incompatible when `names` is None.
    """
    # --- Input validation (messages match tests) ---
    if not isinstance(series, (list, tuple)) or len(series) == 0:
        raise ValueError("`series` must be a non-empty tuple or list")
    if not all(isinstance(getattr(s, "index", None), pd.DatetimeIndex) for s in series):
        raise ValueError("All input series must have a DatetimeIndex.")
    # Preserve first series freq (may be None)
    first_freq = getattr(series[0].index, "freq", None)
    any_df = any(isinstance(s, pd.DataFrame) for s in series)
    any_series = any(isinstance(s, pd.Series) for s in series)
    # Column compatibility checks (messages match tests). These must run on the
    # ORIGINAL inputs, before any Series->DataFrame normalization — otherwise
    # the mixed Series/DataFrame branch is unreachable.
    if names is None:
        if any_df and not any_series:
            cols0 = list(series[0].columns)
            for s in series[1:]:
                if list(s.columns) != cols0:
                    raise ValueError(
                        "All input DataFrames must have the same columns when `names` is None."
                    )
        elif any_df and any_series:
            df_cols = {c for s in series if isinstance(s, pd.DataFrame) for c in s.columns}
            for s in series:
                if isinstance(s, pd.Series) and s.name not in df_cols:
                    raise ValueError(
                        "Mixed Series and DataFrames require Series names to match DataFrame columns."
                    )
        # else: all Series -> no column checks needed
    # If any DataFrame is present, convert Series->DataFrame to unify shapes
    if any_df:
        series = [s.to_frame(name=s.name) if isinstance(s, pd.Series) else s for s in series]
    # Build the union index, sorted in time order
    full_index = series[0].index
    for s in series[1:]:
        full_index = full_index.union(s.index, sort=False)
    full_index = full_index.sort_values()
    # Align to union index and keep DataFrame shape for column-wise masking
    aligned = []
    for s in series:
        a = s.reindex(full_index)
        if isinstance(a, pd.Series):
            a = a.to_frame(name=a.name)
        aligned.append(a.copy())
    # --- Strict priority dominance windows ---
    if strict_priority:
        # For each column, mask lower-priority data only within the dominance
        # window of the highest-priority series that carries the column.
        for col in set(c for a in aligned for c in a.columns):
            # Find the highest-priority series that contains this column
            for i, hi in enumerate(aligned):
                if col in hi.columns:
                    s_col = hi[col]
                    lo_idx = s_col.first_valid_index()
                    hi_idx = s_col.last_valid_index()
                    break
            else:
                continue  # column not present in any input
            if lo_idx is None or hi_idx is None:
                continue  # no dominance window for this column
            mask = (full_index >= lo_idx) & (full_index <= hi_idx)
            for j in range(i + 1, len(aligned)):
                if col in aligned[j].columns:
                    aligned[j].loc[mask, col] = np.nan  # keep numeric dtype
    # Combine in priority order via combine_first
    merged = aligned[0]
    for a in aligned[1:]:
        merged = merged.combine_first(a)
    # Ensure the full union index is present (including all-NaN rows)
    merged = merged.reindex(full_index)
    # --- keep union, but drop masked lower-only rows inside dominance window ---
    if strict_priority:
        # Use the highest-priority input to define the dominance window (per-column union)
        top = aligned[0]  # DataFrame (we normalized earlier)
        lo_candidates = [top[c].first_valid_index() for c in top.columns]
        hi_candidates = [top[c].last_valid_index() for c in top.columns]
        lo0 = min([x for x in lo_candidates if x is not None], default=None)
        hi0 = max([x for x in hi_candidates if x is not None], default=None)
        if lo0 is not None and hi0 is not None:
            idx = merged.index
            in_window = (idx >= lo0) & (idx <= hi0)
            in_top_idx = idx.isin(series[0].index)  # rows present in the top series' index
            # rows that are entirely NaN after strict masking
            all_nan = merged.isna().all(axis=1) if isinstance(merged, pd.DataFrame) else merged.isna()
            # Drop only rows that are NaN, within the window, and not in the top
            # index (i.e. timestamps contributed only by masked lower series).
            drop_mask = in_window & (~in_top_idx) & all_nan
            if drop_mask.any():
                merged = merged.loc[~drop_mask]
    # If all inputs were univariate Series, return a Series.
    if not any_df:
        # Squeeze only the column axis: a plain squeeze() would collapse a
        # single-row, single-column result all the way down to a scalar.
        merged = merged.squeeze(axis="columns")
    elif isinstance(merged, pd.Series):
        merged = merged.to_frame()
    # Apply naming / column selection
    merged = _apply_names(merged, names)
    # Reindex to a continuous index using the first series' freq
    merged = _reindex_to_continuous(merged, first_freq)
    return merged
# [docs]  (Sphinx HTML artifact — commented out; as a bare expression it raises NameError at import)
def ts_splice(series, names=None, transition="prefer_last", floor_dates=False):
    """
    Splice multiple time series together, prioritizing series in patches of time.

    Unlike `ts_merge`, which blends overlapping data points, `ts_splice` stitches
    together time series without overlap. The function determines when to switch
    between series based on a transition strategy.

    Parameters
    ----------
    series : tuple or list of pandas.DataFrame or pandas.Series
        A tuple or list of time series.
        Each series must have a `DatetimeIndex` and consistent column structure.
    names : None, str, or iterable of str, optional
        - If `None` (default), all input series must share common column names,
          and the output will merge common columns.
        - If a `str`, all input series must have a **single column**, and the
          output will be a DataFrame with this name as the column name.
        - If an iterable of `str`, all input DataFrames must have the same
          number of columns matching the length of `names`, and these will be
          used for the output.
    transition : {'prefer_first', 'prefer_last'} or list of pandas.Timestamp
        Defines how to determine breakpoints between time series:
        - `'prefer_first'`: Uses the earlier series on the list until its last
          valid timestamp.
        - `'prefer_last'`: Uses the later series starting from its first valid timestamp.
        - A list of specific timestamps can also be provided as transition points.
    floor_dates : bool, optional, default=False
        If `True`, inferred transition timestamps (`prefer_first` or `prefer_last`)
        are floored to the beginning of the day. This can introduce NaNs if the
        input series are regular with a `freq` attribute.

    Returns
    -------
    pandas.DataFrame or pandas.Series
        - If the input contains multi-column DataFrames, the output is a DataFrame
          with the same column structure.
        - If a collection of single-column `Series` is provided, the output will be
          a Series.
        - The output retains a `freq` attribute if all inputs share the same frequency.

    Notes
    -----
    - The output time index is the union of input time indices.
    - If `transition` is `'prefer_first'`, gaps may appear in the final time series.
    - If `transition` is `'prefer_last'`, overlapping data is resolved in favor
      of later series.

    See Also
    --------
    ts_merge : Merges series by filling gaps in order of priority.
    """
    # --- Input validation. Runs BEFORE copying so that bad inputs raise the
    # documented ValueError instead of an AttributeError from `.copy()`. ---
    if not isinstance(series, (tuple, list)) or len(series) == 0:
        raise ValueError(
            "`series` must be a non-empty tuple or list of pandas.Series or pandas.DataFrame."
        )
    if not all(
        isinstance(s.index, (pd.DatetimeIndex, pd.PeriodIndex)) for s in series
    ):
        raise ValueError("All input series must have a DatetimeIndex or PeriodIndex.")
    # Ensure all input series have the same type of index.
    index_type = type(series[0].index)
    if not all(isinstance(s.index, index_type) for s in series):
        raise ValueError(
            f"All input series must have indexes of type {index_type.__name__}."
        )
    if transition not in ["prefer_last", "prefer_first"] and not isinstance(
        transition, list
    ):
        raise ValueError(
            "`transition` must be 'prefer_last', 'prefer_first', or a list of timestamps."
        )
    # Work on copies so callers' inputs are never mutated.
    series = [s.copy() for s in series]
    # Frequency of the first input; used to rebuild a continuous index at the end.
    first_freq = series[0].index.freq
    # If names is a string, pre-rename each series for consistency.
    if names and isinstance(names, str):
        series = [
            (
                s.rename(columns={s.columns[0]: names})
                if isinstance(s, pd.DataFrame)
                else s.rename(names)
            )
            for s in series
        ]
    # Compute transition points.
    if isinstance(transition, list):
        transition_points = transition
        duplicate_keep = "first"
    else:
        if transition == "prefer_first":
            # Each series hands over at its last valid observation.
            transition_points = [s.last_valid_index() for s in series[:-1]]
            duplicate_keep = "first"
        else:  # "prefer_last" (validated above)
            # Each later series takes over at its first valid observation.
            transition_points = [s.first_valid_index() for s in series[1:]]
            duplicate_keep = "last"
    if floor_dates:
        # Guard against None: an all-NaN series has no valid index.
        transition_points = [
            dt.floor("d") if dt is not None else None for dt in transition_points
        ]
    # Open-ended slices at both extremes.
    transition_points = [None] + transition_points + [None]
    # Extract sections based on transition points.
    sections = []
    for ts_obj, start, end in zip(
        series, transition_points[:-1], transition_points[1:]
    ):
        section = ts_obj.loc[start:end]
        if not section.empty:
            sections.append(section)
    spliced = pd.concat(sections, axis=0).sort_index() if sections else pd.DataFrame()
    # If all inputs were univariate, ensure output remains univariate.
    univariate = all(
        (s.name is not None if isinstance(s, pd.Series) else s.shape[1] == 1)
        for s in series
    )
    # Guard shape[1] > 0: an all-empty splice yields a zero-column frame.
    if univariate and isinstance(spliced, pd.DataFrame) and spliced.shape[1] > 0:
        spliced = spliced.iloc[:, 0]
    # Normalize output type.
    if all(isinstance(s, pd.Series) for s in series):
        if isinstance(spliced, pd.DataFrame):
            # Squeeze only the column axis; a plain squeeze() would collapse a
            # length-1 Series to a scalar and break the dedup step below.
            spliced = spliced.squeeze(axis="columns")
    else:
        if isinstance(spliced, pd.Series):
            spliced = spliced.to_frame()
    # Remove duplicate timestamps based on transition preference.
    dup = spliced.index.duplicated(keep=duplicate_keep)
    if dup.any():
        spliced = spliced[~dup]
    # Apply renaming/column selection.
    spliced = _apply_names(spliced, names)
    # Reindex to a continuous index so interior gaps appear as NaN rows.
    spliced = _reindex_to_continuous(spliced, first_freq)
    return spliced