【实践案例】Databricks 数据洞察在美的暖通与楼宇的应用实践
簡介: 獲取更詳細的 Databricks 數據洞察相關信息,可至產品詳情頁查看:https://www.aliyun.com/product/bigdata/spark
作者
美的暖通與樓宇事業部 先行研究中心智能技術部
?
美的暖通 IoT 數據平臺建設背景
美的暖通與樓宇事業部(以下簡稱美的暖通)是美的集團旗下五大板塊之一,產品覆蓋多聯機組、大型冷水機組、單元機、機房空調、扶梯、直梯、貨梯以及樓宇自控軟件和建筑弱電集成解決方案,遠銷海內外200多個國家。當前事業部設備數據上云僅停留在數據存儲層面,缺乏挖掘數據價值的平臺,造成大量數據荒廢,并且不斷消耗存儲資源,增加存儲費用和維護成本。另一方面,現有數據驅動應用缺乏部署平臺,難以產生實際價值。因此,急需統一通用的 IoT 數據平臺,以支持設備運行數據的快速分析和建模。
?
我們的 IoT 數據平臺建設基于阿里云 Databricks 數據洞察全托管 Spark 產品,以下是整體業務架構圖。在本文后面的章節,我們將就IoT數據平臺建設技術選型上的一些思考,以及 Spark 技術棧尤其是 Delta Lake 場景的應用實踐做一下分享。
?
?
選擇 Spark & Delta Lake
在數據平臺計算引擎層技術選型上,由于我們數據團隊剛剛成立,前期的架構選型我們做了很多的調研,綜合各個方面考慮,希望選擇一個成熟且統一的平臺:既能夠支持數據處理、數據分析場景,也能夠很好地支撐數據科學場景。加上團隊成員對 Python 及 Spark 的經驗豐富,所以,從一開始就將目標鎖定到了 Spark 技術棧。
?
選擇 Databricks 數據洞察 Delta Lake
通過與阿里云計算平臺團隊進行多方面的技術交流以及實際的概念驗證,我們最終選擇了阿里云 Databricks 數據洞察產品。作為 Spark 引擎的母公司,其商業版 Spark 引擎,全托管 Spark 技術棧,統一的數據工程和數據科學等,都是我們決定選擇 Databricks 數據洞察的重要原因。
?
具體來看,Databricks 數據洞察提供的核心優勢如下:
- Saas 全托管 Spark:免運維,無需關注底層資源情況,降低運維成本,聚焦分析業務
- 完整 Spark 技術棧集成:一站式集成 Spark 引擎和 Delta Lake 數據湖,100%兼容開源 Spark 社區版;Databricks 做商業支持,最快體驗 Spark 最新版本特性
- 總成本降低:商業版本 Spark 及 Delta Lake 性能優勢顯著;同時基于計算存儲分離架構,存儲依托阿里云 OSS 對象存儲,借助阿里云 JindoFS 緩存層加速;能夠有效降低集群總體使用成本
- 高品質支持以及SLA保障:阿里云和 Databricks 提供覆蓋 Spark 全棧的技術支持;提供商業化 SLA 保障與7*24小時 Databricks 專家支持服務
?
IoT 數據平臺整體架構
整體的架構如上圖所示。
?
我們接入的 IoT 數據分為兩部分,歷史存量數據和實時數據。目前,歷史存量數據是通過 Spark SQL 以天為單位從不同客戶關系數據庫批量導入 Delta Lake 表中;實時數據通過 IoT 平臺采集到云 Kafka ,經由 Spark Structured Streaming 消費后實時寫入到 Delta Lake 表中。在這個過程中,我們將實時數據和歷史數據都 sink 到同一張 Delta 表里,這種批流一體操作可大大簡化我們的 ETL 流程(參考后面的案例部分)。數據管道下游,我們對接數據分析及數據科學工作流。
?
IoT 數據采集:從 Little Data 到 Big Data
作為 IoT 場景的典型應用,美的暖通最核心的數據均來自 IoT 終端設備。在整個 IoT 環境下,分布著無數個終端傳感器。從小的維度看,傳感器產生的數據本身屬于 Small Data(或者稱為 Little Data)。當把所有傳感器連接成一個大的 IoT 網絡,產生自不同傳感器的數據經由 Gateway 與云端相連接,并最終在云端形成 Big Data 。
?
在我們的場景下,IoT 平臺本身會對不同協議的數據進行初步解析,通過定制的硬件網絡設備將解析后的半結構化 JSON 數據經由網絡發送到云 Kafka。云 Kafka 扮演了整個數據管道的入口。
?
數據入湖:Delta Lake
IoT 場景下的數據有如下幾個特點:
- 時序數據:傳感器產生的數據記錄中包含時間相關的信息,數據本身具有時間屬性,因此不同的數據之間可能存在一定的相關性。利用 as-of-join 將不同時間序列數據 join 到一起是下游數據預測分析的基礎
- 數據的實時性:傳感器實時生成數據并以最低延遲的方式傳輸到數據管道,觸發規則引擎,生成告警和事件,通知相關工作人員。
- 數據體量巨大:IoT 網絡環境下遍布各地的成千上萬臺設備及其傳感器再通過接入服務將海量的數據歸集到平臺
- 數據協議多樣:通常在 IoT 平臺接入的不同種類設備中,上傳數據協議種類多樣,數據編碼格式不統一
?
IoT 數據上述特點給數據處理、數據分析及數據科學等帶來了諸多挑戰,慶幸的是,這些挑戰借助 Spark 和 Delta Lake 都可以很好地應對。Delta Lake 提供了 ACID 事務保證,支持增量更新數據表以及流批同時寫數據。借助 Spark Structed Streaming 可以實現 IoT 時序數據實時入湖。
?
以下是 Delta Lake 經典的三級數據表架構。具體到美的暖通 IoT 數據場景,我們針對每一層級的數據表分別做了如下定義:
?
?
- Bronze 表:存儲原生數據(Raw Data),數據經由 Spark Structed Streaming 從 Kafka 消費下來后 upsert 進 Delta Lake 表,該表作為唯一的真實數據表 ?(Single Source of Truth)
- Silver表:該表是在對 Bronze 表的數據進行加工處理的基礎上生成的中間表,在美的暖通的場景下,數據加工處理的步驟涉及到一些復雜的時序數據計算邏輯,這些邏輯都包裝在了 Pandas UDF 里提供給 Spark 計算使用
- Gold 表:Silver 表的數據施加 Schema 約束并做進一步清洗后的數據匯入 Gold 表,該表提供給下游的 Ad Hoc 查詢分析及數據科學使用
?
數據分析:Ad-Hoc 查詢
我們內部在開源 Superset 基礎上定制了內部版本的 SQL 查詢與數據可視化平臺,通過 PyHive 連接到 Databricks 數據洞察 Spark Thrift Server 服務,可以將 SQL 提交到集群上。商業版本的 thrift server 在可用性及性能方面都做了增強,Databricks 數據洞察針對 JDBC 連接安全認證提供了基于 LDAP 的用戶認證實現。借助 Superset ,數據分析師及數據科學家可以快速高效的對 Delta Lake 表進行數據探索。
?
數據科學:Workspace
樓宇能耗預測與設備故障診斷預測是美的暖通 IoT 大數據平臺建設的兩個主要業務目標。在 IoT 數據管道下游,需要對接機器學習平臺。現階段為了更快速方便地支撐起數據科學場景,我們將 Databricks 數據洞察集群與阿里云數據開發平臺 DDC 打通。DDC 集成了在數據科學場景下更友好的 Jupyter Notebook ,通過在 Jupyter 上使用 PySpark ,可以將作業跑到 Databricks 數據洞察集群上;同時,也可以借助 Apache Airflow 對作業進行調度。同時,考慮到機器學習模型構建、迭代訓練、指標檢測、部署等基本環節,我們也在探索 MLOps ,目前這部分工作還在籌備中。
?
典型應用場景介紹
Delta Lake 數據入湖(批流一體)
?
使用 UDF 函數定義流數據寫入 Delta Lake 的 Merge 規則
%spark import org.apache.spark.sql._ import io.delta.tables._// Function to upsert `microBatchOutputDF` into Delta table using MERGE def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {// Set the dataframe to view namemicroBatchOutputDF.createOrReplaceTempView("updates")// Use the view name to apply MERGE// NOTE: You have to use the SparkSession that has been used to define the `updates` dataframemicroBatchOutputDF.sparkSession.sql(s"""MERGE INTO delta_{table_name} tUSING updates sON s.uuid = t.uuidWHEN MATCHED THEN UPDATE SET t.device_id = s.device_id,t.indoor_temperature = s.indoor_temperature,t.ouoor_temperature = s.ouoor_temperature,t.chiller_temperature = s.chiller_temperature,t.electricity = s.electricity,t.protocal_version = s.protocal_version,t.dt=s.dt,t.update_time=current_timestamp()WHEN NOT MATCHED THEN INSERT (t.uuid,t.device_id,t.indoor_temperature,t.ouoor_temperature ,t.chiller_temperature ,t.electricity,t.protocal_version,t.dt,t.create_time,t.update_time)values (s.uuid,s.device_id,s.indoor_temperature,s.ouoor_temperature,s.chiller_temperature,s.electricity,s.protocal_version ,s.dt,current_timestamp(),current_timestamp())""") }?
使用 Spark Structured Streaming 實時流寫入 Delta Lake
?
%sparkimport org.apache.spark.sql.functions._ import org.apache.spark.sql.streaming.Triggerdef getquery(checkpoint_dir:String,tableName:String,servers:String,topic:String ) {var streamingInputDF = spark.readStream.format("kafka").option("kafka.bootstrap.servers", servers).option("subscribe", topic) .option("startingOffsets", "latest") .option("minPartitions", "10") .option("failOnDataLoss", "true").load() val resDF=streamingInputDF.select(col("value").cast("string")).withColumn("newMessage",split(col("value"), " ")).filter(col("newMessage").getItem(7).isNotNull).select(col("newMessage").getItem(0).as("uuid"),col("newMessage").getItem(1).as("device_id"),col("newMessage").getItem(2).as("indoor_temperature"),col("newMessage").getItem(3).as("ouoor_temperature"),col("newMessage").getItem(4).as("chiller_temperature"),col("newMessage").getItem(5).as("electricity"),col("newMessage").getItem(6).as("protocal_version")).withColumn("dt",date_format(current_date(),"yyyyMMdd")) val query = resDF.writeStream.format("delta").option("checkpointLocation", checkpoint_dir).trigger(Trigger.ProcessingTime("60 seconds")) // 執行流處理時間間隔.foreachBatch(upsertToDelta _) //引用upsertToDelta函數.outputMode("update")query.start() }?
數據災備:Deep Clone
由于 Delta Lake 的數據僅接入實時數據,對于存量歷史數據我們是通過 SparkSQL 一次性 Sink Delta Lake 的表中,這樣我們流和批處理時只維護一張 Delta 表,所以我們只在最初對這兩部分數據做一次 Merge 操作。同時為了保證數據的高安全,我們使用 Databricks Deep Clone 來做數據災備,每天會定時更新來維護一張從表以備用。對于每日新增的數據,使用 Deep Clone 同樣只會對新數據 Insert 對需要更新的數據 Update 操作,這樣可以大大提高執行效率。
?
CREATE OR REPLACE TABLE delta.delta_{table_name}_cloneDEEP CLONE delta.delta_{table_name};?
性能優化:OPTIMIZE & Z-Ordering
在流處理場景下會產生大量的小文件,大量小文件的存在會嚴重影響數據系統的讀性能。Delta Lake 提供了 OPTIMIZE 命令,可以將小文件進行合并壓縮,另外,針對 Ad-Hoc 查詢場景,由于涉及對單表多個維度數據的查詢,我們借助 Delta Lake 提供的 Z-Ordering 機制,可以有效提升查詢的性能。從而極大提升讀取表的性能。DeltaLake 本身提供了 Auto Optimize 選項,但是會犧牲少量寫性能,增加數據寫入 delta 表的延遲。相反,執行 OPTIMIZE 命令并不會影響寫的性能,因為 Delta Lake 本身支持 MVCC,支持 OPTIMIZE 的同時并發執行寫操作。因此,我們采用定期觸發執行 OPTIMIZE 的方案,每小時通過 OPTIMIZE 做一次合并小文件操作,同時執行 VACCUM 來清理過期數據文件:
OPTIMIZE delta.delta_{table_name} ZORDER by device_id, indoor_temperature; set spark.databricks.delta.retentionDurationCheck.enabled = false; VACUUM delta.delta_{table_name} RETAIN 1 HOURS;?
另外,針對 Ad-Hoc 查詢場景,由于涉及對單表多個維度數據的查詢,我們借助 Delta Lake 提供的 Z-Ordering 機制,可以有效提升查詢的性能。
?
總結與展望
我們基于阿里云 Databricks 數據洞察產品提供的商業版 Spark 及 Delta Lake 技術棧快速構建了 IoT 數據處理平臺,Databricks 數據洞察全托管免運維、商業版本引擎性能上的優勢以及計算/存儲分離的架構,為我們節省了總體成本。同時,Databricks 數據洞察產品自身提供的豐富特性,也極大提升了我們數據團隊的生產力,為數據分析業務的快速開展交付奠定了基礎。未來,美的暖通希望與阿里云 Databricks 數據洞察團隊針對 IoT 場景輸出更多行業先進解決方案。
原文鏈接
本文為阿里云原創內容,未經允許不得轉載。
總結
以上是生活随笔為你收集整理的【实践案例】Databricks 数据洞察在美的暖通与楼宇的应用实践的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 全球边缘计算大会:阿里云资深技术专家李克
- 下一篇: 友盟+《小程序用户增长白皮书》:从五个角