movingpandas时空数据分析——旧金山出租车轨迹数据集处理

  • Post author:
  • Post category:其他



旧金山出租车轨迹数据集链接


包含下载好的数据集和数据处理代码的百度网盘——

链接:https://pan.baidu.com/s/1N4dZ3V_b2QbP6vDzpCsVPA

提取码:pnd2



1. 概述

共536辆车,对于每辆车,字段为[latitude,longitude,occupancy,t]。

我们的目的是得到每辆车在一个小时内以15s等时间间隔的数据,也即每辆车的数据为241条。

使用到的库:

  • geopandas: 用于时空数据分析
  • movingpandas: 在geopandas的基础上创建轨迹
  • folium:在地图上按自己想要的方式绘制,用于可视化数据和debug
  • Osmnx:获取城市的路网,基于geopandas和Networkx
  • Networkx:在本文中用于计算最短路。



2. 概览:绘制一辆车的轨迹

sf_one_car_abboip.py

第一辆车在全部时间范围内的轨迹如下:

在这里插入图片描述

可以看出,第一辆车的数据范围为2008-5-17至2008-6-10,大致为一个月;总行驶长度约为700万m+。上图还给出了行驶边界的信息。

接下来尝试绘制第一辆车在某一天的轨迹,打算先对轨迹先有一个直观了解,指导后续如何进行优化。

绘制轨迹可以分为四步:

  • step1. 读数据文件并处理为df
taxi_id = 'abboip'
df = pd.read_csv(f"./cabspottingdata/new_{taxi_id}.txt", header=None, sep=" ")
df.columns = ['latitude', 'longitude', 'occupancy', 't']
df.pop('occupancy')  # drop无关列occupancy
df.insert(0, 'id', [taxi_id for _ in range(df.shape[0])])  # 插入新列:id

print('get df, columns=[latitude, longitude, t]')
  • step2. 提取某个时间范围的数据(这里选取2008.5.18的一天)
df.t = df.t.apply(lambda x: datetime.fromtimestamp(x))  # 时间戳转datetime
chosen_index = df.t.apply(lambda x: (x.month == 5 and x.day == 18))  # option:仅保留一天的数据
df = df[chosen_index]
df = df.sort_values(by=['t'], ascending=[True])  # 按t升序排序
df = df.set_index('t')  # 以t为index

print('now df columns=[latitude, longitude, id], index=t')
  • step3. 根据df创建gdf和trajs
gdf = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df.longitude, df.latitude), crs=4236)
trajs = mpd.TrajectoryCollection(gdf, 'id')
  • step4. 使用folium绘制地图,保存为taxi_test_page.html
start_point = trajs.trajectories[0].get_start_location()
m = folium.Map(location=[start_point.y, start_point.x], tiles="cartodbpositron", zoom_start=14)  # 经纬度反向
m.add_child(folium.LatLngPopup())
minimap = folium.plugins.MiniMap()
m.add_child(minimap)
folium.TileLayer('OpenStreetMap').add_to(m)

for index, traj in enumerate(trajs.trajectories):
name = f"Taxi {traj.df['id'][0]}"  # 轨迹名称
randr = lambda: np.random.randint(0, 255)
color = '#%02X%02X%02X' % (randr(), randr(), randr())  # black
# line
geo_col = traj.to_point_gdf().geometry
xy = [[y, x] for x, y in zip(geo_col.x, geo_col.y)]
f1 = folium.FeatureGroup(name)
AntPath(locations=xy, color=color, weight=3, opacity=0.7, dash_array=[20, 30]).add_to(f1)
f1.add_to(m)

folium.LayerControl().add_to(m)

m.get_root().render()
m.get_root().save("taxi_test_page.html")

查看绘制结果,可以看到存在前往旧金山南部机场的轨迹,距离旧金山市区较远,因此可以考虑人为限定一个区域(边界矩形框),抛弃距市区较远的”异常“数据。

在这里插入图片描述

