spikingjelly.datasets.utils 源代码

import logging
import math
import os
import shutil
import struct
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Callable, Optional, Union

import numpy as np
import torch
import torch.utils.data
import tqdm
from matplotlib import pyplot as plt
from torchvision import transforms

from .. import configure

# Optional GPU support: try to import CuPy and the project's CUDA helper module.
# If anything in the ``try`` body raises, both names are set to ``None`` so the
# rest of the module can check for their absence.
try:
    import cupy

    from ..activation_based.cuda_kernel import cuda_utils

    # CUDA C source of ``padded_sequence_mask_kernel``. Each thread handles one
    # ``index < N`` and sets ``mask[i * N + index] = true`` for every
    # ``i < sequence_len[index]``. The ``T`` argument is not read by the kernel
    # body. Entries at or beyond the sequence length are never written here.
    padded_sequence_mask_kernel_code = r"""
    extern "C" __global__
            void padded_sequence_mask_kernel(const int* sequence_len, bool *mask, const int &T, const int &N)
            {
                const int index = blockIdx.x * blockDim.x + threadIdx.x;
                if (index < N)
                {
                    for(int i=0; i < sequence_len[index]; i++)
                    {
                        mask[i * N + index] = true;
                    }
                }
            }
    """
# NOTE(review): ``BaseException`` also covers ``KeyboardInterrupt`` and
# ``SystemExit``; confirm that swallowing those during import is intended.
# NOTE: ``padded_sequence_mask_kernel_code`` is only assigned inside the ``try``
# body, so it stays undefined when the import fails.
except BaseException as e:
    # The failure is only reported at INFO level; it is not re-raised.
    logging.info(f"spikingjelly.dataset.__init__: {e}")
    cupy = None
    cuda_utils = None
    pass


# Public API of this module: the names exported by ``from ... import *``.
__all__ = [
    "np_savez",
    "save_as_pic",
    "save_every_frame_of_an_entire_DVS_dataset",
    "play_frame",
    "load_aedat_v3",
    "load_ATIS_bin",
    "load_npz_frames",
    "integrate_events_segment_to_frame",
    "cal_fixed_frames_number_segment_index",
    "integrate_events_by_fixed_frames_number",
    "integrate_events_file_to_frames_file_by_fixed_frames_number",
    "integrate_events_by_fixed_duration",
    "integrate_events_file_to_frames_file_by_fixed_duration",
    "save_frames_to_npz_and_print",
    "create_same_directory_structure",
    "split_to_train_test_set",
    "fast_split_to_train_test_set",
    "pad_sequence_collate",
    "padded_sequence_mask",
    "create_sub_dataset",
]


# Archive writer used by the helpers in this module: the compressed variant is
# chosen once, at import time, when ``configure.save_datasets_compressed`` is
# truthy; otherwise the uncompressed ``np.savez`` is used.
np_savez = np.savez_compressed if configure.save_datasets_compressed else np.savez


def save_as_pic(
    x: Union[torch.Tensor, np.ndarray],
    save_pic_to: str = "./",
    pic_first_name: str = "pic",
) -> None:
    r"""
    Save every frame of an event-frame sequence as a PNG image.

    ``x[:, 0]`` is written to the green channel and ``x[:, 1]`` to the blue
    channel of an RGB image; the red channel stays zero. Frame ``t`` is stored
    as ``{pic_first_name}_{t}.png`` inside ``save_pic_to``.

    The output directory is created when it does not exist yet, and the current
    axes are cleared before each frame is drawn so that images of earlier
    frames do not pile up in the figure.

    :param x: frames with ``shape=[T, 2, H, W]``
    :type x: Union[torch.Tensor, np.ndarray]
    :param save_pic_to: directory in which the images are stored
    :type save_pic_to: str
    :param pic_first_name: prefix of the image file names
    :type pic_first_name: str
    :return: None
    :rtype: None

    Example:

    .. code:: python

        save_as_pic(frame, './demo', 'first_pic')
    """
    if isinstance(x, np.ndarray):
        x = torch.from_numpy(x)

    out_dir = Path(save_pic_to)
    out_dir.mkdir(parents=True, exist_ok=True)

    to_img = transforms.ToPILImage()
    img_tensor = torch.zeros([x.shape[0], 3, x.shape[2], x.shape[3]])
    img_tensor[:, 1] = x[:, 0]
    img_tensor[:, 2] = x[:, 1]

    for t in range(img_tensor.shape[0]):
        # Without clearing, every imshow call adds one more image artist to
        # the same axes, so memory use and render time grow with each frame.
        plt.cla()
        plt.imshow(to_img(img_tensor[t]))
        plt.axis("off")
        plt.savefig(
            out_dir / f"{pic_first_name}_{t}.png",
            bbox_inches="tight",
            pad_inches=0,
        )
