import os
from os.path import join as osjoin
from os.path import sep as pathsep
import re
import inspect
# from pdb import set_trace
# import copy
import json
import time
import functools
import itertools
import pandas as pd
import numpy as np
import numpy.lib.format
from numpy.fft import fft, ifft
# from numpy.lib.format import read_array_header_1_0, read_array_header_2_0, read_magic
# import scipy.special as sp
from scipy.io import wavfile
import scipy.signal as signal
from scipy.optimize import curve_fit, bisect
from scipy.sparse import load_npz
from sklearn.metrics.pairwise import cosine_similarity
# from hetero.config import DEFAULTS, ARGUMENT_NAMING_MAP
# from hetero.naming_wizard import fit_parser, gain_parser
# from hetero.tasks import TASK_NEEDS_SYNTHESIS
def get_default_args(func):
"""
Gets the default arguments of a function
Parameters
----------
func : callable
the function to be investigated
Returns
-------
dict
default arguments of the provided function
"""
signature = inspect.signature(func)
return {
k: v.default
for k, v in signature.parameters.items()
if v.default is not inspect.Parameter.empty
}
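# Usage sketch (illustrative; ``f`` below is a hypothetical function, not part
# of this module):
#     def f(a, b=2, c='x'): ...
#     get_default_args(f)   # -> {'b': 2, 'c': 'x'}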
class NpEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, np.integer):
return int(obj)
if isinstance(obj, np.floating):
return float(obj)
if isinstance(obj, np.ndarray):
return obj.tolist()
return super(NpEncoder, self).default(obj)
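# Usage sketch (illustrative, not part of the original module): NpEncoder lets
# json.dumps handle numpy scalars and arrays, e.g.
#     json.dumps({'n': np.int64(3), 'x': np.arange(2)}, cls=NpEncoder)
# returns '{"n": 3, "x": [0, 1]}'.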
def timer(func):
"""Print the runtime of the decorated function"""
@functools.wraps(func)
def wrapper_timer(*args, **kwargs):
tic = time.perf_counter()
value = func(*args, **kwargs)
toc = time.perf_counter()
run_time = toc - tic
        print(f'All in all, {func.__name__!r} took {run_time:.4f} seconds to run.')
return value
return wrapper_timer
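# Usage sketch (illustrative): decorating a function with @timer prints its
# runtime after each call, e.g.
#     @timer
#     def heavy_step():
#         time.sleep(0.5)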
def asave(path, arr, target_type):
"""a simple array saver that safely cast arrays to the target type."""
min_val = np.finfo(target_type).min
max_val = np.finfo(target_type).max
np.save(path, np.clip(arr, min_val, max_val).astype(target_type))
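# Usage sketch (illustrative; 'x.npy' is a hypothetical path): values outside
# the target dtype's range are clipped before casting, so
#     asave('x.npy', np.array([1e300, -1e300]), np.float32)
# stores approximately [3.4e38, -3.4e38] instead of overflowing to inf.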
def get_wav_shape(wav_path):
_, data = wavfile.read(wav_path, mmap=True)
return data.shape
# with open(array_path, 'rb') as f:
# version = read_magic(f)
# if version == (1, 0):
# shape, fortran_order, dtype = read_array_header_1_0(f)
# elif version == (2, 0):
# shape, fortran_order, dtype = read_array_header_2_0(f)
# else:
# raise ValueError("Unsupported .npy version: {}".format(version))
# return shape
def get_base_freqs(stim, dt=1):
base_freqs = []
if stim.ndim == 1:
freqs, psd = signal.periodogram(stim, 1./dt)
base_freqs.append(freqs[np.argmax(psd)])
else:
for dim in range(stim.shape[0]):
freqs, psd = signal.periodogram(stim[dim], 1./dt)
base_freqs.append(freqs[np.argmax(psd)])
return base_freqs
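# Usage sketch (illustrative): the base frequency of a pure 5 Hz sine sampled
# with dt = 1e-3 s is recovered from the periodogram peak, i.e.
#     t = np.arange(0, 2, 1e-3)
#     get_base_freqs(np.sin(2*np.pi*5*t), dt=1e-3)   # -> [~5.0]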
def rescale_time(t, stim, target_base_freq=1., avg_method='geom'):
dt = t[1] - t[0]
base_freqs = get_base_freqs(stim, dt=dt)
if avg_method == 'lin':
bf = np.mean(base_freqs)
elif avg_method == 'geom':
bf = np.prod(base_freqs)**(1/len(base_freqs))
else:
raise NotImplementedError
t *= 1.*bf/target_base_freq
return t, bf, base_freqs
# def compute_dataset_size(n_trn, n_tst, base_stride, task_params, N=0):
# task, tparams = next(iter(task_params.items()))
# if TASK_NEEDS_SYNTHESIS[task]:
# N_tst = int(n_tst * base_stride)
# N_trn = int(n_trn * base_stride * (1 + N))
# else:
# trn_path = osjoin(tparams['path'], 'train')
# tst_path = osjoin(tparams['path'], 'test')
# msg = f"""
#         The samples must already be split into train and test sets, and placed respectively in the train and test subdirectories.
#         Could not find these folders. Do they exist?
# """
# assert os.path.exists(trn_path) and os.path.exists(tst_path), msg
# N_trn = len([f for f in os.listdir(trn_path) if os.path.isfile(osjoin(trn_path, f))])
# N_tst = len([f for f in os.listdir(tst_path) if os.path.isfile(osjoin(tst_path, f))])
# return N_trn, N_tst
# def compute_dataset_length(N_trn, N_tst, base_stride, task_params):
# task, tparams = next(iter(task_params.items()))
# if task == 'taylor':
# L_trn = N_trn
# L_tst = N_tst
# elif task == 'tayloramus' or task == 'nostradamus':
# L_trn = N_trn + 2 * tparams['delt_max']*base_stride
# L_tst = N_tst + 2 * tparams['delt_max']*base_stride
# elif task == 'ashd':
# factor = tparams['ds_fac']
# trn_path = osjoin(tparams['path'], 'train')
# tst_path = osjoin(tparams['path'], 'test')
# trn_samples = [f for f in os.listdir(trn_path) if os.path.isfile(osjoin(trn_path, f))]
# tst_samples = [f for f in os.listdir(tst_path) if os.path.isfile(osjoin(tst_path, f))]
# L_trn = []
# for trn_sample in trn_samples:
# L_raw = get_wav_shape(osjoin(trn_path, trn_sample))[0]
# L_trn.append(L_raw//factor + int((L_raw % factor)>0) ) # takes care of downsampling
# L_tst = []
# for tst_sample in tst_samples:
# L_raw = get_wav_shape(osjoin(tst_path, tst_sample))[0]
# L_tst.append(L_raw//factor + int((L_raw % factor)>0) ) # takes care of downsampling
# L_trn = sum(L_trn)
# L_tst = sum(L_tst)
# return L_trn, L_tst
def compute_dataset_slices(L_trn, L_tst, base_stride, task_params):
"""Note that the task_params are in the physical units and have to be
multiplied by base_stride to get the steps."""
task, tparams = next(iter(task_params.items()))
if task == 'taylor':
s_trn = slice(0, L_trn)
s_tst = slice(L_trn, L_trn + L_tst)
elif task == 'tayloramus' or task == 'nostradamus':
s_trn = slice(0 + tparams['delt_max']*base_stride,
L_trn - tparams['delt_max']*base_stride)
s_tst = slice(L_trn + tparams['delt_max']*base_stride,
L_trn + L_tst - tparams['delt_max']*base_stride)
    elif task == 'ashd':
        s_trn = slice(0, L_trn)
        s_tst = slice(L_trn, L_trn + L_tst)
    else:
        raise NotImplementedError(f'Unknown task: {task}')
    return s_trn, s_tst
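# Usage sketch (illustrative numbers): for the 'tayloramus' task with
# delt_max=2 and base_stride=10, the first and last 2*10 steps of each split
# are trimmed:
#     compute_dataset_slices(1000, 200, 10, {'tayloramus': {'delt_max': 2}})
#     # -> (slice(20, 980), slice(1020, 1180))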
def compute_complexity(u, y, subtasks, savepath):
# trimming out the nan values
non_nan = (~np.isnan(y)).prod(axis=0).astype(bool)
Y = y[:, non_nan]
U = u[:, non_nan]
ndims = U.shape[0]
Cs= []
for suby, subu in zip(np.split(Y, ndims), U):
cos = cosine_similarity(suby, subu.reshape(1,-1)).squeeze()
Cs.append(np.clip(1-abs(cos), 0, None))
Cs = np.array(Cs).ravel()
df = pd.DataFrame(np.c_[subtasks, Cs],
columns=['dim', 'delta','deg', 'complexity'])
df.to_csv(savepath)
def make_net_ids(means, stds):
offset_m, offset_s = 0, 0
if min(means)>0:
offset_m=1
if min(stds)>0:
offset_s=1
mean_ids = list(range(offset_m, len(means)+offset_m))
std_ids = list(range(offset_s, len(stds)+offset_s))
net_ids = [f'{mid}{sid}' for mid, sid in itertools.product(mean_ids, std_ids)]
return net_ids
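# Usage sketch (illustrative): ids are built from the cross product of mean and
# std indices, with an offset of 1 when the smallest value is strictly positive:
#     make_net_ids([0., 10.], [0., 1.])   # -> ['00', '01', '10', '11']
#     make_net_ids([5., 10.], [1., 2.])   # -> ['11', '12', '21', '22']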
def add_complexity_trier(df, N=3):
# df['tier'] = (abs(df.delta) / (2*df.bs) )
# df['tier'] = pd.cut(df['tier'], bins=np.linspace(0, max(df.tier), N+1), labels=False, include_lowest=True)
tier_borders= np.linspace(0., 1.+1e-6, N+1)
for tier in range(len(tier_borders)-1):
idx = (df.complexity >= tier_borders[tier]) & (df.complexity < tier_borders[tier+1])
df.loc[idx, 'tier'] = tier
return df
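# Note (illustrative): with N=3 the complexity range [0, 1] is split into three
# equal-width tiers, so complexities in [0, 1/3), [1/3, 2/3) and [2/3, 1] map
# to tiers 0, 1 and 2 respectively.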
def add_homog_score(df, std=False):
test='test'
train='train'
if std:
test += '_std'
train += '_std'
df[f'homog_{test}'] = -1.
df[f'homog_{train}'] = -1.
for idx, (group, _df) in enumerate(df.groupby(['n', 'task'])):
df.loc[_df.index, f'homog_{test}' ] = _df[_df.hn==0][test].iloc[0]
df.loc[_df.index, f'homog_{train}'] = _df[_df.hn==0][train].iloc[0]
return df
def aggregate_scores(root, sim_pattern, invalid_patterns=None,
score_name=None, return_std=True, n_tiers=3,
transform=True, transforms=(lambda x: np.exp(x-1),
lambda x: 2/np.pi*np.arctan(x))
):
tic = time.time()
# finding simulation names
sims = sorted([f.path.split('/')[-1] for f in os.scandir(root) if (f.is_dir() and sim_pattern in f.path)])
if invalid_patterns is not None:
        if isinstance(invalid_patterns, str):
            sims = sorted([sim for sim in sims if invalid_patterns not in sim])
        elif isinstance(invalid_patterns, list):
valids = []
for sim in sims:
is_valid = True
for inval_pat in invalid_patterns:
if inval_pat in sim:
is_valid = False
break
valids.append(is_valid)
sims = sorted(np.array(sims)[valids].tolist())
print(f'The results of the following {len(sims)} simulations will be aggregated:')
for sim in sims:
print('\t', sim)
dfs = []
for sim in sims:
print(f'aggregating scores from {sim}')
try:
net_ids = sorted([f.name for f in os.scandir(osjoin(root, sim)) if f.is_dir()])
score_names = sorted([f.name.split('.csv')[0].split('tst')[1] for f in os.scandir(osjoin(root, sim, net_ids[0])) if 'tst_' in f.name])
_score_dfs = []
for scr_name in score_names:
_net_dfs = []
for net_id in net_ids:
dtypes = {0: int, 1:int, 2: int}
path_trn = osjoin(root, sim, net_id, f'trn{scr_name}.csv')
path_tst = osjoin(root, sim, net_id, f'tst{scr_name}.csv')
tst_df = pd.read_csv(path_tst, dtype=dtypes)
trn_df = pd.read_csv(path_trn, dtype=dtypes)
_net_df = tst_df.set_index(['dim','delta','deg']).mean(axis=1) # test mean
_net_df = _net_df.reset_index().rename(columns={0: 'test'})
_net_df['train'] = trn_df.set_index(['dim','delta','deg']).mean(axis=1).values # train mean
_net_df['test_std'] = tst_df.set_index(['dim','delta','deg']).std(axis=1).values
_net_df['train_std']= trn_df.set_index(['dim','delta','deg']).std(axis=1).values
# task id and network id
_net_df['net_id']= net_id
_net_df['n'] = int(net_id[0])
_net_df['hn'] = int(net_id[1])
_net_df['task'] = _net_df.apply(lambda row:
(int(row.dim), int(row.delta), int(row.deg)), axis=1)
_net_dfs.append(_net_df)
_score_df = pd.concat(_net_dfs).reset_index(drop=True)
_score_df = add_homog_score(_score_df)
_score_df = add_homog_score(_score_df, std=True)
_score_df['scr'] = scr_name
# fitting parameters
fit_params = fit_parser(scr_name)
_score_df['fit_method'] = fit_params['method']
_score_df['fit_lam'] = fit_params['lam']
_score_df['fit_kernel'] = fit_params['conv_params']['kernel']
_score_df['fit_tk'] = fit_params['conv_params']['tau_k']
# gains
gain_params = gain_parser(scr_name)
_score_df['Jn'] = gain_params['Jn']
_score_df['J' ] = gain_params['J']
_score_df['Ju'] = gain_params['Ju']
_score_dfs.append(_score_df)
_sim_df = pd.concat(_score_dfs).reset_index(drop=True)
_sim_df['sim'] = sim
# simulation parameters
sim_params = read_metadata(osjoin(root, sim))
_sim_df['N'] = sim_params['net']['N']
_sim_df['p'] = sim_params['net']['p']
_sim_df['f'] = sim_params['net']['f']
_sim_df['mue'] = sim_params['net']['mue']
_sim_df['sig'] = sim_params['net']['sig0']
_sim_df['n_trn'] = sim_params['set']['n_trn']
_sim_df['n_tst'] = sim_params['set']['n_tst']
_sim_df['n_trials'] = sim_params['set']['n_trials']
_sim_df['n_timesteps'] = sim_params['n_timesteps']
_sim_df['bf'] = sim_params['bf']
_sim_df['bs'] = sim_params['base_stride']
_sim_df['udim'] = sim_params['dim']
_sim_df['hetero'] = list(sim_params['hetero'].keys())[0]
_sim_df['hetero_ld'] = list(sim_params['hetero'].values())[0]['log_distance']
# add complexity
try:
_sim_df['complexity'] = -1.
complexity_df = pd.read_csv(osjoin(root, sim, 'complexities.csv'), index_col=0, dtype=dtypes)
complexity_df = complexity_df.set_index(['dim','delta','deg'])
for cidx in complexity_df.index:
_sim_df.loc[_sim_df.task==cidx, 'complexity'] = complexity_df.loc[cidx].complexity
except Exception as e:
print(f'Could not read complexity of {sim} and raised {e}.')
dfs.append(_sim_df)
except Exception as e:
print(f'Could not aggregate {sim} and raised {e}.')
dfs = pd.concat(dfs).reset_index(drop=True)
if transform:
dfs['test'] = transforms[0](dfs['test'])
dfs['train'] = transforms[0](dfs['train'])
dfs['homog_test'] = transforms[0](dfs['homog_test'])
dfs['homog_train'] = transforms[0](dfs['homog_train'])
if return_std:
dfs['test_std'] = transforms[1](dfs['test_std'])
dfs['train_std'] = transforms[1](dfs['train_std'])
dfs['homog_test_std'] = transforms[1](dfs['homog_test_std'])
dfs['homog_train_std'] = transforms[1](dfs['homog_train_std'])
if n_tiers > 0:
dfs = add_complexity_trier(dfs, N=n_tiers)
print(f'Aggregation time: {time.time()-tic:.2f} seconds.' )
return dfs.reset_index(drop=True)
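# Usage sketch (illustrative; the root path and pattern are hypothetical):
#     df = aggregate_scores('/results', sim_pattern='hetero', invalid_patterns=['old'])
# collects the per-network train/test scores of every matching simulation into
# a single DataFrame.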
def sample_taus(size, mu, std, distr='lognormal', clip_below=False,
rng=None, tol_prcntg=-1, verbose=False, ):
"""
    The given mu and std are the desired mean and standard deviation of
    the samples and generally differ from the parameters passed to the
    underlying sampler. The case std=0 is handled by returning a
    constant vector of the desired size.
"""
if rng is None:
rng = np.random.default_rng()
if std == 0:
taus = np.ones(size)*mu
else:
if distr == 'lognormal':
std_ = np.sqrt(np.log((std**2) / (mu**2) + 1))
mu_ = np.log(mu) - (std_**2) / 2
# mu_ = np.log(mu**2/np.sqrt(mu**2 + std**2))
# std_ = np.sqrt(np.log(1+(std/mu)**2))
rng_ = rng.lognormal
kwargs = {'sigma':std_, 'mean': mu_}
elif distr == 'normal':
rng_ = rng.normal
kwargs = {'loc':mu, 'scale':std}
elif distr == 'gamma':
k = (mu / std)**2
theta = mu / k
rng_ = rng.gamma
kwargs = {'shape':k, 'scale':theta}
elif distr == 'uniform':
            # The mean and variance determine the bounds of the
            # distribution:
            #     mean = 1/2 (low + high)
            #     var = 1/12 (high - low)**2
            # so high - low = 2*sqrt(3)*std.
            low = mu - np.sqrt(3)*std
            high = mu + np.sqrt(3)*std
rng_ = rng.uniform
kwargs = {'low':low, 'high':high}
elif distr == 'poly2':
            # Here we sample from a pdf ~ 1/(1 + x**2). This pdf has no
            # finite moments, so the mean and variance are meaningful
            # only if the distribution is truncated by a minimum and a
            # maximum tau. `clip_below` is interpreted as the minimum
            # tau, while the maximum tau has to be estimated
            # numerically. Like the other cases, this is based on the
            # prescribed values of `mu` and `std`.
            #
            # With `mn` and `mx`, the empirical truncated pdf is
            # uniquely determined. This pdf is then passed to the
            # poly2_sampler function, a custom-made sampler, to draw
            # timescales accordingly.
mn = float(clip_below) # minimum tau
mx = estimate_poly2_max_from_stats(mu, std, verbose) # maximum tau
rng_ = poly2_sampler
kwargs = {'mn': mn, 'mx': mx}
else:
            raise NotImplementedError(
                'Only lognormal, normal, gamma, uniform and poly2 are supported.')
taus = rng_(size=size, **kwargs)
if tol_prcntg>0:
        # Due to sampling, the empirical mean and variance can differ
        # from the prescribed values. To handle this, we simply redraw
        # the taus. This only matters for small sizes.
        err_mu = abs(taus.mean()-mu)/mu
        err_std = abs(taus.std()-std)/(std+1e-12) * (std > 0)
        correction = 0
        while (err_mu > tol_prcntg) or (err_std > tol_prcntg):
            if correction % 10000 == 0:
                print(f'{correction} redraws have been made so far.')
            taus = rng_(size=size, **kwargs)
            err_mu = abs(taus.mean()-mu)/mu
            err_std = abs(taus.std()-std)/(std+1e-12) * (std > 0)
            correction += 1
if bool(clip_below):
resample = taus[taus < clip_below]
resample_size = len(resample)
while resample_size>0:
taus[taus < clip_below] = rng_(size=resample_size, **kwargs)
resample = taus[taus < clip_below]
resample_size = len(resample)
return taus
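# Usage sketch (illustrative): for large sizes the empirical statistics of the
# drawn timescales approach the prescribed mean and std, e.g.
#     rng = np.random.default_rng(0)
#     taus = sample_taus(10000, mu=20., std=5., distr='lognormal', rng=rng)
#     taus.mean(), taus.std()   # ~ (20, 5)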
def estimate_poly2_max_from_stats(mu, std, verbose=False):
"""
If mn and mx are the minimum and maximum values, the pdf would have
the following form:
pdf(x) = 1/Z * 1/(1+x**2) (0)
Z = atan(mx) - atan(mn)
And the first and second moments of this truncated pdf are:
E[x] = 1/(2*Z) * log( (1+mx**2) / (1+mn**2) ) (1)
E[x**2] = (mx - mn)/Z - 1 (2)
    One must find mx and mn such that:
        E[x] = mu
        E[x**2] = std**2 + mu**2
    which is very hard to solve. Fortunately, it turns out that both
    statistics are mostly influenced by `mx` and not `mn`. Thus we can
    use the approximation `mn ≈ 0`, which gives rise to:
        E[x] ≈ log( 1 + mx**2 ) / (2 * atan(mx))    (3)
        E[x**2] ≈ mx / atan(mx) - 1                 (4)
    Now we can numerically solve for `mx`. However, only one of these
    equations can be used, because the first and second moments of the
    truncated pdf are not independent: setting mu determines std and
    vice versa. Here we only use equation (3); the resulting `std` is
    reported when `verbose` is True.
"""
def mx_from_mu(x, mu):
mu1_app = 0.5 * np.log(1 + (x)**2) / (np.arctan(x))
return mu - mu1_app
# def mx_from_std(x, std):
# mu1_app = 0.5 * np.log(1 + (x)**2) / (np.arctan(x))
# mu2_app = x / (np.arctan(x)) - 1
# return std - np.sqrt(mu2_app - mu1_app**2)
    assert mu>0, f'The mean of tau must be positive. Got {mu}.'
    # The upper bracket is chosen such that mx_from_mu changes sign
    # between the lower and upper bounds of the bisection.
    mx = bisect(mx_from_mu, mu/1000, np.exp(np.pi*mu/2), args=(mu,), maxiter=1000)
if verbose:
actual_std = np.sqrt(mx / np.arctan(mx) - 1 - mu**2)
print(f'In poly2 the std of {std} was requested but actual std is {actual_std}.')
# mx2 = bisect(mx_from_std, max(mu-std, .1)/1000, (mu+std)*1000, args=(std,), maxiter=1000, )
# print(mx1, mx2)
# return (mx1 + mx2)/2
return mx
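# Sanity-check sketch (illustrative): the estimated upper truncation point
# satisfies equation (3) for the requested mean, e.g.
#     mx = estimate_poly2_max_from_stats(10., 0.)
#     0.5*np.log(1 + mx**2) / np.arctan(mx)   # ~ 10.0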
def poly2_sampler(mn, mx ,size):
"""
    Since numpy doesn't have a sampler for the 1/(1+x**2) distribution,
    we use the "inverse transform sampling" method to do so. Read more
    about this method here:
    https://en.wikipedia.org/wiki/Inverse_transform_sampling
"""
a = 1.
def Z(a, mn, mx):
return 1./a * (np.arctan(a*mx) - np.arctan(a*mn))
def mu(a, mn, mx):
z = Z(a, mn, mx)
return 1/(a*z) * 1/(2*a) * np.log((1+(a*mx)**2) / (1+ (a*mn)**2))
def mu2(a, mn, mx):
z = Z(a, mn, mx)
return 1/(a**2) * ((mx-mn)/z - 1)
def sig(a, mn, mx):
return np.sqrt(mu2(a, mn,mx) - mu(a, mn, mx)**2)
u = np.random.uniform(0, 1, size)
z = Z(a, mn, mx)
return 1./a * np.tan(a*z*u + np.arctan(a*mn))
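# Usage sketch (illustrative): inverse transform sampling maps uniform draws
# through the truncated CDF, so all samples fall inside [mn, mx]:
#     s = poly2_sampler(mn=1., mx=100., size=1000)
#     bool((s >= 1.).all() and (s < 100.).all())   # True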
def fftautocorr(x):
"""
    This implements a circular autocorrelation, as opposed to the
    zero-padded correlation of numpy and scipy. It is adapted from
    https://stackoverflow.com/a/28285527/7106957.
    x must be a real sequence.
"""
correls = ifft(fft(x) * fft(x).conj()).real
correls /= correls[0]
lags = np.arange(len(x))
return lags, correls
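# Usage sketch (illustrative): for a sequence containing a whole number of
# periods, the circular autocorrelation returns to ~1 at lags that are
# multiples of the period, e.g.
#     x = np.sin(2*np.pi*np.arange(100)/20)
#     lags, c = fftautocorr(x)
#     c[0], c[20]   # ~ (1.0, 1.0)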
def autocorr(x, maxlags=None):
"""
    Based on matplotlib's acorr function. Returns the normalized
    autocorrelation at non-negative lags (lag 0 is normalized to 1).
    The maxlags argument is currently unused.
"""
x = np.asarray(x)
x = x - np.mean(x) # zero-mean for meaningful autocorrelation
result = np.correlate(x, x, mode='full')
result /= result[len(x) - 1] # normalize so lag=0 autocorr = 1
lags = np.arange(-len(x) + 1, len(x))
result = result[lags>=0]
lags = lags[lags>=0]
return lags, result
# Nx = len(x)
# correls = np.correlate(x, x, mode="full")
# correls /= np.dot(x, x)
# if maxlags is None:
# maxlags = Nx - 1
# lags = np.arange(-maxlags, maxlags + 1)
# correls = correls[Nx - 1 - maxlags:Nx + maxlags]
# return lags, correls
def find_top_idx(x):
    # find the indices of all peaks
    peaks, _ = signal.find_peaks(x, prominence=(0, None))
    # let's see which one is truly the tallest
top_peak_idx = np.argmax(x[peaks])
# and return that index
top_idx = peaks[top_peak_idx]
# Sort peaks by prominence
# prominences = properties["prominences"]
# if len(peaks) == 0:
# return np.array([]) # no peaks found
# proms_sorted = np.argsort(prominences)[::-1]#[:n]
# peaks_sorted = peaks[proms_sorted]
return top_idx
def find_base_lags(u, average='geometric'):
# u is assumed to be of shape (ndim x ntimes)
base_lags = []
# for each component in the input
for i in range(u.shape[0]):
# first let's compute the autocorrelation function
lags, ac = autocorr(u[i, :])
        # and then compute the index of the top peak (after lag 0)
top_idx = find_top_idx(ac)
# and add it to the lag values
base_lags.append(lags[top_idx])
if average=='linear':
base = np.mean(base_lags)
else:
base = np.prod(base_lags)**(1./len(base_lags))
return base_lags, base
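# Usage sketch (illustrative): for two sinusoidal components with periods of 25
# and 50 samples, the detected base lags are ~[25, 50] and the geometric
# average is ~sqrt(25*50) ≈ 35.4:
#     t = np.arange(1000)
#     u = np.vstack([np.sin(2*np.pi*t/25), np.sin(2*np.pi*t/50)])
#     base_lags, base = find_base_lags(u)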
def canonicalize_value(val):
"""Format values consistently for filenames.
Parameters
----------
val : int, float, str
Value to format
Returns
-------
str
        Formatted value:
        - Floats with two decimal places
        - Other types stringified
"""
if isinstance(val, float):
return f"{val:.2f}"
# return s.replace("+", "")
else:
return str(val)
def sanitize_token(s):
"""Clean up strings for safe filenames.
Parameters
----------
s : str
String to clean
Returns
-------
str
Safe filename string with only:
- Alphanumeric characters
- Underscores
- Dashes
- Dots
"""
return re.sub(r"[^\w\-.]", "", s)
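# Usage sketch (illustrative): these helpers produce filename-safe tokens, e.g.
#     canonicalize_value(0.5)         # -> '0.50'
#     sanitize_token('tau k=0.5!')    # -> 'tauk0.5'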
def load_target(sim_path, mmap=False):
loader = np.load
if mmap:
loader = load_mmap
return loader(osjoin(sim_path, 'y.npy')).astype(float)
def load_complexity(sim_path):
return pd.read_csv(osjoin(sim_path, 'complexities.csv'), index_col=0)
def load_weights(sim_path):
w = np.load(osjoin(sim_path, 'w.npy'))
w_ff = np.load(osjoin(sim_path, 'w_ff.npy'))
w_fb = np.load(osjoin(sim_path, 'w_fb.npy'))
return w_ff, w, w_fb
def load_topology(sim_path):
return load_npz(osjoin(sim_path, 'A_adj.npz'))
def load_nets(sim_path):
return sorted(list(os.walk(sim_path))[0][1])
def load_taus(sim_path):
return np.array([np.load(osjoin(sim_path, net_id, 'taus.npy')) for net_id in load_nets(sim_path)])
def load_ic(sim_path):
with open(osjoin(sim_path,'metadata.json'), 'r') as file:
md = json.load(file)
print('Zero initial condition was prescribed.')
return np.zeros(md['net']['N'], dtype=float)