Source code for vtools.functions.filter

""" Module contains filter used in tidal time series analysis. 
"""

## Python libary import.
from numpy import abs
import pandas as pd
import numpy as np
from vtools.data.vtime import seconds, minutes, hours
from scipy import array as sciarray
from scipy.signal import lfilter,firwin,filtfilt
from scipy.signal.filter_design import butter
from scipy.ndimage import gaussian_filter1d

__all__=["cosine_lanczos","butterworth","godin","cosine_lanczos",\
         "lowpass_cosine_lanczos_filter_coef","ts_gaussian_filter"]



_cached_filt_info = {}


########################################################################### 
## Public interface.
###########################################################################



def process_cutoff(cutoff_frequency,cutoff_period,freq):
    if cutoff_frequency is None:
        if cutoff_period is None:
            raise("One of cutoff_frequency or cutoff_period must be given")
        cp = pd.tseries.frequencies.to_offset(cutoff_period)
        return 2.*freq/cp
    else:
        if cutoff_frequency < 0 or cutoff_frequency > 1.:
            raise ValueError("cutoff frequency must be 0 < cf < 1)")
        return cutoff_frequency


def cosine_lanczos(ts,cutoff_period=None,cutoff_frequency=None,filter_len=None,
                   padtype=None,padlen=None,fill_edge_nan=True):
    """ squared low-pass cosine lanczos  filter on a regular time series.
      
        
    Parameters
    ----------
    
    ts : :class:`DataFrame <pandas:pandas.DataFrame>`
    
    filter_len  : int, time_interval
        Size of lanczos window, default is to number of samples within filter_period*1.25.
        
    cutoff_frequency: float,optional
        Cutoff frequency expressed as a ratio of a Nyquist frequency,
        should within the range (0,1). For example, if the sampling frequency
        is 1 hour, the Nyquist frequency is 1 sample/2 hours. If we want a
        36 hour cutoff period, the frequency is 1/36 or 0.0278 cycles per hour. 
        Hence the cutoff frequency argument used here would be
        0.0278/0.5 = 0.056.
                      
    cutoff_period : string  or  _time_interval
         Period of cutting off frequency. If input as a string, it must 
         be  convertible to a _time_interval (Pandas freq).
         cutoff_frequency and cutoff_period can't be specified at the same time.
         
     padtype : str or None, optional
         Must be 'odd', 'even', 'constant', or None. This determines the type
         of extension to use for the padded signal to which the filter is applied. 
         If padtype is None, no padding is used. The default is None.

     padlen : int or None, optional
          The number of elements by which to extend x at both ends of axis 
          before applying the filter. This value must be less than x.shape[axis]-1. 
          padlen=0 implies no padding. If padtye is not None and padlen is not
          given, padlen is be set to 6*m.
    
     fill_edge_nan: bool,optional
          If pading is not used and fill_edge_nan is true, resulting data on 
          the both ends are filled with nan to account for edge effect. This is
          2*m on the either end of the result. Default is true.
  
    Returns
    -------
    result : :class:`~vtools.data.timeseries.TimeSeries`
        A new regular time series with the same interval of ts. If no pading 
        is used the beigning and ending 4*m resulting data will be set to nan
        to remove edge effect.
        
    Raises
    ------
    ValueError
        If input timeseries is not regular, 
        or, cutoff_period and cutoff_frequency are given at the same time,
        or, neither cutoff_period nor curoff_frequence is given,
        or, padtype is not "odd","even","constant",or None,
        or, padlen is larger than data size
        
    """
    
    
        
    
    freq=ts.index.freq
    if freq is None:
        raise ValueError("Time series has no frequency attribute")
    
    m=filter_len
    
    cf = process_cutoff(cutoff_frequency,cutoff_period,freq)
    
    if m is None:
        m = int(1.25 * 2. /cf)
    elif type(m) != int:
        try:
            m = int(m/freq)
        except: 
            raise TypeError("filter_len was not an int or divisible by filter_len (probably a type incompatiblity)")
        #raise NotImplementedError("Only integer length filter lengths or supported currently. Received: ".format(type(m)))

    ##find out nan location and fill with 0.0. This way we can use the
    ## signal processing filtrations out-of-the box without nans causing trouble,
    ## but we have to post process the areas touched by nan
    idx=np.where(np.isnan(ts.values))[0]
    data=np.array(ts.values).copy()
    
    ## figure out indexes that will be nan after the filtration,which
    ## will "grow" the nan region around the original nan by 2*m
    ## slots in each direction
    if  False:
        #len(idx)>0:
        data[idx]=0.0
        shifts=np.arange(-2*m,2*m+1)
        result_nan_idx=np.clip(np.add.outer(shifts,idx),0,len(ts)-1).ravel()
    
    if m<1:
        raise ValueError("bad input cutoff period or frequency")
        
    if padtype is not None:
        if (not padtype in ["odd","even","constant"]):
            raise ValueError("unkown padtype :"+padtype)
    
    if (padlen is None) and (padtype is not None):
        padlen=6*m

    if padlen is not None:   # is None sensible? 
        if padlen>len(data):
            raise ValueError("Padding length is more  than data size")
        
    ## get filter coefficients. sizeo of coefis is 2*m+1 in fact
    coefs= lowpass_cosine_lanczos_filter_coef(cf,int(m))
    
    d2=filtfilt(coefs,[1.0],data,axis=0,padtype=padtype,padlen=padlen)
    

    #if(len(idx)>0):
    #    d2[result_nan_idx]=np.nan
    
    ## replace edge points with nan if pading is not used

    if (padtype is None) and (fill_edge_nan==True):
        d2[0:2*m,np.newaxis]=np.nan
        d2[len(d2)-2*m:len(d2),np.newaxis]=np.nan

    out = ts.copy(deep=True)
    out[:]=d2
        
    return out


