前面把模型网络以及Loss 部分都写了,这篇就主要把之前的部分都串起来,看下mmdetection的训练PipeLine是怎样的。
Train
mmdetection
具体发起训练使用了
mmcv
中的
Runner
,这就不赘述了,我们直接看
mmdet/models/detectors/cascade_rcnn.py
中的
forward_train()
,还是分成3个部分来看
rpn
def forward_train(self,
img,
img_meta,
gt_bboxes,
gt_labels,
gt_bboxes_ignore=None,
gt_masks=None,
proposals=None):
x = self.extract_feat(img)
losses = dict()
if self.with_rpn:
rpn_outs = self.rpn_head(x)
rpn_loss_inputs = rpn_outs + (gt_bboxes, img_meta,
self.train_cfg.rpn)
rpn_losses = self.rpn_head.loss(
*rpn_loss_inputs, gt_bboxes_ignore=gt_bboxes_ignore)
losses.update(rpn_losses)
proposal_cfg = self.train_cfg.get('rpn_proposal',
self.test_cfg.rpn)
proposal_inputs = rpn_outs + (img_meta, proposal_cfg)
proposal_list = self.rpn_head.get_bboxes(*proposal_inputs)
else:
proposal_list = proposals
在这里可以看到,首先对输入图像提取特征,然后将特征送到
rpn_head
,之后计算
rpn_loss
然后通过
get_bboxes()
取到
proposal_list
。
这里的大部分内容在前面的blog中都有写,这次主要就看
get_bboxes()
这个函数。
在之前
loss
部分我们有提到
rpn_head
的输出并不是每个
anchor
的分类和回归结果,然后在
loss()
中通过
assign
和
sampler
计算得到了
target
。
而
RPN
的作用是用来提取候选框,而候选框就是通过
get_bboxes()
得到的。
这里我们就来一步一步看看
get_bbox
都干了些什么
首先
get_bboxes
定义在
mmdet/models/anchor_heads/anchor_head.py
def get_bboxes(self, cls_scores, bbox_preds, img_metas, cfg,
rescale=False):
assert len(cls_scores) == len(bbox_preds)
num_levels = len(cls_scores)
mlvl_anchors = [
self.anchor_generators[i].grid_anchors(cls_scores[i].size()[-2:],
self.anchor_strides[i])
for i in range(num_levels)
]
result_list = []
for img_id in range(len(img_metas)):
cls_score_list = [
cls_scores[i][img_id].detach() for i in range(num_levels)
]
bbox_pred_list = [
bbox_preds[i][img_id].detach() for i in range(num_levels)
]
img_shape = img_metas[img_id]['img_shape']
scale_factor = img_metas[img_id]['scale_factor']
proposals = self.get_bboxes_single(cls_score_list, bbox_pred_list,
mlvl_anchors, img_shape,
scale_factor, cfg, rescale)
result_list.append(proposals)
return result_list
def get_bboxes_single(self,
cls_scores,
bbox_preds,
mlvl_anchors,
img_shape,
scale_factor,
cfg,
rescale=False):
assert len(cls_scores) == len(bbox_preds) == len(mlvl_anchors)
mlvl_bboxes = []
mlvl_scores = []
for cls_score, bbox_pred, anchors in zip(cls_scores, bbox_preds,
mlvl_anchors):
assert cls_score.size()[-2:] == bbox_pred.size()[-2:]
cls_score = cls_score.permute(1, 2,
0).reshape(-1, self.cls_out_channels)
if self.use_sigmoid_cls:
scores = cls_score.sigmoid()
else:
scores = cls_score.softmax(-1)
bbox_pred = bbox_pred.permute(1, 2, 0).reshape(-1, 4)
nms_pre = cfg.get('nms_pre', -1)
if nms_pre > 0 and scores.shape[0] > nms_pre:
if self.use_sigmoid_cls:
max_scores, _ = scores.max(dim=1)
else:
max_scores, _ = scores[:, 1:].max(dim=1)
_, topk_inds = max_scores.topk(nms_pre)
anchors = anchors[topk_inds, :]
bbox_pred = bbox_pred[topk_inds, :]
scores = scores[topk_inds, :]
bboxes = delta2bbox(anchors, bbox_pred, self.target_means,
self.target_stds, img_shape)
mlvl_bboxes.append(bboxes)
mlvl_scores.append(scores)
mlvl_bboxes = torch.cat(mlvl_bboxes)
if rescale:
mlvl_bboxes /= mlvl_bboxes.new_tensor(scale_factor)
mlvl_scores = torch.cat(mlvl_scores)
if self.use_sigmoid_cls:
padding = mlvl_scores.new_zeros(mlvl_scores.shape[0], 1)
mlvl_scores = torch.cat([padding, mlvl_scores], dim=1)
det_bboxes, det_labels = multiclass_nms(mlvl_bboxes, mlvl_scores,
cfg.score_thr, cfg.nms,
cfg.max_per_img)
return det_bboxes, det_labels
先获取
anchors
然后通过
get_bboxes_single
来获取候选框,主要有两个比较重要的操作,按照
score
排序取最大的
k
个
boxes
,取到
topk
个
bboxes
过后通过
delta2bbox()
转换成
(x1,y1,x2,y2)
的格式。
之后就是
nms
了,
multiclass_nms()
定义在
mmdet/core/post_processing/bbox_nms.py
def multiclass_nms(multi_bboxes,
multi_scores,
score_thr,
nms_cfg,
max_num=-1,
score_factors=None):
"""NMS for multi-class bboxes.
Args:
multi_bboxes (Tensor): shape (n, #class*4) or (n, 4)
multi_scores (Tensor): shape (n, #class)
score_thr (float): bbox threshold, bboxes with scores lower than it
will not be considered.
nms_thr (float): NMS IoU threshold
max_num (int): if there are more than max_num bboxes after NMS,
only top max_num will be kept.
score_factors (Tensor): The factors multiplied to scores before
applying NMS
Returns:
tuple: (bboxes, labels), tensors of shape (k, 5) and (k, 1). Labels
are 0-based.
"""
num_classes = multi_scores.shape[1]
bboxes, labels = [], []
nms_cfg_ = nms_cfg.copy()
nms_type = nms_cfg_.pop('type', 'nms')
nms_op = getattr(nms_wrapper, nms_type)
for i in range(1, num_classes):
cls_inds = multi_scores[:, i] > score_thr
if not cls_inds.any():
continue
# get bboxes and scores of this class
if multi_bboxes.shape[1] == 4:
_bboxes = multi_bboxes[cls_inds, :]
else:
_bboxes = multi_bboxes[cls_inds, i * 4:(i + 1) * 4]
_scores = multi_scores[cls_inds, i]
if score_factors is not None:
_scores *= score_factors[cls_inds]
cls_dets = torch.cat([_bboxes, _scores[:, None]], dim=1)
cls_dets, _ = nms_op(cls_dets, **nms_cfg_)
cls_labels = multi_bboxes.new_full(
(cls_dets.shape[0], ), i - 1, dtype=torch.long)
bboxes.append(cls_dets)
labels.append(cls_labels)
if bboxes:
bboxes = torch.cat(bboxes)
labels = torch.cat(labels)
if bboxes.shape[0] > max_num:
_, inds = bboxes[:, -1].sort(descending=True)
inds = inds[:max_num]
bboxes = bboxes[inds]
labels = labels[inds]
else:
bboxes = multi_bboxes.new_zeros((0, 5))
labels = multi_bboxes.new_zeros((0, ), dtype=torch.long)
return bboxes, labels
nms
的结果就是候选框了,就可以送到
bbox head
了。
bbox head
for i in range(self.num_stages):
self.current_stage = i
rcnn_train_cfg = self.train_cfg.rcnn[i]
lw = self.train_cfg.stage_loss_weights[i]
# assign gts and sample proposals
sampling_results = []
if self.with_bbox or self.with_mask:
bbox_assigner = build_assigner(rcnn_train_cfg.assigner)
bbox_sampler = build_sampler(
rcnn_train_cfg.sampler, context=self)
num_imgs = img.size(0)
if gt_bboxes_ignore is None:
gt_bboxes_ignore = [None for _ in range(num_imgs)]
for j in range(num_imgs):
assign_result = bbox_assigner.assign(
proposal_list[j], gt_bboxes[j], gt_bboxes_ignore[j],
gt_labels[j])
sampling_result = bbox_sampler.sample(
assign_result,
proposal_list[j],
gt_bboxes[j],
gt_labels[j],
feats=[lvl_feat[j][None] for lvl_feat in x])
sampling_results.append(sampling_result)
# bbox head forward and loss
bbox_roi_extractor = self.bbox_roi_extractor[i]
bbox_head = self.bbox_head[i]
rois = bbox2roi([res.bboxes for res in sampling_results])
bbox_feats = bbox_roi_extractor(x[:bbox_roi_extractor.num_inputs],
rois)
if self.with_shared_head:
bbox_feats = self.shared_head(bbox_feats)
cls_score, bbox_pred = bbox_head(bbox_feats)
bbox_targets = bbox_head.get_target(sampling_results, gt_bboxes,
gt_labels, rcnn_train_cfg)
loss_bbox = bbox_head.loss(cls_score, bbox_pred, *bbox_targets)
for name, value in loss_bbox.items():
losses['s{}.{}'.format(i, name)] = (
value * lw if 'loss' in name else value)
这里的输入是之前生成的候选框,但是这些候选框还不能直接送到网络,需要先通过
assign
和
sampler
得到用于训练的候选框和对应的
target
,之后再通过
roi
提取一个固定大小的
feat
就可以送到网络训练了。
mask head
# mask head forward and loss
if self.with_mask:
if not self.share_roi_extractor:
mask_roi_extractor = self.mask_roi_extractor[i]
pos_rois = bbox2roi(
[res.pos_bboxes for res in sampling_results])
mask_feats = mask_roi_extractor(
x[:mask_roi_extractor.num_inputs], pos_rois)
if self.with_shared_head:
mask_feats = self.shared_head(mask_feats)
else:
# reuse positive bbox feats
pos_inds = []
device = bbox_feats.device
for res in sampling_results:
pos_inds.append(
torch.ones(
res.pos_bboxes.shape[0],
device=device,
dtype=torch.uint8))
pos_inds.append(
torch.zeros(
res.neg_bboxes.shape[0],
device=device,
dtype=torch.uint8))
pos_inds = torch.cat(pos_inds)
mask_feats = bbox_feats[pos_inds]
mask_head = self.mask_head[i]
mask_pred = mask_head(mask_feats)
mask_targets = mask_head.get_target(sampling_results, gt_masks,
rcnn_train_cfg)
pos_labels = torch.cat(
[res.pos_gt_labels for res in sampling_results])
loss_mask = mask_head.loss(mask_pred, mask_targets, pos_labels)
for name, value in loss_mask.items():
losses['s{}.{}'.format(i, name)] = (
value * lw if 'loss' in name else value)
mask
部分和
bbox
部分基本也一样,也是先获取到
feats
以及
target
然后送到网络训练。
Test
做预测时创建模型和网络的方式和之前训练时一样,就跳过直接看预测时用的函数了。
mmdet/models/detectors/cascade_rcnn.py
的
simple_test
,
def simple_test(self, img, img_meta, proposals=None, rescale=False):
x = self.extract_feat(img)
proposal_list = self.simple_test_rpn(
x, img_meta, self.test_cfg.rpn) if proposals is None else proposals
img_shape = img_meta[0]['img_shape']
ori_shape = img_meta[0]['ori_shape']
scale_factor = img_meta[0]['scale_factor']
# "ms" in variable names means multi-stage
ms_bbox_result = {}
ms_segm_result = {}
ms_scores = []
rcnn_test_cfg = self.test_cfg.rcnn
rois = bbox2roi(proposal_list)
for i in range(self.num_stages):
bbox_roi_extractor = self.bbox_roi_extractor[i]
bbox_head = self.bbox_head[i]
bbox_feats = bbox_roi_extractor(
x[:len(bbox_roi_extractor.featmap_strides)], rois)
if self.with_shared_head:
bbox_feats = self.shared_head(bbox_feats)
cls_score, bbox_pred = bbox_head(bbox_feats)
ms_scores.append(cls_score)
if self.test_cfg.keep_all_stages:
det_bboxes, det_labels = bbox_head.get_det_bboxes(
rois,
cls_score,
bbox_pred,
img_shape,
scale_factor,
rescale=rescale,
cfg=rcnn_test_cfg)
bbox_result = bbox2result(det_bboxes, det_labels,
bbox_head.num_classes)
ms_bbox_result['stage{}'.format(i)] = bbox_result
if self.with_mask:
mask_roi_extractor = self.mask_roi_extractor[i]
mask_head = self.mask_head[i]
if det_bboxes.shape[0] == 0:
segm_result = [
[] for _ in range(mask_head.num_classes - 1)
]
else:
_bboxes = (
det_bboxes[:, :4] * scale_factor
if rescale else det_bboxes)
mask_rois = bbox2roi([_bboxes])
mask_feats = mask_roi_extractor(
x[:len(mask_roi_extractor.featmap_strides)],
mask_rois)
if self.with_shared_head:
mask_feats = self.shared_head(mask_feats, i)
mask_pred = mask_head(mask_feats)
segm_result = mask_head.get_seg_masks(
mask_pred, _bboxes, det_labels, rcnn_test_cfg,
ori_shape, scale_factor, rescale)
ms_segm_result['stage{}'.format(i)] = segm_result
if i < self.num_stages - 1:
bbox_label = cls_score.argmax(dim=1)
rois = bbox_head.regress_by_class(rois, bbox_label, bbox_pred,
img_meta[0])
cls_score = sum(ms_scores) / self.num_stages
det_bboxes, det_labels = self.bbox_head[-1].get_det_bboxes(
rois,
cls_score,
bbox_pred,
img_shape,
scale_factor,
rescale=rescale,
cfg=rcnn_test_cfg)
bbox_result = bbox2result(det_bboxes, det_labels,
self.bbox_head[-1].num_classes)
ms_bbox_result['ensemble'] = bbox_result
if self.with_mask:
if det_bboxes.shape[0] == 0:
segm_result = [
[] for _ in range(self.mask_head[-1].num_classes - 1)
]
else:
_bboxes = (
det_bboxes[:, :4] * scale_factor
if rescale else det_bboxes)
mask_rois = bbox2roi([_bboxes])
aug_masks = []
for i in range(self.num_stages):
mask_roi_extractor = self.mask_roi_extractor[i]
mask_feats = mask_roi_extractor(
x[:len(mask_roi_extractor.featmap_strides)], mask_rois)
if self.with_shared_head:
mask_feats = self.shared_head(mask_feats)
mask_pred = self.mask_head[i](mask_feats)
aug_masks.append(mask_pred.sigmoid().cpu().numpy())
merged_masks = merge_aug_masks(aug_masks,
[img_meta] * self.num_stages,
self.test_cfg.rcnn)
segm_result = self.mask_head[-1].get_seg_masks(
merged_masks, _bboxes, det_labels, rcnn_test_cfg,
ori_shape, scale_factor, rescale)
ms_segm_result['ensemble'] = segm_result
if not self.test_cfg.keep_all_stages:
if self.with_mask:
results = (ms_bbox_result['ensemble'],
ms_segm_result['ensemble'])
else:
results = ms_bbox_result['ensemble']
else:
if self.with_mask:
results = {
stage: (ms_bbox_result[stage], ms_segm_result[stage])
for stage in ms_bbox_result
}
else:
results = ms_bbox_result
return results
整体的流程和训练差不多,也不赘述了。这里有个
keep_all_stages
的参数,用来区别是否保存所有
stage
的结果,但是无论是否保存这个结果,它的结果都会对算最终的
boxes
和
mask
有贡献,因为这里将每个
stage
的
cls_score
求了平均值,并用这个平均值来获取最后的框。通过
get_det_bboxes()
定义在
mmdet/models/bbox_head/bbox_head.py
def get_det_bboxes(self,
rois,
cls_score,
bbox_pred,
img_shape,
scale_factor,
rescale=False,
cfg=None):
if isinstance(cls_score, list):
cls_score = sum(cls_score) / float(len(cls_score))
scores = F.softmax(cls_score, dim=1) if cls_score is not None else None
if bbox_pred is not None:
bboxes = delta2bbox(rois[:, 1:], bbox_pred, self.target_means,
self.target_stds, img_shape)
else:
bboxes = rois[:, 1:]
# TODO: add clip here
if rescale:
bboxes /= scale_factor
if cfg is None:
return bboxes, scores
else:
det_bboxes, det_labels = multiclass_nms(bboxes, scores,
cfg.score_thr, cfg.nms,
cfg.max_per_img)
return det_bboxes, det_labels
先做一次
softmax
得到每个候选框属于哪一类的概率,然后再根据这个概率来做
nms
得到最后的检测结果。之后就是
mask
分支,用到了一个
merge_aug_masks()
来融合多个阶段的分割结果。定义在
mmdet/core/post_processng/merge_augs.py
def merge_aug_masks(aug_masks, img_metas, rcnn_test_cfg, weights=None):
"""Merge augmented mask prediction.
Args:
aug_masks (list[ndarray]): shape (n, #class, h, w)
img_shapes (list[ndarray]): shape (3, ).
rcnn_test_cfg (dict): rcnn test config.
Returns:
tuple: (bboxes, scores)
"""
recovered_masks = [
mask if not img_info[0]['flip'] else mask[..., ::-1]
for mask, img_info in zip(aug_masks, img_metas)
]
if weights is None:
merged_masks = np.mean(recovered_masks, axis=0)
else:
merged_masks = np.average(
np.array(recovered_masks), axis=0, weights=np.array(weights))
return merged_masks
具体的做法就是对多个
stage
的结果取平均或者加权平均。之后调用了
mmdet/models/mask_heads/fcn_mask_head.py
的
get_seg_masks()
来生成
mask map
并且编码成
rle
def get_seg_masks(self, mask_pred, det_bboxes, det_labels, rcnn_test_cfg,
ori_shape, scale_factor, rescale):
"""Get segmentation masks from mask_pred and bboxes.
Args:
mask_pred (Tensor or ndarray): shape (n, #class+1, h, w).
For single-scale testing, mask_pred is the direct output of
model, whose type is Tensor, while for multi-scale testing,
it will be converted to numpy array outside of this method.
det_bboxes (Tensor): shape (n, 4/5)
det_labels (Tensor): shape (n, )
img_shape (Tensor): shape (3, )
rcnn_test_cfg (dict): rcnn testing config
ori_shape: original image size
Returns:
list[list]: encoded masks
"""
if isinstance(mask_pred, torch.Tensor):
mask_pred = mask_pred.sigmoid().cpu().numpy()
assert isinstance(mask_pred, np.ndarray)
cls_segms = [[] for _ in range(self.num_classes - 1)]
bboxes = det_bboxes.cpu().numpy()[:, :4]
labels = det_labels.cpu().numpy() + 1
if rescale:
img_h, img_w = ori_shape[:2]
else:
img_h = np.round(ori_shape[0] * scale_factor).astype(np.int32)
img_w = np.round(ori_shape[1] * scale_factor).astype(np.int32)
scale_factor = 1.0
for i in range(bboxes.shape[0]):
bbox = (bboxes[i, :] / scale_factor).astype(np.int32)
label = labels[i]
w = max(bbox[2] - bbox[0] + 1, 1)
h = max(bbox[3] - bbox[1] + 1, 1)
if not self.class_agnostic:
mask_pred_ = mask_pred[i, label, :, :]
else:
mask_pred_ = mask_pred[i, 0, :, :]
im_mask = np.zeros((img_h, img_w), dtype=np.uint8)
bbox_mask = mmcv.imresize(mask_pred_, (w, h))
bbox_mask = (bbox_mask > rcnn_test_cfg.mask_thr_binary).astype(
np.uint8)
im_mask[bbox[1]:bbox[1] + h, bbox[0]:bbox[0] + w] = bbox_mask
rle = mask_util.encode(
np.array(im_mask[:, :, np.newaxis], order='F'))[0]
cls_segms[label - 1].append(rle)
return cls_segms
总结
本来Train 和Test 是准备分成两篇来写的,后来写完发现Test部分东西有点少,因为用到的东西之前基本都写过了,所以索性就把两篇合成一篇了。
写这部分的时候我只关注了
forward
相关的部分,其余的怎么构建训练流程这部分我都略过了,但是这部分的代码其实写得特别优雅,用到了很多自己写代码的时候很少会用的东西,比如
register
和
hook
,非常值得一看。
到这里,基本就把
mmdetection
大体过了一遍,之后的就是遇到具体的细节再单独写了。