Logstash同步mysql一对多数据到ES(踩坑日记系列)
場景:
Logstash 、Kibana、ES版本:6.3.1。
使用Logstash從mysql同步用戶和用戶所有的寵物到ES中。
希望的格式:
"register_name": "孟林潔","id": 80469531,"pets": [{"breed_name": "萬能梗","birthday": null,"pet_id": 999044,"name": "一只狗","images": "{\"result\":[\"https://petkit-img3.oss-cn-hangzhou.aliyuncs.com/img/tmp_6f4c8e92de0c53ab355fdb69214d4bf3.jpg\"]}","breed_id": 130},{"breed_name": "萬能梗","birthday": null,"pet_id": 999097,"name": "一只狗2","images": "{\"result\":[\"https://petkit-img3.oss-cn-hangzhou.aliyuncs.com/img/tmp_6f4c8e92de0c53ab355fdb69214d4bf3.jpg\"]}","breed_id": 130}],"mobile": "*******","avatar": null,"pet_list": [999044,999097]問題
解決:
1、解決logstash同步nested嵌套類型到ES中
先創建索引,并且修改索引類型為nested
使用logstash的過濾器中aggregate插件進行數據聚合。
配置文件jdbc3.conf
input {stdin {}jdbc {jdbc_driver_library => "../mysql-connector-java-6.0.6.jar"jdbc_driver_class => "com.mysql.jdbc.Driver"jdbc_connection_string => "jdbc:mysql://****.com:3306/food-dev"jdbc_user => "****"jdbc_password => "****"#jdbc_paging_enabled => "true"#jdbc_page_size => "50"clean_run => trueuse_column_value => truerecord_last_run => "true"tracking_column => "id"schedule => "*/1 * * * *"#last_run_metadata_path => "/Users/menglinjie/ES-node/testdata.text"statement => "select u.id,u.register_name,u.mobile,u.avatar,u.status,svp.id as pet_id,svp.name,svp.images,svp.gender,svp.birthday,pb.id as breed_id,pb.name as breed_name from user u left join store_vip_pet svp on svp.user_id = u.id and svp.pet_status = 1 left join pet_breed pb on svp.breed_id = pb.id order by u.id desc"}}filter { #這里做聚合aggregate {task_id => "%{id}"code => "map['id'] = event.get('id')map['register_name'] = event.get('register_name')map['mobile'] = event.get('mobile')map['avatar'] = event.get('avatar')map['pet_list'] ||=[]map['pets'] ||=[]if (event.get('pet_id') != nil)if !(map['pet_list'].include? event.get('pet_id')) map['pet_list'] << event.get('pet_id') map['pets'] << {'pet_id' => event.get('pet_id'),'name' => event.get('name'),'images' => event.get('images'),'breed_id' => event.get('breed_id'),'breed_name' => event.get('breed_name'),'birthday' => event.get('birthday')}endendevent.cancel()"push_previous_map_as_event => truetimeout => 5}json {source => "message"remove_field => ["message"]#remove_field => ["message", "type", "@timestamp", "@version"]}mutate {#將不需要的JSON字段過濾,且不會被存入 ES 中remove_field => ["tags", "@timestamp", "@version"]} }output {stdout {#codec => json_lines}elasticsearch {hosts => ["127.0.0.1:9200"]index => "user"document_id => "%{id}"} }2、解決聚合過程中子數組對象丟失
剛開始考慮到是否是sql查詢分頁問題,導致多寵物沒有一起聚合,而是分開聚合。但是通過看日志和數據發現每次丟失的數據不同,沒有任何規律性。
隨機性的問題讓我想到多線程,然后查找logstash配置。在config/logstash.yml中有以下配置:
# ------------ Pipeline Settings -------------- # # The ID of the pipeline. # 管道id # pipeline.id: test1 # # Set the number of workers that will, in parallel, execute the filters+outputs # stage of the pipeline. # # This defaults to the number of the host's CPU cores. #output 和 filter的線程數,默認是cpu核數 # pipeline.workers: 1 # # How many events to retrieve from inputs before sending to filters+workers # # pipeline.batch.size: 1000 # # How long to wait in milliseconds while polling for the next event # before dispatching an undersized batch to filters+outputs # # pipeline.batch.delay: 50問題定位:多線程跑聚合過程中,同一個用戶的多個寵物可能被分配到不通過的線程,分別做不同的聚合,導致一個用戶存在多條數據,分別擁有不同的寵物,然后多線程的進行輸出到ES,ES保存過程中會把存在的數據給更新掉,這就是我的寵物丟失的原因,多線程分配的隨機性導致數據也隨機丟失。
嘗試修改線程數,驗證猜想是否正確。
指定配置文件運行會使logstash忽略logstash.yml配置。所以在logstash.yml指定配置文件,運行logstash時不用指定配置,logstash會自動尋找logstash.yml配置
# path.config: /Users/menglinjie/ES-node/logstash-6.3.1/conf.d驗證后確認猜想正確。
回想剛才把線程數設置為1,這樣肯定會影響性能的吧,萬一以后我有不需要聚合的的數據時完全可以多線程跑。Logstash提供的pipelines.yml可以配置多管道,使不同的同步任務綁定不同管道配置。
這里pipeline.workers: 4,pipeline.output.workers: 3,那么執行聚合的filter就是1,這樣可以單線程聚合,多線程輸出。
多個任務可以配置多個管道,pipeline.id標示管道唯一性。
- pipeline.id: user_pipelinepipeline.workers: 4pipeline.batch.size: 1000# 輸出pipeline.output.workers: 3# 配置文件位置path.config: "/Users/menglinjie/ES-node/logstash-6.3.1/conf.d/*.conf"# 對基于磁盤的排隊進行“持久化”。默認值是內存queue.type: persisted更新:
影響聚合結果的還有sql語句!!
sql語句必須根據聚合task_id排序,也就是需要聚合的數據必須排在一起。否則map[‘pets’]會被覆蓋掉,導致數據丟失。
3、logstash同步時少同步一條數據,在停止logstash服務時才進行同步
在filter 聚合配置中添加:
timeout => 3
filter aggregate 創建中 event map 并不知道我、這次事件是不是應該結束,也就是它也不知道到那一條才是最后一條, 因此設置一個 timeout 告訴它這個時間執行多少秒就結束繼續執行第二個。但這樣并不是很嚴謹,因為你也不確定你的 event map 到底要執行多久 。最好的方式是 我們應該給定一個 task end 的條件 ES官網關于 aggregate 的說明
4、es 配置id的問題,必須有唯一性,否則被覆蓋
參考鏈接:
https://segmentfault.com/a/1190000016592277
https://segmentfault.com/q/1010000016861266
https://blog.csdn.net/weixin_33910460/article/details/88719101
https://elasticsearch.cn/question/6648
總結
以上是生活随笔為你收集整理的Logstash同步mysql一对多数据到ES(踩坑日记系列)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java进阶:ReentrantLock
- 下一篇: Docker安装Logstash7.7.