Kafka REST Proxy for MapR Streams入门
介紹
MapR生態系統軟件包2.0(MEP)隨附了一些與MapR流有關的新功能:
- MapR Streams的Kafka REST代理為MapR Streams和Kafka集群提供RESTful接口,以使用和生成消息并執行管理操作。
- Kafka Connect for MapR Streams是一個實用程序,用于在MapR Streams與Apache Kafka和其他存儲系統之間流式傳輸數據。
MapR生態系統軟件包(MEP)是一種提供與核心升級脫鉤的生態系統升級的方法-允許您獨立于聚合數據平臺升級工具。 您可以在本文中進一步了解MEP 2.0。
在此博客中,我們描述了如何使用REST代理向MapR Streams發布消息和從MapR Streams使用消息。 REST代理是對MapR融合數據平臺的重要補充,允許任何編程語言使用MapR流。
MapR Streams工具隨附的Kafka REST Proxy可以與MapR Streams一起使用(默認),也可以與Apache Kafka混合使用。 在本文中,我們將重點介紹MapR流。 <!–更多–>
先決條件
- 具有MEP 2.0的MapR融合數據平臺5.2
- 使用MapR Streams工具
- curl,wget或任何HTTP / REST客戶端工具
創建MapR流和主題
流是主題的集合,您可以通過以下方式將其作為一個組進行管理:
您可以在文檔中找到有關MapR Streams概念的更多信息。
在您的Mapr群集或沙盒上,運行以下命令:
$ maprcli stream create -path /apps/iot-stream -produceperm p -consumeperm p -topicperm p$ maprcli stream topic create -path /apps/iot-stream -topic sensor-json -partitions 3$ maprcli stream topic create -path /apps/iot-stream -topic sensor-binary -partitions 3啟動Kafka控制臺的生產者和消費者
打開兩個終端窗口,并使用以下命令運行使用者的Kafka實用程序:
消費者
- 主題傳感器-json
- 主題傳感器二進制
這兩個終端窗口可讓您查看有關不同主題的消息
使用Kafka REST代理
檢查主題元數據
端點/topics/[topic_name]允許您獲取有關該主題的一些信息。 在MapR Streams中,主題是路徑標識的流的一部分; 要使用REST API使用主題,您必須使用完整路徑,并將其編碼在URL中; 例如:
- /apps/iot-stream:sensor-json將使用%2Fapps%2Fiot-stream%3Asensor-json進行編碼
運行以下命令,以獲取有關sensor-json主題的信息
$ curl -X GET http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json注意:為簡單起見,我從運行Kafka REST代理的節點上運行命令,因此可以使用localhost 。
您可以通過添加以下Python命令,以一種漂亮的方式打印JSON:
$ curl -X GET http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json | python -m json.tool默認流
如上所述,流路徑是您必須在命令中使用的主題名稱的一部分。 但是可以將MapR Kafka REST代理配置為使用默認流。 為此,您應該在/opt/mapr/kafka-rest/kafka-rest-2.0.1/config/kafka-rest.properties文件中添加以下屬性:
- streams.default.stream=/apps/iot-stream更改Kafka REST代理配置時,必須使用maprcli或MCS重新啟動服務。使用streams.default.stream屬性的streams.default.stream是簡化URL使用的URL。以應用為例
- 通過streams.default.stream ,可以使用curl -X GET http://localhost:8082/topics/
在本文中,所有URL都包含編碼的流名稱,就像您可以開始使用Kafka REST代理而無需更改配置,也可以將其用于其他流。
發布消息
用于MapR流的Kafka REST代理允許應用程序將消息發布到MapR流。 消息可以作為JSON或二進制內容(base64編碼)發送。
要發送JSON消息:
- 查詢應該是HTTP POST
- 內容類型應為: application/vnd.kafka.json.v1+json
- 身體:
完整的請求是:
curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \--data '{"records":[{"value": {"temp" : 10 , "speed" : 40 , "direction" : "NW"} }]}' \http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json您應該在運行/apps/iot-stream:sensor-json使用者的終端窗口中看到打印的消息。
要發送二進制消息:
- 查詢應該是HTTP POST
- 內容類型應為: application/vnd.kafka.binary.v1+json
- 身體:
請注意, SGVsbG8gV29ybGQ=是在Base64中編碼的字符串“ Hello World”。
完整的請求是:
curl -X POST -H "Content-Type: application/vnd.kafka.binary.v1+json" \--data '{"records":[{"value":"SGVsbG8gV29ybGQ="}]}' \http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-binary您應該在運行/apps/iot-stream:sensor-binary使用者的終端窗口中看到打印的消息。
發送多條消息
HTTP正文的records字段允許您發送多個消息,例如,您可以發送:
curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" \--data '{"records":[{"value": {"temp" : 12 , "speed" : 42 , "direction" : "NW"} }, {"value": {"temp" : 10 , "speed" : 37 , "direction" : "N"} } ]}' \http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-json該命令將發送2條消息,并將偏移量增加2。您可以對二進制內容執行相同的操作,只需在JSON數組中添加新元素即可; 例如:
curl -X POST -H "Content-Type: application/vnd.kafka.binary.v1+json" \--data '{"records":[{"value":"SGVsbG8gV29ybGQ="}, {"value":"Qm9uam91cg=="}]}' \http://localhost:8082/topics/%2Fapps%2Fiot-stream%3Asensor-binary您可能知道,可以為消息設置密鑰,以確保所有具有相同密鑰的消息都將到達同一分區。 為此,將key屬性添加到消息中,如下所示:
{"records":[{"key": "K001","value":{"temp" : 10 ,"speed" : 40 ,"direction" : "NW"} }] }既然您知道如何使用REST代理將消息發布到MapR Stream主題,那么讓我們看看如何使用消息。
消費信息
REST代理還可以用于消費主題消息。 為此,您需要:
創建使用者實例
以下請求創建使用者實例:
curl -X POST -H "Content-Type: application/vnd.kafka.v1+json" \--data '{"name": "iot_json_consumer", "format": "json", "auto.offset.reset": "earliest"}' \http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-json服務器的響應如下所示:
{"instance_id":"iot_json_consumer","base_uri":"http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-json/instances/iot_json_consumer" }請注意,我們已使用/consumers/[topic_name]創建使用者。
后續請求將使用base_uri從主題獲取消息。 與任何MapR Streams / Kafka使用者一樣, auto.offset.reset定義其行為。 在此示例中,該值設置為earliest ,這意味著使用者將從頭開始閱讀消息。 您可以在MapR Streams文檔中找到有關使用者配置的更多信息。
消費信息
要使用這些消息,只需將Mapr Streams主題添加到使用者實體的URL。
以下請求使用了該主題的消息:
curl -X GET -H "Accept: application/vnd.kafka.json.v1+json" \ http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-json/instances/iot_json_consumer/topics/%2Fapps%2Fiot-stream%3Asensor-json此調用返回JSON文檔中的消息:
[{"key":null,"value":{"temp":10,"speed":40,"direction":"NW"},"topic":"/apps/iot-stream:sensor-json","partition":1,"offset":1},{"key":null,"value":{"temp":12,"speed":42,"direction":"NW"},"topic":"/apps/iot-stream:sensor-json","partition":1,"offset":2},{"key":null,"value":{"temp":10,"speed":37,"direction":"N"},"topic":"/apps/iot-stream:sensor-json","partition":1,"offset":3} ]每次對API的調用都會根據上一次調用的偏移量返回發布的新消息。
請注意,消費者將被銷毀:
- 由consumer.instance.timeout.ms實例。超時。毫秒設置的空閑時間后(默認值設置為300000毫秒/ 5分鐘)
- 使用REST API調用銷毀它(見下文)。
消費二進制格式的消息
如果需要使用二進制消息,則需要更改格式并接受標頭,該方法是相同的。
調用此URL為二進制主題創建使用者實例:
curl -X POST -H "Content-Type: application/vnd.kafka.v1+json" \--data '{"name": "iot_binary_consumer", "format": "binary", "auto.offset.reset": "earliest"}' \http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary然后使用消息,accept標頭設置為application/vnd.kafka.binary.v1+json :
curl -X GET -H "Accept: application/vnd.kafka.binary.v1+json" \ http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary/instances/iot_binary_consumer/topics/%2Fapps%2Fiot-stream%3Asensor-binary該調用返回JSON文檔中的消息,并且該值在Base64中編碼
[{"key":null,"value":"SGVsbG8gV29ybGQ=","topic":"/apps/iot-stream:sensor-binary","partition":1,"offset":1},{"key":null,"value":"Qm9uam91cg==","topic":"/apps/iot-stream:sensor-binary","partition":1,"offset":2} ]刪除使用者實例
如前所述,使用者將根據REST Proxy的consumer.instance.timeout.ms配置自動銷毀。 也可以使用使用者實例URI和HTTP DELETE調用銷毀實例,如下所示:
curl -X DELETE http://localhost:8082/consumers/%2Fapps%2Fiot-stream%3Asensor-binary/instances/iot_binary_consumer結論
在本文中,您學習了如何對MapR流使用Kafka REST代理,該代理允許任何應用程序使用在MapR融合數據平臺中發布的消息。
您可以在MapR文檔和以下資源中找到有關Kafka REST代理的更多信息:
- MapR Streams入門
- Ted Dunning和Ellen Friedman撰寫的“流式傳輸體系結構:使用Apache Kafka和MapR流的新設計”電子書
翻譯自: https://www.javacodegeeks.com/2017/01/getting-started-kafka-rest-proxy-mapr-streams.html
總結
以上是生活随笔為你收集整理的Kafka REST Proxy for MapR Streams入门的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 电脑excel快捷键图解(excel快捷
- 下一篇: apache hadoop_春天遇见Ap