ROS2中关于QoS的应用以及知识点整理

ROS

ROS(Robot Operation System)其实很多人并不陌生,是一个开源操作系统,专门用于开发机器人。ROS的诞生使得开发一款机器人的时间大大缩短,记得以前在一本书中看过,ROS诞生之前,开发一个机器人可能需要一年,甚至二年。而利用ROS来开发机器人,只需要一周的时间!

那么ROS是由谁开发的呢?
其实,ROS最初是斯坦福大学人工智能实验室与机器人技术公司Willow Garage共同开发的,2008年以后主要由Willow Garage公司推动,维护。
2012年以后,Willow Garage中有一个ROS团队,从公司中独立出来后,成为了非盈利组织the Open Source Robotics Foundation(OSRF)。主要负责维护和更新ROS。
这里就像是我们公司的Autoware与The Autoware Foundation的关系。

说到底,ROS主要就是提供了一个平台,让各个传感器之间实现通信,可以说ROS就是一个通信中间件。

通信中间件

通信中间件的功能主要就是负责各种传感器数据比如相机,激光雷达,雷达,IMU等数据高效稳定的传递。
目前自动驾驶领域主要有百度所采用的Cyber RT和我们公司所采用的ROS。

通信中间件的通信方式

ROS的网络通信提供了2种方式,TCP和UDP。TCP是默认的通信方式。
TCP简单来说就是:发送方发出一个信息后,接受方需要发出一个信号,告诉发送方“我收到消息了”。如果发送方没有受到这个信号,那下一条信息就不能发出。
UDP简单来说就是:发送方发出一个信息,不管接收方是否有接受到消息,发送方都会一直发送消息。

这么来看的话,UDP通信方式更适合自动驾驶,即使我发出的一帧数据丢失了,我下一帧的数据同样重要。比如激光雷达定位系统,哪怕我丢一帧数据,我也不会那么轻易的丢失自己的位置信息。可是如果说在UDP的情况下,你没有告诉我收到了信息,激光雷达的信息就会停留在某一帧直至接收方收到信息,这样的话几秒之内汽车的位置信息可能已经出去几十米甚至百米了。

通信中间件的工作模式

通信中间件包括点到点,消息队列和发布/订阅三种工作模式,SOME/IP和DDS都采用了第三种工作模式,发布/订阅。
相比于面向信号的CAN,SOME/IP和DDS都是面向服务的通信协议。
两者的区别在于:面向信号的数据传输,不管网络需不需要,它始终会不断循环发送;而面向服务的通信方式则不同,仅当客户端请求或服务器通知特定订阅者时,才在客户端-服务器配置中交换数据,这就确保了永远不会浪费带宽,并且仅在需要的时间和地点进行数据通信/交换。

通信中间件的种类

根据源代码是否开放,分为闭源与开源两种。
闭源的主要有Vector公司的SOME/IP, RTI公司的DDS等。
开源的主要有OPEN DDS, FAST DDS, Cyclone DDS等。

SOME/IP

SOME/IP的全称为:Scalable service-Oriented MiddlewarE over IP,是一种面向服务的传输协议。
严格地说,SOME/IP不是一款特定的产品,而是一种技术标准。其最早由宝马在2012-2013年开发,并在2014年集成进AUTOSAR 4.2.1中。
当前,全球最大的商用SOME/IP产品供应商是Vector。 开源版的SOME/IP则是由Genivi协会来维护的。

AUTOSAR全称为AUTomotive Open System ARchitecture(汽车开放系统架构)。AUTOSAR是由汽车主机厂、零部件供应商、半导体厂商、软件服务商、工具提供商以及其他相关的厂商联合成立的一个组织

DDS

DDS的全称为Data Distribution Service(数据分发服务),是由OMG发布的分布式通信规范,采用发布/订阅模型,提供多种QoS服务质量策略。
DDS将分布式网络中传输的数据定义为“主题”,将数据的产生和接收对象分别定义为“发布者”和“订阅者”,从而构成数据的发布/订阅传输模型。各个节点在逻辑上无主从关系,点与点之间都是对等关系,通信方式可以是点对点、点对多、多对多等,在QoS的控制下建立连接,自动发现和配置网络参数。
DDS最早应用于美国海军,用于解决舰船复杂网络环境中大量软件升级的兼容性问题,后来扩展至航空、航天、船舶、国防、金融、通信、汽车等领域,包括作战系统、船舶导航和控制系统、船舶防御系统、无人机驾驶系统和地面控制系统、装甲车辆控制系统、仿真和培训系统、雷达处理和空中交通管理系统、金融系统等。
2018年,DDS首次被引进AUTOSAR AP,作为可选择的通信方式之一。

