007_Curator框架二
1. 分布式鎖功能
1.1. 在分布式場景中, 我們為了保證數據的一致性, 經常在程序運行的某一個點需要進行同步操作。Java提供的synchronized或者Reentrantlock等, 是同一個應用程序的多個線程的高并發, 并不是多個服務器(分布式)寫同一數據的并發, 這個時候再使用Java提供的鎖, 就會出現分布式不同步的問題。我們使用Curator基于ZooKeeper的特性提供的分布式鎖來處理分布式場景的數據一致性。
2. 分布式鎖實例
2.1. 新建一個名為CuratorDistributed的Java項目, 拷入相關jar
2.2. Java的ReentrantLock鎖
package com.fj.zkcurator;import java.io.File; import java.io.FileOutputStream; import java.sql.Connection; import java.sql.ResultSet; import java.sql.Statement; import java.util.ArrayList; import java.util.List; import java.util.concurrent.locks.ReentrantLock;public class Look {public static final ReentrantLock rl = new ReentrantLock();public static void main(String[] args) {try {rl.lock();db();} catch (Exception e) {e.printStackTrace();} finally {rl.unlock();}}public static void db() throws Exception {// 1.獲取連接對象Connection conn = JDBCUtil.getConn(); // 2.創建statement, 跟數據庫打交道, 一定需要這個對象Statement st = conn.createStatement();// 3.執行查詢sql, 獲取ResultSet結果集ResultSet rs = st.executeQuery("select * from product where id = 2");// 4.使用ResultSet結果集遍歷, 下標從1開始List<Product> products = new ArrayList<Product>();while(rs.next()) {products.add(new Product(rs.getInt(1), rs.getString(2), rs.getInt(3)));}// 5.查詢id為2的商品的數量int number = products.get(0).getNumber();File file = new File("Look.txt");if(!file.exists()) {file.createNewFile();}FileOutputStream fos = new FileOutputStream(file, true);fos.write((System.currentTimeMillis() + ", number = " + number + "\r\n").getBytes());fos.close();if(number > 0) {// 6.執行查詢sqlst.executeUpdate("update product set number = " + (--number) + " where id = 2");}// 7.釋放資源JDBCUtil.release(conn, st);} }2.3. 在src目錄下創建jdbc.properties
2.4. 數據庫product表
2.5. Product.java
package com.fj.zkcurator;import java.io.Serializable;public class Product implements Serializable {private static final long serialVersionUID = 1L;private Integer id;private String name;private Integer number;public Product() {}public Product(Integer id, String name, Integer number) {this.id = id;this.name = name;this.number = number;}public Integer getId() {return id;}public void setId(Integer id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public Integer getNumber() {return number;}public void setNumber(Integer number) {this.number = number;}@Overridepublic String toString() {return "Product [id=" + id + ", name=" + name + ", number=" + number + "]";}}2.6. JDBCUtil.java
package com.fj.zkcurator;import java.io.IOException; import java.io.InputStream; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.Properties;public class JDBCUtil {private static String driverClass = null;private static String url = null;private static String name = null;private static String password= null;static {try {// 1.創建一個屬性配置對象Properties properties = new Properties();// 2.使用類加載器, 去讀取src底下的資源文件。對應文件位于src目錄底下InputStream is = JDBCUtil.class.getClassLoader().getResourceAsStream("jdbc.properties");// 3.導入輸入流。properties.load(is);// 4.讀取屬性driverClass = properties.getProperty("driverClass");url = properties.getProperty("url");name = properties.getProperty("name");password = properties.getProperty("password");} catch (IOException e) {e.printStackTrace();}}/*** 獲取連接對象*/public static Connection getConn(){Connection conn = null;try {Class.forName(driverClass);conn = DriverManager.getConnection(url, name, password);} catch (Exception e) {e.printStackTrace();}return conn;}/*** 釋放資源* @param conn* @param st* @param rs*/public static void release(Connection conn, Statement st, ResultSet rs){closeRs(rs);closeSt(st);closeConn(conn);}public static void release(Connection conn, Statement st){closeSt(st);closeConn(conn);}private static void closeRs(ResultSet rs){try {if(rs != null){rs.close();}} catch (SQLException e) {e.printStackTrace();}finally{rs = null;}}private static void closeSt(Statement st){try {if(st != null){st.close();}} catch (SQLException e) {e.printStackTrace();}finally{st = null;}}private static void closeConn(Connection conn){try {if(conn != null){conn.close();}} catch (SQLException e) {e.printStackTrace();}finally{conn = null;}}}2.7. 連續運行Look.java十五次(模擬多個應用并發), 出現了分布式并發問題
2.8. 分布式鎖
package com.fj.zkcurator;import java.io.File; import java.io.FileOutputStream; import java.sql.Connection; import java.sql.ResultSet; import java.sql.Statement; import java.util.ArrayList; import java.util.List; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry;public class DistributedLook {public static final String connectString = "192.168.25.133:2181,192.168.25.135:2181,192.168.25.138:2181";public static final int sessionTimeoutMs = 10 * 60 * 1000;public static final int connectionTimeoutMs = 5 * 1000;public static final String PATH = "/interProcessMutex";public static void main(String[] args) {// 1. 重試策略, 初試時間為1s, 最多可重試10次。RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);// 2. 通過工廠創建連接CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build();// 3. 開啟連接cf.start();// 4. 分布式鎖InterProcessMutex lock = new InterProcessMutex(cf, PATH);try {// 獲取鎖lock.acquire();db();} catch (Exception e) {e.printStackTrace();} finally {try {// 釋放鎖lock.release();} catch (Exception e) {e.printStackTrace();}}// 5. 關閉連接cf.close();}public static void db() throws Exception {// 1.獲取連接對象Connection conn = JDBCUtil.getConn(); // 2.創建statement, 跟數據庫打交道, 一定需要這個對象Statement st = conn.createStatement();// 3.執行查詢sql, 獲取ResultSet結果集ResultSet rs = st.executeQuery("select * from product where id = 2");// 4.使用ResultSet結果集遍歷, 下標從1開始List<Product> products = new ArrayList<Product>();while(rs.next()) {products.add(new Product(rs.getInt(1), rs.getString(2), rs.getInt(3)));}// 5.查詢id為2的商品的數量int number = products.get(0).getNumber();File file = new File("DistributedLook.txt");if(!file.exists()) {file.createNewFile();}FileOutputStream fos = new FileOutputStream(file, true);fos.write((System.currentTimeMillis() + ", number = " + number + "\r\n").getBytes());fos.close();if(number > 0) {// 6.執行查詢sqlst.executeUpdate("update product set number = " + (--number) + " where id = 2");}// 7.釋放資源JDBCUtil.release(conn, st);} }2.9. 連續運行DistributedLook.java十五次(模擬多個應用并發)
3.?分布式計數器功能
3.1. 一說到分布式計數器, 你可能腦海里想到了AtomicInteger這種經典的方式, 如果針對于一個JVM的場景當然沒問題, 但是我們現在是分布式場景下, 這就需要利用Curator框架的DistributedAtomicInteger了。
4. 分布式計數器例子
4.1. 分布式計數器
package com.fj.zkcurator;import java.io.File; import java.io.FileOutputStream; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.atomic.AtomicValue; import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.retry.RetryNTimes;/*** 分布式計數器*/ public class DistributedCounter {public static final String connectString = "192.168.25.133:2181,192.168.25.135:2181,192.168.25.138:2181";public static final int sessionTimeoutMs = 10 * 60 * 1000;public static final int connectionTimeoutMs = 5 * 1000;public static final String PATH = "/distributedAtomicInteger";public static void main(String[] args) {// 1. 重試策略, 初試時間為1s, 最多可重試10次。RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);// 2. 通過工廠創建連接CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build();// 3. 開啟連接cf.start();// 4. 分布式原子整形DistributedAtomicInteger atomicInteger = new DistributedAtomicInteger(cf, PATH, new RetryNTimes(3, 1000));try {// 遞增操作AtomicValue<Integer> value = atomicInteger.increment();// 檢查是否操作成功if(value.succeeded()) {// 操作前和操作后的值File file = new File("DistributedCounter.txt");if(!file.exists()) {file.createNewFile();}FileOutputStream fos = new FileOutputStream(file, true);fos.write(("preValue = " + value.preValue() + ", postValue = " + value.postValue() + "\r\n").getBytes());fos.close();}} catch (Exception e1) {e1.printStackTrace();}// 5. 關閉連接cf.close();} }4.2. 連續運行DistributedCounter.java十五次(模擬多個應用并發)
5. Barrier屏障
5.1. 首先, 得介紹下Barrier的概念, Barrier從字面理解是屏障的意思, 主要是用作集合線程, 然后再一起往下執行。再具體一點, 在Barrier之前, 若干個thread各自執行, 然后到了Barrier的時候停下, 等待規定數目的所有的其他線程到達這個Barrier, 之后再一起通過這個Barrier各自干自己的事情。
5.2. 在計算機的世界里, Barrier可以解決的問題很多, 比如, 一個程序有若干個線程并發的從網站上下載一個大型xml文件, 這個過程可以相互獨立, 因為一個文件的各個部分并不相關。而在處理這個文件的時候, 可能需要一個完整的文件, 所以, 需要有一條虛擬的線讓這些并發的部分集合一下從而可以拼接成為一個完整的文件, 可能是為了后續處理也可能是為了計算hash值來驗證文件的完整性。而后, 再交由下一步處理。
6. 同時開始任務實例
6.1. 同時開始任務
package com.fj.zkcurator.barrier;import java.io.File; import java.io.FileOutputStream; import java.util.UUID; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.barriers.DistributedBarrier; import org.apache.curator.retry.ExponentialBackoffRetry;/*** 分布式屏障, 同時開始任務, 需要它人觸發任務的開啟。*/ public class SameTimeStartWork {public static final String connectString = "192.168.25.133:2181,192.168.25.135:2181,192.168.25.138:2181";public static final int sessionTimeoutMs = 10 * 60 * 1000;public static final int connectionTimeoutMs = 5 * 1000;public static final String PATH = "/distributedBarrier";public static void main(String[] args) {// 1. 重試策略, 初試時間為1s, 最多可重試10次。RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);// 2. 通過工廠創建連接CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build();// 3. 開啟連接cf.start();// 4. 分布式屏障DistributedBarrier barrier = new DistributedBarrier(cf, PATH);try {File file = new File("SameTimeStartWork.txt");if(!file.exists()) {file.createNewFile();}FileOutputStream fos = new FileOutputStream(file, true);String uuid = UUID.randomUUID().toString();fos.write((uuid + "準備就緒..." + "\r\n").getBytes());// 5. 設置屏障barrier.setBarrier();// 6. 等待它人觸發才開始工作barrier.waitOnBarrier();fos.write((uuid + "開始任務..." + "\r\n").getBytes());Thread.sleep(1000);fos.write((uuid + "完成任務..." + "\r\n").getBytes());fos.close();} catch (Exception e1) {e1.printStackTrace();}// 7. 關閉連接cf.close();} }6.2. 移除屏障
package com.fj.zkcurator.barrier;import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.barriers.DistributedBarrier; import org.apache.curator.retry.ExponentialBackoffRetry;public class RemoveDistributedBarrier {public static final String connectString = "192.168.25.133:2181,192.168.25.135:2181,192.168.25.138:2181";public static final int sessionTimeoutMs = 10 * 60 * 1000;public static final int connectionTimeoutMs = 5 * 1000;public static final String PATH = "/distributedBarrier";public static void main(String[] args) {// 1. 重試策略, 初試時間為1s, 最多可重試10次。RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);// 2. 通過工廠創建連接CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build();// 3. 開啟連接cf.start();// 4. 分布式屏障DistributedBarrier barrier = new DistributedBarrier(cf, PATH);try {// 5. 作為第三者移除屏障barrier.removeBarrier();} catch (Exception e) {e.printStackTrace();}// 7. 關閉連接cf.close();} }6.3. 連續運行SameTimeStartWork.java五次
6.4. 運行RemoveDistributedBarrier.java移除屏障
7. 同時開始任務同時結束任務實例
7.1. 同時開始任務同時結束, 設置執行者到達5個以上開始任務
package com.fj.zkcurator.barrier;import java.io.File; import java.io.FileOutputStream; import java.util.UUID; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.barriers.DistributedDoubleBarrier; import org.apache.curator.retry.ExponentialBackoffRetry;/*** 分布式的同時開始任務, 同時離開任務。*/ public class SameTimeEnterAndLeave {public static final String connectString = "192.168.25.133:2181,192.168.25.135:2181,192.168.25.138:2181";public static final int sessionTimeoutMs = 10 * 60 * 1000;public static final int connectionTimeoutMs = 5 * 1000;public static final String PATH = "/distributedDoubleBarrier";public static void main(String[] args) {// 1. 重試策略, 初試時間為1s, 最多可重試10次。RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);// 2. 通過工廠創建連接CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build();// 3. 開啟連接cf.start();// 4. 分布式雙重屏障DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(cf, PATH, 5);try {// 操作前和操作后的值File file = new File("SameTimeEnterAndLeave.txt");if(!file.exists()) {file.createNewFile();}FileOutputStream fos = new FileOutputStream(file, true);String uuid = UUID.randomUUID().toString();fos.write((uuid + "準備就緒..." + "\r\n").getBytes());// 5. 進入任務barrier.enter();fos.write((uuid + "完成任務1..." + "\r\n").getBytes());Thread.sleep(1000);fos.write((uuid + "完成任務2..." + "\r\n").getBytes());Thread.sleep(1000);fos.write((uuid + "完成所有任務..." + "\r\n").getBytes());// 6. 離開任務barrier.leave();fos.write((uuid + "退出任務..." + "\r\n").getBytes());fos.close();} catch (Exception e1) {e1.printStackTrace();}// 5. 關閉連接cf.close();} }7.2. 運行SameTimeEnterAndLeave.java四次, 沒有開始任務, 被阻塞
7.3. 再運行SameTimeEnterAndLeave.java一次
總結
以上是生活随笔為你收集整理的007_Curator框架二的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 006_Curator框架一
- 下一篇: 001_FastDFS介绍