kafka streams_Kafka REST Proxy for MapR Streams入门
kafka streams
介紹
MapR生態(tài)系統(tǒng)軟件包2.0(MEP)隨附了一些與MapR流有關(guān)的新功能:
- 用于MapR Streams的Kafka REST代理為MapR Streams和Kafka集群提供RESTful接口,以使用和生成消息并執(zhí)行管理操作。
- Kafka Connect for MapR Streams是一個(gè)實(shí)用程序,用于在MapR Streams與Apache Kafka和其他存儲系統(tǒng)之間流式傳輸數(shù)據(jù)。
MapR生態(tài)系統(tǒng)軟件包(MEP)是一種提供與核心升級脫鉤的生態(tài)系統(tǒng)升級的方法-允許您獨(dú)立于聚合數(shù)據(jù)平臺升級工具。 您可以在本文中進(jìn)一步了解MEP 2.0。
在此博客中,我們描述了如何使用REST代理向MapR流發(fā)布消息和使用消息。 REST代理是對MapR融合數(shù)據(jù)平臺的重要補(bǔ)充,允許任何編程語言使用MapR流。
MapR Streams工具隨附的Kafka REST Proxy可以與MapR Streams一起使用(默認(rèn)),也可以與Apache Kafka混合使用。 在本文中,我們將重點(diǎn)介紹MapR流。 <!–更多–>
先決條件
- 具有MEP 2.0的MapR融合數(shù)據(jù)平臺5.2
- 使用MapR Streams工具
- curl,wget或任何HTTP / REST客戶端工具
創(chuàng)建MapR流和主題
流是主題的集合,您可以通過以下方式將其分組管理:
您可以在文檔中找到有關(guān)MapR Streams概念的更多信息。
在您的Mapr群集或沙盒上,運(yùn)行以下命令:
$ maprcli stream create -path /apps/iot-stream -produceperm p -consumeperm p -topicperm p$ maprcli stream topic create -path /apps/iot-stream -topic sensor-json -partitions 3$ maprcli stream topic create -path /apps/iot-stream -topic sensor-binary -partitions 3啟動(dòng)Kafka控制臺的生產(chǎn)者和消費(fèi)者
打開兩個(gè)終端窗口,并使用以下命令運(yùn)行使用者的Kafka實(shí)用程序:
消費(fèi)者
- 主題傳感器-json
- 主題傳感器二進(jìn)制
這兩個(gè)終端窗口可讓您查看有關(guān)不同主題的消息
使用Kafka REST代理
檢查主題元數(shù)據(jù)
端點(diǎn)/topics/[topic_name]允許您獲取有關(guān)該主題的一些信息。 在MapR Streams中,主題是路徑標(biāo)識的流的一部分; 要使用REST API使用主題,您必須使用完整路徑,并在URL中進(jìn)行編碼; 例如:
- /apps/iot-stream:sensor-json將使用%2Fapps%2Fiot-stream%3Asensor-json進(jìn)行編碼
運(yùn)行以下命令,以獲取有關(guān)sensor-json主題的信息
$ curl -X GET http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json注意:為簡單起見,我從運(yùn)行Kafka REST代理的節(jié)點(diǎn)上運(yùn)行命令,因此可以使用localhost 。
您可以通過添加以下Python命令,以一種漂亮的方式打印JSON:
$ curl -X GET http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json | python -m json.tool默認(rèn)流
如上所述,流路徑是您必須在命令中使用的主題名稱的一部分。 但是可以將MapR Kafka REST代理配置為使用默認(rèn)流。 為此,您應(yīng)該在/opt/mapr/kafka-rest/kafka-rest-2.0.1/config/kafka-rest.properties文件中添加以下屬性:
- streams.default.stream=/apps/iot-stream更改Kafka REST代理配置時(shí),必須使用maprcli或MCS重新啟動(dòng)服務(wù)。使用streams.default.stream屬性的streams.default.stream是簡化URL使用的URL。以應(yīng)用為例
- 通過streams.default.stream ,可以使用curl -X GET http://localhost:8082/topics/
在本文中,所有URL都包含編碼的流名稱,就像您可以開始使用Kafka REST代理而無需更改配置,也可以將其用于其他流。
發(fā)布消息
用于MapR流的Kafka REST代理允許應(yīng)用程序?qū)⑾l(fā)布到MapR流。 消息可以作為JSON或二進(jìn)制內(nèi)容(base64編碼)發(fā)送。
要發(fā)送JSON消息:
- 查詢應(yīng)該是HTTP POST
- 內(nèi)容類型應(yīng)為: application/vnd.kafka.json.v1+json
- 身體:
完整的請求是:
curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \--data '{"records":[{"value": {"temp" : 10 , "speed" : 40 , "direction" : "NW"} }]}' \http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json您應(yīng)該在運(yùn)行/apps/iot-stream:sensor-json使用者的終端窗口中看到打印的消息。
要發(fā)送二進(jìn)制消息:
- 查詢應(yīng)該是HTTP POST
- 內(nèi)容類型應(yīng)為: application/vnd.kafka.binary.v1+json
- 身體:
請注意, SGVsbG8gV29ybGQ=是在Base64中編碼的字符串“ Hello World”。
完整的請求是:
curl -X POST -H "Content-Type: application/vnd.kafka.binary.v1+json" \--data '{"records":[{"value":"SGVsbG8gV29ybGQ="}]}' \http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-binary您應(yīng)該在/apps/iot-stream:sensor-binary使用者正在運(yùn)行的終端窗口中看到打印的消息。
發(fā)送多條消息
HTTP正文的records字段允許您發(fā)送多個(gè)消息,例如,您可以發(fā)送:
curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \--data '{"records":[{"value": {"temp" : 12 , "speed" : 42 , "direction" : "NW"} }, {"value": {"temp" : 10 , "speed" : 37 , "direction" : "N"} } ]}' \http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json該命令將發(fā)送2條消息,并將偏移量增加2。您可以對二進(jìn)制內(nèi)容執(zhí)行相同的操作,只需在JSON數(shù)組中添加新元素即可; 例如:
curl -X POST -H "Content-Type: application/vnd.kafka.binary.v1+json" \--data '{"records":[{"value":"SGVsbG8gV29ybGQ="}, {"value":"Qm9uam91cg=="}]}' \http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-binary您可能知道,可以為消息設(shè)置密鑰,以確保所有具有相同密鑰的消息都將到達(dá)同一分區(qū)。 為此,將key屬性添加到消息中,如下所示:
{"records":[{"key": "K001","value":{"temp" : 10 ,"speed" : 40 ,"direction" : "NW"} }] }既然您知道如何使用REST代理將消息發(fā)布到MapR Stream主題,那么讓我們看看如何使用消息。
消費(fèi)信息
REST代理還可以用于消費(fèi)主題消息。 為此,您需要:
創(chuàng)建使用者實(shí)例
以下請求創(chuàng)建使用者實(shí)例:
curl -X POST -H "Content-Type: application/vnd.kafka.v1+json" \--data '{"name": "iot_json_consumer", "format": "json", "auto.offset.reset": "earliest"}' \http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-json服務(wù)器的響應(yīng)如下所示:
{"instance_id":"iot_json_consumer","base_uri":"http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-json/instances/iot_json_consumer" }請注意,我們已經(jīng)使用/consumers/[topic_name]創(chuàng)建使用者。
后續(xù)請求將使用base_uri從主題獲取消息。 與任何MapR Streams / Kafka使用者一樣, auto.offset.reset定義其行為。 在此示例中,該值設(shè)置為earliest ,這意味著使用者將從頭開始閱讀消息。 您可以在MapR Streams文檔中找到有關(guān)使用者配置的更多信息。
消費(fèi)信息
要使用這些消息,只需將Mapr Streams主題添加到使用者實(shí)體的URL。
以下請求使用了該主題的消息:
curl -X GET -H "Accept: application/vnd.kafka.json.v1+json" \ http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-json/instances/iot_json_consumer/topics/%2Fapps%2Fiot-stream%3Asensor-json此調(diào)用返回JSON文檔中的消息:
[{"key":null,"value":{"temp":10,"speed":40,"direction":"NW"},"topic":"/apps/iot-stream:sensor-json","partition":1,"offset":1},{"key":null,"value":{"temp":12,"speed":42,"direction":"NW"},"topic":"/apps/iot-stream:sensor-json","partition":1,"offset":2},{"key":null,"value":{"temp":10,"speed":37,"direction":"N"},"topic":"/apps/iot-stream:sensor-json","partition":1,"offset":3} ]每次對API的調(diào)用都會(huì)根據(jù)上一次調(diào)用的偏移量返回發(fā)布的新消息。
請注意,消費(fèi)者將被銷毀:
- 由consumer.instance.timeout.ms實(shí)例。超時(shí)。毫秒設(shè)置的空閑時(shí)間后(默認(rèn)值設(shè)置為300000毫秒/ 5分鐘)
- 使用REST API調(diào)用銷毀它(見下文)。
消費(fèi)二進(jìn)制格式的消息
如果需要使用二進(jìn)制消息,則需要更改格式并接受標(biāo)頭,該方法是相同的。
調(diào)用此URL為二進(jìn)制主題創(chuàng)建使用者實(shí)例:
curl -X POST -H "Content-Type: application/vnd.kafka.v1+json" \--data '{"name": "iot_binary_consumer", "format": "binary", "auto.offset.reset": "earliest"}' \http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary然后使用消息,accept標(biāo)頭設(shè)置為application/vnd.kafka.binary.v1+json :
curl -X GET -H "Accept: application/vnd.kafka.binary.v1+json" \ http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary/instances/iot_binary_consumer/topics/%2Fapps%2Fiot-stream%3Asensor-binary該調(diào)用返回JSON文檔中的消息,并且該值在Base64中編碼
[{"key":null,"value":"SGVsbG8gV29ybGQ=","topic":"/apps/iot-stream:sensor-binary","partition":1,"offset":1},{"key":null,"value":"Qm9uam91cg==","topic":"/apps/iot-stream:sensor-binary","partition":1,"offset":2} ]刪除使用者實(shí)例
如前所述,將根據(jù)REST Proxy的consumer.instance.timeout.ms配置自動(dòng)銷毀consumer.instance.timeout.ms 。 也可以使用使用者實(shí)例URI和HTTP DELETE調(diào)用銷毀實(shí)例,如下所示:
curl -X DELETE http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary/instances/iot_binary_consumer結(jié)論
在本文中,您學(xué)習(xí)了如何將Kafka REST代理用于MapR流,該代理允許任何應(yīng)用程序使用在MapR聚合數(shù)據(jù)平臺中發(fā)布的消息。
您可以在MapR文檔和以下資源中找到有關(guān)Kafka REST代理的更多信息:
- MapR Streams入門
- Ted Dunning和Ellen Friedman撰寫的“流傳輸體系結(jié)構(gòu):使用Apache Kafka和MapR流的新設(shè)計(jì)”電子書
翻譯自: https://www.javacodegeeks.com/2017/01/getting-started-kafka-rest-proxy-mapr-streams.html
kafka streams
總結(jié)
以上是生活随笔為你收集整理的kafka streams_Kafka REST Proxy for MapR Streams入门的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如何进入U盘的pe系统如何进去u盘的pe
- 下一篇: 电脑黑屏怎么解决如何使电脑黑屏