ROS2 Jazzy:配置高效的进程内通信

ROS2 Jazzy:配置高效的进程内通信

编码文章call10242025-05-21 12:36:115A+A-

背景

ROS 应用通常由多个独立“节点”组成,每个节点执行特定任务并与系统其他部分解耦。这有助于故障隔离、加速开发、实现模块化和代码复用,但这往往造成性能下降。在ROS1 最初开发出来之后,人们就意识到需要高效的节点组合方式,所以推出了 Nodelets。在 ROS2 中,我们通过解决一些需要重构节点的基本问题来改进 Nodelets 的设计。

本文的例子将重点展示如何通过手动组合节点,也就是说在不修改节点代码或限制其功能的情况下,将节点分开定义,然后以不同的进程布局来组合它们。

安装例子

如果您通过软件包安装了 ROS2,请确保已安装 ros-jazzy-intra-process-demo。如果您从归档文件下载或从源代码构建 ROS2,它会自动包含在安装中。

运行和理解例子

例子包括多种类型:一些是用于展示进程内通信功能的简单示例,另一些是使用 OpenCV 的端到端示例,展示了将节点重组为不同配置的能力。

双节点流通道例子

这个例子旨在说明:当使用 std::unique_ptr 发布和订阅消息时,进程内的发布/订阅连接可以实现消息的零拷贝传输。

首先查看源代码:
https://github.com/ros2/demos/blob/jazzy/intra_process_demo/src/two_node_pipeline/two_node_pipeline.cpp

#include <chrono>
#include <cinttypes>
#include <cstdio>
#include <memory>
#include <string>
#include <utility>

#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/int32.hpp"

using namespace std::chrono_literals;

// Node that produces messages.
struct Producer : public rclcpp::Node
{
  Producer(const std::string & name, const std::string & output)
  : Node(name, rclcpp::NodeOptions().use_intra_process_comms(true))
  {
    // Create a publisher on the output topic.
    pub_ = this->create_publisher<std_msgs::msg::Int32>(output, 10);
    std::weak_ptr<std::remove_pointer<decltype(pub_.get())>::type> captured_pub = pub_;
    // Create a timer which publishes on the output topic at ~1Hz.
    auto callback = [captured_pub]() -> void {
        auto pub_ptr = captured_pub.lock();
        if (!pub_ptr) {
          return;
        }
        static int32_t count = 0;
        std_msgs::msg::Int32::UniquePtr msg(new std_msgs::msg::Int32());
        msg->data = count++;
        printf(
          "Published message with value: %d, and address: 0x%" PRIXPTR "\n", msg->data,
          reinterpret_cast<std::uintptr_t>(msg.get()));
        pub_ptr->publish(std::move(msg));
      };
    timer_ = this->create_wall_timer(1s, callback);
  }

  rclcpp::Publisher<std_msgs::msg::Int32>::SharedPtr pub_;
  rclcpp::TimerBase::SharedPtr timer_;
};

// Node that consumes messages.
struct Consumer : public rclcpp::Node
{
  Consumer(const std::string & name, const std::string & input)
  : Node(name, rclcpp::NodeOptions().use_intra_process_comms(true))
  {
    // Create a subscription on the input topic which prints on receipt of new messages.
    sub_ = this->create_subscription<std_msgs::msg::Int32>(
      input,
      10,
      [](std_msgs::msg::Int32::UniquePtr msg) {
        printf(
          " Received message with value: %d, and address: 0x%" PRIXPTR "\n", msg->data,
          reinterpret_cast<std::uintptr_t>(msg.get()));
      });
  }

  rclcpp::Subscription<std_msgs::msg::Int32>::SharedPtr sub_;
};

int main(int argc, char * argv[])
{
  setvbuf(stdout, NULL, _IONBF, BUFSIZ);
  rclcpp::init(argc, argv);
  rclcpp::executors::SingleThreadedExecutor executor;

  auto producer = std::make_shared<Producer>("producer", "number");
  auto consumer = std::make_shared<Consumer>("consumer", "number");

  executor.add_node(producer);
  executor.add_node(consumer);
  executor.spin();

  rclcpp::shutdown();

  return 0;
}

从主函数可以看到,我们创建了一个生产者节点和一个消费者节点,将它们添加到单线程执行器中,然后调用 spin

  • 生产者节点(Producer):创建一个发布者,向“number”主题发布消息,并通过定时器定期生成新消息,打印消息的内存地址和值,然后发布。
  • 消费者节点(Consumer):订阅“number”主题,接收消息,然后打印的消息的内存地址和值。

让我们通过执行ros2 run intra_process_demo two_node_pipeline executable来运行例子(不要忘记先source安装文件):

$ ros2 run intra_process_demo two_node_pipeline
# 生产者发布
# (第一条消息可能因订阅尚未建立而丢失,无对应的消费者输出)
Published message with value: 0, and address: 0x7fb02303faf0
# 消费者接收,地址一致
Published message with value: 1, and address: 0x7fb020cf0520
Received message with value: 1, and address: 0x7fb020cf0520  
[...]

