Source code for vtools.functions.interpolate

"""
Module for data interpolation using splines or interfaces unavailable in Pandas.
"""

import numpy as np
import pandas as pd
from vtools.functions._monotonic_spline import _monotonic_spline

__all__ = ["rhistinterp", "monotonic_spline"]


[docs] def interpolate_to_index(df, dest): return df.reindex(df.index.union(dest)).interpolate(method="time").reindex(dest)
[docs] def rhistinterp(ts, dest, p=2.0, lowbound=None, tolbound=1.0e-3, maxiter=5): """Interpolate a regular time series (rts) to a finer rts by rational histospline. The rational histospline preserves area under the curve. This is a good choice of spline for period averaged data where an interpolant is desired that is 'conservative'. Note that it is the underlying continuous interpolant that will be 'conservative though, not the returned discrete time series which merely samples the underlying interpolant. Parameters ---------- ts : :class:`Pandas.DataFrame` Series to be interpolated, with period index and assuming time stamps at beginning of the period and no missing data dest : string or :class:`DateTimeIndex <pandas:pandas.DateTimeIndex>` A pandas freq code (e.g. '16min' or 'D') or a DateTimeIndex p : float, optional Spline tension, usually between 0 and 20. Must >-1. For a 'sufficiently large' value of p, the interpolant will be monotonicity-preserving and will maintain strict positivity (always being strictly > `lowbound`). It will also preserve the original shape of the time series. lowbound : float, optional Lower bound of interpolated values. tolbound : float, optional Tolerance for determining if an input is on the bound. Returns ------- result : :class:`pandas:pandas.DataFrame` A regular time series with same columns as ts, populated with instantaneous values and with an index of type DateTimeIndex """ # Get the source index including two endpoints # todo: easier way? try: ndx_left = ts.index.to_timestamp(how="s") except AttributeError: raise ValueError("Time series to interpolate must have PeriodIndex") ndx_right = ts.index.to_timestamp(how="e").round("s") ndx = ndx_left.union(ndx_right) strt = ndx[0] x = (ndx - strt).total_seconds().to_numpy() if not isinstance(dest, pd.Index): end = ndx[-1].floor(dest) dest = pd.date_range(start=strt, end=end, freq=dest) xnew = (dest - strt).total_seconds().to_numpy() try: cols = ts.columns except: cols = None if cols is None: y = ts.to_numpy() out = rhist_bound( x, y, xnew, y0=y[0], yn=y[-1], lbound=lowbound, p=p, maxiter=maxiter ) result = pd.Series(data=out, index=dest) else: def col_interpolator(col, x, xnew, dest, lbound, p, maxiter): print(col.shape) # x = (ndx - strt).total_seconds().to_numpy() y = col return rhist_bound( x, y, xnew=xnew, y0=y[0], yn=y[-1], lbound=lowbound, p=p, maxiter=maxiter, ) res = [] for col in cols: y = ts[col].to_numpy() interpolated = rhist_bound( x, y, xnew=xnew, y0=y[0], yn=y[-1], lbound=lowbound, p=p, maxiter=maxiter, ) result = pd.Series(data=interpolated, index=dest, name=col) res.append(result) result = pd.concat(res, axis=1) # result = ts.apply(col_interpolator,raw=True, # x=x,xnew=xnew,dest=dest,lbound=lowbound,p=p,maxiter=maxiter) # result = pd.DataFrame([],index=dest) result.index = dest return result
[docs] def rhist_bound( x, y, xnew, y0, yn, p, lbound=None, maxiter=5, pfactor=2, floor_eps=1e-3 ): """Numpy implementation of histospline with bounds Histopline for arrays with lower bound enforcement. This routine drives rhist() but tests that the output array observes the lower bound and adapts the tension parameters as needed. This will not work exactly if the input array has values right on the lower bound. In this case, the parameter floor_eps allows you to specify a tolerance of bound violation to shoot for ... and if it isn't met in maxiter iterations the value is simply floored. Parameters ---------- x : array-like Abscissa array of original data to be interpolated, of length n y : array-like, dimension (n-1) Values (mantissa) of original data giving the rectangle (average) values between x[i] and x[i+1] xnew : array-like Array of new locations at which to interpolate. y0,yn : float Initial and terminal values p: float Tension parameter. This starts out as a global scalar, but will be converted to an array and adapted locally. The higher this goes for a particular x interval, the more rectangular the interpolant will look and the more positivity and shape preserving it is at the expense of accuracy. A good number is 1, and for this routine, p > 0 is required because the adaptive process multiplies it by pfactor each iteration on the expectation that it will get bigger. lbound: float Lower bound to be enforced. If the original y's are strictly above this value, the output has the potential to also be strictly above. If the original y's lie on the lower bound, then the lower bound can only be enforced within a tolerance using the Spath algorithm ... and once the values reach that tolerance they are floored. If lbound = None, this function behaves like rhist() maxiter : integer Number of times to increase p by multiplying it by pfactor before giving up on satisfying floor_eps. pfactor : float Factor by which to multiply individual time step p floor_eps : float Tolerance for lower bound violation at which the algorithm will be terminated and the bounds will be enforced by flooring. Returns ------- ynew : array-like Array that interpolates the original data, on a curve that conserves mass and strictly observes the lower bound. """ if type(p) in (int, float): p = np.ones_like(y, dtype=float) * p q = p ynew = rhist(x, y, xnew, y0, yn, p, q) if lbound != None: iter = 0 bound_viol = lbound - ynew.min() while bound_viol > floor_eps and iter < maxiter: iter += 1 # print ("Iteration {}".format(iter)) # print ("Bound gap = {}".format((ynew.min() - lbound))) xbad = xnew[ynew < lbound] bad_ndx_left = ( np.minimum(np.searchsorted(x, xbad, side="right"), len(x) - 1) - 1 ) p[bad_ndx_left] *= pfactor q[bad_ndx_left] *= pfactor ynew = rhist(x, y, xnew, y0, yn, p, q) bound_viol = lbound - ynew.min() if bound_viol <= floor_eps: if bound_viol < 0.0: return ynew else: return np.maximum(ynew, lbound) else: print(ynew.min()) raise Exception( "Failed to meet lower bound criterion within maxiter iterations" ) return ynew
[docs] def rhist(x, y, xnew, y0, yn, p, q): """ Histopline for arrays with tension. Based by an algorithm rhist2 in One Dimensional Spline Interpolation Algorithms by Helmuth Spath (1995). Parameters ---------- x : array-like Abscissa array of original data, of length n y : array-like, dimension (n-1) Values (mantissa) of original data giving the rectangle (average) values between x[i] and x[i+1] xnew : array-like Array of new locations at which to interpolate. y0,yn : float Initial and terminal values p,q: array-like, dimension (n-1) Tension parameter, p and q are almost always the same. The higher p and q are for a particular x interval, the more rectangular the interpolant will look and the more positivity and shape preserving it is at the expense of accuracy. For this routine any number p,q > -1 is allowed, although the bound routine doesn't use vals less than zero. Returns ------- ynew : array-like Array that interpolates the original data. """ if not xnew[0] >= x[0] and xnew[-1] <= x[-1]: raise ValueError( "Range of xnew must lie in range of original abscissca values (x)" ) a, b, c = rhist_coef(x, y, y0, yn, p, q) return rhist_val(xnew, x, p, q, a, b, c)
[docs] def rhist_coef(x, y, y0, yn, p, q): """Routine that produces coefficients for the histospline""" n = len(x) nintvl = n - 1 if n < 3: raise ValueError("Minimum input array size is 3") if len(y) != nintvl: raise ValueError( "Argument y should have len equal to the number of intervals, one smaller than x" ) if len(p) != nintvl: raise ValueError( "Argument p should have len equal to the number of intervals, one smaller than x" ) if len(q) != nintvl: raise ValueError( "Argument q should have len equal to the number of intervals, one smaller than x" ) # calculate cumulative sum of values of y xdiff = np.ediff1d(x, to_begin=[0.0]) ycum = xdiff.copy() # todo: inefficient but need a better test that includes univariate and multivariate ycum[1:] *= y ycum = np.cumsum(ycum) a, b, c, d = _ratsp1(x, ycum, p, q, y0, yn) h = 1.0 / xdiff[1:] a = h * (b - a) b = h * c c = -h * d return a, b, c
[docs] def _ratsp1(x, y, p, q, y0, yn): """RATSP1 in Spath (1995)""" n = len(x) nintvl = n - 1 if n < 3: raise ValueError("Input array must have len > 3") if len(p) != nintvl: raise ValueError( "Input argument p must be equal to the number of intervals in the series (one smaller than x)" ) if len(q) != nintvl: raise ValueError( "Input argument p must be equal to the number of intervals in the series (one smaller than x)" ) a = np.zeros(nintvl, dtype="d") b = np.zeros_like(a) c = np.zeros_like(a) d = np.zeros_like(a) y1 = np.zeros_like(x) # work array, larger y1[0] = y0 y1[-1] = yn xdiff = np.ediff1d(x) ydiff = np.ediff1d(y) h = 1.0 / xdiff pk2 = p * (p + 3.0) + 3.0 qk2 = q * (q + 3.0) + 3.0 p22 = 2.0 + p q22 = 2.0 + q if np.any(p < -1) or np.any(q < -1): raise ValueError("p and q arguments must be >= -1") a = 1.0 / (p22 * q22 - 1.0) g2 = h * a r2 = h * g2 * ydiff p21 = p22[:-1] qk1 = qk2[:-1] g1 = g2[:-1] r1 = r2[:-1] p22s = p22[1:] q22s = q22[1:] qk2s = qk2[1:] g2s = g2[1:] r2s = r2[1:] pk2s = pk2[1:] # b is lower diagonal, so padded with 0 b[1:] = qk2[1:] * g2[1:] c = qk1 * p21 * g1 + pk2s * q22s * g2s d[:-1] = pk2s * g2s y1[0:-2] = r1 * qk1 * (1.0 + p21) + r2s * pk2s * (1.0 + q22s) y1[0] -= qk1[1] * g1[1] * y0 y1[-3] -= pk2[-1] * g2[-1] * yn x = tridiagonal(b[:-1], c, d[:-1], y1[0:-2]) y1[1:-1] = x y1[0] = y0 y1[-1] = yn h = a * ydiff z = a * xdiff d = -(1.0 + p22) * h + z * (p22 * y1[1:] + y1[:-1]) c = (1.0 + q22) * h - z * (y1[1:] + q22 * y1[:-1]) b = y[1:] - d a = y[:-1] - c return a, b, c, d
## Tri-diagonal matrix Algorithm(a.k.a Thomas algorithm) solver
[docs] def tridiagonal(a, b, c, d): """ a is the lower band (with leading zero) b is the center diagonal (length == nrow) c is upper band (trailing zero) d is right hand side """ nf = len(a) # number of equations ac, bc, cc, dc = map(np.array, (a, b, c, d)) # copy the array for it in range(1, nf): mc = ac[it] / bc[it - 1] bc[it] = bc[it] - mc * cc[it - 1] dc[it] = dc[it] - mc * dc[it - 1] xc = ac xc[-1] = dc[-1] / bc[-1] for il in range(nf - 2, -1, -1): xc[il] = (dc[il] - cc[il] * xc[il + 1]) / bc[il] del bc, cc, dc # delete variables from memory return xc
[docs] def rhist_val(xnew, x, p, q, a, b, c): """Evaluate a histospline at new x points""" n = len(x) nintvl = n - 1 if n < 3: raise ValueError("Length of array x must be > 3") if len(a) != nintvl: raise ValueError( "Length of array a must be same as number of intervals (one less than x)" ) ndx_right = np.minimum(np.searchsorted(x, xnew, side="right"), n - 1) ndx_left = ndx_right - 1 t = (xnew - x[ndx_left]) / (x[ndx_right] - x[ndx_left]) u = 1.0 - t p_left = p[ndx_left] q_left = q[ndx_left] h1 = p_left * t + 1.0 h2 = q_left * u + 1.0 val = ( a[ndx_left] + b[ndx_left] * u * u * (2.0 * p_left * u - 3.0 * (1.0 + p_left)) / (h1 * h1) + c[ndx_left] * t * t * (2.0 * q_left * t - 3.0 * (1.0 + q_left)) / (h2 * h2) ) return val
[docs] def monotonic_spline(ts, dest): """Interpolating a regular time series (rts) to a finer rts by rational histospline. The rational histospline preserves area under the curve. This is a good choice of spline for period averaged data where an interpolant is desired that is 'conservative'. Note that it is the underlying continuous interpolant that will be 'conservative though, not the returned discrete time series which merely samples the underlying interpolant. Parameters ---------- ts : :class:`Pandas.DataFrame` Series to be interpolated, typically with DatetimeIndex dest : a pandas freq code (e.g. '16min' or 'D') or a DateTimeIndex Returns ------- result : :class:`~Pandas.DataFrame` A regular time series with same columns as ts, populated with instantaneous values and with an index of type DateTimeIndex """ # Get the source index including two endpoints # todo: easier way? ndx = ts.index strt = ndx[0] end = ndx[-1] x = (ndx - strt).total_seconds().to_numpy() if not isinstance(dest, pd.Index): end = ndx[-1].round(dest) dest = pd.date_range(start=strt, end=end, freq=dest) xnew = (dest - strt).total_seconds().to_numpy() # print(len(xnew)) cols = ts.columns result = pd.DataFrame([], index=dest) for col in cols: y = ts[col].to_numpy() result[col] = _monotonic_spline(x, y, xnew) return result
[docs] def rhist_example(): import matplotlib.pyplot as plt nper = 20 xx = np.linspace(0, 12.0, nper) yy = np.cos(2.0 * np.pi * xx / 6.0) yy2 = yy + 1.3 * np.sin(2.0 * np.pi * xx / 6.1) ndx = pd.date_range(start=pd.Timestamp(2009, 2, 10), freq="H", periods=nper) ndx2 = pd.date_range(start=pd.Timestamp(2009, 2, 10), freq="15min", periods=90) tsin = pd.DataFrame({"data": yy, "data2": yy2}, index=ndx) tsout = monotonic_spline(tsin, ndx2) fig, ax = plt.subplots(1) tsin.plot(ax=ax) tsout.plot(ax=ax) plt.show() strt = pd.Timestamp(2009, 1, 1) ndx = pd.period_range(start=strt, end=pd.Timestamp(2010, 12, 1), freq="M") print(ndx.to_timestamp()) data = np.array([1.0, 3, 5, 4, 7.7, 8.0] * 4, dtype="d") data2 = data * 1.25 tsin = pd.DataFrame({"data": data, "data2": data2}, index=ndx) strt = ndx[0].start_time ndx2 = pd.date_range(start=strt, end=pd.Timestamp(2010, 11, 1), freq="D") tsout = rhistinterp(tsin, ndx2, p=1.0, lowbound=0.75) fig, ax = plt.subplots(1) tsin.plot(ax=ax, drawstyle="steps-post") tsout.plot(ax=ax) plt.show()