三维点云处理之地面剔除 + 聚类

  • Post author:
  • Post category:其他




1. Mean Shift


问题

:给你一堆二维平面点,以及k个固定的圆,如何放置圆,使里面包含的点数最多?


大致思想

:初始时,将一固定大小的圆放在二维平面点的任意位置上,然后计算圆内所有点的平均位置,并以此作为新的圆心,移动原始固定圆到新位置处,以此类推,直到圆不再挪动为止,此时的圆内包含的点数最多。


Mean Shift在聚类中的应用

  • 任意选取一固定大小的圆,并计算圆内所有点的平均位置,以此作为新的圆心,移动原始固定圆到新位置处(其中,这里固定大小的圆的半径是一个经验值)
  • 重复步骤2,直到圆不再挪动为止
  • 对步骤1、2重复执行N次,由此最终可得N个圆,这N个圆可能会有重叠,对其执行NMS操作,保留包含点数最多的圆(另外,这里的N是经验值)
  • 计算每个点离最后各个圆心之间的距离,每个点就属于离圆心距离最近的那个圆所在类别


Mean Shift总结:

  • 复杂度



    O

    (

    T

    n

    l

    o

    g

    (

    n

    )

    )

    O(T\cdot{n}\cdot{log(n)})






    O


    (


    T














    n















    l


    o


    g


    (


    n


    )



    )








T

T






T





为最终圆的个数,



n

n






n





为点的总数,对于每个点,若以kd-tree/octree进行最近邻搜索,那么复杂度为



l

o

g

(

n

)

log(n)






l


o


g


(


n


)




  • 优点:可自动确定聚类的类别数;只有一个未知参数(半径r);对噪声比较鲁棒

  • 缺点:容易陷入局部极小值;聚类的好坏受到初始化的影响;对于高维数据不易处理



2. DBSCAN——漫水填充

  • 任意选取一个未被访问的点



    p

    p






    p





    ,获取以点



    p

    p






    p





    为圆心,以



    r

    r






    r





    为半径的圆内的所有点

  • 计算圆内所有点的个数num,判断num > min_samples是否成立

    • 若成立,则将点



      p

      p






      p





      记为core point,并标记为已访问,同时创建类别C,然后执行第3步

    • 若不成立,则将点



      p

      p






      p





      记为噪声点,并标记为已访问

  • 访问以点



    p

    p






    p





    为圆心,以



    r

    r






    r





    为半径的圆内的所有点,并将其标记为类别C。另外,在访问时,若访问的当前点也属于core point,则将当前点记为new p,重复步骤3。

  • 通过步骤2、3,可确定所有属于类别C的点。此时,将所有属于类别C的点移除,然后返回步骤1重复执行,直到所有点都被标记为已访问


DBSCAN复杂度:




O

(

n

l

o

g

(

n

)

)

O(n\cdot{log(n)})






O


(


n














l


o


g


(


n


)



)





:因为有



n

n






n





个点,对于每个点,都在半径为r的圆内进行搜索



l

o

g

(

n

)

log(n)






l


o


g


(


n


)





优点

:对类别的形状没有要求;可自动确定类别数;对噪声点比较鲁棒


缺点

:若两类别的点之间有部分稀疏的点存在,则可能会将其聚成一类;对于高维数据不易处理



3. 实践:地面剔除+dbscan聚类

(1)代码实现

# 文件功能:
#     1. 从数据集中加载点云数据
#     2. 从点云数据中滤除地面点云
#     3. 从剩余的点云中提取聚类

import numpy as np
import os
import struct
from sklearn import cluster, datasets, mixture
from itertools import cycle, islice
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from sklearn.neighbors import KDTree
import open3d as o3d
from torch import segment_reduce

# 功能:从kitti的.bin格式点云文件中读取点云
# 输入:
#     path: 文件路径
# 输出:
#     点云数组
def read_velodyne_bin(path):
    '''
    :param path:
    :return: homography matrix of the point cloud, N*3
    '''
    pc_list = []
    with open(path, 'rb') as f:
        content = f.read()
        pc_iter = struct.iter_unpack('ffff', content)
        for idx, point in enumerate(pc_iter):
            pc_list.append([point[0], point[1], point[2]])
    return np.asarray(pc_list, dtype=np.float32)

