Skip to main content

Control the Robot with Python SDK

zenoh_ros2_sdk lets a normal Python program subscribe to ROS 2 topics through Zenoh without installing ROS 2 or running a Docker container. It can run on the robot's internal PC or on an external PC, which is useful when you only need robot state data such as /joint_states.

Official resources:

Install

Use one of the following installation methods.

pip install zenoh-ros2-sdk

For the publish example below, also install NumPy:

pip install numpy

With uv:

uv pip install zenoh-ros2-sdk
uv pip install numpy

For source installation:

git clone https://github.com/ROBOTIS-GIT/zenoh_ros2_sdk.git
cd zenoh_ros2_sdk
pip install -e .

Subscribe to /joint_states

If you are running the script from an external PC, connect the SDK process to the robot's Zenoh router first:

export ZENOH_CONFIG_OVERRIDE='transport/shared_memory/enabled=true;mode="client";connect/endpoints=["tcp/192.168.6.2:7447"]'

Replace 192.168.6.2 with the robot's actual Wi-Fi or LAN IP address. For the full external PC setup, see ROS 2 Communication: External PC Control.

Create subscribe_joint_states.py:

#!/usr/bin/env python3

import time

from zenoh_ros2_sdk import ROS2Subscriber


def on_joint_states(msg):
names = list(msg.name)
positions = list(msg.position)
velocities = list(msg.velocity)
efforts = list(msg.effort)

print("\n/joint_states")
for index, name in enumerate(names):
position = positions[index] if index < len(positions) else None
velocity = velocities[index] if index < len(velocities) else None
effort = efforts[index] if index < len(efforts) else None

print(
f"{name}: "
f"position={position}, "
f"velocity={velocity}, "
f"effort={effort}"
)


subscriber = ROS2Subscriber(
topic="/joint_states",
msg_type="sensor_msgs/msg/JointState",
callback=on_joint_states,
)

try:
print("Listening to /joint_states. Press Ctrl+C to stop.")
while True:
time.sleep(0.1)
except KeyboardInterrupt:
pass
finally:
subscriber.close()

Run

python3 subscribe_joint_states.py

If the robot and the external PC are connected to the same reachable network, the script prints each received sensor_msgs/msg/JointState message. The name, position, velocity, and effort arrays use the standard ROS 2 JointState structure, so each value at index i belongs to name[i].

Publish a Zero Position and Zero Gain Command

The impedance controller command topic is:

/joint_group_position_controller/commands

This example publishes a full command message with all desired positions, feedforward torque, position gains, and damping gains set to zero. Use it as a connectivity and message-format example before sending real motion commands.

For the detailed joint order and JointPositionGainsCommand message definition, see ROS 2 Package Structure and Topic Description.

The current command message includes std_msgs/Header, so the script sends an empty header with zero timestamp.

Create publish_zero_impedance_command.py:

#!/usr/bin/env python3

import time

import numpy as np

from zenoh_ros2_sdk import ROS2Publisher, get_message_class

JOINTS = [
"left_hip_pitch_joint",
"left_hip_roll_joint",
"left_hip_yaw_joint",
"left_knee_joint",
"left_ankle_pitch_joint",
"left_ankle_roll_joint",
"right_hip_pitch_joint",
"right_hip_roll_joint",
"right_hip_yaw_joint",
"right_knee_joint",
"right_ankle_pitch_joint",
"right_ankle_roll_joint",
"waist_yaw_joint",
"left_shoulder_pitch_joint",
"left_shoulder_roll_joint",
"left_shoulder_yaw_joint",
"left_elbow_joint",
"left_wrist_roll_joint",
"right_shoulder_pitch_joint",
"right_shoulder_roll_joint",
"right_shoulder_yaw_joint",
"right_elbow_joint",
"right_wrist_roll_joint",
]

Header = get_message_class("std_msgs/msg/Header")
Time = get_message_class("builtin_interfaces/msg/Time")

publisher = ROS2Publisher(
topic="/joint_group_position_controller/commands",
msg_type="ai_sapiens_controller_msgs/msg/JointPositionGainsCommand",
)

zero_positions = np.zeros(len(JOINTS), dtype=np.float64)
zero_feedforward = np.zeros(len(JOINTS), dtype=np.float64)
zero_kp = np.zeros(len(JOINTS), dtype=np.float64)
zero_kd = np.zeros(len(JOINTS), dtype=np.float64)
empty_header = Header(
stamp=Time(sec=0, nanosec=0),
frame_id="",
)

try:
print("Publishing zero position, zero feedforward, zero kp, and zero kd commands. Press Ctrl+C to stop.")
while True:
publisher.publish(
header=empty_header,
positions=zero_positions,
feedforward=zero_feedforward,
kp=zero_kp,
kd=zero_kd,
)

time.sleep(0.1)
except KeyboardInterrupt:
pass
finally:
publisher.close()

Run it:

python3 publish_zero_impedance_command.py

The array length is 23, matching the joint order in k1_rev1_controllers.yaml. When sending real commands, keep this same order and replace the zero arrays with the desired position, feedforward torque, kp, and kd values.