def save_every_frame_of_an_entire_DVS_dataset(
    dataset: str,
    dataset_path: str,
    time_steps: int,
    save_pic_to: str = "./",
    number_of_threads: int = 4,
):
    """
    Load a DVS dataset as fixed-count frames and save every frame as an image.

    Each sample ``i`` of the dataset is handed to :func:`save_as_pic` with the
    file-name prefix ``str(i)``. The work is distributed over a
    ``multiprocessing.Pool``; a failure while saving one sample is logged with
    ``logging.error`` and does not stop the remaining samples.

    :param dataset: name of the dataset; one of ``"DVS128Gesture"``,
        ``"CIFAR10DVS"`` or ``"NCaltech101"``
    :type dataset: str
    :param dataset_path: root directory passed to the dataset class
    :type dataset_path: str
    :param time_steps: number of frames per sample (``frames_number``)
    :type time_steps: int
    :param save_pic_to: directory in which the images are stored
    :type save_pic_to: str
    :param number_of_threads: number of worker processes in the pool
    :type number_of_threads: int
    :return: None
    :rtype: None
    :raises ValueError: if a required argument is empty, or ``dataset`` is not
        one of the supported names

    Example:

    .. code:: python

        save_every_frame_of_an_entire_DVS_dataset(
            dataset='DVS128Gesture',
            dataset_path="../../datasets/DVS128Gesture",
            time_steps=16, save_pic_to='./demo', number_of_threads=20)
        save_every_frame_of_an_entire_DVS_dataset(
            dataset='CIFAR10DVS',
            dataset_path="../../datasets/cifar10dvs",
            time_steps=10, save_pic_to='./demo', number_of_threads=20)
        save_every_frame_of_an_entire_DVS_dataset(
            dataset='NCaltech101',
            dataset_path="../../datasets/NCaltech101",
            time_steps=14, save_pic_to='./demo', number_of_threads=20)
    """
    if not dataset or not dataset_path or time_steps is None or not save_pic_to:
        raise ValueError(
            "All parameters(dataset, dataset_path, time_steps and save_pic_to) must be provided and cannot be empty."
        )

    # The dataset modules are imported lazily so that only the requested one
    # is loaded.
    if dataset == "DVS128Gesture":
        from spikingjelly.datasets.dvs128_gesture import DVS128Gesture

        data = DVS128Gesture(
            root=dataset_path,
            train=False,
            data_type="frame",
            split_by="number",
            frames_number=time_steps,
        )
    elif dataset == "CIFAR10DVS":
        from spikingjelly.datasets.cifar10_dvs import CIFAR10DVS

        data = CIFAR10DVS(
            root=dataset_path,
            data_type="frame",
            split_by="number",
            frames_number=time_steps,
        )
    elif dataset == "NCaltech101":
        from spikingjelly.datasets.n_caltech101 import NCaltech101

        data = NCaltech101(
            root=dataset_path,
            data_type="frame",
            split_by="number",
            frames_number=time_steps,
        )
    else:
        raise ValueError(
            "The dataset attribute can only be DVS128Gesture, CIFAR10DVS or NCaltech101"
        )

    import multiprocessing

    multiprocessing.freeze_support()

    # The context manager terminates the pool if loading a sample raises, so
    # worker processes are not leaked. On the normal path close() + join()
    # wait for all queued saves before the pool is torn down.
    with multiprocessing.Pool(processes=number_of_threads) as pool:
        for i in range(len(data)):
            frame, _ = data[i]
            pool.apply_async(
                save_as_pic,
                args=(frame, save_pic_to, str(i)),
                # apply_async drops worker exceptions unless the result is
                # fetched; report them so failed saves do not go unnoticed.
                error_callback=lambda exc, i=i: logging.error(
                    "save_every_frame_of_an_entire_DVS_dataset: "
                    "saving sample %d failed: %r",
                    i,
                    exc,
                ),
            )
        pool.close()
        pool.join()
    print("complete!!!")
def play_frame(
    x: Union[torch.Tensor, np.ndarray], save_gif_to: Optional[str] = None
) -> None:
    r"""
    Play a frame sequence on screen, or save it as an animated GIF.

    ``x[:, 0]`` is shown in the green channel and ``x[:, 1]`` in the blue
    channel of an RGB image.

    :param x: frames with ``shape=[T, 2, H, W]``
    :type x: Union[torch.Tensor, np.ndarray]
    :param save_gif_to: if ``None``, the frames are played in an endless loop
        and the function does not return. Otherwise the frames are not played
        but written to the GIF file at this path.
    :type save_gif_to: Optional[str]
    :return: None
    :rtype: None
    """
    if isinstance(x, np.ndarray):
        x = torch.from_numpy(x)

    to_img = transforms.ToPILImage()
    img_tensor = torch.zeros([x.shape[0], 3, x.shape[2], x.shape[3]])
    img_tensor[:, 1] = x[:, 0]
    img_tensor[:, 2] = x[:, 1]

    # Convert each frame once; the play loop below would otherwise redo the
    # same conversion on every pass.
    img_list = [to_img(img_tensor[t]) for t in range(img_tensor.shape[0])]

    if save_gif_to is None:
        while True:
            for img in img_list:
                # Clear the axes first: every imshow call adds a new image
                # artist, which would grow without bound in this endless loop.
                plt.cla()
                plt.imshow(img)
                plt.pause(0.01)
    else:
        img_list[0].save(
            save_gif_to, save_all=True, append_images=img_list[1:], loop=0
        )
        print(f"Save frames to [{save_gif_to}].")
