伴鱼:借助 Flink 完成机器学习特征系统的升级
簡介:?Flink 用于機器學習特征工程,解決了特征上線難的問題;以及 SQL + Python UDF 如何用于生產實踐。
本文作者陳易生,介紹了伴魚平臺機器學習特征系統的升級,在架構上,從 Spark 轉為 Flink,解決了特征上線難的問題,以及 SQL + Python UDF 如何用于生產實踐。 主要內容為:
一、前言
在伴魚,我們在多個在線場景使用機器學習提高用戶的使用體驗,例如:在伴魚繪本中,我們根據用戶的帖子瀏覽記錄,為用戶推薦他們感興趣的帖子;在轉化后臺里,我們根據用戶的繪本購買記錄,為用戶推薦他們可能感興趣的課程等。
特征是機器學習模型的輸入。如何高效地將特征從數據源加工出來,讓它能夠被在線服務高效地訪問,決定了我們能否在生產環境可靠地使用機器學習。為此,我們搭建了特征系統,系統性地解決這一問題。目前,伴魚的機器學習特征系統運行了接近 100 個特征,支持了多個業務線的模型對在線獲取特征的需求。
下面,我們將介紹特征系統在伴魚的演進過程,以及其中的權衡考量。
二、舊版特征系統 V1
特征系統 V1 由三個核心組件構成:特征管道,特征倉庫,和特征服務。整體架構如下圖所示:
特征管道包括流特征管道和批特征管道,它們分別消費流數據源和批數據源,對數據經過預處理加工成特征 (這一步稱為特征工程),并將特征寫入特征倉庫。
- 批特征管道使用 Spark 實現,由 DolphinScheduler 進行調度,跑在 YARN 集群上;
- 出于技術棧的一致考慮,流特征管道使用 Spark Structured Streaming 實現,和批特征管道一樣跑在 YARN 集群上。
特征倉庫選用合適的存儲組件 (Redis) 和數據結構 (Hashes),為模型服務提供低延遲的特征訪問能力。之所以選用 Redis 作為存儲,是因為:
- 伴魚有豐富的 Redis 使用經驗;
- 包括?DoorDash Feature Store和?Feast在內的業界特征倉庫解決方案都使用了 Redis。
特征服務屏蔽特征倉庫的存儲和數據結構,對外暴露 RPC 接口?GetFeatures(EntityName, FeatureNames),提供對特征的低延遲點查詢。在實現上,這一接口基本對應于 Redis 的?HMGET EntityName FeatureName_1 ... FeatureName_N?操作。
這一版本的特征系統存在幾個問題:
- 算法工程師缺少控制,導致迭代效率低。這個問題與系統涉及的技術棧和公司的組織架構有關。在整個系統中,特征管道的迭代需求最高,一旦模型對特征有新的需求,就需要修改或者編寫一個新的 Spark 任務。而 Spark 任務的編寫需要有一定的 Java 或 Scala 知識,不屬于算法工程師的常見技能,因此交由大數據團隊全權負責。大數據團隊同時負責多項數據需求,往往有很多排期任務。結果便是新特征的上線涉及頻繁的跨部門溝通,迭代效率低;
- 特征管道只完成了輕量的特征工程,降低在線推理的效率。由于特征管道由大數據工程師而非算法工程師編寫,復雜的數據預處理涉及更高的溝通成本,因此這些特征的預處理程度都比較輕量,更多的預處理被留到模型服務甚至模型內部進行,增大了模型推理的時延。
為了解決這幾個問題,特征系統 V2 提出幾個設計目的:
- 將控制權交還算法工程師,提高迭代效率;
- 將更高權重的特征工程交給特征管道,提高在線推理的效率。
三、新版特征系統 V2
特征系統 V2 相比特征系統 V1 在架構上的唯一不同點在于,它將特征管道切分為三部分:特征生成管道,特征源,和特征注入管道。值得一提的是,管道在實現上均從 Spark 轉為 Flink,和公司數據基礎架構的發展保持一致。特征系統 V2 的整體架構如下圖所示:
1. 特征生成管道
特征生成管道讀取原始數據源,加工為特征,并將特征寫入指定特征源 (而非特征倉庫)。
- 如果管道以流數據源作為原始數據源,則它是流特征生成管道;
- 如果管道以批數據源作為原始數據源,則它是批特征生成管道。
特征生成管道的邏輯由算法工程師全權負責編寫。其中,批特征生成管道使用 HiveQL 編寫,由 DolphinScheduler 調度。流特征生成管道使用 PyFlink 實現,詳情見下圖:
算法工程師需要遵守下面步驟:
這一套流程確保了:
- 算法工程師掌握上線特征的自主權;
- 平臺工程師把控特征生成管道的代碼質量,并在必要時可以對它們實現重構,而無需算法工程師的介入。
2. 特征源
特征源存儲從原始數據源加工形成的特征。值得強調的是,它同時還是連接算法工程師和 AI 平臺工程師的橋梁。算法工程師只負責實現特征工程的邏輯,將原始數據加工為特征,寫入特征源,剩下的事情就交給 AI 平臺。平臺工程師實現特征注入管道,將特征寫入特征倉庫,以特征服務的形式對外提供數據訪問服務。
3. 特征注入管道
特征注入管道將特征從特征源讀出,寫入特征倉庫。由于 Flink 社區缺少對 Redis sink 的原生支持,我們通過拓展?RichSinkFunction簡單地實現了?StreamRedisSink?和?BatchRedisSink,很好地滿足我們的需求。
其中,BatchRedisSink?通過?Flink Operator State?和?Redis Pipelining的簡單結合,大量參考 Flink 文檔中的?BufferingSink,實現了批量寫入,大幅減少對 Redis Server 的請求量,增大吞吐,寫入效率相比逐條插入提升了 7 倍?。BatchRedisSink?的簡要實現如下。其中,flush?實現了批量寫入 Redis 的核心邏輯,checkpointedState?/?bufferedElements?/?snapshotState?/?initializeState?實現了使用 Flink 有狀態算子管理元素緩存的邏輯。
class BatchRedisSink(pipelineBatchSize: Int ) extends RichSinkFunction[(String, Timestamp, Map[String, String])]with CheckpointedFunction {@transientprivate var checkpointedState: ListState[(String, java.util.Map[String, String])] = _private val bufferedElements: ListBuffer[(String, java.util.Map[String, String])] =ListBuffer.empty[(String, java.util.Map[String, String])]private var jedisPool: JedisPool = _override def invoke(value: (String, Timestamp, Map[String, String]),context: SinkFunction.Context): Unit = {import scala.collection.JavaConverters._val (key, _, featureKVs) = valuebufferedElements += (key -> featureKVs.asJava)if (bufferedElements.size == pipelineBatchSize) {flush()}}private def flush(): Unit = {var jedis: Jedis = nulltry {jedis = jedisPool.getResourceval pipeline = jedis.pipelined()for ((key, hash) <- bufferedElements) {pipeline.hmset(key, hash)}pipeline.sync()} catch { ... } finally { ... }bufferedElements.clear()}override def snapshotState(context: FunctionSnapshotContext): Unit = {checkpointedState.clear()for (element <- bufferedElements) {checkpointedState.add(element)}}override def initializeState(context: FunctionInitializationContext): Unit = {val descriptor =new ListStateDescriptor[(String, java.util.Map[String, String])]("buffered-elements",TypeInformation.of(new TypeHint[(String, java.util.Map[String, String])]() {}))checkpointedState = context.getOperatorStateStore.getListState(descriptor)import scala.collection.JavaConverters._if (context.isRestored) {for (element <- checkpointedState.get().asScala) {bufferedElements += element}}}override def open(parameters: Configuration): Unit = {try {jedisPool = new JedisPool(...)} catch { ... }}override def close(): Unit = {flush()if (jedisPool != null) {jedisPool.close()}} }特征系統 V2 很好地滿足了我們提出的設計目的。
- 由于特征生成管道的編寫只需用到 SQL 和 Python 這兩種算法工程師十分熟悉的工具,因此他們全權負責特征生成管道的編寫和上線,無需依賴大數據團隊,大幅提高了迭代效率。在熟悉后,算法工程師通常只需花費半個小時以內,就可以完成流特征的編寫、調試和上線。而這個過程原本需要花費數天,取決于大數據團隊的排期;
- 出于同樣的原因,算法工程師可以在有需要的前提下,完成更重度的特征工程,從而減少模型服務和模型的負擔,提高模型在線推理效率。
四、總結
特征系統 V1 解決了特征上線的問題,而特征系統 V2 在此基礎上,解決了特征上線難的問題。在特征系統的演進過程中,我們總結出作為平臺研發的幾點經驗:
- 平臺應該提供用戶想用的工具。這與 Uber ML 平臺團隊在內部推廣的經驗相符。算法工程師在 Python 和 SQL 環境下工作效率最高,而不熟悉 Java 和 Scala。那么,想讓算法工程師自主編寫特征管道,平臺應該支持算法工程師使用 Python 和 SQL 編寫特征管道,而不是讓算法工程師去學 Java 和 Scala,或是把工作轉手給大數據團隊去做;
- 平臺應該提供易用的本地調試工具。我們提供的 Docker 環境封裝了 Kafka 和 Flink,讓用戶可以在本地快速調試 PyFlink 腳本,而無需等待管道部署到測試環境后再調試;
- 平臺應該在鼓勵用戶自主使用的同時,通過自動化檢查或代碼審核等方式牢牢把控質量。
原文鏈接
本文為阿里云原創內容,未經允許不得轉載。?
總結
以上是生活随笔為你收集整理的伴鱼:借助 Flink 完成机器学习特征系统的升级的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 聚焦业务价值:分众传媒在 Serverl
- 下一篇: TDA-04D8变送器数据上报阿里云