查看局部,可以发现存在这种瞬移的折线,可以猜测是相邻采样时间的时间间隔较长所致。因此在构造等差时间轴后会存在缺失数据,需要使用最短路径算法填充缺失值。

在这里插入图片描述



3. 综合所有车的数据

下文提到的代码来自sf_tolerance.py

综合所有车的原始数据,得到raw_df,字段为[t(时间戳), id, latitude, longitude],写入./data/sf/raw_data。

在这里插入图片描述

def gen_raw_data():
    # step. 获取id列表
    fp = open('./cabspottingdata/_cabs.txt', 'r')
    lines = fp.readlines()
    id_list = [line.split("\"")[1] for line in lines]

    # step. 读所有txt,并处理为df
    raw_df = pd.DataFrame()
    s = 1
    for id in id_list:
        df = pd.read_csv(f"./cabspottingdata/new_{id}.txt", header=None, sep=" ")
        df.columns = ['latitude', 'longitude', 'occupancy', 't']
        df.pop('occupancy')  # drop无关列occupancy
        df.insert(0, 'id', [id for _ in range(df.shape[0])])  # 插入新列:id
        raw_df = pd.concat([raw_df, df], axis=0)  # 拼接

        print('Finished merging {}/{}'.format(s, len(id_list)))
        s += 1

    raw_df = raw_df.sort_values(by=['id', 't'], ascending=[True, True])  # 按id和t升序排序
    raw_df = raw_df.set_index('t')  # 以t为index

    print('get raw_df, columns=[latitude, longitude, id], index=t')

    # step. 将包含所有车的原始数据写入./data/sf/raw_data.csv
    raw_df.to_csv('./data/sf/raw_data.csv')



4. 尝试:人为选择一个小时,并对数据进行时间对齐

半年之后再看这段感觉我是憨批。。。其实就是想做一个重采样,直接一句df.resample(‘15s’).first()就完事了,当时不知道

  • step1. 人为选择一个小时(2018.5.18 9~10小时)
raw_df.t = raw_df.t.apply(lambda x: datetime.fromtimestamp(x))  # 时间戳转datetime
chosen_index = raw_df.t.apply(lambda x: (x.month == 5 and x.day == 18 and x.hour == 9))  # option:仅保留一个小时的数据
raw_df = raw_df[chosen_index]
print('Finished choosing an hour')
  • step2. 创建等差时间轴,对数据进行时间对齐。

    每辆车得到241(即4*60+1)条数据,相邻数据的时间间隔为15s。

    时间对齐的示意图如下:

在这里插入图片描述

def round15(x):
    # 商15的余数为1~7时下舍,为8~14时上入
    r = x % 15
    x = x-r if r <= 7 else x+(15-r)
    return x

def slot_and_align_in_period(groupDf):
    groupDf.t = groupDf.t.apply(lambda x: datetime.timestamp(x))  # datetime转时间戳
    groupDf.t = groupDf.t.apply(round15)  # 近似为15s的倍数
    groupDf.t = groupDf.t.apply(lambda x: datetime.fromtimestamp(x))  # 时间戳转datetime
    dateSpan = pd.date_range('20080518090000', '20080518100000', freq='15s')  # TODO 这里要改为每辆车的具体时间范围
    dateSpan = pd.DataFrame({'t': dateSpan})

    slottedGroupDf = pd.merge(dateSpan, groupDf, how="left", on="t")
    # 删除在t上的重复值,仅保留第一次出现重复值的行
    slottedGroupDf.drop_duplicates(subset=['t'], keep='first', inplace=True)
    slottedGroupDf.pop('id')  # 删除id属性,因为groupBy后会以id为一级索引

    print('Finished slotting a car')
    return slottedGroupDf

raw_df = raw_df.groupby('id').apply(func_slot_hour)
raw_df = raw_df.reset_index()  # 分组操作后,将index扁平化(取消二级索引)
raw_df.pop('level_1')  # 删除原因未知的level_1
print('Finished slotting hour')