def load_aedat_v3(file_name: Union[str, Path]) -> dict:
    r"""
    Load polarity events from an AEDAT v3 file.

    This function is written by referring to
    https://gitlab.com/inivation/dv/dv-python . It can be used for
    DVS128 Gesture.

    The ASCII header (lines starting with ``#``, up to ``#!END-HEADER``) is
    skipped. Then the file is read packet by packet: a 28-byte packet header
    followed by ``capacity * size`` bytes of event data. Only packets of event
    type ``1`` are decoded; all other packet types are skipped.

    All binary fields are decoded as little-endian regardless of the host byte
    order.

    :param file_name: path of the aedat v3 file
    :type file_name: Union[str, pathlib.Path]
    :return: a dict whose keys are ``['t', 'x', 'y', 'p']`` and values are
        ``numpy.ndarray``
    :rtype: dict
    :raises struct.error: if a packet header or an event is truncated
    """
    # Leading packet-header fields: type (H), source (H), size (I),
    # ts offset (I), ts overflow (I), capacity (I). The trailing "number" and
    # "valid" fields of the 28-byte header are not needed.
    header_fields = struct.Struct("<HHIIII")
    # Each polarity event starts with two 32-bit words: address data, timestamp.
    event_words = struct.Struct("<II")

    txyp = {"t": [], "x": [], "y": [], "p": []}

    with open(file_name, "rb") as bin_f:
        # skip ascii header
        line = bin_f.readline()
        while line.startswith(b"#"):
            if line == b"#!END-HEADER\r\n":
                break
            line = bin_f.readline()

        while True:
            header = bin_f.read(28)
            if not header:
                break

            e_type, _, e_size, _, e_tsoverflow, e_capacity = (
                header_fields.unpack_from(header)
            )
            data = bin_f.read(e_capacity * e_size)

            if e_type != 1:
                # non-polarity event packet, not implemented
                continue

            counter = 0
            while data[counter : counter + e_size]:
                aer_data, timestamp = event_words.unpack_from(data, counter)
                # The packet's overflow counter extends the 32-bit timestamp.
                timestamp |= e_tsoverflow << 31
                txyp["x"].append((aer_data >> 17) & 0x00007FFF)
                txyp["y"].append((aer_data >> 2) & 0x00007FFF)
                txyp["t"].append(timestamp)
                txyp["p"].append((aer_data >> 1) & 0x00000001)
                counter += e_size

    for key in ("x", "y", "t", "p"):
        txyp[key] = np.asarray(txyp[key])
    return txyp
def load_ATIS_bin(file_name: Union[str, Path]) -> dict:
    r"""
    Load events from an ATIS binary file.

    This function is written by referring to
    https://github.com/jackd/events-tfds .

    Each ATIS binary example is a separate binary file consisting of a list of
    events. Each event occupies 40 bits as described below:

    * bit 39 - 32: X address (in pixels)
    * bit 31 - 24: Y address (in pixels)
    * bit 23: polarity (0 for OFF, 1 for ON)
    * bit 22 - 0: timestamp (in microseconds)

    :param file_name: path of the ATIS binary file
    :type file_name: Union[str, pathlib.Path]
    :return: a dict whose keys are ``['t', 'x', 'y', 'p']`` and values are
        ``numpy.ndarray``
    :rtype: dict
    """
    with open(file_name, "rb") as stream:
        # Widen the bytes to 32 bits so the shifts below cannot overflow.
        octets = np.uint32(np.fromfile(stream, dtype=np.uint8))

    # De-interleave the five bytes that make up every event.
    col_x, col_y, col_flags, col_mid, col_low = (octets[k::5] for k in range(5))

    # ``& 128`` keeps the top bit of the third byte (the polarity);
    # ``& 127`` keeps its lower seven bits (the high part of the timestamp).
    polarity = (col_flags & 128) >> 7
    timestamp = ((col_flags & 127) << 16) | (col_mid << 8) | col_low

    return {"t": timestamp, "x": col_x, "y": col_y, "p": polarity}
def load_npz_frames(file_name: Union[str, Path]) -> np.ndarray:
    r"""
    Load the ``frames`` array from an npz file as ``float32``.

    :param file_name: path of the npz file that saves the frames
    :type file_name: Union[str, pathlib.Path]
    :return: frames
    :rtype: np.ndarray
    """
    # NOTE(review): allow_pickle=True lets the archive execute arbitrary code
    # while it is unpickled; only load files from trusted sources.
    # The context manager closes the archive's file handle. ``astype`` returns
    # a new array, so the result stays valid after the archive is closed.
    with np.load(file_name, allow_pickle=True) as archive:
        return archive["frames"].astype(np.float32)
