chainer-骨干网络backbone-RepVGG_Plus代码重构【附源码】

  • Post author:
  • Post category:其他





前言

本文基于chainer实现RepVGG_Plus网络结构,并基于torch的结构方式构建chainer版的,并计算RepVGG_Plus的参数量。




代码实现


class SEBlock(chainer.Chain):
    def __init__(self, input_channels, internal_neurons):
        super(SEBlock, self).__init__()
        self.layers = []

        self.layers += [('down',L.Convolution2D(in_channels=input_channels,out_channels=internal_neurons,ksize=1,stride=1,nobias=True))]
        self.layers += [('_relu',ReLU())]
        self.layers += [('up',L.Convolution2D(in_channels=internal_neurons,out_channels=input_channels,ksize=1,stride=1,nobias=True))]
        self.layers += [('_sigmoid',Sigmoid())]
        self.input_channels = input_channels
        
        with self.init_scope():
            for n in self.layers:
                if not n[0].startswith('_'):
                    setattr(self, n[0], n[1])

    def forward(self, inputs):
        x = average_pooling_2d(inputs, ksize=inputs.shape[2])
        
        for n, f in self.layers:
            if not n.startswith('_'):
                x = getattr(self, n)(x)
            else:
                x = f.apply((x,))[0]
        
        x = x.reshape(-1, self.input_channels, 1, 1)
        
        return inputs * x

class ConvBnRelu(chainer.Chain):
    def __init__(self, in_channels, out_channels, kernel_size, stride, padding, groups=1) :
        super(ConvBnRelu, self).__init__()
        self.layers = []
        self.layers += [('conv',L.Convolution2D(in_channels=in_channels, out_channels=out_channels,ksize=kernel_size,stride=stride,pad=padding,groups=groups, nobias=True))]
        self.layers += [('bn',BatchNormalization(out_channels))]
        self.layers += [('_relu',ReLU())]
        
        with self.init_scope():
            for n in self.layers:
                if not n[0].startswith('_'):
                    setattr(self, n[0], n[1])
    
    def forward(self, x):
        for n, f in self.layers:
            if not n.startswith('_'):
                x = getattr(self, n)(x)
            else:
                x = f.apply((x,))[0]
        return x

class ConvBn(chainer.Chain):
    def __init__(self, in_channels, out_channels, kernel_size, stride, padding, groups=1) :
        super(ConvBn, self).__init__()
        self.layers = []
        self.layers += [('conv',L.Convolution2D(in_channels=in_channels, out_channels=out_channels,ksize=kernel_size,stride=stride,pad=padding,groups=groups, nobias=True))]
        self.layers += [('bn',BatchNormalization(out_channels))]
        self.layers += [('_relu',ReLU())]
        
        with self.init_scope():
            for n in self.layers:
                if not n[0].startswith('_'):
                    setattr(self, n[0], n[1])
    
    def forward(self, x):
        for n, f in self.layers:
            if not n.startswith('_'):
                x = getattr(self, n)(x)
            else:
                x = f.apply((x,))[0]
        return x 

