Source code for hetero.utils

import os 
from os.path import join as osjoin
from os.path import sep as pathsep

import re
import inspect
# from pdb import set_trace
# import copy

import json
import time
import functools
import itertools

import pandas as pd

import numpy as np
import numpy.lib.format
from numpy.fft import fft, ifft
# from numpy.lib.format import read_array_header_1_0, read_array_header_2_0, read_magic

# import scipy.special as sp
from scipy.io import wavfile
import scipy.signal as signal
from scipy.optimize import curve_fit, bisect
from scipy.sparse import load_npz 
from sklearn.metrics.pairwise import cosine_similarity


# from hetero.config import DEFAULTS, ARGUMENT_NAMING_MAP
from hetero.naming_wizard import fit_parser, gain_parser  # used in aggregate_scores
# from hetero.tasks import TASK_NEEDS_SYNTHESIS
        
        
def get_default_args(func):
    """
    Gets the default arguments of a function.

    Parameters
    ----------
    func : callable
        the function to be investigated

    Returns
    -------
    dict
        default arguments of the provided function
    """
    signature = inspect.signature(func)
    return {
        k: v.default
        for k, v in signature.parameters.items()
        if v.default is not inspect.Parameter.empty
    }

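# Illustrative usage sketch (not part of the original module): how
# get_default_args reports keyword defaults. The function `rescale` below is a
# hypothetical example, not a hetero API.
def _example_get_default_args():
    def rescale(x, scale=2.0, offset=0.0):
        return scale*x + offset
    # returns {'scale': 2.0, 'offset': 0.0}; `x` has no default and is skipped
    return get_default_args(rescale)
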
class NpEncoder(json.JSONEncoder):

    def default(self, obj):
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        return super(NpEncoder, self).default(obj)

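# Illustrative usage sketch (not part of the original module): NpEncoder is
# intended to be passed as the `cls` argument of json.dumps so that numpy
# scalars and arrays serialize as plain Python types. The payload is made up.
def _example_np_encoder():
    payload = {'N': np.int64(500), 'mu': np.float64(0.2), 'taus': np.arange(3)}
    # without cls=NpEncoder, json.dumps would raise a TypeError on these values
    return json.dumps(payload, cls=NpEncoder)
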
def timer(func):
    """Print the runtime of the decorated function."""
    @functools.wraps(func)
    def wrapper_timer(*args, **kwargs):
        tic = time.perf_counter()
        value = func(*args, **kwargs)
        toc = time.perf_counter()
        run_time = toc - tic
        print(f'All in all, {func.__name__!r} took {run_time:.4f} seconds to run in parallel.')
        return value
    return wrapper_timer

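# Illustrative usage sketch (not part of the original module): the timer
# decorator wraps any callable and prints its wall-clock runtime when called.
# The decorated function below is hypothetical.
@timer
def _example_slow_sum(n=100_000):
    # calling _example_slow_sum() prints something like:
    # "All in all, '_example_slow_sum' took 0.0021 seconds to run in parallel."
    return sum(range(n))
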
def asave(path, arr, target_type):
    """A simple array saver that safely casts arrays to the target (floating-point) type."""
    min_val = np.finfo(target_type).min
    max_val = np.finfo(target_type).max
    np.save(path, np.clip(arr, min_val, max_val).astype(target_type))

def get_wav_shape(wav_path):
    """Return the shape of a wav file's data without loading it fully into memory."""
    _, data = wavfile.read(wav_path, mmap=True)
    return data.shape

# with open(array_path, 'rb') as f:
#     version = read_magic(f)
#     if version == (1, 0):
#         shape, fortran_order, dtype = read_array_header_1_0(f)
#     elif version == (2, 0):
#         shape, fortran_order, dtype = read_array_header_2_0(f)
#     else:
#         raise ValueError("Unsupported .npy version: {}".format(version))
#     return shape

def get_base_freqs(stim, dt=1):
    """Estimate the dominant (base) frequency of each stimulus dimension from its periodogram."""
    base_freqs = []
    if stim.ndim == 1:
        freqs, psd = signal.periodogram(stim, 1./dt)
        base_freqs.append(freqs[np.argmax(psd)])
    else:
        for dim in range(stim.shape[0]):
            freqs, psd = signal.periodogram(stim[dim], 1./dt)
            base_freqs.append(freqs[np.argmax(psd)])
    return base_freqs

def rescale_time(t, stim, target_base_freq=1., avg_method='geom'):
    """Rescale the time axis so that the (average) base frequency of the stimulus
    matches target_base_freq."""
    dt = t[1] - t[0]
    base_freqs = get_base_freqs(stim, dt=dt)
    if avg_method == 'lin':
        bf = np.mean(base_freqs)
    elif avg_method == 'geom':
        bf = np.prod(base_freqs)**(1/len(base_freqs))
    else:
        raise NotImplementedError
    t *= 1.*bf/target_base_freq
    return t, bf, base_freqs

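# Illustrative sketch (not part of the original module): rescaling the time
# axis of a pure sinusoid so that its base frequency becomes 1. The signal
# below is made up; note that rescale_time modifies t in place via `t *= ...`.
def _example_rescale_time():
    dt = 0.01
    t = np.arange(0, 20, dt)
    stim = np.sin(2*np.pi*0.5*t)          # base frequency of 0.5
    t_scaled, bf, base_freqs = rescale_time(t.copy(), stim, target_base_freq=1.)
    # bf should be approximately 0.5, so t_scaled spans half the original range
    return bf, t_scaled[-1]
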
# def compute_dataset_size(n_trn, n_tst, base_stride, task_params, N=0):
#     task, tparams = next(iter(task_params.items()))
#     if TASK_NEEDS_SYNTHESIS[task]:
#         N_tst = int(n_tst * base_stride)
#         N_trn = int(n_trn * base_stride * (1 + N))
#     else:
#         trn_path = osjoin(tparams['path'], 'train')
#         tst_path = osjoin(tparams['path'], 'test')
#         msg = f"""
#         The samples must already be split into train and test sets, and placed
#         respectively in the train and test subdirectories.
#         Could not find these folders. Do they exist?
#         """
#         assert os.path.exists(trn_path) and os.path.exists(tst_path), msg
#         N_trn = len([f for f in os.listdir(trn_path) if os.path.isfile(osjoin(trn_path, f))])
#         N_tst = len([f for f in os.listdir(tst_path) if os.path.isfile(osjoin(tst_path, f))])
#     return N_trn, N_tst


# def compute_dataset_length(N_trn, N_tst, base_stride, task_params):
#     task, tparams = next(iter(task_params.items()))
#     if task == 'taylor':
#         L_trn = N_trn
#         L_tst = N_tst
#     elif task == 'tayloramus' or task == 'nostradamus':
#         L_trn = N_trn + 2 * tparams['delt_max']*base_stride
#         L_tst = N_tst + 2 * tparams['delt_max']*base_stride
#     elif task == 'ashd':
#         factor = tparams['ds_fac']
#         trn_path = osjoin(tparams['path'], 'train')
#         tst_path = osjoin(tparams['path'], 'test')
#         trn_samples = [f for f in os.listdir(trn_path) if os.path.isfile(osjoin(trn_path, f))]
#         tst_samples = [f for f in os.listdir(tst_path) if os.path.isfile(osjoin(tst_path, f))]
#         L_trn = []
#         for trn_sample in trn_samples:
#             L_raw = get_wav_shape(osjoin(trn_path, trn_sample))[0]
#             L_trn.append(L_raw//factor + int((L_raw % factor) > 0))  # takes care of downsampling
#         L_tst = []
#         for tst_sample in tst_samples:
#             L_raw = get_wav_shape(osjoin(tst_path, tst_sample))[0]
#             L_tst.append(L_raw//factor + int((L_raw % factor) > 0))  # takes care of downsampling
#         L_trn = sum(L_trn)
#         L_tst = sum(L_tst)
#     return L_trn, L_tst

def compute_dataset_slices(L_trn, L_tst, base_stride, task_params):
    """Note that the task_params are in physical units and have to be
    multiplied by base_stride to get the number of steps."""
    task, tparams = next(iter(task_params.items()))
    if task == 'taylor':
        s_trn = slice(0, L_trn)
        s_tst = slice(L_trn, L_trn + L_tst)
    elif task == 'tayloramus' or task == 'nostradamus':
        s_trn = slice(0 + tparams['delt_max']*base_stride,
                      L_trn - tparams['delt_max']*base_stride)
        s_tst = slice(L_trn + tparams['delt_max']*base_stride,
                      L_trn + L_tst - tparams['delt_max']*base_stride)
    elif task == 'ashd':
        s_trn = slice(0, L_trn)
        s_tst = slice(L_trn, L_trn + L_tst)
    return s_trn, s_tst

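# Worked sketch (not part of the original module): for the 'tayloramus' task
# the train/test slices are trimmed by delt_max*base_stride steps on both
# sides, presumably so that shifted targets stay within the available data.
# The numbers below are made up.
def _example_dataset_slices():
    task_params = {'tayloramus': {'delt_max': 2}}
    L_trn, L_tst, base_stride = 100, 50, 5
    s_trn, s_tst = compute_dataset_slices(L_trn, L_tst, base_stride, task_params)
    # s_trn == slice(10, 90) and s_tst == slice(110, 140)
    return s_trn, s_tst
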
def compute_complexity(u, y, subtasks, savepath):
    """Compute a per-subtask complexity score as one minus the absolute cosine
    similarity between each target and its corresponding input dimension, and
    save the result to savepath."""
    # trimming out the nan values
    non_nan = (~np.isnan(y)).prod(axis=0).astype(bool)
    Y = y[:, non_nan]
    U = u[:, non_nan]

    ndims = U.shape[0]
    Cs = []
    for suby, subu in zip(np.split(Y, ndims), U):
        cos = cosine_similarity(suby, subu.reshape(1, -1)).squeeze()
        Cs.append(np.clip(1 - abs(cos), 0, None))
    Cs = np.array(Cs).ravel()

    df = pd.DataFrame(np.c_[subtasks, Cs], columns=['dim', 'delta', 'deg', 'complexity'])
    df.to_csv(savepath)

def read_metadata(metadata_path):
    """Read the metadata.json file located in metadata_path."""
    if not os.path.exists(osjoin(metadata_path, 'metadata.json')):
        raise FileNotFoundError(f'No metadata file exists in {metadata_path}.')
    else:
        with open(osjoin(metadata_path, 'metadata.json'), 'r') as file:
            md = json.load(file)
    return md

def make_net_ids(means, stds):
    """Build network id strings by combining a mean index and a std index."""
    offset_m, offset_s = 0, 0
    if min(means) > 0:
        offset_m = 1
    if min(stds) > 0:
        offset_s = 1

    mean_ids = list(range(offset_m, len(means) + offset_m))
    std_ids = list(range(offset_s, len(stds) + offset_s))
    net_ids = [f'{mid}{sid}' for mid, sid in itertools.product(mean_ids, std_ids)]
    return net_ids

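# Illustrative sketch (not part of the original module): net ids concatenate a
# mean index and a std index; indices start at 1 only when the smallest
# mean/std is strictly positive. The values below are made up.
def _example_make_net_ids():
    means = [0.0, 0.5]        # min is 0  -> mean ids start at 0
    stds = [0.1, 0.2, 0.3]    # min is >0 -> std ids start at 1
    # returns ['01', '02', '03', '11', '12', '13']
    return make_net_ids(means, stds)
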
def add_complexity_trier(df, N=3):
    """Assign each row to one of N equally spaced complexity tiers in [0, 1]."""
    # df['tier'] = (abs(df.delta) / (2*df.bs))
    # df['tier'] = pd.cut(df['tier'], bins=np.linspace(0, max(df.tier), N+1), labels=False, include_lowest=True)
    tier_borders = np.linspace(0., 1. + 1e-6, N + 1)
    for tier in range(len(tier_borders) - 1):
        idx = (df.complexity >= tier_borders[tier]) & (df.complexity < tier_borders[tier + 1])
        df.loc[idx, 'tier'] = tier
    return df

def add_homog_score(df, std=False):
    """For every (n, task) group, add the score of the corresponding homogeneous
    network (hn == 0) as a reference column."""
    test = 'test'
    train = 'train'
    if std:
        test += '_std'
        train += '_std'

    df[f'homog_{test}'] = -1.
    df[f'homog_{train}'] = -1.
    for idx, (group, _df) in enumerate(df.groupby(['n', 'task'])):
        df.loc[_df.index, f'homog_{test}'] = _df[_df.hn == 0][test].iloc[0]
        df.loc[_df.index, f'homog_{train}'] = _df[_df.hn == 0][train].iloc[0]
    return df

def aggregate_scores(root, sim_pattern,
                     invalid_patterns=None,
                     score_name=None,
                     return_std=True,
                     n_tiers=3,
                     transform=True,
                     transforms=(lambda x: np.exp(x-1), lambda x: 2/np.pi*np.arctan(x)),
                     ):
    tic = time.time()

    # finding simulation names
    sims = sorted([f.path.split('/')[-1] for f in os.scandir(root)
                   if (f.is_dir() and sim_pattern in f.path)])
    if invalid_patterns is not None:
        if type(invalid_patterns) == str:
            sims = sorted([sim for sim in sims if invalid_patterns not in sim])
        elif type(invalid_patterns) == list:
            valids = []
            for sim in sims:
                is_valid = True
                for inval_pat in invalid_patterns:
                    if inval_pat in sim:
                        is_valid = False
                        break
                valids.append(is_valid)
            sims = sorted(np.array(sims)[valids].tolist())

    print(f'The results of the following {len(sims)} simulations will be aggregated:')
    for sim in sims:
        print('\t', sim)

    dfs = []
    for sim in sims:
        print(f'aggregating scores from {sim}')
        try:
            net_ids = sorted([f.name for f in os.scandir(osjoin(root, sim)) if f.is_dir()])
            score_names = sorted([f.name.split('.csv')[0].split('tst')[1]
                                  for f in os.scandir(osjoin(root, sim, net_ids[0]))
                                  if 'tst_' in f.name])

            _score_dfs = []
            for scr_name in score_names:
                _net_dfs = []
                for net_id in net_ids:
                    dtypes = {0: int, 1: int, 2: int}
                    path_trn = osjoin(root, sim, net_id, f'trn{scr_name}.csv')
                    path_tst = osjoin(root, sim, net_id, f'tst{scr_name}.csv')
                    tst_df = pd.read_csv(path_tst, dtype=dtypes)
                    trn_df = pd.read_csv(path_trn, dtype=dtypes)

                    _net_df = tst_df.set_index(['dim', 'delta', 'deg']).mean(axis=1)  # test mean
                    _net_df = _net_df.reset_index().rename(columns={0: 'test'})
                    _net_df['train'] = trn_df.set_index(['dim', 'delta', 'deg']).mean(axis=1).values  # train mean
                    _net_df['test_std'] = tst_df.set_index(['dim', 'delta', 'deg']).std(axis=1).values
                    _net_df['train_std'] = trn_df.set_index(['dim', 'delta', 'deg']).std(axis=1).values

                    # task id and network id
                    _net_df['net_id'] = net_id
                    _net_df['n'] = int(net_id[0])
                    _net_df['hn'] = int(net_id[1])
                    _net_df['task'] = _net_df.apply(
                        lambda row: (int(row.dim), int(row.delta), int(row.deg)), axis=1)
                    _net_dfs.append(_net_df)

                _score_df = pd.concat(_net_dfs).reset_index(drop=True)
                _score_df = add_homog_score(_score_df)
                _score_df = add_homog_score(_score_df, std=True)
                _score_df['scr'] = scr_name

                # fitting parameters
                fit_params = fit_parser(scr_name)
                _score_df['fit_method'] = fit_params['method']
                _score_df['fit_lam'] = fit_params['lam']
                _score_df['fit_kernel'] = fit_params['conv_params']['kernel']
                _score_df['fit_tk'] = fit_params['conv_params']['tau_k']

                # gains
                gain_params = gain_parser(scr_name)
                _score_df['Jn'] = gain_params['Jn']
                _score_df['J'] = gain_params['J']
                _score_df['Ju'] = gain_params['Ju']

                _score_dfs.append(_score_df)

            _sim_df = pd.concat(_score_dfs).reset_index(drop=True)
            _sim_df['sim'] = sim

            # simulation parameters
            sim_params = read_metadata(osjoin(root, sim))
            _sim_df['N'] = sim_params['net']['N']
            _sim_df['p'] = sim_params['net']['p']
            _sim_df['f'] = sim_params['net']['f']
            _sim_df['mue'] = sim_params['net']['mue']
            _sim_df['sig'] = sim_params['net']['sig0']
            _sim_df['n_trn'] = sim_params['set']['n_trn']
            _sim_df['n_tst'] = sim_params['set']['n_tst']
            _sim_df['n_trials'] = sim_params['set']['n_trials']
            _sim_df['n_timesteps'] = sim_params['n_timesteps']
            _sim_df['bf'] = sim_params['bf']
            _sim_df['bs'] = sim_params['base_stride']
            _sim_df['udim'] = sim_params['dim']
            _sim_df['hetero'] = list(sim_params['hetero'].keys())[0]
            _sim_df['hetero_ld'] = list(sim_params['hetero'].values())[0]['log_distance']

            # add complexity
            try:
                _sim_df['complexity'] = -1.
                complexity_df = pd.read_csv(osjoin(root, sim, 'complexities.csv'),
                                            index_col=0, dtype=dtypes)
                complexity_df = complexity_df.set_index(['dim', 'delta', 'deg'])
                for cidx in complexity_df.index:
                    _sim_df.loc[_sim_df.task == cidx, 'complexity'] = complexity_df.loc[cidx].complexity
            except Exception as e:
                print(f'Could not read complexity of {sim} and raised {e}.')

            dfs.append(_sim_df)
        except Exception as e:
            print(f'Could not aggregate {sim} and raised {e}.')

    dfs = pd.concat(dfs).reset_index(drop=True)

    if transform:
        dfs['test'] = transforms[0](dfs['test'])
        dfs['train'] = transforms[0](dfs['train'])
        dfs['homog_test'] = transforms[0](dfs['homog_test'])
        dfs['homog_train'] = transforms[0](dfs['homog_train'])
        if return_std:
            dfs['test_std'] = transforms[1](dfs['test_std'])
            dfs['train_std'] = transforms[1](dfs['train_std'])
            dfs['homog_test_std'] = transforms[1](dfs['homog_test_std'])
            dfs['homog_train_std'] = transforms[1](dfs['homog_train_std'])

    if n_tiers > 0:
        dfs = add_complexity_trier(dfs, N=n_tiers)

    print(f'Aggregation time: {time.time()-tic:.2f} seconds.')
    return dfs.reset_index(drop=True)

def sample_taus(size, mu, std,
                distr='lognormal',
                clip_below=False,
                rng=None,
                tol_prcntg=-1,
                verbose=False,
                ):
    """
    The given mu and std are the desired mean and standard deviation and are
    different from the parameters passed to rng. The case std=0 is also
    handled, by returning a constant vector of the desired size.
    """
    if rng is None:
        rng = np.random.default_rng()

    if std == 0:
        taus = np.ones(size)*mu
    else:
        if distr == 'lognormal':
            std_ = np.sqrt(np.log((std**2) / (mu**2) + 1))
            mu_ = np.log(mu) - (std_**2) / 2
            # mu_ = np.log(mu**2/np.sqrt(mu**2 + std**2))
            # std_ = np.sqrt(np.log(1+(std/mu)**2))
            rng_ = rng.lognormal
            kwargs = {'sigma': std_, 'mean': mu_}
        elif distr == 'normal':
            rng_ = rng.normal
            kwargs = {'loc': mu, 'scale': std}
        elif distr == 'gamma':
            k = (mu / std)**2
            theta = mu / k
            rng_ = rng.gamma
            kwargs = {'shape': k, 'scale': theta}
        elif distr == 'uniform':
            # The mean and variance determine the min and max of the
            # distribution:
            #   mean = 1/2 (low + high)
            #   var  = 1/12 (high - low)**2
            # which gives low = mu - sqrt(3)*std and high = mu + sqrt(3)*std.
            low = mu - np.sqrt(3)*std
            high = mu + np.sqrt(3)*std
            rng_ = rng.uniform
            kwargs = {'low': low, 'high': high}
        elif distr == 'poly2':
            # Here we sample from a pdf ~ 1/(1 + x**2). This pdf has
            # no bounded moments. Thus, the mean and variance are meaningful
            # only if the distribution is truncated by a minimum and maximum
            # tau. `clip_below` is interpreted as the minimum tau. Yet,
            # the maximum tau has to be numerically estimated. Like the other
            # cases, this is based on the prescribed values of `mu` and `std`.
            #
            # With `mx` and `mn`, the empirical truncated pdf is uniquely
            # determined. This pdf is then passed to the poly2_sampler
            # function, a custom-made sampler, to sample timescales
            # accordingly.
            mn = float(clip_below)  # minimum tau
            mx = estimate_poly2_max_from_stats(mu, std, verbose)  # maximum tau
            rng_ = poly2_sampler
            kwargs = {'mn': mn, 'mx': mx}
        else:
            raise NotImplementedError(
                'Only lognormal, normal, gamma, uniform, and poly2 are supported.')

        taus = rng_(size=size, **kwargs)

        if tol_prcntg > 0:
            # The empirical mean and variance can, due to sampling, be
            # different from the prescribed values. To handle this,
            # we simply redraw taus. This only matters at small N.
            err_mu = abs(taus.mean() - mu)/mu
            err_std = abs(taus.std() - std)/(std + 1e-12) * (std > 0)
            correction = 0
            while (err_mu > tol_prcntg) or (err_std > tol_prcntg):
                if correction % 10000 == 0:
                    print(f'{correction} corrections made.')
                taus = rng_(size=size, **kwargs)
                err_mu = abs(taus.mean() - mu)/mu
                err_std = abs(taus.std() - std)/(std + 1e-12) * (std > 0)
                correction += 1

        if bool(clip_below):
            resample = taus[taus < clip_below]
            resample_size = len(resample)
            while resample_size > 0:
                taus[taus < clip_below] = rng_(size=resample_size, **kwargs)
                resample = taus[taus < clip_below]
                resample_size = len(resample)

    return taus

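# Verification sketch (not part of the original module): the lognormal branch
# of sample_taus matches the requested mean and std by moment matching, which
# can be checked empirically on a large sample.
def _example_check_lognormal_moments(mu=1.0, std=0.5, size=200_000, seed=0):
    rng = np.random.default_rng(seed)
    taus = sample_taus(size, mu, std, distr='lognormal', rng=rng)
    # both values should be close to the requested mu and std (up to sampling noise)
    return taus.mean(), taus.std()
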
def estimate_poly2_max_from_stats(mu, std, verbose=False):
    """
    If mn and mx are the minimum and maximum values, the pdf has the
    following form:

        pdf(x) = 1/Z * 1/(1+x**2)                                   (0)
        Z = atan(mx) - atan(mn)

    and the first and second moments of this truncated pdf are:

        E[x]    = 1/(2*Z) * log( (1+mx**2) / (1+mn**2) )            (1)
        E[x**2] = (mx - mn)/Z - 1                                   (2)

    One must find mx and mn such that:

        E[x]    = mu
        E[x**2] = std**2 + mu**2

    which is very hard to solve. Fortunately, it turns out that both
    statistics are mostly influenced by `mx` and not `mn`. Thus we can use
    the approximation `mn ≈ 0`, which gives rise to:

        E[x]    ≈ log( 1 + mx**2 ) / (2 * atan(mx))                 (3)
        E[x**2] ≈ mx / atan(mx) - 1                                 (4)

    Now, we can numerically solve for `mx`. However, only one of these
    equations can be used. The reason is that the first and second moments
    of the truncated pdf are not independent: setting mu determines std and
    vice versa. Here we only use equation (3). The resulting `std` is
    reported when `verbose` is set.
    """

    def mx_from_mu(x, mu):
        mu1_app = 0.5 * np.log(1 + x**2) / np.arctan(x)
        return mu - mu1_app

    # def mx_from_std(x, std):
    #     mu1_app = 0.5 * np.log(1 + x**2) / np.arctan(x)
    #     mu2_app = x / np.arctan(x) - 1
    #     return std - np.sqrt(mu2_app - mu1_app**2)

    assert mu > 0, f'The mean of tau must be positive. Got {mu}.'

    # The upper bound of the bracket is selected such that the sign of the
    # objective there is always different from the one at the lower bound.
    mx = bisect(mx_from_mu, mu/1000, np.exp(np.pi*mu/2), args=(mu,), maxiter=1000)

    if verbose:
        actual_std = np.sqrt(mx / np.arctan(mx) - 1 - mu**2)
        print(f'In poly2 the std of {std} was requested but the actual std is {actual_std}.')

    # mx2 = bisect(mx_from_std, max(mu-std, .1)/1000, (mu+std)*1000, args=(std,), maxiter=1000)
    # print(mx1, mx2)
    # return (mx1 + mx2)/2
    return mx

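# Numerical check sketch (not part of the original module): after solving
# equation (3) for mx, the mean of the truncated 1/(1+x**2) pdf on [~0, mx]
# should reproduce the requested mu. scipy.integrate.quad is used here only
# for this check; the std argument only affects the verbose report.
def _example_check_poly2_mean(mu=2.0):
    from scipy.integrate import quad
    mx = estimate_poly2_max_from_stats(mu, std=1.0)
    Z = np.arctan(mx)                          # normalization, with mn ~ 0
    mean, _ = quad(lambda x: x/(1 + x**2)/Z, 0, mx)
    # mean should be close to mu
    return mx, mean
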
def poly2_sampler(mn, mx, size):
    """
    Since numpy doesn't have a sampler for the 1/(1+x**2) distribution,
    we use the "inverse transform sampling" method to implement one.
    Read more about this method here:
    https://en.wikipedia.org/wiki/Inverse_transform_sampling
    """
    a = 1.

    def Z(a, mn, mx):
        return 1./a * (np.arctan(a*mx) - np.arctan(a*mn))

    def mu(a, mn, mx):
        z = Z(a, mn, mx)
        return 1/(a*z) * 1/(2*a) * np.log((1 + (a*mx)**2) / (1 + (a*mn)**2))

    def mu2(a, mn, mx):
        z = Z(a, mn, mx)
        return 1/(a**2) * ((mx - mn)/z - 1)

    def sig(a, mn, mx):
        return np.sqrt(mu2(a, mn, mx) - mu(a, mn, mx)**2)

    u = np.random.uniform(0, 1, size)
    z = Z(a, mn, mx)
    return 1./a * np.tan(a*z*u + np.arctan(a*mn))

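# Sketch of the inverse-transform step used above (not part of the original
# module): with a=1 the CDF of the truncated pdf is
#   F(x) = (arctan(x) - arctan(mn)) / (arctan(mx) - arctan(mn)),
# so F^{-1}(u) = tan(u*Z + arctan(mn)) with Z = arctan(mx) - arctan(mn),
# which is exactly the expression returned by poly2_sampler. A quick
# empirical check that all samples land in [mn, mx]:
def _example_check_poly2_sampler(mn=0.1, mx=10.0, size=10_000):
    taus = poly2_sampler(mn, mx, size)
    return taus.min() >= mn, taus.max() <= mx
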
def fftautocorr(x):
    """
    This implements a circular autocorrelation, as opposed to the
    zero-padded correlation of numpy and scipy. It is adapted from
    https://stackoverflow.com/a/28285527/7106957.
    x must be a real sequence.
    """
    correls = ifft(fft(x) * fft(x).conj()).real
    correls /= correls[0]
    lags = np.arange(len(x))
    return lags, correls

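# Illustrative sketch (not part of the original module): for a periodic signal
# the circular autocorrelation keeps full-height peaks at every multiple of
# the period, whereas the zero-padded autocorr (defined below) decays with
# increasing lag. The signal is made up.
def _example_compare_autocorrs():
    x = np.sin(2*np.pi*np.arange(400)/40)      # period of 40 samples
    lags_c, ac_circular = fftautocorr(x)
    lags_z, ac_padded = autocorr(x)
    # ac_circular[40] is ~1.0, while ac_padded[40] is ~0.9
    return ac_circular[40], ac_padded[40]
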
def autocorr(x, maxlags=None):
    """
    Based on matplotlib's acorr function. Returns the normalized
    autocorrelation for the non-negative lags.
    """
    # maxlags is currently unused; the full non-negative lag range is returned
    x = np.asarray(x)
    x = x - np.mean(x)  # zero-mean for meaningful autocorrelation

    result = np.correlate(x, x, mode='full')
    result /= result[len(x) - 1]  # normalize so lag=0 autocorr = 1

    lags = np.arange(-len(x) + 1, len(x))
    result = result[lags >= 0]
    lags = lags[lags >= 0]
    return lags, result

# Nx = len(x)
# correls = np.correlate(x, x, mode="full")
# correls /= np.dot(x, x)
# if maxlags is None:
#     maxlags = Nx - 1
# lags = np.arange(-maxlags, maxlags + 1)
# correls = correls[Nx - 1 - maxlags:Nx + maxlags]
# return lags, correls

def find_top_idx(x):
    """Return the index of the tallest peak of x."""
    # find the indices of all peaks
    peaks, _ = signal.find_peaks(x, prominence=(0, None))
    # see which one is truly the tallest
    top_peak_idx = np.argmax(x[peaks])
    # and return that index
    top_idx = peaks[top_peak_idx]

    # Sort peaks by prominence
    # prominences = properties["prominences"]
    # if len(peaks) == 0:
    #     return np.array([])  # no peaks found
    # proms_sorted = np.argsort(prominences)[::-1]  # [:n]
    # peaks_sorted = peaks[proms_sorted]
    return top_idx

def find_base_lags(u, average='geometric'):
    """Estimate the base lag of each input dimension from the tallest peak of
    its autocorrelation (excluding lag 0), and return their average."""
    # u is assumed to be of shape (ndim x ntimes)
    base_lags = []
    # for each component of the input
    for i in range(u.shape[0]):
        # first compute the autocorrelation function
        lags, ac = autocorr(u[i, :])
        # then find the index of the top peak (after lag 0)
        top_idx = find_top_idx(ac)
        # and add the corresponding lag
        base_lags.append(lags[top_idx])

    if average == 'linear':
        base = np.mean(base_lags)
    else:
        base = np.prod(base_lags)**(1./len(base_lags))
    return base_lags, base

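# Illustrative sketch (not part of the original module): for a pure sinusoid
# the base lag recovered from the autocorrelation should be close to the
# period in samples. The signal below is made up.
def _example_find_base_lags():
    t = np.arange(1000)
    u = np.sin(2*np.pi*t/50).reshape(1, -1)    # period of 50 samples
    base_lags, base = find_base_lags(u)
    # base_lags[0] should be approximately 50
    return base_lags, base
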
def canonicalize_value(val):
    """Format values consistently for filenames.

    Parameters
    ----------
    val : int, float, str
        Value to format

    Returns
    -------
    str
        Formatted value:
        - Floats with two decimal places
        - Other types stringified
    """
    if isinstance(val, float):
        return f"{val:.2f}"
        # return s.replace("+", "")
    else:
        return str(val)

def sanitize_token(s):
    """Clean up strings for safe filenames.

    Parameters
    ----------
    s : str
        String to clean

    Returns
    -------
    str
        Safe filename string with only:
        - Alphanumeric characters
        - Underscores
        - Dashes
        - Dots
    """
    return re.sub(r"[^\w\-.]", "", s)

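# Illustrative sketch (not part of the original module): canonicalize_value
# and sanitize_token can be combined when building filenames from parameter
# values. The key/value below are made up.
def _example_filename_token(key='mu/e', value=0.12345):
    token = f'{sanitize_token(key)}_{canonicalize_value(value)}'
    # -> 'mue_0.12' (the slash is stripped, the float is rounded to 2 decimals)
    return token
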
def load_inputs(sim_path, load_noise=True, mmap=False):
    """Load the time, input, and (optionally) noise arrays from sim_path."""
    loader = np.load
    if mmap:
        loader = load_mmap

    t = loader(osjoin(sim_path, 't.npy'))
    u = loader(osjoin(sim_path, 'u.npy'))
    if load_noise:
        xi = loader(osjoin(sim_path, 'xi.npy'))
        return t, u.astype(float), xi.astype(float)
    else:
        return t, u.astype(float)

def load_target(sim_path, mmap=False):
    """Load the target array from sim_path."""
    loader = np.load
    if mmap:
        loader = load_mmap
    return loader(osjoin(sim_path, 'y.npy')).astype(float)

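# Note: load_inputs and load_target refer to a `load_mmap` helper that is not
# defined in this excerpt. A minimal sketch of such a helper, assuming it
# simply wraps np.load with memory mapping enabled, could look like this:
def load_mmap(path):
    # memory-map the .npy file read-only instead of loading it into RAM
    return np.load(path, mmap_mode='r')
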
def load_complexity(sim_path):
    return pd.read_csv(osjoin(sim_path, 'complexities.csv'), index_col=0)


def load_weights(sim_path):
    w = np.load(osjoin(sim_path, 'w.npy'))
    w_ff = np.load(osjoin(sim_path, 'w_ff.npy'))
    w_fb = np.load(osjoin(sim_path, 'w_fb.npy'))
    return w_ff, w, w_fb


def load_topology(sim_path):
    return load_npz(osjoin(sim_path, 'A_adj.npz'))


def load_nets(sim_path):
    return sorted(list(os.walk(sim_path))[0][1])


def load_taus(sim_path):
    return np.array([np.load(osjoin(sim_path, net_id, 'taus.npy'))
                     for net_id in load_nets(sim_path)])


def load_ic(sim_path):
    with open(osjoin(sim_path, 'metadata.json'), 'r') as file:
        md = json.load(file)
    print('Zero initial condition was prescribed.')
    return np.zeros(md['net']['N'], dtype=float)