您可以看到具有相同值的“Published message…”和“Received message…”行也具有相同的消息地址。这表明接收到的消息的地址与发布的消息的地址相同。这是因为我们使用std::unique_ptrs进行消息的发布和订阅,这允许消息的所有权在系统中安全地移动。而如果使用const &和std::shared_ptr发布和订阅消息,则不会发生零复制。

循环管道例子

这个例子与前一个类似,但生产者不再为每次迭代创建新消息,而是通过循环结构重复使用同一个消息实例,并通过在执行器启动前手动触发一次发布来启动通信:

源代码:
https://github.com/ros2/demos/blob/jazzy/intra_process_demo/src/cyclic_pipeline/cyclic_pipeline.cpp

#include <chrono>
#include <cinttypes>
#include <cstdio>
#include <memory>
#include <string>
#include <utility>

#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/int32.hpp"

using namespace std::chrono_literals;

// This node receives an Int32, waits 1 second, then increments and sends it.
struct IncrementerPipe : public rclcpp::Node
{
  IncrementerPipe(const std::string & name, const std::string & in, const std::string & out)
  : Node(name, rclcpp::NodeOptions().use_intra_process_comms(true))
  {
    // Create a publisher on the output topic.
    pub = this->create_publisher<std_msgs::msg::Int32>(out, 10);
    std::weak_ptr<std::remove_pointer<decltype(pub.get())>::type> captured_pub = pub;
    // Create a subscription on the input topic.
    sub = this->create_subscription<std_msgs::msg::Int32>(
      in,
      10,
      [captured_pub](std_msgs::msg::Int32::UniquePtr msg) {
        auto pub_ptr = captured_pub.lock();
        if (!pub_ptr) {
          return;
        }
        printf(
          "Received message with value:         %d, and address: 0x%" PRIXPTR "\n", msg->data,
          reinterpret_cast<std::uintptr_t>(msg.get()));
        printf("  sleeping for 1 second...\n");
        if (!rclcpp::sleep_for(1s)) {
          return;    // Return if the sleep failed (e.g. on :kbd:`ctrl-c`).
        }
        printf("  done.\n");
        msg->data++;    // Increment the message's data.
        printf(
          "Incrementing and sending with value: %d, and address: 0x%" PRIXPTR "\n", msg->data,
          reinterpret_cast<std::uintptr_t>(msg.get()));
        pub_ptr->publish(std::move(msg));    // Send the message along to the output topic.
      });
  }

  rclcpp::Publisher<std_msgs::msg::Int32>::SharedPtr pub;
  rclcpp::Subscription<std_msgs::msg::Int32>::SharedPtr sub;
};

int main(int argc, char * argv[])
{
  setvbuf(stdout, NULL, _IONBF, BUFSIZ);
  rclcpp::init(argc, argv);
  rclcpp::executors::SingleThreadedExecutor executor;

  // Create a simple loop by connecting the in and out topics of two IncrementerPipe's.
  // The expectation is that the address of the message being passed between them never changes.
  auto pipe1 = std::make_shared<IncrementerPipe>("pipe1", "topic1", "topic2");
  auto pipe2 = std::make_shared<IncrementerPipe>("pipe2", "topic2", "topic1");
  rclcpp::sleep_for(1s);  // Wait for subscriptions to be established to avoid race conditions.
  // Publish the first message (kicking off the cycle).
  std::unique_ptr<std_msgs::msg::Int32> msg(new std_msgs::msg::Int32());
  msg->data = 42;
  printf(
    "Published first message with value:  %d, and address: 0x%" PRIXPTR "\n", msg->data,
    reinterpret_cast<std::uintptr_t>(msg.get()));
  pipe1->pub->publish(std::move(msg));

  executor.add_node(pipe1);
  executor.add_node(pipe2);
  executor.spin();

  rclcpp::shutdown();

  return 0;
}

与前面的例子不同,这个例子只使用一个Node,然后使用不同的名称和配置实例化两次并形成循环。在一个循环中,ROS 图最终是样的 pipe1 -> pipe2 -> pipe1…

其中这行代码pipe1->pub->publish(msg)启动了这个循环流程,人这时起,每个节点在其自己的订阅回调中调用publish发布消息,在节点之间来回传递消息。

在这里,节点每秒来回传递一次消息,每次都增加消息的值。由于消息是作为unique_ptr发布和订阅的,因此在开始时创建的相同消息就会被持续使用。

为了测试这些期望,让我们运行它:

$ ros2 run intra_process_demo cyclic_pipeline
Published first message with value:  42, and address: 0x7fd2ce0a2bc0
Received message with value:         42, and address: 0x7fd2ce0a2bc0
  sleeping for 1 second...
  done.
Incrementing and sending with value: 43, and address: 0x7fd2ce0a2bc0
Received message with value:         43, and address: 0x7fd2ce0a2bc0
  sleeping for 1 second...
  done.
Incrementing and sending with value: 44, and address: 0x7fd2ce0a2bc0
Received message with value:         44, and address: 0x7fd2ce0a2bc0
  sleeping for 1 second...
  done.
