# Copyright (c) Facebook, Inc. and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import os
import rospkg
import threading
import yaml
from copy import deepcopy
import message_filters
import numpy as np
import pyrobot.util as prutil
import rospy
from cv_bridge import CvBridge, CvBridgeError
from pyrobot.core import Camera
from sensor_msgs.msg import CameraInfo
from sensor_msgs.msg import Image
from sensor_msgs.msg import JointState
from std_msgs.msg import Float64
from tf import TransformListener
def constrain_within_range(value, MIN, MAX):
return min(max(value, MIN), MAX)
def is_within_range(value, MIN, MAX):
return (value <= MAX) and (value >= MIN)
[docs]class SimpleCamera(Camera):
"""
This is camera class that interfaces with the Realsense
camera on the locobot and locobot-lite.
This class does not have the pan and tilt actuation
capabilities for the camera.
"""
[docs] def __init__(self, configs):
"""
Constructor of the SimpleCamera class.
:param configs: Camera specific configuration object
:type configs: YACS CfgNode
"""
super(SimpleCamera, self).__init__(configs=configs)
self.cv_bridge = CvBridge()
self.camera_info_lock = threading.RLock()
self.camera_img_lock = threading.RLock()
self._tf_listener = TransformListener()
self.rgb_img = None
self.depth_img = None
self.camera_info = None
self.camera_P = None
rospy.Subscriber(self.configs.CAMERA.ROSTOPIC_CAMERA_INFO_STREAM,
CameraInfo,
self._camera_info_callback)
rgb_topic = self.configs.CAMERA.ROSTOPIC_CAMERA_RGB_STREAM
self.rgb_sub = message_filters.Subscriber(rgb_topic, Image)
depth_topic = self.configs.CAMERA.ROSTOPIC_CAMERA_DEPTH_STREAM
self.depth_sub = message_filters.Subscriber(depth_topic, Image)
img_subs = [self.rgb_sub, self.depth_sub]
self.sync = message_filters.ApproximateTimeSynchronizer(img_subs,
queue_size=10,
slop=0.2)
self.sync.registerCallback(self._sync_callback)
depth_threshold = (self.configs.BASE.VSLAM.DEPTH_MIN,
self.configs.BASE.VSLAM.DEPTH_MAX)
cfg_filename = self.configs.BASE.VSLAM.CFG_FILENAME
self.depth_cam = DepthImgProcessor(subsample_pixs=1,
depth_threshold=depth_threshold,
cfg_filename=cfg_filename)
self.cam_cf = self.configs.BASE.VSLAM.RGB_CAMERA_CENTER_FRAME
self.base_f = self.configs.BASE.VSLAM.VSLAM_BASE_FRAME
def _sync_callback(self, rgb, depth):
self.camera_img_lock.acquire()
try:
self.rgb_img = self.cv_bridge.imgmsg_to_cv2(rgb, "bgr8")
self.rgb_img = self.rgb_img[:, :, ::-1]
self.depth_img = self.cv_bridge.imgmsg_to_cv2(depth, "passthrough")
except CvBridgeError as e:
rospy.logerr(e)
self.camera_img_lock.release()
def _camera_info_callback(self, msg):
self.camera_info_lock.acquire()
self.camera_info = msg
self.camera_P = np.array(msg.P).reshape((3, 4))
self.camera_info_lock.release()
[docs] def get_rgb(self):
'''
This function returns the RGB image perceived by the camera.
:rtype: np.ndarray or None
'''
self.camera_img_lock.acquire()
rgb = deepcopy(self.rgb_img)
self.camera_img_lock.release()
return rgb
[docs] def get_depth(self):
'''
This function returns the depth image perceived by the camera.
:rtype: np.ndarray or None
'''
self.camera_img_lock.acquire()
depth = deepcopy(self.depth_img)
self.camera_img_lock.release()
return depth
[docs] def get_rgb_depth(self):
'''
This function returns both the RGB and depth
images perceived by the camera.
:rtype: np.ndarray or None
'''
self.camera_img_lock.acquire()
rgb = deepcopy(self.rgb_img)
depth = deepcopy(self.depth_img)
self.camera_img_lock.release()
return rgb, depth
[docs] def get_intrinsics(self):
"""
This function returns the camera intrinsics.
:rtype: np.ndarray
"""
if self.camera_P is None:
return self.camera_P
self.camera_info_lock.acquire()
P = deepcopy(self.camera_P)
self.camera_info_lock.release()
return P[:3, :3]
[docs] def get_current_pcd(self, in_cam=True):
"""
Return the point cloud at current time step (one frame only)
:param in_cam: return points in camera frame,
otherwise, return points in base frame
:type in_cam: bool
:returns: tuple (pts, colors)
pts: point coordinates in world frame (shape: :math:`[N, 3]`)
colors: rgb values for pts_in_cam (shape: :math:`[N, 3]`)
:rtype: tuple(np.ndarray, np.ndarray)
"""
trans, rot, T = self.get_link_transform(self.cam_cf,
self.base_f)
base2cam_trans = np.array(trans).reshape(-1, 1)
base2cam_rot = np.array(rot)
rgb_im, depth_im = self.get_rgb_depth()
pcd_in_cam, colors = self.depth_cam.get_pcd_ic(depth_im=depth_im,
rgb_im=rgb_im)
pts = pcd_in_cam[:3, :].T
if in_cam:
return pts, colors
pts = np.dot(pts, base2cam_rot.T)
pts = pts + base2cam_trans.T
return pts, colors
[docs] def pix_to_3dpt(self, rs, cs, in_cam=False):
"""
Get the 3D points of the pixels in RGB images.
:param rs: rows of interest in the RGB image.
It can be a list or 1D numpy array
which contains the row indices.
The default value is None,
which means all rows.
:param cs: columns of interest in the RGB image.
It can be a list or 1D numpy array
which contains the column indices.
The default value is None,
which means all columns.
:param in_cam: return points in camera frame,
otherwise, return points in base frame
:type rs: list or np.ndarray
:type cs: list or np.ndarray
:type in_cam: bool
:returns: tuple (pts, colors)
pts: point coordinates in world frame
(shape: :math:`[N, 3]`)
colors: rgb values for pts_in_cam
(shape: :math:`[N, 3]`)
:rtype: tuple(np.ndarray, np.ndarray)
"""
trans, rot, T = self.get_link_transform(self.cam_cf,
self.base_f)
base2cam_trans = np.array(trans).reshape(-1, 1)
base2cam_rot = np.array(rot)
rgb_im, depth_im = self.get_rgb_depth()
pcd_in_cam = self.depth_cam.get_pix_3dpt(depth_im=depth_im,
rs=rs,
cs=cs)
pts = pcd_in_cam[:3, :].T
colors = rgb_im[rs, cs].reshape(-1, 3)
if in_cam:
return pts, colors
pts = np.dot(pts, base2cam_rot.T)
pts = pts + base2cam_trans.T
return pts, colors
[docs]class LoCoBotCamera(SimpleCamera):
"""
This is camera class that interfaces with the Realsense
camera and the pan and tilt joints on the robot.
"""
[docs] def __init__(self, configs):
"""
Constructor of the LoCoBotCamera class.
:param configs: Object containing configurations for camera,
pan joint and tilt joint.
:type configs: YACS CfgNode
"""
use_camera = rospy.get_param('use_camera', False)
use_sim = rospy.get_param('use_sim', False)
use_camera = use_camera or use_sim
if not use_camera:
rospy.logwarn('Neither use_camera, nor use_sim, is not set'
' to True when the LoCoBot driver is launched.'
'You may not be able to command the camera'
' correctly using PyRobot!!!')
return
super(LoCoBotCamera, self).__init__(configs=configs)
rospy.Subscriber(self.configs.ARM.ROSTOPIC_JOINT_STATES,
JointState,
self._camera_pose_callback)
self.set_pan_pub = rospy.Publisher(
self.configs.CAMERA.ROSTOPIC_SET_PAN, Float64, queue_size=1)
self.set_tilt_pub = rospy.Publisher(
self.configs.CAMERA.ROSTOPIC_SET_TILT, Float64, queue_size=1)
self.pan = None
self.tilt = None
self.tol = 0.01
def _camera_pose_callback(self, msg):
if 'head_pan_joint' in msg.name:
pan_id = msg.name.index('head_pan_joint')
self.pan = msg.position[pan_id]
if 'head_tilt_joint' in msg.name:
tilt_id = msg.name.index('head_tilt_joint')
self.tilt = msg.position[tilt_id]
@property
def state(self):
"""
Return the current pan and tilt joint angles of the robot camera.
:return:
pan_tilt: A list the form [pan angle, tilt angle]
:rtype: list
"""
return self.get_state()
[docs] def get_state(self):
"""
Return the current pan and tilt joint angles of the robot camera.
:return:
pan_tilt: A list the form [pan angle, tilt angle]
:rtype: list
"""
return [self.pan, self.tilt]
[docs] def get_pan(self):
"""
Return the current pan joint angle of the robot camera.
:return:
pan: Pan joint angle
:rtype: float
"""
return self.pan
[docs] def get_tilt(self):
"""
Return the current tilt joint angle of the robot camera.
:return:
tilt: Tilt joint angle
:rtype: float
"""
return self.tilt
[docs] def set_pan(self, pan, wait=True):
"""
Sets the pan joint angle to the specified value.
:param pan: value to be set for pan joint
:param wait: wait until the pan angle is set to
the target angle.
:type pan: float
:type wait: bool
"""
pan = constrain_within_range(np.mod(pan + np.pi,
2 * np.pi) - np.pi,
self.configs.CAMERA.MIN_PAN,
self.configs.CAMERA.MAX_PAN)
self.set_pan_pub.publish(pan)
if wait:
for i in range(30):
rospy.sleep(0.1)
if np.fabs(self.get_pan() - pan) < self.tol:
break
[docs] def set_tilt(self, tilt, wait=True):
"""
Sets the tilt joint angle to the specified value.
:param tilt: value to be set for the tilt joint
:param wait: wait until the tilt angle is set to
the target angle.
:type tilt: float
:type wait: bool
"""
tilt = constrain_within_range(np.mod(tilt + np.pi,
2 * np.pi) - np.pi,
self.configs.CAMERA.MIN_TILT,
self.configs.CAMERA.MAX_TILT)
self.set_tilt_pub.publish(tilt)
if wait:
for i in range(30):
rospy.sleep(0.1)
if np.fabs(self.get_tilt() - tilt) < self.tol:
break
[docs] def set_pan_tilt(self, pan, tilt, wait=True):
"""
Sets both the pan and tilt joint angles to the specified values.
:param pan: value to be set for pan joint
:param tilt: value to be set for the tilt joint
:param wait: wait until the pan and tilt angles are set to
the target angles.
:type pan: float
:type tilt: float
:type wait: bool
"""
pan = constrain_within_range(np.mod(pan + np.pi,
2 * np.pi) - np.pi,
self.configs.CAMERA.MIN_PAN,
self.configs.CAMERA.MAX_PAN)
tilt = constrain_within_range(np.mod(tilt + np.pi,
2 * np.pi) - np.pi,
self.configs.CAMERA.MIN_TILT,
self.configs.CAMERA.MAX_TILT)
self.set_pan_pub.publish(pan)
self.set_tilt_pub.publish(tilt)
if wait:
for i in range(30):
rospy.sleep(0.1)
if np.fabs(self.get_pan() - pan) < self.tol and \
np.fabs(self.get_tilt() - tilt) < self.tol:
break
[docs] def reset(self):
"""
This function resets the pan and tilt joints by actuating
them to their home configuration.
"""
self.set_pan_tilt(self.configs.CAMERA.RESET_PAN,
self.configs.CAMERA.RESET_TILT)
class DepthImgProcessor:
"""
This class transforms the depth image and rgb image to point cloud
"""
def __init__(self, subsample_pixs=1, depth_threshold=(0, 1.5),
cfg_filename='realsense_d435.yaml'):
"""
The constructor for :class:`DepthImgProcessor` class.
:param subsample_pixs: sample rows and columns for the images
:param depth_threshold: minimum and maximum of valid depth values
:param cfg_filename: configuration file name for ORB-SLAM2
:type subsample_pixs: int
:type depth_threshold: tuple
:type cfg_filename: string
"""
assert (type(depth_threshold) is tuple and
0 < len(depth_threshold) < 3) or \
(depth_threshold is None)
self.subsample_pixs = subsample_pixs
self.depth_threshold = depth_threshold
self.cfg_data = self.read_cfg(cfg_filename)
self.intrinsic_mat = self.get_intrinsic()
self.intrinsic_mat_inv = np.linalg.inv(self.intrinsic_mat)
img_pixs = np.mgrid[0: self.cfg_data['Camera.height']: subsample_pixs,
0: self.cfg_data['Camera.width']: subsample_pixs]
img_pixs = img_pixs.reshape(2, -1)
img_pixs[[0, 1], :] = img_pixs[[1, 0], :]
self.uv_one = np.concatenate((img_pixs,
np.ones((1, img_pixs.shape[1]))))
self.uv_one_in_cam = np.dot(self.intrinsic_mat_inv, self.uv_one)
def get_pix_3dpt(self, depth_im, rs, cs):
"""
:param depth_im: depth image (shape: :math:`[H, W]`)
:param rs: rows of interest. It can be a list or 1D numpy array
which contains the row indices. The default value is None,
which means all rows.
:param cs: columns of interest. It can be a list or 1D numpy array
which contains the column indices.
The default value is None,
which means all columns.
:type depth_im: np.ndarray
:type rs: list or np.ndarray
:type cs: list or np.ndarray
:return: 3D point coordinates of the pixels in
camera frame (shape: :math:`[4, N]`)
:rtype np.ndarray
"""
assert isinstance(rs,
int) or isinstance(rs,
list) or isinstance(rs,
np.ndarray)
assert isinstance(cs,
int) or isinstance(cs,
list) or isinstance(cs,
np.ndarray)
if isinstance(rs, int):
rs = [rs]
if isinstance(cs, int):
cs = [cs]
if isinstance(rs, np.ndarray):
rs = rs.flatten()
if isinstance(cs, np.ndarray):
cs = cs.flatten()
depth_im = depth_im[rs, cs]
depth = depth_im.reshape(-1) / float(self.cfg_data['DepthMapFactor'])
img_pixs = np.stack((rs, cs)).reshape(2, -1)
img_pixs[[0, 1], :] = img_pixs[[1, 0], :]
uv_one = np.concatenate((img_pixs,
np.ones((1, img_pixs.shape[1]))))
uv_one_in_cam = np.dot(self.intrinsic_mat_inv, uv_one)
pts_in_cam = np.multiply(uv_one_in_cam, depth)
pts_in_cam = np.concatenate((pts_in_cam,
np.ones((1, pts_in_cam.shape[1]))),
axis=0)
return pts_in_cam
def get_pcd_ic(self, depth_im, rgb_im=None):
"""
Returns the point cloud (filtered by minimum
and maximum depth threshold)
in camera's coordinate frame
:param depth_im: depth image (shape: :math:`[H, W]`)
:param rgb_im: rgb image (shape: :math:`[H, W, 3]`)
:type depth_im: np.ndarray
:type rgb_im: np.ndarray
:returns: tuple (pts_in_cam, rgb_im)
pts_in_cam: point coordinates in
camera frame (shape: :math:`[4, N]`)
rgb: rgb values for pts_in_cam (shape: :math:`[N, 3]`)
:rtype tuple(np.ndarray, np.ndarray)
"""
# pcd in camera from depth
depth_im = depth_im[0::self.subsample_pixs, 0::self.subsample_pixs]
rgb_im = rgb_im[0::self.subsample_pixs, 0::self.subsample_pixs]
depth = depth_im.reshape(-1) / float(self.cfg_data['DepthMapFactor'])
rgb = None
if rgb_im is not None:
rgb = rgb_im.reshape(-1, 3)
if self.depth_threshold is not None:
valid = depth > self.depth_threshold[0]
if len(self.depth_threshold) > 1:
valid = np.logical_and(valid,
depth < self.depth_threshold[1])
uv_one_in_cam = self.uv_one_in_cam[:, valid]
depth = depth[valid]
rgb = rgb[valid]
else:
uv_one_in_cam = self.uv_one_in_cam
pts_in_cam = np.multiply(uv_one_in_cam, depth)
pts_in_cam = np.concatenate((pts_in_cam,
np.ones((1, pts_in_cam.shape[1]))),
axis=0)
return pts_in_cam, rgb
def get_pcd_iw(self, pts_in_cam, extrinsic_mat):
"""
Returns the point cloud in the world coordinate frame
:param pts_in_cam: point coordinates in
camera frame (shape: :math:`[4, N]`)
:param extrinsic_mat: extrinsic matrix for
the camera (shape: :math:`[4, 4]`)
:type pts_in_cam: np.ndarray
:type extrinsic_mat: np.ndarray
:return: point coordinates in
ORB-SLAM2's world frame (shape: :math:`[N, 3]`)
:rtype: np.ndarray
"""
# pcd in world
pts_in_world = np.dot(extrinsic_mat, pts_in_cam)
pts_in_world = pts_in_world[:3, :].T
return pts_in_world
def read_cfg(self, cfg_filename):
"""
Reads the configuration file
:param cfg_filename: configuration file name for ORB-SLAM2
:type cfg_filename: string
:return: configurations in the configuration file
:rtype: dict
"""
rospack = rospkg.RosPack()
slam_pkg_path = rospack.get_path('orb_slam2_ros')
cfg_path = os.path.join(slam_pkg_path,
'cfg',
cfg_filename)
with open(cfg_path, 'r') as f:
for i in range(1):
f.readline()
cfg_data = yaml.load(f)
return cfg_data
def get_intrinsic(self):
"""
Returns the instrinsic matrix of the camera
:return: the intrinsic matrix (shape: :math:`[3, 3]`)
:rtype: np.ndarray
"""
fx = self.cfg_data['Camera.fx']
fy = self.cfg_data['Camera.fy']
cx = self.cfg_data['Camera.cx']
cy = self.cfg_data['Camera.cy']
Itc = np.array([[fx, 0, cx],
[0, fy, cy],
[0, 0, 1]])
return Itc