"""
This package is solely focused on a specific class of tasks that can be expressed in
terms of some input-output relation and nothing else. This condition, despite seeming
trivial, is by no means generic, especially when behaving animals in the wild are taken
into account.

More formally, such an assumption reduces the class of tasks to time-dependent target
functions :math:`y(t)` that can be cast as some complex transformation of the input
:math:`u(t)` alone:

.. math::
    y(t) = \\mathcal F[u(t)]

Importantly, for this class of problems, the internal state :math:`X(t)` of the
computing machine (the brain) has no influence on the target function.

.. important::
    Due to the pure dependence of the output on the input, we deem this class of tasks
    the most generic form of `working memory` tasks. Indeed, all experimental cognitive
    tasks can be expressed in this form.

We focus on three instances of this class, explained below.

Taylor:
    Named after the Taylor series (https://en.wikipedia.org/wiki/Taylor_series), the aim
    of this task is to compute different powers of the input:

    .. math::
        y(t) = u(t)^d

Nostradamus:
    Named after Michel de Nostredame (https://en.wikipedia.org/wiki/Nostradamus), this
    task aims to predict the future (:math:`\\Delta > 0`) or recall the past
    (:math:`\\Delta < 0`) of the input:

    .. math::
        y(t) = u(t + \\Delta)

Tayloramus:
    A mix of the two tasks above, aiming to predict the time-shifted input raised to a
    particular exponent:

    .. math::
        y(t) = u(t + \\Delta)^d
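
.. note::
    The three tasks are nested: setting :math:`\\Delta = 0` in the Tayloramus target
    recovers the Taylor target, :math:`y(t) = u(t)^d`, while setting :math:`d = 1`
    recovers the Nostradamus target, :math:`y(t) = u(t + \\Delta)`.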
"""
import os
osjoin = os.path.join # an alias for convenience
from pdb import set_trace
import numpy as np
import pandas as pd
from scipy.io import wavfile
from scipy import signal
from sklearn.preprocessing import StandardScaler
# from hetero.stimuli import chaos
from hetero import stimulus
# PROCESS_TYPES = {
# 'chaotic': ['henon', 'logistic_map', 'lorenz', 'lorenz96', 'mackey_glass',
# 'multiscroll', 'doublescroll', 'rabinovich_fabrikant',
# 'narma', 'rossler', 'kuramoto_sivashinsky'],
# 'periodic': ['sin', 'cos', 'tan', 'sign'],
# 'stochastic': ['brownian'],
# 'fromfile': ['ahd'],
# }
# def get_process_type(stim_name):
# for type, v in PROCESS_TYPES.items():
# if stim_name in v:
# return type
# else:
# return 'fromfile'
class TemporalTask(object):
"""An abstract class for temporal tasks. This class maintains task parameters and creates
all subtasks associated with each task parameter. To this aim, it also synthesizes a potentially
multi-dimensional input and processes it according to the rule imposed by the "working memory"
task. It also provides a score template for registering the performance levels.
"""
    def __init__(self, name='temporal', save_type='float16', **task_params):
self.name = name
self.stype = save_type
def standardize(self, stim):
"""Standardizes a multi-dimensional input component-wise so that the
temporal mean and standard deviations are 0 and 1, respectively.
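
        A minimal sketch of the expected behavior (random data of shape (n_dim, n_times)):

        >>> s = TemporalTask().standardize(np.random.randn(2, 100) * 5 + 3)
        >>> bool(np.allclose(s.mean(axis=1), 0) and np.allclose(s.std(axis=1), 1))
        True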
"""
return StandardScaler().fit_transform(stim.T).T
    def make_subtasks(self):
        """Creates all subtask parameters."""
        raise NotImplementedError
    def make_subtarget(self):
        """Creates a target function from the task parameters."""
        raise NotImplementedError
    def make_stim(self, n_timesteps, dt_scaler=1, **stim_dict):
        """Synthesizes one stimulus per entry of ``stim_dict``, concatenates them along
        the dimension axis, and standardizes the result component-wise.
        """
        u = []
for stim_name_id, stim_params in stim_dict.items():
stim_name = stim_name_id.split('-')[0]
_u = stimulus.synthesize(stim_name,
n_timesteps=n_timesteps,
dt_scaler=dt_scaler,
**stim_params).T # (ndim x ntimes)
# stim_type = get_process_type(stim_name)
# if stim_type == 'chaotic':
# u = eval(f'chaos.{stim_name}')(n_timesteps=n_timesteps, **stim_params).T
# elif stim_type == 'periodic':
# omega = stim_params['omega']
# phase = stim_params['phase']
# if stim_name == 'sign':
# u = np.sign(np.sin(omega*t + phase)).reshape(1,-1)
# else:
# u = eval(f'np.{stim_name}')(omega*t + phase).reshape(1,-1)
# else:
# raise NotImplementedError
u.append(_u)
u = np.concat(u) # (Σ ndim x ntimes)
u = self.standardize(u) # (Σ ndim x ntimes)
# print(u.mean(axis=1), u.std(axis=1), )
return u # (Σ ndim x ntimes)
def make_target(self, u):
"""Makes target array given the input timeseries.
Parameters
----------
u : ndarray
Input t
Returns
-------
ndarray
Target array
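
        Examples
        --------
        A minimal sketch using a ``Tayloramus`` instance (see the TODO below regarding
        the single-parameter tasks):

        >>> task = Tayloramus(degrees=[2], deltas=[0])
        >>> task.make_subtasks()
        >>> Y = task.make_target(np.full((1, 4), 2.0))
        >>> Y.shape
        (1, 1, 4)
        >>> Y[0, 0]
        array([4., 4., 4., 4.])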
"""
        # TODO: this doesn't work for taylor
Y = [self.make_subtarget(u, *subtask) for subtask in self.subtasks]
return np.array(Y)
# if flatten:
# Y = self.flatten_target(Y)
# print(f'hi hee target maker. {Y.shape}')
# return Y
def flatten(self, y, verbose=True):
"""
        Flattens both the target and the subtasks for multi-dimensional targets.
"""
y_flt = self.flatten_target(y=y, verbose=verbose)
subtasks_flt = self.flatten_subtasks(n_dims=y.shape[1], verbose=verbose)
return subtasks_flt, y_flt
def flatten_target(self, y, verbose=True):
"""
        For multi-dimensional targets of shape (n_subtasks, n_dim, n_timesteps),
        flattens the target into an array of shape (n_dim * n_subtasks, n_timesteps),
        where the rows first iterate over the subtasks within each dimension and
        then switch to the next dimension.
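
        A minimal shape sketch of the flattening (dummy values):

        >>> y = np.arange(2 * 3 * 4).reshape(2, 3, 4)  # (n_subtasks, n_dim, n_times)
        >>> TemporalTask().flatten_target(y, verbose=False).shape
        (6, 4)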
"""
n_subtasks, n_dims, n_times = y.shape
y_flt = y.swapaxes(0, 1).reshape(-1, n_times)
if verbose:
print(f' Initial target shape {y.shape} its final shape: {y_flt.shape}')
return y_flt
def flatten_subtasks(self, n_dims, verbose=True):
"""
        For multi-dimensional targets of shape (n_subtasks, n_dim, n_timesteps),
        target flattening must also update the subtask parameters:
        (subtask_params) --> (dim, subtask_params)
        The subtask-parameter repetitions are ordered such that the `dim` component
        varies only after progressing over all combinations of subtasks. This is
        consistent with the target ordering in `flatten_target`.
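
        For example, a minimal sketch using the two-parameter ``Tayloramus`` subtasks:

        >>> task = Tayloramus(degrees=[1, 2], deltas=[0])
        >>> task.make_subtasks()
        >>> task.flatten_subtasks(n_dims=2, verbose=False)
        [[0, 0, 1], [0, 0, 2], [1, 0, 1], [1, 0, 2]]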
"""
n_subtasks = len(self.subtasks)
subtasks_flt = np.c_[
np.repeat(np.arange(n_dims), n_subtasks),
np.tile(np.array(self.subtasks), (n_dims, 1))
]
if verbose:
print(f' Initial subtask shape {np.array(self.subtasks).shape} its final shape: {subtasks_flt.shape}')
return subtasks_flt.tolist()
    def get_score_template(self):
        """Returns a pandas DataFrame whose first columns indicate all subtasks."""
        raise NotImplementedError
class Taylor(TemporalTask):
"""Implements the Tayor task. There are two ways to provide the exponents.
Either by providing a list or by specifying the (min,max,num) of exponents.
If the latter is chosen, only the integer exponent will be considered. Also
note that the maximum degree is excluded.
Parameters
----------
degrees : array-like, optional
Exponents to raise to, by default None
deg_min : int, optional
Minimum exponent to raise to, by default None
deg_max : int, optional
Maximum exponent to raise to, by default None
deg_n : int, optional
        Number of exponents between the min and max exponents, by default None
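
    Examples
    --------
    A minimal sketch exercising only the subtask bookkeeping:

    >>> task = Taylor(deg_min=1, deg_max=3, deg_n=3)
    >>> task.make_subtasks()
    >>> task.subtasks
    [1, 2, 3]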
"""
def __init__(self, degrees=None,
deg_min=None, deg_max=None, deg_n=None):
super().__init__(name='taylor')
degree_isnt_given = degrees is None
deg_range_isnt_given = deg_max is None or deg_min is None or deg_n is None
deg_is_invalid = degree_isnt_given and deg_range_isnt_given
assert not deg_is_invalid, f'{self.name} task cannot be set up. Please provide either the exponents or their range and number.'
if degrees is None:
degrees = np.linspace(deg_min, deg_max, deg_n, dtype=int)
        self.degrees = np.asarray(degrees)  # ensure array methods (e.g. .max()) also work for list inputs
def make_subtasks(self):
self.subtasks = np.arange(1, self.degrees.max()+1).tolist()
def make_subtarget(self, u, degree):
return u**degree
def get_score_template(self):
return pd.DataFrame(np.array(self.subtasks).astype(int),
columns=['dim','deg'])
class Nostradamus(TemporalTask):
"""Implements the Nostradamus task. There are two ways to provide the time
shifts; Either by providing a list or by specifying the (min,max,num) of
shifts. If the latter is chosen, only the integer exponent will be considered. Also
note that the maximum degree is excluded.
Parameters
----------
deltas : array-like, optional
An array of time shifts, by default None
delt_min : float, optional
        Minimum shift, by default None
delt_max : float, optional
Maximum shift, by default None
delt_n : int, optional
Number of shifts, will be converted to the next odd number if even, by default None
delt_spacing : str, optional
The scaling between time shifts (`lin` for linear and `log` for logarithmic), by default 'log'
    base_stride : int, optional
        The number of time samples per unit of time, by default None
.. note::
The shift values must be provided in the units of the base stimulus timescale. For
instance, shift values in the interval [−2,2] denote temporal shifts of the input
to the past or future by up to twice the period of the main timescale of the stimulus.
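
    Examples
    --------
    A minimal sketch exercising only the subtask bookkeeping (explicit integer shifts,
    no ``base_stride`` conversion):

    >>> task = Nostradamus(deltas=[-2, 0, 2])
    >>> task.make_subtasks()
    >>> task.subtasks
    [-2, 0, 2]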
"""
def __init__(self, delt_spacing='log', deltas=None,
delt_min=None, delt_max=None, delt_n=None,
base_stride=None,
):
super().__init__(name='nostradamus')
delta_isnt_given = deltas is None
delt_range_isnt_given = delt_max is None or delt_min is None or delt_n is None
delt_is_invalid = (delta_isnt_given and delt_range_isnt_given)
assert not delt_is_invalid, f'{self.name} task cannot be set up. Please provide either the deltas or their range and number.'
if deltas is None:
if delt_n%2==0:
delt_n+=1
# base stride converts the deltas from physical times to steps
if base_stride is not None:
delt_min *= base_stride
delt_max *= base_stride
if delt_spacing == 'lin':
deltas = np.linspace(delt_min, delt_max, delt_n, dtype=int)
else:
deltas = np.unique(np.rint(np.geomspace(1, delt_max, delt_n)).astype(int))
deltas = np.concat([deltas, -deltas, [0]])
self.deltas = np.sort(deltas)
def make_subtasks(self):
self.subtasks = self.deltas.tolist()
    def make_subtarget(self, u, delta):
        """Shifts the input by `delta` time steps; samples that would wrap around
        are masked with NaN."""
        y = np.roll(u, -delta, axis=1)
        if delta > 0:  # predicting the future: the last `delta` samples are undefined
            y[:, -delta:] = np.nan
        if delta < 0:  # recalling the past: the first `|delta|` samples are undefined
            y[:, :-delta] = np.nan
        return y
def get_score_template(self):
return pd.DataFrame(np.array(self.subtasks).astype(int),
columns=['dim','delta'])
class Tayloramus(TemporalTask):
""""Implements the Tayloramus task. The arguments synopsis follows the convension of
``Taylor`` and ``Nostradamus`` tasks.
Parameters
----------
degrees : array-like, optional
Exponents to raise to, by default None
deg_min : int, optional
Minimum exponent to raise to, by default None
deg_max : int, optional
Maximum exponent to raise to, by default None
deg_n : int, optional
        Number of exponents between the min and max exponents, by default None
deltas : array-like, optional
An array of time shifts, by default None
delt_min : float, optional
        Minimum shift, by default None
delt_max : float, optional
Maximum shift, by default None
delt_n : int, optional
Number of shifts, will be converted to the next odd number if even, by default None
delt_spacing : str, optional
The scaling between time shifts (`lin` for linear and `log` for logarithmic), by default 'log'
base_stride : int, optional
        The number of time samples per unit of time, by default None
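
    Examples
    --------
    A minimal sketch exercising only the subtask bookkeeping:

    >>> task = Tayloramus(degrees=[1, 2], deltas=[-1, 0, 1])
    >>> task.make_subtasks()
    >>> len(task.subtasks)
    6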
"""
def __init__(self, delt_spacing='log',
degrees=None, deg_min=None, deg_max=None, deg_n=None,
deltas=None, delt_min=None, delt_max=None, delt_n=None,
base_stride=None,
):
super().__init__(name='tayloramus')
degree_isnt_given = degrees is None
deg_range_isnt_given = deg_max is None or deg_min is None or deg_n is None
deg_is_invalid = degree_isnt_given and deg_range_isnt_given
assert not deg_is_invalid, f'{self.name} task cannot be set up. Please provide either the exponents or their range and number.'
delta_isnt_given = deltas is None
delt_range_isnt_given = delt_max is None or delt_min is None or delt_n is None
delt_is_invalid = (delta_isnt_given and delt_range_isnt_given)
assert not delt_is_invalid, f'{self.name} task cannot be set up. Please provide either the deltas or their range and number.'
if deltas is None:
if delt_n%2==0:
delt_n+=1
# base stride converts the deltas from physical times to steps
if base_stride is not None:
delt_min *= base_stride
delt_max *= base_stride
if delt_spacing == 'lin':
deltas = np.linspace(delt_min, delt_max, delt_n, dtype=int)
else:
deltas = np.unique(np.rint(np.geomspace(1, delt_max, delt_n)).astype(int))
deltas = np.concat([deltas, -deltas, [0]])
if degrees is None:
degrees = np.linspace(deg_min, deg_max, deg_n, dtype=int)
self.deltas = np.sort(deltas)
self.degrees = degrees
def make_subtasks(self):
self.subtasks = []
for degree in self.degrees:
for delta in self.deltas:
self.subtasks.append([delta, degree])
def make_subtarget(self, u, delta, degree):
y = np.roll(u, -delta, axis=1)
if delta>0:
y[:, -delta:] = np.nan
if delta<0:
y[:, :-delta] = np.nan
return y**degree
def get_score_template(self):
return pd.DataFrame(np.array(self.subtasks).astype(int),
columns=['dim','delta', 'deg'])
# class ASHD(object):
# def __init__(self, path, name='ashd',
# ds_fac=10, encoding='concat',
# shuffle=True, rng=None):
# self.name = name
# self.path = path
# self.ds_fac = ds_fac
# self.encoding = encoding
# self.shuffle = shuffle
# self.rng = rng
# if shuffle:
# assert rng is not None
# trn_path = osjoin(self.path, 'train')
# tst_path = osjoin(self.path, 'test')
# msg = f"""
# The .wav files must be already splitted into train and test sets, and placed
# respectively on the following paths.
# Train: {self.path}/train
# Test: {self.path}/test
# Could not find these folders. Do they exists?
# """
# assert os.path.exists(trn_path) and os.path.exists(tst_path), msg
# print('reading train files')
# self.trn_files = sorted([f for f in os.listdir(trn_path) if os.path.isfile(osjoin(trn_path, f))])
# print('reading test files')
# self.tst_files = sorted([f for f in os.listdir(tst_path) if os.path.isfile(osjoin(tst_path, f))])
# def make_stim_target(self, n_timesteps, dt=None):
# us = [] # stims
# ys = [] # targets
# dirs = ['train', 'test']
# sample_lists = [self.trn_files, self.tst_files]
# sample_id = 0
# print('creating stim files')
# for _dir, _sample_list in zip(dirs, sample_lists):
# u = []
# yid = []
# ydigit = []
# ysample = [] # only for identifying the sample id
# for sample in _sample_list:
# _,i,_,d = sample.split('_')
# _id = int(i.split('-')[1])
# _digit = int(d.split('-')[1][0]) + 10*('german' in sample) # german numbers are added by 10
# rate, adata = wavfile.read(osjoin(self.path, _dir, sample))
# # adata = signal.decimate(adata, self.ds_fac)
# adata = adata[::self.ds_fac]
# adata = abs(standardize(adata.reshape(1,-1)).squeeze())
# u.append(adata)
# yid.append( np.ones(len(adata), dtype=int)*_id )
# ydigit.append( np.ones(len(adata), dtype=int)*_digit )
# ysample.append( np.ones(len(adata), dtype=int)*sample_id )
# sample_id+=1
# # print(f'ha ha {min([min(s) for s in ysample])}')
# # print(f'ha ha {max([max(s) for s in ysample])}')
# if self.shuffle:
# perm = self.rng.permutation(len(u))
# u = [u[i] for i in perm]
# yid = [yid[i] for i in perm]
# ydigit = [ydigit[i] for i in perm]
# ysample = [ysample[i] for i in perm]
# ys.append(np.c_[np.concat(ydigit), np.concat(yid), np.concat(ysample)].T)
# us.append(np.concat(u))
# us = np.concat(us).reshape(1, -1)
# # us = standardize(us)
# ys = np.concat(ys, axis=1)
# dt = self.ds_fac/rate
# n_timesteps = us.shape[1]
# t = np.linspace(0, n_timesteps*dt, n_timesteps)
# return t, us, ys
# def make_subtasks(self):
# self.subtasks = ['digit', 'id']
# def get_score_template(self):
# return pd.DataFrame(np.array(self.subtasks).astype(str),
# columns=['subtask'])
def needs_synthesis(task_name):
"""Checks if a given task requires stimulus synthesis. Those tasks in which
the input and output are saved on disk, e.g,. do not need synthesis.
Has to be adapted if non-temporal tasks were added.
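
    >>> needs_synthesis('taylor')
    True
    >>> needs_synthesis('ashd')
    False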
"""
    return task_name in ['taylor', 'nostradamus', 'tayloramus']