ROS2 Jazzy:配置高效的进程内通信
背景
ROS 应用通常由多个独立“节点”组成,每个节点执行特定任务并与系统其他部分解耦。这有助于故障隔离、加速开发、实现模块化和代码复用,但这往往造成性能下降。在ROS1 最初开发出来之后,人们就意识到需要高效的节点组合方式,所以推出了 Nodelets。在 ROS2 中,我们通过解决一些需要重构节点的基本问题来改进 Nodelets 的设计。
本文的例子将重点展示如何通过手动组合节点,也就是说在不修改节点代码或限制其功能的情况下,将节点分开定义,然后以不同的进程布局来组合它们。
安装例子
如果您通过软件包安装了 ROS2,请确保已安装 ros-jazzy-intra-process-demo。如果您从归档文件下载或从源代码构建 ROS2,它会自动包含在安装中。
运行和理解例子
例子包括多种类型:一些是用于展示进程内通信功能的简单示例,另一些是使用 OpenCV 的端到端示例,展示了将节点重组为不同配置的能力。
双节点流通道例子
这个例子旨在说明:当使用 std::unique_ptr 发布和订阅消息时,进程内的发布/订阅连接可以实现消息的零拷贝传输。
首先查看源代码:
https://github.com/ros2/demos/blob/jazzy/intra_process_demo/src/two_node_pipeline/two_node_pipeline.cpp
#include <chrono>
#include <cinttypes>
#include <cstdio>
#include <memory>
#include <string>
#include <utility>
#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/int32.hpp"
using namespace std::chrono_literals;
// Node that produces messages.
struct Producer : public rclcpp::Node
{
Producer(const std::string & name, const std::string & output)
: Node(name, rclcpp::NodeOptions().use_intra_process_comms(true))
{
// Create a publisher on the output topic.
pub_ = this->create_publisher<std_msgs::msg::Int32>(output, 10);
std::weak_ptr<std::remove_pointer<decltype(pub_.get())>::type> captured_pub = pub_;
// Create a timer which publishes on the output topic at ~1Hz.
auto callback = [captured_pub]() -> void {
auto pub_ptr = captured_pub.lock();
if (!pub_ptr) {
return;
}
static int32_t count = 0;
std_msgs::msg::Int32::UniquePtr msg(new std_msgs::msg::Int32());
msg->data = count++;
printf(
"Published message with value: %d, and address: 0x%" PRIXPTR "\n", msg->data,
reinterpret_cast<std::uintptr_t>(msg.get()));
pub_ptr->publish(std::move(msg));
};
timer_ = this->create_wall_timer(1s, callback);
}
rclcpp::Publisher<std_msgs::msg::Int32>::SharedPtr pub_;
rclcpp::TimerBase::SharedPtr timer_;
};
// Node that consumes messages.
struct Consumer : public rclcpp::Node
{
Consumer(const std::string & name, const std::string & input)
: Node(name, rclcpp::NodeOptions().use_intra_process_comms(true))
{
// Create a subscription on the input topic which prints on receipt of new messages.
sub_ = this->create_subscription<std_msgs::msg::Int32>(
input,
10,
[](std_msgs::msg::Int32::UniquePtr msg) {
printf(
" Received message with value: %d, and address: 0x%" PRIXPTR "\n", msg->data,
reinterpret_cast<std::uintptr_t>(msg.get()));
});
}
rclcpp::Subscription<std_msgs::msg::Int32>::SharedPtr sub_;
};
int main(int argc, char * argv[])
{
setvbuf(stdout, NULL, _IONBF, BUFSIZ);
rclcpp::init(argc, argv);
rclcpp::executors::SingleThreadedExecutor executor;
auto producer = std::make_shared<Producer>("producer", "number");
auto consumer = std::make_shared<Consumer>("consumer", "number");
executor.add_node(producer);
executor.add_node(consumer);
executor.spin();
rclcpp::shutdown();
return 0;
}
从主函数可以看到,我们创建了一个生产者节点和一个消费者节点,将它们添加到单线程执行器中,然后调用 spin。
- 生产者节点(Producer):创建一个发布者,向“number”主题发布消息,并通过定时器定期生成新消息,打印消息的内存地址和值,然后发布。
- 消费者节点(Consumer):订阅“number”主题,接收消息,然后打印的消息的内存地址和值。
让我们通过执行ros2 run intra_process_demo two_node_pipeline executable来运行例子(不要忘记先source安装文件):
$ ros2 run intra_process_demo two_node_pipeline
# 生产者发布
# (第一条消息可能因订阅尚未建立而丢失,无对应的消费者输出)
Published message with value: 0, and address: 0x7fb02303faf0
# 消费者接收,地址一致
Published message with value: 1, and address: 0x7fb020cf0520
Received message with value: 1, and address: 0x7fb020cf0520
[...]
您可以看到具有相同值的“Published message…”和“Received message…”行也具有相同的消息地址。这表明接收到的消息的地址与发布的消息的地址相同。这是因为我们使用std::unique_ptrs进行消息的发布和订阅,这允许消息的所有权在系统中安全地移动。而如果使用const &和std::shared_ptr发布和订阅消息,则不会发生零复制。
循环管道例子
这个例子与前一个类似,但生产者不再为每次迭代创建新消息,而是通过循环结构重复使用同一个消息实例,并通过在执行器启动前手动触发一次发布来启动通信:
源代码:
https://github.com/ros2/demos/blob/jazzy/intra_process_demo/src/cyclic_pipeline/cyclic_pipeline.cpp
#include <chrono>
#include <cinttypes>
#include <cstdio>
#include <memory>
#include <string>
#include <utility>
#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/int32.hpp"
using namespace std::chrono_literals;
// This node receives an Int32, waits 1 second, then increments and sends it.
struct IncrementerPipe : public rclcpp::Node
{
IncrementerPipe(const std::string & name, const std::string & in, const std::string & out)
: Node(name, rclcpp::NodeOptions().use_intra_process_comms(true))
{
// Create a publisher on the output topic.
pub = this->create_publisher<std_msgs::msg::Int32>(out, 10);
std::weak_ptr<std::remove_pointer<decltype(pub.get())>::type> captured_pub = pub;
// Create a subscription on the input topic.
sub = this->create_subscription<std_msgs::msg::Int32>(
in,
10,
[captured_pub](std_msgs::msg::Int32::UniquePtr msg) {
auto pub_ptr = captured_pub.lock();
if (!pub_ptr) {
return;
}
printf(
"Received message with value: %d, and address: 0x%" PRIXPTR "\n", msg->data,
reinterpret_cast<std::uintptr_t>(msg.get()));
printf(" sleeping for 1 second...\n");
if (!rclcpp::sleep_for(1s)) {
return; // Return if the sleep failed (e.g. on :kbd:`ctrl-c`).
}
printf(" done.\n");
msg->data++; // Increment the message's data.
printf(
"Incrementing and sending with value: %d, and address: 0x%" PRIXPTR "\n", msg->data,
reinterpret_cast<std::uintptr_t>(msg.get()));
pub_ptr->publish(std::move(msg)); // Send the message along to the output topic.
});
}
rclcpp::Publisher<std_msgs::msg::Int32>::SharedPtr pub;
rclcpp::Subscription<std_msgs::msg::Int32>::SharedPtr sub;
};
int main(int argc, char * argv[])
{
setvbuf(stdout, NULL, _IONBF, BUFSIZ);
rclcpp::init(argc, argv);
rclcpp::executors::SingleThreadedExecutor executor;
// Create a simple loop by connecting the in and out topics of two IncrementerPipe's.
// The expectation is that the address of the message being passed between them never changes.
auto pipe1 = std::make_shared<IncrementerPipe>("pipe1", "topic1", "topic2");
auto pipe2 = std::make_shared<IncrementerPipe>("pipe2", "topic2", "topic1");
rclcpp::sleep_for(1s); // Wait for subscriptions to be established to avoid race conditions.
// Publish the first message (kicking off the cycle).
std::unique_ptr<std_msgs::msg::Int32> msg(new std_msgs::msg::Int32());
msg->data = 42;
printf(
"Published first message with value: %d, and address: 0x%" PRIXPTR "\n", msg->data,
reinterpret_cast<std::uintptr_t>(msg.get()));
pipe1->pub->publish(std::move(msg));
executor.add_node(pipe1);
executor.add_node(pipe2);
executor.spin();
rclcpp::shutdown();
return 0;
}
与前面的例子不同,这个例子只使用一个Node,然后使用不同的名称和配置实例化两次并形成循环。在一个循环中,ROS 图最终是样的 pipe1 -> pipe2 -> pipe1…
其中这行代码pipe1->pub->publish(msg)启动了这个循环流程,人这时起,每个节点在其自己的订阅回调中调用publish发布消息,在节点之间来回传递消息。
在这里,节点每秒来回传递一次消息,每次都增加消息的值。由于消息是作为unique_ptr发布和订阅的,因此在开始时创建的相同消息就会被持续使用。
为了测试这些期望,让我们运行它:
$ ros2 run intra_process_demo cyclic_pipeline
Published first message with value: 42, and address: 0x7fd2ce0a2bc0
Received message with value: 42, and address: 0x7fd2ce0a2bc0
sleeping for 1 second...
done.
Incrementing and sending with value: 43, and address: 0x7fd2ce0a2bc0
Received message with value: 43, and address: 0x7fd2ce0a2bc0
sleeping for 1 second...
done.
Incrementing and sending with value: 44, and address: 0x7fd2ce0a2bc0
Received message with value: 44, and address: 0x7fd2ce0a2bc0
sleeping for 1 second...
done.
Incrementing and sending with value: 45, and address: 0x7fd2ce0a2bc0
Received message with value: 45, and address: 0x7fd2ce0a2bc0
sleeping for 1 second...
done.
Incrementing and sending with value: 46, and address: 0x7fd2ce0a2bc0
Received message with value: 46, and address: 0x7fd2ce0a2bc0
sleeping for 1 second...
done.
Incrementing and sending with value: 47, and address: 0x7fd2ce0a2bc0
Received message with value: 47, and address: 0x7fd2ce0a2bc0
sleeping for 1 second...
[...]
图像管道例子
这个例子使用 OpenCV 实现图像采集、标注和显示,验证复杂场景下的进程内零拷贝通信。
注意(macOS 用户)
若遇到类似 ddsi_conn_write failed -1 的错误,需临时增大系统 UDP 数据包大小:
$ sudo sysctl -w net.inet.udp.recvspace=209715
$ sudo sysctl -w net.inet.udp.maxdgram=65500
(重启后设置失效)
简单管道(三节点)
首先,我们将有一个包含三个节点的管道,排列方式如下camera_node -> watermark_node -> image_view_node:
- camera_node:从摄像头采集图像,在图片上添加文信息并发布。
- watermark_node:订阅camera_node图像,添加更多文本后重新发布。
- image_view_node:订阅watermark_node图像,继续添加文本并通过 cv::imshow 显示。
在每个节点中,正在发送或已接收或两者同时发送的消息的地址都被写入图片中。watermark 和 image view 节点旨在修改图像而不复制它,因此,只要节点处于同一进程中,那么图片上打印的地址都应该相同。
让我们通过执行以下可执行文件来运行例子:
ros2 run intra_process_demo image_pipeline_all_in_one
可以看到暂停图像(按空格)后,图像上的三个消息地址应应该一致(同一进程内零拷贝)。
带两个图像查看器的管道
现在我们来看一个与上述示例类似的例子,不同之处在于它有两个图像查看节点。所有节点仍处于同一进程中,但现在应该会显示两个图像查看窗口( macOS 用户注意:您的图像查看窗口可能会重叠)。我们可以使用以下命令运行它:
ros2 run intra_process_demo image_pipeline_with_two_image_view
与上一个示例一样,您可以使用空格键暂停渲染,再次按下空格键可继续。您可以停止更新以检查写入屏幕的指针。
如上面的示例图像所示,我们有一个图像中所有指针都相同,另一个图像的前两个条目指针与第一个图像相同,但第二个图像的最后一个指针不同。要理解为什么会发生这种情况,请考虑图的拓扑结构:
camera_node -> watermark_node -> image_view_node
-> image_view_node2
摄像头节点(camera_node)与水印节点(watermark_node)之间的链接可以使用同一指针而无需复制,这是因为消息只需传递到一个进程内订阅(intra process subscription)。然而,在水印节点与两个图像视图节点之间的链接是一对多关系,因此如果图像视图节点(image_view_node)使用 unique_ptr 回调,则无法将同一指针的所有权同时传递给两者。此时,指针只能传递给其中一个节点。具体哪一个节点会获得原始指针的所有权并未明确定义,而是取决于消息传递的顺序——最后接收的节点将获得指针。
在这里还需要注意,图像查看节点并非使用 unique_ptr 回调进行订阅,而是使用 const shared_ptr 进行订阅。这意味着系统会将同一个 shared_ptr 传递给两个回调。当处理第一个进程内订阅时,内部存储的 unique_ptr 会提升为 shared_ptr。每个回调都将获得同一消息的共享所有权。
带进程间查看器的管道
另一个需要正确理解的重要点是,当进行进程间订阅时,要避免中断进程内的零拷贝行为。为了测试这一点,我们可以运行第一个图像管道演示 ros2 run intra_process_demo image_pipeline_all_in_one,然后运行独立的 ros2 run intra_process_demo image_view_node 实例。这看起来会像这样:
在ROS2中,同时暂停两个图像流以实现对齐存在困难,这可能导致图像帧无法完全同步。但关键点在于观察到的以下现象及其技术含义:image_pipeline_all_in_one 的图像查看器在每个步骤中显示的地址相同。这意味着即使存在外部订阅(例如独立的图像查看器),进程内零拷贝机制仍然生效。这是因为节点在同一进程中运行(如通过 ComposableNode 组合),ROS2 的进程内通信直接传递指针(unique_ptr 或裸指针),避免数据复制。而跨进程的图像查看器节点(独立的图像查看器节点)则显示不同进程ID,且地址与前两行不同。这是因为 ROS2 通过 DDS 中间件在跨进程通信时进行序列化/反序列化,生成独立副本,导致内存地址不同。
关注【智践行】公众号,发送 【机器人】 获得机器人经典学习资料