前言
本文基于chainer实现RepVGG_Plus网络结构,并基于torch的结构方式构建chainer版的,并计算RepVGG_Plus的参数量。
代码实现
class SEBlock(chainer.Chain):
def __init__(self, input_channels, internal_neurons):
super(SEBlock, self).__init__()
self.layers = []
self.layers += [('down',L.Convolution2D(in_channels=input_channels,out_channels=internal_neurons,ksize=1,stride=1,nobias=True))]
self.layers += [('_relu',ReLU())]
self.layers += [('up',L.Convolution2D(in_channels=internal_neurons,out_channels=input_channels,ksize=1,stride=1,nobias=True))]
self.layers += [('_sigmoid',Sigmoid())]
self.input_channels = input_channels
with self.init_scope():
for n in self.layers:
if not n[0].startswith('_'):
setattr(self, n[0], n[1])
def forward(self, inputs):
x = average_pooling_2d(inputs, ksize=inputs.shape[2])
for n, f in self.layers:
if not n.startswith('_'):
x = getattr(self, n)(x)
else:
x = f.apply((x,))[0]
x = x.reshape(-1, self.input_channels, 1, 1)
return inputs * x
class ConvBnRelu(chainer.Chain):
def __init__(self, in_channels, out_channels, kernel_size, stride, padding, groups=1) :
super(ConvBnRelu, self).__init__()
self.layers = []
self.layers += [('conv',L.Convolution2D(in_channels=in_channels, out_channels=out_channels,ksize=kernel_size,stride=stride,pad=padding,groups=groups, nobias=True))]
self.layers += [('bn',BatchNormalization(out_channels))]
self.layers += [('_relu',ReLU())]
with self.init_scope():
for n in self.layers:
if not n[0].startswith('_'):
setattr(self, n[0], n[1])
def forward(self, x):
for n, f in self.layers:
if not n.startswith('_'):
x = getattr(self, n)(x)
else:
x = f.apply((x,))[0]
return x
class ConvBn(chainer.Chain):
def __init__(self, in_channels, out_channels, kernel_size, stride, padding, groups=1) :
super(ConvBn, self).__init__()
self.layers = []
self.layers += [('conv',L.Convolution2D(in_channels=in_channels, out_channels=out_channels,ksize=kernel_size,stride=stride,pad=padding,groups=groups, nobias=True))]
self.layers += [('bn',BatchNormalization(out_channels))]
self.layers += [('_relu',ReLU())]
with self.init_scope():
for n in self.layers:
if not n[0].startswith('_'):
setattr(self, n[0], n[1])
def forward(self, x):
for n, f in self.layers:
if not n.startswith('_'):
x = getattr(self, n)(x)
else:
x = f.apply((x,))[0]
return x
class RepVGGplusBlock(chainer.Chain):
def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=0, groups=1, deploy=False, use_post_se=False):
super(RepVGGplusBlock, self).__init__()
self.deploy = deploy
self.groups = groups
self.in_channels = in_channels
assert kernel_size == 3
assert padding == 1
self.deploy = deploy
self.layers = []
if self.deploy:
self.layers += [('rbr_reparam',L.Convolution2D(in_channels=in_channels, out_channels=out_channels, ksize=kernel_size, stride=stride, pad=padding, groups=groups, nobias=False))]
self.layers += [('_relu',ReLU())]
if use_post_se:
self.layers += [('post_se',SEBlock(out_channels, internal_neurons=out_channels // 4))]
self.stage_out_channels = out_channels
else:
if out_channels == in_channels and stride == 1:
self.layers += [('@rbr_identity',BatchNormalization(out_channels))]
self.layers += [('@rbr_dense',ConvBn(in_channels=in_channels, out_channels=out_channels, kernel_size=kernel_size, stride=stride, padding=padding, groups=groups))]
self.layers += [('@rbr_1x1',ConvBn(in_channels=in_channels, out_channels=out_channels, kernel_size=1, stride=stride, padding=(padding - kernel_size // 2), groups=groups))]
self.layers += [('_relu',ReLU())]
if use_post_se:
self.layers += [('post_se',SEBlock(out_channels, internal_neurons=out_channels // 4))]
self.stage_out_channels = out_channels
with self.init_scope():
for n in self.layers:
if not n[0].startswith('_'):
setattr(self, n[0], n[1])
def forward(self, inputs):
x = inputs
if self.deploy:
for n, f in self.layers:
if not n.startswith('_'):
x = getattr(self, n)(x)
else:
x = f.apply((x,))[0]
else:
x = None
for n, f in self.layers:
if not n.startswith('_'):
if n.startswith('@'):
if x is None:
x = getattr(self, n)(inputs)
else:
x += getattr(self, n)(inputs)
else:
x = getattr(self, n)(x)
else:
x = f.apply((x,))[0]
return x
class RepVGGplusStage(chainer.Chain):
def __init__(self, stage_name,in_planes, planes, num_blocks, stride, use_post_se=False, deploy=False):
super(RepVGGplusStage, self).__init__()
strides = [stride] + [1] * (num_blocks - 1)
self.in_planes = in_planes
self.layers = []
layers_num=0
for stride in strides:
layers_num += 1
cur_groups = 1
self.layers += [('{0}_block{1}'.format(stage_name,layers_num),RepVGGplusBlock(in_channels=self.in_planes, out_channels=planes, kernel_size=3, stride=stride, padding=1, groups=cur_groups, deploy=deploy, use_post_se=use_post_se))]
self.in_planes = planes
self.stage_out_channels = planes
with self.init_scope():
for n in self.layers:
if not n[0].startswith('_'):
setattr(self, n[0], n[1])
def forward(self, x):
for n, f in self.layers:
if not n.startswith('_'):
x = getattr(self, n)(x)
else:
x = f.apply((x,))[0]
return x
class AuxForStage(chainer.Chain):
def __init__(self,stage_out_channels,num_classes,batch_size,output_size):
super(AuxForStage, self).__init__()
self.layers = []
self.layers += [('downsample1',ConvBnRelu(in_channels=stage_out_channels, out_channels=stage_out_channels, kernel_size=3, stride=2, padding=1))]
self.layers += [('_avgpool',AveragePooling2D(ksize=math.ceil(output_size / 2),stride=1,pad=0))]
self.layers += [('_reshape',Reshape((batch_size,stage_out_channels)))]
self.layers += [('fc1',L.Linear(stage_out_channels, num_classes))]
with self.init_scope():
for n in self.layers:
if not n[0].startswith('_'):
setattr(self, n[0], n[1])
def __call__(self, x):
for n, f in self.layers:
if not n.startswith('_'):
x = getattr(self, n)(x)
else:
x = f.apply((x,))[0]
return x
class RepVGG_Plus(chainer.Chain):
cfgs={
'repvggplus_l2pse':{'num_blocks':[8, 14, 24, 1],'width_multiplier':[2.5, 2.5, 2.5, 5],'override_groups_map':None,'use_se':False,'use_post_se':True},
}
def __init__(self,model_name='repvggplus_l2pse',channels=3,
num_classes=1000,batch_size=4,image_size=224,
deploy=False,
**kwargs):
super().__init__()
self.deploy = deploy
self.num_classes = num_classes
self.image_size = image_size
in_channels = min(64, int(64 * self.cfgs[model_name]['width_multiplier'][0]))
stage_channels = [int(64 * self.cfgs[model_name]['width_multiplier'][0]), int(128 * self.cfgs[model_name]['width_multiplier'][1]), int(256 * self.cfgs[model_name]['width_multiplier'][2]), int(512 * self.cfgs[model_name]['width_multiplier'][3])]
self.layers = []
self.layers += [('stage0',RepVGGplusBlock(in_channels=channels, out_channels=in_channels, kernel_size=3, stride=2, padding=1, deploy=self.deploy, use_post_se=self.cfgs[model_name]['use_post_se']))]
output_size = int((self.image_size-3+2*((3-1)//2))/2+1)
stage1 = RepVGGplusStage('stage1',in_channels, stage_channels[0], self.cfgs[model_name]['num_blocks'][0], stride=2, use_post_se=self.cfgs[model_name]['use_post_se'], deploy=deploy)
output_size = math.ceil(output_size / 2)
self.layers += [('stage1',stage1)]
self.layers += [('stage1_aux',AuxForStage(stage1.stage_out_channels,num_classes,batch_size,output_size))]
stage2 = RepVGGplusStage('stage2',stage_channels[0], stage_channels[1], self.cfgs[model_name]['num_blocks'][1], stride=2, use_post_se=self.cfgs[model_name]['use_post_se'], deploy=deploy)
output_size = math.ceil(output_size / 2)
self.layers += [('stage2',stage2)]
self.layers += [('stage2_aux',AuxForStage(stage2.stage_out_channels,num_classes,batch_size,output_size))]
stage3_first = RepVGGplusStage('stage3_first',stage_channels[1], stage_channels[2], self.cfgs[model_name]['num_blocks'][2] // 2, stride=2, use_post_se=self.cfgs[model_name]['use_post_se'], deploy=deploy)
output_size = math.ceil(output_size / 2)
self.layers += [('stage3_first',stage3_first)]
self.layers += [('stage3_first_aux',AuxForStage(stage3_first.stage_out_channels,num_classes,batch_size,output_size))]
self.layers += [('stage3_second',RepVGGplusStage('stage3_second',stage_channels[2], stage_channels[2], self.cfgs[model_name]['num_blocks'][2] - self.cfgs[model_name]['num_blocks'][2] // 2, stride=1, use_post_se=self.cfgs[model_name]['use_post_se'], deploy=deploy))]
self.layers += [('stage4',RepVGGplusStage('stage4',stage_channels[2], stage_channels[3], self.cfgs[model_name]['num_blocks'][3], stride=2, use_post_se=self.cfgs[model_name]['use_post_se'], deploy=deploy))]
output_size = math.ceil(output_size / 2)
self.layers += [('_avgpool',AveragePooling2D(ksize=output_size,stride=1,pad=0))]
self.layers += [('_reshape',Reshape((batch_size,int(512 * self.cfgs[model_name]['width_multiplier'][3]))))]
self.layers += [('fc',L.Linear(int(512 * self.cfgs[model_name]['width_multiplier'][3]), num_classes))]
with self.init_scope():
for n in self.layers:
if not n[0].startswith('_'):
setattr(self, n[0], n[1])
def forward(self, x):
aux=[]
for n, f in self.layers:
origin_size = x.shape
if not n.startswith('_'):
if chainer.config.train:
if 'aux' in n:
aux_x = getattr(self, n)(x)
aux.append(aux_x)
else:
x = getattr(self, n)(x)
else:
if 'aux' not in n:
x = getattr(self, n)(x)
else:
x = f.apply((x,))[0]
if chainer.config.train == False and 'aux' in n:
continue
print(n,origin_size,x.shape)
if chainer.config.train:
return x,aux[0],aux[1],aux[2]
return F.softmax(x)
注意此类就是RepVGG_Plus的实现过程,注意网络的前向传播过程中,分了训练以及测试。
训练过程中直接返回x,测试过程中会进入softmax得出概率
调用方式
if __name__ == '__main__':
batch_size = 4
n_channels = 3
image_size = 224
num_classes = 123
model = RepVGG_Plus(num_classes=num_classes, channels=n_channels,image_size=image_size,batch_size=batch_size)
print("参数量",model.count_params())
x = np.random.rand(batch_size, n_channels, image_size, image_size).astype(np.float32)
t = np.random.randint(0, num_classes, size=(batch_size,)).astype(np.int32)
with chainer.using_config('train', True):
y1,y2,y3,y4 = model(x)
loss1 = F.softmax_cross_entropy(y1, t)
loss2 = F.softmax_cross_entropy(y2, t)
loss3 = F.softmax_cross_entropy(y3, t)
loss4 = F.softmax_cross_entropy(y4, t)
print(loss1.data,loss2.data,loss3.data)
注意。带aux就是辅助分类器,与googlenet类似
版权声明:本文为ctu_sue原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。