javascript
SpringBoot 整合 Redis 实现消息队列
寫這篇文章的原因還是得歸咎于👉 上一篇博客寫了👉Docker搭建Redis Cluster 集群環(huán)境
我自己是認(rèn)為對(duì)于每個(gè)知識(shí)點(diǎn),光看了不操作是沒有用的(遺忘太快…),多少得在手上用上幾回才可以,才能對(duì)它加深印象。
昨天搭建了Redis Cluster 集群環(huán)境,今天就來(lái)拿它玩一玩Redis 消息隊(duì)列吧
于是便有了這個(gè)Redis 實(shí)現(xiàn)消息隊(duì)列的Demo,
很喜歡一句話:”八小時(shí)內(nèi)謀生活,八小時(shí)外謀發(fā)展“。
共勉.😁
Docker搭建Redis集群
SpringBoot 整合 Redis 實(shí)現(xiàn)消息隊(duì)列
- 一、前言
- 概念
- 作用:
- 應(yīng)用場(chǎng)景:
- 二、前期準(zhǔn)備
- 2.1、項(xiàng)目結(jié)構(gòu)
- 2.2、依賴的jar包
- 2.3、yml配置文件
- 三、編碼
- 3.1、config層
- 3.2、信息實(shí)體類
- 3.3、MyThread類
- 3.4、消費(fèi)者
- 3.5、生產(chǎn)者
- 四、測(cè)試
- 五、自言自語(yǔ)
一、前言
概念
消息隊(duì)列:“消息隊(duì)列”是在消息的傳輸過(guò)程中保存消息的容器。
其實(shí)就是個(gè) 生產(chǎn)者--->消息隊(duì)列<---消費(fèi)者 的模型。集群就是蠻多蠻多而已。
作用:
主要解決應(yīng)用耦合,異步消息,流量削鋒等問(wèn)題
應(yīng)用場(chǎng)景:
異步處理,應(yīng)用解耦(拆分多系統(tǒng)),流量削峰(秒殺活動(dòng)、請(qǐng)求量過(guò)大)和消息通訊(發(fā)布公告、日志)四個(gè)場(chǎng)景。
此處只演示了最簡(jiǎn)單的一個(gè)圖哈。
舉例子:異步消息
使用消息隊(duì)列后
消息中間件其實(shí)市面上已經(jīng)有很多,如RabbitMq,RocketMq、ActiveMq、Kafka等,我拿Redis來(lái)做消息隊(duì)列,其本意是1)為了熟悉Redis;2)Redis 確實(shí)可以來(lái)做簡(jiǎn)單的消息隊(duì)列(狗頭保命)
二、前期準(zhǔn)備
就是需要個(gè)Redis,其他的倒是沒啥特殊的啦。😁
2.1、項(xiàng)目結(jié)構(gòu)
一普通的SpringBoot的項(xiàng)目…😊
2.2、依賴的jar包
jar 也都是一些正常的jar包哈,沒啥新奇玩意。😜
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.5.2</version><relativePath/> <!-- lookup parent from repository --> </parent> <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId><version>2.4.3</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.72</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency> </dependencies>2.3、yml配置文件
分單機(jī)和集群,主要是上一篇文章帶的…🙄😶
單機(jī)配置文件
spring:redis:database: 0port: 6379host: localhostpassword:lettuce:pool:# 連接池最大連接數(shù)(使用負(fù)值表示沒有限制)max-active: 1024# 連接池最大阻塞等待時(shí)間(使用負(fù)值表示沒有限制)max-wait: 10000# 連接池中的最大空閑連接max-idle: 200# 連接池中的最小空閑連接min-idle: 0# 連接超時(shí)時(shí)間(毫秒)timeout: 10000redis集群配置文件
server:port: 8089 spring:application:name: springboot-redisredis:password: 1234cluster:nodes:- IP地址:6379- IP地址:6380- IP地址:6381- IP地址:6382- IP地址:6383- IP地址:6384max-redirects: 3 # 獲取失敗 最大重定向次數(shù)lettuce:pool:max-active: 1000 #連接池最大連接數(shù)(使用負(fù)值表示沒有限制)max-idle: 10 # 連接池中的最大空閑連接min-idle: 5 # 連接池中的最小空閑連接#===========jedis配置方式============================================= # jedis: # pool: # max-active: 1000 # 連接池最大連接數(shù)(使用負(fù)值表示沒有限制) # max-wait: -1ms # 連接池最大阻塞等待時(shí)間(使用負(fù)值表示沒有限制) # max-idle: 10 # 連接池中的最大空閑連接 # min-idle: 5 # 連接池中的最小空閑連接 #三、編碼
3.1、config層
沒有什么特殊的配置,🤗
import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.data.redis.RedisProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisOperations; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer;/*** redis 配置類* 1. 設(shè)置RedisTemplate序列化/返序列化** @author cuberxp* @since 1.0.0* Create time 2020/1/23 0:06*/ @Configuration @ConditionalOnClass(RedisOperations.class) @EnableConfigurationProperties(RedisProperties.class) public class RedisConfig {@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();//設(shè)置value hashValue值的序列化Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<Object>(Object.class);ObjectMapper om = new ObjectMapper();om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);serializer.setObjectMapper(om);redisTemplate.setValueSerializer(serializer);redisTemplate.setHashValueSerializer(serializer);//key hasKey的序列化redisTemplate.setKeySerializer(stringRedisSerializer);redisTemplate.setHashKeySerializer(stringRedisSerializer);redisTemplate.setConnectionFactory(redisConnectionFactory);redisTemplate.afterPropertiesSet();return redisTemplate;} }3.2、信息實(shí)體類
加個(gè)實(shí)體類,模擬傳遞信息中需要用到的實(shí)體類。
import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor;import java.io.Serializable;/*** @author crush*/ @Data @AllArgsConstructor @NoArgsConstructor public class AnnouncementMessage implements Serializable {private static final long serialVersionUID = 8632296967087444509L;private String id;/*** 內(nèi)容 */private String content; }3.3、MyThread類
隨項(xiàng)目啟動(dòng)而啟動(dòng)。
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component;/*** @Author: crush* @Date: 2021-08-06 22:17* version 1.0* ApplicationRunner:* 用于指示 bean 在包含在SpringApplication時(shí)應(yīng)該運(yùn)行的SpringApplication 。 * 通俗說(shuō)就是 在這個(gè)項(xiàng)目運(yùn)行的時(shí)候,它也會(huì)自動(dòng)運(yùn)行起來(lái)。*/ @Component public class MyThread implements ApplicationRunner {@AutowiredMessageConsumerService messageConsumerService;@Overridepublic void run(ApplicationArguments args) throws Exception {messageConsumerService.start();} }3.4、消費(fèi)者
import java.util.concurrent.TimeUnit; import com.crush.queue.entity.AnnouncementMessage; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service;/*** ApplicationRunner 實(shí)現(xiàn)這個(gè)接口可以跟隨項(xiàng)目啟動(dòng)而啟動(dòng)* @author crush*/ @Service public class MessageConsumerService extends Thread {@Autowiredprivate RedisTemplate<String,Object> redisTemplate;private volatile boolean flag = true;private String queueKey="queue";private Long popTime=1000L;@Overridepublic void run() {try {AnnouncementMessage message;// 為了能一直循環(huán)而不結(jié)束while(flag && !Thread.currentThread().isInterrupted()) {message = (AnnouncementMessage) redisTemplate.opsForList().rightPop(queueKey,popTime,TimeUnit.SECONDS);System.out.println("接收到了" + message);}} catch (Exception e) {System.err.println(e.getMessage());}}public boolean isFlag() {return flag;}public void setFlag(boolean flag) {this.flag = flag;}}3.5、生產(chǎn)者
import com.crush.queue.entity.AnnouncementMessage; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service;@Service public class MessageProducerService {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;private String queueKey="queue";public Long sendMeassage(AnnouncementMessage message) {System.out.println("發(fā)送了" + message);return redisTemplate.opsForList().leftPush(queueKey, message);}}四、測(cè)試
就是簡(jiǎn)單寫了一個(gè)測(cè)試代碼。😝
import com.crush.queue.entity.AnnouncementMessage; import com.crush.queue.service.MessageConsumerService; import com.crush.queue.service.MessageProducerService; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; /*** @Author: crush* @Date: 2021-08-06 17:11* version 1.0*/ @SpringBootTest public class MessageQueueTest {@Autowiredprivate MessageProducerService producer;@Autowiredprivate MessageConsumerService consumer;/*** 這個(gè)測(cè)時(shí) 的先啟動(dòng)主啟動(dòng)累,* 然后消費(fèi)者可以一直在監(jiān)聽。*/@Testpublic void testQueue2() {producer.sendMeassage(new AnnouncementMessage("1", "aaaa"));producer.sendMeassage(new AnnouncementMessage("2", "bbbb"));try {Thread.sleep(1000L);} catch (InterruptedException e) {e.printStackTrace();}} }注:這只是一個(gè)小demo ,很多細(xì)節(jié)都沒有去考慮,只是一次對(duì)Redis做消息隊(duì)列的初探,大家見諒。
五、自言自語(yǔ)
一次由搭建Redis Cluster集群開啟的博客,終于結(jié)束了,算了好像還沒,感覺下次可以多寫點(diǎn)實(shí)用的。😂🤣
不知道大家學(xué)習(xí)是什么樣的,博主自己的感覺就是學(xué)了的東西,要通過(guò)自己去梳理一遍,或者說(shuō)是去實(shí)踐一遍,我覺得這樣子,無(wú)論是對(duì)于理解還是記憶,都會(huì)更加深刻。
如若有不足之處,請(qǐng)不嗇賜教!!😁
有疑惑之處,也可以留言或私信,定會(huì)第一時(shí)間回復(fù)。👩?💻
這篇文章就到這里啦,下篇文章再見。👉一篇文章用Redis 實(shí)現(xiàn)消息隊(duì)列(還在寫)
總結(jié)
以上是生活随笔為你收集整理的SpringBoot 整合 Redis 实现消息队列的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 史上最详细Docker搭建Redis C
- 下一篇: Spring Boot 使用Actuat