from abc import abstractmethod
from typing import Union, Iterable, Optional
import math
import torch
import torch.nn as nn
from numpy import sqrt, newaxis, integer
from numpy.fft import irfft, rfftfreq
from numpy.random import default_rng, Generator, RandomState
from numpy import sum as npsum
from .. import surrogate, base
__all__ = [
"powerlaw_psd_gaussian",
"NoisyBaseNode",
"NoisyCUBALIFNode",
"NoisyILCBaseNode",
"NoisyILCCUBALIFNode",
"NoisyNonSpikingBaseNode",
"NoisyNonSpikingIFNode",
]
[文档]
def powerlaw_psd_gaussian(
exponent: float,
size: Union[int, Iterable[int]],
fmin: float = 0.0,
random_state: Optional[Union[int, Generator, RandomState]] = None,
):
r"""
**API Language:**
:ref:`中文 <powerlaw_psd_gaussian-cn>` | :ref:`English <powerlaw_psd_gaussian-en>`
----
.. _powerlaw_psd_gaussian-cn:
* **中文**
生成具有 :math:`(1/f)^\beta` 功率谱的高斯噪声。生成的噪声满足
.. math::
S(f) = (1 / f)^\beta
Flicker / pink noise:
.. math::
\beta = 1
Brown noise:
.. math::
\beta = 2
自相关衰减比例为 :math:`\text{lag}^{-\gamma}`,其中 :math:`\gamma = 1 - \beta (0 < \beta < 1)`。
对于接近 1 的 :math:`\beta` 值可能存在有限大小效应。该算法基于文章
Timmer, J. and Koenig, M.:
On generating power law noise.
Astron. Astrophys. 300, 707-710 (1995).
:param exponent: 噪声的功率谱指数 :math:`\beta`
:type exponent: float
:param size: 输出样本的形状,最后一个维度作为时间轴,其余维度独立。
:type size: Union[int, Iterable[int]]
:param fmin: 低频截止,默认为 0,对应原始论文。低于 fmin 的频率功率谱平坦。fmin 定义为
相对于单位采样率。内部会映射为 max(fmin, 1/samples)。最大值为 fmin = 0.5,
即 Nyquist 频率,此时输出为白噪声。
:type fmin: float, optional
:param random_state: 可选,设置 NumPy 随机数生成器状态。支持整数、None、
np.random.Generator 或 np.random.RandomState。
:type random_state: int, numpy.integer, numpy.random.Generator,
numpy.random.RandomState, optional
:return: 生成的噪声样本
:rtype: array
----
.. _powerlaw_psd_gaussian-en:
* **English**
Generate Gaussian noise with a power spectrum proportional to :math:`(1/f)^\beta`.
The generated noise satisfies
.. math::
S(f) = (1 / f)^\beta
Flicker / pink noise:
.. math::
\beta = 1
Brown noise:
.. math::
\beta = 2
The autocorrelation decays proportional to :math:`\text{lag}^{-\gamma}`,
where :math:`\gamma = 1 - \beta` for :math:`0 < \beta < 1`. Finite-size effects may
occur when :math:`\beta` is close to 1. The algorithm is based on:
Timmer, J. and Koenig, M.:
On generating power law noise.
Astron. Astrophys. 300, 707-710 (1995).
:param exponent: the power spectrum exponent $\beta$.
:type exponent: float
:param size: shape of the output samples. The last axis is taken as time,
and all other axes are independent.
:type size: Union[int, Iterable[int]]
:param fmin: low-frequency cutoff. Default 0 corresponds to the original paper.
Frequencies below fmin are flat. fmin is defined relative to unit sampling rate.
Internally mapped to max(fmin, 1/samples). The maximum allowed value
is 0.5 (Nyquist frequency), producing white noise.
:type fmin: float, optional
:param random_state: optional, sets the state of NumPy's underlying random number generator.
Supports int, None, np.random.Generator, or np.random.RandomState.
:type random_state: int, numpy.integer, numpy.random.Generator,
numpy.random.RandomState, optional
:return: generated Gaussian noise samples
:rtype: array
----
**Examples:**
.. code-block:: python
# generate 1/f noise == pink noise == flicker noise
>>> import colorednoise as cn
>>> y = cn.powerlaw_psd_gaussian(1, 5)
"""
# Make sure size is a list so we can iterate it and assign to it.
if isinstance(size, (integer, int)):
size = [size]
elif isinstance(size, Iterable):
size = list(size)
else:
raise ValueError("Size must be of type int or Iterable[int]")
# The number of samples in each time series
samples = size[-1]
# Calculate Frequencies (we asume a sample rate of one)
# Use fft functions for real output (-> hermitian spectrum)
f = rfftfreq(samples) # type: ignore # mypy 1.5.1 has problems here
# Validate / normalise fmin
if 0 <= fmin <= 0.5:
fmin = max(fmin, 1.0 / samples) # Low frequency cutoff
else:
raise ValueError("fmin must be chosen between 0 and 0.5.")
# Build scaling factors for all frequencies
s_scale = f
ix = npsum(s_scale < fmin) # Index of the cutoff
if ix and ix < len(s_scale):
s_scale[:ix] = s_scale[ix]
s_scale = s_scale ** (-exponent / 2.0)
# Calculate theoretical output standard deviation from scaling
w = s_scale[1:].copy()
w[-1] *= (1 + (samples % 2)) / 2.0 # correct f = +-0.5
sigma = 2 * sqrt(npsum(w**2)) / samples
# Adjust size to generate one Fourier component per frequency
size[-1] = len(f)
# Add empty dimension(s) to broadcast s_scale along last
# dimension of generated random power + phase (below)
dims_to_add = len(size) - 1
s_scale = s_scale[(newaxis,) * dims_to_add + (Ellipsis,)]
# prepare random number generator
normal_dist = _get_normal_distribution(random_state)
# Generate scaled random power + phase
sr = normal_dist(scale=s_scale, size=size)
si = normal_dist(scale=s_scale, size=size)
# If the signal length is even, frequencies +/- 0.5 are equal
# so the coefficient must be real.
if not (samples % 2):
si[..., -1] = 0
sr[..., -1] *= sqrt(2) # Fix magnitude
# Regardless of signal length, the DC component must be real
si[..., 0] = 0
sr[..., 0] *= sqrt(2) # Fix magnitude
# Combine power + corrected phase to Fourier components
s = sr + 1j * si
# Transform to real time series & scale to unit variance
y = irfft(s, n=samples, axis=-1) / sigma
return y
def _get_normal_distribution(
random_state: Optional[Union[int, Generator, RandomState]],
):
normal_dist = None
if isinstance(random_state, (integer, int)) or random_state is None:
random_state = default_rng(random_state)
normal_dist = random_state.normal
elif isinstance(random_state, (Generator, RandomState)):
normal_dist = random_state.normal
else:
raise ValueError(
"random_state must be one of integer, numpy.random.Generator, "
"numpy.random.Randomstate"
)
return normal_dist
[文档]
class NoisyBaseNode(nn.Module, base.MultiStepModule):
def __init__(
self,
num_node,
is_training: bool = True,
T: int = 5,
sigma_init: float = 0.5,
beta: float = 0.0,
v_threshold: float = 0.5,
v_reset: Optional[float] = 0.0,
surrogate_function: surrogate.SurrogateFunctionBase = surrogate.Rect(),
):
assert isinstance(v_reset, float) or v_reset is None
assert isinstance(v_threshold, float)
super().__init__()
self.num_node = num_node
self.is_training = is_training
self.T = T
self.beta = beta
self.sigma_v = sigma_init / math.sqrt(num_node)
self.cn_v = None
self.sigma_s = sigma_init / math.sqrt(num_node)
self.cn_s = None
self.v_threshold = v_threshold
self.v_reset = v_reset
self.surrogate_function = surrogate_function
[文档]
@abstractmethod
def neuronal_charge(self, x: torch.Tensor):
raise NotImplementedError
[文档]
def neuronal_fire(self):
return self.surrogate_function(self.v - self.v_threshold)
[文档]
def neuronal_reset(self, spike):
if self.v_reset is None:
self.v = self.v - spike * self.v_threshold
else:
self.v = (1.0 - spike) * self.v + spike * self.v_reset
[文档]
def init_tensor(self, data: torch.Tensor):
self.v = torch.full_like(data, fill_value=self.v_reset)
[文档]
def forward(self, x_seq: torch.Tensor):
self.init_tensor(x_seq[0].data)
y = []
if self.is_training:
if self.cn_v is None or self.cn_s is None:
self.noise_step += 1
for t in range(self.T):
if self.cn_v is None:
self.neuronal_charge(
x_seq[t]
+ self.sigma_v
* self.eps_v_seq[self.noise_step][t].to(x_seq.device)
)
else:
self.neuronal_charge(x_seq[t] + self.sigma_v * self.cn_v[:, t])
spike = self.neuronal_fire()
self.neuronal_reset(spike)
if self.cn_s is None:
spike = spike + self.sigma_s * self.eps_s_seq[self.noise_step][
t
].to(x_seq.device)
else:
spike = spike + self.sigma_s * self.cn_s[:, t]
y.append(spike)
else:
for t in range(self.T):
self.neuronal_charge(x_seq[t])
spike = self.neuronal_fire()
self.neuronal_reset(spike)
y.append(spike)
return torch.stack(y)
[文档]
def reset_noise(self, num_rl_step):
eps_shape = [self.num_node, num_rl_step * self.T]
per_order = [1, 2, 0]
# (nodes, steps * T) -> (nodes, steps, T) -> (steps, T, nodes)
self.eps_v_seq = torch.FloatTensor(
powerlaw_psd_gaussian(self.beta, eps_shape).reshape(
self.num_node, num_rl_step, self.T
)
).permute(per_order)
self.eps_s_seq = torch.FloatTensor(
powerlaw_psd_gaussian(self.beta, eps_shape).reshape(
self.num_node, num_rl_step, self.T
)
).permute(per_order)
self.noise_step = -1
[文档]
def get_colored_noise(self):
cn = [self.eps_v_seq[self.noise_step], self.eps_s_seq[self.noise_step]]
return torch.cat(cn, dim=1)
[文档]
def load_colored_noise(self, cn):
self.cn_v = cn[:, :, : self.num_node]
self.cn_s = cn[:, :, self.num_node :]
[文档]
def cancel_load(self):
self.cn_v = None
self.cn_s = None
[文档]
class NoisyCUBALIFNode(NoisyBaseNode):
def __init__(
self,
num_node,
c_decay: float = 0.5,
v_decay: float = 0.75,
is_training: bool = True,
T: int = 5,
sigma_init: float = 0.5,
beta: float = 0.0,
v_threshold: float = 0.5,
v_reset: Optional[float] = 0.0,
surrogate_function: surrogate.SurrogateFunctionBase = surrogate.Rect(),
):
super().__init__(
num_node,
is_training,
T,
sigma_init,
beta,
v_threshold,
v_reset,
surrogate_function,
)
self.c_decay = c_decay
self.v_decay = v_decay
[文档]
def neuronal_charge(self, x: torch.Tensor):
self.c = self.c * self.c_decay + x
self.v = self.v * self.v_decay + self.c
[文档]
def init_tensor(self, data: torch.Tensor):
self.c = torch.full_like(data, fill_value=0.0)
self.v = torch.full_like(data, fill_value=self.v_reset)
[文档]
class NoisyILCBaseNode(nn.Module, base.MultiStepModule):
def __init__(
self,
act_dim,
dec_pop_dim,
is_training: bool = True,
T: int = 5,
sigma_init: float = 0.5,
beta: float = 0.0,
v_threshold: float = 1.0,
v_reset: Optional[float] = 0.0,
surrogate_function: surrogate.SurrogateFunctionBase = surrogate.Rect(),
):
assert isinstance(v_reset, float) or v_reset is None
assert isinstance(v_threshold, float)
super().__init__()
self.act_dim = act_dim
self.num_node = act_dim * dec_pop_dim
self.dec_pop_dim = dec_pop_dim
self.conn = nn.Conv1d(act_dim, self.num_node, dec_pop_dim, groups=act_dim)
self.is_training = is_training
self.T = T
self.beta = beta
self.sigma_v = sigma_init / math.sqrt(self.num_node)
self.cn_v = None
self.sigma_s = sigma_init / math.sqrt(self.num_node)
self.cn_s = None
self.v_threshold = v_threshold
self.v_reset = v_reset
self.surrogate_function = surrogate_function
[文档]
@abstractmethod
def neuronal_charge(self, x: torch.Tensor):
raise NotImplementedError
[文档]
def neuronal_fire(self):
return self.surrogate_function(self.v - self.v_threshold)
[文档]
def neuronal_reset(self, spike):
if self.v_reset is None:
self.v = self.v - spike * self.v_threshold
else:
self.v = (1.0 - spike) * self.v + spike * self.v_reset
[文档]
def init_tensor(self, data: torch.Tensor):
self.v = torch.full_like(data, fill_value=self.v_reset)
[文档]
def forward(self, x_seq: torch.Tensor):
self.init_tensor(x_seq[0].data)
y = []
if self.is_training:
if self.cn_v is None or self.cn_s is None:
self.noise_step += 1
for t in range(self.T):
if self.cn_v is None:
self.neuronal_charge(
x_seq[t]
+ self.sigma_v
* self.eps_v_seq[self.noise_step][t].to(x_seq.device)
)
else:
self.neuronal_charge(x_seq[t] + self.sigma_v * self.cn_v[:, t])
spike = self.neuronal_fire()
self.neuronal_reset(spike)
if self.cn_s is None:
spike = spike + self.sigma_s * self.eps_s_seq[self.noise_step][
t
].to(x_seq.device)
else:
spike = spike + self.sigma_s * self.cn_s[:, t]
y.append(spike)
if t < self.T - 1:
x_seq[t + 1] = x_seq[t + 1] + self.conn(
spike.view(-1, self.act_dim, self.dec_pop_dim)
).view(-1, self.num_node)
else:
for t in range(self.T):
self.neuronal_charge(x_seq[t])
spike = self.neuronal_fire()
self.neuronal_reset(spike)
y.append(spike)
if t < self.T - 1:
x_seq[t + 1] = x_seq[t + 1] + self.conn(
spike.view(-1, self.act_dim, self.dec_pop_dim)
).view(-1, self.num_node)
return torch.stack(y)
[文档]
def reset_noise(self, num_rl_step):
eps_shape = [self.num_node, num_rl_step * self.T]
per_order = [1, 2, 0]
# (nodes, steps * T) -> (nodes, steps, T) -> (steps, T, nodes)
self.eps_v_seq = torch.FloatTensor(
powerlaw_psd_gaussian(self.beta, eps_shape).reshape(
self.num_node, num_rl_step, self.T
)
).permute(per_order)
self.eps_s_seq = torch.FloatTensor(
powerlaw_psd_gaussian(self.beta, eps_shape).reshape(
self.num_node, num_rl_step, self.T
)
).permute(per_order)
self.noise_step = -1
[文档]
def get_colored_noise(self):
cn = [self.eps_v_seq[self.noise_step], self.eps_s_seq[self.noise_step]]
return torch.cat(cn, dim=1)
[文档]
def load_colored_noise(self, cn):
self.cn_v = cn[:, :, : self.num_node]
self.cn_s = cn[:, :, self.num_node :]
[文档]
def cancel_load(self):
self.cn_v = None
self.cn_s = None
[文档]
class NoisyILCCUBALIFNode(NoisyILCBaseNode):
def __init__(
self,
act_dim,
dec_pop_dim,
c_decay: float = 0.5,
v_decay: float = 0.75,
is_training: bool = True,
T: int = 5,
sigma_init: float = 0.5,
beta: float = 0.0,
v_threshold: float = 1.0,
v_reset: Optional[float] = 0.0,
surrogate_function: surrogate.SurrogateFunctionBase = surrogate.Rect(),
):
super().__init__(
act_dim,
dec_pop_dim,
is_training,
T,
sigma_init,
beta,
v_threshold,
v_reset,
surrogate_function,
)
self.c_decay = c_decay
self.v_decay = v_decay
[文档]
def neuronal_charge(self, x: torch.Tensor):
self.c = self.c * self.c_decay + x
self.v = self.v * self.v_decay + self.c
[文档]
def init_tensor(self, data: torch.Tensor):
self.c = torch.full_like(data, fill_value=0.0)
self.v = torch.full_like(data, fill_value=self.v_reset)
[文档]
class NoisyNonSpikingBaseNode(nn.Module, base.MultiStepModule):
def __init__(
self,
num_node,
is_training: bool = True,
T: int = 5,
sigma_init: float = 0.5,
beta: float = 0.0,
decode: Optional[str] = None,
):
super().__init__()
self.num_node = num_node
self.is_training = is_training
self.T = T
self.beta = beta
self.decode = decode
self.sigma = nn.Parameter(torch.FloatTensor(num_node))
self.sigma.data.fill_(sigma_init / math.sqrt(num_node))
self.cn = None
[文档]
@abstractmethod
def neuronal_charge(self, x: torch.Tensor):
raise NotImplementedError
[文档]
def init_tensor(self, data: torch.Tensor):
self.v = torch.full_like(data, fill_value=0.0)
[文档]
def forward(self, x_seq: torch.Tensor):
self.init_tensor(x_seq[0].data)
v_seq = []
if self.is_training:
if self.cn is None:
self.noise_step += 1
for t in range(self.T):
if self.cn is None:
self.neuronal_charge(
x_seq[t]
+ self.sigma.mul(
self.eps_seq[self.noise_step][t].to(x_seq.device)
)
)
else:
self.neuronal_charge(
x_seq[t] + self.sigma.mul(self.cn[:, t].to(x_seq.device))
)
v_seq.append(self.v)
else:
for t in range(self.T):
self.neuronal_charge(x_seq[t])
v_seq.append(self.v)
if self.decode == "max-mem":
mem = torch.max(torch.stack(v_seq, 0), 0).values
elif self.decode == "max-abs-mem":
v_stack = torch.stack(v_seq, 0)
max_mem = torch.max(v_stack, 0).values
min_mem = torch.min(v_stack, 0).values
mem = max_mem * (max_mem.abs() > min_mem.abs()) + min_mem * (
max_mem.abs() <= min_mem.abs()
)
elif self.decode == "mean-mem":
mem = torch.mean(torch.stack(v_seq, 0), 0)
elif self.decode == "last-me":
mem = v_seq[-1]
else:
mem = v_seq
return mem
[文档]
def reset_noise(self, num_rl_step):
eps_shape = [self.num_node, num_rl_step * self.T]
per_order = [1, 2, 0]
self.eps_seq = torch.FloatTensor(
powerlaw_psd_gaussian(self.beta, eps_shape).reshape(
self.num_node, num_rl_step, self.T
)
).permute(per_order)
self.noise_step = -1
[文档]
def get_colored_noise(self):
return self.eps_seq[self.noise_step]
[文档]
def load_colored_noise(self, cn):
self.cn = cn
[文档]
def cancel_load(self):
self.cn = None
[文档]
class NoisyNonSpikingIFNode(NoisyNonSpikingBaseNode):
[文档]
def neuronal_charge(self, x: torch.Tensor):
self.v = self.v + x