kafka原理_kafka入门(原理搭建简单使用)
前言
???????公司在用kafka接受和發(fā)送數(shù)據(jù),自己學習過Rabbitmq,不懂kafka挺不爽的,說干就干!網(wǎng)上找了許多帖子,學習了很多,小小的demo自己也搭建起來了,美滋滋,下面我認為優(yōu)秀的網(wǎng)站和自己的步驟展現(xiàn)給大家。
一、kafka介紹與原理
我們將消息的發(fā)布(publish)稱作 producer,將消息的訂閱(subscribe)表述為 consumer,將中間的存儲陣列稱作 broker(代理),這樣就可以大致描繪出這樣一個場面:
生產(chǎn)者將數(shù)據(jù)生產(chǎn)出來,交給 broker 進行存儲,消費者需要消費數(shù)據(jù)了,就從broker中去拿出數(shù)據(jù)來,然后完成一系列對數(shù)據(jù)的處理操作。
乍一看返也太簡單了,不是說了它是分布式嗎,難道把 producer、 broker 和 consumer 放在三臺不同的機器上就算是分布式了嗎。看 kafka 官方給出的圖:
多個 broker 協(xié)同合作,producer 和 consumer 部署在各個業(yè)務邏輯中被頻繁的調(diào)用,三者通過 zookeeper管理協(xié)調(diào)請求和轉(zhuǎn)發(fā)。這樣一個高性能的分布式消息發(fā)布訂閱系統(tǒng)就完成了。
圖上有個細節(jié)需要注意,producer 到 broker 的過程是 push,也就是有數(shù)據(jù)就推送到 broker,而 consumer 到 broker 的過程是 pull,是通過 consumer 主動去拉數(shù)據(jù)的,而不是 broker 把數(shù)據(jù)主懂發(fā)送到 consumer 端的。
二、kafka的linux基本搭建
??????此處還是給大家一個鏈接很是優(yōu)秀!kafka的linux搭建
注意:kafka依賴于zookeeper的節(jié)點,需要搭建zookeeper,linux安裝zookeeper文章中有鏈接(kafaka高版本也自帶zookeeper,其實博主用kafka自帶的zookeeper啟動,然后啟動kafka沒成功,用安裝的zookeeper就成功了…)
三、springboot整合kafka的簡單demo
1.引入依賴
在springboot項目中的pom.xml引入下列依賴:
<dependency> <groupId>org.springframework.kafkagroupId> <artifactId>spring-kafkaartifactId> <version>2.2.6.RELEASEversion> dependency> <dependency> <groupId>org.apache.kafkagroupId> <artifactId>kafka-clientsartifactId> <version>2.1.0version> dependency> <dependency> <groupId>org.projectlombokgroupId> <artifactId>lombokartifactId> <optional>trueoptional> dependency>2.yml配置文件
配置如下:
spring: kafka: bootstrap-servers: 192.168.200.130:9092 #此處是我虛擬機上linux的ip kafak的默認端口為9092 producer: #生產(chǎn)者 acks: 1 client-id: kafka-producer batch-size: 5 buffer-memory: 33554432 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: #消費者 group-id: hello-group enable-auto-commit: false auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer application: name: kafkatestserver: port: 9192eureka: #eureka注冊中心配置 client: service-url: defaultZone: http://127.0.0.1:6868/eureka instance: prefer-ip-address: true?是不是依賴沒注釋看不懂ennnnn,別急嘛,那我來搬運點詳細的注釋解釋(我的demo就是上面的)
???????????????????????????????????????????????producer
#============== kafka ===================# 指定kafka server的地址,集群配多個,中間,逗號隔開spring.kafka.bootstrap-servers=127.0.0.1:9092#=============== provider =======================# 寫入失敗時,重試次數(shù)。當leader節(jié)點失效,一個repli節(jié)點會替代成為leader節(jié)點,此時可能出現(xiàn)寫入失敗,# 當retris為0時,produce不會重復。retirs重發(fā),此時repli節(jié)點完全成為leader節(jié)點,不會產(chǎn)生消息丟失。spring.kafka.producer.retries=0# 每次批量發(fā)送消息的數(shù)量,produce積累到一定數(shù)據(jù),一次發(fā)送spring.kafka.producer.batch-size=16384# produce積累數(shù)據(jù)一次發(fā)送,緩存大小達到buffer.memory就發(fā)送數(shù)據(jù)spring.kafka.producer.buffer-memory=33554432#procedure要求leader在考慮完成請求之前收到的確認數(shù),用于控制發(fā)送記錄在服務端的持久化,其值可以為如下:#acks = 0 如果設置為零,則生產(chǎn)者將不會等待來自服務器的任何確認,該記錄將立即添加到套接字緩沖區(qū)并視為已發(fā)送。在這種情況下,無法保證服務器已收到記錄,并且重試配置將不會生效(因為客戶端通常不會知道任何故障),為每條記錄返回的偏移量始終設置為-1。#acks = 1 這意味著leader會將記錄寫入其本地日志,但無需等待所有副本服務器的完全確認即可做出回應,在這種情況下,如果leader在確認記錄后立即失敗,但在將數(shù)據(jù)復制到所有的副本服務器之前,則記錄將會丟失。#acks = all 這意味著leader將等待完整的同步副本集以確認記錄,這保證了只要至少一個同步副本服務器仍然存活,記錄就不會丟失,這是最強有力的保證,這相當于acks = -1的設置。#可以設置的值為:all, -1, 0, 1spring.kafka.producer.acks=1# 指定消息key和消息體的編解碼方式spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?consumer
#=============== consumer =======================# 指定默認消費者group id --> 由于在kafka中,同一組中的consumer不會讀取到同一個消息,依靠groud.id設置組名spring.kafka.consumer.group-id=testGroup# smallest和largest才有效,如果smallest重新0開始讀取,如果是largest從logfile的offset讀取。一般情況下我們都是設置smallestspring.kafka.consumer.auto-offset-reset=earliest# enable.auto.commit:true --> 設置自動提交offsetspring.kafka.consumer.enable-auto-commit=true#如果'enable.auto.commit'為true,則消費者偏移自動提交給Kafka的頻率(以毫秒為單位),默認值為5000。spring.kafka.consumer.auto-commit-interval=100# 指定消息key和消息體的編解碼方式spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer3.Controller
import lombok.AllArgsConstructor;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.PathVariable;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;@RestController@AllArgsConstructor@RequestMapping("Kafka")public class SimpleController { private final KafkaTemplate<Object,Object> kafkaTemplate; @GetMapping("/send/{message}") public String send(@PathVariable("message") String message){ kafkaTemplate.send("topic1","topic1:"+message); kafkaTemplate.send("topic2","topic2:"+message); return message; }}4.Listener
import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Component;@Componentpublic class SimpleListener { @KafkaListener(topics = {"topic1","topic2"}) public void listen1(String data){ System.out.println(data); }}5.訪問
地址欄 輸入:http://localhost:9192/Kafka/send/helloKafaka (9192端口是我的boot項目的端口,看官根據(jù)自己的項目更改)
6.結(jié)果
控制臺輸出:
四,搭建的坑
我搭建使用的是虛擬機上的linux所以要么開放kafka端口要么關(guān)閉防火墻如果不開放項目啟動的時候報錯:Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
點擊解決方案(找了好久還是大佬厲害)
解決方案
總結(jié)
??????????以上就是今天要展示的kafka入門,雖然內(nèi)容不多,但是入門了對吧,更深層次的原理及其使用場景(大數(shù)據(jù)等等)還等著我們?nèi)グl(fā)掘呢,給我的感覺就是基本的會了,但是牛批的場景使用還是不會,就像高數(shù)一樣同樣學的高數(shù)課本但是有的題你不會人家會,而且還能會出花樣來,所以多接觸優(yōu)秀的事物,多學習,多總結(jié)成就優(yōu)秀的你,加油,陌生人,越努力越幸運!
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎勵來咯,堅持創(chuàng)作打卡瓜分現(xiàn)金大獎總結(jié)
以上是生活随笔為你收集整理的kafka原理_kafka入门(原理搭建简单使用)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 对于频繁的写数据处理方式
- 下一篇: git 常用命令笔记