javascript
Spring cloud集成Rabbitmq
1、配置pom
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.2.0.RELEASE</version> </dependency>2、yml配置
spring:
? ?#rabbit mq
? ?rabbitmq:
? ? ?host: 192.168.1.146
? ? ?port: 5672
? ? ?username: guest
? ? ?password: guest
3、連接、交換器、隊列等設置
@Configuration public class RabbitMqConfig {/*** 組件發布EXCHANGE*/public static final String EXCHANGE_COMPONENT_PUBLISHED="exchange.component.published";public static final String EXCHANGE_COMPONENT_SYNCED="exchange.component.synced";public static final String EXCHANGE_EXTRACTION_TASK="exchange.extraction.task";/*** 組件發布消息隊列*/public static final String QUEUE_COMPONENT_PUBLISHED="queue.component.published";public static final String QUEUE_COMPONENT_SYNCED="queue.component.synced";public static final String QUEUE_EXTRACTION_TASK="queue.extraction.task";public static final String ROUTING_KEY_="";@Value("${spring.rabbitmq.host}")private String host;@Value("${spring.rabbitmq.port}")private int port;@Value("${spring.rabbitmq.username}")private String username;@Value("${spring.rabbitmq.password}")private String password;@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost("/");connectionFactory.setPublisherConfirms(true);return connectionFactory;}@Beanpublic RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());return factory;}/**** @return*/@Beanpublic FanoutExchange exchangeComponentPublished(){return new FanoutExchange(EXCHANGE_COMPONENT_PUBLISHED);}@Beanpublic Queue queueComponentPublished(){return new Queue(QUEUE_COMPONENT_PUBLISHED,false);}@Beanpublic FanoutExchange exchangeComponentSynced(){return new FanoutExchange(EXCHANGE_COMPONENT_SYNCED);}@Beanpublic FanoutExchange exchangeExtractionTask(){return new FanoutExchange(EXCHANGE_EXTRACTION_TASK);}@Beanpublic Queue queueComponentSynced(){return new Queue(QUEUE_COMPONENT_SYNCED,false);}@Beanpublic Queue queueExtractionTask(){return new Queue(QUEUE_EXTRACTION_TASK,false);}@Beanpublic Binding componentSyncedBinding(){return BindingBuilder.bind(queueComponentSynced()).to(exchangeComponentSynced());}@Beanpublic Binding extractionTaskBinding(){return BindingBuilder.bind(queueExtractionTask()).to(exchangeExtractionTask());}@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory factory){RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);return rabbitTemplate;}}需要定義交換器,隊列,綁定,會自動注冊。
4、生產者
@Component public class ExtractionTaskMessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void send(ExtractionTaskMessage msg){rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());ObjectMapper mapper = new ObjectMapper();Message m = null;try {m = MessageBuilder.withBody(mapper.writeValueAsBytes(msg)).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).setContentType(MessageProperties.CONTENT_TYPE_JSON).build();} catch (JsonProcessingException e) {e.printStackTrace();}rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_EXTRACTION_TASK,"",msg);} }使用RabbitTemplate發送消息。
5、消費者
@Component @RabbitListener(queues = RabbitMqConfig.QUEUE_EXTRACTION_TASK) public class ExtractionTaskMessageReceiver {private static Logger logger = LoggerFactory.getLogger(ExtractionTaskMessageReceiver.class);@Autowiredprivate DataExtractorFactory dataExtractorFactory;@Autowiredprivate ExtractionTaskService extractionTaskService;@Autowiredprivate ExtractionDataService extractionDataService;@RabbitHandlerpublic void process(@Payload ExtractionTaskMessage msg) {try {} catch (DataExtractionException e){logger.error( "",e.getMessage());}catch (Exception e) {//異常處理logger.error(e.getMessage());}}} 使用 @RabbitListener 進行監聽,@RabbitHandler定義消息處理方法。在進行監聽時,會查詢所有@RabbitHandler注解的消息處理方法,如果沒有參數類型匹配的方法,則異常。
@RabbitListener 注意
-
消息處理方法參數是由 MessageConverter 轉化,若使用自定義 MessageConverter 則需要在 RabbitListenerContainerFactory 實例中去設置(默認 Spring 使用的實現是 SimpleRabbitListenerContainerFactory)
-
消息的 content_type 屬性表示消息 body 數據以什么數據格式存儲,接收消息除了使用 Message 對象接收消息(包含消息屬性等信息)之外,還可直接使用對應類型接收消息 body 內容,但若方法參數類型不正確會拋異常:
- application/octet-stream:二進制字節數組存儲,使用 byte[]
- application/x-java-serialized-object:java 對象序列化格式存儲。使用 Object、相應類型(反序列化時類型應該同包同名,否者會拋出找不到類異常)
- text/plain:文本數據類型存儲。使用 String
- application/json:JSON 格式。使用 Object、相應類型
注意:@RabbitListener注解在類上或方法上,行為不一樣。在類上,消費方法參數類型不可以設置為Message。在方法上,方法類型可以為Message
MessageConvert
- 涉及網絡傳輸的應用序列化不可避免,發送端以某種規則將消息轉成 byte 數組進行發送,接收端則以約定的規則進行 byte[] 數組的解析
- RabbitMQ 的序列化是指 Message 的 body 屬性,即我們真正需要傳輸的內容,RabbitMQ 抽象出一個 MessageConvert 接口處理消息的序列化,其實現有 SimpleMessageConverter(默認)、Jackson2JsonMessageConverter 等
- 當調用了 convertAndSend 方法時會使用 MessageConvert 進行消息的序列化
- SimpleMessageConverter 對于要發送的消息體 body 為 byte[] 時不進行處理,如果是 String 則轉成字節數組,如果是 Java 對象,則使用 jdk 序列化將消息轉成字節數組,轉出來的結果較大,含class類名,類相應方法等信息。因此性能較差
- 當使用 RabbitMQ 作為中間件時,數據量比較大,此時就要考慮使用類似 Jackson2JsonMessageConverter 等序列化形式以此提高性能
設置MessageConvert
Json格式
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());return factory;}?自定義
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new MessageConverter() {@Overridepublic Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {return null;}@Overridepublic Object fromMessage(Message message) throws MessageConversionException {try(ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(message.getBody()))){return (User)ois.readObject();}catch (Exception e){e.printStackTrace();return null;}}});return factory;}@Payload 與 @Headers
- 使用 @Payload 和 @Headers 注解可以消息中的 body 與 headers 信息
@RabbitListener(queues = "debug")
public void processMessage1(@Payload String body, @Headers Map<String,Object> headers) {
? ? System.out.println("body:"+body);
? ? System.out.println("Headers:"+headers);
}
- 也可以獲取單個 Header 屬性
@RabbitListener(queues = "debug")
public void processMessage1(@Payload String body, @Header String token) {
? ? System.out.println("body:"+body);
? ? System.out.println("token:"+token);
}
?
通過 @RabbitListener 注解聲明 Binding
- 通過 @RabbitListener 的 bindings 屬性聲明 Binding(若 RabbitMQ 中不存在該綁定所需要的 Queue、Exchange、RouteKey 則自動創建,若存在則拋出異常)
@RabbitListener 和 @RabbitHandler 搭配使用
- @RabbitListener 可以標注在類上面,需配合 @RabbitHandler 注解一起使用
- @RabbitListener 標注在類上面表示當有收到消息的時候,就交給 @RabbitHandler 的方法處理,具體使用哪個方法處理,根據 MessageConverter 轉換后的參數類型
?
?
?
總結
以上是生活随笔為你收集整理的Spring cloud集成Rabbitmq的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: spring boot添加 LocalD
- 下一篇: RabbitMq链接