python车辆轨迹分析_Ngsim数据集分析与python处理,NGSIM,解析,及
簡介
NGSIM(Next Generation Simulation)數據集
是美國FHWA搜集的美國高速公路行車數據,它包括了US101、I-80等道路上的所有車輛在一個時間段的車輛行駛狀況。數據是采用攝像頭獲取,然后加工成一條一條的軌跡點記錄。我在項目中用到的是US101數據集,其他數據結構大同小異。一下著重介紹該路段數據集。
US101數據集介紹
NGSIM官方提供了使用API的方法獲取數據集,但是文檔并不夠良心,因此我還是直接選擇了下載整個數據集,下載鏈接可以在官方網站找出,這里再給出
鏈接
,下載下倆的數據集是一個1.6G+的CSV文件。這個文件包含
所有道路的所有數據
。打開看一看,數據結構如表:
注:不同的道路其車道信息不同,行車數據差異較大。
相同的表你可以到剛才的下載鏈接中找到,我來介紹一下遇到的坑。
Col
介紹
Vehicle_ID
給每一個進入記錄區域的車輛的編號。不同的車會重復利用!!
Frame_ID
該條數據在某一時刻的幀號,同一Vehicle_ID的幀號不會重復
Total_Frame
該車出現的總幀數
Global_Time
全局時間,單位為ms
Local_X,Y
采集區域內的坐標,采集區域不同,坐標系不同,會有不同的零點
Global_x,Y
全局坐標,只有一個零點,可用作數據篩選
這幾個很難倒騰,而且篩選數據主要看這幾個,其他數據可以直接看官網介紹。
python處理數據
我想得到的是某一區域內的車輛及它們未來10秒的軌跡。讀取CSV,用python的話當然用pandas咯。
最好添加
usecols
過濾你不需要的列,因為數據集很大,載入真的很慢。
import pandas as pd
init_df = pd.read_csv('./data.csv', usecols=['你所需要的列名'])
首先,數據集很大,要把數據多余的內容除掉。我要的是US101數據,因此定義一個函數篩選并按照
全局時間
排序,按照frame排序是行不通的,如果要按照路段排序同理選取Global的數據。
def cutbyRoad(df=None, road=None):
'''
:param df: 打開后文件
:param road: 路段名稱
:return: 切路df,按照全局時間排序
'''
road_df = df[df['Location'] == road]
return road_df.sort_values(by='Global_Time', ascending=True)
原數據集的單位,時間是ms,長度單位全是ft。給它轉換一下:
def unitConversion(df):
'''
轉換后長度單位為m,時間單位為0.1秒
:param df: 被轉換df
:return: 轉換后df
'''
ft_to_m = 0.3048
df['Global_Time'] = df['Global_Time'] / 100
for strs in ["Global_X", "Global_Y", "Local_X", "Local_Y", "v_length", "v_Width"]:
df[strs] = df[strs] * ft_to_m
df["v_Vel"] = df["v_Vel"] * ft_to_m*3.6
return df
我需要的數據是某一區域內,某一時刻的車輛及它們未來10s的行車數據,因此首先要獲得某一區域某一時刻某一區域的車輛ID列表:
def cutbyPosition(road_df, start_y=0, start_time=0, area_length=50):
'''
給定起始時間,起始y,區間長度,輸出區間內車輛list
:param road_df:限定路段后的df
:param start_y: 區域開始段,單位為m
:param start_time: 起始時間,0.1s
:param area_length: 區域長度單位為m
:return: vehicle_list為起始框內部車輛編號
'''
area_df = road_df[road_df['Global_Time'] == start_time]
area_df = area_df[(area_df['Global_Y'] - start_y <= area_length) & (
area_df['Global_Y'] - start_y >= 0)]
vehicle_list = area_df['Vehicle_ID'].unique()
if len(list(vehicle_list)) <= 2:
return None
else:
return list(vehicle_list)
其中
start_y, start_time
可以根據自己的需要取
Global_Y,Global_Time
的最大值最小值區間內值。
然后是迭代獲取10s內車輛數據:
def cutbyTime(road_df, start_time=0, vehicle_list=None, time_length=10.0, stride=1):
'''
:param inint_df:road_df
:param start_frame: 開始幀
:param time_length: 采樣時間長度,單位為s
:param stride: 采樣步長
:return: 返回一組清洗完數據time
'''
temp_df = road_df[road_df['Vehicle_ID'].isin(vehicle_list)]
one_sequence = pd.DataFrame()
for vehicle in vehicle_list:
for time in range(int(time_length * 10 / stride)):
df = temp_df[
(temp_df['Vehicle_ID'] == vehicle) & (temp_df['Global_Time'] == (start_time + time * stride))]
if df.shape[0] == 1:
one_sequence = pd.concat([one_sequence, df])
else:
return None
return one_sequence
最后是存儲數據至CSV
def saveCsv(df, file_name):
'''
:param df: 存入df
:param file_name: 文件名
:return: 無
'''
df=df.reset_index(drop=True) #重置索引
if not os.path.exists('\\'):
os.makedirs('\\')
df.to_csv('data\\' + file_name + '.csv', mode='a', header=True)
我的main函數,也就是調用這些方法的主干函數,代碼中調用的json文件是配置文件:
with open("conf/dataExecute.json", "r") as f:
conf = json.load(f)
init_df = pd.read_csv(conf['data_source'], usecols=conf['useCols'])
road_df = cutbyRoad(init_df, road=conf['road'])
road_df = unitConversion(road_df)
min_Global_Y, max_Global_Y = road_df['Global_Y'].min()+100, road_df['Global_Y'].max()
min_Global_Time, max_Global_Time = road_df['Global_Time'].min(), road_df['Global_Time'].max()
total_dist=int((max_Global_Y - min_Global_Y) / (conf['area_step']))
total_time=int((max_Global_Time - min_Global_Time) / (conf['time_step'] * 10))
print("共計{}--{}組數據,時間步長為{},距離步長為{}".format(total_dist,total_time,conf['time_step'],conf['area_step']))
total_data=0
for dist_index in range(conf["hist_dist"],total_dist):
for time_index in range(conf["hist_time"],total_time):
if conf["noise"]:
time_noise=random.randint(0,100)
dist_noise=random.randint(0,100)
else:
time_noise,dist_noise=0,0
start_time = min_Global_Time + time_index * conf['time_step'] * 10+time_noise*10
start_y = min_Global_Y + dist_index * conf['area_step']+dist_noise
vehicle_list = cutbyPosition(road_df, start_y=start_y, start_time=start_time,
area_length=conf['area_length'])
if vehicle_list is None:
# print('{}秒時刻,{}為起點區域內車輛過少,進入下個時段'.format(start_time * 0.1, start_y))
print('{}--{}內車輛過少,進入下個時段'.format(dist_index, time_index))
continue
one_sequence = cutbyTime(road_df, start_time=start_time, vehicle_list=vehicle_list,
time_length=conf['time_length'],
stride=conf['stride'])
if one_sequence is None:
# print('{}時刻,{}為起點區域內車輛存在消失,進入下個時段'.format(start_time * 0.1, start_y))
print('{}--{}內車輛存在消失,進入下個時段'.format(dist_index, time_index))
else:
total_data+=1
saveCsv(one_sequence, file_name=conf['road'])
print('{}/{} - {}/{} saved! Exist {} data! '.format(dist_index, total_dist,time_index, total_time,total_data))
if total_data == conf["need_num"]:
print("數據采集完成")
break
if total_data ==conf["need_num"]:
print("數據采集完成")
break
傳參我用了一個json文件:
{
"data_source": "./data/data.csv",
"road": "us-101",
"useCols": [
"Vehicle_ID",
"Frame_ID",
"Total_Frames",
"Global_Time",
"Global_X",
"Global_Y",
"Local_X",
"Local_Y",
"v_length",
"v_Width",
"v_Class",
"Location",
"v_Vel",
"Lane_ID"
],
"area_length": 80,
"time_length": 10,
"area_step": 20,
"time_step": 30,
"stride": 5.0,
"hist_dist":6,
"hist_time":0,
"noise": 1,
"need_num": 20
}
我做這個主要是為了采集區域行車數據做訓練集。因此需要迭代獲取。如果有什么問題歡迎留言和我討論!
總結
以上是生活随笔為你收集整理的python车辆轨迹分析_Ngsim数据集分析与python处理,NGSIM,解析,及的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: mysql用sql语句怎么做个脚本备份_
- 下一篇: set集合python_python基础