datax安装+配置+使用文档
1 DataX離線同步工具DataX3.0介紹
DataX 是阿里巴巴集團內被廣泛使用的離線數據同步工具/平臺,實現包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各種異構數據源之間高效的數據同步功能。
Github地址:https://github.com/alibaba/DataX
1.1.1 DataX 3.0概覽
DataX 是一個異構數據源離線同步工具,致力于實現包括關系型數據庫(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各種異構數據源之間穩定高效的數據同步功能。
? 設計理念
為了解決異構數據源同步問題,DataX將復雜的網狀的同步鏈路變成了星型數據鏈路,DataX作為中間傳輸載體負責連接各種數據源。當需要接入一個新的數據源的時候,只需要將此數據源對接到DataX,便能跟已有的數據源做到無縫數據同步。
? 當前使用現狀
DataX在阿里巴巴集團內被廣泛使用,承擔了所有大數據的離線同步業務,并已持續穩定運行了6年之久。目前每天完成同步8w多道作業,每日傳輸數據量超過300TB。
此前已經開源DataX1.0版本,此次介紹為阿里云開源全新版本DataX3.0,有了更多更強大的功能和更好的使用體驗。Github主頁地址:https://github.com/alibaba/DataX
1.2 特征
DataX本身作為數據同步框架,將不同數據源的同步抽象為從源頭數據源讀取數據的Reader插件,以及向目標端寫入數據的Writer插件,理論上DataX框架可以支持任意數據源類型的數據同步工作。同時DataX插件體系作為一套生態系統, 每接入一套新數據源該新加入的數據源即可實現和現有的數據源互通。1.3 DataX詳細介紹
地址:https://github.com/alibaba/DataX/blob/master/introduction.md
1.4 DataX3.0框架設計
DataX本身作為離線數據同步框架,采用Freamwork + plugin架構構建。將數據源讀取和寫入抽象成為Reader/Writer插件,納入到整個同步框架中。
Reader: Reader�為數據采集模塊,負責采集數據源的數據,將數據發送給Framework。
Writer: Writer為數據寫入模塊,負責不斷向Framework取數據,并將數據寫入到目的端。
Framework: Framework用于連接reader和writer,作為兩者的數據傳輸通道,并處理緩沖,流控,并發,數據轉換等核心技術問題。
1.5 Support Data Channels
DataX目前已經有了比較全面的插件體系,主流的RDBMS數據庫、NOSQL、大數據計算系統都已經接入,目前支持數據如下圖,詳情請點擊:(https://github.com/alibaba/DataX/blob/master/introduction.md)
1.6 DataX3.0核心架構
DataX3.0開源版本支持單機多線程模式完成同步作業運行,本小節按一個DataX作業聲明周期的時序圖,從整體架構設計非常簡要說明DataX各個模塊相互關系。
核心模塊介紹:
1.DataX完成單個數據同步的作業,我們稱之為Job,DataX接受一個Job之后,將啟動一個進程來完成整個作業同步過程。DataX Job模塊是單個作業的中樞管理節點,承擔了數據清洗、子任務切分(將單一作業計算轉化為多個子Task).
2.DataXJob啟動后,會根據不同的源端切分策略,將job切分成多個小的Task(子任務),以便于并發執行。Task便是DataX作業的最小單元,每一個Task都負責一部分數據的同步工作。
3.切分多個Task之后,DataX Job會調用Scheduler模塊兒,根據配置的并發數據量,將拆分成的Task重新組合,組裝成TaskGroup(任務組)。每一個TaskGroup負責以一定的并發運行完畢分配好的所有Task,默認單個任務組的并發數量為5.
4:每一個Task都由TaskGroup負責啟動,Task啟動后,會固定啟動Reader?Channel?Writer的線程來完成任務同步工作。
5.DataX作業運行起來之后,Job監控并等待多個TaskGroup模塊任務完成,等待所有TaskGroup任務完成后Job成功退出。否則,異常退出,進程退出值非0.
DataX調度流程:
舉例來說,用戶提交了一個DataX作業,并配置了20個并發,目的是將一個100張分表的mysql數據同步到odps里面。
DataX的調度決策思路是:
1.DataXJob根據分庫分表切分成了100個Task.
2.根據20個并發,DataX計算共需要分配4個TaskGroup. (默認每個TaskGroup的并發數量是5)
3.4個TaskGrou平均切分好的100個Task,每一個TaskGroup負責5個并發共計25個Task.
1.7 DataX 3.0六大核心優勢
? 可靠的數據質量監控
? 完美解決數據傳輸個別類型失真問題 DataX舊版對于部分數據類型(比如時間戳)傳輸一直存在毫秒階段等數據失真情況,新版本DataX3.0已經做到支持所有的強數據類型,每一種插件都有自己的數據類型轉換策略,讓數據可以完整無損的傳輸到目的端。 ? 提供作業全鏈路的流量、數據量運行時監控DataX3.0運行過程中可以將作業本身狀態、數據流量、數據速度、執行進度等信息進行全面的展示,讓用戶可以實時了解作業狀態。并可在作業執行過程中智能判斷源端和目的端的速度對比情況,給予用戶更多性能排查信息。 ? 提供臟數據探測在大量數據的傳輸過程中,必定會由于各種原因導致很多數據傳輸報錯(比如類型轉換錯誤),這種數據DataX認為就是臟數據。DataX目前可以實現臟數據精確過濾、識別、采集、展示,為用戶提供多種的臟數據處理模式,讓用戶準確把控數據質量大關!? 豐富的數據轉換功能
DataX作為一個服務于大數據的ETL工具,除了提供數據快照搬遷功能之外,還提供了豐富數據轉換的功能,讓數據在傳輸過程中可以輕松完成數據脫敏,補全,過濾等數據轉換功能,另外還提供了自動groovy函數,讓用戶自定義轉換函數。詳情請看DataX3的transformer詳細介紹。
? 精準的速度控制
還在為同步過程中對線程存儲壓力影響而擔心嗎?新版本DataX3.0提供了包括通道(并發)、記錄流、字節流三種流控模式,可以隨意控制你的作業速度,讓你的作業在庫可以在承受的范圍內達到最佳的同步速度。
? 強勁的同步性能
DataX3.0每一種讀插件都有一種或多種切分策略,都能將作業合理切分成多個Task并行執行,單機多線程執行模型可以讓DataX速度隨并發成線性增長。在源端和目的端性能都足夠的情況下,單個作業一定可以打滿網卡。另外,DataX團隊對所有的已經接入的插件都做了極致的性能優化,并且做了完整的性能測試。性能測試相關詳情可以參照每單個數據源的詳細介紹:https://github.com/alibaba/DataX/wiki/DataX-all-data-channels
? 健壯的容錯機制
DataX作業是極易受外部因素的干擾,網絡閃斷、數據源不穩定等因素很容易讓同步到一半的作業報錯停止。因此穩定性是DataX的基本要求,在DataX 3.0的設計中,重點完善了框架和插件的穩定性。目前DataX3.0可以做到線程級別、進程級別(暫時未開放)、作業級別多層次局部/全局的重試,保證用戶的作業穩定運行。
? 極簡的使用體驗
? 易用 下載即可用,支持linux和windows,只需要短短幾步驟就可以完成數據的傳輸。 ? 詳細 DataX在運行日志中打印了大量信息,其中包括傳輸速度,Reader、Writer性能,進程CPU,JVM和GC情況等等。? 傳輸過程中打印傳輸速度、精度等
? 傳輸過程中打印進程相關的CPU、JVM等
? 在任務結束之后,打印總體運行情況
1.8 Quick Start
下載地址:http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz Quick start地址:https://github.com/alibaba/DataX/blob/master/userGuid.md1.8.1 System Requirements
? Linux ? JDK(1.8以上,推薦1.8) ? Python(推薦Python2.6.X) ? Apache Maven 3.x (Compile DataX)1.8.2 工具部署
方法一、直接下載DataX工具包:DataX下載地址(http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz)
下載后解壓至本地某個目錄,進入bin目錄,即可運行同步作業:
自檢腳本:python {YOUR_DATAX_HOME}/bin/datax.py {YOUR_DATAX_HOME}/job/job.json
方法二:下載DataX源碼,自己編譯:https://github.com/alibaba/DataX
(1)、下載DataX源碼:
(2)、通過maven打包
$ cd {DataX_source_code_home} $ mvn -U clean package assembly:assembly -Dmaven.test.skip=true打包成功,日志顯示如下:
[INFO] BUILD SUCCESS [INFO] ----------------------------------------------------------------- [INFO] Total time: 08:12 min [INFO] Finished at: 2015-12-13T16:26:48+08:00 [INFO] Final Memory: 133M/960M [INFO] -----------------------------------------------------------------打包成功后的DataX包位于{DataX_source_code_home}/target/datax/datax/,結構如下:
$ cd {DataX_source_code_home} $ ls ./target/datax/datax/ bin conf job lib log log_perf plugin script tmp1.8.3 配置示例:從stream讀取數據并打印到控制臺
(1)、第一步、創建創業的的配置文件(json格式)
可以通過命令查看配置模板:python datax.py -r {YOUR_READER} -w {YOUR_WRITER}
根據模板配置json如下:
#stream2stream.json {"job": {"content": [{"reader": {"name": "streamreader","parameter": {"sliceRecordCount": 10,"column": [{"type": "long","value": "10"},{"type": "string","value": "hello,你好,世界-DataX"}]}},"writer": {"name": "streamwriter","parameter": {"encoding": "UTF-8","print": true}}}],"setting": {"speed": {"channel": 5}}} }第二步:啟動DataX
[root@hadoop3 datax]# cd /home/installed/datax/bin/ [root@hadoop3 bin]# python datax.py /home/test/dataxtest/stream2stream.jsonDataX (DATAX-OPENSOURCE-3.0), From Alibaba ! Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.2019-09-09 16:14:17.345 [main] INFO VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl 2019-09-09 16:14:17.356 [main] INFO Engine - the machine info => osInfo: Oracle Corporation 1.8 25.161-b12jvmInfo: Linux amd64 3.10.0-693.el7.x86_64cpu num: 4totalPhysicalMemory: -0.00GfreePhysicalMemory: -0.00GmaxFileDescriptorCount: -1currentOpenFileDescriptorCount: -1GC Names [PS MarkSweep, PS Scavenge]MEMORY_NAME | allocation_size | init_size PS Eden Space | 256.00MB | 256.00MB Code Cache | 240.00MB | 2.44MB Compressed Class Space | 1,024.00MB | 0.00MB PS Survivor Space | 42.50MB | 42.50MB PS Old Gen | 683.00MB | 683.00MB Metaspace | -0.00MB | 0.00MB 2019-09-09 16:14:17.375 [main] INFO Engine - {"content":[{"reader":{"name":"streamreader","parameter":{"column":[{"type":"long","value":"10"},{"type":"string","value":"hello,你好,世界-DataX"}],"sliceRecordCount":10}},"writer":{"name":"streamwriter","parameter":{"encoding":"UTF-8","print":true}}}],"setting":{"speed":{"channel":5}} }2019-09-09 16:14:17.404 [main] WARN Engine - prioriy set to 0, because NumberFormatException, the value is: null 2019-09-09 16:14:17.406 [main] INFO PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0 2019-09-09 16:14:17.406 [main] INFO JobContainer - DataX jobContainer starts job. 2019-09-09 16:14:17.409 [main] INFO JobContainer - Set jobId = 0 2019-09-09 16:14:17.431 [job-0] INFO JobContainer - jobContainer starts to do prepare ... 2019-09-09 16:14:17.432 [job-0] INFO JobContainer - DataX Reader.Job [streamreader] do prepare work . 2019-09-09 16:14:17.432 [job-0] INFO JobContainer - DataX Writer.Job [streamwriter] do prepare work . 2019-09-09 16:14:17.433 [job-0] INFO JobContainer - jobContainer starts to do split ... 2019-09-09 16:14:17.433 [job-0] INFO JobContainer - Job set Channel-Number to 5 channels. 2019-09-09 16:14:17.434 [job-0] INFO JobContainer - DataX Reader.Job [streamreader] splits to [5] tasks. 2019-09-09 16:14:17.435 [job-0] INFO JobContainer - DataX Writer.Job [streamwriter] splits to [5] tasks. 2019-09-09 16:14:17.467 [job-0] INFO JobContainer - jobContainer starts to do schedule ... 2019-09-09 16:14:17.485 [job-0] INFO JobContainer - Scheduler starts [1] taskGroups. 2019-09-09 16:14:17.488 [job-0] INFO JobContainer - Running by standalone Mode. 2019-09-09 16:14:17.507 [taskGroup-0] INFO TaskGroupContainer - taskGroupId=[0] start [5] channels for [5] tasks. 2019-09-09 16:14:17.513 [taskGroup-0] INFO Channel - Channel set byte_speed_limit to -1, No bps activated. 2019-09-09 16:14:17.513 [taskGroup-0] INFO Channel - Channel set record_speed_limit to -1, No tps activated. 2019-09-09 16:14:17.545 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[2] attemptCount[1] is started 2019-09-09 16:14:17.558 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[3] attemptCount[1] is started 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 2019-09-09 16:14:17.580 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[1] attemptCount[1] is started 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 2019-09-09 16:14:17.598 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[4] attemptCount[1] is started 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 2019-09-09 16:14:17.619 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 10 hello,你好,世界-DataX 2019-09-09 16:14:17.731 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[112]ms 2019-09-09 16:14:17.731 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[1] is successed, used[163]ms 2019-09-09 16:14:17.731 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[2] is successed, used[202]ms 2019-09-09 16:14:17.731 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[3] is successed, used[177]ms 2019-09-09 16:14:17.732 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[4] is successed, used[136]ms 2019-09-09 16:14:17.733 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] completed it's tasks. 2019-09-09 16:14:27.511 [job-0] INFO StandAloneJobContainerCommunicator - Total 50 records, 950 bytes | Speed 95B/s, 5 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00% 2019-09-09 16:14:27.511 [job-0] INFO AbstractScheduler - Scheduler accomplished all tasks. 2019-09-09 16:14:27.511 [job-0] INFO JobContainer - DataX Writer.Job [streamwriter] do post work. 2019-09-09 16:14:27.512 [job-0] INFO JobContainer - DataX Reader.Job [streamreader] do post work. 2019-09-09 16:14:27.512 [job-0] INFO JobContainer - DataX jobId [0] completed successfully. 2019-09-09 16:14:27.513 [job-0] INFO HookInvoker - No hook invoked, because base dir not exists or is a file: /home/installed/datax/hook 2019-09-09 16:14:27.515 [job-0] INFO JobContainer - [total cpu info] => averageCpu | maxDeltaCpu | minDeltaCpu -1.00% | -1.00% | -1.00%[total gc info] => NAME | totalGCCount | maxDeltaGCCount | minDeltaGCCount | totalGCTime | maxDeltaGCTime | minDeltaGCTime PS MarkSweep | 0 | 0 | 0 | 0.000s | 0.000s | 0.000s PS Scavenge | 0 | 0 | 0 | 0.000s | 0.000s | 0.000s 2019-09-09 16:14:27.516 [job-0] INFO JobContainer - PerfTrace not enable! 2019-09-09 16:14:27.516 [job-0] INFO StandAloneJobContainerCommunicator - Total 50 records, 950 bytes | Speed 95B/s, 5 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00% 2019-09-09 16:14:27.517 [job-0] INFO JobContainer - 任務啟動時刻 : 2019-09-09 16:14:17 任務結束時刻 : 2019-09-09 16:14:27 任務總計耗時 : 10s 任務平均流量 : 95B/s 記錄寫入速度 : 5rec/s 讀出記錄總數 : 50 讀寫失敗總數 : 0[root@hadoop3 bin]#1.9 使用DataX進行MySQL數據讀寫
1.9.1 MysqlReader插件文檔
1.9.1.1 快速介紹
MysqlReader插件實現了從Mysql讀取數據。在底層實現上,MysqlReader通過JDBC連接遠程Mysql數據庫,并執行相應的sql語句將數據從mysql庫中SELECT出來。
不同于其他關系型數據庫,MysqlReader不支持FetchSize.
1.9.1.2 實現原理
簡而言之,MysqlReader通過JDBC連接器連接到遠程的Mysql數據庫,并根據用戶配置的信息生成查詢SELECT SQL語句,然后發送到遠程Mysql數據庫,并將該SQL執行返回結果使用DataX自定義的數據類型拼裝為抽象的數據集,并傳遞給下游Writer處理。
對于用戶配置Table、Column、Where的信息,MysqlReader將其拼接為SQL語句發送到Mysql數據庫;對于用戶配置querySql信息,MysqlReader直接將其發送到Mysql數據庫。
1.9.1.3 功能說明
1.9.1.3.1 配置樣例
? 配置一個從Mysql數據庫同步抽取數據到本地的作業:
{"job": {"setting": {"speed": {"channel": 3},"errorLimit": {"record": 0,"percentage": 0.02}},"content": [{"reader": {"name": "mysqlreader","parameter": {"username": "root","password": "root","column": ["id","name"],"splitPk": "db_id","connection": [{"table": ["table"],"jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/database"]}]}},"writer": {"name": "streamwriter","parameter": {"print":true}}}]} }? 配置一個自定義SQL的數據庫同步任務到本地內容的作業:
{"job": {"setting": {"speed": {"channel":1}},"content": [{"reader": {"name": "mysqlreader","parameter": {"username": "root","password": "root","connection": [{"querySql": ["select db_id,on_line_flag from db_info where db_id < 10;"],"jdbcUrl": ["jdbc:mysql://bad_ip:3306/database","jdbc:mysql://127.0.0.1:bad_port/database","jdbc:mysql://127.0.0.1:3306/database"]}]}},"writer": {"name": "streamwriter","parameter": {"print": false,"encoding": "UTF-8"}}}]} }1.9.1.3.2 參數說明
? jdbcUrl
描述:描述的是到對端數據庫的JDBC連接信息,使用JSON的數組描述,并支持一個庫填寫多個連接地址。之所以使用JSON數組描述連接信息,是因為阿里集團內部支持多個IP探測,如果配置了多個,MysqlReader可以依次探測ip的可連接性,直到選擇一個合法的IP。
如果全部連接失敗,MysqlReader報錯。 注意,jdbcUrl必須包含在connection配置單元中。對于阿里集團外部使用情況,JSON數組填寫一個JDBC連接即可。
jdbcUrl按照Mysql官方規范,并可以填寫連接附件控制信息。具體請參看Mysql官方文檔。
必選:是
默認值:無
? username
描述:數據源的用戶名
必選:是
默認值:無
? password
描述:數據源指定用戶名的密碼
必選:是
默認值:無
? table
描述:所選取的需要同步的表。使用JSON的數組描述,因此支持多張表同時抽取。當配置為多張表時,用戶自己需保證多張表是同一schema結構,MysqlReader不予檢查表是否同一邏輯表。注意,table必須包含在connection配置單元中。
必選:是
默認值:無
? column
描述:所配置的表中需要同步的列名集合,使用JSON的數組描述字段信息。用戶使用代表默認使用所有列配置,例如[’’]。
支持列裁剪,即列可以挑選部分列進行導出。
支持列換序,即列可以不按照表schema信息進行導出。
支持常量配置,用戶需要按照Mysql SQL語法格式: [“id”, “table”, “1”, “‘bazhen.csy’”, “null”, “to_char(a + 1)”, “2.3” , “true”] id為普通列名,table為包含保留在的列名,1為整形數字常量,'bazhen.csy’為字符串常量,null為空指針,to_char(a + 1)為表達式,2.3為浮點數,true為布爾值。
必選:是
默認值:無
? splitPk
描述:MysqlReader進行數據抽取時,如果指定splitPk,表示用戶希望使用splitPk代表的字段進行數據分片,DataX因此會啟動并發任務進行數據同步,這樣可以大大提供數據同步的效能。
推薦splitPk用戶使用表主鍵,因為表主鍵通常情況下比較均勻,因此切分出來的分片也不容易出現數據熱點。
目前splitPk僅支持整形數據切分,不支持浮點、字符串、日期等其他類型。如果用戶指定其他非支持類型,MysqlReader將報錯!
如果splitPk不填寫,包括不提供splitPk或者splitPk值為空,DataX視作使用單通道同步該表數據。
必選:否
默認值:空
? where
描述:篩選條件,MysqlReader根據指定的column、table、where條件拼接SQL,并根據這個SQL進行數據抽取。在實際業務場景中,往往會選擇當天的數據進行同步,可以將where條件指定為gmt_create > $bizdate 。注意:不可以將where條件指定為limit 10,limit不是SQL的合法where子句。
where條件可以有效地進行業務增量同步。如果不填寫where語句,包括不提供where的key或者value,DataX均視作同步全量數據。
必選:否
默認值:無
? querySql
描述:在有些業務場景下,where這一配置項不足以描述所篩選的條件,用戶可以通過該配置型來自定義篩選SQL。當用戶配置了這一項之后,DataX系統就會忽略table,column這些配置型,直接使用這個配置項的內容對數據進行篩選,例如需要進行多表join后同步數據,使用select a,b from table_a join table_b on table_a.id = table_b.id
當用戶配置querySql時,MysqlReader直接忽略table、column、where條件的配置,querySql優先級大于table、column、where選項。
必選:否
默認值:無
1.9.1.3.3 類型轉換
目前MysqlReader支持大部分Mysql類型,但也存在部分個別類型沒有支持的情況,請注意檢查你的類型。
下面列出MysqlReader針對Mysql類型轉換列表:
請注意:
除上述羅列字段類型外,其他類型均不支持。
tinyint(1) DataX視作為整形。
year DataX視作為字符串類型
bit DataX屬于未定義行為。
1.9.1.4 約束限制
1 主備同步數據恢復問題
主備同步問題指Mysql使用主從災備,備庫從主庫不間斷通過binlog恢復數據。由于主備數據同步存在一定的時間差,特別在于某些特定情況,例如網絡延遲等問題,導致備庫同步恢復的數據與主庫有較大差別,導致從備庫同步的數據不是一份當前時間的完整鏡像。
針對這個問題,我們提供了preSql功能,該功能待補充。
2 一致性約束
Mysql在數據存儲劃分中屬于RDBMS系統,對外可以提供強一致性數據查詢接口。例如當一次同步任務啟動運行過程中,當該庫存在其他數據寫入方寫入數據時,MysqlReader完全不會獲取到寫入更新數據,這是由于數據庫本身的快照特性決定的。關于數據庫快照特性,請參看MVCC Wikipedia
上述是在MysqlReader單線程模型下數據同步一致性的特性,由于MysqlReader可以根據用戶配置信息使用了并發數據抽取,因此不能嚴格保證數據一致性:當MysqlReader根據splitPk進行數據切分后,會先后啟動多個并發任務完成數據同步。由于多個并發任務相互之間不屬于同一個讀事務,同時多個并發任務存在時間間隔。因此這份數據并不是完整的、一致的數據快照信息。
針對多線程的一致性快照需求,在技術上目前無法實現,只能從工程角度解決,工程化的方式存在取舍,我們提供幾個解決思路給用戶,用戶可以自行選擇:
? 使用單線程同步,即不再進行數據切片。缺點是速度比較慢,但是能夠很好保證一致性。
? 關閉其他數據寫入方,保證當前數據為靜態數據,例如,鎖表、關閉備庫同步等等。缺點是可能影響在線業務。
3 數據庫編碼問題
Mysql本身的編碼設置非常靈活,包括指定編碼到庫、表、字段級別,甚至可以均不同編碼。優先級從高到低為字段、表、庫、實例。我們不推薦數據庫用戶設置如此混亂的編碼,最好在庫級別就統一到UTF-8。MysqlReader底層使用JDBC進行數據抽取,JDBC天然適配各類編碼,并在底層進行了編碼轉換。因此MysqlReader不需用戶指定編碼,可以自動獲取編碼并轉碼。對于Mysql底層寫入編碼和其設定的編碼不一致的混亂情況,MysqlReader對此無法識別,對此也無法提供解決方案,對于這類情況,導出有可能為亂碼。4 增量數據同步
MysqlReader使用JDBC SELECT語句完成數據抽取工作,因此可以使用SELECT…WHERE…進行增量數據抽取,方式有多種:
對于業務上無字段區分新增、修改數據情況,MysqlReader也無法進行增量數據同步,只能同步全量數據。
5 Sql安全性
MysqlReader提供querySql語句交給用戶自己實現SELECT抽取語句,MysqlReader本身對querySql不做任何安全性校驗。這塊交由DataX用戶方自己保證。
FAQ
Q: MysqlReader同步報錯,報錯信息為XXX
A: 網絡或者權限問題,請使用mysql命令行測試:
如果上述命令也報錯,那可以證實是環境問題,請聯系你的DBA。
1.9.2 DataX MySQLWriter
1 快速介紹
MysqlWriter 插件實現了寫入數據到 Mysql 主庫的目的表的功能。在底層實現上, MysqlWriter 通過 JDBC 連接遠程 Mysql 數據庫,并執行相應的 insert into … 或者 ( replace into …) 的 sql 語句將數據寫入 Mysql,內部會分批次提交入庫,需要數據庫本身采用 innodb 引擎。
MysqlWriter 面向ETL開發工程師,他們使用 MysqlWriter 從數倉導入數據到 Mysql。同時 MysqlWriter 亦可以作為數據遷移工具為DBA等用戶提供服務。
2 實現原理
MysqlWriter 通過 DataX 框架獲取 Reader 生成的協議數據,根據你配置的 writeMode 生成
? insert into…(當主鍵/唯一性索引沖突時會寫不進去沖突的行)
或者
? replace into…(沒有遇到主鍵/唯一性索引沖突時,與 insert into 行為一致,沖突時會用新行替換原有行所有字段) 的語句寫入數據到 Mysql。出于性能考慮,采用了 PreparedStatement + Batch,并且設置了:rewriteBatchedStatements=true,將數據緩沖到線程上下文 Buffer 中,當 Buffer 累計到預定閾值時,才發起寫入請求。
注意:目的表所在數據庫必須是主庫才能寫入數據;整個任務至少需要具備 insert/replace into…的權限,是否需要其他權限,取決于你任務配置中在 preSql 和 postSql 中指定的語句。
3 功能說明
3.1 配置樣例
? 這里使用一份從內存產生到 Mysql 導入的數據。
{"job": {"setting": {"speed": {"channel": 1}},"content": [{"reader": {"name": "streamreader","parameter": {"column" : [{"value": "DataX","type": "string"},{"value": 19880808,"type": "long"},{"value": "1988-08-08 08:08:08","type": "date"},{"value": true,"type": "bool"},{"value": "test","type": "bytes"}],"sliceRecordCount": 1000}},"writer": {"name": "mysqlwriter","parameter": {"writeMode": "insert","username": "root","password": "root","column": ["id","name"],"session": ["set session sql_mode='ANSI'"],"preSql": ["delete from test"],"connection": [{"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/datax?useUnicode=true&characterEncoding=gbk","table": ["test"]}]}}}]} }3.2 參數說明
? jdbcUrl
o 描述:目的數據庫的 JDBC 連接信息。作業運行時,DataX 會在你提供的 jdbcUrl 后面追加如下屬性:yearIsDateType=false&zeroDateTimeBehavior=convertToNull&rewriteBatchedStatements=true o 注意:1、在一個數據庫上只能配置一個 jdbcUrl 值。這與 MysqlReader 支持多個備庫探測不同,因為此處不支持同一個數據庫存在多個主庫的情況(雙主導入數據情況) o 2、jdbcUrl按照Mysql官方規范,并可以填寫連接附加控制信息,比如想指定連接編碼為 gbk ,則在 jdbcUrl 后面追加屬性 useUnicode=true&characterEncoding=gbk。具體請參看 Mysql官方文檔或者咨詢對應 DBA。 o 必選:是 o 默認值:無? username
o 描述:目的數據庫的用戶名 o 必選:是 o 默認值:無? password
o 描述:目的數據庫的密碼 o 必選:是 o 默認值:無? table
o 描述:目的表的表名稱。支持寫入一個或者多個表。當配置為多張表時,必須確保所有表結構保持一致。 o 注意:table 和 jdbcUrl 必須包含在 connection 配置單元中 o 必選:是 o 默認值:無? column
o 描述:目的表需要寫入數據的字段,字段之間用英文逗號分隔。例如: "column": ["id","name","age"]。如果要依次寫入全部列,使用表示, 例如: "column": [""]。 o **column配置項必須指定,不能留空!** o o 注意:1、我們強烈不推薦你這樣配置,因為當你目的表字段個數、類型等有改動時,你的任務可能運行不正確或者失敗 o 2、 column 不能配置任何常量值 o 必選:是 o 默認值:否? session
o 描述: DataX在獲取Mysql連接時,執行session指定的SQL語句,修改當前connection session屬性 o 必須: 否 o 默認值: 空? preSql
o 描述:寫入數據到目的表前,會先執行這里的標準語句。如果 Sql 中有你需要操作到的表名稱,請使用 @table 表示,這樣在實際執行 Sql 語句時,會對變量按照實際表名稱進行替換。比如你的任務是要寫入到目的端的100個同構分表(表名稱為:datax_00,datax01, ... datax_98,datax_99),并且你希望導入數據前,先對表中數據進行刪除操作,那么你可以這樣配置:"preSql":["delete from 表名"],效果是:在執行到每個表寫入數據前,會先執行對應的 delete from 對應表名稱 o 必選:否 o 默認值:無? postSql
o 描述:寫入數據到目的表后,會執行這里的標準語句。(原理同 preSql ) o 必選:否 o 默認值:無? writeMode
o 描述:控制寫入數據到目標表采用 insert into 或者 replace into 或者 ON DUPLICATE KEY UPDATE 語句 o 必選:是 o 所有選項:insert/replace/update o 默認值:insert? batchSize
o 描述:一次性批量提交的記錄數大小,該值可以極大減少DataX與Mysql的網絡交互次數,并提升整體吞吐量。但是該值設置過大可能會造成DataX運行進程OOM情況。 o 必選:否 o 默認值:10243.3 類型轉換
類似 MysqlReader ,目前 MysqlWriter 支持大部分 Mysql 類型,但也存在部分個別類型沒有支持的情況,請注意檢查你的類型。
下面列出 MysqlWriter 針對 Mysql 類型轉換列表:
? bit類型目前是未定義類型轉換
FAQ
Q: MysqlWriter 執行 postSql 語句報錯,那么數據導入到目標數據庫了嗎?
A: DataX 導入過程存在三塊邏輯,pre 操作、導入操作、post 操作,其中任意一環報錯,DataX 作業報錯。由于 DataX 不能保證在同一個事務完成上述幾個操作,因此有可能數據已經落入到目標端。
Q: 按照上述說法,那么有部分臟數據導入數據庫,如果影響到線上數據庫怎么辦?
A: 目前有兩種解法,第一種配置 pre 語句,該 sql 可以清理當天導入數據, DataX 每次導入時候可以把上次清理干凈并導入完整數據。第二種,向臨時表導入數據,完成后再 rename 到線上表。
Q: 上面第二種方法可以避免對線上數據造成影響,那我具體怎樣操作?
A: 可以配置臨時表導入
1.10 Mysql2Hive
接下來將mysql數據庫中的數據寫入hive的案例:
mysql數據庫和表準備:
CREATE DATABASE `complaint_report` DEFAULT CHARACTER SET utf8;USE `complaint_report`; DROP TABLE IF EXISTS `sys_complaint_threshold_value`;CREATE TABLE `sys_complaint_threshold_value` (`id` BIGINT(10) NOT NULL AUTO_INCREMENT,`threshold_type` VARCHAR(16) DEFAULT NULL,`threshold_name` VARCHAR(32) DEFAULT NULL,`threshold_value` SMALLINT(2) DEFAULT '0',`threshold_key` VARCHAR(32) DEFAULT NULL,`operator_msg` VARCHAR(32) DEFAULT NULL,`operator_scope` VARCHAR(16) DEFAULT NULL,`create_date` DATETIME DEFAULT NULL,`create_user` VARCHAR(32) DEFAULT NULL,`update_date` DATETIME DEFAULT NULL,`update_user` VARCHAR(32) DEFAULT NULL,PRIMARY KEY (`id`) ) ENGINE=INNODB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8;Hive中創建庫test_db和表sys_complaint_threshold_value:
use test_db; drop table if exists sys_complaint_threshold_value; CREATE TABLE `sys_complaint_threshold_value`(`id` bigint, `threshold_type` string, `threshold_name` string, `threshold_value` int ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS ORC;編寫json文件:
{"job": {"setting": {"speed": {"channel": 3},"errorLimit": {"record": 0,"percentage": 0.02}},"content": [{"reader": {"name": "mysqlreader","parameter": {"username": "root","password": "123456","column": ["id","threshold_type","threshold_name","threshold_value"],"splitPk": "id","connection": [{"table": ["sys_complaint_threshold_value"],"jdbcUrl": ["jdbc:mysql://192.168.106.158:3306/complaint_report"]}]}},"writer": {"name": "hdfswriter","parameter": {"defaultFS":"hdfs://hadoop1:9000","fileType":"orc","path":"/user/hive/warehouse/test_db.db/sys_complaint_threshold_value","fileName":"sys_complaint_threshold_value","column":[{"name":"id","type":"BIGINT"},{"name":"threshold_type","type":"STRING"},{"name":"threshold_name","type":"STRING"},{"name":"threshold_value","type": "INT"}],"writeMode": "append","fieldDelimiter": "\t","compress":"NONE"}}}]} }然后執行datax命令:
cd /home/installed/datax/bin/ python datax.py /home/test/dataxtest/mysql2hdfs.json然后到hive中查看狀態:
hive> use test_db; OK Time taken: 0.045 seconds hive> drop table if exists sys_complaint_threshold_value; OK Time taken: 1.739 seconds hive> CREATE TABLE `sys_complaint_threshold_value`(> `id` bigint, > `threshold_type` string, > `threshold_name` string, > `threshold_value` int> )> ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'> STORED AS ORC; OK Time taken: 0.254 seconds hive> select * from sys_complaint_threshold_value; OK 5 tag 疑似虛假值 70 7 tag 職業索賠人值 81 8 tag 職業索賠人值 80 4 tag 聚類相關值 70 2 tag 疑似重復值 84 3 tag 聚類相關值 85 1 remind 疑似重復值 85 6 tag 重大風險值 60 Time taken: 0.221 seconds, Fetched: 8 row(s) hive>總結
以上是生活随笔為你收集整理的datax安装+配置+使用文档的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 建设银行信用卡逾期多久会起诉
- 下一篇: 兴业银行私人银行客户标准