springboot项目实现mqtt客户端
公司中項目大多是物聯網項目,需要跟設備進行交互,用到的協議比較多,如NB/MQTT/LWM2M/COAP等,項目中不可避免用到了MQTT協議,本文介紹springboot項目MQTT客戶端實現,不多說直接上可執行代碼。
一、EMQ官網java sdk demo,如果只需要用到一個客戶端,可以參照下官網demo,修改下應用于項目
1、pom.xml依賴引用
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.2</version>
</dependency>
2、App.java
package io.emqx;import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;public class App {public static void main(String[] args) {String subTopic = "testtopic/#";String pubTopic = "testtopic/1";String content = "Hello World";int qos = 2;String broker = "tcp://broker.emqx.io:1883";String clientId = "emqx_test";MemoryPersistence persistence = new MemoryPersistence();try {MqttClient client = new MqttClient(broker, clientId, persistence);// MQTT 連接選項MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setUserName("emqx_test");connOpts.setPassword("emqx_test_password".toCharArray());// 保留會話connOpts.setCleanSession(true);// 設置回調client.setCallback(new PushCallback());// 建立連接System.out.println("Connecting to broker: " + broker);client.connect(connOpts);System.out.println("Connected");System.out.println("Publishing message: " + content);// 訂閱client.subscribe(subTopic);// 消息發布所需參數MqttMessage message = new MqttMessage(content.getBytes());message.setQos(qos);client.publish(pubTopic, message);System.out.println("Message published");client.disconnect();System.out.println("Disconnected");client.close();System.exit(0);} catch (MqttException me) {System.out.println("reason " + me.getReasonCode());System.out.println("msg " + me.getMessage());System.out.println("loc " + me.getLocalizedMessage());System.out.println("cause " + me.getCause());System.out.println("excep " + me);me.printStackTrace();}} }3、回調消息處理類 OnMessageCallback.java
package io.emqx;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage;public class OnMessageCallback implements MqttCallback {public void connectionLost(Throwable cause) {// 連接丟失后,一般在這里面進行重連System.out.println("連接斷開,可以做重連");}public void messageArrived(String topic, MqttMessage message) throws Exception {// subscribe后得到的消息會執行到這里面System.out.println("接收消息主題:" + topic);System.out.println("接收消息Qos:" + message.getQos());System.out.println("接收消息內容:" + new String(message.getPayload()));}public void deliveryComplete(IMqttDeliveryToken token) {System.out.println("deliveryComplete---------" + token.isComplete());} }二、正規項目中,肯定不能如demo那樣簡單實現,也許需要創建多個客戶端,項目中本人做了封裝,如下:
1、application.yml配置
customer:
  mqtt:
    broker: tcp://127.0.0.1:1883
    clientList:
      # 客戶端ID
      - clientId: iotGateway
        # 監聽主題
        subscribeTopic: v1/devices/me/rpc/response/+
        # 用戶名
        username:
        # 密碼
        password:
2、Mqtt客戶端數據類
package com.wfl.iot.mqtt.model;import lombok.Data;/*** MQTT客戶端** @author wangfenglei*/ @Data public class MqttClientVO {/*** 客戶端ID*/private String clientId;/*** 監聽主題*/private String subscribeTopic;/*** 用戶名*/private String userName;/*** 密碼*/private String password; } package com.wfl.iot.mqtt.config;import com.wfl.iot.mqtt.model.MqttClientVO; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration;import java.util.List;/*** Mqtt配置類** @author wangfenglei*/ @Data @Configuration @ConfigurationProperties(prefix = "customer.mqtt") public class MqttConfig {/*** mqtt broker地址*/String broker;/*** 需要創建的MQTT客戶端*/List<MqttClientVO> clientList; }3、MQTT回調(策略模式)
package com.wfl.iot.mqtt.callback;

import com.wfl.iot.mqtt.MqttClientManager;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import java.nio.charset.StandardCharsets;

/**
 * Base class for MQTT callbacks: reconnects when the connection is lost and
 * hands every received message to {@link #handleReceiveMessage(String, String)}.
 *
 * @author wangfenglei
 */
@Slf4j
public abstract class AbsMqttCallBack implements MqttCallback {
    /** Id of the client this callback reconnects; {@code null} disables reconnecting. */
    private String clientId;

    /** Options used for reconnecting; when {@code null}, the no-argument connect is used. */
    private MqttConnectOptions connectOptions;

    public String getClientId() {
        return clientId;
    }

    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    public MqttConnectOptions getConnectOptions() {
        return connectOptions;
    }

    public void setConnectOptions(MqttConnectOptions connectOptions) {
        this.connectOptions = connectOptions;
    }

    /**
     * Connection lost: looks the client up by {@code clientId} and makes a single
     * reconnect attempt. Failures are logged, never rethrown.
     *
     * <p>NOTE(review): nothing here re-subscribes after the reconnect; if the
     * connect options request a clean session, earlier subscriptions may not
     * survive - confirm against the broker/client configuration.
     *
     * @param throwable reason the connection was lost
     */
    @Override
    public void connectionLost(Throwable throwable) {
        if (null == clientId) {
            return;
        }

        try {
            MqttClient client = MqttClientManager.getMqttClientById(clientId);

            if (null == client) {
                // Without this check the lookup result would be dereferenced
                // and fail with an uninformative NullPointerException.
                log.error("{} reconnect failed! No mqttClient is registered under this clientId", clientId);
                return;
            }

            if (null != connectOptions) {
                client.connect(connectOptions);
            } else {
                client.connect();
            }
        } catch (Exception e) {
            // clientId fills the placeholder; the trailing exception is logged with its stack trace.
            log.error("{} reconnect failed!", clientId, e);
        }
    }

    /**
     * Receives a message from a subscribed topic, logs it and forwards it to
     * {@link #handleReceiveMessage(String, String)}.
     *
     * @param s           topic
     * @param mqttMessage received message
     * @throws Exception declared by the callback interface
     */
    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        // Decode explicitly as UTF-8 instead of the platform default charset.
        String content = new String(mqttMessage.getPayload(), StandardCharsets.UTF_8);
        log.info("Receive topic[{}],message={}", s, content);
        handleReceiveMessage(s, content);
    }

    /**
     * Message delivery completed; intentionally a no-op.
     *
     * @param iMqttDeliveryToken delivery token
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    }

    /**
     * Handles a received message.
     *
     * @param topic   topic
     * @param message message content
     */
    protected abstract void handleReceiveMessage(String topic, String message);
}

package com.wfl.iot.mqtt.callback;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * Default callback, registered under the bean name {@code default}; it only
 * logs the received message.
 *
 * @author wangfenglei
 */
@Slf4j
@Component("default")
public class DefaultMqttCallBack extends AbsMqttCallBack {
    @Override
    protected void handleReceiveMessage(String topic, String message) {
        log.info("DefaultCallBack:topic={},message={}", topic, message);
    }
}
說明:后續需求變更需要新增MQTT回調類,只需實現AbsMqttCallBack即可,但是@Component("客戶端ID")注解的名稱需要跟application.yml里的customer.mqtt.clientList.clientId一致,如此才能自動匹配注冊回調類
package com.wfl.iot.mqtt.callback;

import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Lookup context for MQTT subscription callbacks, keyed by name.
 *
 * @author wangfenglei
 */
@Component
public class MqttCallBackContext {
    /** Callbacks by key; a private copy of the map passed to the constructor. */
    private final Map<String, AbsMqttCallBack> callBackMap = new ConcurrentHashMap<>();

    /**
     * Copies every entry of the supplied map into this context.
     *
     * @param callBackMap callbacks by key (with Spring constructor injection a
     *                    {@code Map<String, T>} parameter is populated with the
     *                    beans of type {@code T} keyed by bean name)
     */
    public MqttCallBackContext(Map<String, AbsMqttCallBack> callBackMap) {
        for (Map.Entry<String, AbsMqttCallBack> entry : callBackMap.entrySet()) {
            this.callBackMap.put(entry.getKey(), entry.getValue());
        }
    }

    /**
     * Returns the callback stored under the given client id.
     *
     * @param clientId client id
     * @return the matching callback, or {@code null} when none is stored
     */
    public AbsMqttCallBack getCallBack(String clientId) {
        AbsMqttCallBack found = callBackMap.get(clientId);
        return found;
    }
}
3、MQTT客戶端管理
package com.wfl.iot.mqtt;import com.wfl.iot.mqtt.callback.AbsMqttCallBack; import com.wfl.iot.mqtt.callback.MqttCallBackContext; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.Map; import java.util.concurrent.ConcurrentHashMap;/*** MQTT客戶端管理類,如果客戶端非常多后續可入redis緩存** @author wangfenglei*/ @Slf4j @Component public class MqttClientManager {@Value("${customer.mqtt.broker}")private String mqttBroker;@Resourceprivate MqttCallBackContext mqttCallBackContext;/*** 存儲MQTT客戶端*/public static Map<String, MqttClient> MQTT_CLIENT_MAP = new ConcurrentHashMap<>();public static MqttClient getMqttClientById(String clientId) {return MQTT_CLIENT_MAP.get(clientId);}/*** 創建mqtt客戶端** @param clientId 客戶端ID* @param subscribeTopic 訂閱主題,可為空* @param userName 用戶名,可為空* @param password 密碼,可為空* @return mqtt客戶端*/public void createMqttClient(String clientId, String subscribeTopic, String userName, String password) {MemoryPersistence persistence = new MemoryPersistence();try {MqttClient client = new MqttClient(mqttBroker, clientId, persistence);MqttConnectOptions connOpts = new MqttConnectOptions();if (null != userName && !"".equals(userName)) {connOpts.setUserName(userName);}if (null != password && !"".equals(password)) {connOpts.setPassword(password.toCharArray());}connOpts.setCleanSession(true);if (null != subscribeTopic && !"".equals(subscribeTopic)) {AbsMqttCallBack callBack = mqttCallBackContext.getCallBack(clientId);if (null == callBack) {callBack = 
mqttCallBackContext.getCallBack("default");}callBack.setClientId(clientId);callBack.setConnectOptions(connOpts);client.setCallback(callBack);}//連接mqtt服務端brokerclient.connect(connOpts);if (null != subscribeTopic && !"".equals(subscribeTopic)) {client.subscribe(subscribeTopic);}MQTT_CLIENT_MAP.putIfAbsent(clientId, client);} catch (MqttException e) {log.error("Create mqttClient failed!", e);}} }4、MQTT客戶端項目啟動自動創建
package com.wfl.iot.mqtt;

import com.wfl.iot.mqtt.config.MqttConfig;
import com.wfl.iot.mqtt.model.MqttClientVO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.List;

/**
 * Creates the configured MQTT clients once this bean has been initialised.
 *
 * @author wangfenglei
 */
@Component
public class MqttClientCreate {
    @Resource
    private MqttClientManager mqttClientManager;

    @Autowired
    private MqttConfig mqttConfig;

    /**
     * Creates one MQTT client per entry of the configured client list; does
     * nothing when no client list is configured.
     */
    @PostConstruct
    public void createMqttClient() {
        List<MqttClientVO> mqttClientList = mqttConfig.getClientList();

        if (null == mqttClientList) {
            // No clientList configured: nothing to create.
            return;
        }

        for (MqttClientVO mqttClient : mqttClientList) {
            // Create the client; per the original note, the callback class is
            // matched to the client by its client id.
            mqttClientManager.createMqttClient(mqttClient.getClientId(), mqttClient.getSubscribeTopic(),
                    mqttClient.getUserName(), mqttClient.getPassword());
        }
    }
}
說明:@PostConstruct注解,可以在項目啟動過程中自動創建mqtt客戶端,如果啟動比較耗時,可以考慮createMqttClient方法里面使用線程
5、MQTT客戶端使用工具類
package com.wfl.iot.mqtt.util;

import com.wfl.iot.mqtt.MqttClientManager;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import java.nio.charset.StandardCharsets;

/**
 * Static helper for publishing messages through a registered MQTT client.
 *
 * @author wangfenglei
 */
@Slf4j
public class MqttClientUtil {

    /** Static utility; not meant to be instantiated. */
    private MqttClientUtil() {
    }

    /**
     * Publishes a text message with QoS 2 through the client registered under
     * {@code clientId}. An unknown client id or a publish failure is logged,
     * not thrown.
     *
     * @param clientId id of the registered client to publish with
     * @param topic    topic to publish to
     * @param content  message text, sent as UTF-8 bytes
     */
    public static void sendMqttMsg(String clientId, String topic, String content) {
        MqttClient mqttClient = MqttClientManager.getMqttClientById(clientId);

        if (null == mqttClient) {
            log.error("Not exist mqttClient where it's clientId is {}", clientId);
            return;
        }

        try {
            // Encode explicitly as UTF-8 instead of the platform default charset.
            MqttMessage message = new MqttMessage(content.getBytes(StandardCharsets.UTF_8));
            message.setQos(2);
            // Publish through the instance that was null-checked above rather than
            // looking it up a second time.
            mqttClient.publish(topic, message);
            log.info("Publish to mqtt broker,message={}", message);
        } catch (MqttException e) {
            log.error("MqttClient send msg faild!", e);
        }
    }
}
說明:項目中向mqtt broker發送消息,直接調用MqttClientUtil.sendMqttMsg方法即可
總結
以上是生活随笔為你收集整理的springboot项目实现mqtt客户端的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: SQL——正则表达式
- 下一篇: WPF框架