ROS2和Cyber RT的底层都使用了开源的DDS,将DDS作为最重要的通信机制。与此相对应的是,Xaver、Orin等面向自动驾驶的SOC芯片上也都预留了DDS接口。

闭源DDS: RTI Connext DDS

全球范围内,DDS市场份额最大的供应商(80%左右)的是成立于1991年的美国RTI公司(全称为Real-Time Innovations)。RTI作为OMG组织董事会的成员,主导了DDS标准的制定,从2004年开始负责主持DDS工作组的工作,目前已经成为这个行业的领导者,对DDS标准有足够的权威。
RTI开发的DDS品牌名为Connext,,因此又被称为Connext DDS。

开源DDS: FAST DDS and OPEN DDS

开源DDS,主要是相对于商用的RTI Connext DDS等而言的,其也是根据OMG官方标准开发的,但源代码开放。

FAST DDS
在自动驾驶领域比较有影响力的开源DDS是由RTI原核心团队成员在欧洲创办的eProsima公司推出的FastDDS。在eProsima将FastDDS的源代码开放出来后,用户可以直接在github上免费下载。但FastDDS使用起来比较麻烦,这个时候,用户就需要通过向eProsima支付费用来取得支持。
OPEN DDS
OpenDDS 由位于圣路易斯和凤凰城的的Object Computing的ACE/TAO团队开发,它和FastDDS具有一定的相似性——两者都是基于RTPS实现的,面向数据的通信框架,遵循的是同一标准。这类框架的典型特征是去中心化,支持QoS机制,支持实时通信。通常会绑定如protobuf等序列化工具。

开源DDS与闭源DDS的不同点

在许多情况下,FastDDS 、OpenDDS可以跟RTI的Connnext DDS互操作/通信。当然,具体还得看场景。比如开源DDS的QoS(服务策略)有 23个,大家都用这23个QOS交互,那就能互操作;如果RTI Connext用的是超出这23个自定义范围的QoS,那么开源DDS就解析不了。此外,如果用的是OMG没开源的DDS工具,那也没法互操作/通信。

国内某中间件厂商负责人介绍,出于成本的考量,英伟达的Xavier自带的Driver.OS中便采用了FastDDS或OpenDDS这样的开源DDS。
RTI方面认为,开源DDS是其最大的竞争对手。
另外RTI DDS的服务策略有50多个,但开源DDS的服务策略只有23个,完整程度远不及前者。此外RTI的DDS已经通过了ASIL-D的认证,但开源DDS还没有。

MQTT

MQTT是由IBM开发的即时通讯协议,其全称是Message Queuing Telemetry Transport (消息队列遥测传输)。MQTT协议也采用发布/订阅模式,所有的物联网终端都通过TCP连接到云端,云端通过主题的方式管理各个设备关注的通讯内容,负责将设备与设备之间消息的转发。
由于延时控制等方面功能较差、服务策略也比较少,MQTT不适用于高速项目和大型项目,但可用于低带宽、不可靠的网络场景,提供基于云平台的远程设备的数据传输和监控。在自动驾驶领域,MQTT比较典型的应用场景是V2X。
此外,MQTT在低速车领域也同样适用。它体积极小,并能提供简单的QoS保证,非常适合玩具车,扫地车等功能简单、硬件资源有限的项目。
MQTT也是开源的通信中间件。

QoS

服务策略QoS是DDS标准相比于SOME/IP最重要的特征。
SOME/IP只有一个QoS,即可靠性的定义reliability
而RTI DDS和开源DDS里面分别有50多个和20多个QoS,这些QoS基本上能涵盖绝大多数可以预见到的智能驾驶场景。
QoS具体是什么呢?有什么用?

  1. 通信中的传输优先级、数据可靠性、资源限制、时间过滤等,都需要在QoS里面设置。
  2. 数据传输过程中可能会出现丢帧现象,也就是说,不是每一帧都能达到接收端。针对这种现象,我们需要考虑场景需求。如果是关键信息报警指令,比如手动接管请求,我们需要发送方重发。如果是非关键信息高频传感数据,比如我上面提到的LiDAR定位,系统就不必把丢失的部分都找回来,这些都只需配置QoS的reliability就可以了。
  3. 在感知系统有冗余的情况下,一旦一个传感器宕机,就需要第二个传感器补上来。QoS可以对两个传感器的数据做优先级高低的区分。配置之后也不需要重新编译,因为它是动态部署的,配置完之后就可以按照最新配置运行,去完成不同节点之间的发布订阅。
  4. DDS的解耦模式允许某一主题发布方在没有订阅方的情况下仍然发布数据,但后加入的订阅方也许对这一主题的历史记录感兴趣。例如,新节点上线后需要去监控其他节点的运行情况,这些节点也许每隔较长一段时间才发布一个信息说自己“运行正常”,那么这个新上线的节点就需要去了解其他节点发布的历史信息以确定其运行状态,也就是它希望能收到其上线之前其他节点发布的历史数据。这种情况,只需要简单配置QoS就可以实现,新节点可以获知上线之前多长时间、多长节点的数据流,去关注它的历史数据等等。
  5. 负责监控的新节点上线后,需要去监控其他节点的运行情况。通常,这些节点每个小时发布一个信息说自己“运行正常”,但也有可能这个“运行正常”的节点先发布了,过半小时之后监控节点才发布,那这时候,监控节点是否还能收到其上线之前其他节点发布的数据?这种情况,也是需要通过QoS去配置的,QoS可以去配置订阅新节点上线之前多长时间、多长节点的数据流,去关注它的历史数据等等。

