# Source code for vtools.data.timeseries

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""Time series module
Helpers for creating regular and irregular time series, transforming irregular to regular
and analyzing gaps.
"""
import sys
import numpy as np
import pandas as pd
import datetime as _datetime
from typing import Union, Sequence, Mapping, Callable
from numbers import Number as numtype
from vtools.data.vtime import *


# Public API of this module; every exported name is listed exactly once.
__all__ = [
    "to_dataframe",
    "rename_columns",
    "time_overlap",
    "rts",
    "rts_formula",
    "extrapolate_ts",
    "datetime_elapsed",
    "elapsed_datetime",
    "is_regular",
]


def to_dataframe(ts):
    """Return `ts` as a DataFrame.

    A DataFrame is handed back unchanged (the same object, no copy). Any
    other input is converted by calling its ``to_frame()`` method, which a
    pandas Series provides.
    """
    return ts if isinstance(ts, pd.DataFrame) else ts.to_frame()
def rename_columns(
    ts: Union[pd.Series, pd.DataFrame],
    colnames: Union[str, Sequence[str], Mapping[str, str], Callable[[str], str]],
    convert_df: bool = True
) -> Union[pd.Series, pd.DataFrame]:
    """
    Rename the columns of a DataFrame or the name of a Series.

    Parameters
    ----------
    ts
        pandas Series or DataFrame to rename.
    colnames
        How to rename:

        - str
            * Series: becomes `Series.name`.
            * DataFrame: the single new column name; the DataFrame must
              have exactly one column.
        - Sequence[str] (not str/bytes)
            * Series: must have length 1; its only element becomes
              `Series.name`.
            * DataFrame: length must equal the number of columns; the
              entries become the column names in order.
        - Mapping[str, str]
            * DataFrame: passed to `DataFrame.rename(columns=...)`.
            * Series: if the current `Series.name` is a key it is replaced
              by the mapped value, otherwise the name is left unchanged.
              A `None` key matches a nameless Series.
        - Callable[[str], str]
            * DataFrame: passed to `DataFrame.rename(columns=...)`.
            * Series: called with the current `Series.name`; the result is
              the new name.
    convert_df : bool, default True
        If True and `ts` is a Series, it is converted with `to_frame()`
        first and then handled by the DataFrame rules.

    Returns
    -------
    pandas Series or DataFrame
        A renamed copy; `ts` itself is not modified.

    Raises
    ------
    TypeError
        If `ts` is not a Series or DataFrame, if a DataFrame has MultiIndex
        columns, or if `colnames` is of an unsupported type.
    ValueError
        If a sequence has the wrong length (not the number of columns for a
        DataFrame, not 1 for a Series) or a single string is given for a
        DataFrame with several columns.
    """
    # Promote first so a converted Series follows the DataFrame rules below.
    if isinstance(ts, pd.Series) and convert_df:
        ts = ts.to_frame()

    given_str = isinstance(colnames, str)
    # str and bytes are Sequences too, but never count as a list of names.
    given_list = isinstance(colnames, Sequence) and not isinstance(
        colnames, (str, bytes)
    )

    if isinstance(ts, pd.DataFrame):
        if isinstance(ts.columns, pd.MultiIndex):
            raise TypeError("rename_columns does not support MultiIndex columns")

        if given_str:
            if ts.shape[1] != 1:
                raise ValueError(
                    "rename_columns: single string provided for a DataFrame with "
                    f"{ts.shape[1]} columns; provide a list of names or a mapper."
                )
            renamed = ts.copy()
            renamed.columns = [colnames]
            return renamed

        if given_list:
            colnames = list(colnames)
            if len(colnames) != ts.shape[1]:
                raise ValueError(
                    f"rename_columns: list length ({len(colnames)}) must match DataFrame columns ({ts.shape[1]})"
                )
            renamed = ts.copy()
            renamed.columns = colnames
            return renamed

        if isinstance(colnames, Mapping) or callable(colnames):
            return ts.rename(columns=colnames)

        raise TypeError("rename_columns: 'colnames' must be str, list/sequence, dict/mapping, or callable for DataFrame")

    if not isinstance(ts, pd.Series):
        raise TypeError("rename_columns: expected a pandas Series or DataFrame")

    # Only reached for a Series when convert_df is falsy.
    renamed = ts.copy()

    if given_str:
        renamed.name = colnames
        return renamed

    if given_list:
        colnames = list(colnames)
        if len(colnames) != 1:
            raise ValueError(
                f"rename_columns: list length for Series must be 1, got {len(colnames)}"
            )
        renamed.name = colnames[0]
        return renamed

    if isinstance(colnames, Mapping):
        current = renamed.name
        # A nameless Series has name None, so a None key is matched by the
        # same membership test as any other name.
        if current in colnames:
            renamed.name = colnames[current]
        return renamed

    if callable(colnames):
        renamed.name = colnames(renamed.name)
        return renamed

    raise TypeError("rename_columns: 'colnames' must be str, list/sequence, dict/mapping, or callable for Series")
def time_overlap(ts0, ts1, valid=True):
    """Find the time span covered by both series.

    Only the first and last time stamps of each series are compared; the
    function does not check that interior time stamps line up.

    Parameters
    ----------
    ts0, ts1 : pandas.Series or pandas.DataFrame
        Series to compare.
    valid : bool, default True
        If True, the span of each series is taken from
        ``first_valid_index()`` to ``last_valid_index()``, so missing data
        at either end is ignored. If False, the first and last index
        entries are used as they are.

    Returns
    -------
    tuple or None
        ``(start, end)`` of the common span, or None when the series do not
        overlap (``end > start`` does not hold) or when either series has
        no usable time stamps (empty, or all missing with ``valid=True``).
    """
    if valid:
        starts = (ts0.first_valid_index(), ts1.first_valid_index())
        ends = (ts0.last_valid_index(), ts1.last_valid_index())
        # first/last_valid_index give None for empty or all-missing input;
        # comparing None with a timestamp would raise, so report no overlap.
        if any(stamp is None for stamp in starts + ends):
            return None
    else:
        if len(ts0.index) == 0 or len(ts1.index) == 0:
            return None
        starts = (ts0.index[0], ts1.index[0])
        ends = (ts0.index[-1], ts1.index[-1])

    start = max(starts)
    end = min(ends)
    return (start, end) if end > start else None
def rts(data, start, freq, columns=None, props=None):
    """Create a regular or calendar time series from data and time parameters.

    Parameters
    ----------
    data : array_like
        Values of the series. Anything without a ``shape`` attribute (list,
        tuple, ...) is converted with ``numpy.asarray``. The length of the
        first axis sets the number of time steps. There is no restriction
        on data type, but not all functionality like addition or
        interpolation will work on all data.
    start : :class:`pandas.Timestamp`
        Timestamp of the first value, or a string or type that can be
        coerced to one.
    freq : _time_interval
        Time step. Can also be a string representing a pandas `freq`.
    columns : list-like, optional
        Column labels passed to the DataFrame constructor.
    props : None
        Reserved; any value other than None raises.

    Returns
    -------
    result : :class:`pandas.DataFrame`
        A time series whose index is built by ``pandas.date_range`` from
        `start`, `freq` and the length of `data`.

    Raises
    ------
    NotImplementedError
        If `props` is not None.
    """
    if props is not None:
        raise NotImplementedError(
            "Props reserved for future implementation using xarray"
        )
    # Lists, tuples and similar have no shape; arrays and pandas objects
    # are used as they are.
    if not hasattr(data, "shape"):
        data = np.asarray(data)
    tslen = data.shape[0]
    ndx = pd.date_range(start, freq=freq, periods=tslen)
    return pd.DataFrame(data, index=ndx, columns=columns)
def rts_formula(start, end, freq, valfunc=np.nan):
    """Create a regular time series filled with a constant or with formulas of elapsed seconds.

    Parameters
    ----------
    start : :class:`pandas.Timestamp`
        Starting Timestamp or a string or type that can be coerced to one.
    end : :class:`pandas.Timestamp`
        Ending Timestamp or a string or type that can be coerced to one.
    freq : _time_interval
        Can also be a string representing an interval.
    valfunc : number or dict
        Either a numeric constant, which fills a single column named
        ``"value"``, or a dictionary mapping column names to functions.
        Each function is called once with the whole index of elapsed
        seconds since the first time stamp and may return one value per
        time step or a single scalar, which is repeated for every step.
        An example would be ``{"value": lambda x: np.nan}``.

    Returns
    -------
    result : :class:`pandas.DataFrame`
        A time series indexed by ``pandas.date_range(start, end, freq)``.
    """
    ndx = pd.date_range(start=start, end=end, freq=freq)
    secs = (ndx - ndx[0]).total_seconds()
    nstep = len(ndx)

    if isinstance(valfunc, numtype):
        data = np.full(nstep, valfunc)
        cols = ["value"]
    else:
        cols = list(valfunc)
        if cols:
            # Broadcast every column to the full length so that a function
            # returning a scalar still yields one value per time step.
            data = np.column_stack(
                [
                    np.broadcast_to(np.asarray(valfunc[name](secs)), (nstep,))
                    for name in cols
                ]
            )
        else:
            data = np.empty((nstep, 0))

    # The index is already known, so it is used directly instead of being
    # rebuilt from the start time and the number of steps.
    return pd.DataFrame(data, index=ndx, columns=cols)
def _restore_dtypes(filled, original):
    """Cast `filled` back to the dtype(s) of `original` (Series or DataFrame)."""
    if isinstance(original, pd.Series):
        return filled.astype(original.dtype)
    return filled.astype(original.dtypes.to_dict())


def _linear_extension(anchor, slope, seconds):
    """Evaluate ``anchor + slope * seconds`` for scalars or per-column Series.

    Scalars give a 1-D array with one value per entry of `seconds`; Series
    (one entry per column) give a 2-D array of shape (len(seconds), ncols).
    """
    offsets = np.multiply.outer(seconds, np.asarray(slope, dtype=float))
    return offsets + np.asarray(anchor, dtype=float)


def _taper_ramp(times, t0, y0, t1, y1):
    """Linearly interpolate from (t0, y0) to (t1, y1) at `times`.

    `y0`/`y1` may be scalars or per-column Series; the result is 1-D for
    scalars and (len(times), ncols) otherwise.
    """
    x = np.asarray((times - t0).total_seconds(), dtype=float)
    span = (t1 - t0).total_seconds()
    lo, hi = np.broadcast_arrays(
        np.asarray(y0, dtype=float), np.asarray(y1, dtype=float)
    )
    if lo.ndim == 0:
        return np.interp(x, [0.0, span], [float(lo), float(hi)])
    return np.column_stack(
        [np.interp(x, [0.0, span], [a, b]) for a, b in zip(lo, hi)]
    )


def extrapolate_ts(ts, start=None, end=None, method="ffill", val=None):
    """
    Extend a regular time series to a new start and/or end using a specified
    extrapolation method.

    Parameters
    ----------
    ts : pandas.Series or pandas.DataFrame
        The input time series with a DatetimeIndex and a regular frequency
        (taken from ``ts.index.freq`` or inferred).
    start : datetime-like, optional
        The new starting time. If None, the first time stamp of `ts` is used.
    end : datetime-like, optional
        The new ending time. If None, the last time stamp of `ts` is used.
    method : {'ffill', 'bfill', 'linear_slope', 'taper', 'constant'}, default 'ffill'
        The method used to fill new values outside the original time range:

        - 'ffill' : Forward-fill after the original data using its last value.
          Not allowed when `start` is before the data.
        - 'bfill' : Backward-fill before the original data using its first
          value. Not allowed when `end` is after the data.
        - 'linear_slope' : Bidirectional linear extrapolation using the slope
          of the first/last two points. Result is float.
        - 'taper' : One-sided linear ramp between the nearest data value and
          `val`, which is reached at the new start or end. Result is float.
        - 'constant' : One-sided constant value fill with `val`.
    val : float, optional
        Required for 'taper' and 'constant'; must be None for 'linear_slope'.

    Returns
    -------
    extended : pandas.Series or pandas.DataFrame
        The time series reindexed to the new range and filled using the
        selected method. The result has the same kind (Series or DataFrame)
        as the input.

    Raises
    ------
    TypeError
        If `ts` is not a pandas Series or DataFrame.
    ValueError
        - If extrapolation rules are violated based on the method.
        - If method requires or forbids `val` and it's misused.
        - If frequency cannot be inferred.
        - If `method` is not one of the supported names.
    """
    if not isinstance(ts, (pd.Series, pd.DataFrame)):
        raise TypeError("Input must be a pandas Series or DataFrame")

    freq = ts.index.freq or pd.infer_freq(ts.index)
    if freq is None:
        raise ValueError("Time series must have a regular frequency")

    first, last = ts.index[0], ts.index[-1]
    start = first if start is None else pd.to_datetime(start)
    end = last if end is None else pd.to_datetime(end)

    full_index = pd.date_range(start=start, end=end, freq=freq)
    ts_full = ts.reindex(full_index)

    if method == "ffill":
        if start < first:
            raise ValueError("'ffill' not allowed when extending before start of data")
        ts_full.loc[last:] = ts_full.loc[last:].ffill()
        return _restore_dtypes(ts_full, ts)

    if method == "bfill":
        if end > last:
            raise ValueError("'bfill' not allowed when extending after end of data")
        ts_full.loc[:first] = ts_full.loc[:first].bfill()
        return _restore_dtypes(ts_full, ts)

    if method == "linear_slope":
        if val is not None:
            raise ValueError("'linear_slope' does not use 'val'")
        if len(ts) < 2:
            raise ValueError(
                "At least 2 data points are required for slope-based extrapolation."
            )
        result = ts_full.copy().astype(float).interpolate(method="time")
        idx = ts.index

        # Forward extrapolation from the last point with the final slope.
        slope_end = (ts.iloc[-1] - ts.iloc[-2]) / (idx[-1] - idx[-2]).total_seconds()
        late = result.index[result.index > last]
        if len(late):
            seconds_late = np.asarray((late - last).total_seconds(), dtype=float)
            result.loc[late] = _linear_extension(ts.iloc[-1], slope_end, seconds_late)

        # Backward extrapolation from the first point with the initial slope;
        # the elapsed seconds are negative here.
        slope_start = (ts.iloc[1] - ts.iloc[0]) / (idx[1] - idx[0]).total_seconds()
        early = result.index[result.index < first]
        if len(early):
            seconds_early = np.asarray((early - first).total_seconds(), dtype=float)
            result.loc[early] = _linear_extension(ts.iloc[0], slope_start, seconds_early)

        return result

    if method == "taper":
        if val is None:
            raise ValueError("Taper method requires 'val' to be specified.")
        if start < first and end > last:
            raise ValueError("Taper method only supports one-sided extrapolation.")
        result = ts_full.copy().astype(float)
        if start < first:
            # Ramp from `val` at the new start up to the first data value.
            ramp_index = result.index[result.index < first]
            result.loc[ramp_index] = _taper_ramp(
                ramp_index, ramp_index[0], val, first, ts.iloc[0]
            )
        elif end > last:
            # Ramp from the last data value to `val` at the new end.
            ramp_index = result.index[result.index > last]
            result.loc[ramp_index] = _taper_ramp(
                ramp_index, last, ts.iloc[-1], ramp_index[-1], val
            )
        return result

    if method == "constant":
        if val is None:
            raise ValueError("Constant method requires 'val' to be specified.")
        if start < first and end > last:
            raise ValueError("Constant method only supports one-sided extrapolation.")
        result = ts_full.copy()
        if start < first:
            result.loc[result.index < first] = val
        if end > last:
            result.loc[result.index > last] = val
        # Only cast back to the original dtype(s) when nothing is missing,
        # because the original dtype may be unable to hold NaN.
        missing = result.isna().any()
        if isinstance(result, pd.Series):
            return result if missing else result.astype(ts.dtype)
        return result if missing.any() else result.astype(ts.dtypes.to_dict())

    raise ValueError(f"Unknown method: {method}")
def datetime_elapsed(index_or_ts, reftime=None, dtype="d", inplace=False):
    """Convert a time series or DatetimeIndex to elapsed seconds from a reference time.

    Parameters
    ----------
    index_or_ts : :class:`DatetimeIndex <pandas:pandas.DatetimeIndex>` or :class:`DataFrame <pandas:pandas.DataFrame>`
        Time series or index to be transformed. An object with an ``index``
        attribute is treated as a time series, anything else as the index
        itself.
    reftime : :class:`Timestamp <pandas:pandas.Timestamp>` or something convertible
        The reference time from which elapsed time is measured. The default
        of None means the first time stamp of the input.
    dtype : str like 'i' or 'd' or type like `int` or `float`
        Data type the elapsed seconds are cast to with ``astype``.
    inplace : `bool`
        For time series input only: if True the index of the input is
        replaced and the input itself is returned; otherwise a copy with
        the new index is returned.

    Returns
    -------
    result :
        For index input, a new index of elapsed seconds since `reftime`.
        For time series input, the series with that index.
    """
    # Objects exposing `.index` are series/frames; everything else is
    # assumed to be the index itself.
    bare_index = not hasattr(index_or_ts, "index")
    times = index_or_ts if bare_index else index_or_ts.index

    origin = times[0] if reftime is None else pd.Timestamp(reftime)
    elapsed = (times - origin).total_seconds().astype(dtype)

    if bare_index:
        return elapsed

    target = index_or_ts if inplace else index_or_ts.copy()
    target.index = elapsed
    return target
def elapsed_datetime(index_or_ts, reftime=None, time_unit="s", inplace=False):
    """Convert a time series or elapsed-time index to datetimes.

    Parameters
    ----------
    index_or_ts : numeric :class:`Index <pandas:pandas.Index>`, :class:`TimedeltaIndex <pandas:pandas.TimedeltaIndex>` or :class:`DataFrame <pandas:pandas.DataFrame>`
        Time series or index to be transformed, holding time elapsed since
        `reftime`. An object with an ``index`` attribute is treated as a
        time series, anything else as the index itself.
    reftime : :class:`Timestamp <pandas:pandas.Timestamp>` or something convertible
        The reference time to which the elapsed times are added. Required.
        A scalar that is not already a Timestamp (for example a string) is
        converted with ``pandas.Timestamp``.
    time_unit : {'s', 'h', 'd'}, default 's'
        Unit of a numeric index (seconds, hours or days); case-insensitive.
        Not used when the index is a TimedeltaIndex.
    inplace : `bool`
        For time series input only: if True the index of the input is
        replaced and the input itself is returned; otherwise a copy with
        the new index is returned.

    Returns
    -------
    result :
        For index input, the datetimes ``reftime + elapsed``. For time
        series input, the series with that index.

    Raises
    ------
    TypeError
        If `reftime` is None.
    ValueError
        If `time_unit` is not one of 's', 'h' or 'd' (numeric index only).
    """
    if reftime is None:
        raise TypeError("elapsed_datetime: 'reftime' is required")
    # Coerce scalar references such as strings so they can be added to a
    # timedelta index; array-like references are passed through untouched.
    if not isinstance(reftime, pd.Timestamp) and np.ndim(reftime) == 0:
        reftime = pd.Timestamp(reftime)

    input_index = not hasattr(index_or_ts, "index")
    ndx = index_or_ts if input_index else index_or_ts.index

    if isinstance(ndx, pd.TimedeltaIndex):
        dtndx = reftime + ndx
    else:
        unit = time_unit.lower()
        if unit == "h":
            ndx = ndx * 3600.0
        elif unit == "d":
            ndx = ndx * 86400.0
        elif unit != "s":
            raise ValueError("time unit must be 's','h',or 'd'")
        dtndx = reftime + pd.to_timedelta(ndx, unit="s")

    if input_index:
        return dtndx

    result = index_or_ts if inplace else index_or_ts.copy()
    result.index = dtndx
    return result
def is_regular(ts, raise_exception=False):
    """
    Check if a pandas DataFrame, Series, or xarray object with a time axis (axis 0)
    has a regular time index.

    Regular means:
      - The index is unique.
      - The index equals a date_range spanning from the first to the last value
        with the index frequency (its ``freq`` attribute or, failing that, an
        inferred one).

    An index with fewer than two entries is always regular. Two distinct time
    stamps are regular as well, with their spacing as the step, because a
    frequency cannot be inferred from fewer than three dates.

    Parameters
    ----------
    ts : DataFrame, Series, or xarray object
        Series to evaluate. Objects with an ``index`` attribute use it;
        otherwise the coordinate of the first dimension is used.
    raise_exception : bool
        If True, raises a ValueError when the index is not regular.
        Otherwise, returns False.

    Returns
    -------
    bool
        True if the time index is regular; False otherwise.

    Raises
    ------
    ValueError
        Only if `raise_exception` is True and the index is not regular or
        cannot be evaluated.
    """

    def _irregular(msg, cause=None):
        # Single exit for every negative outcome: raise or return False.
        if raise_exception:
            raise ValueError(msg) from cause
        return False

    # Determine the index from the object.
    if hasattr(ts, "index"):
        idx = ts.index
    elif hasattr(ts, "coords") and ts.dims:
        # For xarray objects, assume the first dimension is time.
        coord = ts.coords[ts.dims[0]]
        idx = coord.to_index() if hasattr(coord, "to_index") else pd.Index(coord.values)
    else:
        return _irregular("The provided object does not have an accessible time index.")

    # An empty or single-element index is considered regular.
    if len(idx) <= 1:
        return True

    if not idx.is_unique:
        return _irregular("Index contains duplicate values.")

    # Ensure we are working with a DatetimeIndex. If not, attempt conversion.
    if not isinstance(idx, pd.DatetimeIndex):
        try:
            idx = pd.to_datetime(idx)
        except Exception as e:
            return _irregular("Index could not be converted to datetime.", e)

    # Prefer the frequency already attached to the index.
    freq = idx.freq
    if freq is None:
        if len(idx) == 2:
            # infer_freq needs at least three dates; with two distinct
            # stamps the spacing itself is the step.
            step = idx[1] - idx[0]
            freq = None if pd.isna(step) else step
        else:
            # Inference can give false negatives on messy data, but that is
            # harmless here because every time stamp is checked below.
            try:
                freq = pd.infer_freq(idx)
            except (TypeError, ValueError):
                freq = None
        if freq is None:
            return _irregular(
                "Could not infer a frequency from the index; it may not be regular."
            )

    # Build the expected index using the determined frequency.
    expected_index = pd.date_range(start=idx[0], end=idx[-1], freq=freq)
    if not expected_index.equals(idx):
        return _irregular("Index is not regular based on the inferred frequency.")
    return True
def example():
    """Print a small round trip between datetime and elapsed-time indexes.

    First a 15-minute DatetimeIndex is converted to elapsed seconds and
    printed with its type. Then a DataFrame on the same index is converted
    to elapsed seconds from a reference time one minute earlier, converted
    back, and the difference from the original frame is printed.
    """
    times = pd.date_range(pd.Timestamp(2017, 1, 1, 12), freq="15min", periods=10)
    elapsed = datetime_elapsed(times, dtype="i")
    print(elapsed)
    print(type(elapsed))

    values = np.arange(0.0, 10.0, dtype="d")
    frame = pd.DataFrame({"vals": values}, index=times.copy())
    origin = pd.Timestamp(2017, 1, 1, 11, 59)
    frame_elapsed = datetime_elapsed(frame, reftime=origin, dtype=int)
    round_trip = elapsed_datetime(frame_elapsed, reftime=origin)
    print(round_trip - frame)


if __name__ == "__main__":
    example()