Source code for hetero.tasks

"""
This package is solely focused on the specific class of tasks that can be expressed in terms of some input-output relation and nothing else. This condition, despite seeming trivial, is by no means generic, especially when behaving animals in the wild are taken into account.

More formally, such an assumption reduces the class of tasks to time-dependent target functions :math:`y(t)` that can be cast as some complex transformation of the input :math:`u(t)` alone:

.. math::

    y(t) = \\mathcal F[u(t)]

Importantly, for this class of problems, the internal state :math:`X(t)` of the computing machine (the brain) has no influence on the target function.

.. important:: 
    Due to the pure dependence of the output on the input, we deem this class of tasks the most generic form of `working memory` tasks. Indeed, all experimental cognitive tasks can be expressed in this form.
    

We focus on three instances of this class, explained below and illustrated by a short sketch at the end of this docstring.

Taylor: 
  Named after Taylor series (https://en.wikipedia.org/wiki/Taylor_series), the aim of
  this task is to compute different powers of the input:
  
  .. math::

        y(t) = u(t)^d
     
     
Nostradamus:
  Named after Michel de Nostredame (https://en.wikipedia.org/wiki/Nostradamus), this 
  task aims to predict the future (:math:`\\Delta > 0`) or recall the past (:math:`\\Delta < 0`) of the input:
  
  .. math::

        y(t) = u(t+\\Delta)
   

Tayloramus: 
  A mix of the tasks above, aiming to predict the time-shifted input raised to a particular exponent:
  
  .. math::

        y(t) = u(t+\\Delta)^d
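
As only an illustration (toy values, plain NumPy; not the implementation below), the
three targets relate to a one-dimensional input roughly as follows::

    import numpy as np

    u = np.sin(np.linspace(0, 10, 1000))     # a toy input u(t)
    d, delta = 2, 5                          # exponent and shift (in samples)

    y_taylor     = u**d                      # Taylor:      y(t) = u(t)^d
    y_nostra     = np.roll(u, -delta)        # Nostradamus: y(t) = u(t+Δ), edges need masking
    y_tayloramus = np.roll(u, -delta)**d     # Tayloramus:  y(t) = u(t+Δ)^d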
  
"""



import os 
osjoin = os.path.join # an alias for convenience
from pdb import set_trace
import numpy as np
import pandas as pd
from scipy.io import wavfile
from scipy import signal
from sklearn.preprocessing import StandardScaler


# from hetero.stimuli import chaos
from hetero import stimulus

# PROCESS_TYPES = {
#     'chaotic': ['henon', 'logistic_map', 'lorenz', 'lorenz96', 'mackey_glass', 
#                 'multiscroll', 'doublescroll', 'rabinovich_fabrikant', 
#                 'narma', 'rossler', 'kuramoto_sivashinsky'],
#     'periodic': ['sin', 'cos', 'tan', 'sign'],
#     'stochastic': ['brownian'], 
#     'fromfile': ['ahd'],
# }


# def get_process_type(stim_name):
#     for type, v in PROCESS_TYPES.items():
#         if stim_name in v:
#             return type
#     else:
#         return 'fromfile'


class TemporalTask(object):
    """An abstract class for temporal tasks.

    This class maintains the task parameters and creates all subtasks
    associated with each task parameter. To this aim, it also synthesizes a
    potentially multi-dimensional input and processes it according to the rule
    imposed by the "working memory" task. It also provides a score template
    for registering the performance levels.
    """

    def __init__(self, name='temporal', save_type='float16', **task_params):
        self.name = name
        self.stype = save_type

    def standardize(self, stim):
        """Standardizes a multi-dimensional input component-wise so that the
        temporal mean and standard deviations are 0 and 1, respectively.
        """
        return StandardScaler().fit_transform(stim.T).T
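
    # A minimal shape-convention sketch (illustrative values, not part of the
    # module): ``stim`` is (n_dims, n_timesteps), whereas ``StandardScaler``
    # standardizes along axis 0, hence the two transposes above.
    #
    #     rng = np.random.default_rng(0)
    #     stim = rng.normal(loc=3.0, scale=2.0, size=(2, 1000))  # (n_dims, n_timesteps)
    #     z = TemporalTask().standardize(stim)
    #     z.mean(axis=1), z.std(axis=1)    # approximately ([0, 0], [1, 1])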

    def make_subtasks(self):
        """Creates all subtask parameters."""
        raise NotImplementedError

    def make_subtarget(self):
        """Creates a target function from the task parameters."""
        raise NotImplementedError

    def make_stim(self, n_timesteps, dt_scaler=1, **stim_dict):
        u = []
        for stim_name_id, stim_params in stim_dict.items():
            stim_name = stim_name_id.split('-')[0]
            _u = stimulus.synthesize(stim_name,
                                     n_timesteps=n_timesteps,
                                     dt_scaler=dt_scaler,
                                     **stim_params).T  # (ndim x ntimes)

            # stim_type = get_process_type(stim_name)
            # if stim_type == 'chaotic':
            #     u = eval(f'chaos.{stim_name}')(n_timesteps=n_timesteps, **stim_params).T
            # elif stim_type == 'periodic':
            #     omega = stim_params['omega']
            #     phase = stim_params['phase']
            #     if stim_name == 'sign':
            #         u = np.sign(np.sin(omega*t + phase)).reshape(1, -1)
            #     else:
            #         u = eval(f'np.{stim_name}')(omega*t + phase).reshape(1, -1)
            # else:
            #     raise NotImplementedError

            u.append(_u)

        u = np.concat(u)         # (Σ ndim x ntimes)
        u = self.standardize(u)  # (Σ ndim x ntimes)
        # print(u.mean(axis=1), u.std(axis=1), )
        return u                 # (Σ ndim x ntimes)
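
    # A hypothetical call sketch (the stimulus name below is an assumption about
    # what ``hetero.stimulus`` provides): keys of ``stim_dict`` follow the
    # '<stim_name>-<id>' convention and their values are forwarded to
    # ``stimulus.synthesize``; the per-stimulus outputs are stacked row-wise
    # and standardized.
    #
    #     u = task.make_stim(n_timesteps=10_000,
    #                        **{'lorenz-0': dict()})   # assumed stimulus name
    #     u.shape                                      # (n_dims, 10_000)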

    def make_target(self, u):
        """Makes the target array given the input timeseries.

        Parameters
        ----------
        u : ndarray
            Input timeseries.

        Returns
        -------
        ndarray
            Target array
        """
        # TODO: this doesn't work for taylor
        Y = [self.make_subtarget(u, *subtask) for subtask in self.subtasks]
        return np.array(Y)

    def flatten(self, y, verbose=True):
        """Flattens both the target and the subtasks for multi-dimensional targets."""
        y_flt = self.flatten_target(y=y, verbose=verbose)
        subtasks_flt = self.flatten_subtasks(n_dims=y.shape[1], verbose=verbose)
        return subtasks_flt, y_flt

    def flatten_target(self, y, verbose=True):
        """For multi-dimensional targets of shape (n_subtasks, n_dims, n_timesteps),
        flattens the target into an array of shape (n_dims * n_subtasks, n_timesteps),
        where each row first iterates over the subtasks within one dimension and then
        switches to the next dimension.
        """
        n_subtasks, n_dims, n_times = y.shape
        y_flt = y.swapaxes(0, 1).reshape(-1, n_times)
        if verbose:
            print(f'Initial target shape {y.shape}, final shape: {y_flt.shape}')
        return y_flt
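
    # A minimal sketch (toy shapes, not part of the module) of the row ordering
    # produced by the swapaxes/reshape above: with 2 subtasks and 2 dimensions,
    # rows come out as (dim0, sub0), (dim0, sub1), (dim1, sub0), (dim1, sub1).
    #
    #     y = np.arange(2 * 2 * 3).reshape(2, 2, 3)   # (n_subtasks, n_dims, n_times)
    #     y.swapaxes(0, 1).reshape(-1, 3)
    #     # array([[ 0,  1,  2],    # dim 0, subtask 0
    #     #        [ 6,  7,  8],    # dim 0, subtask 1
    #     #        [ 3,  4,  5],    # dim 1, subtask 0
    #     #        [ 9, 10, 11]])   # dim 1, subtask 1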

    def flatten_subtasks(self, n_dims, verbose=True):
        """For multi-dimensional targets of shape (n_subtasks, n_dims, n_timesteps),
        target flattening must also update the subtask parameters:
        (subtask_params) --> (dim, subtask_params).

        The subtask repetitions are ordered such that the `dim` component varies only
        after progressing over all combinations of subtasks. This is coherent with the
        target ordering in `flatten_target`.
        """
        n_subtasks = len(self.subtasks)
        subtasks_flt = np.c_[
            np.repeat(np.arange(n_dims), n_subtasks),
            np.tile(np.array(self.subtasks), (n_dims, 1))
        ]
        if verbose:
            print(f'Initial subtask shape {np.array(self.subtasks).shape}, '
                  f'final shape: {subtasks_flt.shape}')
        return subtasks_flt.tolist()
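
    # Continuing the toy sketch above with 2 dimensions and two hypothetical
    # single-parameter subtasks [[7], [13]], the flattened subtask table pairs
    # each dimension index with every subtask before moving to the next dimension:
    #
    #     np.c_[np.repeat(np.arange(2), 2), np.tile(np.array([[7], [13]]), (2, 1))]
    #     # array([[ 0,  7],
    #     #        [ 0, 13],
    #     #        [ 1,  7],
    #     #        [ 1, 13]])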

    def get_score_template(self):
        """Returns a pandas DataFrame whose first columns indicate all subtasks."""
        raise NotImplementedError
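
# A hypothetical end-to-end sketch (not part of the module) of how a concrete
# subclass is meant to be driven; the 'lorenz' stimulus name is an assumption
# about what ``hetero.stimulus`` can synthesize:
#
#     task = Tayloramus(deg_min=1, deg_max=3, deg_n=3,
#                       delt_min=-2, delt_max=2, delt_n=5, base_stride=10)
#     task.make_subtasks()                              # one [delta, degree] pair per subtask
#     u = task.make_stim(n_timesteps=10_000, **{'lorenz-0': dict()})
#     y = task.make_target(u)                           # (n_subtasks, n_dims, n_times)
#     subtasks_flt, y_flt = task.flatten(y)             # y_flt: (n_dims * n_subtasks, n_times)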


class Taylor(TemporalTask):
    """Implements the Taylor task.

    There are two ways to provide the exponents: either by providing a list, or by
    specifying the (min, max, num) of exponents. In the latter case, ``np.linspace``
    is used with integer casting, so both the minimum and maximum exponents are
    included.

    Parameters
    ----------
    degrees : array-like, optional
        Exponents to raise to, by default None
    deg_min : int, optional
        Minimum exponent to raise to, by default None
    deg_max : int, optional
        Maximum exponent to raise to, by default None
    deg_n : int, optional
        Number of exponents between the min and max exponents, by default None
    """

    def __init__(self, degrees=None, deg_min=None, deg_max=None, deg_n=None):
        super().__init__(name='taylor')

        degree_isnt_given = degrees is None
        deg_range_isnt_given = deg_max is None or deg_min is None or deg_n is None
        deg_is_invalid = degree_isnt_given and deg_range_isnt_given
        assert not deg_is_invalid, (
            f'{self.name} task cannot be set up. Please provide either '
            'the exponents or their range and number.')

        if degrees is None:
            degrees = np.linspace(deg_min, deg_max, deg_n, dtype=int)

        self.degrees = degrees

    def make_subtasks(self):
        self.subtasks = np.arange(1, self.degrees.max()+1).tolist()

    def make_subtarget(self, u, degree):
        return u**degree

    def get_score_template(self):
        return pd.DataFrame(np.array(self.subtasks).astype(int),
                            columns=['dim', 'deg'])
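
# A minimal usage sketch for ``Taylor`` (illustrative values only): the subtasks
# are the integer exponents 1..max degree, and each subtarget is the element-wise
# power of the input.
#
#     task = Taylor(deg_min=1, deg_max=3, deg_n=3)
#     task.make_subtasks()             # task.subtasks == [1, 2, 3]
#     u = np.linspace(-1, 1, 5).reshape(1, -1)
#     task.make_subtarget(u, 2)        # u squared, element-wise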


class Nostradamus(TemporalTask):
    """Implements the Nostradamus task.

    There are two ways to provide the time shifts: either by providing a list, or by
    specifying the (min, max, num) of shifts. In the latter case, the shifts are
    constructed on an integer (sample) grid.

    Parameters
    ----------
    deltas : array-like, optional
        An array of time shifts, by default None
    delt_min : float, optional
        Minimum shift, by default None
    delt_max : float, optional
        Maximum shift, by default None
    delt_n : int, optional
        Number of shifts; will be converted to the next odd number if even,
        by default None
    delt_spacing : str, optional
        The spacing between time shifts (`lin` for linear and `log` for logarithmic),
        by default 'log'
    base_stride : int, optional
        The number of time samples per unit of time, by default None

    .. note::
        The shift values must be provided in units of the base stimulus timescale.
        For instance, shift values in the interval [-2, 2] denote temporal shifts of
        the input to the past or future by up to twice the period of the main
        timescale of the stimulus.
    """

    def __init__(self, delt_spacing='log',
                 deltas=None, delt_min=None, delt_max=None, delt_n=None,
                 base_stride=None):
        super().__init__(name='nostradamus')

        delta_isnt_given = deltas is None
        delt_range_isnt_given = delt_max is None or delt_min is None or delt_n is None
        delt_is_invalid = delta_isnt_given and delt_range_isnt_given
        assert not delt_is_invalid, (
            f'{self.name} task cannot be set up. Please provide either '
            'the deltas or their range and number.')

        if deltas is None:
            if delt_n % 2 == 0:
                delt_n += 1

            # base_stride converts the deltas from physical times to steps
            if base_stride is not None:
                delt_min *= base_stride
                delt_max *= base_stride

            if delt_spacing == 'lin':
                deltas = np.linspace(delt_min, delt_max, delt_n, dtype=int)
            else:
                deltas = np.unique(np.rint(np.geomspace(1, delt_max, delt_n)).astype(int))
                deltas = np.concat([deltas, -deltas, [0]])

        self.deltas = np.sort(deltas)
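
    # A small sketch (illustrative numbers) of how the shift grid is built: with
    # ``base_stride`` the physical shifts are first scaled to sample steps, and
    # with the default 'log' spacing the positive shifts are mirrored around zero.
    #
    #     np.geomspace(1, 20, 3)                                      # [1., ~4.47, 20.]
    #     d = np.unique(np.rint(np.geomspace(1, 20, 3)).astype(int))  # [1, 4, 20]
    #     np.sort(np.concatenate([d, -d, [0]]))                       # [-20, -4, -1, 0, 1, 4, 20]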

    def make_subtasks(self):
        self.subtasks = self.deltas.tolist()

    def make_subtarget(self, u, delta):
        y = np.roll(u, -delta, axis=1)
        if delta > 0:
            y[:, -delta:] = np.nan
        if delta < 0:
            y[:, :-delta] = np.nan
        return y
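
    # A toy illustration (made-up values) of the roll-and-mask logic above: a
    # positive delta looks ``delta`` steps into the future and invalidates the
    # wrapped-around tail.
    #
    #     u = np.arange(6, dtype=float).reshape(1, -1)   # [[0, 1, 2, 3, 4, 5]]
    #     y = np.roll(u, -2, axis=1)                     # [[2, 3, 4, 5, 0, 1]]
    #     y[:, -2:] = np.nan                             # [[2, 3, 4, 5, nan, nan]]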

    def get_score_template(self):
        return pd.DataFrame(np.array(self.subtasks).astype(int),
                            columns=['dim', 'delta'])


class Tayloramus(TemporalTask):
    """Implements the Tayloramus task.

    The argument synopsis follows the conventions of the ``Taylor`` and
    ``Nostradamus`` tasks.

    Parameters
    ----------
    degrees : array-like, optional
        Exponents to raise to, by default None
    deg_min : int, optional
        Minimum exponent to raise to, by default None
    deg_max : int, optional
        Maximum exponent to raise to, by default None
    deg_n : int, optional
        Number of exponents between the min and max exponents, by default None
    deltas : array-like, optional
        An array of time shifts, by default None
    delt_min : float, optional
        Minimum shift, by default None
    delt_max : float, optional
        Maximum shift, by default None
    delt_n : int, optional
        Number of shifts; will be converted to the next odd number if even,
        by default None
    delt_spacing : str, optional
        The spacing between time shifts (`lin` for linear and `log` for logarithmic),
        by default 'log'
    base_stride : int, optional
        The number of time samples per unit of time, by default None
    """

    def __init__(self, delt_spacing='log',
                 degrees=None, deg_min=None, deg_max=None, deg_n=None,
                 deltas=None, delt_min=None, delt_max=None, delt_n=None,
                 base_stride=None):
        super().__init__(name='tayloramus')

        degree_isnt_given = degrees is None
        deg_range_isnt_given = deg_max is None or deg_min is None or deg_n is None
        deg_is_invalid = degree_isnt_given and deg_range_isnt_given
        assert not deg_is_invalid, (
            f'{self.name} task cannot be set up. Please provide either '
            'the exponents or their range and number.')

        delta_isnt_given = deltas is None
        delt_range_isnt_given = delt_max is None or delt_min is None or delt_n is None
        delt_is_invalid = delta_isnt_given and delt_range_isnt_given
        assert not delt_is_invalid, (
            f'{self.name} task cannot be set up. Please provide either '
            'the deltas or their range and number.')

        if deltas is None:
            if delt_n % 2 == 0:
                delt_n += 1

            # base_stride converts the deltas from physical times to steps
            if base_stride is not None:
                delt_min *= base_stride
                delt_max *= base_stride

            if delt_spacing == 'lin':
                deltas = np.linspace(delt_min, delt_max, delt_n, dtype=int)
            else:
                deltas = np.unique(np.rint(np.geomspace(1, delt_max, delt_n)).astype(int))
                deltas = np.concat([deltas, -deltas, [0]])

        if degrees is None:
            degrees = np.linspace(deg_min, deg_max, deg_n, dtype=int)

        self.deltas = np.sort(deltas)
        self.degrees = degrees

    def make_subtasks(self):
        self.subtasks = []
        for degree in self.degrees:
            for delta in self.deltas:
                self.subtasks.append([delta, degree])

    def make_subtarget(self, u, delta, degree):
        y = np.roll(u, -delta, axis=1)
        if delta > 0:
            y[:, -delta:] = np.nan
        if delta < 0:
            y[:, :-delta] = np.nan
        return y**degree

    def get_score_template(self):
        return pd.DataFrame(np.array(self.subtasks).astype(int),
                            columns=['dim', 'delta', 'deg'])
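
# A minimal sketch (illustrative values) of how ``Tayloramus`` enumerates its
# subtasks: every (delta, degree) combination becomes one subtask, with deltas
# cycling fastest.
#
#     task = Tayloramus(degrees=np.array([1, 2]), deltas=np.array([-1, 0, 1]))
#     task.make_subtasks()
#     # task.subtasks: [[-1, 1], [0, 1], [1, 1], [-1, 2], [0, 2], [1, 2]]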


# class ASHD(object):
#     def __init__(self, path, name='ashd',
#                  ds_fac=10, encoding='concat',
#                  shuffle=True, rng=None):
#         self.name = name
#         self.path = path
#         self.ds_fac = ds_fac
#         self.encoding = encoding
#         self.shuffle = shuffle
#         self.rng = rng
#         if shuffle:
#             assert rng is not None
#         trn_path = osjoin(self.path, 'train')
#         tst_path = osjoin(self.path, 'test')
#         msg = f"""
#         The .wav files must already be split into train and test sets, and placed
#         respectively on the following paths.
#         Train: {self.path}/train
#         Test: {self.path}/test
#         Could not find these folders. Do they exist?
#         """
#         assert os.path.exists(trn_path) and os.path.exists(tst_path), msg
#         print('reading train files')
#         self.trn_files = sorted([f for f in os.listdir(trn_path) if os.path.isfile(osjoin(trn_path, f))])
#         print('reading test files')
#         self.tst_files = sorted([f for f in os.listdir(tst_path) if os.path.isfile(osjoin(tst_path, f))])
#
#     def make_stim_target(self, n_timesteps, dt=None):
#         us = []  # stims
#         ys = []  # targets
#         dirs = ['train', 'test']
#         sample_lists = [self.trn_files, self.tst_files]
#         sample_id = 0
#         print('creating stim files')
#         for _dir, _sample_list in zip(dirs, sample_lists):
#             u = []
#             yid = []
#             ydigit = []
#             ysample = []  # only for identifying the sample id
#             for sample in _sample_list:
#                 _, i, _, d = sample.split('_')
#                 _id = int(i.split('-')[1])
#                 _digit = int(d.split('-')[1][0]) + 10*('german' in sample)  # German digits are offset by 10
#                 rate, adata = wavfile.read(osjoin(self.path, _dir, sample))
#                 # adata = signal.decimate(adata, self.ds_fac)
#                 adata = adata[::self.ds_fac]
#                 adata = abs(standardize(adata.reshape(1, -1)).squeeze())
#                 u.append(adata)
#                 yid.append(np.ones(len(adata), dtype=int)*_id)
#                 ydigit.append(np.ones(len(adata), dtype=int)*_digit)
#                 ysample.append(np.ones(len(adata), dtype=int)*sample_id)
#                 sample_id += 1
#             # print(f'ha ha {min([min(s) for s in ysample])}')
#             # print(f'ha ha {max([max(s) for s in ysample])}')
#             if self.shuffle:
#                 perm = self.rng.permutation(len(u))
#                 u = [u[i] for i in perm]
#                 yid = [yid[i] for i in perm]
#                 ydigit = [ydigit[i] for i in perm]
#                 ysample = [ysample[i] for i in perm]
#             ys.append(np.c_[np.concat(ydigit), np.concat(yid), np.concat(ysample)].T)
#             us.append(np.concat(u))
#         us = np.concat(us).reshape(1, -1)
#         # us = standardize(us)
#         ys = np.concat(ys, axis=1)
#         dt = self.ds_fac/rate
#         n_timesteps = us.shape[1]
#         t = np.linspace(0, n_timesteps*dt, n_timesteps)
#         return t, us, ys
#
#     def make_subtasks(self):
#         self.subtasks = ['digit', 'id']
#
#     def get_score_template(self):
#         return pd.DataFrame(np.array(self.subtasks).astype(str),
#                             columns=['subtask'])


def needs_synthesis(task_name):
    """Checks whether a given task requires stimulus synthesis.

    Tasks in which the input and output are already saved on disk, for example,
    do not need synthesis. This has to be adapted if non-temporal tasks are added.
    """
    return task_name in ['taylor', 'nostradamus', 'tayloramus']