Apache Beam和BigQuery的错误处理(Java SDK)
設計管道
假設我們有一個簡單的場景:事件正在流向Kafka,我們希望使用管道中的事件,進行一些轉換并將結果寫入BigQuery表,以使數據可用于分析。
可以在作業開始之前創建BigQuery表,或者Beam本身可以創建它。
代碼看起來很簡單:
EventsProcessingOptions options = PipelineOptionsFactory。fromArgs(args)。withValidation()
。作為(EventsProcessingOptions。類);管道 p = 管道。創造(選項);
PCollection tableRows =
少了什么東西?
在現實世界中,可能會發生錯誤,在大多數情況下,我們將需要處理它們。
在上面的管道中,當我們嘗試將事件從Kafka解析為JsonNode,轉換期間以及BigQuery插入階段時,可能會發生錯誤。
錯誤處理計劃
對于每個錯誤,我們將在不同的BigQuery表中創建一行,其中包含更多信息,例如來自Kafka的origin事件。
一旦發生錯誤,我們就可以分析錯誤記錄并全面了解它。
然后,我們可以修復管道代碼,重置/更改Kafka使用者組偏移,并再次使用固定代碼重播事件。
我們還可以修復事件本身(例如,在JSON解析錯誤中)并將其重新發送到Kafka。
處理轉換錯誤
讓我們快速瀏覽一下我們的轉換函數:
@ProcessElement
public void processElement(@Element JsonNode > element,OutputReceiver < TableRowWithEvent > out){
TableRow convertedRow = new TableRow();
insertLong(元件。得到(“server_time” ),“server_time” ,convertedRow);
insertFloat(元件。得到(“screen_dpi” ),“screen_dpi” ,convertedRow);
//更多轉變來
背景。輸出(輸出);
}
private void insertLong(JsonNode value,String key,TableRow convertedRow){
}
private void insertFloat(JsonNode value,String key,TableRow convertedRow){
}
是的,我們可能在解析過程中失敗,因為我們將字符串解析為Float / Long,并且這對無法轉換的數據失敗。
我們需要從主函數輸出中排除失敗的數據并將這些數據發送到管道中的不同路徑,然后我們將它們保存到BigQuery中的錯誤表中。
怎么樣?讓我們使用標簽
當我們在ParDo 函數末尾輸出一個元素時 ,我們可以在一個標簽內輸出它。然后我們可以獲取所有標記為特定名稱的元素,并對它們執行一些處理。
這里我們將使用兩個標簽,一個是MAIN標簽,它包含所有成功的記錄,另一個包含所有錯誤和一些上下文,例如 DEADLETTER_OUT。
該主標記必須與ParDo 函數本身的OUTPUT類型相同,并且 所有其他標記可以是不同類型。
現在,我們的 ParDo 函數將如下所示(注意標記添加):
@ProcessElement
public void processElement(@Element JsonNode > element,OutputReceiver < TableRowWithEvent > out){
public static final TupleTag < JsonNode > MAIN_OUT = new TupleTag < JsonNode >(){};
public static final TupleTag < BigQueryProcessError > DEADLETTER_OUT = new TupleTag < BigQueryProcessError >(){};
TableRow convertedRow = new TableRow();
嘗試 {
} catch(例外 e){
記錄器。誤差(“失敗變換” + ?。的getMessage(),ê);背景。輸出(DEADLETTER_OUT,新 BigQueryProcessError(convertedRow。的toString(),ê。的getMessage(),ERROR_TYPE。BQ_PROCESS,originEvent));}
}
我們如何通過標簽處理元素?讓我們改變管道,并進行拆分。該 MAIN 元素將大量查詢表和 DEADLETTER_OUT 內容將被發送到錯誤表。
EventsProcessingOptions options = PipelineOptionsFactory。fromArgs(args)。withValidation()
。作為(EventsProcessingOptions。類);管道 p = 管道。創造(選項);
PCollectionTuple tableRows =
p。run();
處理BigQuery插入錯誤
為了在BigQuery插入期間處理錯誤,我們必須使用BiqQueryIO API。
讓我們放大寫入階段。并稍微改變一下:
WriteResult writeResult = tableRowToInsertCollection
。申請(“BQ-寫”,BigQueryIO。寫()//指定將返回失敗的行及其錯誤。withExtendedErrorInfo()。到(tableSpec)。withCreateDisposition(BigQueryIO。寫。createDisposition會。CREATE_NEVER)。withWriteDisposition(BigQueryIO。寫。writeDisposition會。WRITE_APPEND)//指定處理失敗插入的策略。。withFailedInsertRetryPolicy(InsertRetryPolicy。retryTransientErrors()));//將失敗的行及其錯誤寫入錯誤表
寫結果
在上面的代碼片段中,我們從BigQueryIO獲取失敗的TableRows及其錯誤。現在我們可以將它們轉換為另一個 TableRow 并將它們寫入錯誤表。在這種情況下,我們讓作業在需要時創建表。
總結
以上是生活随笔為你收集整理的Apache Beam和BigQuery的错误处理(Java SDK)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 剑指offer——面试题10:斐波那契数
- 下一篇: NSkyKit 项目实践-Dagger