跳转至

开源|EdgeX 边缘计算数据持久化之 InfluxDB

日常情况下,在边缘计算端侧,一般不需要数据持久化到本地磁盘或数据库,但是,某些特殊应用下,需要做本地数据保存和分析时,这项功能就非常有必要。

本文将介绍 EdgeX 边缘计算框架在本地保存 telemetry 数据到 InfluxDB 的具体方法和结果。

背景介绍

EdgeX 简介

2023-10-23T06:44:33.png EdgeX Foundry 边缘计算框架的分层和服务为边缘设备/节点和云/企业应用之间提供了一个双向转换引擎。EdgeX 作为边缘计算平台是非常不错的选择。

2023-10-23T06:43:42.png 本文需要采用自定义开发的 EdgeX Application service 来完成数据持久化的工作。

InfluxDB 简介

InfluxDB 是一个由 InfluxData 公司开发的开源时序型数据库。它由 Go 写成,着力于高性能地查询与存储时序型数据。InfluxDB 被广泛应用于存储系统的监控数据,IoT 行业的实时数据等场景。

2023-10-23T06:36:47.png

EdgeX 数据写入(存入)InfluxDB 是非常适合且有必要的,本文使用 MQTT 和 Telegraf 作为 InfluxDB plugin 来实现具体功能。

开发过程

架构设计

2023-10-23T06:48:36.png

EdgeX 应用程序服务(app-influxdb-export)通过 MQTT 将读数导出到 InfluxDB。此服务示例是使用 EdgeX v2.3.0 版本创建的,如需其他版本,请与我们联系或自行开发。

这个管道服务,有两部分组成:

  • 将 EdgeX 事件/读取对象转换为 Influx line 协议;
  • 将结果导出到 MQTT Broker,这是 Telegraf / Influx 用来监听并从中获取数据的 MQTT 主题;
  • InfluxDB 实现数据持久化入库工作;

EdgeX application service

前提条件

  • 需要启动 EdgeX 框架;
  • 启动 MQTT Broker 容器;

Application Service

目录结构很简单,文件不多;

├── Dockerfile // docker 文件
├── LICENSE
├── Makefile // make build
├── README.md
├── app-service-influx.png
├── go.mod // gomod 配置
├── go.sum
├── main.go // 主程序
├── pkg
│   └── transforms
│       └── conversions.go // 转换器
└── res
    └── configuration.toml // 配置文件

主程序,不复杂,遵循 SDK 原则即可;

这里省略源文件,请到github下载。

尝试编译,能成功生成二进制文件;

make build

Docker 容器

Dockerfile 描述文件;

这里省略源文件,请到github下载。

创建 docker image;

docker build . -t edgexfoundry/app-influxdb:2.3.0

docker compose 片段,加入到你的 docker-compose.yml ;

app-service-influxdb-export:
    image: edgexfoundry/app-influxdb:2.3.0
    ports:
    - 127.0.0.1:59780:59780/tcp
    container_name: edgex-app-influxdb-export
    hostname: edgex-app-influxdb-export
    depends_on:
    - consul
    - data
    environment:
    CLIENTS_CORE_COMMAND_HOST: edgex-core-command
    CLIENTS_CORE_DATA_HOST: edgex-core-data
    CLIENTS_CORE_METADATA_HOST: edgex-core-metadata
    CLIENTS_SUPPORT_NOTIFICATIONS_HOST: edgex-support-notifications
    CLIENTS_SUPPORT_SCHEDULER_HOST: edgex-support-scheduler
    DATABASES_PRIMARY_HOST: edgex-redis
    EDGEX_SECURITY_SECRET_STORE: "false"
    MESSAGEQUEUE_HOST: edgex-redis
    REGISTRY_HOST: edgex-core-consul
    SERVICE_HOST: edgex-app-influxdb-export
    TRIGGER_EDGEXMESSAGEBUS_PUBLISHHOST_HOST: edgex-redis
    TRIGGER_EDGEXMESSAGEBUS_SUBSCRIBEHOST_HOST: edgex-redis
    MQTTCONFIG_BROKERADDRESS: edgex-mqtt-broker:1883
    read_only: true
    restart: always
    networks:
    - edgex-network
    security_opt:
    - no-new-privileges:true
    user: 2002:2001

InfluxDB 环境搭建

Telegraf 配置

配置文件,注意保留你需要的部分;

[[inputs.mqtt_consumer]]
## Broker URLs for the MQTT server or cluster.  To connect to multiple
## clusters or standalone servers, use a separate plugin instance.
##   example: servers = ["tcp://localhost:1883"]
##            servers = ["ssl://localhost:1883"]
##            servers = ["ws://localhost:1883"]
servers = ["tcp://192.168.123.12:1883"]

## Topics that will be subscribed to.
topics = [
    "telegraf/host01/cpu",
    "telegraf/+/mem",
    "sensors/#",
    "edgex/EdgeXEvents"
]

## The message topic will be stored in a tag specified by this value.  If set
## to the empty string no topic tag will be created.
# topic_tag = "topic"

## QoS policy for messages
##   0 = at most once
##   1 = at least once
##   2 = exactly once
##
## When using a QoS of 1 or 2, you should enable persistent_session to allow
## resuming unacknowledged messages.
# qos = 0

## Connection timeout for initial connection in seconds
# connection_timeout = "30s"

## Max undelivered messages
## This plugin uses tracking metrics, which ensure messages are read to
## outputs before acknowledging them to the original broker to ensure data
## is not lost. This option sets the maximum messages to read from the
## broker that have not been written by an output.
##
## This value needs to be picked with awareness of the agent's
## metric_batch_size value as well. Setting max undelivered messages too high
## can result in a constant stream of data batches to the output. While
## setting it too low may never flush the broker's messages.
# max_undelivered_messages = 1000

