How to leverage Neo4j Streams and build a just-in-time data warehouse

by Andrea Santurbano
In this article, we’ll show how to create a Just-In-Time Data Warehouse by using Neo4j and the Neo4j Streams module together with Apache Spark’s Structured Streaming APIs and Apache Kafka.
In order to show how to integrate them, simplify the integration, and let you test the whole project by hand, I’ll use Apache Zeppelin, a notebook runner that allows you to interact natively with Neo4j.
Leveraging Neo4j Streams
The Neo4j Streams project is composed of three main pillars:

- The Change Data Capture (the subject of this first article), which allows us to stream database changes over Kafka topics
- The Sink, which allows consuming data streams from a Kafka topic
- A set of procedures that allow us to produce/consume data to/from Kafka topics
What is Change Data Capture?
It’s a system that automatically captures changes from a source system (a database, for instance) and provides these changes to downstream systems for a variety of use cases.
CDC typically forms part of an ETL pipeline. This is an important component for ensuring Data Warehouses (DWH) are kept up to date with any record changes.
Traditionally, CDC applications have also worked off transaction logs, thereby allowing us to replicate databases without much of a performance impact on their operation.
How does the Neo4j Streams CDC module deal with database changes?
Every transaction inside Neo4j gets captured and transformed in order to stream an atomic element of the transaction.
Let’s suppose we have a simple creation of two nodes and one relationship between them:
CREATE (andrea:Person{name:"Andrea"})-[knows:KNOWS{since:2014}]->(michael:Person{name:"Michael"})

The CDC module will transform this transaction into 3 events (2 node creations, 1 relationship creation).
The Event structure was inspired by the Debezium format and has the following general structure:
{ "meta": { /* transaction meta-data */ }, "payload": { /* the data related to the transaction */ "before": { /* the data before the transaction */}, "after": { /* the data after the transaction */} }}Node source (andrea):
節點源(andrea) :
{ "meta": { "timestamp": 1532597182604, "username": "neo4j", "tx_id": 1, "tx_event_id": 0, "tx_events_count": 3, "operation": "created", "source": { "hostname": "neo4j.mycompany.com" } }, "payload": { "id": "1004", "type": "node", "after": { "labels": ["Person"], "properties": { "name": "Andrea" } } }}Node target (michael):
節點目標(michael) :
{ "meta": { "timestamp": 1532597182604, "username": "neo4j", "tx_id": 1, "tx_event_id": 1, "tx_events_count": 3, "operation": "created", "source": { "hostname": "neo4j.mycompany.com" } }, "payload": { "id": "1006", "type": "node", "after": { "labels": ["Person"], "properties": { "name": "Michael" } } }}Relationship knows:
關系knows :
{ "meta": { "timestamp": 1532597182604, "username": "neo4j", "tx_id": 1, "tx_event_id": 2, "tx_events_count": 3, "operation": "created", "source": { "hostname": "neo4j.mycompany.com" } }, "payload": { "id": "1007", "type": "relationship", "label": "KNOWS", "start": { "labels": ["Person"], "id": "1005" }, "end": { "labels": ["Person"], "id": "106" }, "after": { "properties": { "since": 2014 } } }}By default, all the data will be streamed on the neo4j topic. The CDC module allows controlling which nodes are sent to Kafka, and which of their properties you want to send to the topic:
默認情況下,所有數據將在neo4j主題上流式傳輸。 CDC模塊允許控制將哪些節點發送到Kafka,以及要將其哪些屬性發送到主題:
streams.source.topic.nodes.<TOPIC_NAME>=<PATTERN>

For example, with the following configuration:

streams.source.topic.nodes.products=Product{name, code}

the CDC module will send to the products topic all the nodes that have the label Product, and it will send to that topic only the changes to the name and code properties. Please refer to the official documentation for a full description of how label filtering works.
For a more in-depth description of the Neo4j Streams project and how/why we at LARUS and Neo4j built it, check out this article.
Beyond the traditional Data Warehouse
A traditional DWH requires data teams to constantly build multiple costly and time-consuming Extract Transform Load (ETL) pipelines to ultimately derive business insights.
One of the biggest pain points is that Enterprise Data Warehouses are inherently rigid, due to an architecture that’s difficult to change. That’s because:
- they are based on the Schema-On-Write architecture: first, you define your schema, then you write your data, then you read your data and it comes back in the schema you defined up-front
- they are based on (expensive) batched/scheduled jobs
This results in having to build costly and time-consuming ETL pipelines to access and manipulate the data. And as new data types and sources are introduced, the need to augment your ETL pipelines exacerbates the problem.
Thanks to the combination of stream data processing via the Neo4j Streams CDC module and the Schema-On-Read approach provided by Apache Spark, we can overcome this rigidity and build a new kind of (flexible) DWH.
A paradigm shift: Just-In-Time Data Warehouse
A JIT-DWH solution is designed to easily handle a wider variety of data from different sources, and it starts from a different approach to dealing with and managing data: Schema-On-Read.
Schema-On-Read
Schema-On-Read follows a different sequence: it just loads the data as-is and applies your own lens to the data when you read it back out. With this kind of approach, you can present data in a schema that is adapted best to the queries being issued. You’re not stuck with a one-size-fits-all schema. With schema-on-read, you can present the data back in a schema that is most relevant to the task at hand.
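To make this concrete, here is a minimal Spark sketch of Schema-On-Read. It is an illustration added for this section, and it assumes a directory of JSON files like the one we will build later in this article:

val people = spark.read.json("/zeppelin/spark-warehouse/jit-dwh")

// The schema is inferred at read time from whatever fields exist in the files,
// not fixed up-front at write time.
people.printSchema()

// Apply the "lens" needed for the question at hand: any new field that
// producers start writing becomes queryable without changing a pipeline.
people.select("neo_id", "key", "value").show(5, false)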
Set Up the Environment
In the following GitHub repo you’ll find everything you need in order to replicate what I’m presenting in this article. All you need to start is Docker. Then you can simply spin up the stack by entering the directory and executing the following command from the Terminal:
$ docker-compose up

This will start up the whole environment, which comprises:
- Neo4j + Neo4j Streams module + APOC procedures
- Apache Kafka
- Apache Spark
- Apache Zeppelin
By going into Apache Zeppelin at http://localhost:8080 you’ll find two notebooks in the directory Medium/Part 1:
- Create a Just-In-Time Data Warehouse: in this notebook, we will build the JIT-DWH
- Query The JIT-DWH: in this notebook, we will perform some queries over the JIT-DWH
The Use Case
We’ll create a fake social-network-like dataset. This will activate the CDC module of Neo4j Streams, and via Apache Spark we’ll intercept these events and persist them on the file system as JSON.
Then we’ll demonstrate how new fields added to our nodes will be automatically added to our JIT-DWH without any modification of the ETL pipeline, thanks to the Schema-On-Read approach.
We’ll execute the following steps:
Notebook 1: Create a Just-In-Time Data Warehouse
We’ll create a fake social network by using the APOC apoc.periodic.repeat procedure that executes this query every 15 seconds:
WITH ["M", "F", ""] AS genderUNWIND range(1, 10) AS idCREATE (p:Person {id: apoc.create.uuid(), name: "Name-" + apoc.text.random(10), age: round(rand() * 100), index: id, gender: gender[toInteger(size(gender) * rand())]})WITH collect(p) AS peopleUNWIND people AS p1UNWIND range(1, 3) AS friendWITH p1, people[(p1.index + friend) % size(people)] AS p2CREATE (p1)-[:KNOWS{years: round(rand() * 10), engaged: (rand() > 0.5)}]->(p2)If you need more details about the APOC project, please follow this link.
如果您需要有關APOC項目的更多詳細信息,請點擊此鏈接 。
So the resulting graph model is quite straightforward: Person nodes connected to each other by KNOWS relationships.
Let’s create an index over the Person node:
%neo4j
CREATE INDEX ON :Person(id)

Now let's set up the background job in Neo4j:
%neo4j
CALL apoc.periodic.repeat('create-fake-social-data', 'WITH ["M", "F", "X"] AS gender UNWIND range(1, 10) AS id CREATE (p:Person {id: apoc.create.uuid(), name: "Name-" + apoc.text.random(10), age: round(rand() * 100), index: id, gender: gender[toInteger(size(gender) * rand())]}) WITH collect(p) AS people UNWIND people AS p1 UNWIND range(1, 3) AS friend WITH p1, people[(p1.index + friend) % size(people)] AS p2 CREATE (p1)-[:KNOWS{years: round(rand() * 10), engaged: (rand() > 0.5)}]->(p2)', 15) YIELD name
RETURN name AS created

This background query makes the Neo4j Streams CDC module stream the related events over the "neo4j" Kafka topic (the default topic of the CDC).
Now let’s create a Structured Streaming Dataset that consumes the data from the “neo4j” topic:
val kafkaStreamingDF = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9093")
  .option("startingoffsets", "earliest")
  .option("subscribe", "neo4j")
  .load())

The kafkaStreamingDF Dataframe is basically a ProducerRecord representation, and in fact its schema is:
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

Now let's create the structure of the data streamed by the CDC using the Spark APIs, in order to read the streamed data:
val cdcMetaSchema = (new StructType()
  .add("timestamp", LongType)
  .add("username", StringType)
  .add("operation", StringType)
  .add("source", MapType(StringType, StringType, true)))

val cdcPayloadSchemaBeforeAfter = (new StructType()
  .add("labels", ArrayType(StringType, false))
  .add("properties", MapType(StringType, StringType, true)))

val cdcPayloadSchema = (new StructType()
  .add("id", StringType)
  .add("type", StringType)
  .add("label", StringType)
  .add("start", MapType(StringType, StringType, true))
  .add("end", MapType(StringType, StringType, true))
  .add("before", cdcPayloadSchemaBeforeAfter)
  .add("after", cdcPayloadSchemaBeforeAfter))

val cdcSchema = (new StructType()
  .add("meta", cdcMetaSchema)
  .add("payload", cdcPayloadSchema))

The cdcSchema is suitable for both node and relationship events.
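Before wiring cdcSchema into the streaming pipeline, it can help to sanity-check it against a static sample. The following sketch is an addition to the original notebook: it parses the node event for andrea shown earlier with from_json, and the final field selection is just for illustration.

import org.apache.spark.sql.functions.from_json
import spark.implicits._

// One of the sample CDC events from the beginning of the article, as a plain string.
val sampleEvent = """{"meta":{"timestamp":1532597182604,"username":"neo4j","tx_id":1,"tx_event_id":0,"tx_events_count":3,"operation":"created","source":{"hostname":"neo4j.mycompany.com"}},"payload":{"id":"1004","type":"node","after":{"labels":["Person"],"properties":{"name":"Andrea"}}}}"""

// Parse it with cdcSchema on a static (non-streaming) Dataset and inspect the result.
val parsed = Seq(sampleEvent).toDF("VALUE")
  .select(from_json($"VALUE", cdcSchema) as "JSON")

parsed.select("JSON.payload.id", "JSON.payload.after.properties").show(false)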
What we need now is to extract only the CDC event from the Dataframe, so let’s perform a simple transformation query over Spark:
val cdcDataFrame = (kafkaStreamingDF
  .selectExpr("CAST(value AS STRING) AS VALUE")
  .select(from_json('VALUE, cdcSchema) as 'JSON))

The cdcDataFrame contains just one column, JSON, which is the data streamed from the Neo4j Streams CDC module.
Let’s perform a simple ETL query in order to extract fields of interest:
val dataWarehouseDataFrame = (cdcDataFrame
  .where("json.payload.type = 'node' and (array_contains(nvl(json.payload.after.labels, json.payload.before.labels), 'Person'))")
  .selectExpr("json.payload.id AS neo_id",
    "CAST(json.meta.timestamp / 1000 AS Timestamp) AS timestamp",
    "json.meta.source.hostname AS host",
    "json.meta.operation AS operation",
    "nvl(json.payload.after.labels, json.payload.before.labels) AS labels",
    "explode(json.payload.after.properties)"))

This query is quite important, because it represents how the data will be persisted on the filesystem. Every node will be exploded into a number of JSON snippets, one for each node property, just like this:
{"neo_id":"35340","timestamp":"2018-12-19T23:07:10.465Z","host":"neo4j","operation":"created","labels":["Person"],"key":"name","value":"Name-5wc62uKO5l"}{"neo_id":"35340","timestamp":"2018-12-19T23:07:10.465Z","host":"neo4j","operation":"created","labels":["Person"],"key":"index","value":"8"}{"neo_id":"35340","timestamp":"2018-12-19T23:07:10.465Z","host":"neo4j","operation":"created","labels":["Person"],"key":"id","value":"944e58bf-0cf7-49cf-af4a-c803d44f222a"}{"neo_id":"35340","timestamp":"2018-12-19T23:07:10.465Z","host":"neo4j","operation":"created","labels":["Person"],"key":"gender","value":"F"}This kind of structure can be easily turned into tabular representation (we’ll see in the next few steps how to do this).
這種結構可以輕松地轉換為表格表示形式(我們將在接下來的幾個步驟中了解如何執行此操作)。
Now let's write a Spark continuous streaming query that saves the data to the file system as JSON:
val writeOnDisk = (dataWarehouseDataFrame
  .writeStream
  .format("json")
  .option("checkpointLocation", "/zeppelin/spark-warehouse/jit-dwh/checkpoint")
  .option("path", "/zeppelin/spark-warehouse/jit-dwh")
  .queryName("nodes")
  .start())

We have now created a simple JIT-DWH. In the second notebook we'll learn how to query it and how simple it is to deal with dynamic changes in the data structures, thanks to schema-on-read.
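Before moving on to the second notebook, a quick way to verify that the streaming query is actually running and making progress is to inspect it through the standard StreamingQuery handle (a small addition, not part of the original notebook):

// writeOnDisk is the StreamingQuery returned by start() above.
println(writeOnDisk.isActive)      // true while the query is running
println(writeOnDisk.status)        // current status, e.g. waiting for or processing new data
println(writeOnDisk.lastProgress)  // metrics of the last completed micro-batch

// Stop it gracefully when you are done experimenting:
// writeOnDisk.stop()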
Notebook 2: Query the JIT-DWH
The first paragraph lets us query and display our JIT-DWH:
val flattenedDF = (spark.read.format("json").load("/zeppelin/spark-warehouse/jit-dwh/**")
  .where("neo_id is not null")
  .groupBy("neo_id", "timestamp", "host", "labels", "operation")
  .pivot("key")
  .agg(first($"value")))

z.show(flattenedDF)

Remember how we saved the data as JSON a few rows above? The flattenedDF simply pivots the JSON records over the key field, grouping the data by the 5 columns that represent the "unique key" ("neo_id", "timestamp", "host", "labels", "operation"). This gives us a tabular representation of the source data, with one column per property key.
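Since flattenedDF is a regular DataFrame, we can also register it as a temporary view and query it with plain SQL. This is a small addition to the original notebook; the gender column below comes from the pivoted property keys of the fake Person nodes.

// Expose the flattened JIT-DWH as a temporary view for ad-hoc SQL.
flattenedDF.createOrReplaceTempView("jit_dwh")

val byGender = spark.sql("""
  SELECT gender, count(*) AS people
  FROM jit_dwh
  WHERE operation = 'created'
  GROUP BY gender
""")
z.show(byGender)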
Now imagine that our Person dataset gets a new field: birth. Let's add this new field to one node. In this case, you must choose an id from your dataset and update that node with the corresponding update paragraph in the notebook (which adds the birth property to the Person node with the chosen id).
Now the final step: reuse the same query and filter the DWH by the id that we previously changed, in order to check how our dataset has changed according to the changes made in Neo4j.
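A sketch of what that check can look like (an addition to the article; the id value is a placeholder taken from the sample output above, so replace it with the id you actually updated):

import org.apache.spark.sql.functions.first
import spark.implicits._

// Re-read and re-flatten the JIT-DWH, then keep only the rows for the updated node.
val updatedPerson = (spark.read.format("json").load("/zeppelin/spark-warehouse/jit-dwh/**")
  .where("neo_id is not null")
  .groupBy("neo_id", "timestamp", "host", "labels", "operation")
  .pivot("key")
  .agg(first($"value"))
  .where("id = '944e58bf-0cf7-49cf-af4a-c803d44f222a'"))  // placeholder id

// The new birth column shows up alongside the original ones, with no ETL change.
z.show(updatedPerson)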
Conclusions
In this first part, we learned how to leverage the events produced by the Neo4j Streams CDC module in order to build a simple (real-time) JIT-DWH that uses the Schema-On-Read approach.
In Part 2 we’ll discover how to use the Sink module in order to ingest data into Neo4j directly from Kafka.
If you have already tested the Neo4j Streams module, or tested it via these notebooks, please fill out our feedback survey.
If you run into any issues or have thoughts about improving our work, please raise a GitHub issue.
Originally published at https://www.freecodecamp.org/news/how-to-leverage-neo4j-streams-and-build-a-just-in-time-data-warehouse-64adf290f093/