SpikingFlow.softbp 源代码

import torch
import torch.nn as nn
import torch.nn.functional as F

[文档]class ModelPipeline(nn.Module): def __init__(self): ''' 用于解决显存不足的模型流水线。将一个模型分散到各个GPU上,流水线式的进行训练。设计思路与仿真器非常类似。 运行时建议先取一个很小的batch_size,然后观察各个GPU的显存占用,并调整每个module_list中包含的模型比例。 ''' super().__init__() self.module_list = nn.ModuleList() self.gpu_list = []
[文档] def append(self, nn_module, gpu_id): ''' :param nn_module: 新添加的module :param gpu_id: 该模型所在的GPU,不需要带“cuda:”的前缀。例如“2” :return: None 将nn_module添加到流水线中,nn_module会运行在设备gpu_id上。添加的nn_module会按照它们的添加顺序运行。例如首先添加了\ fc1,又添加了fc2,则实际运行是按照input_data->fc1->fc2->output_data的顺序运行。 ''' self.module_list.append(nn_module.to('cuda:' + gpu_id)) self.gpu_list.append('cuda:' + gpu_id)
[文档] def constant_forward(self, x, T, reduce=True): ''' :param x: 输入数据 :param T: 运行时长 :param reduce: 为True则返回运行T个时长,得到T个输出的和;为False则返回这T个输出 :return: T个输出的和或T个输出 让本模型以恒定输入x运行T次,这常见于使用频率编码的SNN。这种方式比forward(x, split_sizes)的运行速度要快很多 ''' pipeline = [] # pipeline[i]中保存要送入到m[i]的数据 for i in range(self.gpu_list.__len__()): pipeline.append(None) pipeline[0] = x.to(self.gpu_list[0]) # 跑满pipeline # 假设m中有5个模型,m[0] m[1] m[2] m[3] m[4],则代码执行顺序为 # # p[ 1 ] = m[ 0 ](p[ 0 ]) # # p[ 2 ] = m[ 1 ](p[ 1 ]) # p[ 1 ] = m[ 0 ](p[ 0 ]) # # p[ 3 ] = m[ 2 ](p[ 2 ]) # p[ 2 ] = m[ 1 ](p[ 1 ]) # p[ 1 ] = m[ 0 ](p[ 0 ]) # # p[ 4 ] = m[ 3 ](p[ 3 ]) # p[ 3 ] = m[ 2 ](p[ 2 ]) # p[ 2 ] = m[ 1 ](p[ 1 ]) # p[ 1 ] = m[ 0 ](p[ 0 ]) for i in range(0, self.gpu_list.__len__()): for j in range(i, 0, -1): if j - 1 == 0: pipeline[j] = self.module_list[j - 1](pipeline[j - 1]) else: pipeline[j] = self.module_list[j - 1](pipeline[j - 1].to(self.gpu_list[j - 1])) t = 0 # 记录从流水线输出的总数量 while True: for i in range(self.gpu_list.__len__(), 0, -1): if i == self.gpu_list.__len__(): # 获取输出 if t == 0: if reduce: ret = self.module_list[i - 1](pipeline[i - 1].to(self.gpu_list[i - 1])) else: ret = [] ret.append(self.module_list[i - 1](pipeline[i - 1].to(self.gpu_list[i - 1]))) else: if reduce: ret += self.module_list[i - 1](pipeline[i - 1].to(self.gpu_list[i - 1])) else: ret.append(self.module_list[i - 1](pipeline[i - 1].to(self.gpu_list[i - 1]))) t += 1 if t == T: if reduce == False: return torch.cat(ret, dim=0) return ret else: pipeline[i] = self.module_list[i - 1](pipeline[i - 1].to(self.gpu_list[i - 1]))
[文档] def forward(self, x, split_sizes): ''' :param x: 输入数据 :param split_sizes: 输入数据x会在维度0上被拆分成每split_size一组,得到[x0, x1, ...],这些数据会被串行的送入\ module_list中的各个模块进行计算 :return: 输出数据 例如将模型分成4部分,因而 ``module_list`` 中有4个子模型;将输入分割为3部分,则每次调用 ``forward(x, split_sizes)`` ,函数内部的\ 计算过程如下: .. code-block:: python step=0 x0, x1, x2 |m0| |m1| |m2| |m3| step=1 x0, x1 |m0| x2 |m1| |m2| |m3| step=2 x0 |m0| x1 |m1| x2 |m2| |m3| step=3 |m0| x0 |m1| x1 |m2| x2 |m3| step=4 |m0| |m1| x0 |m2| x1 |m3| x2 step=5 |m0| |m1| |m2| x0 |m3| x1, x2 step=6 |m0| |m1| |m2| |m3| x0, x1, x2 不使用流水线,则任何时刻只有一个GPU在运行,而其他GPU则在等待这个GPU的数据;而使用流水线,例如上面计算过程中的 ``step=3`` 到\ ``step=4``,尽管在代码的写法为顺序执行: .. code-block:: python x0 = m1(x0) x1 = m2(x1) x2 = m3(x2) 但由于PyTorch优秀的特性,上面的3行代码实际上是并行执行的,因为这3个在CUDA上的计算使用各自的数据,互不影响 ''' assert x.shape[0] % split_sizes == 0, print('x.shape[0]不能被split_sizes整除!') x = list(x.split(split_sizes, dim=0)) x_pos = [] # x_pos[i]记录x[i]应该通过哪个module for i in range(x.__len__()): x_pos.append(i + 1 - x.__len__()) while True: for i in range(x_pos.__len__() - 1, -1, -1): if 0 <= x_pos[i] <= self.gpu_list.__len__() - 1: x[i] = self.module_list[x_pos[i]](x[i].to(self.gpu_list[x_pos[i]])) x_pos[i] += 1 if x_pos[0] == self.gpu_list.__len__(): break return torch.cat(x, dim=0)