查看每辆车的非空数据条数。不少车仅有个位数的非空数据

(这里可以进一步用直方图可视化非空数据数量的分布)

在241条数据中,有61条非空数据
在241条数据中,有2条非空数据
在241条数据中,有59条非空数据
在241条数据中,有1条非空数据
在241条数据中,有36条非空数据
在241条数据中,有54条非空数据
在241条数据中,有17条非空数据
在241条数据中,有8条非空数据
在241条数据中,有61条非空数据
在241条数据中,有61条非空数据
...



5. 选择各个车的小时

经上述分析,

为了获得质量良好的数据,不能简单地为每辆车选择一个共同的小时。

我们的目的是收集尽可能多驾驶轨迹不同的出租车,而不是非要让数据把我们卡死了,毕竟这个数据的536辆车只是旧金山市全体出租车的一个很小的子集。

但在另一个极端,也不能简单的为每辆车各自选择数据最丰富的小时,会过于破坏数据的真实性。因此采用的做法是:

  • step1.选出

    在所有车中

    数据量最多的小时,称为最佳小时
  • step2.对于每辆车,检查最佳小时是否是对于该车数据量最多的小时的前10名。若是则选用最佳小时,若不是则用该车数据量最多的小时代替。


step1保证数据的真实性,step2保证数据的丰富性。

  • step3.采用第四小节中的时间对齐方法。

step1.

选出最佳小时为2008.5.18 15:00:00~16:00:00。这个下午的时间段出租车的数据最多也符合生活常理。

hours = raw_df.t.apply(lambda x: x - (x % 3600))
hours = hours.apply(lambda x: datetime.fromtimestamp(x))  # 时间戳转datetime
best_hour = hours.value_counts().index[0]
>>>hours.value_counts()
2008-05-18 15:00:00    25012
2008-05-18 14:00:00    24845
2008-05-31 14:00:00    24425
2008-06-08 16:00:00    24421
2008-06-01 13:00:00    24269

step2.

确定每辆车选用的小时。

def select_hour(groupDf, best_hour):
    hours = groupDf.t.apply(lambda x: x - (x % 3600))
    hours = hours.apply(lambda x: datetime.fromtimestamp(x))  # 时间戳转datetime
    top10 = hours.value_counts()[0:10]  # 该车数据最多的前10个小时
    if best_hour not in top10.index:  # 全局最佳小时不在前10名,则选用该车的最佳小时
        best_hour = top10.index[0]
        print('for car {}, best_hour = {}'.format(list(groupDf.id)[0], best_hour))

    groupDf.t = groupDf.t.apply(lambda x: datetime.fromtimestamp(x))  # 时间戳转datetime
    chosen_index = groupDf.t.apply(lambda x: (x.month == best_hour.month and
                                   x.day == best_hour.day and x.hour == best_hour.hour))
    groupDf = groupDf[chosen_index]
    groupDf.pop('id')  # 删除id属性,因为groupBy后会以id为一级索引
    return groupDf

raw_df = raw_df.groupby('id').apply(select_hour, best_hour)  # best_hour是参数

step3.

时间对齐。略,同第四小节。将raw_df输出到.data/sf/processed_data,字段为[t(datetime), id, latitude, longitude]。



6. 用最短路径填充缺失值

接下来的步骤填充第四小节中插图展示的时间对齐后出现的缺失值,因为原始数据不可能保证对齐后每15s都有有效数据。出租车的常规驾驶习惯是选两点间最短路来开,因此使用最短路径填充缺失值是合理的,这种填充方法虽然耗时但可以很大程度地提高数据的性能。

  • step1. 使用osmnx,下载旧金山市的路网。
import osmnx as ox
place = 'San Francisco, California, USA'
#place = 'Shanghai,China'
G = ox.graph_from_place(place, which_result=2)
ox.plot_graph(G)
ox.save_graph_shapefile(G, "./data/sf/map.shp")

