【物联网项目系列】——使用netty框架做一个环保hj212协议即时通讯工具(接收解析处理实时数据)
本物聯(lián)網(wǎng)系列
一、用netty做一個環(huán)保hj212協(xié)議即時通訊工具
二、零基礎(chǔ)用uniapp快速開發(fā)實現(xiàn)MQTT設(shè)備中心附后臺接口
三、MQTT服務(wù)器搭建實現(xiàn)物聯(lián)網(wǎng)通訊
四、springboot + rabbitmq 做智能家居以及web顯示未讀消息
文章目錄
- 前言
- springboot整合netty
- 總結(jié)
前言
統(tǒng)一回答下: 我的都是做項目的時候抽空自己寫的博客,帶有很多跳躍性,大家假如喜歡這個系列的話,我都有寫。可以去看看上面相關(guān)鏈接我只能提供一些部分的思路,剛開始做項目的時候資料比較欠缺,很理解大家的心情,功夫得自己摸索 其實有這些關(guān)鍵的部分是可以幫很多試手的同學(xué)做出點(diǎn)的東西的,還有因為比如大家問的就是數(shù)據(jù)庫哪里下,jar怎么導(dǎo)入,那些問題,我沒法回答哈哈哈使用方法 與效果截圖
本文代碼已經(jīng)上傳打成jar上傳到我的資源【已分享百度網(wǎng)盤】
鏈接: https://pan.baidu.com/s/1ebK6nVroqmp4_JWA1XCv5g 提取碼: id8h 復(fù)制這段內(nèi)容后打開百度網(wǎng)盤手機(jī)App,操作更方便哦
(https://download.csdn.net/download/weixin_44106334/13182987)
使用方法:解壓后進(jìn)入cmd java -jar shbykj-handle.jar
具體代碼比較多,我就沒貼了
springboot整合netty
pom
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId></dependency>yml文件
# netty配置 netty:# 端口號port: 6666# 最大線程數(shù)maxThreads: 1024# 數(shù)據(jù)包的最大長度max_frame_length: 65535服務(wù)器(接收數(shù)據(jù)) :TCPServer
package com.shbykj.springboot.netty;import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.MessageToByteEncoder; import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** 網(wǎng)絡(luò)服務(wù)器** @author* @date 2020-11-04 8:32*/ @Component public class TCPServer {@Resourceprivate NettyServerConfig nettyConfig;/* @Resourceprivate DivisorService divisorService;@Resourceprivate PointInfoService pointInfoService;@Resourceprivate PointAndDivisorService pointAndDivisorService;*/public void run() {int port = nettyConfig.getPort();EventLoopGroup boosGroup = new NioEventLoopGroup();EventLoopGroup workGroup = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(boosGroup, workGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline(); // pipeline.addLast(new TCPHandler(divisorService,pointInfoService,pointAndDivisorService));pipeline.addLast("encoder", new MessageToByteEncoder<byte[]>() {@Overrideprotected void encode(ChannelHandlerContext ctx, byte[] msg, ByteBuf out) throws Exception {out.writeBytes(msg);}});pipeline.addLast(new LengthFieldBasedFrameDecoder(nettyConfig.getMaxFrameLength(), 0, 2, 0, 2));pipeline.addLast(new LengthFieldPrepender(2));}}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);System.out.println("服務(wù)啟動...");ChannelFuture channelFuture = bootstrap.bind(port).sync();channelFuture.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {workGroup.shutdownGracefully();boosGroup.shutdownGracefully();System.out.println("服務(wù)關(guān)閉...");}}}客戶端(解析數(shù)據(jù)處理):TCPHandler
package com.shbykj.springboot.netty;import com.mongodb.client.FindIterable; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.Filters; import com.shbykj.pojo.*; import com.shbykj.service.DivisorService; import com.shbykj.service.PointAndDivisorService; import com.shbykj.service.PointInfoService; import com.shbykj.utils.DateUtils; import com.shbykj.utils.config.CollectionMapping; import com.shbykj.utils.core.T212Parser; import com.shbykj.utils.db.MongoDBConnection; import com.shbykj.utils.db.RedisConnection; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil; import io.netty.util.internal.StringUtil; import org.bson.Document; import org.bson.conversions.Bson; import org.springframework.stereotype.Component; import redis.clients.jedis.Jedis;import java.io.StringReader; import java.math.BigInteger; import java.net.SocketAddress; import java.text.SimpleDateFormat; import java.util.*; import java.util.concurrent.CopyOnWriteArrayList;/*** @author* @date 2020-11-04 8:17*/ @Component public class TCPHandler extends SimpleChannelInboundHandler<ByteBuf> {private DivisorService divisorService;private PointInfoService pointInfoService;private PointAndDivisorService pointAndDivisorService;public TCPHandler(DivisorService divisorService, PointInfoService pointInfoService, PointAndDivisorService pointAndDivisorService) {this.divisorService = divisorService;this.pointInfoService = pointInfoService;this.pointAndDivisorService = pointAndDivisorService;}private static final MongoDatabase DATABASE = MongoDBConnection.getDatabase();private static final MongoCollection<Document> COLLECTION = DATABASE.getCollection("point_info");private static final Jedis JEDIS = RedisConnection.getJedis();public static final List<Document> list = new CopyOnWriteArrayList<>();/*** 讀取消息** @param ctx* @param m* @throws Exception*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, ByteBuf m) throws Exception {byte[] data = new byte[m.readableBytes()];m.readBytes(data);String msg = new String(data, CharsetUtil.UTF_8);SocketAddress address = ctx.channel().remoteAddress();System.out.println(address.toString().substring(1) + "---" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));System.out.println(msg);List<Object> list = parse(msg);if (list == null) {return;}for (Object o : list) {if (o instanceof RealData) {RealData realData = (RealData) o;Bson bson = Filters.eq("mn", realData.getMn());FindIterable<Document> documents = COLLECTION.find(bson);for (Document document : documents) {Integer id = (Integer) document.get("pointId");realData.setPointId(id);}if (realData == null) {return;}if (!msg.contains("CN=2011")) {//4.創(chuàng)建文檔Document document = new Document("dataTime", realData.getQn()).append("mn", realData.getMn()).append("pointId", realData.getPointId()).append("metrics", realData.getData());if (msg.contains("CN=2051")) {//5.添加數(shù)據(jù)DATABASE.getCollection(CollectionMapping.MINUTE.getcollectName()).insertOne(document);}if (msg.contains("CN=2061")) {//5.添加數(shù)據(jù)DATABASE.getCollection(CollectionMapping.HOUR.getcollectName()).insertOne(document);}if (msg.contains("CN=2031")) {//5.添加數(shù)據(jù)DATABASE.getCollection(CollectionMapping.DAY.getcollectName()).insertOne(document);}} else {//4.創(chuàng)建文檔Document document = new Document("dataTime", realData.getQn()).append("mn", realData.getMn()).append("pointId", realData.getPointId()).append("metrics", realData.getData());//若實時數(shù)據(jù)量過多時,可以統(tǒng)一對數(shù)據(jù)庫進(jìn)行操作/*synchronized (TCPHandler.class) {list.add(document);if (list.size() == 100) {//刪除集合中的所有數(shù)據(jù)MongoCollection<Document> collection = MongoDBConnection.getConnection(CollectionMapping.SECOND.getcollectName());BasicDBObject dbObject = new BasicDBObject();collection.deleteMany(dbObject);//插入數(shù)據(jù)collection.insertMany(list);list.clear();}}*/DATABASE.getCollection(CollectionMapping.SECOND.getcollectName()).insertOne(document);}Bson bson1 = Filters.eq("mn", realData.getMn());FindIterable<Document> docs = COLLECTION.find(bson1);for (Document doc : docs) {Integer pointId = doc.getInteger("pointId");JEDIS.hset("onlineLastTime", Integer.toString(pointId), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));}//只有一個值//for (Document doc : docs) {//Integer pointId = doc.getInteger("pointId");//Jedis jedis = RedisConnection.getJedis();//redisTemplate.opsForHash().put("onlineLastTime",Integer.toString(pointId), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));//jedis.hset("onlineLastTime", Integer.toString(pointId), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));// }}if (o instanceof WareInfo) {WareInfo wareInfo = (WareInfo) o;if (wareInfo == null) {return;}Document document = new Document("pointId", wareInfo.getPointId()).append("pointName", wareInfo.getPointName()).append("dataTime", wareInfo.getQn()).append("mn", wareInfo.getMn()).append("enterpriseId", wareInfo.getEnterpriseId()).append("enterpriseName", wareInfo.getEnterpriseName()).append("dataType", wareInfo.getDataType()).append("metrics", wareInfo.getData());DATABASE.getCollection(CollectionMapping.WARN.getcollectName()).insertOne(document);}if (o instanceof ExceptionData) {ExceptionData exceptionData = (ExceptionData) o;if (exceptionData == null) {return;}Document document = new Document("pointId", exceptionData.getPointId()).append("pointName", exceptionData.getPointName()).append("dataTime", exceptionData.getQn()).append("mn", exceptionData.getMn()).append("enterpriseId", exceptionData.getEnterpriseId()).append("enterpriseName", exceptionData.getEnterpriseName()).append("dataType", exceptionData.getDataType()).append("metrics", exceptionData.getData());DATABASE.getCollection(CollectionMapping.DataException.getcollectName()).insertOne(document);}}}private List<Object> parse(String msg) {List<Object> list = new ArrayList<>();RealData realData = new RealData();String dataTime = "";try {StringReader reader = new StringReader(msg);T212Parser t212Parser = new T212Parser(reader);char[] header = t212Parser.readHeader();char[] dataLen = t212Parser.readDataLen();char[] data = t212Parser.readData(Integer.parseInt(new String(dataLen)));char[] readCrc = t212Parser.readCrc();//根據(jù)數(shù)據(jù)獲取crcint crc = T212Parser.crc16Checkout(data, data.length);//校驗if (Integer.parseInt(new BigInteger(new String(readCrc), 16).toString()) == crc) {//System.out.println(msg);String str = new String(data);//將212協(xié)議 分段String[] strings = str.substring(0, str.length() - 2).split(";");Map<String, Map<String, Map<String, String>>> map2 = new HashMap<>();Map<String, Map<String, String>> map1 = new HashMap<>();Map<String, Map<String, String>> wareMap1 = new HashMap<>();Map<String, Map<String, String>> exceptionMap1 = new HashMap<>();boolean flag = false;WareInfo wareInfo = new WareInfo();ExceptionData exceptionData = new ExceptionData();for (int i = 0; i < strings.length; i++) {// System.out.println(strings[i]);//獲取QN對應(yīng)值if (strings[i].contains("QN")) {Date date = DateUtils.stringToDate(strings[i].split("=")[1], "yyyyMMddHHmmssSSS");realData.setQn(date);wareInfo.setQn(date);exceptionData.setQn(date);//System.out.println("QN:" + strings[i].split("=")[1]);}//獲取ST對應(yīng)值if (strings[i].contains("ST")) {//System.out.println("ST:" + strings[i].split("=")[1]);}//獲取CN對應(yīng)值if (strings[i].contains("CN")) {//System.out.println("CN:" + strings[i].split("=")[1]);wareInfo.setDataType(strings[i].split("=")[1]);exceptionData.setDataType(strings[i].split("=")[1]);}//獲取PW對應(yīng)值if (strings[i].contains("PW")) {//System.out.println("PW:" + strings[i].split("=")[1]);}//獲取MN對應(yīng)值if (strings[i].contains("MN")) {realData.setMn(strings[i].split("=")[1]);wareInfo.setMn(realData.getMn());exceptionData.setMn(realData.getMn());//System.out.println("MN:" + strings[i].split("=")[1]);Bson bson = Filters.eq("mn", realData.getMn());FindIterable<Document> documents = COLLECTION.find(bson);Document doc = documents.first();if (doc == null) {return null;}for (Document document : documents) {Integer pointId = (Integer) document.get("pointId");String pointName = (String) document.get("pointName");Integer enterpriseId = (Integer) document.get("enterpriseId");String enterpriseName = (String) document.get("enterpriseName");wareInfo.setEnterpriseId(enterpriseId);wareInfo.setEnterpriseName(enterpriseName);wareInfo.setPointId(pointId);wareInfo.setPointName(pointName);exceptionData.setEnterpriseId(enterpriseId);exceptionData.setEnterpriseName(enterpriseName);exceptionData.setPointId(pointId);exceptionData.setPointName(pointName);}}//獲取DataTime對應(yīng)值if (strings[i].contains("DataTime")) {dataTime = strings[i].split("=")[2];//System.out.println("DataTime:" + strings[i].split("=")[2]);}//對數(shù)據(jù)進(jìn)行分段 CP的后一段是數(shù)據(jù)段//CH4-Min=1.0700,CH4-Avg=1.1000,CH4-Max=1.1200,CH4-Cou=0.0014,CH4-Flag=N//25-Min=12.6700,25-Avg=13.0820,25-Max=13.7000,25-Cou=0.0172,25-Flag=Nif ((i > 0 && strings[i - 1].contains("CP")) || flag) {//修改標(biāo)志位if (strings[i - 1].contains("CP")) {flag = true;}Map<String, String> wareMap = new HashMap<>();Map<String, String> exceptionMap = new HashMap<>();//查詢String code = strings[i].split("-", 2)[0];Divisor divisor = divisorService.findByCode(code);if (divisor != null) {wareMap.put("dateTime", DateUtils.dateToString(new Date(), "yyyy-MM-dd HH:mm:ss"));wareMap.put("divisorName", divisor.getName());exceptionMap.put("dateTime", DateUtils.dateToString(new Date(), "yyyy-MM-dd HH:mm:ss"));exceptionMap.put("divisorName", divisor.getName());}//根據(jù)站點(diǎn)和因子id查詢站點(diǎn)因子信息PointAndDivisor pointAndDivisor = pointAndDivisorService.findByPointIdAndDivisorId(wareInfo.getPointId(), divisor.getId());//對數(shù)據(jù)段進(jìn)行分區(qū) string2[i] CH4-Min=1.0700String[] strings2 = strings[i].split(",");Map<String, String> map = new HashMap<>();boolean wareFlag = false;boolean exceptionFlag = false;for (int j = 0; j < strings2.length; j++) {String sufVal = strings2[j].split("-", 2)[1]; //CH4-Min=1.0700String sufPreVal = sufVal.split("=")[0]; //MinString suf2Val = sufVal.split("=")[1]; //1.0700if (!msg.contains("CN=2011")) {//零值 和 恒值 異常if (msg.contains("CN=2051") && sufVal.contains("Avg")) {String k = "pointId:" + wareInfo.getPointId() + "-" + "divisorId:" + divisor.getId();String value = JEDIS.hget(k, "val");//第一次 為空if (StringUtil.isNullOrEmpty(value)) {JEDIS.hset(k, "val", suf2Val);JEDIS.hset(k, "beginTime", DateUtils.dateToString(new Date(), "yyyy-HH-dd HH:mm:ss"));JEDIS.hset(k, "updateTime", DateUtils.dateToString(new Date(), "yyyy-HH-dd HH:mm:ss"));} else if (value.equals(suf2Val)) {JEDIS.hset(k, "updateTime", DateUtils.dateToString(new Date(), "yyyy-HH-dd HH:mm:ss"));} else {// 值發(fā)生改變String endTime = JEDIS.hget(k, "updateTime");String beginTime = JEDIS.hget(k, "beginTime");Map<String, Map<String, String>> m1 = new HashMap<>();Map<String, String> m = new HashMap<>();m.put("divisorName", divisor.getName());m.put("dateTime", beginTime + "~" + endTime);m.put("rtd", value);m1.put(sufPreVal, m);if (dateDiff(DateUtils.stringToDate(endTime, "yyyy-MM-dd HH:mm:ss"), DateUtils.stringToDate(beginTime, "yyyy-MM-dd HH:mm:ss"))) {Document document = new Document("pointId", wareInfo.getPointId()).append("pointName", wareInfo.getPointName()).append("mn", wareInfo.getMn()).append("enterpriseId", wareInfo.getEnterpriseId()).append("enterpriseName", wareInfo.getEnterpriseName()).append("code", sufPreVal).append("beginTime", DateUtils.stringToDate(beginTime, "yyyy-MM-dd HH:mm:ss")).append("endTime", DateUtils.stringToDate(endTime, "yyyy-MM-dd HH:mm:ss")).append("metrics", m1);if (Integer.parseInt(value) == 0) {MongoDBConnection.getConnection(CollectionMapping.ZERO.getcollectName()).insertOne(document);} else {MongoDBConnection.getConnection(CollectionMapping.CONST.getcollectName()).insertOne(document);}}//更新redisJEDIS.hset(k, "val", sufVal);JEDIS.hset(k, "beginTime", DateUtils.dateToString(new Date(), "yyyy-HH-dd HH:mm:ss"));JEDIS.hset(k, "updateTime", DateUtils.dateToString(new Date(), "yyyy-HH-dd HH:mm:ss"));}}if (sufVal.contains("Max")) {exceptionMap.put("Rtd", suf2Val);}if (sufVal.contains("Flag")) {wareMap.put("flag", suf2Val);}if (sufVal.contains("Flag") && !"N".equals(suf2Val)) {exceptionFlag = true;}//超標(biāo)異常if (sufVal.contains("Max") && pointAndDivisor != null && Integer.parseInt(pointAndDivisor.getCeilval()) != 0 && Double.parseDouble(suf2Val) > Double.parseDouble(pointAndDivisor.getCeilval())) {wareMap.put("ceilval", pointAndDivisor.getCeilval());wareMap.put("floorval", pointAndDivisor.getFloorval());wareMap.put("Rtd", suf2Val);wareFlag = true;}}map.put(sufPreVal, suf2Val);if (j == strings2.length - 1) {String preVal = strings2[j].split("-", 2)[0];if (exceptionFlag) {exceptionMap1.put(preVal, exceptionMap);} else if (wareFlag) {wareMap1.put(preVal, wareMap);} else {map1.put(preVal, map);}}}if (wareFlag && wareMap1.size() > 0) {wareInfo.setData(wareMap1);if (wareInfo.getQn() == null) {try {Date date = DateUtils.stringToDate(dataTime, "yyyyMMddHHmmss");wareInfo.setQn(date);} catch (Exception e) {System.out.println();}}list.add(wareInfo);}if (i == strings.length - 1 && exceptionMap1.size() > 0) {exceptionData.setData(exceptionMap1);if (exceptionData.getQn() == null) {try {Date date = DateUtils.stringToDate(dataTime, "yyyyMMddHHmmss");exceptionData.setQn(date);} catch (Exception e) {System.out.println();}}list.add(exceptionData);}if (i == strings.length - 1 && map1.size() > 0) {map2.put(dataTime, map1);realData.setData(map2);if (realData.getQn() == null) {try {Date date = DateUtils.stringToDate(dataTime, "yyyyMMddHHmmss");realData.setQn(date);} catch (Exception e) {System.out.println();}}list.add(realData);}}}}} catch (Exception e) {System.out.println();}return list;}/*** 發(fā)生異常** @param ctx* @param cause* @throws Exception*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {if (ctx.channel().isActive()) {ctx.channel().writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);}}/*** 計算時間差值** @param date1* @param date2* @return*/private boolean dateDiff(Date date1, Date date2) {long diff = date1.getTime() - date2.getTime();long days = diff / (1000 * 60 * 60 * 24);long hours = (diff - days * (1000 * 60 * 60 * 24)) / (1000 * 60 * 60);//System.out.println(minutes);return hours >= 1;} }總結(jié)
1.本文涉及的service,就是數(shù)據(jù)處理部分(異常數(shù)據(jù),超標(biāo)數(shù)據(jù),恒值數(shù)據(jù),零值數(shù)據(jù)的處理)
2.主線就是,pom下載依賴——》yml配置netty網(wǎng)絡(luò)端口等配置信息——》TCPServer網(wǎng)絡(luò)服務(wù)端——》TCPHandler數(shù)據(jù)處理端
總結(jié)
以上是生活随笔為你收集整理的【物联网项目系列】——使用netty框架做一个环保hj212协议即时通讯工具(接收解析处理实时数据)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如何构建一个真实的推荐系统?
- 下一篇: 6.java 代码块