import torch
import torch.nn as nn
import torch.nn.functional as F
[文档]class ModelPipeline(nn.Module):
def __init__(self):
'''
用于解决显存不足的模型流水线。将一个模型分散到各个GPU上,流水线式的进行训练。设计思路与仿真器非常类似。
运行时建议先取一个很小的batch_size,然后观察各个GPU的显存占用,并调整每个module_list中包含的模型比例。
'''
super().__init__()
self.module_list = nn.ModuleList()
self.gpu_list = []
[文档] def append(self, nn_module, gpu_id):
'''
:param nn_module: 新添加的module
:param gpu_id: 该模型所在的GPU,不需要带“cuda:”的前缀。例如“2”
:return: None
将nn_module添加到流水线中,nn_module会运行在设备gpu_id上。添加的nn_module会按照它们的添加顺序运行。例如首先添加了\
fc1,又添加了fc2,则实际运行是按照input_data->fc1->fc2->output_data的顺序运行。
'''
self.module_list.append(nn_module.to('cuda:' + gpu_id))
self.gpu_list.append('cuda:' + gpu_id)
[文档] def constant_forward(self, x, T, reduce=True):
'''
:param x: 输入数据
:param T: 运行时长
:param reduce: 为True则返回运行T个时长,得到T个输出的和;为False则返回这T个输出
:return: T个输出的和或T个输出
让本模型以恒定输入x运行T次,这常见于使用频率编码的SNN。这种方式比forward(x, split_sizes)的运行速度要快很多
'''
pipeline = [] # pipeline[i]中保存要送入到m[i]的数据
for i in range(self.gpu_list.__len__()):
pipeline.append(None)
pipeline[0] = x.to(self.gpu_list[0])
# 跑满pipeline
# 假设m中有5个模型,m[0] m[1] m[2] m[3] m[4],则代码执行顺序为
#
# p[ 1 ] = m[ 0 ](p[ 0 ])
#
# p[ 2 ] = m[ 1 ](p[ 1 ])
# p[ 1 ] = m[ 0 ](p[ 0 ])
#
# p[ 3 ] = m[ 2 ](p[ 2 ])
# p[ 2 ] = m[ 1 ](p[ 1 ])
# p[ 1 ] = m[ 0 ](p[ 0 ])
#
# p[ 4 ] = m[ 3 ](p[ 3 ])
# p[ 3 ] = m[ 2 ](p[ 2 ])
# p[ 2 ] = m[ 1 ](p[ 1 ])
# p[ 1 ] = m[ 0 ](p[ 0 ])
for i in range(0, self.gpu_list.__len__()):
for j in range(i, 0, -1):
if j - 1 == 0:
pipeline[j] = self.module_list[j - 1](pipeline[j - 1])
else:
pipeline[j] = self.module_list[j - 1](pipeline[j - 1].to(self.gpu_list[j - 1]))
t = 0 # 记录从流水线输出的总数量
while True:
for i in range(self.gpu_list.__len__(), 0, -1):
if i == self.gpu_list.__len__():
# 获取输出
if t == 0:
if reduce:
ret = self.module_list[i - 1](pipeline[i - 1].to(self.gpu_list[i - 1]))
else:
ret = []
ret.append(self.module_list[i - 1](pipeline[i - 1].to(self.gpu_list[i - 1])))
else:
if reduce:
ret += self.module_list[i - 1](pipeline[i - 1].to(self.gpu_list[i - 1]))
else:
ret.append(self.module_list[i - 1](pipeline[i - 1].to(self.gpu_list[i - 1])))
t += 1
if t == T:
if reduce == False:
return torch.cat(ret, dim=0)
return ret
else:
pipeline[i] = self.module_list[i - 1](pipeline[i - 1].to(self.gpu_list[i - 1]))
[文档] def forward(self, x, split_sizes):
'''
:param x: 输入数据
:param split_sizes: 输入数据x会在维度0上被拆分成每split_size一组,得到[x0, x1, ...],这些数据会被串行的送入\
module_list中的各个模块进行计算
:return: 输出数据
例如将模型分成4部分,因而 ``module_list`` 中有4个子模型;将输入分割为3部分,则每次调用 ``forward(x, split_sizes)`` ,函数内部的\
计算过程如下:
.. code-block:: python
step=0 x0, x1, x2 |m0| |m1| |m2| |m3|
step=1 x0, x1 |m0| x2 |m1| |m2| |m3|
step=2 x0 |m0| x1 |m1| x2 |m2| |m3|
step=3 |m0| x0 |m1| x1 |m2| x2 |m3|
step=4 |m0| |m1| x0 |m2| x1 |m3| x2
step=5 |m0| |m1| |m2| x0 |m3| x1, x2
step=6 |m0| |m1| |m2| |m3| x0, x1, x2
不使用流水线,则任何时刻只有一个GPU在运行,而其他GPU则在等待这个GPU的数据;而使用流水线,例如上面计算过程中的 ``step=3`` 到\
``step=4``,尽管在代码的写法为顺序执行:
.. code-block:: python
x0 = m1(x0)
x1 = m2(x1)
x2 = m3(x2)
但由于PyTorch优秀的特性,上面的3行代码实际上是并行执行的,因为这3个在CUDA上的计算使用各自的数据,互不影响
'''
assert x.shape[0] % split_sizes == 0, print('x.shape[0]不能被split_sizes整除!')
x = list(x.split(split_sizes, dim=0))
x_pos = [] # x_pos[i]记录x[i]应该通过哪个module
for i in range(x.__len__()):
x_pos.append(i + 1 - x.__len__())
while True:
for i in range(x_pos.__len__() - 1, -1, -1):
if 0 <= x_pos[i] <= self.gpu_list.__len__() - 1:
x[i] = self.module_list[x_pos[i]](x[i].to(self.gpu_list[x_pos[i]]))
x_pos[i] += 1
if x_pos[0] == self.gpu_list.__len__():
break
return torch.cat(x, dim=0)