八.利用springAMQP实现异步消息队列的日志管理
經過前段時間的學習和鋪墊,已經對spring amqp有了大概的了解。俗話說學以致用,今天就利用springAMQP來完成一個日志管理模塊。大概的需求是這樣的:系統中有很多地方需要記錄操作日志,比如登錄、退出、查詢等,如果將記錄日志這個操作摻雜在主要的業務邏輯當中,勢必會增加響應的時間,對客戶來說是一種不好的體驗。所以想到用異步消息隊列來進行優化。系統處理完主要業務邏輯之后,將日志的相關實體發布到特定Queue下,然后設置一個監聽器,監該Queue的消息并做處理。客戶不用等待日志的處理就可直接返回。
大概的業務流程如下圖所示。
?
1.首先建立日志的數據表和實體,數據表起名為t_log。實體如下。主要包含操作者,操作的事件,操作時間等幾個主要參數?! ?/p> package com.xdx.entity;import java.util.Date;public class TLog {private Integer logId;private String operator;private String event;private Date createTime;private Integer isDel;public TLog(String operator, String event) {this.operator = operator;this.event = event;}public TLog() {}public Integer getLogId() {return logId;}public void setLogId(Integer logId) {this.logId = logId;}public String getOperator() {return operator;}public void setOperator(String operator) {this.operator = operator == null ? null : operator.trim();}public String getEvent() {return event;}public void setEvent(String event) {this.event = event == null ? null : event.trim();}public Date getCreateTime() {return createTime;}public void setCreateTime(Date createTime) {this.createTime = createTime;}public Integer getIsDel() {return isDel;}public void setIsDel(Integer isDel) {this.isDel = isDel;} }
2.編寫保存日志的方法,很簡單,就是一個數據庫的save過程。
package com.xdx.service;import javax.annotation.Resource;import org.springframework.stereotype.Service;import com.xdx.dao.BaseDao; import com.xdx.entity.TLog;@Service public class LogService {@Resource(name = "baseDao")private BaseDao<TLog, Integer> baseDao;public Integer saveLog(TLog log) {Integer result = baseDao.addT("TLogMapper.insertSelective", log);return result;} }
其中的TLogMapper.insertSelective代碼如下:
<insert id="insertSelective" parameterType="com.xdx.entity.TLog" >insert into t_log<trim prefix="(" suffix=")" suffixOverrides="," ><if test="logId != null" >log_id,</if><if test="operator != null" >operator,</if><if test="event != null" >event,</if><if test="createTime != null" >create_time,</if><if test="isDel != null" >is_del,</if></trim><trim prefix="values (" suffix=")" suffixOverrides="," ><if test="logId != null" >#{logId,jdbcType=INTEGER},</if><if test="operator != null" >#{operator,jdbcType=VARCHAR},</if><if test="event != null" >#{event,jdbcType=VARCHAR},</if><if test="createTime != null" >#{createTime,jdbcType=TIMESTAMP},</if><if test="isDel != null" >#{isDel,jdbcType=INTEGER},</if></trim></insert>3.接下來就跟我們的spring amqp有關了,首先要在pom.xml中引入相關的jar包?!?/p> <!-- spring-rabbitMQ --><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId><version>2.0.1.RELEASE</version></dependency><!-- spring -amqp --><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-amqp</artifactId><version>2.0.1.RELEASE</version></dependency>
4.編寫主配置文件。在項目新建一個com.xdx.spring_rabbit包。關于rabbit的所有代碼都寫在這邊。
編寫一個抽象的rabbit的主配置文件,之所以這樣做是為了以后擴展方便,讓不同的異步消息隊列的業務可以繼承并擴展它。如下所示。
主配置的文件主要是配置了連接Rabbit服務的基本信息,并且指定了消息轉換器是json轉換器。
package com.xdx.spring_rabbit;import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean;/*** 抽象類,rabbitMQ的主配置類* * @author xdx**/ public abstract class AbstractRabbitConfiguration {@Value("${amqp.port:5672}")private int port = 5672;protected abstract void configureRabbitTemplate(RabbitTemplate template);/*** 由于connectionFactory會與項目中的redis的connectionFactory命名沖突,* 所以這邊改名為rabbit_connectionFactory* * @return*/@Beanpublic ConnectionFactory rabbit_connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory("192.168.1.195");connectionFactory.setUsername("xdx");connectionFactory.setPassword("xxxx");connectionFactory.setPort(port);return connectionFactory;}@Beanpublic RabbitTemplate rabbitTemplate() {RabbitTemplate template = new RabbitTemplate(rabbit_connectionFactory());template.setMessageConverter(jsonMessageConverter());configureRabbitTemplate(template);return template;}@Beanpublic MessageConverter jsonMessageConverter() {return new Jackson2JsonMessageConverter();}@Beanpublic AmqpAdmin amqpAdmin() {RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbit_connectionFactory());return rabbitAdmin;} }5.編寫我們這個日志項目需要用到的配置文件,繼承上述的抽象類,在該配置文件中,我們具體指定Exchange,RouteKey,Queue,Binding以及監聽器這些要素。
package com.xdx.spring_rabbit;import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;/*** 日志管理的Rabbit配置類的具體實現類* * @author xdx**/ @Configuration public class LogRabbitConfiguration extends AbstractRabbitConfiguration {protected static String LOG_EXCHANGE_NAME = "warrior.exchange.log";// topic// exchange的名稱protected static String LOG_QUEUE_NAME = "warrior.queue.log";// 接收消息的queueprotected static String LOG_ROUTING_KEY = LOG_QUEUE_NAME;@Autowiredprivate LogRabbitRecHandler logRabbitRecHandler;// 監聽器的委托類,委托其處理接收到的消息/*** 設置Exchange為LOG_EXCHANGE_NAME,RoutingKey為LOG_ROUTING_KEY,這樣將信息發送到* Exchange為LOG_EXCHANGE_NAME,RouteKey為LOG_ROUTING_KEY的通道中*/@Overrideprotected void configureRabbitTemplate(RabbitTemplate template) {System.err.println("創建一個RabbitTemplate,名字是 " + template);template.setExchange(LOG_EXCHANGE_NAME);template.setRoutingKey(LOG_ROUTING_KEY);}/*** 用于接收日志消息的Queue,默認綁定自己的名稱* * @return*/@Beanpublic Queue logQueue() {return new Queue(LOG_QUEUE_NAME);}/*** 定義一個topExchange* * @return*/@Beanpublic TopicExchange logExchange() {return new TopicExchange(LOG_EXCHANGE_NAME);}/*** 定義一個綁定日志接收的Queue的binding* * @return*/@Beanpublic Binding logQueueBinding() {return BindingBuilder.bind(logQueue()).to(logExchange()).with(LOG_ROUTING_KEY);}/*** 這個bean為監聽適配器,用于日志消息,并交由logRabbitRecHandler處理* * @return*/@Beanpublic MessageListenerAdapter messageListenerAdapter() {return new MessageListenerAdapter(logRabbitRecHandler,jsonMessageConverter());}/*** 這個bean用于監聽服務端發過來的消息,監聽的Queue為logQueue(),* 因為該Queue綁定了logExchange和logRouteKey, 所以它可以接收到我們發送的日志消息* @return*/@Beanpublic SimpleMessageListenerContainer messageListenerContainer() {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(rabbit_connectionFactory());container.setConcurrentConsumers(5);container.setQueues(logQueue());container.setMessageListener(messageListenerAdapter());container.setAcknowledgeMode(AcknowledgeMode.AUTO);return container;} }6.封裝發送消息的接口,如下所示。這是一個泛型的接口,目的是為了傳入不同的消息類型。
package com.xdx.spring_rabbit; /*** 定義一個泛型接口,用于發送消息,T為要發送的消息類型* @author xdx** @param <T>*/ public interface RabbitSend<T> {void send(T t); }7.實現這個發送消息的接口。在這個實現類中,我們注入了之前生成的RabbitTemplate對象。用于發送消息。
package com.xdx.spring_rabbit;import javax.annotation.Resource;import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component;import com.xdx.entity.TLog;/*** 用于發送日志消息的通用實現類* * @author xdx**/ @Component("logRabbitSend") public class LogRabbitSend implements RabbitSend<TLog> {@Resource(name = "rabbitTemplate")private RabbitTemplate rabbitTemplate;@Overridepublic void send(TLog log) {rabbitTemplate.convertAndSend(log);System.err.println("發送消息:" + log);} }8.封裝監聽器委托對象的接口,該接口用于處理監聽器監聽到的消息。同意是一個泛型的類,如下所示。
package com.xdx.spring_rabbit;/*** 用于處理監聽到的消息的消息處理器接口,T為接收到的消息的類型* * @author xdx** @param <T>*/ public interface RabbitRecHandler<T> {void handleMessage(T t); }9.實現上述委托對象的接口,如下所示。在該接口中,我們注入了日志處理類的對象。用于儲存日志信息到數據庫。
package com.xdx.spring_rabbit;import javax.annotation.Resource;import org.springframework.stereotype.Component;import com.xdx.entity.TLog; import com.xdx.service.LogService; @Component("logRabbitRecHandler") public class LogRabbitRecHandler implements RabbitRecHandler<TLog> {@Resource(name="logService")private LogService logService;@Overridepublic void handleMessage(TLog log) {System.err.println("開始存儲日志"+log.getOperator()+","+log.getEvent());logService.saveLog(log);} }10.最后,我們在具體的業務類中調用消息發送的接口,就可以實現日志消息的發送了。如下所示。
@Controller public class AdminController { @Resource(name = "logRabbitSend")private LogRabbitSend logRabbitSend;@RequestMapping("admin")public ModelAndView admin(HttpSession session,String adminName, String password) throws Exception {List<Map<String,Object>>adminMap=adminService.getAllAdminMap();ModelAndView mv = new ModelAndView();//登錄操作的主要邏輯代碼……session.setAttribute("adminName", admin.getAdminName());session.setAttribute("realName", admin.getRealName()); TLog log=new TLog(adminName, "登錄系統");logRabbitSend.send(log);return mv;} }運行我們的系統,我們先看看RabbitMQ的后臺??吹搅宋覀兌x的Exchange和Queue等元素。
?
?
?
運行AdmintController類中的admin方法,登錄系統,我們發現確實已經發送了消息,并且消息被監聽到,然后存儲到了數據庫。
控制臺打印出來的消息為:
數據庫存入的記錄為:
?
轉載于:https://www.cnblogs.com/roy-blog/p/8125049.html
總結
以上是生活随笔為你收集整理的八.利用springAMQP实现异步消息队列的日志管理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: CNN中的卷积操作的参数数计算
- 下一篇: pandas常用