进一步说,QoS能够提供实时系统所要求的性能、可预测性和资源可控性,并且能够保证发布/订阅模型的模块性、可量测性和鲁棒性等。因此,DDS能够满足非常复杂、非常灵活的数据流要求。
相比之下,SOME/IP只解决了发布订阅问题,但由于没有这些QoS,结果便是,很多本来可用自动配置服务策略来实现的功能,都需要通过软件开发人员写代码才能实现,这会浪费大量的时间。
此外,由于SOME/IP没有QoS,在数据量大的时候,无法解决丢包的问题,进而造成指令被卡在某个地方发不出去,然后,整个系统就无法正常运作了。

ROS2中的QoS

综上所述,对QoS应该有了充分的了解,这里记录一下自己在工作中接触到的问题。
ROS2中QoS有如下选择,可以进行多种多样的配置,从而达到使用需求。
History
Keep last: 只保留最近的N个数据,通过队列深度选项进行配置。
Keep all: 保留所有的数据,但要受限于底层中间件的资源限制。
Depth
N:仅当与存最近的数据(Keep last)一起使用时才有效。
Reliability
Best effort: 尝试提供样本,但如果网络不稳定,可能会丢失样本。
Reliable: 确保数据的可靠传输,可以多次重试。
Durability
Volatile: 不尝试保留数据。
Transient Local:发布者为晚连接的订阅者保留数据

官方教程中,默认的是可以互相通信的,当我们在Subscriber中引入QoS后,就发生了无法通信的问题。

1
2
3
4
5
6
7
qos = QoSProfile(depth=10)
qos.durability = DurabilityPolicy.RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL
self.subscription = self.create_subscription(
String,
'topic',
self.listener_callback,
qos)

最近在工作中,播放rosbag后,想在rviz中确认image topic,结果没有看到任何图片,最后通过调查得知,接收消息,需要设置正确的QoS配置。
代码放在下面。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
#!/usr/bin/env python3
import rclpy
import cv2
from rclpy.node import Node
from rclpy.qos import QoSProfile
from sensor_msgs.msg import Image, CompressedImage
from cv_bridge import CvBridge, CvBridgeError

import numpy as np

import threading
import argparse
import os

class ImgViewer(Node):
def __init__(self, in_image_topic, rate):
self.lock = threading.Lock()

super().__init__('img_viewer')
qos_profile = QoSProfile(depth=10, reliability=2)
topic = in_image_topic
self.create_subscription(CompressedImage, topic, self.image_callback, qos_profile)
self.rate = rate

self.image = None

def image_callback(self, msg):
with self.lock:
try:
# image = CvBridge().imgmsg_to_cv2(msg, "bgr8")
np_arr = np.frombuffer(msg.data, np.uint8)
image = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)
image_stamp = msg.header.stamp.sec*1e3 + msg.header.stamp.nanosec/1e6
except CvBridgeError as e:
print(e)
print('received image0. timestamp: {}'.format(image_stamp))

self.image = image
self.image_stamp = image_stamp
img_resize = cv2.resize(
image, None, fx=0.25, fy=0.25)
cv2.imshow('image6', img_resize)
cv2.waitKey(1)


def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument('--in_image', '-i', type=str,
default='/sensing/camera/camera0/image_raw/compressed')
parser.add_argument('--rate', '-r', type=str,
default=0.25)
args = parser.parse_args()

return args

def main(args=None):
args = parse_args()
rclpy.init()

node = ImgViewer(args.in_image, args.rate)
rclpy.spin(node)

node.destroy_node()
rclpy.shutdown()

if __name__ == '__main__':
main()

参考

Quality of Service