跳转至

分享|EdgeX Message Bus 消息总线

本文提供了 EdgeX Foundry Message Bus 消息总线相关定义和内容。

EdgeX Foundry 在下面将统一简称为 EdgeX

  • 介绍
  • 消息封装
  • 实现
  • 多级主题和通配符
  • 部署

本文内容大部分来自官方文档:EdgeX MessageBus

介绍

EdgeX 有一个称为 EdgeX MessageBus 的内部消息总线,用于 EdgeX 服务之间的内部通信。 EdgeX 服务是指来自 EdgeX 的任何核心/支持/应用程序/设备服务或使用 EdgeX SDK 构建的任何自定义应用程序或设备服务

下图显示了每个 EdgeX 服务如何使用 EdgeX MessageBus。

插入 EdgeX MessageBus 的图像

EdgeX MessageBus 用于内部 EdgeX 服务与服务之间的通信。它并不意味着作为外部服务与内部 EdgeX 服务通信的入口点。 eKuiper 规则引擎是一个例外,因为它与 EdgeX 紧密集成。

用作外部入口点的 EdgeX 服务包括:

  • 所有 EdgeX 服务上的 REST API - 以非安全模式直接访问或通过 API 网关 当以安全模式运行时

  • 使用外部 MQTT 触发器的应用服务 - 配置为使用外部 MQTT 触发器的应用服务将在“外部”上接受来自外部 MQTT 连接服务的数据。

  • 使用 HTTP 触发器的应用服务 - 配置为使用 HTTP 触发器的应用服务将接受数据来自“外部”上的外部服务休息连接。访问方式与其他 EdgeX REST API 相同。

  • 使用自定义触发器的应用服务 - 配置为使用自定义触发器的应用服务可以接受数据来自外部服务或通过附加协议,几乎没有限制。有关示例,请参阅自定义触发器示例。

  • Core Command 外部 MQTT 连接 - Core Command 现在通过独立于 EdgeX MessageBus 的外部 MQTT 连接接收命令请求并发布响应。请求被转发到 EdgeX MessageBus,相应的响应被转发回外部 MQTT 连接。

最初,EdgeX MessageBus 仅用于将事件/读数(Event/Readmings)从核心数据发送到应用程序服务层。在最近的版本中,更多服务使用 EdgeX MessageBus 而不是 REST 进行服务间通信。

  • 设备服务将事件/读数直接发布到 EdgeX MessageBus,而不是通过 REST 将其发送到 Core Data。
  • 服务指标发布到 EdgeX MessageBus。
  • 系统事件发布到 EdgeX MessageBus。
  • 命令请求/响应 现在由核心命令和设备服务发布到 EdgeX MessageBus。
  • 通过 EdgeX MessageBus 从核心元数据到设备服务的设备验证请求。

消息封装

发布到 EdgeX MessageBus 的所有消息都包装在 MessageEnvelope 中。此封装包含描述消息有效负载的元数据,例如有效负载内容类型(JSON 或 CBOR)、相关 ID 等。

笔记

除非下面另有说明,MessageEnvelope 在将其发布到 EdgeX MessageBus 时采用 JSON 编码。这确实会导致 MessageEnvelope 的有效负载被双重编码。

实现

EdgeX MessageBus 由 go-mod-messaging 中实现的消息总线抽象定义。该模块定义了一个抽象客户端 API,目前有四种用于不同底层消息总线协议的 API 实现。

通用消息总线配置

每个使用 EdgeX MessageBus 的服务都有一个配置部分,用于定义要使用的实现、连接方法和底层协议客户端。此部分是所有 EdgeX 服务的服务通用配置中的 MessageBus: 部分。有关更多详细信息,请参阅通用配置中的 MessageBus 选项卡。

