import numpy as np
import pandas as pd
from numpy.polynomial import polynomial as P
from numpy.polynomial.polynomial import polyvander
from numba import njit
import matplotlib.pyplot as plt
all = ["savgol_filter_weighted"]
[docs]
def savgol_filter_weighted(
data, window_length, degree, error=None, cov_matrix=None, deriv=None, use_numba=True
):
"""
Apply a Savitzky–Golay filter with weights to a univariate DataFrame or Series.
Parameters
----------
data : pandas.DataFrame or pandas.Series
DataFrame or Series containing your data.
window_length : int
Length of the filter window (must be odd).
degree : int
Degree of the polynomial fit.
error : pandas.Series, optional
Series containing the error (used to compute weights).
cov_matrix : 2D numpy array, optional
Covariance matrix for the errors.
deriv : int, optional
Derivative order to compute.
use_numba : bool, optional
If True, uses the Numba-accelerated kernel.
Returns
-------
pandas.Series
Series of the filtered values.
Notes
-----
The practical size of `window_length` depends on the data and the computational resources.
Larger window lengths provide smoother results but require more computation and may not capture
local variations well. It is recommended to experiment with different window lengths to find
the optimal value for your specific application.
Some of the workflow derived from this work:
https://github.com/surhudm/savitzky_golay_with_errors
"""
# Interpolate to fill NaNs using linear interpolation
interpolated_data = data.interpolate(method="linear")
# Mark interpolated values in the error series
if error is not None:
error = error.copy()
error[data.isna()] = np.nan
# Extract data from DataFrame or Series
y = interpolated_data.to_numpy()
error = error.to_numpy() if error is not None else None
# Choose the underlying implementation
if use_numba:
if error is None:
raise ValueError("Numba version requires an 'error' Series for weights.")
y_filtered = savgol_filter_numba(y, window_length, degree, error)
else:
y_filtered = savgol_filter_werror(
y, window_length, degree, error=error, cov=cov_matrix, deriv=deriv
)
return pd.Series(y_filtered, index=data.index)
# --- Corrected Implementation ---
[docs]
def solve_polyfit(xarr, yarr, degree, weight, deriv=None):
# Use numpy.polyfit with weights via the polynomial module.
# Skip NaN values and handle weights properly.
mask = ~np.isnan(yarr) & ~np.isnan(weight)
xarr = xarr[mask]
yarr = yarr[mask]
weight = weight[mask]
try:
z = P.polyfit(xarr, yarr, deg=degree, w=weight)
except np.linalg.LinAlgError:
# Fallback to np.linalg.lstsq with regularization
vander = polyvander(xarr, degree)
lhs = vander.T * weight
rhs = yarr * weight
reg = 1e-4 * np.eye(lhs.shape[0])
z, _, _, _ = np.linalg.lstsq(lhs @ lhs.T + reg, lhs @ rhs, rcond=None)
if deriv is not None:
z = P.polyder(z, m=deriv)
return z
[docs]
def solve_leastsq(yarr, ycov, vander, vanderT, deriv=None):
# Solve (V^T C^{-1} V) z = V^T C^{-1} y using np.linalg.solve for stability.
ycovinv = np.linalg.inv(ycov)
A = np.dot(np.dot(vanderT, ycovinv), vander)
b = np.dot(np.dot(vanderT, ycovinv), yarr)
z = np.linalg.solve(A, b)
if deriv is not None:
z = P.polyder(z, m=deriv)
return z
[docs]
def savgol_filter_werror(y, window_length, degree, error=None, cov=None, deriv=None):
ynew = np.empty_like(y)
ynew.fill(np.nan) # Initialize with NaNs
if window_length % 2 == 0:
raise ValueError("Window length must be odd")
margin = window_length // 2
xarr = np.arange(-margin, margin + 1)
if cov is not None:
vander = polyvander(xarr, deg=degree)
vanderT = np.transpose(vander)
else:
weight = np.zeros_like(error)
weight[~np.isnan(error)] = 1.0 / error[~np.isnan(error)]
weight[np.isnan(error)] = 1e-10 # Set very low weight for interpolated values
# Main loop for the central part of the array
for i in range(margin, y.size - margin):
if np.isnan(y[i - margin : i + margin + 1]).sum() > margin:
continue # Skip if there are too many NaNs in the window
try:
if cov is None:
z = solve_polyfit(
xarr,
y[i - margin : i + margin + 1],
degree,
weight[i - margin : i + margin + 1],
deriv=deriv,
)
else:
z = solve_leastsq(
y[i - margin : i + margin + 1],
cov[i - margin : i + margin + 1, i - margin : i + margin + 1],
vander,
vanderT,
deriv=deriv,
)
ynew[i] = P.polyval(0.0, z)
except Exception as e:
print(f"Error at index {i}: {e}")
# Left boundary: use the first window_length points
for i in range(margin):
if not np.isnan(y[:window_length]).any():
try:
if cov is None:
z = solve_polyfit(
xarr,
y[:window_length],
degree,
weight[:window_length],
deriv=deriv,
)
else:
z = solve_leastsq(
y[:window_length],
cov[:window_length, :window_length],
vander,
vanderT,
deriv=deriv,
)
ynew[i] = P.polyval(xarr[i], z)
except Exception as e:
print(f"Error at left boundary index {i}: {e}")
# Right boundary: use the last window_length points
for i in range(margin):
if not np.isnan(y[-window_length:]).any():
try:
if cov is None:
z = solve_polyfit(
xarr,
y[-window_length:],
degree,
weight[-window_length:],
deriv=deriv,
)
else:
z = solve_leastsq(
y[-window_length:],
cov[-window_length:, -window_length:],
vander,
vanderT,
deriv=deriv,
)
ynew[y.size - margin + i] = P.polyval(xarr[i + margin + 1], z)
except Exception as e:
print(f"Error at right boundary index {i}: {e}")
return ynew
# --- Numba-Accelerated Implementation ---
[docs]
@njit
def _build_vander(x, degree):
n = x.shape[0]
A = np.empty((n, degree + 1))
for i in range(n):
v = 1.0
for j in range(degree + 1):
A[i, j] = v
v *= x[i]
return A
[docs]
@njit
def _polyfit_window(x, y, w, degree):
A = _build_vander(x, degree)
n = A.shape[0]
m = degree + 1
ATA = np.zeros((m, m))
ATy = np.zeros(m)
for i in range(n):
for j in range(m):
ATy[j] += A[i, j] * w[i] * y[i]
for k in range(m):
ATA[j, k] += A[i, j] * w[i] * A[i, k]
c = np.linalg.solve(ATA, ATy)
return c
[docs]
@njit
def _evaluate_poly(c, x):
res = 0.0
for i in range(c.shape[0] - 1, -1, -1):
res = res * x + c[i]
return res
[docs]
@njit
def savgol_filter_numba(y, window_length, degree, error):
n = y.shape[0]
margin = window_length // 2
ynew = np.empty(n)
# Precompute the x positions for the window
xarr = np.empty(window_length)
for i in range(window_length):
xarr[i] = i - margin
inv_error = np.empty_like(error)
for i in range(error.shape[0]):
inv_error[i] = 1.0 / error[i]
if np.isnan(error[i]):
inv_error[i] = 1e-10 # Set very low weight for interpolated values
# Main loop over the central data points
for i in range(margin, n - margin):
y_window = y[i - margin : i + margin + 1]
w_window = inv_error[i - margin : i + margin + 1]
c = _polyfit_window(xarr, y_window, w_window, degree)
ynew[i] = _evaluate_poly(c, 0.0)
# Left boundary: fit first window_length points
c_left = _polyfit_window(xarr, y[:window_length], inv_error[:window_length], degree)
for i in range(margin):
ynew[i] = _evaluate_poly(c_left, xarr[i])
# Right boundary: fit last window_length points
c_right = _polyfit_window(
xarr, y[-window_length:], inv_error[-window_length:], degree
)
for i in range(margin):
ynew[n - margin + i] = _evaluate_poly(c_right, xarr[i + margin + 1])
return ynew
# --- Example Usage ---
[docs]
def main():
# Create some sample data.
t = pd.date_range(start="2023-01-01", periods=50 * 24, freq="h")
pre_noise = np.sin(np.linspace(0, 40, len(t))) + np.cos(np.linspace(0, 15, len(t)))
y = pre_noise + 0.23 * np.random.randn(len(t))
error = 1 * np.ones_like(y) # Example measurement uncertainties
# Introduce gaps
y[100:103] = np.nan # 3-hour gap
y[200:209] = np.nan # 9-hour gap
y[300:318] = np.nan # 18-hour gap
y[400:427] = np.nan # 27-hour gap
# Put data in a DataFrame.
df = pd.DataFrame({"signal": y, "error": error}, index=t)
# Apply the filter (using the pure numpy version)
filtered_series = savgol_filter_weighted(
df["signal"], window_length=75, degree=3, error=df["error"]
)
filtered_numba = (
savgol_filter_weighted(
df["signal"], window_length=75, degree=3, error=df["error"], use_numba=True
)
+ 0.05
)
# Plot the original and filtered signals
plt.figure(figsize=(10, 6))
plt.plot(df.index, df["signal"], label="Original Signal", alpha=0.5)
plt.plot(
df.index, pre_noise, label="pre-noise", linewidth=2, color="0.7", linestyle=":"
)
plt.plot(df.index, filtered_series, label="Filtered Signal", linewidth=2)
plt.plot(df.index, filtered_numba, label="Filtered Numba", linewidth=2)
plt.xlabel("Time")
plt.ylabel("Signal")
plt.legend()
plt.title("Savitzky-Golay Filter Example")
plt.show(block=True) # Ensure the plot is displayed before the script exits
if __name__ == "__main__":
main()