"""
This module provides functionality for loading the state (spike train or rate) of the simulated networks.
"""
import os
osjoin = os.path.join # an alias for convenience
from pdb import set_trace
import pickle
import numpy as np
from scipy import signal
from scipy.ndimage import gaussian_filter1d
from hetero.dynamics.li import activation_func
from hetero.config import VALID_ACTIVATIONS
# [docs]  (Sphinx source-page link artifact; commented out to keep the module importable)
def load_mmap(file_path):
    """
    Open a NumPy ``.npy`` file as a read-only memory-mapped array.

    The data stay on disk; only the pages that are actually touched are read
    into memory. The ``.npy`` header is parsed manually to recover the array
    metadata (shape, dtype, C/Fortran order) and to locate the byte offset at
    which the raw data begin.

    Parameters
    ----------
    file_path : str
        Path to the ``.npy`` file to be memory-mapped.

    Returns
    -------
    numpy.memmap
        A read-only memory-mapped view of the array stored in the file.

    .. note::
        Internally this:
        1. Reads the magic number (validates the format and gives its version)
        2. Parses the header for shape, dtype, and memory order
        3. Creates a memmap starting at the data offset right after the header
    """
    with open(file_path, 'rb') as fh:
        # the magic number also tells us which header layout to expect
        major, _minor = np.lib.format.read_magic(fh)
        if major == 1:
            shape, fortran_order, dtype = np.lib.format.read_array_header_1_0(fh)
        else:
            shape, fortran_order, dtype = np.lib.format.read_array_header_2_0(fh)
        # the raw data start exactly where the header ends
        data_offset = fh.tell()
    memory_order = 'F' if fortran_order else 'C'
    return np.memmap(file_path, dtype=dtype, mode='r',
                     offset=data_offset, shape=shape, order=memory_order)
# [docs]  (Sphinx source-page link artifact; commented out to keep the module importable)
def raster2array(raster_it, n_neurons, n_timesteps):
    """
    Converts a spike train dictionary (raster) into a dense numpy array.

    Parameters
    ----------
    raster_it : dict
        Dictionary containing spike data with keys:
        - i: neuron indices
        - tick: spike time indices
    n_neurons : int
        Total number of neurons
    n_timesteps : int
        Total number of timesteps

    Returns
    -------
    ndarray
        Spike-count array of shape (n_neurons, n_timesteps); repeated
        (neuron, tick) pairs accumulate.
    """
    dense = np.zeros([n_neurons, n_timesteps])
    rows = raster_it['i']
    cols = raster_it['tick']
    # unbuffered add so duplicate (row, col) pairs are counted, not clobbered
    np.add.at(dense, (rows, cols), 1)
    return dense
# [docs]  (Sphinx source-page link artifact; commented out to keep the module importable)
def convolve(st_array, kernel, verbose=False, **kwargs):
    """
    Apply temporal filtering to spike trains. ``kwargs`` is expected to provide
    the kernel parameter (``sigma``, ``tau_k``, or ``win``) in the unit of
    timesteps. All causal kernels are normalized to 1.

    Parameters
    ----------
    st_array : ndarray
        Binary spike array of shape (N_neurons, N_timesteps)
    kernel : str or None
        Kernel type, one of:
        - 'None' or ``None``: No filtering (returns the binary spike train)
        - 'gaussian': half Gaussian (causal). Requires ``sigma``.
        - 'gaussian_nc': full Gaussian (non-causal). Requires ``sigma``.
        - 'alpha': Alpha kernel. Requires ``tau_k``.
        - 'exp': Exponential decay. Requires ``tau_k``.
        - 'moving_avg': Mean count over a constant window. Requires ``win``.
    verbose : bool, optional
        Print kernel info (default: False)

    Returns
    -------
    ndarray
        Filtered spike trains of same shape as input

    Raises
    ------
    ValueError
        If ``kernel`` is not one of the supported types.
    """
    N, T = st_array.shape
    # pick the kernel type and its single parameter out of kwargs: a 'kernel'
    # key overrides the positional argument, any other key (sigma, tau_k, win)
    # is taken as the kernel parameter
    kparam = None  # BUGFIX: was unbound (NameError) when kwargs was empty
    for key, val in kwargs.items():
        if key == 'kernel':
            kernel = val
        else:
            kparam = val
    # BUGFIX: the documented 'None' pass-through previously fell into the
    # ValueError branch below; return the raw spike train unchanged instead
    if (kernel is None) or (kernel == 'None'):
        return st_array
    if verbose:
        print(f'INFO: Spike train (N={N} neurons, T={T} timesteps convolved with {kernel} kernel of parmaeter {kparam}*dt.')
    if kernel == 'gaussian_nc':
        # non-causal Gaussian: gaussian_filter1d normalizes internally
        for idx in range(N):
            st_array[idx, :] = gaussian_filter1d(st_array[idx, :], sigma=kparam)
    else:
        # causal kernels sampled on a support of 10 parameter-widths;
        # int() makes float-valued parameters work with linspace's `num`
        n_taps = int(10 * kparam)
        _t = np.linspace(0, n_taps, n_taps + 1)
        if kernel == 'gaussian':
            kern = np.exp(-_t**2 / (2 * kparam**2))
        elif kernel == 'exp':
            kern = np.exp(-_t / kparam)
        elif kernel == 'alpha':
            kern = _t * np.exp(-_t / kparam)
        elif kernel == 'moving_avg':
            kern = np.ones(int(kparam))
        else:
            raise ValueError(f'Kernel type {kernel} is not valid for convolution.')
        kern /= kern.sum()  # normalizing the kernel
        # BUGFIX: convolve along the time axis and truncate time (axis 1),
        # not neurons (axis 0), so the output keeps the input (N, T) shape
        st_array = signal.fftconvolve(st_array, kern[np.newaxis, :], axes=1)[:, :T]
    return st_array