## Persistent session disables clearing of the client session on connection.
## In order for this option to work you must also set client_id to identify
## the client.  To receive messages that arrived while the client is offline,
## also set the qos option to 1 or 2 and don't forget to also set the QoS when
## publishing.
# persistent_session = false

## If unset, a random client ID will be generated.
# client_id = ""

## Username and password to connect MQTT server.
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"

## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false

## Client trace messages
## When set to true, and debug mode enabled in the agent settings, the MQTT
## client's messages are included in telegraf logs. These messages are very
## noisey, but essential for debugging issues.
# client_trace = false

## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"

## Enable extracting tag values from MQTT topics
## _ denotes an ignored entry in the topic path
# [[inputs.mqtt_consumer.topic_parsing]]
#   topic = ""
#   measurement = ""
#   tags = ""
#   fields = ""
## Value supported is int, float, unit
#   [[inputs.mqtt_consumer.topic.types]]
#      key = type
# Configuration for sending metrics to InfluxDB 2.0
[[outputs.influxdb_v2]]
## The URLs of the InfluxDB cluster nodes.
##
## Multiple URLs can be specified for a single cluster, only ONE of the
## urls will be written to each interval.
##   ex: urls = ["https://us-west-2-1.aws.cloud2.influxdata.com"]
urls = ["http://192.168.123.12:8086"]

## Token for authentication.
token = "PlbXqGQqYrnmZh22q88C0eS7CUyVSa9n1t2CjDY1nkA9RbXwM3BHBm0FIK3f3hT6p3oJKOrtFWLqeLgY9WQmyQ=="

## Organization is the name of the organization you wish to write to.
organization = "yiqisoft"

## Destination bucket to write into.
bucket = "edgex"

## The value of this tag will be used to determine the bucket.  If this
## tag is not set the 'bucket' option is used as the default.
# bucket_tag = ""

## If true, the bucket tag will not be added to the metric.
# exclude_bucket_tag = false

## Timeout for HTTP messages.
# timeout = "5s"

## Additional HTTP headers
# http_headers = {"X-Special-Header" = "Special-Value"}

## HTTP Proxy override, if unset values the standard proxy environment
## variables are consulted to determine which proxy, if any, should be used.
# http_proxy = "http://corporate.proxy:3128"

## HTTP User-Agent
# user_agent = "telegraf"

## Content-Encoding for write request body, can be set to "gzip" to
## compress body or "identity" to apply no encoding.
# content_encoding = "gzip"

## Enable or disable uint support for writing uints influxdb 2.0.
# influx_uint_support = false

## HTTP/2 Timeouts
## The following values control the HTTP/2 client's timeouts. These settings
## are generally not required unless a user is seeing issues with client
## disconnects. If a user does see issues, then it is suggested to set these
## values to "15s" for ping timeout and "30s" for read idle timeout and
## retry.
##
## Note that the timer for read_idle_timeout begins at the end of the last
## successful write and not at the beginning of the next write.
# ping_timeout = "0s"
# read_idle_timeout = "0s"

## Optional TLS Config for use on HTTP connections.
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false

Docker 容器

docker-compose.yml

services:
influxdb:
    image: influxdb:latest
    container_name: influxdb2
    volumes:
    - /opt/devel/influxdb/data:/var/lib/influxdb2:rw
    environment:
    - DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=mytoken
    ports:
    - 8086:8086
    restart: unless-stopped

telegraf:
    image: telegraf:latest
    container_name: telegraf
    volumes:
    #  Sync timezone with host
    - /etc/localtime:/etc/localtime:ro
    #  Map Telegraf configuration file
    - /opt/devel/influxdb/telegraf.conf:/etc/telegraf/telegraf.conf:ro
    #  Map /tmp to permanent storage  (this includes /tmp/metrics.out)
    - /opt/devel/influxdb:/tmp:rw
    environment:
    - DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=mytoken
    restart: unless-stopped
    depends_on:
    - influxdb

启动容器

docker compose up -d

实验结果

MQTT broker 内容查看

2023-10-23T07:14:42.png

确认 mqtt 内容为 Influx line protocol;

InfluxDB Graph

2023-10-23T07:32:45.png

确认 InfluxDB 数据接收正常。

开源代码

关于我们

亿琪软件

上海亿琪软件有限公司,全球开放边缘计算和物联网领域的领导者,全球领先的工业物联网软件开发商和解决方案提供商,助力企业和组织实现数字化转型。公司专注于 5G 通信、AI 人工智能、边缘计算和大数据网络安全多项技术领域,致力于物联网领域前沿技术的创新,为用户提供全方位、智能化和安全的物联网解决方案。

  • 2023 年,公司发布“ YiFUSION |工业边缘智能融合网关 ”产品,为工业客户提供一整套的边缘计算+AI 能力:高性能数据采集、多类型数据融合、AI 算法集成、云端业务对接。在边缘网关的基础上,集成了 IoT 平台的边缘协同能力、本地 Web SCADA 和 HMI 功能、本地数据存储、边缘 AI 视频分析、行业应用集成等。

  • 2022 年,公司推出 “ YiCLOUD |亿琪云 ”一站式物联网应用解决方案。公司的业务涵盖了智慧城市、智慧农业、智能工厂和智慧园区等多个领域,公司软硬件产品和解决方案获得华为技术认证,得到中国移动 OCP 认证,公司还是边缘计算产业联盟 ECC 成员。

关注我们

yiqisoft edgexfoundry

联系我们--商业服务

  • 网站:http://yiqisoft.cn
  • 邮件:support@yiqisoft.cn
  • 电话:021-68863086
  • 手机:186-1666-9123