1. Mean Shift
问题
:给你一堆二维平面点,以及k个固定的圆,如何放置圆,使里面包含的点数最多?
大致思想
:初始时,将一固定大小的圆放在二维平面点的任意位置上,然后计算圆内所有点的平均位置,并以此作为新的圆心,移动原始固定圆到新位置处,以此类推,直到圆不再挪动为止,此时的圆内包含的点数最多。
Mean Shift在聚类中的应用
- 任意选取一固定大小的圆,并计算圆内所有点的平均位置,以此作为新的圆心,移动原始固定圆到新位置处(其中,这里固定大小的圆的半径是一个经验值)
- 重复步骤2,直到圆不再挪动为止
- 对步骤1、2重复执行N次,由此最终可得N个圆,这N个圆可能会有重叠,对其执行NMS操作,保留包含点数最多的圆(另外,这里的N是经验值)
- 计算每个点离最后各个圆心之间的距离,每个点就属于离圆心距离最近的那个圆所在类别
Mean Shift总结:
-
复杂度
O(
T
⋅
n
⋅
l
o
g
(
n
)
)
O(T\cdot{n}\cdot{log(n)})
O
(
T
⋅
n
⋅
l
o
g
(
n
)
)
T
T
T
为最终圆的个数,
n
n
n
为点的总数,对于每个点,若以kd-tree/octree进行最近邻搜索,那么复杂度为
l
o
g
(
n
)
log(n)
l
o
g
(
n
)
-
优点:可自动确定聚类的类别数;只有一个未知参数(半径r);对噪声比较鲁棒
-
缺点:容易陷入局部极小值;聚类的好坏受到初始化的影响;对于高维数据不易处理
2. DBSCAN——漫水填充
-
任意选取一个未被访问的点
pp
p
,获取以点
pp
p
为圆心,以
rr
r
为半径的圆内的所有点 -
计算圆内所有点的个数num,判断num > min_samples是否成立
-
若成立,则将点
pp
p
记为core point,并标记为已访问,同时创建类别C,然后执行第3步 -
若不成立,则将点
pp
p
记为噪声点,并标记为已访问
-
若成立,则将点
-
访问以点
pp
p
为圆心,以
rr
r
为半径的圆内的所有点,并将其标记为类别C。另外,在访问时,若访问的当前点也属于core point,则将当前点记为new p,重复步骤3。 - 通过步骤2、3,可确定所有属于类别C的点。此时,将所有属于类别C的点移除,然后返回步骤1重复执行,直到所有点都被标记为已访问
DBSCAN复杂度:
O
(
n
⋅
l
o
g
(
n
)
)
O(n\cdot{log(n)})
O
(
n
⋅
l
o
g
(
n
)
)
:因为有
n
n
n
个点,对于每个点,都在半径为r的圆内进行搜索
l
o
g
(
n
)
log(n)
l
o
g
(
n
)
优点
:对类别的形状没有要求;可自动确定类别数;对噪声点比较鲁棒
缺点
:若两类别的点之间有部分稀疏的点存在,则可能会将其聚成一类;对于高维数据不易处理
3. 实践:地面剔除+dbscan聚类
(1)代码实现
# 文件功能:
# 1. 从数据集中加载点云数据
# 2. 从点云数据中滤除地面点云
# 3. 从剩余的点云中提取聚类
import numpy as np
import os
import struct
from sklearn import cluster, datasets, mixture
from itertools import cycle, islice
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from sklearn.neighbors import KDTree
import open3d as o3d
from torch import segment_reduce
# 功能:从kitti的.bin格式点云文件中读取点云
# 输入:
# path: 文件路径
# 输出:
# 点云数组
def read_velodyne_bin(path):
'''
:param path:
:return: homography matrix of the point cloud, N*3
'''
pc_list = []
with open(path, 'rb') as f:
content = f.read()
pc_iter = struct.iter_unpack('ffff', content)
for idx, point in enumerate(pc_iter):
pc_list.append([point[0], point[1], point[2]])
return np.asarray(pc_list, dtype=np.float32)
# 功能:从点云文件中滤除地面点(ransac)
# 输入:
# data: 一帧完整点云
# 输出:
# segmengted_cloud: 删除地面点之后的点云
def ground_segmentation(data):
in_ground_points = []
out_ground_points = []
p = 0.99
iter = 100
eps = 0.02
up_dist = 0.15
num, dim = data.shape
pre_inline_num = 0
outline_ratio = 0.6
for _ in range(iter):
# 随机选取三个点 ———— 假设这三点构成一个平面
sample_points_index = np.random.choice(num, 3)
sample_points = data[sample_points_index]
# 判断三点是否共线
sample_points_01 = sample_points[0] - sample_points[1]
sample_points_12 = sample_points[1] - sample_points[2]
if sample_points_01[0] / sample_points_12[0] == sample_points_01[1] / sample_points_12[1] == sample_points_01[2] / sample_points_12[2]:
continue
# 利用向量叉乘计算法向量
normal = np.cross(sample_points_01, sample_points_12) # (3, )
# 利用平面上的点与法向量乘积为0 ==> 确定平面方程中的参数D AX + BY + CZ + D = 0 (D = -(AX + BY + CZ)) (A, B, C)为法向量 (X, Y, Z)为平面上一点
D = -normal.dot(sample_points[0])
# 计算其余点到平面上一点的向量
vector = (data - sample_points[0]) # (num, 3)
# 根据计算出来的向量,计算其余点到平面的距离 点(X0, Y0, Z0)到平面的距离公式:d = |AX0 + BY0 + CZ0 + D| / ||n||
# distances = np.abs(normal.dot(vector)) / np.linalg.norm(normal)
distances = np.abs(vector.dot(normal)) / np.linalg.norm(normal)
# 依据距离阈值
inline_index = distances <= up_dist
# 计算平面上点的个数
inline_num = np.sum(inline_index==True)
if inline_num > pre_inline_num:
pre_inline_num = inline_num # 更新平面上点的个数
iter = np.log(1-p) / np.log(1 - np.power(inline_num / num, 3)) # 更新迭代次数
# 若平面上的点达到一定的比例,则退出迭代
if inline_num / num > 1 - outline_ratio:
break
ground_points = data[inline_index]
out_ground_index = np.logical_not(inline_index)
segmengted_cloud = data[out_ground_index]
print('origin data points num:', data.shape[0])
print('ground points num:', ground_points.shape[0])
print('segmented data points num:', segmengted_cloud.shape[0])
return ground_points, segmengted_cloud
# 功能:从点云中提取聚类(dbscan)
# 输入:
# data: 点云(滤除地面后的点云)
# 输出:
# clusters_index: 一维数组,存储的是点云中每个点所属的聚类编号(参考上一章内容容易理解)
def clustering(data, eps, Minpts):
n = len(data)
unvisited = set(range(n)) # 未访问点的集合
clusters_index = np.zeros(n, dtype=int) # 最终聚类的结果,初始化为0
k = 0
# 构建kd_tree
leaf_size = 4
tree = KDTree(data,leaf_size)
# 通过kdtree的rnn搜索,快速查找所有点的eps邻域点 存储在nearest_idx中
nearest_idx = tree.query_radius(data, eps)
cores = set()
# 通过eps邻域点数量与Minpts大小比较,确定所有的核心点cores
for d in range(n):
if len(nearest_idx[d]) >= Minpts:
cores.add(d)
# 通过cores中的一核心点,确定该核心点的种子区域seed,seed中的点根据是否为核心点往外蔓延进行扩容,同一个seed区域中的点归属于同一类别
while len(cores) > 0:
old_unvisited = unvisited # 备份unvisited
p = list(cores)[np.random.choice(len(cores))] # 随机获取集合cores中的一个核心点p 先转为list,再通过索引获取
unvisited = unvisited - set([p]) # p点记为已访问
seed = [] # 当前核心点p所确定的种子区域
seed.append(p)
# 遍历种子区域seed中的点,若其中含有核心点,则对其进行扩容
while(len(seed)):
p_child = seed[0]
seed.remove(p_child)
if p_child in cores:
S = set(nearest_idx[p_child]) & unvisited # 获取核心点p_child的eps邻域中未被访问的点
seed.extend(S) # seed种子区域扩容
unvisited = unvisited - S # 更新unvisited
cluster = old_unvisited - unvisited # 上面while循环确定了当前类别的所有点,体现在前后unvisited的差异
cores = cores - cluster # 更新当前总核心点
clusters_index[list(cluster)] = k #cluster点归属于第k类
k = k + 1
noise = unvisited # while循环结束后,剩下未被访问的就是噪声点,记为类别-1
clusters_index[list(noise)] = -1
print(clusters_index)
print("生成的聚类个数:%d" %k)
return clusters_index
# 功能:显示聚类点云,每个聚类一种颜色
# 输入:
# data:点云数据(滤除地面后的点云)
# cluster_index:一维数组,存储的是点云中每个点所属的聚类编号(与上同)
def plot_clusters(data, cluster_index):
ax = plt.figure().add_subplot(111, projection = '3d')
colors = np.array(list(islice(cycle(['#377eb8', '#ff7f00', '#4daf4a',
'#f781bf', '#a65628', '#984ea3',
'#999999', '#e41a1c', '#dede00']),
int(max(cluster_index) + 1))))
colors = np.append(colors, ["#000000"])
ax.scatter(data[:, 0], data[:, 1], data[:, 2], s=2, color=colors[cluster_index])
plt.show()
# 功能:显示聚类点云,每个聚类一种颜色
# 输入:
# data:点云数据(滤除地面后的点云)
# cluster_index:一维数组,存储的是点云中每个点所属的聚类编号(与上同)
def plot_clusters2(segmented_ground, segmented_cloud, cluster_index):
"""
Visualize segmentation results using Open3D
Parameters
----------
segmented_cloud: numpy.ndarray
Segmented surrounding objects as N-by-3 numpy.ndarray
segmented_ground: numpy.ndarray
Segmented ground as N-by-3 numpy.ndarray
cluster_index: list of int
Cluster ID for each point
"""
def colormap(c, num_clusters):
"""
Colormap for segmentation result
Parameters
----------
c: int
Cluster ID
C
"""
# outlier:
if c == -1:
color = [1]*3
# surrouding object:
else:
color = [0] * 3
color[c % 3] = c/num_clusters
return color
# ground element:
pcd_ground = o3d.geometry.PointCloud()
pcd_ground.points = o3d.utility.Vector3dVector(segmented_ground)
pcd_ground.colors = o3d.utility.Vector3dVector(
[
[0, 0, 255] for i in range(segmented_ground.shape[0])
]
)
# surrounding object elements:
pcd_objects = o3d.geometry.PointCloud()
pcd_objects.points = o3d.utility.Vector3dVector(segmented_cloud)
num_clusters = max(cluster_index) + 1
pcd_objects.colors = o3d.utility.Vector3dVector(
[
colormap(c, num_clusters) for c in cluster_index
]
)
# visualize:
o3d.visualization.draw_geometries([pcd_ground, pcd_objects])
def main():
root_dir = 'data/' # 数据集路径
cat = os.listdir(root_dir)
# cat = cat[1:]
iteration_num = len(cat)
for i in range(iteration_num):
filename = os.path.join(root_dir, cat[i])
print('clustering pointcloud file:', filename)
origin_points = read_velodyne_bin(filename)
ground_points, segmented_points = ground_segmentation(data=origin_points)
cluster_index = clustering(segmented_points, 0.5, 10)
plot_clusters2(ground_points, segmented_points, cluster_index)
if __name__ == '__main__':
main()
(2)实现效果