def integrate_events_segment_to_frame(
    x: np.ndarray,
    y: np.ndarray,
    p: np.ndarray,
    H: int,
    W: int,
    j_l: int = 0,
    j_r: int = -1,
) -> np.ndarray:
    r"""
    Integrate one segment of events into a single two-channel frame.

    Denote the two-channel frame as :math:`F`. The value of the pixel at
    :math:`(p, x, y)` is the number of events with indices in
    :math:`[j_{l}, j_{r})` that fall on it:

    .. math::

        F(p, x, y) = \sum_{i = j_{l}}^{j_{r} - 1} \mathcal{I}_{p, x, y}(p_{i}, x_{i}, y_{i})

    where :math:`\mathcal{I}_{p, x, y}(p_{i}, x_{i}, y_{i})` is an indicator
    function that equals 1 only when
    :math:`(p, x, y) = (p_{i}, x_{i}, y_{i})`.

    Events with ``p == 0`` are counted in channel 0, all others in channel 1.
    The segment is taken as the slice ``[j_l:j_r]``, so with the default
    ``j_r=-1`` the last event is not included.

    :param x: x-coordinate of events
    :type x: numpy.ndarray
    :param y: y-coordinate of events
    :type y: numpy.ndarray
    :param p: polarity of events
    :type p: numpy.ndarray
    :param H: height of the frame
    :type H: int
    :param W: width of the frame
    :type W: int
    :param j_l: the start index of the integral interval, which is included
    :type j_l: int
    :param j_r: the right index of the integral interval, which is not included
    :type j_r: int
    :return: a single two-channel frame with ``shape=[2, H, W]``
    :rtype: np.ndarray
    """
    # Events must be accumulated with ``bincount``; ``frame[..] += 1`` with
    # fancy indexing counts a repeated index only once. See
    # https://stackoverflow.com/questions/15973827/handling-of-duplicate-indices-in-numpy-assignments
    #
    # Example with events x=[1, 2, 1, 1], y=[1, 1, 1, 2], p=[0, 1, 0, 1]:
    #   frames[0, y, x] += (1 - p)   -> frames[0, 1, 1] == 1   (wrong)
    #   bincount on y * width + x    -> frames[0, 1, 1] == 2   (correct)
    flat = np.zeros(shape=[2, H * W])

    segment = slice(j_l, j_r)
    xs = x[segment].astype(int)  # avoid overflow
    ys = y[segment].astype(int)
    ps = p[segment]

    off_events = ps == 0
    channel_masks = (off_events, np.logical_not(off_events))

    for channel, selected in enumerate(channel_masks):
        # Flattened pixel index of every event that belongs to this channel.
        counts = np.bincount(ys[selected] * W + xs[selected])
        flat[channel][np.arange(counts.size)] += counts

    return flat.reshape((2, H, W))
def cal_fixed_frames_number_segment_index(
    events_t: np.ndarray, split_by: str, frames_num: int
) -> tuple:
    r"""
    Compute the event index range ``[j_l, j_r)`` of every frame.

    Denote ``frames_num`` as :math:`M` and the number of events as :math:`N`.
    If ``split_by`` is ``'time'``, then

    .. math::

        \Delta T & = [\frac{t_{N-1} - t_{0}}{M}] \\
        j_{l} & = \mathop{\arg\min}\limits_{k} \{t_{k} | t_{k} \geq t_{0} + \Delta T \cdot j\} \\
        j_{r} & = \begin{cases} \mathop{\arg\max}\limits_{k} \{t_{k} | t_{k} < t_{0} + \Delta T \cdot (j + 1)\} + 1, & j < M - 1 \cr N, & j = M - 1 \end{cases}

    A time window that contains no event yields an empty segment
    (``j_l == j_r``) positioned at the end of the previous segment.

    If ``split_by`` is ``'number'``, then

    .. math::

        j_{l} & = [\frac{N}{M}] \cdot j \\
        j_{r} & = \begin{cases} [\frac{N}{M}] \cdot (j + 1), & j < M - 1 \cr N, & j = M - 1 \end{cases}

    :param events_t: events' t
    :type events_t: numpy.ndarray
    :param split_by: 'time' or 'number'
    :type split_by: str
    :param frames_num: the number of frames
    :type frames_num: int
    :return: a tuple ``(j_l, j_r)`` of two integer arrays of length
        ``frames_num``
    :rtype: tuple
    :raises NotImplementedError: if ``split_by`` is neither ``'time'`` nor
        ``'number'``
    """
    j_l = np.zeros(shape=[frames_num], dtype=int)
    j_r = np.zeros(shape=[frames_num], dtype=int)
    N = events_t.size

    if split_by == "number":
        di = N // frames_num
        j_l[:] = di * np.arange(frames_num)
        j_r[:] = j_l + di
        # The last frame also takes the remainder of the division.
        j_r[-1] = N

    elif split_by == "time":
        dt = (events_t[-1] - events_t[0]) // frames_num
        idx = np.arange(N)
        for i in range(frames_num):
            t_l = dt * i + events_t[0]
            t_r = t_l + dt
            idx_masked = idx[np.logical_and(events_t >= t_l, events_t < t_r)]
            if idx_masked.size > 0:
                j_l[i] = idx_masked[0]
                j_r[i] = idx_masked[-1] + 1
            else:
                # No event in this window (e.g. a pause in the recording, or
                # dt == 0): emit an empty segment. Indexing ``idx_masked``
                # here would raise an IndexError.
                start = j_r[i - 1] if i > 0 else 0
                j_l[i] = start
                j_r[i] = start
        # The last frame also takes every event at or after its window's end.
        j_r[-1] = N

    else:
        raise NotImplementedError

    return j_l, j_r
