Control the Robot with Python SDK
zenoh_ros2_sdk lets a normal Python program subscribe to ROS 2 topics through Zenoh without installing ROS 2 or running a Docker container. It can run on the robot's internal PC or on an external PC, which is useful when you only need robot state data such as /joint_states.
Official resources:
Install
Use one of the following installation methods.
pip install zenoh-ros2-sdk
For the publish example below, also install NumPy:
pip install numpy
With uv:
uv pip install zenoh-ros2-sdk
uv pip install numpy
For source installation:
git clone https://github.com/ROBOTIS-GIT/zenoh_ros2_sdk.git
cd zenoh_ros2_sdk
pip install -e .
Subscribe to /joint_states
If you are running the script from an external PC, connect the SDK process to the robot's Zenoh router first:
export ZENOH_CONFIG_OVERRIDE='transport/shared_memory/enabled=true;mode="client";connect/endpoints=["tcp/192.168.6.2:7447"]'
Replace 192.168.6.2 with the robot's actual Wi-Fi or LAN IP address. For the full external PC setup, see ROS 2 Communication: External PC Control.
Create subscribe_joint_states.py:
#!/usr/bin/env python3
import time
from zenoh_ros2_sdk import ROS2Subscriber
def on_joint_states(msg):
names = list(msg.name)
positions = list(msg.position)
velocities = list(msg.velocity)
efforts = list(msg.effort)
print("\n/joint_states")
for index, name in enumerate(names):
position = positions[index] if index < len(positions) else None
velocity = velocities[index] if index < len(velocities) else None
effort = efforts[index] if index < len(efforts) else None
print(
f"{name}: "
f"position={position}, "
f"velocity={velocity}, "
f"effort={effort}"
)
subscriber = ROS2Subscriber(
topic="/joint_states",
msg_type="sensor_msgs/msg/JointState",
callback=on_joint_states,
)
try:
print("Listening to /joint_states. Press Ctrl+C to stop.")
while True:
time.sleep(0.1)
except KeyboardInterrupt:
pass
finally:
subscriber.close()
Run
python3 subscribe_joint_states.py
If the robot and the external PC are connected to the same reachable network, the script prints each received sensor_msgs/msg/JointState message. The name, position, velocity, and effort arrays use the standard ROS 2 JointState structure, so each value at index i belongs to name[i].
Publish a Zero Position and Zero Gain Command
The impedance controller command topic is:
/joint_group_position_controller/commands
This example publishes a full command message with all desired positions, feedforward torque, position gains, and damping gains set to zero. Use it as a connectivity and message-format example before sending real motion commands.
For the detailed joint order and JointPositionGainsCommand message definition, see ROS 2 Package Structure and Topic Description.
The current command message includes std_msgs/Header, so the script sends an empty header with zero timestamp.
Create publish_zero_impedance_command.py:
#!/usr/bin/env python3
import time
import numpy as np
from zenoh_ros2_sdk import ROS2Publisher, get_message_class
JOINTS = [
"left_hip_pitch_joint",
"left_hip_roll_joint",
"left_hip_yaw_joint",
"left_knee_joint",
"left_ankle_pitch_joint",
"left_ankle_roll_joint",
"right_hip_pitch_joint",
"right_hip_roll_joint",
"right_hip_yaw_joint",
"right_knee_joint",
"right_ankle_pitch_joint",
"right_ankle_roll_joint",
"waist_yaw_joint",
"left_shoulder_pitch_joint",
"left_shoulder_roll_joint",
"left_shoulder_yaw_joint",
"left_elbow_joint",
"left_wrist_roll_joint",
"right_shoulder_pitch_joint",
"right_shoulder_roll_joint",
"right_shoulder_yaw_joint",
"right_elbow_joint",
"right_wrist_roll_joint",
]
Header = get_message_class("std_msgs/msg/Header")
Time = get_message_class("builtin_interfaces/msg/Time")
publisher = ROS2Publisher(
topic="/joint_group_position_controller/commands",
msg_type="ai_sapiens_controller_msgs/msg/JointPositionGainsCommand",
)
zero_positions = np.zeros(len(JOINTS), dtype=np.float64)
zero_feedforward = np.zeros(len(JOINTS), dtype=np.float64)
zero_kp = np.zeros(len(JOINTS), dtype=np.float64)
zero_kd = np.zeros(len(JOINTS), dtype=np.float64)
empty_header = Header(
stamp=Time(sec=0, nanosec=0),
frame_id="",
)
try:
print("Publishing zero position, zero feedforward, zero kp, and zero kd commands. Press Ctrl+C to stop.")
while True:
publisher.publish(
header=empty_header,
positions=zero_positions,
feedforward=zero_feedforward,
kp=zero_kp,
kd=zero_kd,
)
time.sleep(0.1)
except KeyboardInterrupt:
pass
finally:
publisher.close()
Run it:
python3 publish_zero_impedance_command.py
The array length is 23, matching the joint order in k1_rev1_controllers.yaml. When sending real commands, keep this same order and replace the zero arrays with the desired position, feedforward torque, kp, and kd values.