Kafka: Windows client with a Linux server (repost)
Original: http://blog.csdn.net/jingshuigg/article/details/25001979
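I. For the server-side setup, see the previous article: "Kafka single-node environment setup and testing"
Server IP: 10.0.30.221
The runtime directory layout is as follows:
[Screenshot of the directory listing not preserved]
Two properties in server.properties under the config folder need to be changed:
change zookeeper.connect=localhost:2181 to zookeeper.connect=00.00.00.01 (the server's IP address):2181
and change the line that is commented out by default,
#host.name=localhost, to host.name=00.00.00.01 (the server's IP address)
If host.name is left unchanged, the client fails with the error below, most likely because the broker then advertises a host name that the Windows client cannot resolve:
Exception in thread "Thread-0" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
    at kafka.producer.Producer.send(Producer.scala:76)
    at kafka.javaapi.producer.Producer.send(Producer.scala:33)
    at its.kafka.Producer.run(Producer.java:46)
Once these steps are done, start zookeeper-server and kafka-server as described in "Kafka single-node environment setup and testing".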
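For reference, the two edited entries might look like the fragment below. This is only a sketch: it assumes the placeholder 00.00.00.01 stands for the server IP given earlier in this post (10.0.30.221) and that ZooKeeper runs on the same machine on port 2181. All other entries in the file stay as they are.

```properties
# config/server.properties (only the two changed entries are shown)
# Assumption: ZooKeeper runs on the same server as the broker.
zookeeper.connect=10.0.30.221:2181
# Uncommented and set to the address that remote clients use to reach the broker.
host.name=10.0.30.221
```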
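II. Client setup
The client runs on Windows 7 and connects to the server from Eclipse.
1. Create a new project named kafka_producer in Eclipse, with the following layout:
[Screenshot of the project layout not preserved]
Note: copy the log4j.properties file from the config folder into src. It only takes effect there, and only then is the log output visible.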
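The reason src matters is that Eclipse copies non-Java files from src into the build output folder, which is on the classpath, and log4j 1.x looks for log4j.properties on the classpath by default. A minimal sketch for checking this from inside the project follows; the class name Log4jClasspathCheck is hypothetical, and it assumes the project uses log4j 1.x with its default configuration lookup.

```java
// Hypothetical helper, not part of the original project.
final class Log4jClasspathCheck {

    /** Returns true if the named resource is visible at the root of the classpath. */
    static boolean onClasspath(String resourceName) {
        return Log4jClasspathCheck.class.getClassLoader().getResource(resourceName) != null;
    }

    public static void main(String[] args) {
        String name = "log4j.properties";
        String verdict = onClasspath(name) ? " found on the classpath" : " NOT found on the classpath";
        System.out.println(name + verdict);
    }
}
```

If the file is reported as missing, it is probably still sitting in the config folder rather than in src.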
The producer code is as follows:

import java.util.Properties;

import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class Producer extends Thread {
    private final kafka.javaapi.producer.Producer<Integer, String> producer;
    private final String topic;
    private final String name;
    private final int numsOfMessage;
    private final Properties props = new Properties();

    public Producer(String name, String topic, int numsOfMessage) {
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "10.0.30.221:9092");
        // Send asynchronously
        // props.put("producer.type", "async");
        // Number of messages per batch
        // props.put("batch.num.messages", "100");
        producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
        this.topic = topic;
        this.name = name;
        this.numsOfMessage = numsOfMessage;
    }

    @Override
    public void run() {
        int messageNo = 1;
        // Each producer sends numsOfMessage messages
        while (messageNo <= numsOfMessage) {
            String message = name + "'s Message_" + messageNo + "******";
            KeyedMessage<Integer, String> messageForSend = new KeyedMessage<Integer, String>(topic, message);
            producer.send(messageForSend);
            messageNo++;
        }
        producer.close();
    }
}

The code that starts the producer is as follows:

public class KafkaProducerDemo implements KafkaProperties {
    public static void main(String[] args) {
        StartThread(1, "testTopic", 10);
    }

    /**
     * @param numsOfProducer number of producers
     * @param topic          topic of the messages
     * @param numsOfMessage  number of messages each producer sends
     */
    public static void StartThread(int numsOfProducer, String topic, int numsOfMessage) {
        for (int i = 1; i <= numsOfProducer; i++) {
            String name = "Producer" + i;
            new Producer(name, topic, numsOfMessage).start();
        }
    }
}
2. Create a new kafka_consumer project in Eclipse, with the following layout:
[Screenshot of the project layout not preserved]
The consumer code is as follows:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class Consumer extends Thread {
    private final ConsumerConnector consumer;
    private final String topic;
    private final String name;

    public Consumer(String name, String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
        this.topic = topic;
        this.name = name;
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.zkConnect);
        props.put("group.id", KafkaProperties.groupId);
        props.put("zookeeper.session.timeout.ms", "60000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        // Minimum number of bytes returned per fetch; the default is 1
        // props.put("fetch.min.bytes", "1024");
        // Maximum time to wait for fetch.min.bytes to accumulate; the default is 100 ms
        // props.put("fetch.wait.max.ms", "600000");
        return new ConsumerConfig(props);
    }

    @Override
    public void run() {
        // Ask for a single stream for the topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        // hasNext() blocks until the next message arrives
        while (it.hasNext()) {
            System.out.println("************" + name + " gets " + new String(it.next().message()));
        }
    }
}

The code that starts the consumer:

public class KafkaConsumerDemo implements KafkaProperties {
    public static void main(String[] args) {
        // Consumer1
        Consumer consumerThread1 = new Consumer("Consumer1", KafkaProperties.topic);
        consumerThread1.start();
    }
}

The properties code (used to pass the property values; they could of course also be supplied from XML):

public interface KafkaProperties {
    final static String zkConnect = "10.0.30.221:2181";
    final static String groupId = "group1";
    final static String topic = "testTopic";
    final static String kafkaServerURL = "10.0.30.221";
    final static int kafkaServerPort = 9092;
    final static int kafkaProducerBufferSize = 64 * 1024;
    final static int connectionTimeOut = 100000;
    final static int reconnectInterval = 10000;
    final static String clientId = "SimpleConsumerDemoClient";
}
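As a rough illustration of supplying these values from outside the code, the sketch below parses them with java.util.Properties and falls back to the values used in this post. The class name ClientSettings and the key names zkConnect, groupId and topic are assumptions made for this example, not part of the original project.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical replacement for the hard-coded constants in KafkaProperties.
final class ClientSettings {
    final String zkConnect;
    final String groupId;
    final String topic;

    private ClientSettings(Properties p) {
        // Fall back to the values used in this post when a key is missing.
        zkConnect = p.getProperty("zkConnect", "10.0.30.221:2181");
        groupId = p.getProperty("groupId", "group1");
        topic = p.getProperty("topic", "testTopic");
    }

    /** Parses settings from text in .properties syntax. */
    static ClientSettings parse(String text) {
        Properties p = new Properties();
        try (Reader reader = new StringReader(text)) {
            p.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new ClientSettings(p);
    }

    public static void main(String[] args) {
        ClientSettings s = parse("zkConnect=10.0.30.221:2181\ngroupId=group2\n");
        System.out.println(s.zkConnect + " " + s.groupId + " " + s.topic);
    }
}
```

To read from a file instead, the StringReader could be swapped for a FileReader. For the XML variant mentioned above, Properties.loadFromXML reads the same key/value pairs from an XML document.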
3. Start the consumer first and then the producer. The Eclipse Console window then shows:
[Screenshot of the console output not preserved]
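The shape of the consumer's output lines can be derived from the string concatenations in the producer and consumer code above. The sketch below reproduces only that formatting, without talking to Kafka; the class name ExpectedOutput is hypothetical. The real console will also contain log4j output, and whether all ten messages arrive, and in which order, depends on the running cluster.

```java
// Hypothetical helper that mirrors the string formatting in Producer.run() and Consumer.run().
final class ExpectedOutput {

    /** Same concatenation the producer uses to build a message. */
    static String producerMessage(String producerName, int messageNo) {
        return producerName + "'s Message_" + messageNo + "******";
    }

    /** Same concatenation the consumer uses when printing a received message. */
    static String consumerLine(String consumerName, String message) {
        return "************" + consumerName + " gets " + message;
    }

    public static void main(String[] args) {
        // One producer ("Producer1") sending 10 messages, read by "Consumer1".
        for (int i = 1; i <= 10; i++) {
            System.out.println(consumerLine("Consumer1", producerMessage("Producer1", i)));
        }
    }
}
```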
Reposted from: https://www.cnblogs.com/davidwang456/p/4201875.html