漫游Kafka实战篇之客户端编程实例
生活随笔
收集整理的這篇文章主要介紹了
漫游Kafka实战篇之客户端编程实例
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
原文地址:http://blog.csdn.net/honglei915/article/details/37697655
Kafka視頻教程同步首發,歡迎觀看!
Kafka Producer APIs
新版的Producer API提供了以下功能:
新的api完整實例如下:
package com.cuicui.kafkademon;import java.util.ArrayList; import java.util.List; import java.util.Properties;import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig;/*** @author <a href="mailto:leicui001@126.com">崔磊</a>* @date 2015年11月4日 上午11:44:15*/ public class MyProducer {public static void main(String[] args) throws InterruptedException {Properties props = new Properties();props.put("serializer.class", "kafka.serializer.StringEncoder");props.put("metadata.broker.list", KafkaProperties.BROKER_CONNECT);props.put("partitioner.class", "com.cuicui.kafkademon.MyPartitioner");props.put("request.required.acks", "1");ProducerConfig config = new ProducerConfig(props);Producer<String, String> producer = new Producer<String, String>(config);// 單個發送for (int i = 0; i <= 1000000; i++) {KeyedMessage<String, String> message =new KeyedMessage<String, String>(KafkaProperties.TOPIC, i + "", "Message" + i);producer.send(message);Thread.sleep(5000);}// 批量發送List<KeyedMessage<String, String>> messages = new ArrayList<KeyedMessage<String, String>>(100);for (int i = 0; i <= 10000; i++) {KeyedMessage<String, String> message =new KeyedMessage<String, String>(KafkaProperties.TOPIC, i + "", "Message" + i);messages.add(message);if (i % 100 == 0) {producer.send(messages);messages.clear();}}producer.send(messages);} }
下面這個是用到的分區函數:
import kafka.producer.Partitioner; import kafka.utils.VerifiableProperties;public class MyPartitioner implements Partitioner {public MyPartitioner(VerifiableProperties props) {}/** @see kafka.producer.Partitioner#partition(java.lang.Object, int)*/@Overridepublic int partition(Object key, int partitionCount) {return Integer.valueOf((String) key) % partitionCount;} }
KafKa Consumer APIs
Consumer API有兩個級別。低級別的和一個指定的broker保持連接,并在接收完消息后關閉連接,這個級別是無狀態的,每次讀取消息都帶著offset。 高級別的API隱藏了和brokers連接的細節,在不必關心服務端架構的情況下和服務端通信。還可以自己維護消費狀態,并可以通過一些條件指定訂閱特定的topic,比如白名單黑名單或者正則表達式。
低級別的API
package com.cuicui.kafkademon;import java.nio.ByteBuffer; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map;import kafka.api.FetchRequest; import kafka.api.FetchRequestBuilder; import kafka.api.PartitionOffsetRequestInfo; import kafka.cluster.Broker; import kafka.common.TopicAndPartition; import kafka.javaapi.FetchResponse; import kafka.javaapi.OffsetRequest; import kafka.javaapi.OffsetResponse; import kafka.javaapi.PartitionMetadata; import kafka.javaapi.TopicMetadata; import kafka.javaapi.TopicMetadataRequest; import kafka.javaapi.TopicMetadataResponse; import kafka.javaapi.consumer.SimpleConsumer; import kafka.javaapi.message.ByteBufferMessageSet; import kafka.message.Message; import kafka.message.MessageAndOffset;/*** offset自己維護 目標topic、partition均由自己分配*?* @author <a href="mailto:leicui001@126.com">崔磊</a>* @date 2015年11月4日 上午11:44:15**/ public class MySimpleConsumer {public static void main(String[] args) {new MySimpleConsumer().consume();}/*** 消費消息*/public void consume() {int partition = 0;// 找到leaderBroker leaderBroker = findLeader(KafkaProperties.BROKER_CONNECT, KafkaProperties.TOPIC, partition);// 從leader消費SimpleConsumer simpleConsumer =new SimpleConsumer(leaderBroker.host(), leaderBroker.port(), 20000, 10000, "mySimpleConsumer");long startOffet = 1;int fetchSize = 1000;while (true) {long offset = startOffet;// 添加fetch指定目標tipic,分區,起始offset及fetchSize(字節),可以添加多個fetchFetchRequest req =new FetchRequestBuilder().addFetch(KafkaProperties.TOPIC, 0, startOffet, fetchSize).build();// 拉取消息FetchResponse fetchResponse = simpleConsumer.fetch(req);ByteBufferMessageSet messageSet = fetchResponse.messageSet(KafkaProperties.TOPIC, partition);for (MessageAndOffset messageAndOffset : messageSet) {Message mess = messageAndOffset.message();ByteBuffer payload = mess.payload();byte[] bytes = new byte[payload.limit()];payload.get(bytes);String msg = new String(bytes);offset = messageAndOffset.offset();System.out.println("partition : " + 3 + ", offset : " + offset + " ?mess : " + msg);}// 繼續消費下一批startOffet = offset + 1;}}/*** 找到制定分區的leader broker*?* @param brokerHosts broker地址,格式為:“host1:port1,host2:port2,host3:port3”* @param topic topic* @param partition 分區* @return*/public Broker findLeader(String brokerHosts, String topic, int partition) {Broker leader = findPartitionMetadata(brokerHosts, topic, partition).leader();System.out.println(String.format("Leader tor topic %s, partition %d is %s:%d", topic, partition, leader.host(),leader.port()));return leader;}/*** 找到指定分區的元數據*?* @param brokerHosts broker地址,格式為:“host1:port1,host2:port2,host3:port3”* @param topic topic* @param partition 分區* @return 元數據*/private PartitionMetadata findPartitionMetadata(String brokerHosts, String topic, int partition) {PartitionMetadata returnMetaData = null;for (String brokerHost : brokerHosts.split(",")) {SimpleConsumer consumer = null;String[] splits = brokerHost.split(":");consumer = new SimpleConsumer(splits[0], Integer.valueOf(splits[1]), 100000, 64 * 1024, "leaderLookup");List<String> topics = Collections.singletonList(topic);TopicMetadataRequest request = new TopicMetadataRequest(topics);TopicMetadataResponse response = consumer.send(request);List<TopicMetadata> topicMetadatas = response.topicsMetadata();for (TopicMetadata topicMetadata : topicMetadatas) {for (PartitionMetadata PartitionMetadata : topicMetadata.partitionsMetadata()) {if (PartitionMetadata.partitionId() == partition) {returnMetaData = PartitionMetadata;}}}if (consumer != null)consumer.close();}return returnMetaData;}/*** 根據時間戳找到某個客戶端消費的offset*?* @param consumer SimpleConsumer* @param topic topic* @param partition 分區* @param clientID 客戶端的ID* @param whichTime 時間戳* @return offset*/public long getLastOffset(SimpleConsumer consumer, String topic, int partition, String clientID, long whichTime) {TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientID);OffsetResponse response = consumer.getOffsetsBefore(request);long[] offsets = response.offsets(topic, partition);return offsets[0];} } 低級別的API是高級別API實現的基礎,也是為了一些對維持消費狀態有特殊需求的場景,比如Hadoop consumer這樣的離線consumer。
高級別的API
package com.cuicui.kafkademon;import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties;import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata;/*** offset在zookeeper中記錄,以group.id為key 分區和customer的對應關系由Kafka維護* * @author <a href="mailto:leicui001@126.com">崔磊</a>* @date 2015年11月4日 上午11:44:15*/ public class MyHighLevelConsumer {/*** 該consumer所屬的組ID*/private String groupid;/*** 該consumer的ID*/private String consumerid;/*** 每個topic開幾個線程?*/private int threadPerTopic;public MyHighLevelConsumer(String groupid, String consumerid, int threadPerTopic) {super();this.groupid = groupid;this.consumerid = consumerid;this.threadPerTopic = threadPerTopic;}public void consume() {Properties props = new Properties();props.put("group.id", groupid);props.put("consumer.id", consumerid);props.put("zookeeper.connect", KafkaProperties.ZK_CONNECT);props.put("zookeeper.session.timeout.ms", "60000");props.put("zookeeper.sync.time.ms", "2000");// props.put("auto.commit.interval.ms", "1000");ConsumerConfig config = new ConsumerConfig(props);ConsumerConnector connector = Consumer.createJavaConsumerConnector(config);Map<String, Integer> topicCountMap = new HashMap<String, Integer>();// 設置每個topic開幾個線程topicCountMap.put(KafkaProperties.TOPIC, threadPerTopic);// 獲取streamMap<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topicCountMap);// 為每個stream啟動一個線程消費消息for (KafkaStream<byte[], byte[]> stream : streams.get(KafkaProperties.TOPIC)) {new MyStreamThread(stream).start();}}/*** 每個consumer的內部線程* * @author cuilei05**/private class MyStreamThread extends Thread {private KafkaStream<byte[], byte[]> stream;public MyStreamThread(KafkaStream<byte[], byte[]> stream) {super();this.stream = stream;}@Overridepublic void run() {ConsumerIterator<byte[], byte[]> streamIterator = stream.iterator();// 逐條處理消息while (streamIterator.hasNext()) {MessageAndMetadata<byte[], byte[]> message = streamIterator.next();String topic = message.topic();int partition = message.partition();long offset = message.offset();String key = new String(message.key());String msg = new String(message.message());// 在這里處理消息,這里僅簡單的輸出// 如果消息消費失敗,可以將已上信息打印到日志中,活著發送到報警短信和郵件中,以便后續處理System.out.println("consumerid:" + consumerid + ", thread : " + Thread.currentThread().getName()+ ", topic : " + topic + ", partition : " + partition + ", offset : " + offset + " , key : "+ key + " , mess : " + msg);}}}public static void main(String[] args) {String groupid = "myconsumergroup";MyHighLevelConsumer consumer1 = new MyHighLevelConsumer(groupid, "myconsumer1", 3);MyHighLevelConsumer consumer2 = new MyHighLevelConsumer(groupid, "myconsumer2", 3);consumer1.consume();consumer2.consume();} } 這個API圍繞著由KafkaStream實現的迭代器展開,每個流代表一系列從一個或多個分區多和broker上匯聚來的消息,每個流由一個線程處理,所以客戶端可以在創建的時候通過參數指定想要幾個流。一個流是多個分區多個broker的合并,但是每個分區的消息只會流向一個流。每調用一次createMessageStreams都會將consumer注冊到topic上,這樣consumer和brokers之間的負載均衡就會進行調整。API鼓勵每次調用創建更多的topic流以減少這種調整。createMessageStreamsByFilter方法注冊監聽可以感知新的符合filter的tipic。
總結
以上是生活随笔為你收集整理的漫游Kafka实战篇之客户端编程实例的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 漫游Kafka设计篇之主从同步
- 下一篇: 漫游Kafka实现篇之消息和日志