ElasticSearch——路由(_routing)机制
前言
一條數據是如何落地到對應的shard上的?
當索引一個文檔的時候,文檔會被存儲到一個主分片中。 Elasticsearch 如何知道一個文檔應該存放到哪個分片中呢?
首先這肯定不會是隨機的,否則將來要獲取文檔的時候我們就不知道從何處尋找了。實際上,這個過程是根據下面這個算法決定的:
shard_num = hash(_routing) % num_primary_shards
其中_routing是一個可變值,默認是文檔的_id的值 ,也可以設置成一個自定義的值。 _routing 通過 hash 函數生成一個數字,然后這個數字再除以 num_of_primary_shards (主分片的數量)后得到余數 。這個分布在 0 到 number_of_primary_shards-1 之間的余數,就是我們所尋求的文檔所在分片的位置。這就解釋了為什么我們要在創建索引的時候就確定好主分片的數量并且永遠不會改變這個數量:因為如果數量變化了,那么所有之前路由的值都會無效,文檔也再也找不到了。
路由機制
假設你有一個100個分片的索引。當一個請求在集群上執行時會發生什么呢?
1. 這個搜索的請求會被發送到一個節點
2. 接收到這個請求的節點,將這個查詢廣播到這個索引的每個分片上(可能是主分片,也可能是復本分片)
3. 每個分片執行這個搜索查詢并返回結果
4. 結果在通道節點上合并、排序并返回給用戶
因為默認情況下,Elasticsearch使用文檔的ID(類似于關系數據庫中的自增ID),如果插入數據量比較大,文檔會平均的分布于所有的分片上,這導致了Elasticsearch不能確定文檔的位置,
所以它必須將這個請求廣播到所有的N個分片上去執行這種操作會給集群帶來負擔,增大了網絡的開銷;
自定義路由
自定義路由的方式非常簡單,只需要在插入數據的時候指定路由的key即可。雖然使用簡單,但有許多的細節需要注意。我們從一個例子看起(注:本文關于ES的命令都是在Kibana dev tool中執行的):
// 步驟1:先創建一個名為route_test的索引,該索引有3個shard,0個副本
PUT route_test/
{
"settings": {
"number_of_shards": 2,
"number_of_replicas": 0
}
}
// 步驟2:查看shard
GET _cat/shards/route_test?v
index shard prirep state docs store ip node
route_test 1 p STARTED 0 230b 172.19.0.2 es7_02
route_test 0 p STARTED 0 230b 172.19.0.5 es7_01
// 步驟3:插入第1條數據
PUT route_test/_doc/a?refresh
{
"data": "A"
}
// 步驟4:查看shard
GET _cat/shards/route_test?v
index shard prirep state docs store ip node
route_test 1 p STARTED 0 230b 172.19.0.2 es7_02
route_test 0 p STARTED 1 3.3kb 172.19.0.5 es7_01
// 步驟5:插入第2條數據
PUT route_test/_doc/b?refresh
{
"data": "B"
}
// 步驟6:查看數據
GET _cat/shards/route_test?v
index shard prirep state docs store ip node
route_test 1 p STARTED 1 3.3kb 172.19.0.2 es7_02
route_test 0 p STARTED 1 3.3kb 172.19.0.5 es7_01
// 步驟7:查看此時索引里面的數據
GET route_test/_search
{
"took" : 5,
"timed_out" : false,
"_shards" : {
"total" : 2,
"successful" : 2,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 2,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "route_test",
"_type" : "_doc",
"_id" : "a",
"_score" : 1.0,
"_source" : {
"data" : "A"
}
},
{
"_index" : "route_test",
"_type" : "_doc",
"_id" : "b",
"_score" : 1.0,
"_source" : {
"data" : "B"
}
}
]
}
}
上面這個例子比較簡單,先創建了一個擁有2個shard,0個副本(為了方便觀察)的索引route_test。創建完之后查看兩個shard的信息,此時shard為空,里面沒有任何文檔(docs列為0)。接著我們插入了兩條數據,每次插完之后,都檢查shard的變化。通過對比可以發現 docid=a 的第一條數據寫入了0號shard,docid=b 的第二條數據寫入了1號 shard。需要注意的是這里的doc id我選用的是字母"a"和"b",而非數字。原因是連續的數字很容易路由到一個shard中去。以上的過程就是不指定routing時候的默認行為。
接著,我們指定routing,看一些有趣的變化:
// 步驟8:插入第3條數據
PUT route_test/_doc/c?routing=key1&refresh
{
"data": "C"
}
// 步驟9:查看shard
GET _cat/shards/route_test?v
index shard prirep state docs store ip node
route_test 1 p STARTED 1 3.4kb 172.19.0.2 es7_02
route_test 0 p STARTED 2 6.9kb 172.19.0.5 es7_01
// 步驟10:查看索引數據
GET route_test/_search
{
"took" : 5,
"timed_out" : false,
"_shards" : {
"total" : 2,
"successful" : 2,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 3,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "route_test",
"_type" : "_doc",
"_id" : "a",
"_score" : 1.0,
"_source" : {
"data" : "A"
}
},
{
"_index" : "route_test",
"_type" : "_doc",
"_id" : "c",
"_score" : 1.0,
"_routing" : "key1",
"_source" : {
"data" : "C"
}
},
{
"_index" : "route_test",
"_type" : "_doc",
"_id" : "b",
"_score" : 1.0,
"_source" : {
"data" : "B"
}
}
]
}
}
我們又插入了1條 docid=c 的新數據,但這次我們指定了路由,路由的值是一個字符串"key1". 通過查看shard信息,能看出這條數據路由到了0號shard。也就是說用"key1"做路由時,文檔會寫入到0號shard。
接著我們使用該路由再插入兩條數據,但這兩條數據的 docid 分別為之前使用過的 "a"和"b",你猜一下最終結果會是什么樣?
// 步驟11:插入 docid=a 的數據,并指定 routing=key1
PUT route_test/_doc/a?routing=key1&refresh
{
"data": "A with routing key1"
}
// es的返回信息為:
{
"_index" : "route_test",
"_type" : "_doc",
"_id" : "a",
"_version" : 2,
"result" : "updated", // 注意此處為updated,之前的三次插入返回都為created
"forced_refresh" : true,
"_shards" : {
"total" : 1,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 2,
"_primary_term" : 1
}
// 步驟12:查看shard
GET _cat/shards/route_test?v
index shard prirep state docs store ip node
route_test 1 p STARTED 1 3.4kb 172.19.0.2 es7_02
route_test 0 p STARTED 2 10.5kb 172.19.0.5 es7_01
// 步驟13:查詢索引
GET route_test/_search
{
"took" : 6,
"timed_out" : false,
"_shards" : {
"total" : 2,
"successful" : 2,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 3,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "route_test",
"_type" : "_doc",
"_id" : "c",
"_score" : 1.0,
"_routing" : "key1",
"_source" : {
"data" : "C"
}
},
{
"_index" : "route_test",
"_type" : "_doc",
"_id" : "a",
"_score" : 1.0,
"_routing" : "key1",
"_source" : {
"data" : "A with routing key1"
}
},
{
"_index" : "route_test",
"_type" : "_doc",
"_id" : "b",
"_score" : 1.0,
"_source" : {
"data" : "B"
}
}
]
}
}
之前 docid=a 的數據就在0號shard中,這次依舊寫入到0號shard中了,因為docid重復,所以文檔被更新了。然后再插入 docid=b 的數據:
// 步驟14:插入 docid=b的數據,使用key1作為路由字段的值
PUT route_test/_doc/b?routing=key1&refresh
{
"data": "B with routing key1"
}
// es返回的信息
{
"_index" : "route_test",
"_type" : "_doc",
"_id" : "b",
"_version" : 1,
"result" : "created", // 注意這里不是updated
"forced_refresh" : true,
"_shards" : {
"total" : 1,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 3,
"_primary_term" : 1
}
// 步驟15:查看shard信息
GET _cat/shards/route_test?v
index shard prirep state docs store ip node
route_test 1 p STARTED 1 3.4kb 172.19.0.2 es7_02
route_test 0 p STARTED 3 11kb 172.19.0.5 es7_01
// 步驟16:查詢索引內容
{
"took" : 6,
"timed_out" : false,
"_shards" : {
"total" : 2,
"successful" : 2,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 4,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "route_test",
"_type" : "_doc",
"_id" : "c",
"_score" : 1.0,
"_routing" : "key1",
"_source" : {
"data" : "C"
}
},
{
"_index" : "route_test",
"_type" : "_doc",
"_id" : "a",
"_score" : 1.0,
"_routing" : "key1",
"_source" : {
"data" : "A with routing key1"
}
},
{
"_index" : "route_test",
"_type" : "_doc",
"_id" : "b",
"_score" : 1.0,
"_routing" : "key1", // 和下面的 id=b 的doc相比,多了一個這個字段
"_source" : {
"data" : "B with routing key1"
}
},
{
"_index" : "route_test",
"_type" : "_doc",
"_id" : "b",
"_score" : 1.0,
"_source" : {
"data" : "B"
}
}
]
}
}
和步驟11插入docid=a 的那條數據相比,這次這個有些不同,我們來分析一下。步驟11中插入 docid=a 時,es返回的是updated,也就是更新了步驟2中插入的docid為a的數據,步驟12和13中查詢的結果也能看出,并沒有新增數據,route_test中還是只有3條數據。而步驟14插入 docid=b 的數據時,es返回的是created,也就是新增了一條數據,而不是updated原來docid為b的數據,步驟15和16的確也能看出多了一條數據,現在有4條數據。而且從步驟16查詢的結果來看,有兩條docid為b的數據,但一個有routing,一個沒有。而且也能分析出有routing的在0號shard上面,沒有的那個在1號shard上。
這個就是我們自定義routing后會導致的一個問題:docid不再全局唯一。ES shard的實質是Lucene的索引,所以其實每個shard都是一個功能完善的倒排索引。ES能保證docid全局唯一是采用do id作為了路由,所以同樣的docid肯定會路由到同一個shard上面,如果出現docid重復,就會update或者拋異常,從而保證了集群內docid唯一標識一個doc。但如果我們換用其它值做routing,那這個就保證不了了,如果用戶還需要docid的全局唯一性,那只能自己保證了。因為docid不再全局唯一,所以doc的增刪改查API就可能產生問題,比如下面的查詢:
GET route_test/_doc/b
// es返回
{
"_index" : "route_test",
"_type" : "_doc",
"_id" : "b",
"_version" : 1,
"_seq_no" : 0,
"_primary_term" : 1,
"found" : true,
"_source" : {
"data" : "B"
}
}
GET route_test/_doc/b?routing=key1
// es返回
{
"_index" : "route_test",
"_type" : "_doc",
"_id" : "b",
"_version" : 1,
"_seq_no" : 3,
"_primary_term" : 1,
"_routing" : "key1",
"found" : true,
"_source" : {
"data" : "B with routing key1"
}
}
上面兩個查詢,雖然指定的docid都是b,但返回的結果是不一樣的。所以,如果自定義了routing字段的話,一般doc的增刪改查接口都要加上routing參數以保證一致性。注意這里的【一般】指的是查詢,并不是所有查詢接口都要加上routing。
為此,ES在mapping中提供了一個選項,可以強制檢查doc的增刪改查接口是否加了routing參數,如果沒有加,就會報錯。設置方式如下:
PUT <索引名>/
{
"settings": {
"number_of_shards": 2,
"number_of_replicas": 0
},
"mappings": {
"_routing": {
"required": true // 設置為true,則強制檢查;false則不檢查,默認為false
}
}
}
舉個例子:
PUT route_test1/
{
"settings": {
"number_of_shards": 3,
"number_of_replicas": 0
},
"mappings": {
"_routing": {
"required": true
}
}
}
// 寫入一條數據
PUT route_test1/_doc/b?routing=key1
{
"data": "b with routing"
}
// 以下的增刪改查都會抱錯
GET route_test1/_doc/b
PUT route_test1/_doc/b
{
"data": "B"
}
DELETE route_test1/_doc/b
// 錯誤信息
"error": {
"root_cause": [
{
"type": "routing_missing_exception",
"reason": "routing is required for [route_test1]/[_doc]/[b]",
"index_uuid": "_na_",
"index": "route_test1"
}
],
"type": "routing_missing_exception",
"reason": "routing is required for [route_test1]/[_doc]/[b]",
"index_uuid": "_na_",
"index": "route_test1"
},
"status": 400
}
當然,很多時候自定義路由是為了減少查詢時掃描shard的個數,從而提高查詢效率。默認查詢接口會搜索所有的shard,但也可以指定routing字段,這樣就只會查詢routing計算出來的shard,提高查詢速度。
使用方式也非常簡單,只需在查詢語句上面指定routing即可,允許指定多個:
-- 查詢所有分區
GET route_test/_search
{
"query": {
"match": {
"data": "b"
}
}
}
-- 查詢指定分區
GET route_test/_search?routing=key1,key2
{
"query": {
"match": {
"data": "b"
}
}
}
另外,指定routing還有個弊端就是容易造成負載不均衡。所以ES提供了一種機制可以將數據路由到一組shard上面,而不是某一個。只需在創建索引時(也只能在創建時)設置index.routing_partition_size,默認值是1,即只路由到1個shard,可以將其設置為大于1且小于索引shard總數的某個值,就可以路由到一組shard了。值越大,數據越均勻。當然,從設置就能看出來,這個設置是針對單個索引的,可以加入到動態模板中,以對多個索引生效。指定后,shard的計算方式變為:
shard_num = (hash(_routing) + hash(_id) % routing_partition_size) % num_primary_shards
對于同一個routing值,hash(_routing)的結果固定的,hash(_id) % routing_partition_size的結果有routing_partition_size個可能的值,兩個組合在一起,對于同一個routing值的不同doc,也就能計算出routing_partition_size可能的shard num了,即一個shard集合。但要注意這樣做以后有兩個限制:
索引的mapping中不能再定義join關系的字段,原因是join強制要求關聯的doc必須路由到同一個shard,如果采用shard集合,這個是保證不了的。
索引mapping中_routing的required必須設置為true。
但是對于第2點我測試了一下,如果不寫mapping,是可以的,此時_routing的required默認值其實是false的。但如果顯式的寫了,就必須設置為true,否則創建索引會報錯。
// 不顯式的設置mapping,可以成功創建索引
PUT route_test_3/
{
"settings": {
"number_of_shards": 2,
"number_of_replicas": 0,
"routing_partition_size": 2
}
}
// 查詢也可以不用帶routing,也可以正確執行,增刪改也一樣
GET route_test_3/_doc/a
// 如果顯式的設置了mappings域,且required設置為false,創建索引就會失敗,必須改為true
PUT route_test_4/
{
"settings": {
"number_of_shards": 2,
"number_of_replicas": 0,
"routing_partition_size": 2
},
"mappings": {
"_routing": {
"required": false
}
}
}
不知道這算不算一個bug。
總結
ElasticSearch的routing算是一個高級用法,但的確非常有用。在我們公司的訂單數據,就用merchant_no作為routing,這樣就能保證同一個商戶的數據全部保存到同一個shard去,后面檢索的時候,同樣使用merchant_no作為routing,就可以精準的從某個shard獲取數據了。對于超大數據量的搜索,routing再配合hot&warm的架構,是非常有用的一種解決方案。而且同一種屬性的數據寫到同一個shard還有很多好處,比如可以提高aggregation的準確性。
注1:本文例子中routing=key1,這里的key1是具體的值,而不是字段名稱; 注2:通過JavaAPI創建 IndexRequest 時,通過 routing(java.lang.String routing) 方法指定routing值,注意這里是具體的值,而不是字段名稱; 注3:本文的所有測試基于ES 7.1.0版本。
hot&warm的架構,參考我另一篇文章:https://www.cnblogs.com/caoweixiong/p/11988457.html
參考:https://niyanchun.com/routing-in-es.html
總結
以上是生活随笔為你收集整理的ElasticSearch——路由(_routing)机制的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 怎么使用ping命令进行连通性测试
- 下一篇: C++写矩阵的转置