Java Kafka monitoring: per-topic message counts via each topic's Summed Recent Offsets
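This example uses Kafka's Java API to monitor how many messages certain topics receive. It queries each topic's total offset on a schedule and subtracts the previous total from the current one, which gives the per-topic increment for each interval. Kafka monitoring usually tracks throughput, that is, data volume, and pays no attention to the message count. Here the count is what matters, so the tool sums the most recent offset of every partition in a topic: the Summed Recent Offsets.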
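The "query the total, then take the difference" step can be sketched in plain Java. This is only an illustration: the class name OffsetDelta and the method between are hypothetical and do not appear in the original code, and the two maps are assumed to be per-topic totals of the kind the monitoring code returns. Topics that have no earlier total are skipped, since there is no baseline to subtract.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper (not from the original post): per-topic growth between two snapshots. */
final class OffsetDelta {

    private OffsetDelta() {
    }

    /**
     * Returns current minus previous for every topic present in both snapshots.
     * A topic seen for the first time has no baseline, so it is left out.
     */
    static Map<String, Long> between(Map<String, Long> previous, Map<String, Long> current) {
        Map<String, Long> delta = new TreeMap<>();
        for (Map.Entry<String, Long> entry : current.entrySet()) {
            Long before = previous.get(entry.getKey());
            if (before != null) {
                delta.put(entry.getKey(), entry.getValue() - before);
            }
        }
        return delta;
    }

    public static void main(String[] args) {
        // Made-up totals, for illustration only.
        Map<String, Long> previous = new TreeMap<>();
        previous.put("a", 1000L);
        previous.put("b", 50L);

        Map<String, Long> current = new TreeMap<>();
        current.put("a", 1250L);
        current.put("b", 50L);

        between(previous, current)
                .forEach((topic, added) -> System.out.println("topic:" + topic + " \tincrement:" + added));
    }
}
```

A negative increment would suggest that one of the two totals is incomplete, for example because an offset request failed while the snapshot was taken.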
Jar dependency used
compile group: 'org.apache.kafka', name: 'kafka_2.10', version: '0.8.0'
Java code
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;
import java.util.List;
import java.util.Map;

/**
 * Kafka monitoring: per-topic message counts.
 *
 * @author LiXuekai on 2020/9/16
 */
public class KafkaMonitorTools {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMonitorTools.class);

    /**
     * Asks the given broker for one offset of a partition at the time marker whichTime
     * (for example kafka.api.OffsetRequest.LatestTime()).
     * Returns 0 when the request fails or no offset comes back, so a failed
     * partition contributes nothing to the topic total.
     */
    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = Maps.newHashMap();
        // Request at most one offset for this partition.
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            LOGGER.error("Error fetching offset data from the broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        // Guard against an empty result instead of indexing blindly.
        return offsets.length > 0 ? offsets[0] : 0;
    }

    /**
     * @param brokers broker addresses in host:port form
     * @param topic   topic
     * @return map of partition id to that partition's metadata
     */
    public static Map<Integer, PartitionMetadata> findLeader(List<String> brokers, String topic) {
        Map<Integer, PartitionMetadata> map = Maps.newHashMap();
        for (String broker : brokers) {
            SimpleConsumer consumer = null;
            try {
                String[] hostAndPort = broker.split(":");
                consumer = new SimpleConsumer(hostAndPort[0], Integer.parseInt(hostAndPort[1]), 100000, 64 * 1024, "leaderLookup" + new Date().getTime());
                List<String> topics = Lists.newArrayList(topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        map.put(part.partitionId(), part);
                    }
                }
            } catch (Exception e) {
                LOGGER.error("Error communicating with broker [" + broker + "] to find leader for [" + topic + "]", e);
            } finally {
                if (consumer != null) {
                    consumer.close();
                }
            }
        }
        return map;
    }

    /**
     * @return map of topic name to the sum of the latest offsets of all its partitions,
     *         or null when brokers or topics is empty
     */
    public static Map<String, Long> monitor(List<String> brokers, List<String> topics) {
        if (brokers == null || brokers.isEmpty()) {
            return null;
        }
        if (topics == null || topics.isEmpty()) {
            return null;
        }
        Map<String, Long> map = Maps.newTreeMap();
        for (String topicName : topics) {
            Map<Integer, PartitionMetadata> metadata = findLeader(brokers, topicName);
            long size = 0L;
            for (Map.Entry<Integer, PartitionMetadata> entry : metadata.entrySet()) {
                int partition = entry.getKey();
                // A partition without an elected leader cannot answer offset requests.
                if (entry.getValue().leader() == null) {
                    LOGGER.error("No leader for [" + topicName + ", " + partition + "], skipping");
                    continue;
                }
                String leadBroker = entry.getValue().leader().host();
                String clientName = "Client_" + topicName + "_" + partition;
                SimpleConsumer consumer = new SimpleConsumer(leadBroker, entry.getValue().leader().port(), 100000, 64 * 1024, clientName);
                try {
                    long readOffset = getLastOffset(consumer, topicName, partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                    size += readOffset;
                } finally {
                    // Close the connection even if the offset request throws.
                    consumer.close();
                }
            }
            map.put(topicName, size);
        }
        return map;
    }
}

Test code:
private final String topics = "a,b,c,d,e,f";
private final String server = "1.1.1.11:92";

@Test
public void monitor() {
    Map<String, Long> monitor = KafkaMonitorTools.monitor(Lists.newArrayList(server), Lists.newArrayList(topics.split(",")));
    monitor.forEach((k, v) -> System.out.println("topic:" + k + " \tSummed Recent Offsets:" + v));
}

[Screenshot: the Kafka version used]
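The test above takes a single snapshot, while interval counting needs repeated snapshots. One possible way to run it on a schedule is sketched below, using only the JDK. The class IncrementPoller and its poll method are hypothetical names; the Supplier stands in for a call such as KafkaMonitorTools.monitor(brokers, topics) and is replaced by a fake source here so the sketch runs without a broker. The one-second period is arbitrary.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical wrapper (not from the original post): remembers the last snapshot between polls. */
final class IncrementPoller {

    private final Supplier<Map<String, Long>> snapshotSource;
    private Map<String, Long> last; // null until the first successful poll

    IncrementPoller(Supplier<Map<String, Long>> snapshotSource) {
        this.snapshotSource = snapshotSource;
    }

    /** Takes a new snapshot and returns per-topic growth since the previous one (empty on the first call). */
    synchronized Map<String, Long> poll() {
        Map<String, Long> growth = new TreeMap<>();
        Map<String, Long> now = snapshotSource.get();
        if (now == null) {
            return growth; // defensive: the source may return null, keep the old baseline
        }
        if (last != null) {
            for (Map.Entry<String, Long> entry : now.entrySet()) {
                Long before = last.get(entry.getKey());
                if (before != null) {
                    growth.put(entry.getKey(), entry.getValue() - before);
                }
            }
        }
        last = now;
        return growth;
    }

    public static void main(String[] args) throws InterruptedException {
        // Fake source: topic "a" grows by 10 on every call. In real use this would be
        // something like () -> KafkaMonitorTools.monitor(brokers, topics).
        long[] total = {0L};
        IncrementPoller poller = new IncrementPoller(() -> {
            total[0] += 10L;
            Map<String, Long> snapshot = new TreeMap<>();
            snapshot.put("a", total[0]);
            return snapshot;
        });

        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(
                () -> poller.poll().forEach((topic, added) -> System.out.println("topic:" + topic + " \tincrement:" + added)),
                0, 1, TimeUnit.SECONDS);

        Thread.sleep(3500); // let a few polls run, then stop
        scheduler.shutdownNow();
    }
}
```

Keeping the previous snapshot inside the poller means the scheduled task only has to call poll; the first call establishes the baseline and reports nothing.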