post发送byte数组_KAFKA消息发送
消息發送的整體架構
RecordAccumulator 主要用來緩存消息以便 Sender 線程可以批量發送,進而減少網絡傳輸的資源消耗以提升性能。RecordAccumulator 緩存的大小可以通過生產者客戶端參數 buffer.memory 配置,默認值為 33554432B,即32MB。如果生產者發送消息的速度超過發送到服務器的速度,則會導致生產者空間不足,這個時候 KafkaProducer 的 send() 方法調用要么被阻塞,要么拋出異常,這個取決于參數 max.block.ms 的配置,此參數的默認值為60000,即60秒。
Kafka是通過broker中未確認的消息數來判斷broker的負載的.未確認的消息數越多則負載越高.Sender線程通過InFlightRequests來緩存已經發出去但還沒有收到響應的請求,具體形式為Map.
消息有序性
Kafka 可以保證同一個分區中的消息是有序的。如果生產者按照一定的順序發送消息,那么這些消息也會順序地寫入分區,進而消費者也可以按照同樣的順序消費它們。
如果將acks參數配置為非零值,并且max.in.flight.requests.per.connection 參數配置為大于1的值,那么就會出現錯序的現象:如果第一批次消息寫入失敗,而第二批次消息寫入成功,那么生產者會重試發送第一批次的消息,此時如果第一批次的消息寫入成功,那么這兩個批次的消息就出現了錯序。一般而言,在需要保證消息順序的場合建議把參數 max.in.flight.requests.per.connection配置為1,而不是把 acks 配置為0,不過這樣也會影響整體的吞吐。
max.in.flight.requests.per.connection = 1 限制客戶端在單個連接上能夠發送的未響應請求的個數(也就是客戶端與 Node 之間的連接)。設置此值是1表示kafka broker在響應請求之前client不能再向同一個broker發送請求。注意:設置此參數是為了避免消息亂序
消息發送的三種模式
發后即忘(fire-and-forget,不保證消息到達broker,會丟消息)
同步(sync,同步發送,一條發完才發送下一條,每次都會返回Future值或拋異常,如果是可重試的異常,那么如果配置了retries參數則可自動重試)
異步(async,會有一個回調函數來通知消息的處理結果是成功還是異常)
同步代碼
try { Future<RecordMetadata> future = producer.send(record); //阻塞獲取結果,然后才能下一條發送 RecordMetadata metadata = future.get(); System.out.println(metadata.topic() + "-" +metadata.partition() + ":" + metadata.offset());} catch (ExecutionException | InterruptedException e) { //常見的可重試異常有:NetworkException、LeaderNotAvailableException、 //UnknownTopicOrPartitionException、NotEnoughReplicasException、NotCoordinatorException 等。 //對于可重試的異常,如果配置了 retries 參數,那么只要在規定的重試次數內自行恢復了,就不會拋出異常。 //不可重試異常如LeaderNotAvailableException ,RecordTooLargeException則是直接拋異常}異步代碼
public class KafkaAsyncSender{ private static final Logger logger = LoggerFactory.getLogger(KafkaAsyncSender.class); //KafkaProducer 而言,它是線程安全的 private Producer producer; @Autowired private UdpSerializer udpSerializer; @Value("${kafka_connect_string}") private String kafkaConnectString; private Cache<String, Integer> cache; private KafkaTopicPartitionMapper mapper; @PostConstruct public void init() { Properties props = new Properties(); props.put("metadata.broker.list", kafkaConnectString.trim()); props.put("bootstrap.servers", kafkaConnectString.trim()); props.put("producer.type", "async");//消息發送類型同步(sync)還是異步(async將本地buffer) props.put("compression.codec", "none");//消息的壓縮格式,默認為none不壓縮,gzip, snappy, lz4 生產者發送消息之后,只要分區的 leader 副本成功寫入消息,那么它就會收到來自服務端的成功響應 props.put("request.required.acks", "1"); //發送失敗后重試的次數,允許重試 //如果 max.in.flight.requests.per.connection 設置不為1,可能會導致亂序 props.put("message.send.max.retries", 3);//失敗重試次數 props.put("retry.backoff.ms", 100);//重試間隔 props.put("queue.buffering.max.ms", 10);//緩存數據的最大時間間隔 props.put("batch.num.messages", 1000);//緩存數據的最大條數 //限制生產者客戶端能發送的消息的最大值,默認值為1048576B,即1MB,需注意broker端的message.max.bytes props.put("max.request.size", 1024 * 1024); props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); this.producer = new KafkaProducer<String, String>(props); this.mapper = new KafkaTopicPartitionMapper(this.producer); this.cache = CacheBuilder.newBuilder().refreshAfterWrite(1, TimeUnit.SECONDS) .build(this.mapper); } //將被下面的handle類調用 public boolean sendMsg(final String topic, Object body, Callback callback) { //直接發送bytes數組 if(body instanceof byte[]){ ProducerRecord <String,byte[]>record = new ProducerRecord<String,byte[]>(topic,(byte[])body); producer.send(record); }else if(body instanceof BinlogEventInfo){ //對象類型的消息 final BinlogEventInfo binlogEventInfo = (BinlogEventInfo)body; Integer num = null; //獲取發送到kafka的key,這里是使用guava緩存了key String cacheKey = genCacheKey(topic, binlogEventInfo); num = this.cache.getIfPresent(cacheKey); if(num == null){ try { num = this.mapper.load(cacheKey); } catch (Exception e) { logger.error("load kafka partition cache exception :", e); } if(num == null){ this.cache.put(cacheKey, Integer.MIN_VALUE); } else { this.cache.put(cacheKey, num); } } //構造發送的消息體,注意序列化是使用Byte序列化,沒使用默認的String ProducerRecord <String,byte[]>record = null; if(num == Integer.MIN_VALUE || num == null){ //獲取key失敗,不使用key的構造發送發送數據 if(logger.isDebugEnabled()){ logger.debug("get partition fail , send to {}, info {}" , topic, JsonUtils.toJson((binlogEventInfo))); } record = new ProducerRecord<String,byte[]>(topic,udpSerializer.serialize(binlogEventInfo)); } else { //根據key指定到哪一個分區的發送 if(logger.isDebugEnabled()){ logger.debug("send to {}, partition {}, info {}" , topic, num, JsonUtils.toJson((binlogEventInfo))); } //這里有三個可能影響到分區數的因素 : 1.直接指定分區數 2,直接指定key 3.無任何指定 //在直接指定了分區數的情況下,那么將直接發送往此分區 //若分區數未指定,但key指定了,那么因為計算的hash值一樣,那么相同的key也會發送到一樣的分區 //若都未指定,則直接輪詢分區來發送消息 record = new ProducerRecord<String,byte[]>(topic, num , null ,udpSerializer.serialize(binlogEventInfo)); } producer.send(record, callback); } return true; } private String genCacheKey(String topic, BinlogEventInfo binlogEventInfo) { return topic + "-" + binlogEventInfo.getHost() + "-" + binlogEventInfo.getSchemaName() + "-" + binlogEventInfo.getTableName(); }}Mapper的查詢分區
public class KafkaTopicPartitionMapper extends CacheLoader<String, Integer>{ private Producer producer; public KafkaTopicPartitionMapper(Producer producer){ this.producer = producer; } @Override //格式 topic-host-database-table public Integer load(String key) throws Exception { try{ String[] arr = key.split("-"); int hash = this.hash(key); List list = this.producer.partitionsFor(arr[0]); int psize = list.size(); if(psize == 0){ return null; } else { //根據哈希值 % 分區數 return parlist.get(Math.abs(hash % psize)); } } catch(Exception e){ return null; } } //計算hash值 private int hash(Object key) { int h; return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16); } }異步的消息回調
//kafka消息管理類,發送消息以及回調處理public class KafkaQueueChannelHandler extends AsyncQueueChannelHandler{ private KafkaAsyncSender sender; public KafkaQueueChannelHandler(){ super("kafka"); this.sender = SpringContextUtil.getBean(KafkaAsyncSender.class); } public KafkaQueueChannelHandler(String identity) { super(identity); this.sender = SpringContextUtil.getBean(KafkaAsyncSender.class); } @Override public void sendMessage(BinlogEventInfo info, DeliverInfo deliverInfo) { //發送主題消息的時候設置回調 if(!sender.sendMsg(deliverInfo.getSendTopic(), info, new KafkaCallback(info, deliverInfo))) { this.stopDeliverAndNotify(info,Constants.SENDTYPE_KAFKA,"",deliverInfo.getSendTopic()); } } @Override public void sendMessageInner(BinlogEventInfo info, DeliverInfo deliverInfo) { this.sender.sendMsg(deliverInfo.getSendTopic(), info, new KafkaCallback(info, deliverInfo)); } private class KafkaCallback implements Callback{ private BinlogEventInfo info; private DeliverInfo deliverInfo; public KafkaCallback(BinlogEventInfo info, DeliverInfo deliverInfo) { this.info = info; this.deliverInfo = deliverInfo; } //異步回調方法 @Override public void onCompletion(RecordMetadata metadata, Exception exception) { //onCompletion() 方法的兩個參數是互斥的,消息發送成功時,metadata 不為 null 而 exception 為 null; //消息發送異常時,metadata 為 null 而 exception 不為 null。 try{ if(metadata!=null){ logger.info("kafka回調信息:topic=【{}】,partition=【{}】,offset=【{}】,發送內容=【{}】,exception=【{}】,",metadata.topic(),metadata.partition(), metadata.offset(),JSON.toJSONString(info),exception); }else{ logger.info("kafka回調信息:發送內容=【{}】,exception=【{}】,",JSON.toJSONString(info),exception); } if(exception!=null){ stopDeliverAndNotify( info,Constants.SENDTYPE_KAFKA,"", this.deliverInfo.getSendTopic()); } else { //將位置信息保存到內存,異步更新到數據庫 updatePosition(info, this.deliverInfo); } }catch(Exception e){ logger.error("KafkaQueueChannelHandler kafka回調處理失敗:發送內容=【{}】,exception=【{}】",JSON.toJSONString(info),e); stopDeliverAndNotify( info,Constants.SENDTYPE_KAFKA,"", this.deliverInfo.getSendTopic()); } } } }生產者攔截器
生產者攔截器既可以用來在消息發送前做一些準備工作,比如按照某個規則過濾不符合要求的消息、修改消息的內容等,也可以用來在發送回調邏輯前做一些定制化的需求,比如統計類工作。
生產者攔截器的使用也很方便,主要是自定義實現 org.apache.kafka.clients.producer. ProducerInterceptor 接口。ProducerInterceptor 接口中包含3個方法:
//KafkaProducer在將消息序列化和計算分區之前會調用生產者攔截器的 onSend() 方法來對消息進行相應的定制化操作。public ProducerRecordonSend(ProducerRecord record);//KafkaProducer會在消息被應答(Acknowledgement)之前或消息發送失敗時調用生產者攔截器的onAcknowledgement() 方法,//優先于用戶設定的 Callback 之前執行。public void onAcknowledgement(RecordMetadata metadata, Exception exception);public void close();添加生產者攔截器
//此參數默認值為""properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptorPrefix.class.getName());實現
//接口的這3個方法中拋出的異常都會被捕獲并記錄到日志中,但并不會再向上傳遞。public class ProducerInterceptorPrefix implements ProducerInterceptor<String,String>{ @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { //更改消息內容 String modifiedValue = "prefix1-" + record.value(); return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), record.key(), modifiedValue, record.headers()); } @Override public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {} @Override public void close() {} @Override public void configure(Map<String, ?> map) {}}參考鏈接
https://juejin.im/book/5c7d467e5188251b9156fdc0/section/5c7d5391f265da2db7183fe5
總結
以上是生活随笔為你收集整理的post发送byte数组_KAFKA消息发送的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: tomcat linux环境变量,lin
- 下一篇: linux at24测试程序,linux