# 功能:从点云文件中滤除地面点(ransac)
# 输入:
#     data: 一帧完整点云
# 输出:
#     segmengted_cloud: 删除地面点之后的点云
def ground_segmentation(data):
    in_ground_points = []
    out_ground_points = []
    p = 0.99
    iter = 100
    eps = 0.02
    up_dist = 0.15
    num, dim = data.shape
    pre_inline_num = 0
    outline_ratio = 0.6
    for _ in range(iter):
        # 随机选取三个点  ————  假设这三点构成一个平面
        sample_points_index = np.random.choice(num, 3)
        sample_points = data[sample_points_index]
        # 判断三点是否共线
        sample_points_01 = sample_points[0] - sample_points[1]
        sample_points_12 = sample_points[1] - sample_points[2]
        if sample_points_01[0] / sample_points_12[0] == sample_points_01[1] / sample_points_12[1] == sample_points_01[2] / sample_points_12[2]:
            continue
        # 利用向量叉乘计算法向量
        normal = np.cross(sample_points_01, sample_points_12)  # (3, )
        # 利用平面上的点与法向量乘积为0  ==>  确定平面方程中的参数D  AX + BY + CZ + D = 0  (D = -(AX + BY + CZ))  (A, B, C)为法向量  (X, Y, Z)为平面上一点
        D = -normal.dot(sample_points[0])
        # 计算其余点到平面上一点的向量
        vector = (data - sample_points[0])  # (num, 3)
        # 根据计算出来的向量,计算其余点到平面的距离  点(X0, Y0, Z0)到平面的距离公式:d = |AX0 + BY0 + CZ0 + D| / ||n||
        # distances = np.abs(normal.dot(vector)) / np.linalg.norm(normal)
        distances = np.abs(vector.dot(normal)) / np.linalg.norm(normal)
        # 依据距离阈值
        inline_index = distances <= up_dist
        # 计算平面上点的个数
        inline_num = np.sum(inline_index==True)
        if inline_num > pre_inline_num:
            pre_inline_num = inline_num  # 更新平面上点的个数
            iter = np.log(1-p) / np.log(1 - np.power(inline_num / num, 3))  # 更新迭代次数
        
        # 若平面上的点达到一定的比例,则退出迭代
        if inline_num / num > 1 - outline_ratio:
            break
    ground_points = data[inline_index]
    out_ground_index = np.logical_not(inline_index)
    segmengted_cloud = data[out_ground_index]

    print('origin data points num:', data.shape[0])
    print('ground points num:', ground_points.shape[0])
    print('segmented data points num:', segmengted_cloud.shape[0])
    return ground_points, segmengted_cloud

# 功能:从点云中提取聚类(dbscan)
# 输入:
#     data: 点云(滤除地面后的点云)
# 输出:
#     clusters_index: 一维数组,存储的是点云中每个点所属的聚类编号(参考上一章内容容易理解)
def clustering(data, eps, Minpts):
    n = len(data)
    unvisited = set(range(n))  # 未访问点的集合
    clusters_index = np.zeros(n, dtype=int)  # 最终聚类的结果,初始化为0
    k = 0
    # 构建kd_tree
    leaf_size = 4
    tree = KDTree(data,leaf_size)
    # 通过kdtree的rnn搜索,快速查找所有点的eps邻域点  存储在nearest_idx中   
    nearest_idx = tree.query_radius(data, eps)  
    cores = set()
    # 通过eps邻域点数量与Minpts大小比较,确定所有的核心点cores
    for d in range(n):
        if len(nearest_idx[d]) >= Minpts:  
            cores.add(d)   
    # 通过cores中的一核心点,确定该核心点的种子区域seed,seed中的点根据是否为核心点往外蔓延进行扩容,同一个seed区域中的点归属于同一类别
    while len(cores) > 0:   
        old_unvisited = unvisited  # 备份unvisited
        p = list(cores)[np.random.choice(len(cores))]  # 随机获取集合cores中的一个核心点p  先转为list,再通过索引获取
        unvisited = unvisited - set([p])  # p点记为已访问
        seed = []  # 当前核心点p所确定的种子区域
        seed.append(p)
        # 遍历种子区域seed中的点,若其中含有核心点,则对其进行扩容
        while(len(seed)):
            p_child = seed[0]
            seed.remove(p_child)
            if p_child in cores:
                S = set(nearest_idx[p_child]) & unvisited  # 获取核心点p_child的eps邻域中未被访问的点
                seed.extend(S)  # seed种子区域扩容
                unvisited = unvisited - S  # 更新unvisited
        cluster = old_unvisited - unvisited  # 上面while循环确定了当前类别的所有点,体现在前后unvisited的差异
        cores = cores - cluster  # 更新当前总核心点
        clusters_index[list(cluster)] = k  #cluster点归属于第k类
        k = k + 1
    noise = unvisited  # while循环结束后,剩下未被访问的就是噪声点,记为类别-1
    clusters_index[list(noise)] = -1
        
    print(clusters_index)
    print("生成的聚类个数:%d" %k)

    return clusters_index