[docs]def butterworth(ts,cutoff_period=None,cutoff_frequency=None,order=4): """ low-pass butterworth-squared filter on a regular time series. Parameters ---------- ts : :class:`DataFrame <pandas:pandas.DataFrame>` Must be one or two dimensional, and regular. order: int ,optional The default is 4. cutoff_frequency: float,optional Cutoff frequency expressed as a ratio with Nyquist frequency, should within the range (0,1). For a discretely sampled system, the Nyquist frequency is the fastest frequency that can be resolved by that sampling, which is half the sampling frequency. For example, if the sampling frequency is 1 sample/1 hour, the Nyquist frequency is 1 sample/2 hours. If we want a 36 hour cutoff period, the frequency is 1/36 or 0.0278 cycles per hour. Hence the cutoff frequency argument used here would be 0.0278/0.5 = 0.056. cutoff_period : string or _time_interval Period corresponding to cutoff frequency. If input as a string, it must be convertible to a regular interval using the same rules as a pandas frequency.. cutoff_frequency and cutoff_period can't be specified at the same time. Returns ------- result : A new regular time series with the same interval as ts. Raises ------ ValueError If input order is not even, or input timeseries is not regular, or neither cutoff_period and cutoff_frequency is given while input time series interval is not 15min or 1 hour, or cutoff_period and cutoff_frequency are given at the same time. """ if (order%2): raise ValueError("only even order is accepted") #if not ts.is_regular(): # raise ValueError("Only regular time series can be filtered.") freq=ts.index.freq # if (not (interval in _butterworth_interval)) and (cutoff_period is None) and (cutoff_frequency is None): # raise ValueError("time interval is not supported by butterworth if no cuttoff period/frequency given.") if (not (cutoff_frequency is None)) and (not(cutoff_period is None)): raise ValueError("cutoff_frequency and cutoff_period can't be specified simultaneously") if (cutoff_frequency is None) and (cutoff_period is None): raise ValueError("Either cutoff_frequency or cutoff_period must be given") cf=cutoff_frequency if (cf is None): if (not(cutoff_period is None)): cutoff_period = pd.tseries.frequencies.to_offset(cutoff_period) cf = 2.*freq/cutoff_period else: cf=butterworth_cutoff_frequencies[interval] ## get butter filter coefficients. [b,a]=butter(order/2,cf) d2=filtfilt(b,a,ts.values,axis=0,padlen=90) out = ts.copy(deep=True) out[:]=d2 # prop={} # for key,val in ts.props.items(): # prop[key]=val # prop[TIMESTAMP]=INST # prop[AGGREGATION]=INDIVIDUAL # time_interval return out
def lowpass_cosine_lanczos_filter_coef(cf,m,normalize=True): """return the convolution coefficients for low pass lanczos filter. Parameters ---------- cf: float Cutoff frequency expressed as a ratio of a Nyquist frequency. m: int Size of filtering window size. Returns ------- results: list Coefficients of filtering window. """ if (cf,m) in _cached_filt_info: return _cached_filt_info[(cf,m)] coscoef=[cf*np.sin(np.pi*k*cf)/(np.pi*k*cf) for k in np.arange(1,m+1,1,dtype='d')] sigma=[np.sin(np.pi*k/m)/(np.pi*k/m) for k in np.arange(1,m+1,1,dtype='float')] prod= [c*s for c,s in zip(coscoef,sigma)] temp = prod[-1::-1]+[cf]+prod res=np.array(temp) if normalize: res = res/res.sum() _cached_filt_info[(cf,m)] = res return res def generate_godin_fir(freq): ''' generate godin filter impulse response for given freq freq is a pandas freq ''' freqstr=str(freq) if freqstr in _cached_filt_info: return _cached_filt_info[freqstr] dt_sec = int(freq/seconds(1)) nsample24 = int(86400//dt_sec) # 24 hours by dt (24 for hour, 96 for 15min) wts24=np.zeros(nsample24,dtype='d') wts24[:]=1./nsample24 nsample25=(1490*60)//dt_sec # 24 hr 50min in seconds by dt if nsample25%2==0: # ensure odd nsample25+=1 wts25=np.zeros(nsample25,dtype='d') wts25[:]=1.0/nsample25 wts24=np.zeros(nsample24,dtype='d') wts24[:]=1./nsample24 v = np.convolve(wts25,np.convolve(wts24,wts24)) _cached_filt_info[freqstr] = v return v def godin(ts): """ Low-pass Godin filter a regular time series. Applies the :math:`\mathcal{A_{24}^{2}A_{25}}` Godin filter [1]_ The filter is generalized to be the equivalent of one boxcar of the length of the lunar diurnal (~25 hours) constituent and two of the solar diurnal (~24 hours), though the implementation combines these steps. Parameters ---------- ts : :class:`DataFrame <pandas:pandas.DataFrame>` Returns ------- result : :class:`DataFrame <pandas:pandas.DataFrame>` A new regular time series with the same interval of ts. Raises ------ NotImplementedError If input time series is not univariate References ---------- .. [1] Godin (1972) Analysis of Tides """ freq=ts.index.freq if freq is None: raise ValueException("Series must be regular (have freq set)") godin_ir=generate_godin_fir(freq) nfilt = len(godin_ir) nhalffilt = nfilt//2 #if not (len(ts.columns) == 1): # raise NotImplementedError("Godin Filter not functional for multivariate series yet") if hasattr(ts,'columns'): dfg = ts.apply(np.convolve,axis=0,v=godin_ir,mode='same') dfg.columns=ts.columns dfg.iloc[0:nhalffilt,:] = np.nan dfg.iloc[-nhalffilt:,:] = np.nan else: # Assume series outdata = np.convolve(ts.to_numpy(),v=godin_ir,mode='same') dfg = pd.DataFrame(data=outdata,index=ts.index) return dfg def convert_span_to_nstep(freq,span): if type(span) == int: return span span = pd.tseries.frequencies.to_offset(span) freq = pd.tseries.frequencies.to_offset(freq) return int(span/freq) def _gf1d(ts,sigma,order,mode,cval,truncate): tscopy = ts.copy() tscopy.loc[:] = gaussian_filter1d(ts.squeeze().to_numpy(),sigma=sigma,order=order,mode=mode,cval=cval,truncate=truncate) return tscopy def ts_gaussian_filter(ts,sigma,order=0,mode='reflect', cval=0.0, truncate=4.0): """ Column-wise Gaussian smoothing of regular time series. Missing/irregular values are not handled, which means this function is not much different from a rolling window gaussian average in pandas which may be preferable in the case of missing data (ts.rolling(window=5,win_type='gaussian').mean. This function has been kept around awaiting irreg as an aspiration but yet to be implemented. Parameters ---------- ts : :class:`DataFrame <pandas:pandas.DataFrame>` The series to be smoothed sigma : int or freq The sigma scale of the smoothing (analogous to std. deviation), given as a number of steps or freq Returns ------- result : :class:`DataFrame <pandas:pandas.DataFrame>` A new regular time series with the same interval of ts. """ freq = ts.index.freq if type(sigma) != int: sigma = convert_span_to_nstep(freq,sigma) if isinstance(ts,pd.Series): tsout = _gf1d(ts,sigma=sigma,order=order,mode=mode,cval=cval,truncate=truncate) else: tsout = ts.apply(_gf1d,sigma=sigma,order=order,mode=mode,cval=cval,truncate=truncate) return tsout