Memcached
標簽 : Java與NoSQL
With Java
比較知名的Java Memcached客戶端有三款:Java-Memcached-Client、XMemcached以及Spymemcached, 其中以XMemcached性能最好, 且維護較穩定/版本較新:
<dependency><groupId>com.googlecode.xmemcached
</groupId><artifactId>xmemcached
</artifactId><version>2.0.0
</version>
</dependency> XMemcached以及其他兩款Memcached客戶端的詳細信息可參考博客XMemcached-一個新的開源Java memcached客戶端、Java幾個Memcached連接客戶端對比選擇.
實踐
任何技術都有其最適用的場景,只有在合適的場景下,才能發揮最好的效果.Memcached使用內存讀寫數據,速度比DB和文件系統快得多, 因此,Memcached的常用場景有:
- 緩存DB查詢數據: 作為緩存“保護”數據庫, 防止頻繁的讀寫帶給DB過大的壓力;
- 中繼MySQL主從延遲: 利用其“讀寫快”特點實現主從數據庫的消息同步.
緩存DB查詢數據
通過Memcached緩存數據庫查詢結果,減少DB訪問次數,以提高動態Web應用響應速度:
/*** @author jifang.* @since 2016/6/13 20:08.*/
public class MemcachedDAO {private static final int _1M =
60 *
1000;
private static final DataSource dataSource;
private static final MemcachedClient mc;
static {Properties properties =
new Properties();
try {properties.load(ClassLoader.getSystemResourceAsStream(
"db.properties"));}
catch (IOException ignored) {}
/** 初始化連接池 **/HikariConfig config =
new HikariConfig();config.setDriverClassName(properties.getProperty(
"mysql.driver.class"));config.setJdbcUrl(properties.getProperty(
"mysql.url"));config.setUsername(properties.getProperty(
"mysql.user"));config.setPassword(properties.getProperty(
"mysql.password"));config.setMaximumPoolSize(Integer.valueOf(properties.getProperty(
"pool.max.size")));config.setMinimumIdle(Integer.valueOf(properties.getProperty(
"pool.min.size")));config.setIdleTimeout(Integer.valueOf(properties.getProperty(
"pool.max.idle_time")));config.setMaxLifetime(Integer.valueOf(properties.getProperty(
"pool.max.life_time")));dataSource =
new HikariDataSource(config);
/** 初始化Memcached **/try {mc =
new XMemcachedClientBuilder(properties.getProperty(
"memcached.servers")).build();}
catch (IOException e) {
throw new RuntimeException(e);}}
public List<Map<String, Object>>
executeQuery(String sql) {List<Map<String, Object>> result;
try {
/** 首先請求MC **/String key = sql.replace(
' ',
'-');result = mc.get(key);
if (result ==
null || result.isEmpty()) {ResultSet resultSet = dataSource.getConnection().createStatement().executeQuery(sql);
/** 獲得列數/列名 **/ResultSetMetaData meta = resultSet.getMetaData();
int columnCount = meta.getColumnCount();List<String> columnName =
new ArrayList<>();
for (
int i =
1; i <= columnCount; ++i) {columnName.add(meta.getColumnName(i));}
/** 填充實體 **/result =
new ArrayList<>();
while (resultSet.next()) {Map<String, Object> entity =
new HashMap<>(columnCount);
for (String name : columnName) {entity.put(name, resultSet.getObject(name));}result.add(entity);}
/** 寫入MC **/mc.set(key, _1M, result);}}
catch (TimeoutException | InterruptedException | MemcachedException | SQLException e) {
throw new RuntimeException(e);}
return result;}
public static void main(String[] args) {MemcachedDAO dao =
new MemcachedDAO();List<Map<String, Object>> execute = dao.executeQuery(
"select * from orders");System.out.println(execute);}
}
注: 代碼僅供展示DB緩存思想,因為一般項目很少會直接使用JDBC操作DB,而是會選用像MyBatis之類的ORM框架代替之,而這類框架框架一般也會開放接口出來實現與緩存產品的整合(如MyBatis開放出一個org.apache.ibatis.cache.Cache接口,通過實現該接口,可將Memcached與MyBatis整合, 細節可參考博客MyBatis與Memcached集成.
中繼MySQL主從延遲
MySQL在做replication時,主從復制時會由一段時間延遲,尤其是主從服務器分處于異地機房時,這種情況更加明顯.FaceBook官方的一篇技術文章提到:其加州的數據中心到弗吉尼亞州數據中心的主從同步延遲達到70MS. 考慮以下場景:
- 用戶U購買電子書B:insert into Master (U,B);
- 用戶U觀看電子書B:select 購買記錄 [user='A',book='B'] from Slave.
由于主從延遲的存在,第②步中無記錄,用戶無權觀看該書.
此時可以利用Memcached在Master與Slave之間做過渡:
- 用戶U購買電子書B:memcached->add('U:B',true);
- 主數據庫: insert into Master (U,B);
- 用戶U觀看電子書B: select 購買記錄 [user='U',book='B'] from Slave;
如果沒查詢到,則memcached->get('U:B'),查到則說明已購買但有主從延遲. - 如果Memcached中也沒查詢到,用戶無權觀看該書.
分布式緩存
Memcached雖然名義上是分布式緩存,但其自身并未實現分布式算法.當一個請求到達時,需要由客戶端實現的分布式算法將不同的key路由到不同的Memcached服務器中.而分布式取模算法有著致命的缺陷(詳細可參考分布式之取模算法的缺陷), 因此Memcached客戶端一般采用一致性Hash算法來保證分布式.
- 目標:
- key的分布盡量均勻;
- 增/減服務器節點對于其他節點的影響盡量小.
一致性Hash算法
首先開辟一塊非常大的空間(如圖中:0~232),然后將所有的數據使用hash函數(如MD5、Ketama等)映射到這個空間內,形成一個Hash環. 當有數據需要存儲時,先得到一個hash值對應到hash環上的具體位置(如k1),然后沿順時針方向找到一臺機器(如B),將k1存儲到B這個節點中:
如果B節點宕機,則B上的所有負載就會落到C節點上:
這樣,只會影響C節點,對其他的節點如A、D的數據都不會造成影響. 然而,這樣又會帶來一定的風險,由于B節點的負載全部由C節點承擔,C節點的負載會變得很高,因此C節點又會很容易宕機,依次下去會造成整個集群的不穩定.
理想的情況下是當B節點宕機時,將原先B節點上的負載平均的分擔到其他的各個節點上. 為此,又引入了“虛擬節點”的概念: 想象在這個環上有很多“虛擬節點”,數據的存儲是沿著環的順時針方向找一個虛擬節點,每個虛擬節點都會關聯到一個真實節點,但一個真實節點會對應多個虛擬節點,且不同真實節點的多個虛擬節點是交差分布的:
圖中A1、A2、B1、B2、C1、C2、D1、D2 都是“虛擬節點”,機器A負責存儲A1、A2的數據, 機器B負責存儲B1、B2的數據… 只要虛擬節點數量足夠多且分布均勻,當其中一臺機器宕機之后,原先機器上的負載就會平均分配到其他所有機器上(如圖中節點B宕機,其負載會分擔到節點A和節點D上).
Java實現
public class ConsistentHash<Node> {
public SortedMap<Long, Node> VRNodesMap =
new TreeMap<>();
private int vCount =
50;
private int rCount =
0;
public ConsistentHash() {}
public ConsistentHash(
int vCount) {
this.vCount = vCount;}
public ConsistentHash(List<Node> rNodes) {init(rNodes);}
public ConsistentHash(List<Node> rNodes,
int vCount) {
this.vCount = vCount;init(rNodes);}
private void init(List<Node> rNodes) {
if (rNodes !=
null) {
for (Node node : rNodes) {add(rCount, node);++rCount;}}}
public void addRNode(Node rNode) {add(rCount, rNode);++rCount;}
public void rmRNode(Node rNode) {--rCount;remove(rCount, rNode);}
public Node
getRNode(String key) {SortedMap<Long, Node> tailMap = VRNodesMap.tailMap(hash(key));
if (tailMap.size() ==
0) {
return VRNodesMap.
get(VRNodesMap.firstKey());}
return tailMap.
get(tailMap.firstKey());}
private void add(
int rIndex, Node rNode) {
for (
int j =
0; j < vCount; ++j) {VRNodesMap.put(hash(String.format(
"RNode-%s-VNode-%s", rIndex, j)), rNode);}}
private void remove(
int rIndex, Node rNode) {
for (
int j =
0; j < vCount; ++j) {VRNodesMap.remove(hash(String.format(
"RNode-%s-VNode-%s", rIndex, j)));}}
private Long
hash(String key) {ByteBuffer buf = ByteBuffer.wrap(key.getBytes());
int seed =
0x1234ABCD;ByteOrder byteOrder = buf.order();buf.order(ByteOrder.LITTLE_ENDIAN);
long m =
0xc6a4a7935bd1e995L;
int r =
47;
long h = seed ^ (buf.remaining() * m);
long k;
while (buf.remaining() >=
8) {k = buf.getLong();k *= m;k ^= k >>> r;k *= m;h ^= k;h *= m;}
if (buf.remaining() >
0) {ByteBuffer finish = ByteBuffer.allocate(
8).order(ByteOrder.LITTLE_ENDIAN);finish.put(buf).rewind();h ^= finish.getLong();h *= m;}h ^= h >>> r;h *= m;h ^= h >>> r;buf.order(byteOrder);
return h;}
}
public class ConsistentHashMain {
private static final int KEY_COUNT
= 1000;@Test
public void test() {ConsistentHash
<String> nodes
= new ConsistentHash
<>(
new ArrayList
<String>(),
50);nodes
.addRNode(
"10.45.156.11");nodes
.addRNode(
"10.45.156.12");nodes
.addRNode(
"10.45.156.13");nodes
.addRNode(
"10.45.156.14");nodes
.addRNode(
"10.45.156.15");nodes
.addRNode(
"10.45.156.16");nodes
.addRNode(
"10.45.156.17");nodes
.addRNode(
"10.45.156.18");nodes
.addRNode(
"10.45.156.19");nodes
.addRNode(
"10.45.156.10");
Map<String,
String> map = new HashMap
<>();initMap(
map, nodes);nodes
.rmRNode(
"10.45.156.19");nodes
.addRNode(
"10.45.156.20");int mis
= 0;for (
Map.Entry
<String,
String> entry :
map.entrySet()) {
String key
= entry
.getKey();
String value
= entry
.getValue();
if (
!nodes
.getRNode(key)
.equals(value)) {
++mis;}}System
.out
.println(
String.format(
"當前命中率為:%s%%", (KEY_COUNT
- mis)
* 100.0 / KEY_COUNT));}
private void initMap(
Map<String,
String> map, ConsistentHash
<String> nodes) {for (int i
= 0; i
< KEY_COUNT;
++i) {
String key
= String.format(
"key-%s", i);
map.put(key, nodes
.getRNode(key));}}
}
經過實際測試: 當有十臺真實節點,而每個真實節點有50個虛擬節點時,在發生一臺實際節點宕機/新增一臺節點的情況時,命中率仍然能夠達到90%左右.對比簡單取模Hash算法:
當節點從N到N-1時,緩存的命中率直線下降為1/N(N越大,命中率越低);一致性Hash的表現就優秀多了:
命中率只下降為原先的 (N-1)/N ,且服務器節點越多,性能越好.因此一致性Hash算法可以最大限度地減小服務器增減時的緩存重新分布帶來的壓力.
XMemcached實現
實際上XMemcached客戶端自身實現了很多一致性Hash算法(KetamaMemcachedSessionLocator/PHPMemcacheSessionLocator), 因此在開發中沒有必要自己去實現:
- 示例: 支持分布式的MemcachedFilter:
/*** @author jifang.* @since 2016/5/21 15:50.*/
public class MemcachedFilter implements Filter {private MemcachedClient memcached;
private static final int _1MIN =
60;
@Overridepublic void init(FilterConfig filterConfig)
throws ServletException {
try {MemcachedClientBuilder builder =
new XMemcachedClientBuilder(AddrUtil.getAddresses(
"10.45.156.11:11211" +
"10.45.156.12:11211" +
"10.45.156.13:11211"));builder.setSessionLocator(
new KetamaMemcachedSessionLocator());memcached = builder.build();}
catch (IOException e) {
throw new RuntimeException(e);}}
@Overridepublic void doFilter(ServletRequest req, ServletResponse response, FilterChain chain)
throws IOException, ServletException {MemcachedWriter mWriter =
new MemcachedWriter(response.getWriter());chain.doFilter(req,
new MemcachedResponse((HttpServletResponse) response, mWriter));HttpServletRequest request = (HttpServletRequest) req;String key = request.getRequestURI();Enumeration<String> names = request.getParameterNames();
if (names.hasMoreElements()) {String name = names.nextElement();StringBuilder sb =
new StringBuilder(key).append(
"?").append(name).append(
"=").append(request.getParameter(name));
while (names.hasMoreElements()) {name = names.nextElement();sb.append(
"&").append(name).append(
"=").append(request.getParameter(name));}key = sb.toString();}
try {String rspContent = mWriter.getRspContent();memcached.set(key, _1MIN, rspContent);}
catch (TimeoutException | InterruptedException | MemcachedException e) {
throw new RuntimeException(e);}}
@Overridepublic void destroy() {}
private static class MemcachedWriter extends PrintWriter {private StringBuilder sb =
new StringBuilder();
private PrintWriter writer;
public MemcachedWriter(PrintWriter out) {
super(out);
this.writer = out;}
@Overridepublic void print(String s) {sb.append(s);
this.writer.print(s);}
public String
getRspContent() {
return sb.toString();}}
private static class MemcachedResponse extends HttpServletResponseWrapper {private PrintWriter writer;
public MemcachedResponse(HttpServletResponse response, PrintWriter writer) {
super(response);
this.writer = writer;}
@Overridepublic PrintWriter
getWriter()
throws IOException {
return this.writer;}}
}
以上代碼最好有Nginx的如下配置支持:
Nginx以前端請求的"URI+Args"作為key去請求Memcached,如果key命中,則直接由Nginx從緩存中取出數據響應前端;未命中,則產生404異常,Nginx捕獲之并將request提交后端服務器.在后端服務器中,request被MemcachedFilter攔截, 待業務邏輯執行完, 該Filter會將Response的數據拿到并寫入Memcached, 以備下次直接響應.
參考:
緩存系統MemCached的Java客戶端優化歷程 memcached Java客戶端spymemcached的一致性Hash算法 一致性哈希算法及其在分布式系統中的應用 陌生但默默一統江湖的MurmurHash Hash 函數概覽
總結
以上是生活随笔為你收集整理的Memcached - In Action的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。