Incrementing and sending with value: 45, and address: 0x7fd2ce0a2bc0
Received message with value:         45, and address: 0x7fd2ce0a2bc0
  sleeping for 1 second...
  done.
Incrementing and sending with value: 46, and address: 0x7fd2ce0a2bc0
Received message with value:         46, and address: 0x7fd2ce0a2bc0
  sleeping for 1 second...
  done.
Incrementing and sending with value: 47, and address: 0x7fd2ce0a2bc0
Received message with value:         47, and address: 0x7fd2ce0a2bc0
  sleeping for 1 second...
[...]

图像管道例子

这个例子使用 OpenCV 实现图像采集、标注和显示,验证复杂场景下的进程内零拷贝通信。

注意(macOS 用户)

若遇到类似 ddsi_conn_write failed -1 的错误,需临时增大系统 UDP 数据包大小:

$ sudo sysctl -w net.inet.udp.recvspace=209715
$ sudo sysctl -w net.inet.udp.maxdgram=65500

(重启后设置失效)

简单管道(三节点)

首先,我们将有一个包含三个节点的管道,排列方式如下camera_node -> watermark_node -> image_view_node

  • camera_node:从摄像头采集图像,在图片上添加文信息并发布。
  • watermark_node:订阅camera_node图像,添加更多文本后重新发布。
  • image_view_node:订阅watermark_node图像,继续添加文本并通过 cv::imshow 显示。

在每个节点中,正在发送或已接收或两者同时发送的消息的地址都被写入图片中。watermark 和 image view 节点旨在修改图像而不复制它,因此,只要节点处于同一进程中,那么图片上打印的地址都应该相同。

让我们通过执行以下可执行文件来运行例子:

ros2 run intra_process_demo image_pipeline_all_in_one

可以看到暂停图像(按空格)后,图像上的三个消息地址应应该一致(同一进程内零拷贝)。

带两个图像查看器的管道

现在我们来看一个与上述示例类似的例子,不同之处在于它有两个图像查看节点。所有节点仍处于同一进程中,但现在应该会显示两个图像查看窗口( macOS 用户注意:您的图像查看窗口可能会重叠)。我们可以使用以下命令运行它:

ros2 run intra_process_demo image_pipeline_with_two_image_view

与上一个示例一样,您可以使用空格键暂停渲染,再次按下空格键可继续。您可以停止更新以检查写入屏幕的指针。

如上面的示例图像所示,我们有一个图像中所有指针都相同,另一个图像的前两个条目指针与第一个图像相同,但第二个图像的最后一个指针不同。要理解为什么会发生这种情况,请考虑图的拓扑结构:

camera_node -> watermark_node -> image_view_node
                              -> image_view_node2

摄像头节点(camera_node)与水印节点(watermark_node)之间的链接可以使用同一指针而无需复制,这是因为消息只需传递到一个进程内订阅(intra process subscription)。然而,在水印节点与两个图像视图节点之间的链接是一对多关系,因此如果图像视图节点(image_view_node)使用 unique_ptr 回调,则无法将同一指针的所有权同时传递给两者。此时,指针只能传递给其中一个节点。具体哪一个节点会获得原始指针的所有权并未明确定义,而是取决于消息传递的顺序——最后接收的节点将获得指针。

在这里还需要注意,图像查看节点并非使用 unique_ptr 回调进行订阅,而是使用 const shared_ptr 进行订阅。这意味着系统会将同一个 shared_ptr 传递给两个回调。当处理第一个进程内订阅时,内部存储的 unique_ptr 会提升为 shared_ptr。每个回调都将获得同一消息的共享所有权。

带进程间查看器的管道

另一个需要正确理解的重要点是,当进行进程间订阅时,要避免中断进程内的零拷贝行为。为了测试这一点,我们可以运行第一个图像管道演示 ros2 run intra_process_demo image_pipeline_all_in_one,然后运行独立的 ros2 run intra_process_demo image_view_node 实例。这看起来会像这样:

在ROS2中,同时暂停两个图像流以实现对齐存在困难,这可能导致图像帧无法完全同步。但关键点在于观察到的以下现象及其技术含义:image_pipeline_all_in_one 的图像查看器在每个步骤中显示的地址相同。这意味着即使存在外部订阅(例如独立的图像查看器),进程内零拷贝机制仍然生效。这是因为节点在同一进程中运行(如通过 ComposableNode 组合),ROS2 的进程内通信直接传递指针(unique_ptr 或裸指针),避免数据复制。而跨进程的图像查看器节点(独立的图像查看器节点)则显示不同进程ID,且地址与前两行不同。这是因为 ROS2 通过 DDS 中间件在跨进程通信时进行序列化/反序列化,生成独立副本,导致内存地址不同。


关注【智践行】公众号,发送 【机器人】 获得机器人经典学习资料

点击这里复制本文地址 以上内容由文彬编程网整理呈现,请务必在转载分享时注明本文地址!如对内容有疑问,请联系我们,谢谢!
qrcode

文彬编程网 © All Rights Reserved.  蜀ICP备2024111239号-4