Week 8 — Control Systems Implementation
Day 6 of 7 | Difficulty: ⭐⭐⭐⭐⭐
💻 Full code included — 2,800 words


Goal

Build a complete robot control loop in ROS2 + Gazebo:

  1. A sensor node that simulates joint position and velocity (with noise).
  2. A controller node that runs a PID loop at 100 Hz.
  3. A motor node that applies the commanded torque to a simulated robot arm.
  4. A planner node that sends a sine-wave trajectory.

All in Python. All running in Gazebo.


Prerequisites

# Install ROS2 Humble (Ubuntu 22.04)
sudo apt install ros-humble-desktop

# Install Gazebo	sudo apt install ros-humble-gazebo-ros-pkgs

# Source ROS2
source /opt/ros/humble/setup.bash

Step 1: Create the ROS2 Package

cd ~/ros2_ws/src
ros2 pkg create --build-type ament_python robot_controller --dependencies rclpy std_msgs sensor_msgs gazebo_ros

Step 2: Sensor Simulator Node

This node publishes noisy joint states at 100 Hz, simulating encoder + IMU fusion.

# robot_controller/robot_controller/sensor_node.py
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import JointState
import numpy as np

class SensorSimulator(Node):
    def __init__(self):
        super().__init__('sensor_simulator')
        self.publisher = self.create_publisher(JointState, '/joint_states', 10)
        self.timer = self.create_timer(0.01, self.publish_state)  # 100 Hz
        
        # True state (hidden from controller)
        self.true_position = 0.0
        self.true_velocity = 0.0
        
        # Noise parameters (simulating real sensors)
        self.position_noise_std = 0.005  # 0.5% noise
        self.velocity_noise_std = 0.02   # 2% noise
        
        self.get_logger().info('Sensor simulator running at 100 Hz')

    def publish_state(self):
        # Simulate physics: position += velocity * dt
        dt = 0.01
        self.true_position += self.true_velocity * dt
        
        # Add Gaussian noise
        noisy_position = self.true_position + np.random.normal(0, self.position_noise_std)
        noisy_velocity = self.true_velocity + np.random.normal(0, self.velocity_noise_std)
        
        msg = JointState()
        msg.header.stamp = self.get_clock().now().to_msg()
        msg.name = ['joint1']
        msg.position = [noisy_position]
        msg.velocity = [noisy_velocity]
        
        self.publisher.publish(msg)

    def update_true_state(self, position, velocity):
        """Called by physics simulator (Gazebo)"""
        self.true_position = position
        self.true_velocity = velocity

def main(args=None):
    rclpy.init(args=args)
    node = SensorSimulator()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

Key design: The controller only sees noisy_position and noisy_velocity. The “true” state is hidden — this matches reality where sensors are imperfect.


Step 3: PID Controller Node

# robot_controller/robot_controller/pid_controller.py
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import JointState
from std_msgs.msg import Float64MultiArray
import numpy as np

class PIDController(Node):
    def __init__(self):
        super().__init__('pid_controller')
        
        # Subscribe to sensor data
        self.subscription = self.create_subscription(
            JointState, '/joint_states', self.state_callback, 10)
        
        # Publish torque commands
        self.publisher = self.create_publisher(
            Float64MultiArray, '/joint_commands', 10)
        
        # Subscribe to trajectory
        self.trajectory_sub = self.create_subscription(
            Float64MultiArray, '/trajectory', self.trajectory_callback, 10)
        
        # PID gains (tuned for this simulation)
        self.Kp = 50.0
        self.Ki = 10.0
        self.Kd = 5.0
        
        # Controller state
        self.integral = 0.0
        self.prev_error = 0.0
        self.dt = 0.01  # 100 Hz
        
        # Target from trajectory
        self.target_position = 0.0
        self.target_velocity = 0.0
        
        # Anti-windup
        self.integral_max = 10.0
        
        # Current measurement
        self.current_position = 0.0
        self.current_velocity = 0.0
        
        # Control loop timer
        self.timer = self.create_timer(self.dt, self.control_loop)
        
        self.get_logger().info(
            f'PID controller: Kp={self.Kp}, Ki={self.Ki}, Kd={self.Kd}')

    def state_callback(self, msg):
        self.current_position = msg.position[0]
        self.current_velocity = msg.velocity[0]

    def trajectory_callback(self, msg):
        self.target_position = msg.data[0]
        self.target_velocity = msg.data[1]

    def control_loop(self):
        # Compute error
        error = self.target_position - self.current_position
        
        # Proportional term
        P = self.Kp * error
        
        # Integral term (with anti-windup)
        self.integral += error * self.dt
        self.integral = np.clip(self.integral, -self.integral_max, self.integral_max)
        I = self.Ki * self.integral
        
        # Derivative term (on measurement, not error, for noise robustness)
        derivative = -self.current_velocity  # d/dt(target - measured) = -measured_velocity
        D = self.Kd * derivative
        
        # Total control output (torque)
        torque = P + I + D
        
        # Optional: add feedforward for target velocity
        # torque += self.target_velocity * 0.5  # inertia compensation
        
        # Publish
        msg = Float64MultiArray()
        msg.data = [torque]
        self.publisher.publish(msg)
        
        # Log every second
        if abs(self.get_clock().now().nanoseconds % 1_000_000_000) < 20_000_000:
            self.get_logger().info(
                f'target={self.target_position:.3f}, '
                f'actual={self.current_position:.3f}, '
                f'error={error:.3f}, '
                f'torque={torque:.3f}')

