springboot整合kafka和netty服务简单实例
文章目錄
- 背景
- 實體
- 編寫KafkaSender,用于操作kafka
- 編寫netty服務端
- netty 服務啟動類:
- NettyServerHandler處理類
- Controller
背景
步驟:
pd-netty服務的作用是接收司機端上報的車輛定位信息并將信息發送到kafka隊列。pd-netty共提供兩種方式來接收司機端上報的定位信息:基于netty實現的TCP方式、HTTP接口方式。
配置:
spring:# jackson時間格式化jackson:time-zone: ${spring.jackson.time-zone}date-format: ${spring.jackson.date-format}servlet:multipart:max-file-size: ${spring.servlet.multipart.max-file-size}max-request-size: ${spring.servlet.multipart.max-request-size}enabled: ${spring.servlet.multipart.enabled}# kafkakafka:bootstrap-servers: ${spring.kafka.bootstrap-servers}listener: # 指定listener 容器中的線程數,用于提高并發量concurrency: ${spring.kafka.listener.concurrency}producer:retries: ${spring.kafka.producer.retries}batch-size: ${spring.kafka.producer.batch-size}buffer-memory: ${spring.kafka.producer.buffer-memory}key-serializer: ${spring.kafka.producer.key-serializer}value-serializer: ${spring.kafka.producer.value-serializer}consumer:group-id: ${spring.kafka.consumer.group-id}實體
//LocationEntity實體package com.itheima.pinda.entity; ? import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import lombok.Data; ? @Data @ApiModel("位置信息") public class LocationEntity {public String getId() {return businessId + "#" + type + "#" + currentTime;} ?/*** 車輛Id*/@ApiModelProperty("業務id, 快遞員id 或者 車輛id")private String businessId; ?/*** 司機名稱*/@ApiModelProperty("司機名稱")private String name; ?/*** 司機電話*/@ApiModelProperty("司機電話")private String phone; ?/*** 車牌號*/@ApiModelProperty("licensePlate")private String licensePlate; ?/*** 類型*/@ApiModelProperty("類型,車輛:truck,快遞員:courier")private String type; ?/*** 經度*/@ApiModelProperty("經度")private String lng; ?/*** 維度*/@ApiModelProperty("維度")private String lat; ?/*** 當前時間*/@ApiModelProperty("當前時間 格式:yyyyMMddHHmmss")private String currentTime; ?@ApiModelProperty("所屬車隊")private String team; ?@ApiModelProperty("運輸任務id")private String transportTaskId; }編寫KafkaSender,用于操作kafka
package com.itheima.pinda.service; ? import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; ? @Component @Slf4j public class KafkaSender {public final static String MSG_TOPIC = "tms_order_location";@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;private static KafkaTemplate<String, String> template; ?@PostConstructpublic void init() {KafkaSender.template = this.kafkaTemplate;} ?//發送消息到kafka隊列public static boolean send(String topic, String message) {try {template.send(topic, message);log.info("消息發送成功:{} , {}", topic, message);} catch (Exception e) {log.error("消息發送失敗:{} , {}", topic, message, e);return false;}return true;} ? }編寫netty服務端
Netty 是一款基于 NIO(Nonblocking I/O,非阻塞IO)開發的網絡通信框架,對比于 BIO(Blocking I/O,阻塞IO),他的并發性能得到了很大提高。難能可貴的是,在保證快速和易用性的同時,并沒有喪失可維護性和性能等優勢。
TCP方式
Netty是由JBOSS提供的一個Java開源框架。Netty提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。
. Netty的特點?
? 一個高性能、異步事件驅動的NIO框架,它提供了對TCP、UDP和文件傳輸的支持
? 使用更高效的socket底層,對epoll空輪詢引起的cpu占用飆升在內部進行了處理,避免了直接使用NIO的陷阱,簡化了NIO的處理方式。
? 采用多種decoder/encoder 支持,對TCP粘包/分包進行自動化處理
? 可使用接受/處理線程池,提高連接效率,對重連、心跳檢測的簡單支持
? 可配置IO線程數、TCP參數, TCP接收和發送緩沖區使用直接內存代替堆內存,通過內存池的方式循環利用ByteBuf
? 通過引用計數器及時申請釋放不再引用的對象,降低了GC頻率
? 使用單線程串行化的方式,高效的Reactor線程模型
? 大量使用了volitale、使用了CAS和原子類、線程安全類的使用、讀寫鎖的使用
Netty 中的重要組件?
Channel:Netty 網絡操作抽象類,它除了包括基本的 I/O 操作,如 bind、connect、read、write 等。
EventLoop:主要是配合 Channel 處理 I/O 操作,用來處理連接的生命周期中所發生的事情。
ChannelFuture:Netty 框架中所有的 I/O 操作都為異步的,因此我們需要 ChannelFuture 的 addListener()注冊一個 ChannelFutureListener 監聽事件,當操作執行成功或者失敗時,監聽就會自動觸發返回結果。
ChannelHandler:充當了所有處理入站和出站數據的邏輯容器。ChannelHandler 主要用來處理各種事件,這里的事件很廣泛,比如可以是連接、數據接收、異常、數據轉換等。
ChannelPipeline:為 ChannelHandler 鏈提供了容器,當 channel 創建時,就會被自動分配到它專屬的 ChannelPipeline,這個關聯是永久性的。
netty 服務啟動類:
package com.itheima.pinda.config; ? import com.itheima.pinda.service.NettyServerHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; ? /*** netty 服務啟動類*/ @Component @Slf4j public class NettyServer implements CommandLineRunner {private static NettyServer nettyServer; ?@PostConstructpublic void init() {nettyServer = this;} ?@Value("${netty.port}")private int port; ?private EventLoopGroup mainGroup;private EventLoopGroup subGroup;private ServerBootstrap server;private ChannelFuture future; ?public NettyServer() {// NIO線程組,用于處理網絡事件mainGroup = new NioEventLoopGroup();subGroup = new NioEventLoopGroup();// 服務初始化工具,封裝初始化服務的復雜代碼server = new ServerBootstrap();server.group(mainGroup, subGroup).option(ChannelOption.SO_BACKLOG, 128)// 設置緩存.childOption(ChannelOption.SO_KEEPALIVE, true).channel(NioServerSocketChannel.class)// 指定使用NioServerSocketChannel產生一個Channel用來接收連接.childHandler(new NettyServerHandler());//具體處理網絡IO事件 ?} ?public void start() {// 啟動服務端,綁定端口this.future = server.bind(nettyServer.port);log.info("Netty Server 啟動完畢!!!! 端口:" + nettyServer.port);} ?@Overridepublic void run(String... args) {this.start();} }NettyServerHandler處理類
? import com.alibaba.fastjson.JSON; import com.itheima.pinda.entity.LocationEntity; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.ReferenceCountUtil; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; import java.io.UnsupportedEncodingException; ? /*** netty 業務處理*/ @Slf4j @ChannelHandler.Sharable //@ChannelHandler.Sharable:表示可以將帶這個注釋的 ChannelHandler 的同一實例多次添加到一個或多個 ChannelPipelines 中,而不會出現競爭條件。 // 如果未指定此注解,則每次將其添加到管道時都必須創建一個新的處理程序實例,因為它具有成員變量等非共享狀態 //處理 I/O 事件或攔截 I/O 操作,并將其轉發到其ChannelPipeline下一個處理程序 public class NettyServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {log.info("ServerHandler.channelRead()");ByteBuf in = (ByteBuf) msg;try {//接收報文String body = getRequestBody(in);log.info("報文內容:{}", body); ?//解析報文String message = parseMessage(body);if (StringUtils.isBlank(message)) {log.info("報文解析失敗");return;} ?//發送至kafka隊列KafkaSender.send(KafkaSender.MSG_TOPIC, message); ?} catch (Exception e) {log.error(e.getMessage());} finally {//使用完ByteBuf之后,需要主動去釋放資源,否則,資源一直在內存中加載,容易造成內存泄漏ReferenceCountUtil.release(msg);}if (null != in) {//把當前的寫指針 writerIndex 恢復到之前保存的 markedWriterIndex值in.resetWriterIndex();}} ?/*** 解析請求內容** @param in* @return* @throws UnsupportedEncodingException*/private String getRequestBody(ByteBuf in) throws UnsupportedEncodingException {if (in.readableBytes() <= 0) {return null;}byte[] req = new byte[in.readableBytes()];in.readBytes(req);return new String(req, "UTF-8");} ?/*** 解析報文* <p>* 設備不同報文也不同,本次設備為移動端,直接使用json格式傳輸*/private String parseMessage(String body) {if (StringUtils.isBlank(body)) {log.warn("報文為空");return null;}body = body.trim();// 其它格式的報文需要解析后放入MessageEntity實體LocationEntity message = JSON.parseObject(body, LocationEntity.class);if (message == null || StringUtils.isBlank(message.getType()) || StringUtils.isBlank(message.getBusinessId()) || StringUtils.isBlank(message.getLat()) || StringUtils.isBlank(message.getLng()) || StringUtils.isBlank(message.getId())) {log.warn("報文內容異常");return null;} ?String result = JSON.toJSONString(message);return result;} ?@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {// 寫一個空的buf,并刷新寫出區域。完成后關閉sock channel連接。ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);} ?@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// 關閉發生異常的連接ctx.close();} }Controller
編寫NettyController,通過HTTP接口方式接受司機端上報的車輛位置信息
package com.itheima.pinda.controller; ? import com.alibaba.fastjson.JSON; import com.itheima.pinda.common.utils.Result; import com.itheima.pinda.entity.LocationEntity; import com.itheima.pinda.service.KafkaSender; import io.swagger.annotations.Api; import lombok.extern.slf4j.Slf4j; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; ? @RestController @Api(tags = "車輛軌跡服務") @RequestMapping("netty") @Slf4j public class NettyController {@PostMapping(value = "/push")public Result push(@RequestBody LocationEntity locationEntity) {String message = JSON.toJSONString(locationEntity);log.info("HTTP 方式推送位置信息:{}", message);KafkaSender.send(KafkaSender.MSG_TOPIC, message);return Result.ok();} }總結
以上是生活随笔為你收集整理的springboot整合kafka和netty服务简单实例的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: springboot中使用规则引擎Dro
- 下一篇: 使用nio多线程下载网络文件实例