emq数据储存到mysql_EMQ X 规则引擎系列(三)存储消息到 InfluxDB 时序数据库
前言
InfluxDB 是一個用于存儲和分析時間序列數據的開源數據庫,內置 HTTP API,類 SQL 語句的支持和無結構的特性對使用者而言都非常友好。它強大的數據吞吐能力以及穩定的性能表現使其非常適合 IoT 領域。
通過 EMQ X 消息引擎,我們可以自定義 Template 文件,然后將 Json 格式的 MQTT 消息轉換為 Measurement 寫入 InfluxDB:
場景介紹
該場景需要將 EMQ X 指定主題下且滿足條件的消息存儲到 InfluxDB 時序數據庫。為了便于后續分析檢索,消息內容需要進行拆分存儲。
該場景下客戶端上報數據如下:
Topic:data/sensor
Payload:
{
"location": "bedroom",
"data": {
"temperature": 25,
"humidity": 46.4,
"pm2_5": 0.5
}
}
準備工作
數據庫安裝及初始化
創建 db 數據庫并開放 8089 UDP 端口。
$ docker pull influxdb
$ git clone -b v1.0.0 https://github.com/palkan/influx_udp.git
$ cd influx_udp
$ docker run --name=influxdb --rm -d -p 8086:8086 -p 8089:8089/udp -v ${PWD}/files/influxdb.conf:/etc/influxdb/influxdb.conf:ro -e INFLUXDB_DB=db influxdb:latest
配置說明
創建資源
打開 EMQ X Dashboard,進入左側菜單的 資源 頁面,點擊 新建 按鈕,選擇 InfluxDB 資源類型并完成相關配置進行資源創建。
創建規則
進入左側菜單的 規則 頁面,點擊 新建 按鈕,進行規則創建。觸發事件 選擇 message.publish,即在 EMQ X 收到 PUBLISH 消息時觸發該規則進行數據處理。
選定觸發事件后,我們可在界面上看到可選字段及示例 SQL:
篩選所需字段
規則引擎使用 SQL 語句過濾和處理數據。例如前文提到的場景中我們需要將 payload 中的字段提取出來使用,則可以通過 payload. 實現。同時我們僅僅期望處理 data/sensor 主題,那么可以在 WHERE 子句中使用主題通配符 =~ 對 topic 進行篩選:topic =~ 'data/sensor', 最終我們得到 SQL 如下:
SELECT
payload.location as location,
payload.data.temperature as temperature,
payload.data.humidity as humidity,
payload.data.pm2_5 as pm2_5
FROM
"message.publish"
WHERE
topic =~ 'data/sensor'
SQL 測試
借助 SQL 測試功能,我們可以快速確認剛剛填寫的 SQL 語句是否能達到我們的目的。首先填寫用于測試的 payload 等數據如下:
然后點擊 測試 按鈕,得到以下輸出結果,與預期相符。
{
"humidity": 46.4,
"location": "bedroom",
"pm2_5": 0.5,
"temperature": 25
}
添加響應動作,存儲消息到 InfluxDB
SQL 條件輸入輸出無誤后,我們繼續添加響應動作,配置寫入 SQL 語句,將篩選結果存儲到 InfluxDB。
點擊響應動作中的 添加 按鈕,選擇動作 保存數據到 InfluxDB,選取剛剛創建的 InfluxDB 資源,再按照實際需求將 ${fieldName} 填寫到 Field Keys, Tag Keys 和 Timestamp Key 中,Measurement 表示將數據寫入 InfluxDB 時使用的 Measurement,最后點擊 新建 按鈕完成規則創建。
測試
預期結果
我們成功創建了一條規則,包含一個處理動作,動作期望效果如下:
客戶端向 data/sensor 主題上報消息時,該消息將命中規則,規則列表中 已命中 數字將會增加 1;
InfluxDB 的 db 數據庫中將會增加一條數據,數據內容與處理后的消息內容一致。
使用 Dashboard 中的 Websocket 工具測試
切換到 工具 --> Websocket 頁面,使用任意 Client ID 連接到 EMQ X,連接成功后在 消息 卡片中發送如下消息:
Topic:data/sensor
Payload:
{
"location": "bedroom",
"data": {
"temperature": 25,
"humidity": 46.4,
"pm2_5": 0.5
}
}
點擊 發送 按鈕,發送成功后可以看到當前規則已命中次數已經變為 1。
然后檢查 InfluxDB,新的 data point 是否添加成功:
$ docker exec -it influxdb influx
> use db
Using database db
> select * from "sensor_data"
name: sensor_data
time humidity location pm2_5 temperature
---- -------- -------- ----- -----------
1561535778444457348 46.4 bedroom 0.5 25
至此,我們通過規則引擎實現了存儲消息到 InfluxDB 數據庫的業務開發。
在閱讀該教程之前,假定你已經了解 MQTT、EMQ X 的簡單知識。
超強干貨來襲 云風專訪:近40年碼齡,通宵達旦的技術人生總結
以上是生活随笔為你收集整理的emq数据储存到mysql_EMQ X 规则引擎系列(三)存储消息到 InfluxDB 时序数据库的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 用java雷电游戏_Java实现仿雷电游
- 下一篇: java stringbuffer倒置_