"""
|
||
|
|
The Lyft data pre-processing and evaluation is modified from
|
||
|
|
https://github.com/poodarchu/Det3D
|
||
|
|
"""
|
||
|
|
|
||
|
|

import operator
from functools import reduce
from pathlib import Path

import numpy as np
import tqdm
from lyft_dataset_sdk.utils.data_classes import Box, Quaternion
from lyft_dataset_sdk.lyftdataset import LyftDataset
from lyft_dataset_sdk.utils.geometry_utils import transform_matrix
from lyft_dataset_sdk.eval.detection.mAP_evaluation import Box3D


def get_available_scenes(lyft):
    available_scenes = []
    print('total scene num:', len(lyft.scene))
    for scene in lyft.scene:
        scene_token = scene['token']
        scene_rec = lyft.get('scene', scene_token)
        sample_rec = lyft.get('sample', scene_rec['first_sample_token'])
        sd_rec = lyft.get('sample_data', sample_rec['data']['LIDAR_TOP'])
        has_more_frames = True
        scene_not_exist = False
        while has_more_frames:
            lidar_path, boxes, _ = lyft.get_sample_data(sd_rec['token'])
            if not Path(lidar_path).exists():
                scene_not_exist = True
                break
            else:
                break
            # Only the first lidar frame of each scene is checked; to walk the
            # whole scene, follow the 'next' pointers instead:
            # if not sd_rec['next'] == '':
            #     sd_rec = lyft.get('sample_data', sd_rec['next'])
            # else:
            #     has_more_frames = False
        if scene_not_exist:
            continue
        available_scenes.append(scene)
    print('exist scene num:', len(available_scenes))
    return available_scenes
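

# Illustrative usage sketch (not part of the original file): the data_path and
# json_path values below are assumptions; point them at a real Lyft Level 5
# dataset root.
def _example_get_available_scenes():
    lyft = LyftDataset(data_path='data/lyft', json_path='data/lyft/data', verbose=True)
    available_scenes = get_available_scenes(lyft)
    available_scene_names = [scene['name'] for scene in available_scenes]
    return available_scene_names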


def get_sample_data(lyft, sample_data_token):
    sd_rec = lyft.get("sample_data", sample_data_token)
    cs_rec = lyft.get("calibrated_sensor", sd_rec["calibrated_sensor_token"])

    sensor_rec = lyft.get("sensor", cs_rec["sensor_token"])
    pose_rec = lyft.get("ego_pose", sd_rec["ego_pose_token"])

    boxes = lyft.get_boxes(sample_data_token)

    box_list = []
    for box in boxes:
        # Move the box from the global frame to the ego vehicle frame ...
        box.translate(-np.array(pose_rec["translation"]))
        box.rotate(Quaternion(pose_rec["rotation"]).inverse)

        # ... and from the ego vehicle frame to the sensor frame.
        box.translate(-np.array(cs_rec["translation"]))
        box.rotate(Quaternion(cs_rec["rotation"]).inverse)

        box_list.append(box)

    return box_list, pose_rec
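

# Illustrative sketch (not part of the original file): fetch the ground-truth
# boxes of one sample in the LIDAR_TOP sensor frame. Assumes `lyft` is an
# initialized LyftDataset and `sample` is an entry of `lyft.sample`.
def _example_get_sample_data(lyft, sample):
    ref_sd_token = sample['data']['LIDAR_TOP']
    boxes, pose_rec = get_sample_data(lyft, ref_sd_token)
    print('boxes in sensor frame:', len(boxes))
    return boxes, pose_rec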


def quaternion_yaw(q: Quaternion) -> float:
    """
    Calculate the yaw angle from a quaternion.
    Note that this only works for a quaternion that represents a box in the lidar or global coordinate frame.
    It does not work for a box in the camera frame.
    :param q: Quaternion of interest.
    :return: Yaw angle in radians.
    """

    # Project the rotated x-axis into the xy plane.
    v = np.dot(q.rotation_matrix, np.array([1, 0, 0]))

    # Measure yaw using arctan.
    yaw = np.arctan2(v[1], v[0])

    return yaw
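

# Quick worked example (not part of the original file): a pure rotation of
# 90 degrees about the z-axis should yield a yaw of pi/2.
def _example_quaternion_yaw():
    q = Quaternion(axis=[0, 0, 1], angle=np.pi / 2)
    assert np.isclose(quaternion_yaw(q), np.pi / 2)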


def fill_trainval_infos(data_path, lyft, train_scenes, val_scenes, test=False, max_sweeps=10):
    train_lyft_infos = []
    val_lyft_infos = []
    progress_bar = tqdm.tqdm(total=len(lyft.sample), desc='create_info', dynamic_ncols=True)

    # ref_chans = ["LIDAR_TOP", "LIDAR_FRONT_LEFT", "LIDAR_FRONT_RIGHT"]
    ref_chan = "LIDAR_TOP"

    for index, sample in enumerate(lyft.sample):
        progress_bar.update()

        ref_info = {}
        ref_sd_token = sample['data'][ref_chan]
        ref_sd_rec = lyft.get('sample_data', ref_sd_token)
        ref_cs_token = ref_sd_rec['calibrated_sensor_token']
        ref_cs_rec = lyft.get('calibrated_sensor', ref_cs_token)

        # Homogeneous transforms between the reference sensor frame and the ego car frame.
        ref_to_car = transform_matrix(
            ref_cs_rec['translation'],
            Quaternion(ref_cs_rec['rotation']),
            inverse=False,
        )
        ref_from_car = transform_matrix(
            ref_cs_rec['translation'],
            Quaternion(ref_cs_rec['rotation']),
            inverse=True,
        )

        ref_lidar_path = lyft.get_sample_data_path(ref_sd_token)

        ref_boxes, ref_pose_rec = get_sample_data(lyft, ref_sd_token)
        ref_time = 1e-6 * ref_sd_rec['timestamp']

        # Homogeneous transforms between the ego car frame (at the reference
        # timestamp) and the global frame.
        car_from_global = transform_matrix(
            ref_pose_rec['translation'],
            Quaternion(ref_pose_rec['rotation']),
            inverse=True,
        )
        car_to_global = transform_matrix(
            ref_pose_rec['translation'],
            Quaternion(ref_pose_rec['rotation']),
            inverse=False,
        )

        info = {
            'lidar_path': Path(ref_lidar_path).relative_to(data_path).__str__(),
            'ref_from_car': ref_from_car,
            'ref_to_car': ref_to_car,
            'token': sample['token'],
            'car_from_global': car_from_global,
            'car_to_global': car_to_global,
            'timestamp': ref_time,
            'sweeps': []
        }

        sample_data_token = sample['data'][ref_chan]
        curr_sd_rec = lyft.get('sample_data', sample_data_token)
        sweeps = []

        while len(sweeps) < max_sweeps - 1:
            if curr_sd_rec['prev'] == '':
                if len(sweeps) == 0:
                    sweep = {
                        'lidar_path': Path(ref_lidar_path).relative_to(data_path).__str__(),
                        'sample_data_token': curr_sd_rec['token'],
                        'transform_matrix': None,
                        'time_lag': 0.0,
                    }
                    sweeps.append(sweep)
                else:
                    sweeps.append(sweeps[-1])
            else:
                curr_sd_rec = lyft.get('sample_data', curr_sd_rec['prev'])

                # Get the past ego pose.
                current_pose_rec = lyft.get('ego_pose', curr_sd_rec['ego_pose_token'])
                global_from_car = transform_matrix(
                    current_pose_rec['translation'], Quaternion(current_pose_rec['rotation']), inverse=False,
                )

                # Homogeneous transformation matrix from the sensor coordinate frame to the ego car frame.
                current_cs_rec = lyft.get(
                    'calibrated_sensor', curr_sd_rec['calibrated_sensor_token']
                )
                car_from_current = transform_matrix(
                    current_cs_rec['translation'], Quaternion(current_cs_rec['rotation']), inverse=False,
                )

                # Chain the transforms: past sensor -> past car -> global -> reference car -> reference sensor.
                tm = reduce(np.dot, [ref_from_car, car_from_global, global_from_car, car_from_current])

                lidar_path = lyft.get_sample_data_path(curr_sd_rec['token'])

                time_lag = ref_time - 1e-6 * curr_sd_rec['timestamp']

                sweep = {
                    'lidar_path': Path(lidar_path).relative_to(data_path).__str__(),
                    'sample_data_token': curr_sd_rec['token'],
                    'transform_matrix': tm,
                    'global_from_car': global_from_car,
                    'car_from_current': car_from_current,
                    'time_lag': time_lag,
                }
                sweeps.append(sweep)

        info['sweeps'] = sweeps

        if not test:
            annotations = [
                lyft.get('sample_annotation', token) for token in sample['anns']
            ]

            locs = np.array([b.center for b in ref_boxes]).reshape(-1, 3)
            dims = np.array([b.wlh for b in ref_boxes]).reshape(-1, 3)[:, [1, 0, 2]]  # wlh -> lwh
            rots = np.array([quaternion_yaw(b.orientation) for b in ref_boxes]).reshape(-1, 1)
            velocity = np.array([b.velocity for b in ref_boxes]).reshape(-1, 3)
            names = np.array([b.name for b in ref_boxes])
            tokens = np.array([b.token for b in ref_boxes]).reshape(-1, 1)
            gt_boxes = np.concatenate([locs, dims, rots], axis=1)

            assert len(annotations) == len(gt_boxes)

            info['gt_boxes'] = gt_boxes
            info['gt_boxes_velocity'] = velocity
            info['gt_names'] = names
            info['gt_boxes_token'] = tokens

        if sample['scene_token'] in train_scenes:
            train_lyft_infos.append(info)
        else:
            val_lyft_infos.append(info)

    progress_bar.close()
    return train_lyft_infos, val_lyft_infos
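

# Illustrative usage sketch (not part of the original file): build the train/val
# info lists. The scene split below is an assumption for illustration; real
# splits come from the dataset config.
def _example_fill_trainval_infos(data_path, lyft):
    available_scenes = get_available_scenes(lyft)
    scene_tokens = [scene['token'] for scene in available_scenes]
    train_scenes = set(scene_tokens[:-30])  # assumed split, illustration only
    val_scenes = set(scene_tokens[-30:])
    return fill_trainval_infos(data_path, lyft, train_scenes, val_scenes, test=False, max_sweeps=10)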


def boxes_lidar_to_lyft(boxes3d, scores=None, labels=None):
    box_list = []
    for k in range(boxes3d.shape[0]):
        quat = Quaternion(axis=[0, 0, 1], radians=boxes3d[k, 6])
        box = Box(
            boxes3d[k, :3],
            boxes3d[k, [4, 3, 5]],  # reorder (l, w, h) -> (w, l, h) as Box expects
            quat, label=labels[k] if labels is not None else np.nan,
            score=scores[k] if scores is not None else np.nan,
        )
        box_list.append(box)
    return box_list
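

# Minimal worked example (not part of the original file): convert one dummy
# lidar-frame detection (x, y, z, l, w, h, heading) into a lyft-SDK Box. The
# numbers are made up for illustration.
def _example_boxes_lidar_to_lyft():
    boxes3d = np.array([[10.0, 2.0, -1.0, 4.5, 1.9, 1.6, 0.3]])
    scores = np.array([0.9])
    labels = np.array([1])
    boxes = boxes_lidar_to_lyft(boxes3d, scores, labels)
    # The resulting Box stores its size as wlh == (1.9, 4.5, 1.6) with yaw 0.3 about z.
    return boxes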


def lidar_lyft_box_to_global(lyft, boxes, sample_token):
    s_record = lyft.get('sample', sample_token)
    sample_data_token = s_record['data']['LIDAR_TOP']

    sd_record = lyft.get('sample_data', sample_data_token)
    cs_record = lyft.get('calibrated_sensor', sd_record['calibrated_sensor_token'])
    sensor_record = lyft.get('sensor', cs_record['sensor_token'])
    pose_record = lyft.get('ego_pose', sd_record['ego_pose_token'])

    box_list = []
    for box in boxes:
        # Move the box to the ego vehicle coordinate system.
        box.rotate(Quaternion(cs_record['rotation']))
        box.translate(np.array(cs_record['translation']))
        # Move the box to the global coordinate system.
        box.rotate(Quaternion(pose_record['rotation']))
        box.translate(np.array(pose_record['translation']))
        box_list.append(box)
    return box_list
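

# Sketch (not part of the original file): lidar_lyft_box_to_global() undoes the
# frame change applied in get_sample_data(), so sensor-frame boxes map back to
# the global frame. Note the Box objects are modified in place.
def _example_round_trip(lyft, sample_token):
    sample = lyft.get('sample', sample_token)
    boxes, _ = get_sample_data(lyft, sample['data']['LIDAR_TOP'])
    return lidar_lyft_box_to_global(lyft, boxes, sample_token)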


def convert_det_to_lyft_format(lyft, det_annos):
    sample_tokens = []
    det_lyft_box = []
    for anno in det_annos:
        sample_tokens.append(anno['metadata']['token'])

        boxes_lyft_list = boxes_lidar_to_lyft(anno['boxes_lidar'], anno['score'], anno['pred_labels'])
        boxes_list = lidar_lyft_box_to_global(lyft, boxes_lyft_list, anno['metadata']['token'])

        for idx, box in enumerate(boxes_list):
            name = anno['name'][idx]
            box3d = {
                'sample_token': anno['metadata']['token'],
                'translation': box.center.tolist(),
                'size': box.wlh.tolist(),
                'rotation': box.orientation.elements.tolist(),
                'name': name,
                'score': box.score
            }
            det_lyft_box.append(box3d)

    return det_lyft_box, sample_tokens
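

# Sketch of the input layout this function expects (an assumption based on
# OpenPCDet-style detection annos, not defined in this file): one dict per
# sample, e.g.
#
#   {
#       'metadata': {'token': '<sample_token>'},
#       'boxes_lidar': np.zeros((N, 7)),         # (x, y, z, l, w, h, heading)
#       'score': np.zeros(N),
#       'pred_labels': np.ones(N, dtype=np.int64),
#       'name': np.array(['car'] * N),
#   }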


def load_lyft_gt_by_tokens(lyft, sample_tokens):
    """
    Modified from the Lyft dataset SDK tutorial.
    """

    gt_box3ds = []

    # Load the ground-truth annotations of each requested sample.
    for sample_token in sample_tokens:
        sample = lyft.get('sample', sample_token)
        sample_annotation_tokens = sample['anns']

        sample_lidar_token = sample['data']['LIDAR_TOP']
        lidar_data = lyft.get('sample_data', sample_lidar_token)
        ego_pose = lyft.get('ego_pose', lidar_data['ego_pose_token'])
        ego_translation = np.array(ego_pose['translation'])

        for sample_annotation_token in sample_annotation_tokens:
            sample_annotation = lyft.get('sample_annotation', sample_annotation_token)
            sample_annotation_translation = sample_annotation['translation']

            class_name = sample_annotation['category_name']

            box3d = {
                'sample_token': sample_token,
                'translation': sample_annotation_translation,
                'size': sample_annotation['size'],
                'rotation': sample_annotation['rotation'],
                'name': class_name
            }
            gt_box3ds.append(box3d)

    return gt_box3ds


def format_lyft_results(classwise_ap, class_names, iou_threshold_list, version='trainval'):
    ret_dict = {}
    result = '----------------Lyft %s results-----------------\n' % version
    result += 'Average precision over IoUs: {}\n'.format(str(iou_threshold_list))
    for c_idx, class_name in enumerate(class_names):
        result += '{:<20}: \t {:.4f}\n'.format(class_name, classwise_ap[c_idx])
        ret_dict[class_name] = classwise_ap[c_idx]

    result += '--------------average performance-------------\n'
    mAP = np.mean(classwise_ap)
    result += 'mAP:\t {:.4f}\n'.format(mAP)

    ret_dict['mAP'] = mAP
    return result, ret_dict
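

# End-to-end evaluation sketch (not part of the original file). It assumes
# get_average_precisions from the lyft SDK's mAP_evaluation module and an IoU
# threshold list commonly used for Lyft; both are assumptions, not fixed here.
def _example_evaluate(lyft, det_annos, class_names):
    from lyft_dataset_sdk.eval.detection.mAP_evaluation import get_average_precisions

    det_lyft_boxes, sample_tokens = convert_det_to_lyft_format(lyft, det_annos)
    gt_lyft_boxes = load_lyft_gt_by_tokens(lyft, sample_tokens)

    iou_threshold_list = [0.5, 0.55, 0.6, 0.65, 0.7, 0.75, 0.8, 0.85, 0.9, 0.95]
    ap_per_iou = [
        get_average_precisions(gt_lyft_boxes, det_lyft_boxes, class_names, iou)
        for iou in iou_threshold_list
    ]
    classwise_ap = np.mean(np.stack(ap_per_iou), axis=0)
    result_str, result_dict = format_lyft_results(classwise_ap, class_names, iou_threshold_list)
    print(result_str)
    return result_dict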