RedisCluster读写分离改造
生活随笔
收集整理的這篇文章主要介紹了
RedisCluster读写分离改造
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
RedisCluster模式啟動的環境中,通過Redis中的每個連接,都可以訪問 cluster nodes 訪問到所有的服務器列表以及其所處于的角色(master/slave)。對于RedisCluster來說,在實際運行時,只會訪問到其中的master節點,slave既不能用于write操作,也不能進行read。 原有JedisCluster
? JedisCluster的UML圖結果如上圖所示,在每次執行JedisCluster相關操作時,都需要通過JedisClusterCommand提供connection來進行,該connection需要根據key來計算出對應的slot,以便可以進行后續redis相關操作。 return new JedisClusterCommand<String>(connectionHandler, maxRedirections) {@Overridepublic String execute(Jedis connection) {return connection.get(key);} }.run(key); ? 在JedisClusterCommand.run方法中,會根據slot計算出對應的connection。 為了盡量減少對原有代碼的侵入性,我們需要定義線程上下文(ThreadLocal)級別的變量,其中內置了訪問的粒度(READ/WRITE),以便訪問的為master還是slave Redis數據源。 改進的ZhenJedisCluster
?
? 根據jedis連接來獲得Cluster結構: private Map<String, ClusterNodeObject> getClusterNodes(Jedis jedis) {Map<String, ClusterNodeObject> hpToNodeObjectMap = new HashMap<>();String clusterNodesCommand = jedis.clusterNodes();String[] allNodes = clusterNodesCommand.split("\n");for (String allNode : allNodes) {String[] splits = allNode.split(" ");String hostAndPort = splits[1];ClusterNodeObject clusterNodeObject =new ClusterNodeObject(splits[0], splits[1], splits[2].contains("master"), splits[3],Long.parseLong(splits[4]), Long.parseLong(splits[5]), splits[6],splits[7].equalsIgnoreCase("connected"), splits.length == 9 ? splits[8] : null);hpToNodeObjectMap.put(hostAndPort, clusterNodeObject);}return hpToNodeObjectMap; } ?? 將其整理成可用結構,分出master節點,以及slave節點對應的master節點,區分讀寫: Map<String, ZhenJedisPool> masterNodes = new HashMap<>(); for (ClusterNodeObject clusterNodeObject : clusterNodeObjects) {String ipPort = clusterNodeObject.getIpPort();String[] ipPortSplits = ipPort.split(":");HostAndPort hostAndPort = new HostAndPort(ipPortSplits[0], Integer.parseInt(ipPortSplits[1]));setNodeIfNotExist(hostAndPort);if (clusterNodeObject.isMaster()) {ZhenJedisPool zhenJedisPool = new ZhenJedisPool();zhenJedisPool.setWritePool(nodes.get(ipPort));masterNodes.put(clusterNodeObject.getNodeId(), zhenJedisPool);String[] slotSplits = clusterNodeObject.getSlot().split("-");for (int i = Integer.parseInt(slotSplits[0]); i <= Integer.parseInt(slotSplits[1]); i++) {this.slots.put(i, zhenJedisPool);}} }for (ClusterNodeObject clusterNodeObject : clusterNodeObjects) {if (!clusterNodeObject.isMaster()) {String masterNodeId = clusterNodeObject.getMasterNodeId();ZhenJedisPool zhenJedisPool = masterNodes.get(masterNodeId);zhenJedisPool.getReadPools().add(nodes.get(clusterNodeObject.getIpPort()));} } ?? 改進的結構中,需要getConnectionFromSlot方法需要調用ZhenJedisClusterInfoCache.getSlotPool來根據slot以及當前讀寫狀態(read/write)來獲取對應的Jedis連接: public JedisPool getSlotPool(int slot, ZhenQueryContext queryContext) {r.lock();try {ZhenJedisPool zhenJedisPool = slots.get(slot);if (queryContext.getOperationType() == OperationType.WRITE) {return zhenJedisPool.getWritePool();} else {List<JedisPool> readPools = zhenJedisPool.getReadPools();return readPools.get(new Random().nextInt(readPools.size()));}} finally {r.unlock();} } ?? ? 對于JedisCluster中,在執行每一步操作之前,都需要設置對應的讀寫上下文,便于在內部選擇master/slave connection: @Override public String get(final String key) {ZhenQueryContextHolder.getInstance().setQueryContext(new ZhenQueryContext(OperationType.READ));return new ZhenJedisClusterCommand<String>(connectionHandler, maxRedirections) {@Overridepublic String execute(Jedis connection) {return connection.get(key);}}.run(key); } ? 處理完成后,只需要在執行時使用我們提供的JedisCluster即可正常運行。 執行驗證 當前節點如果為slave,也不能只讀,需要額外設置屬性 slave-read-only
? 可以證實,經過改造后確實調用到了指定的master節點上: 5974ed7dd81c112d9a2354a0a985995913b4702c 192.168.1.137:6389 master - 0 1470273087539 26 connected 0-5640 d08dc883ee4fcb90c4bb47992ee03e6474398324 192.168.1.137:6390 master - 0 1470273086034 25 connected 5641-11040 ffb4db4e1ced0f91ea66cd2335f7e4eadc29fd56 192.168.1.138:6390 slave 5974ed7dd81c112d9a2354a0a985995913b4702c 0 1470273087539 26 connected c69b521a30336caf8bce078047cf9bb5f37363ee 192.168.1.137:6388 master - 0 1470273086536 28 connected 11041-16383 532e58842d001f8097fadc325bdb5541b788a360 192.168.1.138:6389 slave c69b521a30336caf8bce078047cf9bb5f37363ee 0 1470273086034 28 connected aa52c7810e499d042e94e0aa4bc28c57a1da74e3 192.168.1.138:6388 myself,slave d08dc883ee4fcb90c4bb47992ee03e6474398324 0 0 19 connected ?? 出現該問題,加上slave-readonly yes 參數后,重啟發現也并沒有什么作用,仍然報上面的錯誤,而且直接通過命令行連接時,仍然出現問題: 192.168.1.137:6390> get key1 -> Redirected to slot [9189] located at 192.168.1.138:6388 "value1" ? 經過查找,在github上發現問題所在,https://github.com/antirez/redis/issues/2202,如果連接到slave節點,可以通過readonly來進行處理: //如果是只讀連接 { connection.readonly(); } return execute(connection); ?? 使用zookeeper監測集群狀態變化
? 建立Redis agent,用于監測RedisCluster的狀態變化,如果RedisCluster中的狀態與zookeeper上的一致,不進行任何操作,否則更新zookeeper上的文件。 agent的執行時間間隔可以控制在1s,便于及時發現rediscluster的狀態問題。 應用程序中需要注冊監聽該文件的變化,如果有變化及時進行更新redis讀寫池。 zookeeper agent 操作zookeeper可以使用curator框架,Curator框架提供了一套高級的API, 簡化了ZooKeeper的操作。 它增加了很多使用ZooKeeper開發的特性,可以處理ZooKeeper集群復雜的連接管理和重試機制。 ZooKeeper原生的API支持通過注冊Watcher來進行事件監聽,但是Watcher通知是一次性的,因此開發過程中需要反復注冊Watcher,比較繁瑣。Curator引入了Cache來監聽ZooKeeper服務端的事件。Cache對ZooKeeper事件監聽進行了封裝,能夠自動處理反復注冊監聽,簡化了ZooKeeper原生API繁瑣的開發過程。 關于curator的基本介紹,可以參考:http://ifeve.com/zookeeper-curato-framework/ 首先需要添加maven依賴: <dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>2.7.0</version> </dependency> ? 在主線程中進行間隔1s的輪詢,查詢zookeeper上的文件與當前redis狀態,如果相同不做任何修改,否則進行更新。 建立zookeeper連接: client = CuratorFrameworkFactory.newClient("xxx",new RetryNTimes(5, 5000)); client.start(); ? 獲取zookeeper上的值與線上redis環境進行比對: public void compareAndSet() throws Exception {List<ZhenJedisPoolObject> jedisPoolFromCluster = getJedisPoolFromCluster();String currentString = JSON.toJSONString(jedisPoolFromCluster);if (client.checkExists().forPath(TOPO_PATH) == null) {SysOutLogger.info("Start to create zk node: " + TOPO_PATH);client.create().creatingParentsIfNeeded().forPath(TOPO_PATH, currentString.getBytes());} else {String statData = new String(client.getData().forPath(TOPO_PATH));if (!currentString.equalsIgnoreCase(statData)) {SysOutLogger.info("Node not synchronized with online, to reset...");client.setData().forPath(TOPO_PATH, currentString.getBytes());}} } ? 應用端監測文件修改 而在應用端,完全依賴zookeeper上的文件狀態變更,來更新rediscluster中的slots,nodes等對象: String content = new String(client.getData().forPath(TOPO_PATH), "UTF-8"); List<ZhenJedisPoolObject> zhenJedisPoolObjects =JSON.parseObject(content, new TypeReference<List<ZhenJedisPoolObject>>() {}); discoverClusterNodesAndSlots(zhenJedisPoolObjects);final NodeCache nodeCache = new NodeCache(client, TOPO_PATH, false); nodeCache.start(); nodeCache.getListenable().addListener(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {String content = new String(nodeCache.getCurrentData().getData(), "UTF-8");List<ZhenJedisPoolObject> zhenJedisPoolObjects =JSON.parseObject(content, new TypeReference<List<ZhenJedisPoolObject>>() {});discoverClusterNodesAndSlots(zhenJedisPoolObjects);} }); ? 注意這里需要使用到curator中的NodeCache來操作,它可以幫助監聽zookeeper上節點數據的變化。如果想要監聽zookeeper上路徑的變化,可以使用:PathChildrenCache,根據對應的事件類型event type:CHILD_ADDED, CHILD_REMOVED, CHILD_UPDATED來進行事件處理。 注意需要保證zookeeper上的redis連接,能夠以正常的方式訪問到(內外網切換)。?
? JedisCluster的UML圖結果如上圖所示,在每次執行JedisCluster相關操作時,都需要通過JedisClusterCommand提供connection來進行,該connection需要根據key來計算出對應的slot,以便可以進行后續redis相關操作。 return new JedisClusterCommand<String>(connectionHandler, maxRedirections) {@Overridepublic String execute(Jedis connection) {return connection.get(key);} }.run(key); ? 在JedisClusterCommand.run方法中,會根據slot計算出對應的connection。 為了盡量減少對原有代碼的侵入性,我們需要定義線程上下文(ThreadLocal)級別的變量,其中內置了訪問的粒度(READ/WRITE),以便訪問的為master還是slave Redis數據源。 改進的ZhenJedisCluster
?
? 根據jedis連接來獲得Cluster結構: private Map<String, ClusterNodeObject> getClusterNodes(Jedis jedis) {Map<String, ClusterNodeObject> hpToNodeObjectMap = new HashMap<>();String clusterNodesCommand = jedis.clusterNodes();String[] allNodes = clusterNodesCommand.split("\n");for (String allNode : allNodes) {String[] splits = allNode.split(" ");String hostAndPort = splits[1];ClusterNodeObject clusterNodeObject =new ClusterNodeObject(splits[0], splits[1], splits[2].contains("master"), splits[3],Long.parseLong(splits[4]), Long.parseLong(splits[5]), splits[6],splits[7].equalsIgnoreCase("connected"), splits.length == 9 ? splits[8] : null);hpToNodeObjectMap.put(hostAndPort, clusterNodeObject);}return hpToNodeObjectMap; } ?? 將其整理成可用結構,分出master節點,以及slave節點對應的master節點,區分讀寫: Map<String, ZhenJedisPool> masterNodes = new HashMap<>(); for (ClusterNodeObject clusterNodeObject : clusterNodeObjects) {String ipPort = clusterNodeObject.getIpPort();String[] ipPortSplits = ipPort.split(":");HostAndPort hostAndPort = new HostAndPort(ipPortSplits[0], Integer.parseInt(ipPortSplits[1]));setNodeIfNotExist(hostAndPort);if (clusterNodeObject.isMaster()) {ZhenJedisPool zhenJedisPool = new ZhenJedisPool();zhenJedisPool.setWritePool(nodes.get(ipPort));masterNodes.put(clusterNodeObject.getNodeId(), zhenJedisPool);String[] slotSplits = clusterNodeObject.getSlot().split("-");for (int i = Integer.parseInt(slotSplits[0]); i <= Integer.parseInt(slotSplits[1]); i++) {this.slots.put(i, zhenJedisPool);}} }for (ClusterNodeObject clusterNodeObject : clusterNodeObjects) {if (!clusterNodeObject.isMaster()) {String masterNodeId = clusterNodeObject.getMasterNodeId();ZhenJedisPool zhenJedisPool = masterNodes.get(masterNodeId);zhenJedisPool.getReadPools().add(nodes.get(clusterNodeObject.getIpPort()));} } ?? 改進的結構中,需要getConnectionFromSlot方法需要調用ZhenJedisClusterInfoCache.getSlotPool來根據slot以及當前讀寫狀態(read/write)來獲取對應的Jedis連接: public JedisPool getSlotPool(int slot, ZhenQueryContext queryContext) {r.lock();try {ZhenJedisPool zhenJedisPool = slots.get(slot);if (queryContext.getOperationType() == OperationType.WRITE) {return zhenJedisPool.getWritePool();} else {List<JedisPool> readPools = zhenJedisPool.getReadPools();return readPools.get(new Random().nextInt(readPools.size()));}} finally {r.unlock();} } ?? ? 對于JedisCluster中,在執行每一步操作之前,都需要設置對應的讀寫上下文,便于在內部選擇master/slave connection: @Override public String get(final String key) {ZhenQueryContextHolder.getInstance().setQueryContext(new ZhenQueryContext(OperationType.READ));return new ZhenJedisClusterCommand<String>(connectionHandler, maxRedirections) {@Overridepublic String execute(Jedis connection) {return connection.get(key);}}.run(key); } ? 處理完成后,只需要在執行時使用我們提供的JedisCluster即可正常運行。 執行驗證 當前節點如果為slave,也不能只讀,需要額外設置屬性 slave-read-only
? 可以證實,經過改造后確實調用到了指定的master節點上: 5974ed7dd81c112d9a2354a0a985995913b4702c 192.168.1.137:6389 master - 0 1470273087539 26 connected 0-5640 d08dc883ee4fcb90c4bb47992ee03e6474398324 192.168.1.137:6390 master - 0 1470273086034 25 connected 5641-11040 ffb4db4e1ced0f91ea66cd2335f7e4eadc29fd56 192.168.1.138:6390 slave 5974ed7dd81c112d9a2354a0a985995913b4702c 0 1470273087539 26 connected c69b521a30336caf8bce078047cf9bb5f37363ee 192.168.1.137:6388 master - 0 1470273086536 28 connected 11041-16383 532e58842d001f8097fadc325bdb5541b788a360 192.168.1.138:6389 slave c69b521a30336caf8bce078047cf9bb5f37363ee 0 1470273086034 28 connected aa52c7810e499d042e94e0aa4bc28c57a1da74e3 192.168.1.138:6388 myself,slave d08dc883ee4fcb90c4bb47992ee03e6474398324 0 0 19 connected ?? 出現該問題,加上slave-readonly yes 參數后,重啟發現也并沒有什么作用,仍然報上面的錯誤,而且直接通過命令行連接時,仍然出現問題: 192.168.1.137:6390> get key1 -> Redirected to slot [9189] located at 192.168.1.138:6388 "value1" ? 經過查找,在github上發現問題所在,https://github.com/antirez/redis/issues/2202,如果連接到slave節點,可以通過readonly來進行處理: //如果是只讀連接 { connection.readonly(); } return execute(connection); ?? 使用zookeeper監測集群狀態變化
? 建立Redis agent,用于監測RedisCluster的狀態變化,如果RedisCluster中的狀態與zookeeper上的一致,不進行任何操作,否則更新zookeeper上的文件。 agent的執行時間間隔可以控制在1s,便于及時發現rediscluster的狀態問題。 應用程序中需要注冊監聽該文件的變化,如果有變化及時進行更新redis讀寫池。 zookeeper agent 操作zookeeper可以使用curator框架,Curator框架提供了一套高級的API, 簡化了ZooKeeper的操作。 它增加了很多使用ZooKeeper開發的特性,可以處理ZooKeeper集群復雜的連接管理和重試機制。 ZooKeeper原生的API支持通過注冊Watcher來進行事件監聽,但是Watcher通知是一次性的,因此開發過程中需要反復注冊Watcher,比較繁瑣。Curator引入了Cache來監聽ZooKeeper服務端的事件。Cache對ZooKeeper事件監聽進行了封裝,能夠自動處理反復注冊監聽,簡化了ZooKeeper原生API繁瑣的開發過程。 關于curator的基本介紹,可以參考:http://ifeve.com/zookeeper-curato-framework/ 首先需要添加maven依賴: <dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>2.7.0</version> </dependency> ? 在主線程中進行間隔1s的輪詢,查詢zookeeper上的文件與當前redis狀態,如果相同不做任何修改,否則進行更新。 建立zookeeper連接: client = CuratorFrameworkFactory.newClient("xxx",new RetryNTimes(5, 5000)); client.start(); ? 獲取zookeeper上的值與線上redis環境進行比對: public void compareAndSet() throws Exception {List<ZhenJedisPoolObject> jedisPoolFromCluster = getJedisPoolFromCluster();String currentString = JSON.toJSONString(jedisPoolFromCluster);if (client.checkExists().forPath(TOPO_PATH) == null) {SysOutLogger.info("Start to create zk node: " + TOPO_PATH);client.create().creatingParentsIfNeeded().forPath(TOPO_PATH, currentString.getBytes());} else {String statData = new String(client.getData().forPath(TOPO_PATH));if (!currentString.equalsIgnoreCase(statData)) {SysOutLogger.info("Node not synchronized with online, to reset...");client.setData().forPath(TOPO_PATH, currentString.getBytes());}} } ? 應用端監測文件修改 而在應用端,完全依賴zookeeper上的文件狀態變更,來更新rediscluster中的slots,nodes等對象: String content = new String(client.getData().forPath(TOPO_PATH), "UTF-8"); List<ZhenJedisPoolObject> zhenJedisPoolObjects =JSON.parseObject(content, new TypeReference<List<ZhenJedisPoolObject>>() {}); discoverClusterNodesAndSlots(zhenJedisPoolObjects);final NodeCache nodeCache = new NodeCache(client, TOPO_PATH, false); nodeCache.start(); nodeCache.getListenable().addListener(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {String content = new String(nodeCache.getCurrentData().getData(), "UTF-8");List<ZhenJedisPoolObject> zhenJedisPoolObjects =JSON.parseObject(content, new TypeReference<List<ZhenJedisPoolObject>>() {});discoverClusterNodesAndSlots(zhenJedisPoolObjects);} }); ? 注意這里需要使用到curator中的NodeCache來操作,它可以幫助監聽zookeeper上節點數據的變化。如果想要監聽zookeeper上路徑的變化,可以使用:PathChildrenCache,根據對應的事件類型event type:CHILD_ADDED, CHILD_REMOVED, CHILD_UPDATED來進行事件處理。 注意需要保證zookeeper上的redis連接,能夠以正常的方式訪問到(內外網切換)。?
轉載于:https://www.cnblogs.com/mmaa/p/5789846.html
總結
以上是生活随笔為你收集整理的RedisCluster读写分离改造的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 支付业务学习
- 下一篇: 老oj1965:polygon半平面交