# 功能:显示聚类点云,每个聚类一种颜色
# 输入:
#      data:点云数据(滤除地面后的点云)
#      cluster_index:一维数组,存储的是点云中每个点所属的聚类编号(与上同)
def plot_clusters(data, cluster_index):
    ax = plt.figure().add_subplot(111, projection = '3d')
    colors = np.array(list(islice(cycle(['#377eb8', '#ff7f00', '#4daf4a',
                                             '#f781bf', '#a65628', '#984ea3',
                                             '#999999', '#e41a1c', '#dede00']),
                                      int(max(cluster_index) + 1))))
    colors = np.append(colors, ["#000000"])
    ax.scatter(data[:, 0], data[:, 1], data[:, 2], s=2, color=colors[cluster_index])
    plt.show()

# 功能:显示聚类点云,每个聚类一种颜色
# 输入:
#      data:点云数据(滤除地面后的点云)
#      cluster_index:一维数组,存储的是点云中每个点所属的聚类编号(与上同)
def plot_clusters2(segmented_ground, segmented_cloud, cluster_index):
    """
    Visualize segmentation results using Open3D
    Parameters
    ----------
    segmented_cloud: numpy.ndarray
        Segmented surrounding objects as N-by-3 numpy.ndarray
    segmented_ground: numpy.ndarray
        Segmented ground as N-by-3 numpy.ndarray
    cluster_index: list of int
        Cluster ID for each point
    """
    def colormap(c, num_clusters):
        """
        Colormap for segmentation result
        Parameters
        ----------
        c: int
            Cluster ID
        C
        """
        # outlier:
        if c == -1:
            color = [1]*3
        # surrouding object:
        else:
            color = [0] * 3
            color[c % 3] = c/num_clusters
 
        return color
 
    # ground element:
    pcd_ground = o3d.geometry.PointCloud()
    pcd_ground.points = o3d.utility.Vector3dVector(segmented_ground)
    pcd_ground.colors = o3d.utility.Vector3dVector(
        [
            [0, 0, 255] for i in range(segmented_ground.shape[0])
        ]
    )
 
    # surrounding object elements:
    pcd_objects = o3d.geometry.PointCloud()
    pcd_objects.points = o3d.utility.Vector3dVector(segmented_cloud)
    num_clusters = max(cluster_index) + 1
    pcd_objects.colors = o3d.utility.Vector3dVector(
        [
            colormap(c, num_clusters) for c in cluster_index
        ]
    )
 
    # visualize:
    o3d.visualization.draw_geometries([pcd_ground, pcd_objects])

def main():
    root_dir = 'data/' # 数据集路径
    cat = os.listdir(root_dir)
    # cat = cat[1:]
    iteration_num = len(cat)

    for i in range(iteration_num):
        filename = os.path.join(root_dir, cat[i])
        print('clustering pointcloud file:', filename)

        origin_points = read_velodyne_bin(filename)
        ground_points, segmented_points = ground_segmentation(data=origin_points)
        cluster_index = clustering(segmented_points, 0.5, 10)

        plot_clusters2(ground_points, segmented_points, cluster_index)

if __name__ == '__main__':
    main()

(2)实现效果

在这里插入图片描述



版权声明:本文为LDST_CSDN原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。