每个实现的常见 MessageBus 配置元素是:

  • 类型: Type - 指定要使用以下哪种实现。

  • Redis Pub/Sub(默认)- Type=redis

  • MQTT 3.1 - Type=mqtt
  • NATS 核心 - Type=nats-core
  • NATS JetStream - Type=nats-jetstream

  • 主机: Host - 指定消息 broker 的名称或 IP

  • 端口: Port - 指定消息 broker 的端口号
  • 协议: Protocol - 指定消息 broker 使用的协议
  • redis Redis Pub/Sub
  • tcp MQTT 3.1
  • tcp NATS 核心
  • tcp NATS JetStream

笔记

通常,部署中运行的所有 EdgeX 服务都必须配置为使用相同的 EdgeX MessageBus 实现。默认情况下,所有使用 EdgeX MessageBus 的服务都配置为使用 Redis Pub/Sub 实现(这很重要)。 NATS 确实支持与 MQTT 的兼容模式。有关详细信息,请参阅下面的 NATS MQTT 模式部分。

Redis Pub/Sub(发布/订阅)

如上所述,这是所有 EdgeX 服务都配置使用的默认实现。它利用 broker 的现有 Redis 数据库实例。 Redis Pub/Sub 是一种即发即忘协议,因此无法保证送达。如果需要更高的稳健性,请使用 MQTT 或 NATS 实现。另外,Redis 是内存型数据库,随着数据量越大占用内存越来越多。

配置

请参阅上面的通用配置部分,了解所有实现的通用配置元素。

安全配置
选项 默认值 描述
认证模式 usernamepassword 要使用的身份验证模式。值为 noneusernamepasswordclientcertcacert。在安全模式下,Redis Pub/Sub 使用 usernamepassword
秘密名称 redisb 用于在服务的 SecretStore 中查找凭据的秘密名称
附加配置

该实现没有任何额外的配置。

MQTT 3.1

稳健的消息总线协议,具有额外的稳健性配置选项,并且需要运行额外的 MQTT broker 代理。有关此协议的更多详细信息,请参阅 MQTT 规范。

配置

请参阅上面的通用配置部分,了解所有实现的通用配置元素。

安全配置
选项 默认值 描述
AuthMode 认证模式 none 要使用的身份验证模式。值为 noneusernamepasswordclientcertcacert。在安全模式下,MQTT 代理使用 usernamepassword
SecretName 秘密名称 blank 用于在服务的 SecretStore 中查找凭据的秘密名称
附加配置

除非服务通用配置中存在注明的默认值。本节的内容大都是跟 MQTT 规范有关