def integrate_events_by_fixed_frames_number(
    events: dict, split_by: str, frames_num: int, H: int, W: int
) -> np.ndarray:
    r"""
    Integrate events to frames by fixed frames number.

    See :func:`cal_fixed_frames_number_segment_index` and
    :func:`integrate_events_segment_to_frame` for more details.

    :param events: a dict whose keys are ``['t', 'x', 'y', 'p']`` and values
        are ``numpy.ndarray``
    :type events: dict
    :param split_by: 'time' or 'number'
    :type split_by: str
    :param frames_num: the number of frames
    :type frames_num: int
    :param H: the height of frame
    :type H: int
    :param W: the width of frame
    :type W: int
    :return: frames with ``shape=[frames_num, 2, H, W]``
    :rtype: np.ndarray
    """
    t = events["t"]
    x = events["x"]
    y = events["y"]
    p = events["p"]

    seg_starts, seg_stops = cal_fixed_frames_number_segment_index(
        t, split_by, frames_num
    )

    frames = np.zeros([frames_num, 2, H, W])
    for k, (start, stop) in enumerate(zip(seg_starts, seg_stops)):
        frames[k] = integrate_events_segment_to_frame(x, y, p, H, W, start, stop)
    return frames
def integrate_events_file_to_frames_file_by_fixed_frames_number(
    loader: Callable,
    events_np_file: str,
    output_dir: str,
    split_by: str,
    frames_num: int,
    H: int,
    W: int,
    print_save: bool = False,
) -> None:
    """
    Integrate one events file to frames by a fixed frame count and save it.

    The result is written into ``output_dir`` under the base name of
    ``events_np_file``; the saved archive contains the ``frames`` key.
    See :func:`cal_fixed_frames_number_segment_index` and
    :func:`integrate_events_segment_to_frame` for more details.

    :param loader: a function that can load events from ``events_np_file``
    :type loader: Callable
    :param events_np_file: path of the events np file
    :type events_np_file: str
    :param output_dir: output directory for saving the frames
    :type output_dir: str
    :param split_by: 'time' or 'number'
    :type split_by: str
    :param frames_num: the number of frames
    :type frames_num: int
    :param H: the height of frame
    :type H: int
    :param W: the width of frame
    :type W: int
    :param print_save: If ``True``, this function will print the saved file's
        path.
    :type print_save: bool
    :return: None
    :rtype: None
    """
    fname = os.path.join(output_dir, os.path.basename(events_np_file))

    events = loader(events_np_file)
    frames = integrate_events_by_fixed_frames_number(
        events, split_by, frames_num, H, W
    )
    np_savez(fname, frames=frames)

    if print_save:
        print(f"Frames [{fname}] saved.")
def integrate_events_by_fixed_duration(
    events: dict, duration: int, H: int, W: int
) -> np.ndarray:
    r"""
    Integrate events to frames by fixed time duration of each frame.

    Timestamps are shifted so that the smallest one is zero, and event ``i``
    is assigned to frame ``t_i // duration``. The number of frames is
    ``ceil(t[-1] / duration)`` but never less than one, so an event stream
    whose time span is zero still yields a single frame. The last frame takes
    all remaining events.

    Frame boundaries are located with ``np.searchsorted``, which expects
    ``events['t']`` to be sorted in ascending order.

    :param events: a dict whose keys are ``['t', 'x', 'y', 'p']`` and values
        are ``numpy.ndarray``
    :type events: dict
    :param duration: the time duration of each frame
    :type duration: int
    :param H: the height of frame
    :type H: int
    :param W: the width of frame
    :type W: int
    :return: frames with ``shape=[frames_num, 2, H, W]``
    :rtype: np.ndarray
    """
    x = events["x"]
    y = events["y"]
    t = events["t"]
    p = events["p"]
    N = t.size

    t = t - t.min()
    # At least one frame: with a zero time span ceil() gives 0, and writing
    # ``frames[-1]`` below would raise an IndexError on the empty array.
    frames_num = max(1, int(math.ceil(t[-1] / duration)))
    frames = np.zeros([frames_num, 2, H, W])
    frame_index = t // duration

    left = 0
    for i in range(frames_num - 1):
        # First event that belongs to frame i + 1 or later.
        right = np.searchsorted(frame_index, i + 1, side="left")
        frames[i] = integrate_events_segment_to_frame(x, y, p, H, W, left, right)
        left = right

    frames[-1] = integrate_events_segment_to_frame(x, y, p, H, W, left, N)
    return frames
def integrate_events_file_to_frames_file_by_fixed_duration(
    loader: Callable,
    events_np_file: str,
    output_dir: str,
    duration: int,
    H: int,
    W: int,
    print_save: bool = False,
) -> int:
    r"""
    Integrate one events file to frames by fixed frame duration and save it.

    The frames are written to ``{name}_{frames_num}.npz`` inside
    ``output_dir``, where ``name`` is the base name of ``events_np_file``
    without its extension. The saved archive contains the ``frames`` key.

    :param loader: a function that can load events from ``events_np_file``
    :type loader: Callable
    :param events_np_file: path of the events np file
    :type events_np_file: str
    :param output_dir: output directory for saving the frames
    :type output_dir: str
    :param duration: the time duration of each frame
    :type duration: int
    :param H: the height of frame
    :type H: int
    :param W: the width of frame
    :type W: int
    :param print_save: If ``True``, this function will print the saved file's
        path.
    :type print_save: bool
    :return: number of frames saved
    :rtype: int
    """
    frames = integrate_events_by_fixed_duration(
        loader(events_np_file), duration, H, W
    )
    saved_count = frames.shape[0]

    stem = os.path.splitext(os.path.basename(events_np_file))[0]
    fname = os.path.join(output_dir, f"{stem}_{saved_count}.npz")
    np_savez(fname, frames=frames)

    if print_save:
        print(f"Frames [{fname}] saved.")
    return saved_count