对比下面两图,osmnx获取的路网精细且正确,这也启示我们上文提到的边界矩形框的区域可以就限定为路网区域,毕竟超出路网区域的数据无法计算最短路,没有留着的必要。

在这里插入图片描述

在这里插入图片描述

矩形框左下角的经纬度:

在这里插入图片描述

矩形框右上角的经纬度:

在这里插入图片描述

  • step2. 填充缺失值。

    • 对于241个timestamp头部和尾部的缺失值,无法使用最短路径填充,因此头部的缺失值使用它后面的第一个有效值填充,尾部缺失值使用它前面的第一个有效值填充。
    • 对于中间的连续k个缺失值,令其前面的第一个有效值s和后面的第一个有效值e分别为起点和终点,在路网上计算最短路径。将最短路径k+1等分,第i个缺失值使用路径上第i个分位点的坐标填充。

一个求两点间最短路的实例如下图:

origin_point = (37.7266, -122.4780)
destination_point = (37.8281, -122.3717)

origin_node = ox.get_nearest_node(G, origin_point)  # 得到距离源位置最近的节点
destination_node = ox.get_nearest_node(G, destination_point)

route = nx.shortest_path(G, origin_node, destination_node, weight='length') # 使用networkx库计算最短路
longitude = [G.nodes[node]['x'] for node in route]
latitude = [G.nodes[node]['y'] for node in route]

gdf = gpd.GeoDataFrame(geometry=gpd.points_from_xy(longitude, latitude))
gdf['t'] = pd.date_range(start="20210707000000", periods=gdf.shape[0])
gdf['id'] = ['a' for _ in range(gdf.shape[0])]
trajs = mpd.TrajectoryCollection(gdf, 'id')

对于规整的路网,最短路径的精度非常不错:

在这里插入图片描述

然而在一些曲线路段会存在较明显的误差,由于路网本身是呈polygon的形态,对于曲线路段的收集精度不高。不过由于最终会对经纬度坐标进行网格化处理,出现的误差可以接受。

在这里插入图片描述

以下部分对于OSMnx的使用参考https://automating-gis-processes.github.io/site/develop/notebooks/L6/network-analysis.html

得到最短路径后,接下来在最短路上定位分位点。为此需要知道路径总长度以及路径上各条边的长度。

首先得到路网对应的节点和边的gdf:

nodes_gdf, edges_gdf = ox.graph_to_gdfs(G, nodes=True, edges=True)
>>>nodes_gdf.head(3)
                  y           x  ...            highway                     geometry
osmid                            ...                                                
32927563  37.785921 -122.390945  ...                NaN  POINT (-122.39094 37.78592)
32927591  37.731353 -122.424124  ...  motorway_junction  POINT (-122.42412 37.73135)
32927645  37.732706 -122.413261  ...  motorway_junction  POINT (-122.41326 37.73271)
>>>edges_gdf.head(3)
                                                    osmid  oneway  ... tunnel width
u        v          key                                            ...             
32927563 645559609  0    [50690291, 179235221, 661905446]    True  ...    NaN   NaN
32927591 315706881  0                            28715659    True  ...    NaN   NaN
         6469349533 0                           689603997    True  ...    NaN   NaN

edges_gdf的多重索引由边的两个端节点u、v、和含义未知的key组成,即每行代表一条边。有用的属性列是geometry,可通过geometry.length获得边的长度。

计算最短路的总长度:

def route_total_length(edges_gdf, route):
    totalLength = 0
    for i in range(len(route) - 1):
        u, v = route[i], route[i + 1]
        totalLength += edges_gdf.loc[(u, v, 0)].geometry.length
    return totalLength

totalLength = route_total_length(edges_gdf, route)  # 最短路总长度

根据得到的总长度,算出分位点的经纬度坐标:

