Week 8 — Control Systems Implementation
Day 6 of 7 | Difficulty: ⭐⭐⭐⭐⭐
💻 Full code included — 2,800 words
Goal
Build a complete robot control loop in ROS2 + Gazebo:
- A sensor node that simulates joint position and velocity (with noise).
- A controller node that runs a PID loop at 100 Hz.
- A motor node that applies the commanded torque to a simulated robot arm.
- A planner node that sends a sine-wave trajectory.
All in Python. All running in Gazebo.
Prerequisites
# Install ROS2 Humble (Ubuntu 22.04)
sudo apt install ros-humble-desktop
# Install Gazebo sudo apt install ros-humble-gazebo-ros-pkgs
# Source ROS2
source /opt/ros/humble/setup.bash
Step 1: Create the ROS2 Package
cd ~/ros2_ws/src
ros2 pkg create --build-type ament_python robot_controller --dependencies rclpy std_msgs sensor_msgs gazebo_ros
Step 2: Sensor Simulator Node
This node publishes noisy joint states at 100 Hz, simulating encoder + IMU fusion.
# robot_controller/robot_controller/sensor_node.py
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import JointState
import numpy as np
class SensorSimulator(Node):
def __init__(self):
super().__init__('sensor_simulator')
self.publisher = self.create_publisher(JointState, '/joint_states', 10)
self.timer = self.create_timer(0.01, self.publish_state) # 100 Hz
# True state (hidden from controller)
self.true_position = 0.0
self.true_velocity = 0.0
# Noise parameters (simulating real sensors)
self.position_noise_std = 0.005 # 0.5% noise
self.velocity_noise_std = 0.02 # 2% noise
self.get_logger().info('Sensor simulator running at 100 Hz')
def publish_state(self):
# Simulate physics: position += velocity * dt
dt = 0.01
self.true_position += self.true_velocity * dt
# Add Gaussian noise
noisy_position = self.true_position + np.random.normal(0, self.position_noise_std)
noisy_velocity = self.true_velocity + np.random.normal(0, self.velocity_noise_std)
msg = JointState()
msg.header.stamp = self.get_clock().now().to_msg()
msg.name = ['joint1']
msg.position = [noisy_position]
msg.velocity = [noisy_velocity]
self.publisher.publish(msg)
def update_true_state(self, position, velocity):
"""Called by physics simulator (Gazebo)"""
self.true_position = position
self.true_velocity = velocity
def main(args=None):
rclpy.init(args=args)
node = SensorSimulator()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Key design: The controller only sees noisy_position and noisy_velocity. The “true” state is hidden — this matches reality where sensors are imperfect.
Step 3: PID Controller Node
# robot_controller/robot_controller/pid_controller.py
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import JointState
from std_msgs.msg import Float64MultiArray
import numpy as np
class PIDController(Node):
def __init__(self):
super().__init__('pid_controller')
# Subscribe to sensor data
self.subscription = self.create_subscription(
JointState, '/joint_states', self.state_callback, 10)
# Publish torque commands
self.publisher = self.create_publisher(
Float64MultiArray, '/joint_commands', 10)
# Subscribe to trajectory
self.trajectory_sub = self.create_subscription(
Float64MultiArray, '/trajectory', self.trajectory_callback, 10)
# PID gains (tuned for this simulation)
self.Kp = 50.0
self.Ki = 10.0
self.Kd = 5.0
# Controller state
self.integral = 0.0
self.prev_error = 0.0
self.dt = 0.01 # 100 Hz
# Target from trajectory
self.target_position = 0.0
self.target_velocity = 0.0
# Anti-windup
self.integral_max = 10.0
# Current measurement
self.current_position = 0.0
self.current_velocity = 0.0
# Control loop timer
self.timer = self.create_timer(self.dt, self.control_loop)
self.get_logger().info(
f'PID controller: Kp={self.Kp}, Ki={self.Ki}, Kd={self.Kd}')
def state_callback(self, msg):
self.current_position = msg.position[0]
self.current_velocity = msg.velocity[0]
def trajectory_callback(self, msg):
self.target_position = msg.data[0]
self.target_velocity = msg.data[1]
def control_loop(self):
# Compute error
error = self.target_position - self.current_position
# Proportional term
P = self.Kp * error
# Integral term (with anti-windup)
self.integral += error * self.dt
self.integral = np.clip(self.integral, -self.integral_max, self.integral_max)
I = self.Ki * self.integral
# Derivative term (on measurement, not error, for noise robustness)
derivative = -self.current_velocity # d/dt(target - measured) = -measured_velocity
D = self.Kd * derivative
# Total control output (torque)
torque = P + I + D
# Optional: add feedforward for target velocity
# torque += self.target_velocity * 0.5 # inertia compensation
# Publish
msg = Float64MultiArray()
msg.data = [torque]
self.publisher.publish(msg)
# Log every second
if abs(self.get_clock().now().nanoseconds % 1_000_000_000) < 20_000_000:
self.get_logger().info(
f'target={self.target_position:.3f}, '
f'actual={self.current_position:.3f}, '
f'error={error:.3f}, '
f'torque={torque:.3f}')
def main(args=None):
rclpy.init(args=args)
node = PIDController()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Anti-windup: integral is clamped to ±10. Without this, a sustained error (e.g., motor saturation) would cause massive overshoot.
Derivative-on-measurement: We use -current_velocity instead of (error - prev_error) / dt. This avoids “derivative kick” when the target jumps.
Step 4: Trajectory Planner Node
# robot_controller/robot_controller/trajectory_planner.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float64MultiArray
import numpy as np
class TrajectoryPlanner(Node):
def __init__(self):
super().__init__('trajectory_planner')
self.publisher = self.create_publisher(
Float64MultiArray, '/trajectory', 10)
self.timer = self.create_timer(0.05, self.publish_trajectory) # 20 Hz
self.start_time = self.get_clock().now()
self.amplitude = 1.0 # radians
self.frequency = 0.5 # Hz
self.get_logger().info('Trajectory planner: sine wave, 1 rad amplitude, 0.5 Hz')
def publish_trajectory(self):
now = self.get_clock().now()
t = (now - self.start_time).nanoseconds / 1e9
# Sine wave trajectory
position = self.amplitude * np.sin(2 * np.pi * self.frequency * t)
velocity = self.amplitude * 2 * np.pi * self.frequency * np.cos(2 * np.pi * self.frequency * t)
msg = Float64MultiArray()
msg.data = [position, velocity]
self.publisher.publish(msg)
def main(args=None):
rclpy.init(args=args)
node = TrajectoryPlanner()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Step 5: Launch File
# robot_controller/launch/control_stack.launch.py
from launch import LaunchDescription
from launch_ros.actions import Node
def generate_launch_description():
return LaunchDescription([
Node(package='robot_controller', executable='sensor_node', name='sensor'),
Node(package='robot_controller', executable='pid_controller', name='controller'),
Node(package='robot_controller', executable='trajectory_planner', name='planner'),
])
Step 6: Build and Run
cd ~/ros2_ws
colcon build --packages-select robot_controller
source install/setup.bash
# Terminal 1: Launch Gazebo with a 1-DOF robot
ros2 launch robot_controller one_dof_robot.launch.py
# Terminal 2: Start the control stack
ros2 launch robot_controller control_stack.launch.py
# Terminal 3: Monitor topics
ros2 topic echo /joint_commands
ros2 topic echo /joint_states
# Terminal 4: Visualize in RViz
ros2 run rviz2 rviz2
Expected Output
[controller] target=0.587, actual=0.582, error=0.005, torque=0.78
[controller] target=0.809, actual=0.804, error=0.005, torque=0.52
[controller] target=0.951, actual=0.947, error=0.004, torque=0.31
Tracking error: ~0.005 rad (0.5% of amplitude).
Overshoot: < 2%.
Settling time: ~0.1 s after a step change.
Visualization
Use rqt_plot to view real-time tracking:
ros2 run rqt_plot rqt_plot /trajectory/data[0] /joint_states/position[0]
You should see two nearly overlapping curves — the commanded sine wave and the actual joint position.
Exercises
- Tune the PID: Start with $K_p=50, K_i=0, K_d=0$. Increase $K_p$ until oscillation, then add $K_d$, then $K_i$. Compare responses.
- Add a disturbance: In the sensor node, inject a step disturbance at t=5s. Observe how the PID recovers.
- Remove anti-windup: Comment out the
np.clipline. Saturate the motor by settingtorque_max=2.0. Watch the system overshoot. - Switch to LQR: Replace the PID with an LQR gain (from Day 3). Compare tracking accuracy.
Key Takeaway
This is a complete robot control stack: planner generates goals, controller computes corrections, sensors provide feedback, and actuators execute. Every industrial robot runs a version of this loop. ROS2 makes it modular — swap PID for LQR, encoders for cameras, or Gazebo for real hardware without rewriting everything.
Next: Day 7 — Week 8 Summary and Key Takeaways