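Using Kafka
Kafka depends on ZooKeeper.
You need to install both Kafka and ZooKeeper.
For installation steps, see: http://tzz6.iteye.com/blog/2401197
Start ZooKeeper: double-click zkServer.cmd to start ZooKeeper.
Start Kafka:
If Kafka reports the following error on startup: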
ERROR Error while deleting the clean shutdown file in dir E:\kafka_2.11-1.0.0\tmp\kafka-logs (kafka.server.LogDirFailureChannel)
java.nio.file.FileSystemException: E:\kafka_2.11-1.0.0\tmp\kafka-logs\__consumer_offsets-9\00000000000000000000.timeindex: The process cannot access the file because it is being used by another process.
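Solution:
Delete the logs:
The log path is set in Kafka's server.properties file: log.dirs=/tmp/kafka-logs. Delete the kafka-logs folder under the tmp folder, then restart Kafka. Note that log.dirs holds Kafka's message data, so this discards any messages the broker has stored.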
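The manual deletion step can also be scripted. The following is a minimal sketch, not part of the original setup: the class name `KafkaLogCleaner` and its method are hypothetical, and it assumes the broker has already been stopped so that no file in the directory is still held open.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper: removes the directory that log.dirs points to.
final class KafkaLogCleaner {

    /** Deletes dir and everything under it; does nothing if dir is absent. */
    static void deleteRecursively(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return;
        }
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(dir)) {
            // Reverse order puts children before their parent directory,
            // so every directory is already empty when it is deleted.
            paths = walk.sorted(Comparator.reverseOrder())
                        .collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: KafkaLogCleaner <log.dirs path>");
            return;
        }
        deleteRecursively(Paths.get(args[0]));
    }
}
```

The path is passed explicitly rather than hard-coded, since the value of log.dirs differs between installations.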
Kafka configuration:
kafka-beans.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:mvc="http://www.springframework.org/schema/mvc"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://www.springframework.org/schema/mvc
        http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd">

    <context:component-scan base-package="com.unionpay.producer"></context:component-scan>
    <context:component-scan base-package="com.unionpay.consumer"></context:component-scan>

    <bean id="kafkaProducerDemo" class="com.test.www.unionpay.producer.KafkaProducerDemo">
        <property name="properties">
            <props>
                <prop key="topic">my-replicated-topic</prop>
                <prop key="bootstrap.servers">127.0.0.1:9092</prop>
                <prop key="acks">all</prop>
                <prop key="key.serializer">org.apache.kafka.common.serialization.StringSerializer</prop>
                <prop key="value.serializer">org.apache.kafka.common.serialization.StringSerializer</prop>
                <prop key="buffer.memory">33554432</prop>
            </props>
        </property>
    </bean>

    <bean id="kafkaConsumerDemo" class="com.test.www.unionpay.consumer.KafkaConsumerDemo">
        <property name="props">
            <props>
                <prop key="topic">my-replicated-topic</prop>
                <prop key="bootstrap.servers">127.0.0.1:9092</prop>
                <prop key="group.id">group1</prop>
                <prop key="enable.auto.commit">true</prop>
                <prop key="auto.commit.interval.ms">1000</prop>
                <prop key="session.timeout.ms">30000</prop>
                <prop key="key.deserializer">org.apache.kafka.common.serialization.StringDeserializer</prop>
                <prop key="value.deserializer">org.apache.kafka.common.serialization.StringDeserializer</prop>
            </props>
        </property>
    </bean>
</beans>

redis.properties:
# Maximum number of Jedis instances the pool can allocate
redis.pool.maxTotal=1000
# Maximum number of idle Jedis instances the pool may hold
redis.pool.maxIdle=200
# Maximum time to wait when borrowing a Jedis instance; if exceeded, a JedisConnectionException is thrown
redis.pool.maxWaitMillis=2000
# Whether to validate a Jedis instance before it is borrowed; if true, every borrowed instance is usable
redis.pool.testOnBorrow=true
# Redis standalone
# Standalone host
jedis.host=127.0.0.1
# Standalone port
jedis.port=6379

KafkaProducerDemo.java:
package com.test.www.unionpay.producer;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducerDemo {

    // Injected from kafka-beans.xml; also carries the custom "topic" entry.
    Properties properties;

    public KafkaProducerDemo() {
    }

    public KafkaProducerDemo(Properties properties) {
        super();
        this.properties = properties;
    }

    public Properties getProperties() {
        return properties;
    }

    public void setProperties(Properties properties) {
        this.properties = properties;
    }

    public void sendMessage(String msg) {
        // A new producer is created for every message, which keeps the demo simple.
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        ProducerRecord<String, String> record =
                new ProducerRecord<String, String>(properties.getProperty("topic"), msg);
        producer.send(record);
        // close() waits for the pending send to complete before returning.
        producer.close();
    }
}

KafkaConsumerDemo.java:
package com.test.www.unionpay.consumer;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaConsumerDemo {

    // Injected from kafka-beans.xml; also carries the custom "topic" entry.
    private Properties props;

    public KafkaConsumerDemo() {
    }

    public KafkaConsumerDemo(Properties props) {
        super();
        this.props = props;
    }

    public Properties getProps() {
        return props;
    }

    public void setProps(Properties props) {
        this.props = props;
    }

    public String receive() {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        try {
            consumer.subscribe(Arrays.asList(props.getProperty("topic")));
            // The original wrapped this in while(true) but returned on the first pass,
            // so it is a single poll. A single short poll may return no records.
            ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
            StringBuilder msg = new StringBuilder();
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                msg.append(consumerRecord.value());
            }
            return msg.toString();
        } finally {
            // Always release the consumer, even if poll() throws.
            consumer.close();
        }
    }
}

KafkaController.java:
package com.test.www.web.controller;

import java.text.SimpleDateFormat;
import java.util.Date;

import javax.annotation.Resource;

import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.servlet.ModelAndView;

import com.test.www.unionpay.consumer.KafkaConsumerDemo;
import com.test.www.unionpay.producer.KafkaProducerDemo;

@Controller
public class KafkaController {

    @Resource(name = "kafkaProducerDemo")
    KafkaProducerDemo producer;

    @Resource(name = "kafkaConsumerDemo")
    KafkaConsumerDemo consumer;

    // Home page with links to the send and receive pages.
    @RequestMapping(value = "/welcome")
    public ModelAndView welcome() {
        System.out.println("--------welcome--------");
        ModelAndView mv = new ModelAndView();
        mv.setViewName("welcome");
        return mv;
    }

    // Shows the send form, pre-filled with the current time.
    @RequestMapping(value = "/sendmessage", method = RequestMethod.GET)
    public ModelAndView sendMessage() {
        System.out.println("--------sendmessage--------");
        Date date = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String now = sdf.format(date);
        ModelAndView mv = new ModelAndView();
        mv.addObject("time", now);
        mv.setViewName("kafka_send");
        return mv;
    }

    // Handles the form submission and publishes the message to Kafka.
    @RequestMapping(value = "/onsend", method = RequestMethod.POST)
    public ModelAndView onsend(@RequestParam("message") String msg) {
        System.out.println("--------onsend--------");
        producer.sendMessage(msg);
        ModelAndView mv = new ModelAndView();
        mv.setViewName("welcome");
        return mv;
    }

    // Polls Kafka once and displays whatever was received.
    @RequestMapping(value = "/receive")
    public ModelAndView receive() {
        System.out.println("--------receive--------");
        String msg = consumer.receive();
        ModelAndView mv = new ModelAndView();
        mv.addObject("msg", msg);
        mv.setViewName("kafka_receive");
        return mv;
    }
}

Pages:
welcome.jsp:
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>welcome</title>
</head>
<body>
    <h1>Welcome</h1>
    <h2><a href="sendmessage.html">Send a Message</a></h2>
    <h2><a href="receive.html">Get a Message</a></h2>
</body>
</html>

kafka_send.jsp:
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>kafka_send</title>
</head>
<body>
    <h1>Send a Message</h1>
    <form action="onsend.html" method="post">
        MessageText:<textarea name="message">${time}</textarea>
        <br>
        <input type="submit" value="Submit">
    </form>
    <h2><a href="welcome.html">RETURN HOME</a></h2>
</body>
</html>

kafka_receive.jsp:
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>kafka_receive</title>
</head>
<body>
    <h1>Kafka_Receive!!!</h1>
    <h2>Receive Message : ${msg}</h2>
    <h2><a href="welcome.html">RETURN HOME</a></h2>
</body>
</html>

Result screenshots:
As the screenshots show, sending and receiving messages through Kafka works.
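The receive() method in KafkaConsumerDemo returns after a single poll, so it can come back empty when no record arrives within that one call. One possible variation is to poll a bounded number of times and stop at the first non-empty batch. The sketch below illustrates only that control flow: `BoundedPollSketch` and its `receive` method are hypothetical names, and the `Supplier<List<String>>` is an assumed stand-in for a call such as `consumer.poll(100)` so that the example runs without a broker.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch: bounded polling with the Kafka call abstracted away.
final class BoundedPollSketch {

    /**
     * Calls pollOnce up to maxPolls times and concatenates the first
     * non-empty batch; returns "" if every poll comes back empty.
     */
    static String receive(Supplier<List<String>> pollOnce, int maxPolls) {
        StringBuilder msg = new StringBuilder();
        for (int i = 0; i < maxPolls; i++) {
            List<String> batch = pollOnce.get();
            for (String value : batch) {
                msg.append(value);
            }
            if (!batch.isEmpty()) {
                break; // got something; hand it back to the caller
            }
        }
        return msg.toString();
    }

    public static void main(String[] args) {
        // Simulated source: two empty polls, then one batch of two messages.
        Iterator<List<String>> polls = Arrays.asList(
                Collections.<String>emptyList(),
                Collections.<String>emptyList(),
                Arrays.asList("a", "b")).iterator();
        System.out.println(receive(polls::next, 5)); // prints "ab"
    }
}
```

In the real consumer, the supplier would wrap the poll call and map each ConsumerRecord to its value; the upper bound keeps the web request from blocking indefinitely.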
Reposted from: https://www.cnblogs.com/super-chao/p/9304565.html