选项 默认值 描述
ClientId 客户端 ID service key 连接到 MQTT 代理的客户端的唯一名称(在每个服务的私有配置中设置
Qos 服务质量 0 服务质量级别
0:最多交付一次
1:至少交付一次
2:恰好一次传送
请参阅 MQTT QOS 规范了解更多详情
KeepAlive 保活 10 客户端完成传输一个控制数据包与开始发送下一个控制数据包之间允许经过的最大时间间隔(以秒为单位)。如果超过,broker 将关闭客户端连接
Retained 保留 false 如果为 true,服务器必须存储应用程序消息及其 QoS,以便可以将其传递给订阅与其主题名称匹配的未来订阅者。请参阅保留消息了解更多详情。
AutoReconnect 自动重新连接 true 如果为 true,则在连接丢失时自动尝试重新连接到代理
ConnectTimeout 连接超时 30 成功连接到代理的超时(以秒为单位)
CleanSession 清理会话 false 如果为真,服务器必须丢弃任何先前的会话并启动一个新的会话。该会话的持续时间与网络连接的时间相同

NATS

NATS 是一种高性能消息传递系统,为本地部署提供了一些有趣的选项。它使用一种轻量级的基于文本的协议,与 http 非常相似。该协议包括完整的标头支持,如果部署中的所有服务都使用 NATS,则可以允许跨服务边界传送 EdgeX MessageEnvelope,而无需双重编码。目前,服务必须使用 include_nats_messaging 标记专门构建才能启用此选项。

NATS 核心

普通 NATS 服务器使用兴趣(interest)或客户端订阅的存在作为服务器上主题可用性的基础。这使得 Publish 成为一种类似于 Redis 的即发即忘操作,并为系统提供了 at most once 最多一次的服务质量。

NATS 捷流

JetStream 持久层将 NATS 主题绑定到持久流,这使服务器能够收集没有注册兴趣的主题的消息,并支持 at least once 至少一次的服务质量。值得注意的是,在 core-nats 模式下运行的服务仍然可以订阅和发布到支持 jetstream 的主题,而无需与发布确认相关的额外开销。

配置

请参阅上面的通用配置部分,了解所有实现的通用配置元素。

安全配置
选项 默认值 描述
AuthMode 认证模式 none 要使用的身份验证模式。值为 noneusernamepasswordclientcertcacert。 NATS 服务器当前未在安全模式下受到保护。
SecretName 秘密名称 blank 用于在服务的 SecretStore 中查找凭据的秘密名称
NKeySeedFile NKey 种子文件 blank 用于身份验证的种子文件的路径。请参阅 NATS 文档了解更多详细信息
CredentialsFile 凭证文件 blank 用于身份验证的凭据文件的路径。请参阅 NATS 文档了解更多详细信息
附加配置

除非服务通用配置中存在注明的默认值。

选项 默认值 描述
ClientId 客户端 ID service key 连接到 NATS 服务器的客户端的唯一名称(在每个服务的私有配置中设置
Format 格式 nats 实际发布消息的格式。有效值为:
- nats :来自 MessageEnvlope 的元数据被放入 NATS 标头和 MessageEnvlope 中的有效负载按原样发布。 所有服务都使用 NATS 时的首选格式
- json :JSON 编码 MessageEnvelope 并将其作为消息发布。当其他服务使用 MQTT 3.1 并以 MQTT 模式运行 NATS 服务器时,请使用此格式以实现兼容性。
ConnectTime 连接超时 30 成功连接到代理的超时(以秒为单位)
RetryOnFailedConnect 连接失败重试 false 连接失败时重试 - 需要布尔值的字符串表示形式
QueueGroup 队列组 blank 指定队列组以将消息从流分发到工作服务池
Durable 耐用的 blank 指定持久消费者应与给定名称一起使用。请注意,如果指定名称的持久使用者不存在,它将被视为临时使用者,并在耗尽/取消订阅时被客户端删除(仅限 JetStream
Subject 主题 blank 如果未指定 Durable,则指定订阅流的主题 - 还将被格式化为订阅时使用的流名称。该主题还用于在需要时自动配置流,并且应该使用“root”进行配置。所有订阅共有的主题(例如 edgex/#),以确保涵盖总线上的所有主题。 (仅限 JetStream
AutoProvision 自动配置 false 自动配置 NATS 流。 (仅限 JetStream
Deliver 递送 new 指定订阅的交付模式 - 选项有“新”、“全部”、“最后”等。或“最后一个主题”。有关更多详细信息,请参阅 NATS 文档(仅限 JetStream
DefaultPubRetryAttempts 默认发布重试尝试 2 发布失败时尝试重试的次数(仅限 JetStream

使用 nats-box 进行资源配置

虽然 SDK 会尝试自动配置所需的流(如果已配置),但如果您需要启用特定功能或策略,通常最好配置您自己的流。 nats-box docker 映像 已预装各种实用程序,使这一切变得更容易。

有关使用 nats cli 进行流配置的信息,请参阅此处

nats-box 提供了一个名为 nk 的实用程序来生成 nkey。要生成 nkey 种子文件,请参阅此处

nats-box 提供了一个名为 nsc 的实用程序来进行凭证管理。有关使用凭据文件的信息,请参阅有关 解析器 和配套的 内存解析器教程的文档.

NATS MQTT 模式

支持 JetStream 的服务器可以支持同一组基础主题上的 MQTT 连接。如果您使用预构建的 EdgeX 服务(例如 device-onvif-camera)但希望将系统转换为使用 NATS,则此功能特别有用。请注意,必须使用 format=json,以便 NATS 消息总线客户端可以读取 MQTT 客户端发送的双编码封装。有关详细信息,请参阅 NATS MQTT 文档

多级主题和通配符

EdgeX MessageBus 使用多级主题和通配符来允许通过订阅过滤数据,并在类似 MQTT 的方案上进行了标准化。请参阅 MQTT 多级主题和通配符了解更多信息。

Redis 实现转换 Redis Pub/Sub 多级主题方案以匹配 MQTT 的方案。在 Redis Pub/Sub 中,“.” “”用作级别分隔符。后面跟随的级别分隔符用作单级别通配符,“”用作单级别通配符。最后用作多级通配符。这些被转换为“/”。和“+”和“#”分别由 MQTT 使用。

NATS 实现转换 NATS 多级主题方案以匹配 MQTT 的方案。在 NATS 中“.” “*”用作级别分隔符。用作单级通配符,“>”可用作单级通配符。用于多级通配符。这些被转换为“/”、“+”和“/”。和“#”分别符合 MQTT 方案。

EdgeX MessageBus 的多级主题和通配符示例

  • Edgex/events/# 来自任何设备服务或任何设备配置文件、设备或源的核心数据的所有事件

  • edgex/events/device/# 来自任何设备配置文件、设备或源的任何设备服务的所有事件

  • edgex/events/+/device-onvif-camera/# 仅来自设备服务“device-onvif-camera”的事件适用于任何设备配置文件、设备和源

  • edgex/events/+/+/+/camera-001/# 来自任何设备服务的事件或任何设备配置文件的核心数据,但仅针对设备“camera-001”。对于任何来源

  • edgex/events/device/+/onvif/+/status 来自仅针对设备配置文件“onvif”的任何设备服务以及来自任何设备且仅针对源“状态”的事件。

部署

Redis Pub/Sub 发布/订阅(默认)

所有 EdgeX 服务都能够使用 Redis Pub/Sub,无需对配置进行任何更改。发布的 compose 文件使用 Redis Pub/Sub。

MQTT 3.1

所有 EdgeX 服务都能够使用 MQTT 3.1,只需更改每个服务的配置即可。

笔记

如上所述,MQTT 3.1 实现需要添加 MQTT Broker 服务才能运行。也就是说,需要一个额外的 MQTT broker 服务。

配置变更

EdgeX 3.0

对于 EdgeX 3.0 MessageQueue 配置已重命名为 MessageBus,现在处于通用配置中。

MessageBus 配置是通用配置,以下更改只需进行一次并应用于所有服务。有关更多详细信息,请参阅通用配置中的 MessageBus 选项卡。

所有服务的 MQTT 配置更改示例

必须在通用配置中更改以下 MessageBus 配置设置,所有 EdgeX 服务才能使用 MQTT 3.1

MessageBus:
    Type: "mqtt"
    Protocol: "tcp"
    Host: "localhost" # in docker this must be overriden to be the docker host name of the MQTT Broker
    Port: 1883
    AuthMode: "none" # set to "usernamepassword" when running in secure mode
    SecreName: "message-bus"
...

笔记

适用于 MQTT 的可选设置已在通用配置中,因此上面未包含。

Docker

EdgeX Compose Builder 实用程序提供了一个选项,可轻松生成一个 compose 文件,其中包含使用环境覆盖为 MQTT 3.1 重新配置的所有选定服务。这是通过使用 mqtt-bus 选项来完成的。有关所有可用选项的详细信息,请参阅 Compose Builder 自述文件。

MQTT 3.1 的安全模式 compose 生成示例

make gen ds-virtual ds-rest mqtt-bus

MQTT 3.1 的非安全模式 compose 生成,加入 no-secty 选项

make gen no-secty ds-virtual ds-rest mqtt-bus

笔记

run 命令可用于在一个命令中生成并运行 compose 文件,但对生成的 compose 文件所做的任何更改都将在下一次被覆盖。

make run ...

另一种方法是使用 up 命令,该命令运行最新生成的撰写文件以及可能已进行的任何修改。

make up

NATS

基于 EdgeX Go 的服务无法使用 NATS 实现,除非使用 include_nats_messaging 构建标签进行重建。任何旨在在部署中使用 NATS 的 EdgeX Core/Support/Go 设备/应用程序服务都必须修改 Makefile 以添加此构建标志。然后可以针对本机和/或 Docker 重建该服务。

核心数据使目标修改为包括 NATS

cmd/core-data/core-data:
    $(GOCGO) build -tags "include_nats_messaging $(NON_DELAYED_START_GO_BUILD_TAG_FOR_CORE)" $(CGOFLAGS) -o $@ ./cmd/core-data

笔记

C 设备 SDK 目前没有 NATS 实现,因此 C 设备不能与基于 NATS 的 EdgeX MessageBus 一起使用。

配置变更

EdgeX 3.0

对于 EdgeX 3.0 MessageQueue 配置已重命名为 MessageBus,现在处于通用配置中。

MessageBus 配置是通用配置,以下更改只需进行一次并应用于所有服务。有关更多详细信息,请参阅通用配置中的 MessageBus 选项卡。

所有服务的 NATS 配置更改示例

必须在通用配置中更改以下 MessageBus 配置设置,所有 EdgeX 服务才能使用 NATS Jetstream

MessageBus:
    Type: "nats-jetstream"
    Protocol: "tcp"
    Host: "localhost" # in docker this must be overriden to be the docker host name of the NATS server
    Port: 4222
    AuthMode: "none" # Currently in secure mode the NATS server is not secured

笔记

适用于 NATS 的可选设置已在通用配置中,因此上面未包含。

Docker

EdgeX Compose Builder 实用程序提供了一个选项,可轻松生成 compose 文件,其中使用环境覆盖为 NATS 重新配置了所有选定的服务。这是通过使用 nats-bus 选项来完成的。此选项将服务配置为使用 NATS Jetstream 实现。有关所有可用选项的详细信息,请参阅 Compose Builder 自述文件。如果首选 NATS Core,只需在生成的撰写文件中搜索并用 替换 nats-jetstreamnats-core 即可。

NATS 的安全模式 compose 生成示例

make gen ds-virtual ds-rest nats-bus

NATS 的非安全模式 compose 生成

make gen no-secty ds-virtual ds-rest nats-bus

关于我们

亿琪软件

上海亿琪软件有限公司,全球开放边缘计算和物联网领域的领导者,全球领先的工业物联网软件开发商和解决方案提供商,助力企业和组织实现数字化转型。公司专注于 5G 通信、AI 人工智能、边缘计算和大数据网络安全多项技术领域,致力于物联网领域前沿技术的创新,为用户提供全方位、智能化和安全的物联网解决方案。

  • 2023 年,公司发布“ YiFUSION |工业边缘智能融合网关 ”产品,为工业客户提供一整套的边缘计算+AI 能力:高性能数据采集、多类型数据融合、AI 算法集成、云端业务对接。在边缘网关的基础上,集成了 IoT 平台的边缘协同能力、本地 Web SCADA 和 HMI 功能、本地数据存储、边缘 AI 视频分析、行业应用集成等。

  • 2022 年,公司推出 “ YiCLOUD |亿琪云 ”一站式物联网应用解决方案。公司的业务涵盖了智慧城市、智慧农业、智能工厂和智慧园区等多个领域,公司软硬件产品和解决方案获得华为技术认证,得到中国移动 OCP 认证,公司还是边缘计算产业联盟 ECC 成员。

关注我们

yiqisoft edgexfoundry

联系我们--商业服务

  • 网站:http://yiqisoft.cn
  • 邮件:support@yiqisoft.cn
  • 电话:021-68863086
  • 手机:186-1666-9123