def save_frames_to_npz_and_print(fname: str, frames: np.ndarray):
    r"""
    Save ``frames`` to an npz file under the key ``frames`` and report it.

    :param fname: path of the target npz file
    :type fname: str
    :param frames: frames object
    :type frames: np.ndarray
    :return: None
    """
    np_savez(fname, frames=frames)
    report = f"Frames [{fname}] saved."
    print(report)
def create_same_directory_structure(
    source_dir: Union[str, Path], target_dir: Union[str, Path]
) -> None:
    r"""
    Create the same directory structure in ``target_dir`` as in ``source_dir``.

    Only directories are mirrored; files are ignored. Missing parents
    (including ``target_dir`` itself) are created as needed, and directories
    that already exist are reused, so the function can be re-run on a
    partially built tree. A message is printed for each directory that is
    actually created.

    :param source_dir: path of the directory whose structure is copied
    :type source_dir: Union[str, pathlib.Path]
    :param target_dir: path of the directory that receives the structure
    :type target_dir: Union[str, pathlib.Path]
    :return: None
    :raises FileExistsError: if a target path exists but is not a directory
    """
    for sub_dir_name in os.listdir(source_dir):
        source_sub_dir = os.path.join(source_dir, sub_dir_name)
        if not os.path.isdir(source_sub_dir):
            continue

        target_sub_dir = os.path.join(target_dir, sub_dir_name)
        try:
            os.makedirs(target_sub_dir)
        except FileExistsError:
            # An existing directory is fine; anything else in the way is not.
            if not os.path.isdir(target_sub_dir):
                raise
        else:
            print(f"Mkdir [{target_sub_dir}].")

        create_same_directory_structure(source_sub_dir, target_sub_dir)
def split_to_train_test_set(
    train_ratio: float,
    origin_dataset: torch.utils.data.Dataset,
    num_classes: int,
    random_split: bool = False,
):
    r"""
    Split a dataset class by class into a train set and a test set.

    The dataset is iterated once to collect the sample indices of every
    label. For each class, the first ``ceil(n * train_ratio)`` of its ``n``
    indices go to the train set and the rest to the test set.

    :param train_ratio: split the ratio of the origin dataset as the train set
    :type train_ratio: float
    :param origin_dataset: the origin dataset
    :type origin_dataset: torch.utils.data.Dataset
    :param num_classes: total classes number, e.g., ``10`` for the MNIST
        dataset
    :type num_classes: int
    :param random_split: If ``False``, the front ratio of samples in each
        class will be included in the train set, while the rest will be
        included in the test set. If ``True``, the samples of each class are
        shuffled first. The randomness is controlled by ``numpy.random.seed``
    :type random_split: bool
    :return: a tuple ``(train_set, test_set)``, where both elements are
        :class:`torch.utils.data.Subset` instances built from
        ``origin_dataset``
    :rtype: tuple[torch.utils.data.Subset, torch.utils.data.Subset]
    """
    # One bucket of sample indices per class label.
    buckets = [[] for _ in range(num_classes)]
    for sample_idx, sample in enumerate(tqdm.tqdm(origin_dataset)):
        label = sample[1]
        if isinstance(label, (np.ndarray, torch.Tensor)):
            label = label.item()
        buckets[label].append(sample_idx)

    if random_split:
        for bucket in buckets:
            np.random.shuffle(bucket)

    train_idx = []
    test_idx = []
    for bucket in buckets:
        cut = math.ceil(len(bucket) * train_ratio)
        train_idx.extend(bucket[:cut])
        test_idx.extend(bucket[cut:])

    train_set = torch.utils.data.Subset(origin_dataset, train_idx)
    test_set = torch.utils.data.Subset(origin_dataset, test_idx)
    return train_set, test_set