def get_lat_lon_of_quantile(edges_gdf, route, totalLength, quantile):
    # quantile: 分位点,0.5即中点,0.33即距起点更近的三分位点
    resLength = quantile * totalLength  # 起点到分位点的长度
    x, y = -1, -1
    for i in range(len(route) - 1):  # 遍历路径上所有边,确定分位点在哪条边上
        u, v = route[i], route[i + 1]
        curEdgeLength = edges_gdf.loc[(u, v, 0)].geometry.length  # 当前边的长度
        if resLength > curEdgeLength:  # 继续查看下一条边
            resLength -= curEdgeLength
        else:  # 分位点在当前边上
            u_x, u_y = G.nodes[u]['x'], G.nodes[u]['y']
            v_x, v_y = G.nodes[v]['x'], G.nodes[v]['y']
            x = u_x + (resLength / curEdgeLength) * (v_x - u_x)
            y = u_y + (resLength / curEdgeLength) * (v_y - u_y)
            break

    assert x != -1 and y != -1
    return x, y  # 经度,纬度

lon, lat = get_lat_lon_of_quantile(edges_gdf, route, totalLength, 0.5)
lon1, lat1 = get_lat_lon_of_quantile(edges_gdf, route, totalLength, 0.33)
lon2, lat2 = get_lat_lon_of_quantile(edges_gdf, route, totalLength, 0.66)

将找到的中点和两个三分位点绘制在地图上,符合预期。

在这里插入图片描述

对每辆车的小时数据都填充缺失值后,查看一下终于规规整整的数据:

>>>raw_df
              id                    t   latitude   longitude
0         abboip  2008-05-18 14:00:00  37.787530 -122.421680
1         abboip  2008-05-18 14:00:15  37.787530 -122.421680
2         abboip  2008-05-18 14:00:30  37.787645 -122.421716
3         abboip  2008-05-18 14:00:45  37.787709 -122.421623
4         abboip  2008-05-18 14:01:00  37.787861 -122.421654
...          ...                  ...        ...         ...
129171  uvreoipy  2008-05-21 09:59:00  37.783991 -122.422743
129172  uvreoipy  2008-05-21 09:59:15  37.784177 -122.422781
129173  uvreoipy  2008-05-21 09:59:30  37.784364 -122.422819
129174  uvreoipy  2008-05-21 09:59:45  37.784650 -122.422880
129175  uvreoipy  2008-05-21 10:00:00  37.784650 -122.422880
>>>raw_df.info()
<class 'pandas.core.frame.DataFrame'>
Int64Index: 129176 entries, 0 to 129175
Data columns (total 4 columns):
 #   Column     Non-Null Count   Dtype  
---  ------     --------------   -----  
 0   id         129176 non-null  object 
 1   t          129176 non-null  object 
 2   latitude   129176 non-null  float64
 3   longitude  129176 non-null  float64
dtypes: float64(2), object(2)
memory usage: 4.9+ MB

共536*241=129176条数据,所有车辆的数据都已经时间对齐。

从”129176 non-null“可以看出,所有缺失值都已经被填充完毕。大功告成!

将raw_df输出到.data/sf/processed_fillna_data,字段为[t(datetime), id, latitude, longitude]。



7. 划分网格,将经纬度转换为网格坐标

暂不考虑地球的曲面特性,认为地球是平面。

按照1:10^5的比例,对边界矩形框内部区域划分网格。经度范围0.0791,横向网格数7910;纬度范围0.0696,纵向网格数6960。

对于每一条数据,根据经纬度得到x、y坐标,以及相应的到矩形框左下角的距离x_distance和y_distance。

将raw_df输出到.data/sf/processed_train_data, 字段为[timestamp, id, latitude, longitude,x, y, x_distance, y_distance]。

普渡:(另一个同类任务的数据集,收集来自普渡大学学生志愿者的时空坐标)

经度(宽)差值为0.0203 网格数200

纬度(高)差值为0.011 网格数120

旧金山:

经度差值0.0791 网格数 7910

纬度差值0.0696 网格数6960



8. 进阶:通过tolerance筛选异常数据