class RepVGGplusBlock(chainer.Chain):
    def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=0, groups=1, deploy=False, use_post_se=False):
        super(RepVGGplusBlock, self).__init__()
        self.deploy = deploy
        self.groups = groups
        self.in_channels = in_channels

        assert kernel_size == 3
        assert padding == 1

        self.deploy = deploy
        self.layers = []
        if self.deploy:
            self.layers += [('rbr_reparam',L.Convolution2D(in_channels=in_channels, out_channels=out_channels, ksize=kernel_size, stride=stride, pad=padding, groups=groups, nobias=False))]
            self.layers += [('_relu',ReLU())]
            if use_post_se:
                self.layers += [('post_se',SEBlock(out_channels, internal_neurons=out_channels // 4))]
            self.stage_out_channels = out_channels
        else:
            if out_channels == in_channels and stride == 1:
                self.layers += [('@rbr_identity',BatchNormalization(out_channels))]
            self.layers += [('@rbr_dense',ConvBn(in_channels=in_channels, out_channels=out_channels, kernel_size=kernel_size, stride=stride, padding=padding, groups=groups))]
            self.layers += [('@rbr_1x1',ConvBn(in_channels=in_channels, out_channels=out_channels, kernel_size=1, stride=stride, padding=(padding - kernel_size // 2), groups=groups))]
            self.layers += [('_relu',ReLU())]
            if use_post_se:
                self.layers += [('post_se',SEBlock(out_channels, internal_neurons=out_channels // 4))]
            self.stage_out_channels = out_channels
            
        with self.init_scope():
            for n in self.layers:
                if not n[0].startswith('_'):
                    setattr(self, n[0], n[1])
                    
    def forward(self, inputs):
        x = inputs
        if self.deploy:
            for n, f in self.layers:
                if not n.startswith('_'):
                    x = getattr(self, n)(x)
                else:
                    x = f.apply((x,))[0]
        else:
            x = None
            for n, f in self.layers:
                if not n.startswith('_'):
                    if n.startswith('@'):
                        if x is None:
                            x = getattr(self, n)(inputs)
                        else:
                            x += getattr(self, n)(inputs)
                    else:
                        x = getattr(self, n)(x)
                else:
                    x = f.apply((x,))[0]
        return x
    
class RepVGGplusStage(chainer.Chain):
    def __init__(self, stage_name,in_planes, planes, num_blocks, stride, use_post_se=False, deploy=False):
        super(RepVGGplusStage, self).__init__()
        strides = [stride] + [1] * (num_blocks - 1)
        self.in_planes = in_planes
        self.layers = []
        layers_num=0
        for stride in strides:
            layers_num += 1
            cur_groups = 1
            self.layers += [('{0}_block{1}'.format(stage_name,layers_num),RepVGGplusBlock(in_channels=self.in_planes, out_channels=planes, kernel_size=3, stride=stride, padding=1, groups=cur_groups, deploy=deploy, use_post_se=use_post_se))]
            self.in_planes = planes
            self.stage_out_channels = planes
            
        with self.init_scope():
            for n in self.layers:
                if not n[0].startswith('_'):
                    setattr(self, n[0], n[1])

    def forward(self, x):
        for n, f in self.layers:
            if not n.startswith('_'):
                x = getattr(self, n)(x)
            else:
                x = f.apply((x,))[0]
        return x

class AuxForStage(chainer.Chain):
    def __init__(self,stage_out_channels,num_classes,batch_size,output_size):
        super(AuxForStage, self).__init__()
        self.layers = []
        self.layers += [('downsample1',ConvBnRelu(in_channels=stage_out_channels, out_channels=stage_out_channels, kernel_size=3, stride=2, padding=1))]
        self.layers += [('_avgpool',AveragePooling2D(ksize=math.ceil(output_size / 2),stride=1,pad=0))]
        self.layers += [('_reshape',Reshape((batch_size,stage_out_channels)))]
        self.layers += [('fc1',L.Linear(stage_out_channels, num_classes))]

        with self.init_scope():
            for n in self.layers:
                if not n[0].startswith('_'):
                    setattr(self, n[0], n[1])
                    
    def __call__(self, x):
        for n, f in self.layers:
            if not n.startswith('_'):
                x = getattr(self, n)(x)
            else:
                x = f.apply((x,))[0]
        return x

class RepVGG_Plus(chainer.Chain):
    cfgs={
        'repvggplus_l2pse':{'num_blocks':[8, 14, 24, 1],'width_multiplier':[2.5, 2.5, 2.5, 5],'override_groups_map':None,'use_se':False,'use_post_se':True},
    }
    def __init__(self,model_name='repvggplus_l2pse',channels=3,
                 num_classes=1000,batch_size=4,image_size=224,
                 deploy=False,
                 **kwargs):
        super().__init__()

        self.deploy = deploy
        self.num_classes = num_classes
        self.image_size = image_size

        in_channels = min(64, int(64 * self.cfgs[model_name]['width_multiplier'][0]))
        stage_channels = [int(64 * self.cfgs[model_name]['width_multiplier'][0]), int(128 * self.cfgs[model_name]['width_multiplier'][1]), int(256 * self.cfgs[model_name]['width_multiplier'][2]), int(512 * self.cfgs[model_name]['width_multiplier'][3])]
        self.layers = []
        self.layers += [('stage0',RepVGGplusBlock(in_channels=channels, out_channels=in_channels, kernel_size=3, stride=2, padding=1, deploy=self.deploy, use_post_se=self.cfgs[model_name]['use_post_se']))]
        output_size = int((self.image_size-3+2*((3-1)//2))/2+1)
        
        stage1 = RepVGGplusStage('stage1',in_channels, stage_channels[0], self.cfgs[model_name]['num_blocks'][0], stride=2, use_post_se=self.cfgs[model_name]['use_post_se'], deploy=deploy)
        output_size = math.ceil(output_size / 2)
        self.layers += [('stage1',stage1)]
        self.layers += [('stage1_aux',AuxForStage(stage1.stage_out_channels,num_classes,batch_size,output_size))]
        
        stage2 = RepVGGplusStage('stage2',stage_channels[0], stage_channels[1], self.cfgs[model_name]['num_blocks'][1], stride=2, use_post_se=self.cfgs[model_name]['use_post_se'], deploy=deploy)
        output_size = math.ceil(output_size / 2)
        self.layers += [('stage2',stage2)]
        self.layers += [('stage2_aux',AuxForStage(stage2.stage_out_channels,num_classes,batch_size,output_size))]
        
        stage3_first = RepVGGplusStage('stage3_first',stage_channels[1], stage_channels[2], self.cfgs[model_name]['num_blocks'][2] // 2, stride=2, use_post_se=self.cfgs[model_name]['use_post_se'], deploy=deploy)
        output_size = math.ceil(output_size / 2)
        self.layers += [('stage3_first',stage3_first)]
        self.layers += [('stage3_first_aux',AuxForStage(stage3_first.stage_out_channels,num_classes,batch_size,output_size))]
        
        self.layers += [('stage3_second',RepVGGplusStage('stage3_second',stage_channels[2], stage_channels[2], self.cfgs[model_name]['num_blocks'][2] - self.cfgs[model_name]['num_blocks'][2] // 2, stride=1, use_post_se=self.cfgs[model_name]['use_post_se'], deploy=deploy))]
        self.layers += [('stage4',RepVGGplusStage('stage4',stage_channels[2], stage_channels[3], self.cfgs[model_name]['num_blocks'][3], stride=2, use_post_se=self.cfgs[model_name]['use_post_se'], deploy=deploy))]
        output_size = math.ceil(output_size / 2)
        
        self.layers += [('_avgpool',AveragePooling2D(ksize=output_size,stride=1,pad=0))]
        self.layers += [('_reshape',Reshape((batch_size,int(512 * self.cfgs[model_name]['width_multiplier'][3]))))]
        self.layers += [('fc',L.Linear(int(512 * self.cfgs[model_name]['width_multiplier'][3]), num_classes))]

        with self.init_scope():
            for n in self.layers:
                if not n[0].startswith('_'):
                    setattr(self, n[0], n[1])
                    
    def forward(self, x):
        aux=[]
        for n, f in self.layers:
            origin_size = x.shape
            if not n.startswith('_'):
                if chainer.config.train:
                    if 'aux' in n:
                        aux_x = getattr(self, n)(x)
                        aux.append(aux_x)
                    else:
                        x = getattr(self, n)(x)
                else:
                    if 'aux' not in n:
                        x = getattr(self, n)(x)
            else:
                x = f.apply((x,))[0]
                
            if chainer.config.train == False and 'aux' in n:
                continue
            print(n,origin_size,x.shape)
            
        if chainer.config.train:
            return x,aux[0],aux[1],aux[2]
        return F.softmax(x)

注意此类就是RepVGG_Plus的实现过程,注意网络的前向传播过程中,分了训练以及测试。

训练过程中直接返回x,测试过程中会进入softmax得出概率



调用方式


if __name__ == '__main__':
    batch_size = 4
    n_channels = 3
    image_size = 224
    num_classes = 123
    
    model = RepVGG_Plus(num_classes=num_classes, channels=n_channels,image_size=image_size,batch_size=batch_size)
    print("参数量",model.count_params())

    x = np.random.rand(batch_size, n_channels, image_size, image_size).astype(np.float32)
    t = np.random.randint(0, num_classes, size=(batch_size,)).astype(np.int32)
    with chainer.using_config('train', True):
        y1,y2,y3,y4 = model(x)
    loss1 = F.softmax_cross_entropy(y1, t)
    loss2 = F.softmax_cross_entropy(y2, t)
    loss3 = F.softmax_cross_entropy(y3, t)
    loss4 = F.softmax_cross_entropy(y4, t)
    print(loss1.data,loss2.data,loss3.data)

注意。带aux就是辅助分类器,与googlenet类似

在这里插入图片描述



版权声明:本文为ctu_sue原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。