def main(args=None):
    rclpy.init(args=args)
    node = PIDController()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

Anti-windup: integral is clamped to ±10. Without this, a sustained error (e.g., motor saturation) would cause massive overshoot.

Derivative-on-measurement: We use -current_velocity instead of (error - prev_error) / dt. This avoids “derivative kick” when the target jumps.


Step 4: Trajectory Planner Node

# robot_controller/robot_controller/trajectory_planner.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float64MultiArray
import numpy as np

class TrajectoryPlanner(Node):
    def __init__(self):
        super().__init__('trajectory_planner')
        self.publisher = self.create_publisher(
            Float64MultiArray, '/trajectory', 10)
        self.timer = self.create_timer(0.05, self.publish_trajectory)  # 20 Hz
        
        self.start_time = self.get_clock().now()
        self.amplitude = 1.0  # radians
        self.frequency = 0.5  # Hz
        
        self.get_logger().info('Trajectory planner: sine wave, 1 rad amplitude, 0.5 Hz')

    def publish_trajectory(self):
        now = self.get_clock().now()
        t = (now - self.start_time).nanoseconds / 1e9
        
        # Sine wave trajectory
        position = self.amplitude * np.sin(2 * np.pi * self.frequency * t)
        velocity = self.amplitude * 2 * np.pi * self.frequency * np.cos(2 * np.pi * self.frequency * t)
        
        msg = Float64MultiArray()
        msg.data = [position, velocity]
        self.publisher.publish(msg)

def main(args=None):
    rclpy.init(args=args)
    node = TrajectoryPlanner()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

Step 5: Launch File

# robot_controller/launch/control_stack.launch.py
from launch import LaunchDescription
from launch_ros.actions import Node

def generate_launch_description():
    return LaunchDescription([
        Node(package='robot_controller', executable='sensor_node', name='sensor'),
        Node(package='robot_controller', executable='pid_controller', name='controller'),
        Node(package='robot_controller', executable='trajectory_planner', name='planner'),
    ])

Step 6: Build and Run

cd ~/ros2_ws
colcon build --packages-select robot_controller
source install/setup.bash

# Terminal 1: Launch Gazebo with a 1-DOF robot
ros2 launch robot_controller one_dof_robot.launch.py

# Terminal 2: Start the control stack
ros2 launch robot_controller control_stack.launch.py

# Terminal 3: Monitor topics
ros2 topic echo /joint_commands
ros2 topic echo /joint_states

# Terminal 4: Visualize in RViz
ros2 run rviz2 rviz2

Expected Output

[controller] target=0.587, actual=0.582, error=0.005, torque=0.78
[controller] target=0.809, actual=0.804, error=0.005, torque=0.52
[controller] target=0.951, actual=0.947, error=0.004, torque=0.31

Tracking error: ~0.005 rad (0.5% of amplitude).
Overshoot: < 2%.
Settling time: ~0.1 s after a step change.


Visualization

Use rqt_plot to view real-time tracking:

ros2 run rqt_plot rqt_plot /trajectory/data[0] /joint_states/position[0]

You should see two nearly overlapping curves — the commanded sine wave and the actual joint position.


Exercises

  1. Tune the PID: Start with $K_p=50, K_i=0, K_d=0$. Increase $K_p$ until oscillation, then add $K_d$, then $K_i$. Compare responses.
  2. Add a disturbance: In the sensor node, inject a step disturbance at t=5s. Observe how the PID recovers.
  3. Remove anti-windup: Comment out the np.clip line. Saturate the motor by setting torque_max=2.0. Watch the system overshoot.
  4. Switch to LQR: Replace the PID with an LQR gain (from Day 3). Compare tracking accuracy.

Key Takeaway

This is a complete robot control stack: planner generates goals, controller computes corrections, sensors provide feedback, and actuators execute. Every industrial robot runs a version of this loop. ROS2 makes it modular — swap PID for LQR, encoders for cameras, or Gazebo for real hardware without rewriting everything.


Next: Day 7 — Week 8 Summary and Key Takeaways