MySql自动同步主库数据(Canal)
MySql自動(dòng)同步主庫數(shù)據(jù)(Canal)
上篇文章介紹了MongoDB數(shù)據(jù)庫模擬從庫實(shí)現(xiàn)主從復(fù)制效果,不會(huì)影響線上數(shù)據(jù),本文介紹MySql模擬從庫實(shí)現(xiàn)主從復(fù)制。
-
Find Action Ctrl+Shift+A,在此對話框中輸入想操作的英文立即出現(xiàn)想的操作
-
System.out.println輸入sout即可
-
for循環(huán)N輸入N.for即可
-
return n;輸入n.return即可
-
離線寫博客
-
導(dǎo)入導(dǎo)出Markdown文件
-
豐富的快捷鍵
MySql主從復(fù)制原理
MySql主從復(fù)制指數(shù)據(jù)從一個(gè)MySql復(fù)制到一個(gè)或多個(gè)MySql服務(wù)器中,基于binlog采用異步復(fù)制直接IO,這樣數(shù)據(jù)訪問不用一直訪問主庫來完成,主庫負(fù)責(zé)修改,從庫負(fù)責(zé)讀取,實(shí)現(xiàn)讀寫分離。
- master會(huì)將變更數(shù)據(jù)存入binlog文件中。
- slave連接主庫時(shí)主庫會(huì)開啟一個(gè)dump線程發(fā)送binlog內(nèi)容。
- slave會(huì)啟動(dòng)一個(gè)I\O Thread請求主庫發(fā)送binlog記錄并保存到中繼日志中。
- 從庫啟動(dòng)Sql Thread線程讀取中繼日志,本地重放,使得數(shù)據(jù)與主庫保持一致,最后I/O Thread和SQL Thread將進(jìn)入睡眠狀態(tài),等待下一次被喚醒。
總結(jié):
- 從庫生成兩個(gè)線程I/O Thread、Sql Thread。
- I/O Thread請求主庫binlog并寫到中繼日志(relay-log)中。
- 主庫生成一個(gè)dump線程,給從庫傳binlog。
- Sql線程讀取中繼日志解析并執(zhí)行成sql
Canal原理
canal 模擬 MySQL slave 的交互協(xié)議,偽裝自己為 MySQL slave ,向 MySQL master 發(fā)送dump 協(xié)議,MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 canal )
canal 解析 binary log 對象(原始為 byte 流) —— [ 官網(wǎng)地址 ]
下載源碼修改:conf/example/instance.properties文件
//設(shè)置主庫IP和端口 canal.instance.master.address=IP:端口 //設(shè)置同步開始時(shí)間(毫秒),可忽略 canal.instance.master.timestamp=啟動(dòng)canal:/bin/startup.sh
本文用的阿里云服務(wù),不需要有多過設(shè)置,其它小伙伴參照官網(wǎng)配置一下即可。
代碼
SimpleCanalClientExample.class
package canal;import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.common.utils.AddressUtils; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.CanalEntry.Column; import com.alibaba.otter.canal.protocol.CanalEntry.Entry; import com.alibaba.otter.canal.protocol.Message; import org.springframework.util.StringUtils;import java.net.InetSocketAddress; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; import java.util.List;/*** @Author: LailaiMonkey* @Description:* @Date:Created in 2021-01-04 11:53* @Modified By:*/ public class SimpleCanalClientExample {static Statement statement = null;public static void main(String args[]) {//獲得存儲sqlStatementgetStatement();// 創(chuàng)建Canal鏈接CanalConnector connector = getCanalConnector();int batchSize = 10000;while (true) {// 獲取指定數(shù)量的數(shù)據(jù)Message message = connector.getWithoutAck(batchSize);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {try {Thread.sleep(1000);} catch (InterruptedException ignored) {}} else {findEntry(message.getEntries());}connector.ack(batchId); // 提交確認(rèn)// connector.rollback(batchId); // 處理失敗, 回滾數(shù)據(jù)}}private static CanalConnector getCanalConnector() {// 創(chuàng)建鏈接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalIp,11111), "example", "", "");connector.connect();connector.subscribe(".*\\..*");connector.rollback();return connector;}private static void getStatement() {//驅(qū)動(dòng)程序名String driver = "com.mysql.cj.jdbc.Driver";// URL指向要訪問的數(shù)據(jù)庫名scutcsString url = "jdbc:mysql://從庫IP:3306/數(shù)據(jù)庫名?useLegacyDatetimeCode=false&serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull";// MySQL配置時(shí)的用戶名String user = "賬號";// Java連接MySQL配置時(shí)的密碼String password = "密碼";try {// 加載驅(qū)動(dòng)程序Class.forName(driver);// 連續(xù)數(shù)據(jù)庫Connection conn = DriverManager.getConnection(url, user, password);if (!conn.isClosed()) {System.out.println("Succeeded connecting to the Database!");}// statement用來執(zhí)行SQL語句statement = conn.createStatement();} catch (ClassNotFoundException | SQLException e) {e.printStackTrace();}}private static void findEntry(List<Entry> entrys) {for (Entry entry : entrys) {if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}CanalEntry.RowChange rowChage;try {rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}CanalEntry.EventType eventType = rowChage.getEventType();System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {if (eventType == CanalEntry.EventType.DELETE) {String sql = "delete from " + entry.getHeader().getTableName();sql += deleteSql(rowData.getBeforeColumnsList());doSql(sql);} else if (eventType == CanalEntry.EventType.INSERT) {String sql = "replace into " + entry.getHeader().getTableName();sql += insertSql(rowData.getAfterColumnsList());doSql(sql);} else {String sql = "update " + entry.getHeader().getTableName();sql += updateSql(rowData.getAfterColumnsList());doSql(sql);}}}}private static String updateSql(List<Column> columns) {StringBuilder str = new StringBuilder(" set ");CanalField canalField = getField(columns);List<String> fields = canalField.getFields();List<String> values = canalField.getValues();for (int i = 0; i < fields.size(); i++) {str.append(fields.get(i)).append(" = ").append(values.get(i)).append(",");}str.deleteCharAt(str.length() - 1);//以主鍵為條件更新str.append(" where id = ").append(canalField.getId());return str.toString();}private static String insertSql(List<Column> columns) {StringBuilder str = new StringBuilder(" ( ");CanalField canalField = getField(columns);List<String> fields = canalField.getFields();List<String> values = canalField.getValues();str.append(String.join(",", fields)).append(") values (").append(String.join(",", values)).append(" ) ");return str.toString();}private static String deleteSql(List<Column> columns) {StringBuilder str = new StringBuilder(" where id = ");String id = "";for (Column column : columns) {if ("id".equals(column.getName())) {id = column.getValue();}}str.append(id);return str.toString();}private static CanalField getField(List<Column> columns) {CanalField canalField = new CanalField();List<String> field = new ArrayList<>();List<String> value = new ArrayList<>();for (Column column : columns) {if (StringUtils.isEmpty(column.getValue())) {continue;}//獲得id(主鍵)字段值if ("id".equals(column.getName())) {canalField.setId(column.getValue());}//獲得字段和值int sqlType = column.getSqlType();if (sqlType <= 8 && sqlType >= 2 || sqlType <= -5 && sqlType >= -7) {field.add("`" + column.getName() + "`");value.add(column.getValue());} else {field.add("`" + column.getName() + "`");value.add("'" + column.getValue() + "'");}}canalField.setFields(field);canalField.setValues(value);return canalField;}private static void doSql(String sql) {try {// 要執(zhí)行的SQL語句statement.executeUpdate(sql);} catch (Exception e) {System.out.println("執(zhí)行失敗:" + sql);e.printStackTrace();}}}CanalField.class
package canal;import java.util.List;/*** @Author: LailaiMonkey* @Description:* @Date:Created in 2021-01-05 14:03* @Modified By:*/ public class CanalField {private String id;private List<String> fields;private List<String> values;public String getId() {return id;}public void setId(String id) {this.id = id;}public List<String> getFields() {return fields;}public void setFields(List<String> fields) {this.fields = fields;}public List<String> getValues() {return values;}public void setValues(List<String> values) {this.values = values;} }需要先初始化從庫表結(jié)構(gòu)及數(shù)據(jù),啟動(dòng)java程序?qū)崿F(xiàn)自己同步。—— [ 源碼地址 ]
總結(jié)
以上是生活随笔為你收集整理的MySql自动同步主库数据(Canal)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python实现ORC/文字识别之pyt
- 下一篇: SDOI 2014 数表 题解