如果一条traj中,存在相邻两个数据点间的距离大于设置的阈值(tolerance),则认为存在异常的大跨度“瞬移”,这可能是GPS信号弱导致中间的数据未被采集到导致的。比如上文提到的出租船。

尽管对于瞬移的两点间的缺失值也可以使用最短路径法填充,但由于距离跨度太大,最短路径填充的合理性不高,出租车在这两点间很可能存在中间目的地。因此需要进行筛选,删除掉这些异常数据。

tolerance作为参数需人为设置:

  • tolerance设置的小,代表我们的筛选更严格,更多的数据无法通过检测。

  • 设置的大,代表我们的筛选更宽松,存在瞬移的traj有可能无法被删除,给模型的学习造成困难。

在本任务中,由于数据量比较丰富,并且需要为模型学习保驾护航,因此尽可能设置了较小的tolerance(1000m)。

筛选日志如下

(对于瞬移值,也可以通过直方图查看一下分布,肯定是一个左高右低的分布,极端异常值小)

未通过tolerance检查!瞬移值:
[1283.2304981149646]
未通过tolerance检查!瞬移值:
[1041.9191417867703]
for car ifragcic, chosen_hour = 2008-06-07 15:00:00
该小时是该车的top2
for car igglitby, chosen_hour = 2008-05-26 10:00:00
该小时是该车的top0
未通过tolerance检查!瞬移值:
[1170.4311847827967]
for car igsoysla, chosen_hour = 2008-05-21 08:00:00
该小时是该车的top1

大部分车经过筛选,选用的也仍是最佳小时或top1~3。有几辆离谱的车:

for car udwadla, chosen_hour = 2008-05-27 07:00:00
该小时是该车的top32
for car upthin, chosen_hour = 2008-06-08 12:00:00
该小时是该车的top292



9. Debug:网格化后的xy坐标出现Nan值

值得注意的是,使用最短路填充空值时,填充点出现了落在矩形框外的情况。这是很有可能的,当不得不使用越过矩形框的曲线路段作为最短路时。

该错误直接导致网格化后的xy坐标是空值。检测x坐标是空值的元组:

raw_df[raw_df['x'].isna()]

使用以下代码剔除这些插值后经纬度超出矩形框的点,剔除了4个x异常的点,3个y异常的点:

def x_into_region(x):
    if x <= lower_left[0]:
        print('find x error!')
        x = lower_left[0]+0.00001
    elif x >= upper_right[0]:
        print('find x error!')
        x = upper_right[0]-0.00001
    return x

def y_into_region(y):
    if y <= lower_left[1]:
        print('find y error!')
        y = lower_left[1]+0.00001
    elif y >= upper_right[1]:
        print('find y error!')
        y = upper_right[1]-0.00001
    return y


lower_left = [-122.4620, 37.7441]
upper_right = [-122.3829, 37.8137]

raw_df = pd.read_csv('./data/sf_tolerance/processed_fillna_data.csv')
raw_df.longitude = raw_df.longitude.apply(x_into_region)
raw_df.latitude = raw_df.latitude.apply(y_into_region)

raw_df.to_csv('./data/sf_tolerance/processed_fillna_data.csv')

检查结果,完成~

>>> raw_df = pd.read_csv('./data/sf_tolerance/processed_train_half_data.csv')
>>> raw_df[raw_df['x'].isna()]
Empty DataFrame
Columns: [Unnamed: 0, id, Unnamed: 0.1, Unnamed: 0.1.1, Unnamed: 0.1.1.1, Unnamed: 0.1.1.1.1, timestamp, latitude, longitude, x, x_distance, y, y_distance]
Index: []

>>> raw_df[raw_df['y'].isna()]
Empty DataFrame
Columns: [Unnamed: 0, id, Unnamed: 0.1, Unnamed: 0.1.1, Unnamed: 0.1.1.1, Unnamed: 0.1.1.1.1, timestamp, latitude, longitude, x, x_distance, y, y_distance]
Index: []



版权声明:本文为weixin_38889219原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。