from sklearn.neighbors import KernelDensity
from scipy import signal
from scipy.signal import argrelextrema
import numpy as np
import math
from itertools import compress
from typing import Tuple, List
[docs]def sort_noise(timecourses: np.ndarray = None,
lag1: np.ndarray = None,
return_logpdf: bool = False,
method: str = 'KDE',
verbose: bool = False) -> Tuple[np.ndarray, int, np.ndarray]:
'''
Sorts timecourses into two clusters (signal and noise) based on
lag-1 autocorrelation.
Arguments:
timecourses: Input to calculate noise threshold.
Should be a np array of shape (n, t).
lag1: Required if the timecourses are not provided.
alternate input to calculate noise threshold.
Should be a np array of shape (n, t).
return_logpdf: Whether to return the KDE log density function.
method: The method to calculate the cutoff. Currently only KDE is supported.
verbose: Whether to record a verbose output.
Returns:
noise_components, a np array with a value of 1 where all noise
timecourses detected. as well as the cutoff value detected.
cutoff: The cutoff index, anything above this value is considered to be noise.
log_pdf: Returned only if return_logpdf is True.
The pdf function evaluated between -0.2 and 1.2.
'''
if method == 'KDE':
# Calculate lag autocorrelations.
if lag1 is None:
assert timecourses is not None, 'sortNoise requires either timecourses or lag1'
lag1 = lag_n_autocorr(timecourses, 1)
# Calculate minimum between min and max peaks.
kde_skl = KernelDensity(kernel='gaussian',
bandwidth=0.05).fit(lag1[:, np.newaxis])
x_grid = np.linspace(-0.2, 1.2, 1200)
log_pdf = kde_skl.score_samples(x_grid[:, np.newaxis])
maxima = argrelextrema(np.exp(log_pdf), np.greater)[0]
if len(maxima) <= 1:
if verbose:
print('Only one cluster found')
cutoff = 0
else:
cutoff_index = np.argmin(np.exp(log_pdf)[maxima[0]:maxima[-1]]) \
+ maxima[0]
cutoff = x_grid[cutoff_index]
if verbose:
print('autocorr cutoff:', cutoff)
noise_components = (lag1 < cutoff).astype('uint8')
else:
raise Exception('method: {0} is unknown!'.format(method))
if return_logpdf:
return noise_components, cutoff, log_pdf
else:
return noise_components, cutoff
[docs]def get_peak_separation(log_pdf: np.ndarray,
x_grid: np.ndarray = None) -> float:
'''
Get peak to peak separation value from the log pdf.
This is used in
Arguments:
log_pdf: The log pdf function specifying the density distribution.
x_grid: Optional argument specifying the coordiates matching the log_pdf.
If empty, -0.2 : 1.2 in 1200 points is the default.
Returns:
peak_separation: The peak to peak distance of the log_pdf.
'''
if x_grid is None:
x_grid = np.linspace(-0.2, 1.2, 1200)
maxima = argrelextrema(np.exp(log_pdf), np.greater)[0]
if len(maxima) > 2:
maxima = np.delete(maxima, np.argmin(np.exp(log_pdf)[maxima]))
peak_separation = x_grid[maxima[-1]] - x_grid[maxima[0]]
return peak_separation
[docs]def lag_n_autocorr(x: np.ndarray, n: int, verbose: bool = True):
'''
Calculate the lag-n autocorrelation. Lower values are more likely to be noise.
Arguments:
x: The 1-D input vector to calculate lag autcorrelation from.
n: The integer lag to calculate.
verbose: Whether to print a verbose output.
'''
if x.ndim == 1:
return np.corrcoef(x[n:], x[:-n])[0, 1]
elif x.ndim == 2:
if verbose:
print('calculating {0}-lag autocorrelation'.format(n),
'along first dimension:', x.shape)
nt = x.shape[0]
corrmatrix = np.corrcoef(x[:, n:], x[:, :-n])
return np.diag(corrmatrix[:nt, nt:])
else:
print('Invalid input!!')
raise AssertionError
[docs]def butterworth(data: np.ndarray,
high: float = None,
low: float = None,
fps: int = 10,
order: int = 5) -> np.ndarray:
'''
Apply a butterworth filter on the data.
Arguments:
data: A 1-D time series array to filter.
high: The high pass filter to apply.
low: The low pass filter to apply.
fps: The number of frames per second of the input data.
order: The butterworth filter order to apply.
Returns:
data: The filtered dataset.
'''
def butter_highpass(cutoff, fps, order=order):
nyq = 0.5 * fps
normal_cutoff = cutoff / nyq
b, a = signal.butter(order, normal_cutoff, btype='high', analog=False)
return b, a
def butter_lowpass(cutoff, fps, order=order):
nyq = 0.5 * fps
normal_cutoff = cutoff / nyq
b, a = signal.butter(order, normal_cutoff, btype='low', analog=False)
return b, a
if low is not None:
b, a = butter_highpass(low, fps, order=order)
data = signal.filtfilt(b, a, data)
if high is not None:
b, a = butter_lowpass(high, fps, order=order)
data = signal.filtfilt(b, a, data)
return data
[docs]def local_max(x_values: np.ndarray,
array1d: np.ndarray,
sig: np.ndarray = None) -> Tuple[np.ndarray, np.ndarray]:
'''
Finds the local max array values and their respective position (xvalues)
by giving a significance cuttoff value array of the same size as the array1d,
this will also return the cutoff significance at each local maxima.
Args:
x_values: positional information (i.e. time, sequence, etc.)
array1d: data values (i.e. Dfof, etc.)
sig: third list, significance of signal, that is returned
(in realitity this could be any array of equal len as array1d)
Returns:
xvalues of where the local max
data values of the local max
third list of significance of where the local max
'''
if sig is not None:
i = np.r_[True, array1d[1:] > array1d[:-1]] & np.r_[
array1d[:-1] > array1d[1:], True]
return list(compress(x_values,
i)), list(compress(array1d,
i)), list(compress(sig, i))
else:
i = np.r_[True, array1d[1:] > array1d[:-1]] & np.r_[
array1d[:-1] > array1d[1:], True]
return list(compress(x_values, i)), list(compress(array1d, i))
[docs]def local_min(x_values: np.ndarray,
array1d: np.ndarray,
sig=None) -> Tuple[List[np.ndarray], List[np.ndarray]]:
'''
Finds the local min array values and their respective position (xvalues)
by giving a significance cuttoff value array of the same size as the array1d,
this will also return the cutoff significance at each local maxima.
Args:
x_values: positional information (i.e. time, sequence, etc.)
array1d: data values (i.e. Dfof, etc.)
sig: third list, significance of signal, that is returned
(in realitity this could be any array of equal len as array1d)
Returns:
xvalues of where the local min
data values of the local min
third list of significance of where the local min
'''
if sig is not None:
i = np.r_[True, array1d[1:] < array1d[:-1]] & np.r_[
array1d[:-1] < array1d[1:], True]
return list(compress(x_values,
i)), list(compress(array1d,
i)), list(compress(sig, i))
else:
i = np.r_[True, array1d[1:] < array1d[:-1]] & np.r_[
array1d[:-1] < array1d[1:], True]
return list(compress(x_values, i)), list(compress(array1d, i))
[docs]def abline(slope: float,
intercept: float,
x_max: float,
label: str = None,
color: str = None,
x_min: float = 0) -> None:
'''
Plot a line to the currently active plt figure based on slope and intercept.
Arguments:
slope: The line slope.
intercept: The line y intercept.
x_max: The maximum x value to draw the line to.
x_min: The minimum x value to draw the line to.
label: The line label, if applicable.
color: The matplotlib color string. If not specified, uses the next default option.
Returns:
None
'''
x_vals = np.array((0, x_min))
y_vals = intercept + slope * x_vals
if color != None:
plt.plot(x_vals, y_vals, label=label, color=color)
else:
plt.plot(x_vals, y_vals, label=label)
[docs]def linear_regression(time: np.ndarray,
signal: np.ndarray,
verbose=True) -> Tuple[float, float]:
'''
Applies a linear regression to the time-series signal.
Arguments:
time: The time, or x axis to fit the linear regression to.
signal: The signal, or y axis to fit the linear regression to.
verbose: Whether to print a verbose output specifying the fit results.
Returns:
slope: The linear regression slope.
intercept: The linear regression intercept.
'''
regr = linear_model.LinearRegression(fit_intercept=True)
regr.fit(time.reshape(-1, 1), signal.reshape(-1, 1))
wsumpred = regr.predict(signal.reshape(-1, 1))
slope = regr.coef_[0]
intercept = regr.intercept_[0]
if verbose:
print('Coefficients: \n', regr.coef_)
print('Mean squared error: %.2f' % mean_squared_error(signal, wsumpred))
print('Variance score: %.2f' % r2_score(signal, wsumpred))
return slope, intercept
[docs]def tdelay_correlation(
vectors: np.ndarray,
n: int,
max_offset: int = 150,
return_window: bool = False) -> Tuple[np.ndarray, np.ndarray]:
'''
Calculates correlations of timecourses stored in an array 'vectors', of shape
(m, t) against the 'n'th element of the array, or an input vector of size 't'.
Returns
Arguments:
vectors: a (m,t) numpy array containing all time series, including the one to compare to.
n: The element to compare each other element to.
max_offset: The integer maximum time offset to calculate correlation out to.
return_window: Whether to return the maximum correlation value in the context of the correlation window.
Returns:
x_corr: the correlation of each vector with vector 'n', and time offset.
t_delay: The time delay which maximizes the correlation value.
corr_window: Returns the maximum correlation at the given time delay for each time series,
all other values are zero. Only returned if return_window is True.
'''
if type(n) is int:
tc = vectors[n].copy()
elif type(n) is np.ndarray:
if vectors.ndim == 1:
vectors = vectors[None, :]
assert n.size == vectors[0].size, \
'vector `n` shape ({0}) was not same shape as vectors in array ({1})'\
.format(n.size, vectors[0].size)
tc = n
tc = (tc - tc.mean()) / (tc.std() * len(tc))
vectors = (vectors.copy() - vectors.mean(axis=1)[:,None]) / \
vectors.std(axis=1)[:,None]
n_elements = vectors[:, 0].size
x_corr = np.zeros(n_elements)
t_delay = np.zeros(n_elements, dtype=np.int32)
if max_offset > tc.size:
max_offset = tc.size
if return_window:
corr_window = np.zeros((n_elements, 2 * max_offset + 1))
for i, v in enumerate(vectors):
corr = np.correlate(v, tc, mode='full') # full correlation
corr = corr[tc.size - max_offset - 1:tc.size +
max_offset] # crop to window
maxind = np.argsort(np.abs(corr))[-1] #get largest value
x_corr[i] = np.abs(corr)[maxind]
t_delay[i] = maxind - max_offset
if return_window:
corr_window[i] = corr
if return_window:
return x_corr, t_delay, corr_window
else:
return x_corr, t_delay
[docs]def gaussian_smooth_2d(matrix: np.ndarray, dj: float, dt: float) -> np.ndarray:
'''
This takes a 2-d matrix and applies a smoothing gaussian
filter defined by the two sigma variables (dj, dt)
Arguments:
matrix: 2d matrix of values to smooth
dj = sigma to define the first dimension gaussian
dt = sigma to define the second dimension gaussian
Returns:
Smoothed 2d matrix
'''
sigma = [dj, dt]
smooth_matrix = gaussian_filter(matrix.real, sigma=sigma)
smooth_matrix += gaussian_filter(matrix.imag, sigma=sigma).imag
return smooth_matrix