import tempfile
from typing import Union
from os.path import join as osjoin
import numpy as np
from joblib import Memory
from scipy.integrate import solve_ivp
from scipy.fft import fft, ifft
from hetero.stims.utils import get_seed, rand_generator, RandomState, Generator
memory = Memory(osjoin(osjoin(tempfile.gettempdir(), "hetero"), "datasets"), verbose=0)
def _mg_eq(xt, xtau, a=0.2, b=0.1, n=10):
"""
Mackey-Glass time delay diffential equation, at values x(t) and x(t-tau).
"""
return -b * xt + a * xtau / (1 + xtau**n)
def _mg_rk4(xt, xtau, a, b, n, h=1.0):
"""
Runge-Kuta method (RK4) for Mackey-Glass timeseries discretization.
"""
k1 = h * _mg_eq(xt, xtau, a, b, n)
k2 = h * _mg_eq(xt + 0.5 * k1, xtau, a, b, n)
k3 = h * _mg_eq(xt + 0.5 * k2, xtau, a, b, n)
k4 = h * _mg_eq(xt + k3, xtau, a, b, n)
return xt + k1 / 6 + k2 / 3 + k3 / 3 + k4 / 6
[docs]
def lorenz(
n_timesteps: int,
rho: float = 28.0,
sigma: float = 10.0,
beta: float = 8.0 / 3.0,
x0: Union[list, np.ndarray] = [-15.0577155 , -19.28572772, 31.35055511], # on attractor
h: float = 0.03,
**kwargs,
) -> np.ndarray:
"""Lorenz attractor timeseries as defined by Lorenz in 1963 [6]_ [7]_.
.. math::
\\frac{\\mathrm{d}x}{\\mathrm{d}t} &= \\sigma (y-x) \\\\
\\frac{\\mathrm{d}y}{\\mathrm{d}t} &= x(\\rho - z) - y \\\\
\\frac{\\mathrm{d}z}{\\mathrm{d}t} &= xy - \\beta z
Parameters
----------
n_timesteps : int
Number of timesteps to generate.
rho : float, default to 28.0
:math:`\\rho` parameter of the system.
sigma : float, default to 10.0
:math:`\\sigma` parameter of the system.
beta : float, default to 8/3
:math:`\\beta` parameter of the system.
x0 : array-like of shape (3,), default to [1.0, 1.0, 1.0]
Initial conditions of the system.
h : float, default to 0.03
Time delta between two discrete timesteps.
**kwargs:
Other parameters to pass to the `scipy.integrate.solve_ivp`
solver.
Returns
-------
array of shape (n_timesteps, 3)
Lorenz attractor timeseries.
"""
def lorenz_diff(t, state):
x, y, z = state
return sigma * (y - x), x * (rho - z) - y, x * y - beta * z
t_max = n_timesteps * h
t_eval = np.linspace(0.0, t_max, n_timesteps)
sol = solve_ivp(
lorenz_diff, y0=x0, t_span=(0.0, t_max), t_eval=t_eval, **kwargs
)
return sol.y.T
[docs]
def mackey_glass(
n_timesteps: int,
tau: int = 17,
a: float = 0.2,
b: float = 0.1,
n: int = 10,
x0: float = 1.2,
h: float = 1.0,
seed: Union[int, RandomState, Generator] = None,
**kwargs,
) -> np.ndarray:
"""Mackey-Glass timeseries [8]_ [9]_, computed from the Mackey-Glass
delayed differential equation.
.. math::
\\frac{x}{t} = \\frac{ax(t-\\tau)}{1+x(t-\\tau)^n} - bx(t)
Parameters
----------
n_timesteps : int
Number of timesteps to compute.
tau : int, default to 17
Time delay :math:`\\tau` of Mackey-Glass equation.
By defaults, equals to 17. Other values can
change the choatic behaviour of the timeseries.
a : float, default to 0.2
:math:`a` parameter of the equation.
b : float, default to 0.1
:math:`b` parameter of the equation.
n : int, default to 10
:math:`n` parameter of the equation.
x0 : float, optional, default to 1.2
Initial condition of the timeseries.
h : float, default to 1.0
Time delta between two discrete timesteps.
seed : int or :py:class:`numpy.random.Generator`, optional
Random state seed for reproducibility.
Returns
-------
array of shape (n_timesteps, 1)
Mackey-Glass timeseries.
Note
----
As Mackey-Glass is defined by delayed time differential equations,
the first timesteps of the timeseries can't be initialized at 0
(otherwise, the first steps of computation involving these
not-computed-yet-timesteps would yield inconsistent results).
A random number generator is therefore used to produce random
initial timesteps based on the value of the initial condition
passed as parameter. A default seed is hard-coded to ensure
reproducibility in any case. It can be changed with the
:py:func:`set_seed` function.
"""
# a random state is needed as the method used to discretize
# the timeseries needs to use randomly generated initial steps
# based on the initial condition passed as parameter.
if seed is None:
seed = get_seed()
rs = rand_generator(seed)
# generate random first step based on the value
# of the initial condition
history_length = int(np.floor(tau / h))
history = collections.deque(
x0 * np.ones(history_length) + 0.2 * (rs.random(history_length) - 0.5)
)
xt = x0
X = np.zeros(n_timesteps)
for i in range(0, n_timesteps):
X[i] = xt
if tau == 0:
xtau = 0.0
else:
xtau = history.popleft()
history.append(xt)
xth = _mg_rk4(xt, xtau, a=a, b=b, n=n, h=h)
xt = xth
return X.reshape(-1, 1)
[docs]
def rabinovich_fabrikant(
n_timesteps: int,
alpha: float = 1.1,
gamma: float = 0.89,
x0: Union[list, np.ndarray] = [-1, 0, 0.5],
h: float = 0.05,
**kwargs,
) -> np.ndarray:
"""Rabinovitch-Fabrikant system [12]_ [13]_ timeseries.
.. math::
\\frac{\\mathrm{d}x}{\\mathrm{d}t} &= y(z - 1 + x^2) + \\gamma x \\\\
\\frac{\\mathrm{d}y}{\\mathrm{d}t} &= x(3z + 1 - x^2) + \\gamma y \\\\
\\frac{\\mathrm{d}z}{\\mathrm{d}t} &= -2z(\\alpha + xy)
Parameters
----------
n_timesteps : int
Number of timesteps to generate.
alpha : float, default to 1.1
:math:`\\alpha` parameter of the system.
gamma : float, default to 0.89
:math:`\\gamma` parameter of the system.
x0 : array-like of shape (3,), default to [-1, 0, 0.5]
Initial conditions of the system.
h : float, default to 0.05
Time delta between two discrete timesteps.
**kwargs:
Other parameters to pass to the `scipy.integrate.solve_ivp`
solver.
Returns
-------
array of shape (n_timesteps, 3)
Rabinovitch-Fabrikant system timeseries.
"""
def rabinovich_fabrikant_diff(t, state):
x, y, z = state
dx = y * (z - 1 + x**2) + gamma * x
dy = x * (3 * z + 1 - x**2) + gamma * y
dz = -2 * z * (alpha + x * y)
return dx, dy, dz
t_max = n_timesteps * h
t_eval = np.linspace(0.0, t_max, n_timesteps)
sol = solve_ivp(
rabinovich_fabrikant_diff, y0=x0, t_span=(0.0, t_max), t_eval=t_eval, **kwargs
)
return sol.y.T
[docs]
def lorenz96(
n_timesteps: int,
warmup: int = 0,
N: int = 36,
F: float = 8.0,
dF: float = 0.01,
h: float = 0.01,
x0: Union[list, np.ndarray] = None,
**kwargs,
) -> np.ndarray:
"""Lorenz96 attractor timeseries as defined by Lorenz in 1996 [17]_.
.. math::
\\frac{\\mathrm{d}x_i}{\\mathrm{d} t} = (x_{i+1} - x_{i-2}) x_{i-1} - x_i + F
where :math:`i = 1, \\dots, N` and :math:`x_{-1} = x_{N-1}`
and :math:`x_{N+1} = x_1` and :math:`N \\geq 4`.
Parameters
----------
n_timesteps : int
Number of timesteps to generate.
warmup : int, default to 0
Number of timesteps to discard at the begining of the signal, to remove
transient states.
N: int, default to 36
Dimension of the system.
F : float, default to F
:math:`F` parameter of the system.
dF : float, default to 0.01
Pertubation applied to initial condition if x0 is None.
h : float, default to 0.01
Time delta between two discrete timesteps.
x0 : array-like of shape (N,), default to None
Initial conditions of the system. If None, the array is initialized to
an array of shape (N, ) with value F, except for the first value of the
array that takes the value F + dF.
**kwargs:
Other parameters to pass to the `scipy.integrate.solve_ivp`
solver.
Returns
-------
array of shape (n_timesteps - warmup, N)
Lorenz96 timeseries.
"""
if N < 4:
raise ValueError("N must be >= 4.")
if x0 is None:
x0 = F * np.ones(N)
x0[0] = F + dF
if len(x0) != N:
raise ValueError(
f"x0 should have shape ({N},), but have shape {np.asarray(x0).shape}"
)
def lorenz96_diff(t, state):
ds = np.zeros(N)
for i in range(N):
ds[i] = (state[(i + 1) % N] - state[i - 2]) * state[i - 1] - state[i] + F
return ds
t_max = (warmup + n_timesteps) * h
t_eval = np.linspace(0.0, t_max * h, n_timesteps)
sol = solve_ivp(
lorenz96_diff,
y0=x0,
t_span=(0.0, t_max * h),
t_eval=t_eval,
**kwargs,
)
return sol.y.T[warmup:]
[docs]
def rossler(
n_timesteps: int,
a: float = 0.2,
b: float = 0.2,
c: float = 5.7,
x0: Union[list, np.ndarray] = [-0.1, 0.0, 0.02],
h: float = 0.1,
**kwargs,
) -> np.ndarray:
"""Rössler attractor timeseries [18]_.
.. math::
\\frac{\\mathrm{d}x}{\\mathrm{d}t} &= -y - z \\\\
\\frac{\\mathrm{d}y}{\\mathrm{d}t} &= x + a y \\\\
\\frac{\\mathrm{d}z}{\\mathrm{d}t} &= b + z (x - c)
Parameters
----------
n_timesteps : int
Number of timesteps to generate.
a : float, default to 0.2
:math:`a` parameter of the system.
b : float, default to 0.2
:math:`b` parameter of the system.
c : float, default to 5.7
:math:`c` parameter of the system.
x0 : array-like of shape (3,), default to [-0.1, 0.0, 0.02]
Initial conditions of the system.
h : float, default to 0.1
Time delta between two discrete timesteps.
**kwargs:
Other parameters to pass to the `scipy.integrate.solve_ivp`
solver.
Returns
-------
array of shape (n_timesteps, 3)
Rössler attractor timeseries.
"""
if len(x0) != 3:
raise ValueError(
f"x0 should have shape (3,), but have shape {np.asarray(x0).shape}"
)
def rossler_diff(t, state):
x, y, z = state
dx = -y - z
dy = x + a * y
dz = b + z * (x - c)
return dx, dy, dz
t_max = n_timesteps * h
t_eval = np.linspace(0.0, t_max, n_timesteps)
sol = solve_ivp(
rossler_diff, y0=x0, t_span=(0.0, t_max), t_eval=t_eval, **kwargs
)
return sol.y.T
def _kuramoto_sivashinsky_etdrk4(v, *, g, E, E2, Q, f1, f2, f3):
"""A single step of EDTRK4 to solve Kuramoto-Sivashinsky equation.
Kassam, A. K., & Trefethen, L. N. (2005). Fourth-order time-stepping for stiff PDEs.
SIAM Journal on Scientific Computing, 26(4), 1214-1233.
"""
Nv = g * fft(np.real(ifft(v)) ** 2)
a = E2 * v + Q * Nv
Na = g * fft(np.real(ifft(a)) ** 2)
b = E2 * v + Q * Na
Nb = g * fft(np.real(ifft(b)) ** 2)
c = E2 * a + Q * (2 * Nb - Nv)
Nc = g * fft(np.real(ifft(c)) ** 2)
v = E * v + Nv * f1 + 2 * (Na + Nb) * f2 + Nc * f3
return v
@memory.cache
def _kuramoto_sivashinsky(n_timesteps, *, warmup, N, M, x0, h):
# initial conditions
v0 = fft(x0)
# ETDRK4 scalars
k = np.conj(np.r_[np.arange(0, N / 2), [0], np.arange(-N / 2 + 1, 0)]) / M
L = k**2 - k**4
E = np.exp(h * L)
E2 = np.exp(h * L / 2)
r = np.exp(1j * np.pi * (np.arange(1, M + 1) - 0.5) / M)
LR = h * np.transpose(np.repeat([L], M, axis=0)) + np.repeat([r], N, axis=0)
Q = h * np.real(np.mean((np.exp(LR / 2) - 1) / LR, axis=1))
f1 = (-4 - LR + np.exp(LR) * (4 - 3 * LR + LR**2)) / LR**3
f1 = h * np.real(np.mean(f1, axis=1))
f2 = (2 + LR + np.exp(LR) * (-2 + LR)) / LR**3
f2 = h * np.real(np.mean(f2, axis=1))
f3 = (-4 - 3 * LR - LR**2 + np.exp(LR) * (4 - LR)) / LR**3
f3 = h * np.real(np.mean(f3, axis=1))
g = -0.5j * k
# integration using ETDRK4 method
v = np.zeros((n_timesteps, N), dtype=complex)
v[0] = v0
for n in range(1, n_timesteps):
v[n] = _kuramoto_sivashinsky_etdrk4(
v[n - 1], g=g, E=E, E2=E2, Q=Q, f1=f1, f2=f2, f3=f3
)
return np.real(ifft(v[warmup:]))
[docs]
def kuramoto_sivashinsky(
n_timesteps: int,
warmup: int = 0,
N: int = 128,
M: float = 16,
x0: Union[list, np.ndarray] = None,
h: float = 0.25,
) -> np.ndarray:
"""Kuramoto-Sivashinsky oscillators [19]_ [20]_ [21]_.
.. math::
y_t = -yy_x - y_{xx} - y_{xxxx}, ~~ x \\in [0, 32\\pi]
This 1D partial differential equation is solved using ETDRK4
(Exponential Time-Differencing 4th order Runge-Kutta) method, as described in [22]_.
Parameters
----------
n_timesteps : int
Number of timesteps to generate.
warmup : int, default to 0
Number of timesteps to discard at the begining of the signal, to remove
transient states.
N : int, default to 128
Dimension of the system.
M : float, default to 0.2
Number of points for complex means. Modify beahviour of the resulting
multivariate timeseries.
x0 : array-like of shape (N,), default to None.
Initial conditions of the system. If None, x0 is equal to
:math:`\\cos (\\frac{y}{M}) * (1 + \\sin(\\frac{y}{M}))`
with :math:`y = 2M\\pi x / N, ~~ x \\in [1, N]`.
h : float, default to 0.25
Time delta between two discrete timesteps.
Returns
-------
array of shape (n_timesteps - warmup, N)
Kuramoto-Sivashinsky equation solution.
"""
if x0 is None:
x = 2 * M * np.pi * np.arange(1, N + 1) / N
x0 = np.cos(x / M) * (1 + np.sin(x / M))
else:
if not np.asarray(x0).shape[0] == N:
raise ValueError(
f"Initial condition x0 should be of shape {N} (= N) but "
f"has shape {np.asarray(x0).shape}"
)
else:
x0 = np.asarray(x0)
return _kuramoto_sivashinsky(n_timesteps, warmup=warmup, N=N, M=M, x0=x0, h=h)