def fast_split_to_train_test_set(
    train_ratio: float,
    origin_dataset: torch.utils.data.Dataset,
    num_classes: int,
    random_split: bool = False,
    batch_size: int = 16,
):
    r"""
    Split a dataset class by class, reading the labels with a thread pool.

    The samples are read in batches of ``batch_size`` by a
    ``ThreadPoolExecutor``. Each batch returns its ``(index, label)`` pairs and
    the pairs are merged in batch order, so the indices of every class are in
    ascending order no matter how the threads are scheduled. For each class,
    the first ``ceil(n * train_ratio)`` of its ``n`` indices go to the train
    set and the rest to the test set.

    :param train_ratio: split the ratio of the origin dataset as the train set
    :type train_ratio: float
    :param origin_dataset: the origin dataset
    :type origin_dataset: torch.utils.data.Dataset
    :param num_classes: total classes number, e.g., ``10`` for the MNIST
        dataset
    :type num_classes: int
    :param random_split: If ``False``, the front ratio of samples in each
        class will be included in the train set, while the rest will be
        included in the test set. If ``True``, the samples of each class are
        shuffled first. The randomness is controlled by ``numpy.random.seed``
    :type random_split: bool
    :param batch_size: the number of samples to process in each batch
    :type batch_size: int
    :return: a tuple ``(train_set, test_set)``, where both elements are
        :class:`torch.utils.data.Subset` instances built from
        ``origin_dataset``
    :rtype: tuple[torch.utils.data.Subset, torch.utils.data.Subset]
    """

    def collect_labels(start_idx, end_idx):
        # Return the (sample index, label) pairs of one batch.
        pairs = []
        for i in range(start_idx, end_idx):
            y = origin_dataset[i][1]
            if isinstance(y, (np.ndarray, torch.Tensor)):
                y = y.item()
            pairs.append((i, y))
        return pairs

    label_idx = [[] for _ in range(num_classes)]
    num_samples = len(origin_dataset)

    with ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(
                collect_labels, start_idx, min(start_idx + batch_size, num_samples)
            )
            for start_idx in range(0, num_samples, batch_size)
        ]
        # Merge in submission order. Letting the workers append to the shared
        # lists directly would make the per-class order, and therefore the
        # split, depend on thread scheduling.
        for future in tqdm.tqdm(futures, desc="Processing batches"):
            for i, y in future.result():
                label_idx[y].append(i)

    if random_split:
        for indices in label_idx:
            np.random.shuffle(indices)

    train_idx = []
    test_idx = []
    for indices in label_idx:
        pos = math.ceil(len(indices) * train_ratio)
        train_idx.extend(indices[:pos])
        test_idx.extend(indices[pos:])

    return torch.utils.data.Subset(origin_dataset, train_idx), torch.utils.data.Subset(
        origin_dataset, test_idx
    )
def pad_sequence_collate(batch: list):
    """
    Collate variable-length samples into a zero-padded batch.

    This function can be used as the ``collate_fn`` for ``DataLoader`` to process
    a dataset with variable length, e.g., a ``NeuromorphicDatasetFolder`` with
    fixed duration to integrate events to frames. Every ``x`` is converted with
    ``torch.as_tensor`` and the batch is padded by
    ``torch.nn.utils.rnn.pad_sequence(..., batch_first=True)``.

    :param batch: a list of samples ``(x, y)``, where ``x`` is a variable-length
        sequence (anything accepted by ``torch.as_tensor``) and ``y`` is the label
    :type batch: list
    :return: batched samples ``(x_p, y, x_len)``, where ``x_p`` is ``x`` padded to
        the same length, ``y`` is the label, and ``x_len`` is the original length
        of each ``x``
    :rtype: tuple[torch.Tensor, torch.Tensor, torch.Tensor]

    ----

    * **Example**

    .. code-block:: python

        class VariableLengthDataset(torch.utils.data.Dataset):
            def __init__(self, n=1000):
                super().__init__()
                self.n = n

            def __getitem__(self, i):
                return torch.rand([i + 1, 2]), self.n - i - 1

            def __len__(self):
                return self.n


        loader = torch.utils.data.DataLoader(
            VariableLengthDataset(n=32),
            batch_size=2,
            collate_fn=pad_sequence_collate,
            shuffle=True,
        )

        for i, (x_p, label, x_len) in enumerate(loader):
            print(f"x_p.shape={x_p.shape}, label={label}, x_len={x_len}")
            if i == 2:
                break

    Outputs:

    .. code-block:: bash

        x_p.shape=torch.Size([2, 18, 2]), label=tensor([14, 30]), x_len=tensor([18,  2])
        x_p.shape=torch.Size([2, 29, 2]), label=tensor([3, 6]), x_len=tensor([29, 26])
        x_p.shape=torch.Size([2, 23, 2]), label=tensor([ 9, 23]), x_len=tensor([23,  9])
    """
    x_list = []
    x_len_list = []
    y_list = []
    for x, y in batch:
        x = torch.as_tensor(x)
        x_list.append(x)
        # Take the length from the converted tensor: the raw sample may be a
        # list or tuple, which has no ``.shape`` attribute.
        x_len_list.append(x.shape[0])
        y_list.append(y)

    return (
        torch.nn.utils.rnn.pad_sequence(x_list, batch_first=True),
        torch.as_tensor(y_list),
        torch.as_tensor(x_len_list),
    )
