# Math computation
import numpy as np
from numpy import random as rd
from scipy.special import erfinv
# Polynomials and spline functions
from numpy.polynomial import polynomial
from scipy.interpolate import Akima1DInterpolator
from scipy.interpolate import UnivariateSpline
# Warnings
import warnings
# Typing
from typing import Union, Optional, List, Callable
[docs]
def spline_function(x: np.ndarray,
y: np.ndarray,
smoothing_factor: float = 0.5) -> Callable:
"""Generate a smoothing spline function
This functions generates a spline, which is mainly used for generating temporal effects.
Parameters
----------
x: np.ndarray
An array of x values
y: np.ndarray
An array of y values
smoothing_factor: float
The smoothing factor used in spline fitting
Returns
-------
spline_values: Callable
A spline function that can be evaluated at point t
"""
# We first normalize the x values to the unit interval [0,1]
x = (x - np.min(x))/(np.max(x) - np.min(x))
# Then we fit a smoothing spline
spl = UnivariateSpline(x, y)
spl.set_smoothing_factor(smoothing_factor)
def spline_values(t):
# We subtract the function value at 0 to ensure that the resulting spline always starts at 0
return (spl(t) - spl(0))[()]
return spline_values
[docs]
def polynomial_function(coefficients: Union[list, np.ndarray],
end_value: float) -> Callable:
"""Generate a polynomial on [0,1]
This can be used to generate temporal effect and generate differentiation path
Parameters
----------
coefficients: list or np.ndarray
An array of the polynomial coefficients.
The resulting polynomial will almost surely not have the same coefficients.
They are used to specify the rough "shape" of the polynomial
end_value: float
The desired end value
Returns
-------
polynomial_values: Callable
A polynomial function that can be evaluated at point t
"""
# We first use the provided coefficient to generate the base polynomial
# However, the resulting polynomial does not guarantee that it will start at 0
# and end at end_value
base_polynomial = polynomial.Polynomial(coefficients, domain=[0, 1])
base_polynomial_at_end_points = polynomial.polyval([0, 1], base_polynomial.coef)
# We use a linear function to adjust the base polynomial
adjust_polynomial = polynomial.Polynomial([base_polynomial_at_end_points[0],
base_polynomial_at_end_points[1]-end_value - base_polynomial_at_end_points[0]],
domain=[0, 1])
adjusted_polynomial_coefficients = polynomial.polysub(base_polynomial.coef, adjust_polynomial.coef)
adjusted_polynomial = polynomial.Polynomial(adjusted_polynomial_coefficients, domain=[0, 1])
def polynomial_values(t):
return (polynomial.polyval(t, adjusted_polynomial.coef))[()]
return polynomial_values
[docs]
def brownian_bridge_function(end_value: float,
N: int = 5,
lb: float = 0,
ub: float = 0.1) -> Callable:
"""Generate a random function that starts at 0 and ends at end_value
This can be used to generate temporal effect and generate differentiation path
Parameters
----------
end_value: float
The desired end value
N: int
Number of steps when simulating a brownian bridge
lb: float
The lower bound of the variance scaling factor
ub: float
The upper bound of the variance scaling factor
Returns
-------
spline_values: Callable
A function that can be evaluated at time t
"""
# We first generate the variance and the time steps
sigma2 = np.abs(end_value) * np.random.uniform(lb, ub, 1).reshape((-1,1))
t_interval: "np.ndarray" = np.linspace(0, 1, num=N, endpoint=True)
delta: float = 1 / (N - 1)
# We then generate a Wiener process
wiener_process = np.zeros((1, N))
wiener_process[0, 1:] = rd.normal(0, np.sqrt(sigma2[0,0]), N - 1) * np.sqrt(delta)
wiener_process = np.cumsum(wiener_process, axis=1)
# Then we adjust the process to make sure it starts at 0 and ends at end_value
brownian_bridge = np.zeros((1, N))
for i in range(N):
brownian_bridge[:,[i]] = wiener_process[:,[i]] - t_interval[i] * (wiener_process[:,[N - 1]] - end_value)
# To smooth the brownian bridge, we use the Akima spline to interpolate
spl = Akima1DInterpolator(t_interval, brownian_bridge[0,:])
def spline_values(t):
return (spl(t))[()]
return spline_values
[docs]
def trajectories(end_values: Optional[Union[list, np.ndarray]] = None,
coefficients: Optional[Union[list, np.ndarray]] = None,
x: Optional[np.ndarray] = None,
y: Optional[np.ndarray] = None,
**kwargs) -> List[Callable]:
"""Vectorize the spline function or the polynomial function or the brownian bridge function
Parameters
----------
end_values: list or np.ndarray
An array of the end values
coefficients: list or np.ndarray
If polynomial is desired, an array of the polynomial coefficients
x: np.ndarray
If spline is sought after, the x values
y: np.ndarray
If spline is sought after, the y values
kwargs:
Other arguments for the brownian bridge method or the spline function
Returns
-------
trajectories_functions: List[Callable]
A list of functions
"""
trajectories_functions = []
if end_values is not None:
# If end_values is supplied, then we know it's either polynomial or brownian bridge
end_values = np.array(end_values).reshape((-1, 1))
n_markers = end_values.shape[0]
if coefficients is None:
# If coefficients is not supplied, then we know it's brownian bridge
for i in range(n_markers):
trajectories_functions.append(brownian_bridge_function(end_value=end_values[i,0], **kwargs))
else:
# Otherwise, it's polynomials
for i in range(n_markers):
trajectories_functions.append(polynomial_function(coefficients=coefficients,
end_value=end_values[i,0]))
elif (x is not None) and (y is not None):
# For spline, we need both x and y
trajectories_functions.append(spline_function(x, y, **kwargs))
else:
raise ValueError('Unknown type')
return trajectories_functions
[docs]
def univariate_noise_model(noise_distribution: Optional[str] = None,
**kwargs) -> Callable:
"""Generate a noise distribution
This is mainly used to generate background noise in the cytof_data object.
.. versionchanged:: 0.2.0 The default `noise_distribution` is changed to `uniform`. If no user-specified value is provided, a warning is given to inform users of the change.
.. versionadded:: 0.2.0 Added the `half_normal` option to the `noise_distribution` parameter.
Parameters
----------
noise_distribution: str
Either "normal", "half_normal", or "uniform"
kwargs:
extra parameters needed for numpy.random.normal or numpy.random.uniform
Returns
-------
model: Callable
A RV generator that only takes size as its input
"""
if noise_distribution is None:
warnings.warn("The default `noise_distribution` is now changed from `normal` to `uniform` as of v0.2.0. Please see the release notes for details.")
noise_distribution = "uniform"
if noise_distribution == "half_normal":
def model(size):
return -np.abs(rd.normal(**kwargs, size=size))
elif noise_distribution == "normal":
def model(size):
return rd.normal(**kwargs, size=size)
elif noise_distribution == "uniform":
def model(size):
return rd.uniform(**kwargs, size=size)
else:
raise ValueError('Unknown noise distribution')
return model
[docs]
def estimate_noise_model(data: np.ndarray,
noise_distribution: str = "uniform") -> Callable:
"""Estimate the noise model from data
.. versionadded:: 0.2.0
Parameters
----------
data : np.ndarray
An array of expression matrix
noise_distribution : str, optional
Either "half_normal" or "uniform", by default "uniform"
Returns
-------
Callable
A RV generator that only takes size as its input
"""
para_dict = {"noise_distribution": noise_distribution}
if noise_distribution == "uniform":
min_val = np.min(data)
para_dict["low"] = min_val
para_dict["high"] = 0
if noise_distribution == "half_normal":
m = np.median(data[np.where(data<=0)])
scale = np.abs(m/(np.sqrt(2)*erfinv(0.5)))
para_dict["loc"] = 0
para_dict["scale"] = scale
return univariate_noise_model(**para_dict)