javascript
利用SpringBoot+RabbitMQ,实现一个邮件推送服务
一、流程圖
本文內容主要圍繞這個流程圖展開,利用 RabbitMQ 消息隊列,實現生產者與消費者解耦,所以有必要先貼出來,涵蓋了 RabbitMQ 很多知識點,如:
消息發送確認機制
消費確認機制
消息的重新投遞
消費冪等性, 等等
二、實現思路
1.在虛擬機創建一個CentOS7上,并安裝 RabbitMQ
2.開放QQ郵箱或者其它郵箱授權碼,用于發送郵件
3.創建郵件發送項目并編寫代碼
4.發送郵件測試
5.消息發送失敗處理
三、RabbitMQ安裝
RabbitMQ 基于 erlang 進行通信,相比其它的軟件,安裝有些麻煩,不過本例采用rpm方式安裝,任何新手都可以完成安裝,過程如下!
3.1、安裝前命令準備
輸入如下命令,完成安裝前的環境準備。
yum?install?lsof??build-essential?openssl?openssl-devel?unixODBC?unixODBC-devel?make?gcc?gcc-c++?kernel-devel?m4?ncurses-devel?tk?tc?xz?wget?vim3.2、下載 RabbitMQ、erlang、socat 的安裝包
本次下載的是RabbitMQ-3.6.5版本,采用rpm一鍵安裝,適合新手直接上手。
先創建一個rabbitmq目錄,本例的目錄路徑為/usr/app/rabbitmq,然后在目錄下執行如下命令,下載安裝包!
下載erlang
下載socat
下載rabbitMQ
最終目錄文件如下:
3.3、安裝軟件包
下載完之后,按順序依次安裝軟件包,這個很重要哦~
安裝erlang
安裝socat
安裝rabbitmq
安裝完成之后,修改rabbitmq的配置,默認配置文件在/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin目錄下。
vim?/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app修改loopback_users節點的值!
最后只需通過如下命令,啟動服務即可!
rabbitmq-server?start?&運行腳本之后,如果報錯,例如下圖!
解決辦法如下:
vim?/etc/rabbitmq/rabbitmq-env.conf在文件里添加一行,如下配置!
NODENAME=rabbit@localhost然后,再保存!再次以下命令啟動服務!
rabbitmq-server?start?&通過如下命令,查詢服務是否啟動成功!
lsof?-i:5672如果出現5672已經被監聽,說明已經啟動成功!
3.4、啟動可視化的管控臺
輸入如下命令,啟動控制臺!
rabbitmq-plugins?enable?rabbitmq_management用瀏覽器打開http://ip:15672,這里的ip就是 CentOS 系統的 ip,結果如下:
賬號、密碼,默認為guest,如果出現無法訪問,檢測防火墻是否開啟,如果開啟將其關閉即可!
登錄之后的監控平臺,界面如下:
四、郵箱授權碼的獲取
獲取郵箱授權碼的目的,主要是為了通過代碼進行發送郵件,例如 QQ 郵箱授權碼獲取方式,如下圖:
點擊【開啟】按鈕,然后發送短信,即可獲取授權碼,該授權碼就是配置文件spring.mail.password需要的密碼!
五、項目介紹
springboot版本:2.1.5.RELEASE
RabbitMQ版本:3.6.5
SendMailUtil:發送郵件工具類
ProduceServiceImpl:生產者,發送消息
ConsumerMailService:消費者,消費消息,發送郵件
六、代碼實現
6.1、創建項目
在 IDEA 下創建一個名稱為smail的 Springboot 項目,pom文件中加入amqp和mail。
<dependencies><!--spring?boot核心--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!--spring?boot?測試--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!--springmvc?web--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--開發環境調試--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><optional>true</optional></dependency><!--mail?支持--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-mail</artifactId></dependency><!--amqp?支持--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--?commons-lang3?--><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.4</version></dependency><!--lombok--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.10</version></dependency> </dependencies>6.2、配置rabbitMQ、mail
在application.properties文件中,配置amqp和mail!
#rabbitmq spring.rabbitmq.host=192.168.0.103 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest # 開啟confirms回調 P -> Exchange spring.rabbitmq.publisher-confirms=true # 開啟returnedMessage回調 Exchange -> Queue spring.rabbitmq.publisher-returns=true # 設置手動確認(ack) Queue -> C spring.rabbitmq.listener.simple.acknowledge-mode=manual spring.rabbitmq.listener.simple.prefetch=100# mail spring.mail.default-encoding=UTF-8 spring.mail.host=smtp.qq.com spring.mail.username=1370887518@qq.com spring.mail.password=獲取的郵箱授權碼 spring.mail.from=1370887518@qq.com spring.mail.properties.mail.smtp.auth=true spring.mail.properties.mail.smtp.starttls.enable=true spring.mail.properties.mail.smtp.starttls.required=true其中,spring.mail.password第四步中獲取的授權碼,同時username和from要一致!
6.3、RabbitConfig配置類
@Configuration @Slf4j public?class?RabbitConfig?{//?發送郵件public?static?final?String?MAIL_QUEUE_NAME?=?"mail.queue";public?static?final?String?MAIL_EXCHANGE_NAME?=?"mail.exchange";public?static?final?String?MAIL_ROUTING_KEY_NAME?=?"mail.routing.key";@Autowiredprivate?CachingConnectionFactory?connectionFactory;@Beanpublic?RabbitTemplate?rabbitTemplate()?{RabbitTemplate?rabbitTemplate?=?new?RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(converter());//?消息是否成功發送到ExchangerabbitTemplate.setConfirmCallback((correlationData,?ack,?cause)?->?{if?(ack)?{log.info("消息成功發送到Exchange");}?else?{log.info("消息發送到Exchange失敗,?{},?cause:?{}",?correlationData,?cause);}});//?觸發setReturnCallback回調必須設置mandatory=true,?否則Exchange沒有找到Queue就會丟棄掉消息,?而不會觸發回調rabbitTemplate.setMandatory(true);//?消息是否從Exchange路由到Queue,?注意:?這是一個失敗回調,?只有消息從Exchange路由到Queue失敗才會回調這個方法rabbitTemplate.setReturnCallback((message,?replyCode,?replyText,?exchange,?routingKey)?->?{log.info("消息從Exchange路由到Queue失敗:?exchange:?{},?route:?{},?replyCode:?{},?replyText:?{},?message:?{}",?exchange,?routingKey,?replyCode,?replyText,?message);});return?rabbitTemplate;}@Beanpublic?Jackson2JsonMessageConverter?converter()?{return?new?Jackson2JsonMessageConverter();}@Beanpublic?Queue?mailQueue()?{return?new?Queue(MAIL_QUEUE_NAME,?true);}@Beanpublic?DirectExchange?mailExchange()?{return?new?DirectExchange(MAIL_EXCHANGE_NAME,?true,?false);}@Beanpublic?Binding?mailBinding()?{return?BindingBuilder.bind(mailQueue()).to(mailExchange()).with(MAIL_ROUTING_KEY_NAME);} }6.4、Mail 郵件實體類
@Getter @Setter @NoArgsConstructor @AllArgsConstructor public?class?Mail?{@Pattern(regexp?=?"^([a-z0-9A-Z]+[-|\\.]?)+[a-z0-9A-Z]@([a-z0-9A-Z]+(-[a-z0-9A-Z]+)?\\.)+[a-zA-Z]{2,}$",?message?=?"郵箱格式不正確")private?String?to;@NotBlank(message?=?"標題不能為空")private?String?title;@NotBlank(message?=?"正文不能為空")private?String?content;private?String?msgId;//?消息id }6.5、SendMailUtil郵件發送類
@Component @Slf4j public?class?SendMailUtil?{@Value("${spring.mail.from}")private?String?from;@Autowiredprivate?JavaMailSender?mailSender;/***?發送簡單郵件**?@param?mail*/public?boolean?send(Mail?mail)?{String?to?=?mail.getTo();//?目標郵箱String?title?=?mail.getTitle();//?郵件標題String?content?=?mail.getContent();//?郵件正文SimpleMailMessage?message?=?new?SimpleMailMessage();message.setFrom(from);message.setTo(to);message.setSubject(title);message.setText(content);try?{mailSender.send(message);log.info("郵件發送成功");return?true;}?catch?(MailException?e)?{log.error("郵件發送失敗,?to:?{},?title:?{}",?to,?title,?e);return?false;}} }6.6、ProduceServiceImpl 生產者類
@Service public?class?ProduceServiceImpl?implements?ProduceService?{@Autowiredprivate?RabbitTemplate?rabbitTemplate;@Overridepublic?boolean?send(Mail?mail)?{//創建uuidString?msgId?=?UUID.randomUUID().toString().replaceAll("-",?"");mail.setMsgId(msgId);//發送消息到rabbitMQCorrelationData?correlationData?=?new?CorrelationData(msgId);rabbitTemplate.convertAndSend(RabbitConfig.MAIL_EXCHANGE_NAME,?RabbitConfig.MAIL_ROUTING_KEY_NAME,?MessageHelper.objToMsg(mail),?correlationData);return?true;} }6.7、ConsumerMailService 消費者類
@Component @Slf4j public?class?ConsumerMailService?{@Autowiredprivate?SendMailUtil?sendMailUtil;@RabbitListener(queues?=?RabbitConfig.MAIL_QUEUE_NAME)public?void?consume(Message?message,?Channel?channel)?throws?IOException?{//將消息轉化為對象String?str?=?new?String(message.getBody());Mail?mail?=?JsonUtil.strToObj(str,?Mail.class);log.info("收到消息:?{}",?mail.toString());MessageProperties?properties?=?message.getMessageProperties();long?tag?=?properties.getDeliveryTag();boolean?success?=?sendMailUtil.send(mail);if?(success)?{channel.basicAck(tag,?false);//?消費確認}?else?{channel.basicNack(tag,?false,?true);}} }6.8、TestController 控制層類
@RestController @RequestMapping("/test") @Slf4j public?class?TestController?{@Autowiredprivate?ProduceService?testService;@PostMapping("send")public?boolean?sendMail(Mail?mail)?{return?testService.send(mail);} }七、測試服務
啟動 SpringBoot 服務之后,用 postman 模擬請求接口。
查看控制臺信息。
查詢接受者郵件信息。
郵件發送成功!
八、消息發送失敗處理
雖然,上面案例可以成功的實現消息的發送,但是上面的流程很脆弱,例如:rabbitMQ 突然蹦了、郵件發送失敗了、重啟 rabbitMQ 服務器出現消息重復消費,應該怎處理呢?
很顯然,我們需要對原有的邏輯進行升級改造,因此我們需要引入數據庫來記錄消息的發送情況。
8.1、創建消息投遞日志表
CREATE?TABLE?`msg_log`?(`msg_id`?varchar(255)?NOT?NULL?DEFAULT?''?COMMENT?'消息唯一標識',`msg`?text?COMMENT?'消息體,?json格式化',`exchange`?varchar(255)?NOT?NULL?DEFAULT?''?COMMENT?'交換機',`routing_key`?varchar(255)?NOT?NULL?DEFAULT?''?COMMENT?'路由鍵',`status`?int(11)?NOT?NULL?DEFAULT?'0'?COMMENT?'狀態:?0投遞中?1投遞成功?2投遞失敗?3已消費',`try_count`?int(11)?NOT?NULL?DEFAULT?'0'?COMMENT?'重試次數',`next_try_time`?datetime?DEFAULT?NULL?COMMENT?'下一次重試時間',`create_time`?datetime?DEFAULT?NULL?COMMENT?'創建時間',`update_time`?datetime?DEFAULT?NULL?COMMENT?'更新時間',PRIMARY?KEY?(`msg_id`),UNIQUE?KEY?`unq_msg_id`?(`msg_id`)?USING?BTREE )?ENGINE=InnoDB?DEFAULT?CHARSET=utf8mb4?COMMENT='消息投遞日志';8.2、編寫 MsgLog 相關服務類
public?interface?MsgLogService?{/***?插入消息日志*?@param?msgLog*/void?insert(MsgLog?msgLog);/***?更新消息狀態*?@param?msgId*?@param?status*/void?updateStatus(String?msgId,?Integer?status);/***?查詢消息*?@param?msgId*?@return*/MsgLog?selectByMsgId(String?msgId); }8.3、改寫服務邏輯
在生產服務類中,新增數據寫入。
同時,在RabbitConfig服務配置,當消息發送成功之后,新增更新消息狀態邏輯。
改造消費者ConsumerMailService,每次消費的時候,從數據庫中查詢,如果消息已經被消費,不用再重復發送數據!
這樣即可保證,如果 rabbitMQ 服務器,即使重啟之后重新推送消息,通過數據庫判斷,也不會重復消費進而發生業務異常!
8.4、利用定數任務對消息投遞失敗進行補償
當 rabbitMQ 服務器突然掛掉之后,生成者就無法正常進行投遞數據,此時因為消息已經被記錄到數據庫,因此我們可以利用定數任務查詢出沒有投遞成功的消息,進行補償投遞。
利用定數任務,對投遞失敗的消息進行補償投遞,基本可以保證消息 100% 消費成功!
九、總結
本文主要是通過發送郵件這個業務案例,來講解 Springboot 與 rabbitMQ 技術的整合和使用!
當然解決這個業務需求的技術方案還有很多,例如 Springboot 與 rocketMQ 也可以實現這個需求,這個會在后期的文章講解!
同時,Springboot +?rabbitMQ?這種架構方案更適合于集群應用,如果是單體應用,直接通過服務類操作即可實現郵件推送!
本篇主要是Springboot 與 rabbitMQ整合和基本使用教程,希望小伙伴能有所收獲!
十、參考
1、簡書 - wangzaiplus - springboot + rabbitmq發送郵件(保證消息100%投遞成功并被消費)
有道無術,術可成;有術無道,止于術
歡迎大家關注Java之道公眾號
好文章,我在看??
總結
以上是生活随笔為你收集整理的利用SpringBoot+RabbitMQ,实现一个邮件推送服务的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 粉丝福利,送10个程序员专用机械键盘
- 下一篇: NYOJ 608 畅通工程