def padded_sequence_mask(sequence_len: torch.Tensor, T: Optional[int] = None):
    r"""
    Generate a bool mask with shape ``[T, N]`` from the valid sequence length of
    each sample.

    ``mask[t, n]`` is ``True`` when ``t < sequence_len[n]``. If ``T`` is ``None``,
    the maximum element in ``sequence_len`` will be used. When ``sequence_len`` is
    on CUDA and ``cupy`` is available, this function uses the custom CuPy kernel;
    otherwise it falls back to PyTorch broadcasting.

    :param sequence_len: a tensor ``shape = [N]`` that contains sequence lengths of
        each batch element
    :type sequence_len: torch.Tensor
    :param T: the maximum length of sequences. If ``None``, the maximum element in
        ``sequence_len`` will be used as ``T``
    :type T: Optional[int]
    :return: a bool mask with ``shape = [T, N]``, where the padded position is
        ``False``
    :rtype: torch.Tensor

    ----

    * **Example**

    .. code-block:: python

        x1 = torch.rand([2, 6])
        x2 = torch.rand([3, 6])
        x3 = torch.rand([4, 6])
        x = torch.nn.utils.rnn.pad_sequence([x1, x2, x3])  # [T, N, *]
        print("x.shape=", x.shape)
        x_len = torch.as_tensor([x1.shape[0], x2.shape[0], x3.shape[0]])
        mask = padded_sequence_mask(x_len)
        print("mask.shape=", mask.shape)
        print("mask=\n", mask)

    Outputs:

    .. code-block:: bash

        x.shape= torch.Size([4, 3, 6])
        mask.shape= torch.Size([4, 3])
        mask=
         tensor([[ True,  True,  True],
                [ True,  True,  True],
                [False,  True,  True],
                [False, False,  True]])
    """
    if T is None:
        T = sequence_len.max().item()
    N = sequence_len.numel()
    device_id = sequence_len.get_device()  # -1 for CPU tensors

    if device_id >= 0 and cupy is not None:
        mask = torch.zeros([T, N], dtype=torch.bool, device=sequence_len.device)
        with cuda_utils.DeviceEnvironment(device_id):
            blocks = cuda_utils.cal_blocks(N)
            # The kernel writes mask[i * N + index] for every
            # i < sequence_len[index] without looking at T, so lengths larger
            # than a caller-supplied T must be clamped to stay inside the mask.
            lengths = sequence_len.to(torch.int).clamp(max=T)
            lengths, mask, t_arg, n_arg = cuda_utils.get_contiguous(
                lengths, mask, cupy.asarray(T), cupy.asarray(N)
            )
            kernel = cupy.RawKernel(
                padded_sequence_mask_kernel_code,
                "padded_sequence_mask_kernel",
                options=configure.cuda_compiler_options,
                backend=configure.cuda_compiler_backend,
            )
            kernel(
                (blocks,),
                (configure.cuda_threads,),
                cuda_utils.wrap_args_to_raw_kernel(
                    device_id, lengths, mask, t_arg, n_arg
                ),
            )
        return mask

    # Broadcast a [T, 1] time index against the [1, N] lengths. The index keeps
    # arange's default integer dtype instead of being cast to the dtype of
    # ``sequence_len``: such a cast wraps around for narrow integer dtypes
    # (e.g. uint8 with T > 255), whereas the comparison promotes both operands
    # to a common dtype.
    t_index = torch.arange(T, device=sequence_len.device).unsqueeze(1)
    return t_index < sequence_len.unsqueeze(0)  # [T, N]
def create_sub_dataset(
    source_dir: str, target_dir: str, ratio: float, use_soft_link=True, randomly=False
):
    """
    Create a sub dataset that takes ``ratio`` of the samples from the origin dataset.

    The directory structure of ``source_dir`` is reproduced in ``target_dir``.
    For every directory that contains files, ``int(ratio * number_of_files)``
    files (all of them when ``ratio >= 1.0``) are linked or copied into the
    matching directory of the sub dataset. Directories for which this number is
    0 are reported with a warning printed at the end.

    :param source_dir: the directory path of the origin dataset
    :type source_dir: str
    :param target_dir: the directory path of the sub dataset
    :type target_dir: str
    :param ratio: the ratio of samples the sub dataset will take from the origin
        dataset
    :type ratio: float
    :param use_soft_link: if ``True``, the sub dataset will use soft links;
        otherwise, it will copy files
    :type use_soft_link: bool
    :param randomly: if ``True``, the files taken from the origin dataset will be
        picked up randomly with ``numpy.random.shuffle``. The randomness is
        controlled by ``numpy.random.seed``
    :type randomly: bool
    :return: None
    :rtype: None
    """
    if not os.path.exists(target_dir):
        os.makedirs(target_dir)
        print(f"Mkdir [{target_dir}].")
    create_same_directory_structure(source_dir, target_dir)

    warnings_info = []
    for e_root, e_dirs, e_files in os.walk(source_dir, followlinks=True):
        if not e_files:
            continue

        output_dir = os.path.join(target_dir, os.path.relpath(e_root, source_dir))
        if ratio >= 1.0:
            samples_number = len(e_files)
        else:
            samples_number = int(ratio * len(e_files))
            if samples_number == 0:
                warnings_info.append(
                    f"Warning: the samples number is 0 in [{output_dir}]."
                )

        # The listing order of os.walk is arbitrary. Sorting makes the
        # non-random selection deterministic and lets a fixed numpy seed
        # reproduce the random one.
        e_files.sort()
        if randomly:
            np.random.shuffle(e_files)

        # max(..., 0) keeps a negative ratio selecting nothing instead of
        # turning into a "all but the last k" slice.
        for e_file in e_files[: max(samples_number, 0)]:
            source_file = os.path.join(e_root, e_file)
            target_file = os.path.join(output_dir, os.path.basename(source_file))
            if use_soft_link:
                # A relative link target is resolved against the directory
                # that holds the link, not the current working directory, so
                # link to the absolute path to avoid dangling links when
                # ``source_dir`` is relative.
                os.symlink(os.path.abspath(source_file), target_file)
            else:
                shutil.copyfile(source_file, target_file)

        print(
            f"[{samples_number}] files in [{e_root}] have been copied to [{output_dir}]."
        )

    for warning in warnings_info:
        print(warning)