# Copyright (c) Facebook, Inc. and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import bezier
import numpy as np
import rospy
import tf
import tf.transformations
from geometry_msgs.msg import Twist, PoseStamped
from matplotlib import gridspec
from matplotlib import pyplot as plt
from nav_msgs.srv import GetPlan
from bicycle_model import wrap_theta, Foo, BicycleSystem
ANGLE_THRESH = 0.05
def build_pose_msg(x, y, theta, frame):
pose_stamped = PoseStamped()
pose_stamped.header.frame_id = frame
pose_stamped.pose.position.x = x
pose_stamped.pose.position.y = y
pose_stamped.pose.position.z = 0
quat = tf.transformations.quaternion_from_euler(0, 0, theta)
pose_stamped.pose.orientation.x = quat[0]
pose_stamped.pose.orientation.y = quat[1]
pose_stamped.pose.orientation.z = quat[2]
pose_stamped.pose.orientation.w = quat[3]
return pose_stamped
def _get_absolute_pose(rel_pose, start_pos):
goal_pos = np.zeros(3)
x, y, theta = start_pos
dx, dy, dtheta = rel_pose
goal_pos[0] = x + dx * np.cos(theta) - dy * np.sin(theta)
goal_pos[1] = y + dx * np.sin(theta) + dy * np.cos(theta)
goal_pos[2] = theta + dtheta
return goal_pos
def get_ramp(ramp, T):
s = np.linspace(-ramp, ramp, T)
s = np.tanh(s)
s = s - np.min(s)
s = s / np.max(s)
return s
def linear_interpolate_ramp(start_pos, goal_pos, T, ramp):
"""Linearly interpolates from start to goal, in T steps using a ramp."""
s = get_ramp(ramp, T)[:, np.newaxis]
states = start_pos[np.newaxis, :] * (1 - s) + goal_pos[np.newaxis, :] * s
return states
def pure_rotation_init(start_pos, goal_pos, dt, max_v, max_w, ramp):
"""Function to generate state sequences for pure rotation."""
# Pure Rotation
dtheta = goal_pos[2] - start_pos[2]
dtheta = wrap_theta(dtheta)
T = int(np.ceil(np.abs(dtheta) / max_w / dt))
goal1_pos = goal_pos.copy()
goal1_pos[2] = start_pos[2] + dtheta
states1 = linear_interpolate_ramp(start_pos, goal1_pos, T, ramp)
return states1
def sharp_init(start_pos, goal_pos, dt, max_v, max_w, reverse=False):
f = 0.5
ramp = 1
delta_theta = np.arctan2(
goal_pos[1] - start_pos[1],
goal_pos[0] - start_pos[0])
to_rotate = wrap_theta(delta_theta - start_pos[2])
if reverse and np.abs(to_rotate) > 3 * np.pi / 4:
to_rotate = wrap_theta(np.pi + to_rotate)
goal1_pos = start_pos.copy()
if np.abs(to_rotate) > ANGLE_THRESH:
num_steps_1 = np.ceil(
np.abs(to_rotate) /
max_w /
f /
dt).astype(
np.int)
num_steps_1 = np.maximum(num_steps_1, 1)
goal1_pos[2] = goal1_pos[2] + to_rotate
states1 = linear_interpolate_ramp(
start_pos, goal1_pos, num_steps_1, ramp)
else:
states1 = np.zeros([0, 3], dtype=np.float32)
dist = np.linalg.norm(goal_pos[:2] - start_pos[:2])
goal2_pos = goal1_pos.copy()
goal2_pos[:2] = goal_pos[:2]
num_steps_2 = np.maximum(np.ceil(dist / max_v / f / dt).astype(np.int), 1)
states2 = linear_interpolate_ramp(goal1_pos, goal2_pos, num_steps_2, ramp)
to_rotate = wrap_theta(goal_pos[2] - goal2_pos[2])
goal3_pos = goal2_pos.copy()
if np.abs(to_rotate) > ANGLE_THRESH:
num_steps_3 = np.ceil(
np.abs(to_rotate) /
max_w /
f /
dt).astype(
np.int)
num_steps_3 = np.maximum(num_steps_3, 1)
goal3_pos[2] = goal3_pos[2] + to_rotate
states3 = linear_interpolate_ramp(
goal2_pos, goal3_pos, num_steps_3, ramp)
else:
states3 = np.zeros([0, 3], dtype=np.float32)
init_states = np.concatenate([states1, states2, states3], 0)
return init_states
def position_control_init_fn(init_type, start_pos, goal_pos, dt, max_v, max_w,
reverse=False):
# sharp init, rotates in direction of goal, and then moves to the goal
# location and then rotates into goal orientation. It takes velcoity limits
# into account when doing so.
dist_thresh = 0.01
dist = np.linalg.norm(goal_pos[:2] - start_pos[:2])
angle = wrap_theta(goal_pos[2] - start_pos[2])
already_at_goal = dist < dist_thresh and np.abs(angle) < ANGLE_THRESH
pure_rotation = dist < dist_thresh
f = 0.5
ramp = 1
if already_at_goal:
init_states = np.array([start_pos])
elif pure_rotation:
init_states = pure_rotation_init(
start_pos, goal_pos, dt, max_v * f, max_w * f, ramp)
elif not pure_rotation:
if init_type == 'sharp':
init_states = sharp_init(
start_pos, goal_pos, dt, max_v, max_w, reverse)
elif init_type == 'smooth':
init_states = smooth_init(
start_pos, goal_pos, dt, max_v, max_w, reverse)
return init_states
def smooth_init(start_pos, goal_pos, dt, max_v, max_w, reverse=False):
# smart init, rotates in direction of goal, and then moves to the goal
dist = np.linalg.norm(goal_pos[:2] - start_pos[:2])
# if 5cm from goal, treat it as a pure rotation target.
pure_rotation = dist < 0.10
ramp = 1
f = 0.5
backward = False
if pure_rotation:
init_states = pure_rotation_init(start_pos, goal_pos,
dt, max_v * f, max_w * f, ramp=ramp)
else:
s, pts, bezier_curve = bezier_trajectory(start_pos, goal_pos,
dt, max_v=max_v * f,
end_derivative_scale=0.8,
ramp=ramp)
if reverse:
s_, pts_, bezier_curve_ = \
bezier_trajectory(start_pos, goal_pos,
dt, max_v=max_v * f,
end_derivative_scale=-0.8,
ramp=ramp)
if bezier_curve_.length < bezier_curve.length:
pts = pts_
backward = True
init_states, _ = compute_controls_from_xy(pts,
start_pos[2],
dt,
backward)
return init_states
def bezier_trajectory(
start,
end,
dt,
max_v=0.4,
end_derivative_scale=0.5,
ramp=3):
"""
Computes a bezier fit to the start and end conditions, and returns a
sequence of points that smoothly coneys the robot to the desired target
configuration.
"""
x0, y0, th0 = start
x1, y1, th1 = end
r = end_derivative_scale
qs = np.array([[x0, y0],
[x0 + r * np.cos(th0), y0 + r * np.sin(th0)],
[x1 - r * np.cos(th1), y1 - r * np.sin(th1)],
[x1, y1]], dtype=np.float64)
bezier_curve = bezier.Curve(qs.T, degree=3)
T = int(np.ceil(bezier_curve.length / dt / max_v))
s = get_ramp(ramp, T)
pts = bezier_curve.evaluate_multi(s).T
return s, pts, bezier_curve
def compute_controls_from_xy(xy, theta0, dt, flip_theta=False):
"""
Given the xy trajectory, this computes the orientation, and v and w
commands to track this trajectory. These can then be used to close the loop
on this trajectory using an LQR controller.
"""
x, y = xy.T
theta = np.arctan2(y[1:] - y[:-1], x[1:] - x[:-1])
if flip_theta:
theta = theta + np.pi
theta = np.concatenate([[theta0], theta], axis=0)
# Unwrap theta as necessary.
old_theta = theta[0]
for i in range(theta.shape[0] - 1):
theta[i + 1] = wrap_theta(theta[i + 1] - old_theta) + old_theta
old_theta = theta[i + 1]
xyt = np.array([x, y, theta]).T
v = np.linalg.norm(xy[1:, :] - xy[:-1, :], axis=1)
w = theta[1:] - theta[:-1]
v = np.append(v, 0)
w = np.append(w, 0)
us = np.array([v, w]).T
us = us / dt
return xyt, us
[docs]class MoveBasePlanner:
"""
This class enscapsulates the planning capabilities
of the move_base and provides planning services
and plan tracking services.
"""
def __init__(self, configs):
self.configs = configs
self.ROT_VEL = self.configs.BASE.MAX_ABS_TURN_SPEED
self.LIN_VEL = self.configs.BASE.MAX_ABS_FWD_SPEED
self.MAP_FRAME = self.configs.BASE.MAP_FRAME
self.BASE_FRAME = self.configs.BASE.VSLAM.VSLAM_BASE_FRAME
self.point_idx = self.configs.BASE.TRACKED_POINT
rospy.wait_for_service(self.configs.BASE.PLAN_TOPIC, timeout=3)
try:
self.plan_srv = rospy.ServiceProxy(
self.configs.BASE.PLAN_TOPIC, GetPlan)
except rospy.ServiceException:
rospy.loginfo(
"Timed out waiting for the planning service. \
Make sure build_map in script and \
use_map in roslauch are set to the same value")
self.start_state = build_pose_msg(0, 0, 0, self.BASE_FRAME)
self.tolerance = self.configs.BASE.PLAN_TOL
self._transform_listener = tf.TransformListener()
def _compute_relative_ang_dist(self, point2):
# point 1 is the base point 2 is the point on the path
# convert point 2 to base frame
pose_stamped = build_pose_msg(point2[0], point2[1], 0, self.MAP_FRAME)
base_pose = self._transform_listener.transformPose(
self.BASE_FRAME, pose_stamped)
y = base_pose.pose.position.y
x = base_pose.pose.position.x
angle = np.arctan2(y, x)
distance = np.sqrt(y ** 2 + x ** 2)
return (angle, distance)
[docs] def get_plan_absolute(self, x, y, theta):
"""
Gets plan as a list of poses in the world
frame for the given (x, y, theta
"""
goal_state = build_pose_msg(x, y, theta, self.MAP_FRAME)
start_state = self._transform_listener.transformPose(
self.MAP_FRAME, self.start_state)
response = self.plan_srv(
start=start_state,
goal=goal_state,
tolerance=self.tolerance)
status = True
if (len(response.plan.poses) == 0):
status = False
return response.plan.poses, status
[docs] def parse_plan(self, plan):
"""Parses the plan generated by move_base service"""
res_plan = []
for p in plan:
p = p.pose
point = 3 * [0]
point[0] = p.position.x
point[1] = p.position.y
(roll, pitch, yaw) = tf.transformations.euler_from_quaternion(
[p.orientation.x, p.orientation.y,
p.orientation.z, p.orientation.w])
point[2] = yaw
res_plan.append(point)
return res_plan
[docs] def move_to_goal(self, goal, go_to_relative):
"""
Moves the robot to the robot to given goal state in the absolute frame
(world frame).
:param goal: The goal state of the form (x,y,t) in
the world (map) frame.
:param go_to_relative: This is a method that moves the robot the
appropiate relative goal while NOT taking map
into account.
:type goal: list
:type go_to_rative: function of the form foo([x,y,theta])
"""
plan, plan_status = self.get_plan_absolute(goal[0], goal[1], goal[2])
if not plan_status:
rospy.loginfo("Failed to find a valid plan!")
return
if len(plan) < self.point_idx:
point = goal
else:
point = [plan[self.point_idx - 1].pose.position.x,
plan[self.point_idx - 1].pose.position.y, 0]
g_angle, g_distance = self._compute_relative_ang_dist(goal)
while g_distance > self.configs.BASE.TRESHOLD_LIN:
plan, plan_status = self.get_plan_absolute(
goal[0], goal[1], goal[2])
if not plan_status:
rospy.loginfo("Failed to find a valid plan!")
return
if len(plan) < self.point_idx:
point = goal
else:
point = [plan[self.point_idx - 1].pose.position.x,
plan[self.point_idx - 1].pose.position.y, 0]
angle, distance = self._compute_relative_ang_dist(point)
g_angle, g_distance = self._compute_relative_ang_dist(goal)
go_to_relative([0, 0, angle])
go_to_relative([distance, 0, 0])
g_angle, g_distance = self._compute_relative_ang_dist(goal)
rospy.loginfo("Adujusting Orientation at goal..{}.".format(g_angle))
pose_stamped = build_pose_msg(
goal[0], goal[1], goal[2], self.MAP_FRAME)
base_pose = self._transform_listener.transformPose(
self.BASE_FRAME, pose_stamped)
pose_quat = [
base_pose.pose.orientation.x,
base_pose.pose.orientation.y,
base_pose.pose.orientation.z,
base_pose.pose.orientation.w]
euler = tf.transformations.euler_from_quaternion(pose_quat)
go_to_relative([0, 0, euler[2]])
def get_state_trajectory_from_controls(start_pos, dt, controls):
system = BicycleSystem(dt)
states = system.unroll(start_pos, controls)
return states
def get_control_trajectory(trajectory_type, T, v, w):
if trajectory_type == 'circle':
init_controls = np.zeros((T, 2), dtype=np.float32)
init_controls[:, 0] = v
init_controls[:, 1] = w
elif trajectory_type == 'negcircle':
init_controls = np.zeros((T, 2), dtype=np.float32)
init_controls[:, 0] = v
init_controls[:, 1] = -w
elif trajectory_type == 'straight':
init_controls = np.zeros((T, 2), dtype=np.float32)
init_controls[:, 0] = v
init_controls[:, 1] = 0.0
elif trajectory_type == 'rotate':
init_controls = np.zeros((T, 2), dtype=np.float32)
init_controls[:, 0] = 0.0
init_controls[:, 1] = w
elif trajectory_type == 'negrotate':
init_controls = np.zeros((T, 2), dtype=np.float32)
init_controls[:, 0] = 0.0
init_controls[:, 1] = -w
else:
raise ValueError('Unknown type: %s' % trajectory_type)
return init_controls
def get_trajectory_circle(start_pos, dt, r, v, angle):
w = v / r
T = int(np.round(angle / w / dt))
controls = get_control_trajectory('circle', T, v, w)
states = get_state_trajectory_from_controls(start_pos, dt, controls)
return states, controls
def get_trajectory_negcircle(start_pos, dt, r, v, angle):
w = v / r
T = int(np.round(angle / w / dt))
controls = get_control_trajectory('circle', T, v, -w)
states = get_state_trajectory_from_controls(start_pos, dt, controls)
return states, controls
[docs]class TrajectoryTracker(object):
"""
Class to track a given trajectory. Uses LQR to generate a controller
around the system that has been linearized around the trajectory.
"""
[docs] def __init__(self, system):
"""
Provide system that should track the trajectory.
"""
self.system = system
[docs] def generate_plan(self, xs, us=None):
"""
Generates a feedback controller that tracks the state trajectory
specified by xs. Optionally specify a control trajectory us, otherwise
it is automatically inferred.
:param xs: List of states that define the trajectory.
:param us: List of controls that define the trajectory. If None,
controls are inferred from the state trajectory.
:returns: LQR controller that can track the specified trajectory.
:rtype: LQRSolver
"""
iters = 2
if us is None:
# Initialize with simple non-saturated controls.
us = np.zeros((len(xs), 2), dtype=np.float32) + 0.1
for i in range(iters):
T = len(us)
As = []
Bs = []
Cs = []
Qs = []
Rs = []
x_costs = []
u_costs = []
total_cost = 0.
controls = us
states = xs
for j in range(T):
A, B, C, _ = self.system.dynamics_fn(states[j], controls[j])
As.append(A)
Bs.append(B)
Cs.append(C)
Q, q, q_, x_cost = self.system.get_system_cost(
xs[j], states[j])
Qs.append((Q, q, q_))
R, r, r_, u_cost_ = self.system.get_control_cost(controls[j])
u_cost = np.dot(
np.dot(controls[j][:, np.newaxis].T, R),
controls[j][:, np.newaxis])
Rs.append(R)
total_cost += x_cost + u_cost
x_costs.append(x_cost)
u_costs.append(u_cost)
plan = LQRSolver(As, Bs, Cs, Qs, Rs, states, controls)
plan.solve()
if i < iters - 1:
# We will do more iterations
us = self._compute_controls(states[0], plan)
return plan
def _compute_controls(self, start_state, plan):
state = start_state
us = []
for j in range(plan.T):
u = plan.get_control(state, j)
_, _, _, state = self.system.dynamics_fn(state, u)
us.append(u)
us = np.array(us)
return us
[docs] def stop(self):
"""
Stops the base.
"""
msg = Twist()
msg.linear.x = 0
msg.angular.z = 0
self.ctrl_pub.publish(msg)
[docs] def execute_plan(self, plan, close_loop=True):
"""
Executes the plan, check for conditions like bumps, etc.
Plan is object returned from generate_plan.
Also stores the plan execution into
variable self._trajectory_tracker_execution.
:param plan: Object returned from generate_plan function.
:param close_loop: Whether to use feedback controller, or apply
open-loop commands.
:type plan: LQRSolver
:type close_loop: bool
:returns: Weather plan execution was successful or not.
:rtype: bool
"""
twist_min = np.array([self.min_v, self.min_w], dtype=np.float32)
twist_max = np.array([self.max_v, self.max_w], dtype=np.float32)
twist = Twist()
success = True
xs = []
xrefs = []
us = []
urefs = []
for i in range(plan.T):
if self.should_stop:
self.stop()
self.should_stop = False
success = False
break
# Apply control for this time stpe.
# Read the current state of the system and apply control.
x = self.state.state_f.copy()
u = plan.get_control(x, i)
u_ref = plan.u_refs[i]
if close_loop:
u_apply = u.copy()
else:
u_apply = u_ref.copy()
u_apply = np.clip(u_apply, a_min=twist_min, a_max=twist_max)
twist.linear.x = u_apply[0]
twist.angular.z = u_apply[1]
self.ctrl_pub.publish(twist)
xs.append(x)
us.append(u_apply)
xrefs.append(plan.x_refs[i])
urefs.append(plan.u_refs[i])
# wait for 0.1 seconds (10 HZ) and publish again
self.rate.sleep()
xs = np.array(xs).reshape((-1, 3))
xrefs = np.array(xrefs).reshape((-1, 3))
urefs = np.array(urefs).reshape((-1, 2))
us = np.array(us).reshape((-1, 2))
xus = np.concatenate((xs, us), 1)
xurefs = np.concatenate((xrefs, urefs), 1)
self._trajectory_tracker_execution = Foo(xus=xus, xurefs=xurefs)
return success
[docs] def plot_plan_execution(self, file_name=None):
"""
Plots the execution of the plan.
:param file_name: Name of file to plot plan execution.
:type file_name: string
"""
gs = gridspec.GridSpec(5, 10)
gs.update(left=0.05, right=0.95, hspace=0.05, wspace=0.10)
axes = [plt.subplot(gs[i, :5]) for i in range(5)]
axes.append(plt.subplot(gs[:, 5:]))
names = ['x', 'y', 'theta', 'v', 'w']
xus = self._trajectory_tracker_execution.xus
xurefs = self._trajectory_tracker_execution.xurefs
for i in range(5):
ax = axes[i]
ax.plot(xus[:, i], 'r--', label='odom', lw=3)
ax.plot(xurefs[:, i], 'b--', label='ref', lw=2)
ax.set_title(names[i], x=0.1, y=0.8)
ax.grid(True)
ax = axes[5]
ax.plot(xus[:, 0], xus[:, 1], 'rs', label='odom', alpha=0.5)
ax.plot(xurefs[:, 0], xurefs[:, 1], 'b*', label='ref', alpha=0.5)
ax.axis('equal')
ax.grid(True)
ax.legend()
if file_name is not None:
plt.savefig(file_name, bbox_inches='tight')
plt.close()
else:
plt.show()
[docs]class LQRSolver(object):
"""
This class implements a solver for a time-varying Linear Quadratic
Regulator System. A time-varying LQR system is defined via affine time
varying transition functions, and time-varying quadratic cost
functions.Such a system can be solved using dynamic programming to obtain
time-varying feedback control matrices that can be used to compute the
control command given the state of the system at any given time step.
"""
[docs] def __init__(
self,
As=None,
Bs=None,
Cs=None,
Qs=None,
Rs=None,
x_refs=None,
u_refs=None):
"""
Constructor for LQR solver.
System definition: :math:`x_{t+1} = A_tx_t + B_tu_t + C_t`
State cost: :math:`x^t_tQ_tx_t + x^t_tq + q\_`
Control cost: :math:`u^t_tR_tu_t`
:param As: List of time-varying matrices A_t for dynamics
:param Bs: List of time-varying matrices B_t for dynamics
:param Cs: List of time-varying matrices C_t for dynamics
:param Qs: List of time-varying matrices Q_t for state cost
:param Rs: List of time-varying matrices R_t for control cost
:type As: List of state_dim x state_dim numpy matrices
:type Bs: List of state_dim x control_dim numpy matrices
:type Cs: List of state_dim x 1 numpy matrices
:type Qs: List of 3 tuples with numpy array of size state_dim x
state_dim, state_dim x 1, 1 x 1
:type Rs: List of control_dim x control_dim numpy matrices
"""
if As is None:
As = []
if Bs is None:
Bs = []
if Cs is None:
Cs = []
if Qs is None:
Qs = []
if Rs is None:
Rs = []
if x_refs is None:
x_refs = []
if u_refs is None:
u_refs = []
assert (len(As) == len(Bs))
assert (len(As) == len(Cs))
assert (len(As) == len(Qs))
assert (len(As) == len(Rs))
self.As = As
self.Bs = Bs
self.Cs = Cs
self.Qs = Qs
self.Rs = Rs
self.x_refs = x_refs
self.u_refs = u_refs
self.T = len(As)
[docs] def solve(self):
"""
Solves the LQR system and stores the solution, such that it can be
accessed using get_control() function.
"""
Qs = self.Qs
As = self.As
Bs = self.Bs
Rs = self.Rs
Cs = self.Cs
T = len(Qs)
Ps = [None for _ in range(T)]
Ks = [None for _ in range(T)]
u_dim = Bs[0].shape[1]
x_dim = Bs[0].shape[0]
for i in range(T):
# Computing things for i steps to go.
if i == 0:
Ps[T - 1 - i] = Qs[T - 1 - i]
Ks[T - 1 - i] = (np.zeros((u_dim, x_dim),
dtype=np.float64),
np.zeros((u_dim, 1)))
else:
Ks[T - 1 - i], Ps[T - 1 - i] = \
self._one_step(As[T - 1 - i], Bs[T - 1 - i], Cs[T - 1 - i],
Qs[T - 1 - i], Rs[T - 1 - i], Ps[T - i])
self.Ks = Ks
self.Ps = Ps
def _one_step(self, A, B, C, Q_q_q_, R, P_p_p_):
"""
Executes one step of dynamic programming. Given cost to go for i steps,
computes cost to go for i+1 steps.
"""
Q, q, q_ = Q_q_q_
P, p, p_ = P_p_p_
# state cost is xtQx + xtq + q_
# control cost is utRu
# dynamics are x_{t+1} = A*x_t + B*u_t + C
# Cost to go is (from previous time step):
# xtPx + xtp + p_
# Cost to go for current time step is of the form:
# xt_Ax + ut_Bu + xt_Cu + xt_D + ut_E + _F
_A = Q + np.dot(np.dot(A.T, P), A)
_B = R + np.dot(np.dot(B.T, P), B)
_C = 2 * np.dot(np.dot(A.T, P), B)
_D = q + np.dot(A.T, p) + 2 * np.dot(np.dot(A.T, P), C)
_E = np.dot(B.T, p) + 2 * np.dot(np.dot(B.T, P), C)
_F = p_ + q_ + np.dot(np.dot(C.T, P), C) + np.dot(p.T, C)
inv_B = np.linalg.inv(_B)
k = -0.5 * np.dot(inv_B, _C.T)
k_ = -0.5 * np.dot(inv_B, _E)
o_P = _A - 0.25 * (np.dot(np.dot(_C, inv_B.T), _C.T))
o_p = _D - 0.5 * (np.dot(np.dot(_C, inv_B.T), _E))
o_p_ = _F - 0.25 * (np.dot(np.dot(_E.T, inv_B.T), _E))
return (k, k_), (o_P, o_p, o_p_)
[docs] def get_control(self, x, i):
"""
Uses the stored solution to the system to output control cost if the
system is in state x at time step i.
:param x: state of the system
:param i: time step
:type x: numpy array (state_dim,)
:type i: int
:return: feedback control that should be applied
:rtype: numpy array (control_dim,)
"""
u = np.dot(self.Ks[i][0], x[:, np.newaxis]) + self.Ks[i][1]
u = u[:, 0]
return u
[docs] def get_control_ls(self, x, alpha, i):
"""
Returns control but modulated via a step-size alpha.
:param x: state of the system
:param alpha: step size
:param i: time step
:type x: numpy array (state_dim,)
:type alpha: float
:type i: int
:return: feedback control that should be applied
:rtype: numpy array (control_dim,)
"""
x = x[:, np.newaxis]
u = np.dot(self.Ks[i][0], (x - self.x_refs[i])) \
+ alpha * (self.Ks[i][1] - self.u_refs[i]
+ np.dot(self.Ks[i][0], self.x_refs[i])) \
+ self.u_refs[i]
u = u[:, 0]
return u
[docs] def get_cost_to_go(self, x, i):
"""
Returns cost to go if the system is in state x at time step i.
:param x: state of the system
:param i: time step
:type x: numpy array (state_dim,)
:type i: int
:return: cost if system were to follow optimal f
eedback control from now
:rtype: scalar
"""
x = x[:, np.newaxis]
c = np.dot(np.dot(x.T, self.Ps[i][0]), x) + \
np.dot(x.T, self.Ps[i][1]) + self.Ps[i][2]
return c[0, 0]
[docs]class ILQRSolver(object):
"""
Iterative LQR solver for non-linear systems. Computes a linearization of
the system at the current trajectory, and solves that linear system using
LQR. This process is repeated.
"""
[docs] def __init__(self, dyn_fn, Q_fn, R_fn, start_state, goal_state):
"""
Constructor that sets up functions describing the system and costs.
:param Q_fn: State cost function that takes as input x_goal, x_ref,
time_step, lqr_iteration, and returns quadratic
approximations of state cost around x_ref.
:param R_fn: Control cost function that takes as input u_ref and
returns quadtratic approximation around it.
:param dyn_fn: Dynamics function thattakes in state, controls,
return_only_state flag, and returns the linearization
and the next state.
:param start_state: Starting state of the system.
:param goal_state: Goal state of the system.
:type Q_fn: python function
:type R_fn: python function
:type dyn_fn: python function
:type start_state: numpy vector
:type goal_state: numpy vector
"""
self.dyn_fn = dyn_fn
self.Q_fn = Q_fn
self.R_fn = R_fn
self.start_state = start_state
self.goal_state = goal_state
[docs] def unroll(self, dyn_fn, start_state, controls):
"""
Obtain state trajectory by applying controls on the system defined by
the dynamics function dyn_fn.
:param Q_fn: State cost function that takes as input x_goal, x_ref,
time_step, lqr_iteration, and returns quadratic
approximations of state cost around x_ref.
:param start_state: Starting state of the system.
:param controls: Sequence of controls to apply to the system.
:type Q_fn: python function
:type start_state: numpy vector
:type controls: numpy array
:return: Sequence of states
:rtype: numpy array
"""
T = len(controls)
states = []
state = start_state.copy()
for j in range(T):
states.append(state)
_, _, _, state = dyn_fn(state, controls[j], True)
return states
[docs] def solve(self, init_controls, ilqr_iters):
"""
Solve the non-linear system.
:param init_controls: Initial control sequence to start optimization
from.
:param ilqr_iters: Number of iterations of linearizations and LQR
solutions.
:type init_controls: numpy array
:param ilqr_iters: int
:return: LQR Tracker, step_size, cost of solution
:rtype: LQRSolver, int, cost
"""
controls = init_controls
T = len(controls)
total_costs = []
for i in range(ilqr_iters):
# Unroll trajectory.
states = self.unroll(self.dyn_fn, self.start_state, controls)
# Linearize around trajectory.
As = []
Bs = []
Cs = []
Qs = []
Rs = []
total_cost_i = 0.
for j in range(T):
A, B, C, _ = self.dyn_fn(states[j], controls[j], False)
As.append(A)
Bs.append(B)
Cs.append(C)
Q, q, q_, x_cost = self.Q_fn(self.goal_state, states[j], j, i)
Qs.append((Q, q, q_))
R, r, r_, u_cost_ = self.R_fn(controls[j])
u_cost = np.dot(
np.dot(controls[j][:, 0].T, R), controls[j][:, 0])
Rs.append(R)
total_cost_i += x_cost + u_cost
total_costs.append(total_cost_i)
lqr_ = LQRSolver(As, Bs, Cs, Qs, Rs, states, controls)
lqr_.solve()
step_size, controls, cost = self.get_step_size(
lqr_, controls, total_cost_i, i)
if step_size == 0:
break
return lqr_, step_size, cost
[docs] def get_step_size(
self,
lqr_,
ref_controls,
ref_cost,
ilqr_iter):
"""
Search for the step size that improves the cost function over LQR
iterations.
:param ilqr_iter: Which ILQR iteration are we doing so as to compute
the cost function which may depend on the
ilqr_iteration for a log barrier method.
:param ref_controls: Reference controls, we are starting iterations
from.
:param ref_cost: Refernce cost that we want to improve over.
:type ilqr_iter: int
:type ref_cost: float
:type ref_controls: numpy array
:return: step size, updated controls, updated cost
:rtype: float, numpy array, float
"""
out_controls = ref_controls
out_step_size = 0.
out_cost = ref_cost
step_size = 1.
while step_size > 0.00001:
controls_ = []
state = self.start_state.copy()
q_line_search = 0.
T = len(ref_controls)
for j in range(T):
u = lqr_.get_control_ls(state, step_size, j)
R, r, r_, u_cost = self.R_fn(u)
_, _, _, x_cost = self.Q_fn(
self.goal_state, state, j, ilqr_iter)
controls_.append(u)
_, _, _, state = self.dyn_fn(state, controls_[j], True)
q_line_search += x_cost + u_cost
if q_line_search < ref_cost:
out_controls = controls_
out_step_size = step_size
out_cost = q_line_search
break
step_size = step_size * 0.5
return out_step_size, out_controls, out_cost