RabbitMQ:The channelMax limit is reached. Try later.
RabbitMQ:The channelMax limit is reached. Try later.
? 這個問題是我當(dāng)初寫項(xiàng)目時(shí)遇到的,因?yàn)橛肦abbitMQ做削峰處理,高并發(fā)情況下,channel數(shù)到達(dá)了限制,所以不能繼續(xù)創(chuàng)建,相信大家也遇到過。
? 正常來說,這個錯誤還是比較少見的,只不過項(xiàng)目需要保證消息的可靠性,所以采取了發(fā)送確認(rèn)和消費(fèi)手動確認(rèn)機(jī)制,導(dǎo)致并發(fā)性能下降,從而出現(xiàn)這個問題。、
? 這里先上結(jié)論,方便著急的小伙伴們改bug。
? 結(jié)論:RabbitMQ java客戶端在創(chuàng)建連接時(shí),會向服務(wù)端發(fā)送一個請求,這個請求會獲取到服務(wù)端的channelMax值,java客戶端會自己進(jìn)行一個處理,兩者都不為0時(shí),會選擇一個小的值,如果你沒有在rabbitmq.conf文件中修改channel_Max的值,那么java客戶端會采用默認(rèn)的2047或更小,這就會導(dǎo)致你明明在客戶端連接上配置了channelMax(比如你配置了4095),但依舊會報(bào)錯,而且web管理頁面最大值依舊是2047
第一次修改配置不生效
出現(xiàn)這種情況經(jīng)常伴隨著消息丟失,而且消息丟失情況非常嚴(yán)重,達(dá)到了百分之二十的丟失率,這個丟失率也會因?yàn)椴l(fā)量、每次消費(fèi)數(shù)量等等配置的不同而變化。
由于項(xiàng)目是基于SpringBoot2.2的,yml暫時(shí)無法配置RequestChannelMax的值,這里只能采用直接通過set的方式放入值。
@Configuration @Slf4j public class RabbitMQConfig {@Beanpublic RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {CachingConnectionFactory cachingConnectionFactory = (CachingConnectionFactory) connectionFactory;//這里我明明設(shè)置了4095,但是項(xiàng)目運(yùn)行之后,壓測之后,還是會報(bào)異常,而且報(bào)異常的時(shí)候,RabbitMQ //web管理頁面上的channel數(shù)依舊是2047,不得已只能分析源碼了cachingConnectionFactory.getRabbitConnectionFactory().setRequestedChannelMax(4095);final RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);rabbitTemplate.setMessageConverter(jackson2MessageConverter());rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback((correlationData, b, s) -> {if(!b){log.error("confirmCallBack 發(fā)送失敗的數(shù)據(jù):{}",correlationData);log.error("confirmCallBack 確認(rèn)情況:{}",b);log.error("confirmCallBack 發(fā)送失敗的原因:{}",s);}});rabbitTemplate.setReturnCallback((message, i, s, s1, s2) -> {log.error("returnCallBack 消息:{}",message);log.error("returnCallBack 回應(yīng)碼:{}",i);log.error("returnCallBack 回應(yīng)信息:{}",s);log.error("returnCallBack 交換機(jī):{}",s1);log.error("returnCallBack 路由鍵:{}",s2);});return rabbitTemplate;}@Beanpublic Jackson2JsonMessageConverter jackson2MessageConverter() {return new Jackson2JsonMessageConverter();} }分析源碼
首先是模擬出報(bào)錯的場景,然后進(jìn)入報(bào)異常的類。
發(fā)現(xiàn)是this.delegate.createChannel();方法返回的是一個空channel對象,進(jìn)入這個方法看一下。
發(fā)現(xiàn)有一個ChannelManage對象,顧名思義,就是一個channel管理器,由它負(fù)責(zé)創(chuàng)建channel,那么看一下這個對象都有什么值呢?
public Channel createChannel() throws IOException {this.ensureIsOpen();ChannelManager cm = this._channelManager;if (cm == null) {return null;} else {Channel channel = cm.createChannel(this);this.metricsCollector.newChannel(channel);return channel;}}只截取了部分代碼,首先可以看到有一個int類型的channelMax,這個值就是channel的最大值,還有一個構(gòu)造器,很明顯,這個值是通過構(gòu)造器傳進(jìn)來的,通過容器初始化時(shí)打斷點(diǎn)進(jìn)行跟蹤,發(fā)現(xiàn)此時(shí)的channelMax依舊是2047,這也進(jìn)一步證明了,值的覆蓋或者處理發(fā)生在這個類調(diào)用之前。
public class ChannelManager {private static final Logger LOGGER = LoggerFactory.getLogger(ChannelManager.class);private final Object monitor;private final Map<Integer, ChannelN> _channelMap;private final IntAllocator channelNumberAllocator;private final ConsumerWorkService workService;private final Set<CountDownLatch> shutdownSet;private final int _channelMax;private ExecutorService shutdownExecutor;private final ThreadFactory threadFactory;private int channelShutdownTimeout;protected final MetricsCollector metricsCollector;public ChannelManager(ConsumerWorkService workService, int channelMax, ThreadFactory threadFactory, MetricsCollector metricsCollector) {this.monitor = new Object();this._channelMap = new HashMap();this.shutdownSet = new HashSet();this.channelShutdownTimeout = 63000;if (channelMax == 0) {channelMax = 65535;}this._channelMax = channelMax;this.channelNumberAllocator = new IntAllocator(1, channelMax);this.workService = workService;this.threadFactory = threadFactory;this.metricsCollector = metricsCollector;} }進(jìn)一步跟蹤之后,發(fā)現(xiàn)在AMQConnection類里的instantiateChannelManager()方法調(diào)用了構(gòu)造器,繼續(xù)往上追蹤。
protected ChannelManager instantiateChannelManager(int channelMax, ThreadFactory threadFactory) {ChannelManager result = new ChannelManager(this._workService, channelMax, threadFactory, this.metricsCollector);this.configureChannelManager(result);return result;}在AMQConnetion類的start()方法中最終發(fā)現(xiàn)了值改變的地方。
this.requestedChannelMax值是我在配置類中配置的4095
connTune.getChannelMax()是2047
也就是說,negotiateChannelMax()方法對這兩個值進(jìn)行了處理,最終選擇了2047
int channelMax = this.negotiateChannelMax(this.requestedChannelMax, connTune.getChannelMax());this._channelManager = this.instantiateChannelManager(channelMax, this.threadFactory);最終發(fā)現(xiàn)這么一段處理邏輯,如果兩個數(shù)字都不為0,那么就取最小的,反之取最大的,看到這里是明白做了什么處理,但是還是有一處不明白,2047的值究竟從何處來的?
protected int negotiateChannelMax(int requestedChannelMax, int serverMax) {return negotiatedMaxValue(requestedChannelMax, serverMax);}private static int negotiatedMaxValue(int clientValue, int serverValue) {return clientValue != 0 && serverValue != 0 ? Math.min(clientValue, serverValue) : Math.max(clientValue, serverValue);}其實(shí)重點(diǎn)是connTune.getChannelMax()這個方法
int channelMax = this.negotiateChannelMax(this.requestedChannelMax, connTune.getChannelMax());通過對connTune的追尋,發(fā)現(xiàn)了這段處理,debug也證明了確實(shí)在這里獲取的2047這個值,
其實(shí)不管從方法名rpc()還是變量名serverResponse來看,這個都是做了一個請求,那么向誰請求其實(shí)很顯而易見了,這里向RabbitMQ端做了一個請求,用來索取MQ端的channelMax、frameMax、heartBeat值等等
Tune connTune = null;try {......try {Method serverResponse = this._channel0.rpc((Method)method, this.handshakeTimeout / 2).getMethod();if (serverResponse instanceof Tune) {connTune = (Tune)serverResponse;}......到現(xiàn)在其實(shí)就很明確了,我們只在客戶端修改邊界值配置是無效的,必須同步修改MQ服務(wù)端的配置,也就是rabbitmq.conf文件
## Set the max permissible number of channels per connection. ## 0 means "no limit". ##在配置文件中,輸入以下參數(shù)和自己想要設(shè)置的值即可,如果用不到2047,那就不用配置 # channel_max = 128其實(shí)問題并不大,主要還是不了解MQ的一個客戶端連接過程,導(dǎo)致耗費(fèi)了大量時(shí)間。
這里還是推薦大家,先用百度搜索,第一頁看不到正確解決方案,那就去StackOverflow網(wǎng)站,還不行的話,那就使用終極大法,要么官網(wǎng)逐行看文檔,要么走一波源碼,也是鍛煉自己解決問題的思路和能力。
總結(jié)
以上是生活随笔為你收集整理的RabbitMQ:The channelMax limit is reached. Try later.的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: SQL create file遇到操作系
- 下一篇: 微信开发者工具报错 系统错误,错误码-