insert时调用本身字段_「技术篇」ETL工具Kettle数据对比同步以及Java程序中调用
作為一個技術棧出身的攻城獅,雖然走上管理之路,但是技術是不能扔下的,時不時的拿起來重溫一下,理論與實踐相結合...
使用背景:
住建部某區塊鏈共享平臺(下游系統)需要自于上游系統的生產庫數據,數據量不大十幾萬,而且須要每天提供截止當天的增量數據,要求每條數據給出數據變化時間及標示,即數據若是插入有插入時間和插入標示;若是改動,有改動時間和改動標示;若是刪除需邏輯刪除、有刪除標示且有刪除時間等等。
解決辦法:
kettle的轉換ktr里有一個圖元叫做合并記錄。能夠把兩個表輸入分為源和目的依據唯一標示進行全量比對。得到增量的數據流,再寫入到中間表里,即能夠實現該需求。實現功能的ktr如下圖:
1 輸入源 合并記錄
上圖。最左側是兩個表輸入,上面一個GRZHXX 是上游系統,下一個是SEND_GRZHXX目標數據
GRZHXX的數據來源SQL:注意一定要排序操作!
SEND_GRZHXX的數據來源SQL: 注意一定要排序操作!
2 值映射
例如以下圖, 將輸入源與目的源的每一個字段數據依據唯一字段比較后到值映射圖元,使用字段名為起的后面用到的一個變量名(可隨意起)。源值列為系統默認
1 代表輸入源與目標源比較后刪除的數據標志
2 是輸入源新增
3 是輸入源更新
4是不變 目標值 是自己起的名字 能夠依據須要不變 或改動
標志字段:設置標志字段的名稱,標志字段用于保存比較的結果,比較結果有下列幾種。
1. “identical” – 舊數據和新數據一樣
2. “changed” – 數據發生了變化;
3. “new” – 新數據中有而舊數據中沒有的記錄
4. “deleted” –舊數據中有而新數據中沒有的記錄
3過濾無效記錄
例如以下圖,條件 flagfield is not null (后面沒顯示完),若條件成立發送給下一步zh_check_date,若不成立發送給空操作。
通過下面的過濾標志過濾各種類型數據。
4 新增數據推斷add
例如以下圖,zh_check_date 為獲取當前的系統時間變量。
add2 圖元打開為 畫圈的圖 左側的地方 ,條件 flagfield = add_rec ,若成立及發送數據到中間畫圈的add圖元,若不成立則 發送數據到mod_del圖元 (矩形紅框) 如果為true數據到 add圖元,打開 即是下圖右側 部分 填寫須要插入的數據字段 再到insert圖元 ,就可以把輸入源比目標源多的新數據更新到目標表來 而且加上時間戳。
5 改動或刪除 mod_rec
例如以下圖,如果數據從add2發送而來。 打開矩形框 mod_del 條件flagfield = mod_rec 若true 則發送到 update mapping 若flase 則發送到delete mappinig 。如果是更新,則右側 的查詢keyword 是 更新的比較字段 即是一開始合并記錄的比較字段 ,更新字段就是 除了比較字段之外的其它字段。這樣數據就能夠從 輸入源更新到目標源。
6 刪除數據 delete mapping
例如以下圖。如果數據流到了 delete mapping 。則 僅僅須要依據比較字段把 目標表的時間戳更新 和 狀態更新為del_rec就可以,下圖zh_check_type 為flag_field的值
7 數據結果查看
zh_check_type和check_date已更新成功
8 程序調用 (定時執行、批量執行)
在程序中直接調用ktr執行數據同步操作,需要注意的是,從Kettle工具中拷貝jar到項目中
具體實現代碼如下:
import java.text.SimpleDateFormat;import java.util.Date;?import org.pentaho.di.core.KettleEnvironment;import org.pentaho.di.core.exception.KettleException;import org.pentaho.di.core.util.EnvUtil;import org.pentaho.di.job.Job;import org.pentaho.di.job.JobMeta;import org.pentaho.di.trans.Trans;import org.pentaho.di.trans.TransMeta;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Component;?@Componentpublic class TestKettle {? ? ? public static String filePath = " pdi-ce-5.1.0.0-752/data-integration/";? ? ? public static String fileName = "GRZHXX.ktr";?? ? ? public static void main(String[] args) {? ? ? ? ? ? ?System.out.println("============>>>>>> job開始執行 【 "? ? ? ? ? ? ? ? ? ? ? ? ? ?+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?.format(new Date()) + "】");? ? ? ? ? ? ?String jobFileName = filePath + "TestJob.kjb";? ? ? ? ? ? ?try {? ? ? ? ? ? ? ? ? ? long startTime = System.currentTimeMillis();? ? ? ? ? ? ? ? ? ? // callKettleJob(jobFileName);? ? ? ? ? ? ? ? ? ? callNativeTrans(filePath + fileName);? ? ? ? ? ? ? ? ? ? long endTime = System.currentTimeMillis();? ? ? ? ? ? ? ? ? ? System.out.println("數據抽取任務運行時間:" + (endTime - startTime) / 1000+ "S");? ? ? ? ? ? ?} catch (KettleException e) {? ? ? ? ? ? ? ? ? ? e.printStackTrace();? ? ? ? ? ? ?}? ? ? }?? ? ? @Scheduled(cron = "0 30 16 * * ? ")? ? ? public void process() {? ? ? ? ? ? ?System.out.println("============>>>>>> job開始執行 【 "? ? ? ? ? ? ? ? ? ? ? ? ? ?+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?.format(new Date()) + "】");? ? ? ? ? ? ?String jobFileName = filePath + "TestJob.kjb";? ? ? ? ? ? ?try {? ? ? ? ? ? ? ? ? ? long startTime = System.currentTimeMillis();? ? ? ? ? ? ? ? ? ? // callKettleJob(jobFileName);? ? ? ? ? ? ? ? ? ? callNativeTrans(filePath + fileName);? ? ? ? ? ? ? ? ? ? long endTime = System.currentTimeMillis();? ? ? ? ? ? ? ? ? ? System.out.println("數據抽取任務運行時間:" + (endTime - startTime) / 1000+ "S");? ? ? ? ? ? ?} catch (KettleException e) {? ? ? ? ? ? ? ? ? ? e.printStackTrace();? ? ? ? ? ? ?}? ? ? }?? ? ? public static void callKettleJob(String jobFileName) throws KettleException {? ? ? ? ? ? ?KettleEnvironment.init();// 初始化? ? ? ? ? ? ?JobMeta jobMeta = new JobMeta(jobFileName, null);// jobFileName是job腳本路徑及文件名? ? ? ? ? ? ?Job job = new Job(null, jobMeta);? ? ? ? ? ? ?// job.setVariable("AJBH", "123");// 傳入參數? ? ? ? ? ? ?job.start();? ? ? ? ? ? ?job.waitUntilFinished();? ? ? ? ? ? ?if (job.getErrors() > 0) {? ? ? ? ? ? ? ? ? ? throw new KettleException("job執行不成功,有步驟失敗!");? ? ? ? ? ? ?}? ? ? ? ? ? ?System.out.println("============>>>>>> job執行完成");? ? ? }?? ? ? /**? ? ? ?* 調用本地的轉換文件? ? ? ?*/? ? ? public static void callNativeTrans(String transFileName)? ? ? ? ? ? ? ? ? ? throws KettleException {? ? ? ? ? ? ?KettleEnvironment.init();// 初始化? ? ? ? ? ? ?// 轉換元對象? ? ? ? ? ? ?TransMeta transMeta = new TransMeta(transFileName);? ? ? ? ? ? ?// 轉換? ? ? ? ? ? ?Trans trans = new Trans(transMeta);? ? ? ? ? ? ?// 執行轉換? ? ? ? ? ? ?trans.execute(null);? ? ? ? ? ? ?// 等待轉換執行結束? ? ? ? ? ? ?trans.waitUntilFinished();? ? ? }?}對于定時執行,可以執行kettle里面的job文件也可以通過程序定時任務來控制同時還可以多線程同時操作:public static void main(String arg[]) throws Exception? ?{? ? ? String idname="7";//參數值? ? ? String filename1="./test1.ktr";//ktr路徑? ? ? String filename2="./test2.ktr";//ktr路徑? ? ? KettleEnvironment.init();//初始化kettle環境? ? ? /*定義文件路徑,模型元數據,模型三個容器*/? ? ? ArrayList list1=new ArrayList();? ? ? ArrayList list2=new ArrayList();? ? ? ArrayList list3=new ArrayList();? ? ? /*添加文件對象*/? ? ? list1.add(filename1);? ? ? list1.add(filename2);? ? ? //System.out.print("=======================1:"+list1.get(0));? ? ? //System.out.print("=======================2:"+list1.get(1));? ? ? /*遍歷文件對象,創建轉換元數據對象*/? ? ? for(int i=0;i總結
以上是生活随笔為你收集整理的insert时调用本身字段_「技术篇」ETL工具Kettle数据对比同步以及Java程序中调用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: idea redis 插件_Redis客
- 下一篇: python3.7.3 离线安装para