# [docs]  (Sphinx source-page link artifact; commented out to keep the module importable)
def load_states(sim_path, net_id, state_name,
                activation_params,
                n_neuron=None, n_timesteps=None,
                verbose=True,
                ):
    """
    Load the states of one simulated network and post-process them according
    to ``activation_params``.

    Parameters
    ----------
    sim_path : str
        Filesystem path to the simulation data root (forwarded to the loader).
    net_id : str
        Identifier of the network instance within the simulation (forwarded
        to the loader).
    state_name : str
        Name/key of the state to load (e.g. "spikes", "voltage"; forwarded to
        the loader).
    activation_params : dict
        Readout activation settings; only the first item is used, as a
        ``{readout: kwargs}`` pair. Supported readouts:
        - ``None``/'None'/'none': return the raw loaded states untouched.
        - 'array': convert a spike raster dict into a dense array (no
          convolution).
        - any key in ``VALID_ACTIVATIONS['LIF']``: array the raster, then
          convolve the spike trains with the configured kernel.
        - any key in ``VALID_ACTIVATIONS['LI']``: apply an activation function
          to the voltage traces.
    n_neuron : int, optional
        Number of neurons; required whenever a raster dict must be arrayed.
    n_timesteps : int, optional
        Number of timesteps; required whenever a raster dict must be arrayed.
    verbose : bool, default True
        Forwarded to the helper functions (activation_func, convolve).

    Returns
    -------
    states : object
        Raw loader output for the None readout, otherwise a numpy array
        produced by raster2array / convolve / activation_func.

    Raises
    ------
    ValueError
        If the selected readout method is not one of the options above.
    """
    states = _loader(sim_path, net_id, state_name)
    readout, readout_kwargs = next(iter(activation_params.items()))
    if readout in (None, 'None', 'none'):
        # raw states: neither convolved nor passed through an activation
        return states
    if readout == 'array':
        # dense (n_neuron, n_timesteps) spike array, no temporal filtering
        return raster2array(states, n_neuron, n_timesteps)
    if readout in VALID_ACTIVATIONS['LIF']:
        dense = raster2array(states, n_neuron, n_timesteps)
        return convolve(dense, **readout_kwargs, verbose=verbose)
    if readout in VALID_ACTIVATIONS['LI']:
        return activation_func(states, **readout_kwargs, verbose=verbose)
    raise ValueError(f'Readout method {readout} is not valid.')
def _loader(sim_path, net_id, state_name, mmap=True, dtype=np.float64):
"""
Load a simulation state file (spike raster or voltage array) from disk.
Parameters
----------
sim_path : str
Path to the top-level simulation directory.
net_id : str
Subdirectory name for the specific network.
state_name : str
Base name of the state to load. The loader detects the state type (spike
or rate) from the extension. But the extension itself, can be customarily
left out.
mmap : bool, optional
When loading a .npy file, if True use a memory-mapped loader
(load_mmap) to avoid reading the entire array into memory; if False
use numpy.load. Default is True.
dtype : numpy.dtype, optional
dtype to cast .npy-loaded arrays to. Default is np.float64.
Returns
-------
dict or numpy.ndarray
- If a '.pkl' file is loaded: the unpickled object (commonly a spike
train raster represented as a dict with keys like 'i' (neuron index)
and 't' (spike time in units of dt)).
- If a '.npy' file is loaded: the voltage/activity array cast to `dtype`.
Raises
------
FileNotFoundError
If the constructed file path (os.path.join(sim_path, net_id, f"{state_name}.{ext}"))
does not exist.
"""
# remove the potential extention
for ext in ['.pkl', '.npy']:
if ext in state_name:
state_name = state_name.split(ext)[0]
ext = 'pkl' if 'LIF' in state_name else 'npy'
statepath = osjoin(sim_path, net_id, f'{state_name}.{ext}')
if not os.path.exists(statepath):
raise FileNotFoundError(f'{statepath} is not found.')
if ext == 'pkl':
# spike train raster as a dict wiht keys i (neuron index) and t (spike time
# in the unit of dt, i.e., a value of 5 for t means a physical time of 5dt.)
with open(statepath, 'rb') as f:
return pickle.load(f)
else:
# voltage activity
_loader = load_mmap if mmap else np.load
return _loader(statepath).astype(dtype)