Go 学习笔记(58)— Go 第三方库之 etcd/clientv3(连接客户端、PUT、GET、Lease、Op、Txn、Watch 基础概念说明)
1. 安裝 Golang 的 Etcd 包
我們使用 v3 版本的 etcd client , 首先通過 go get 下載并編譯安裝 etcd clinet v3。
go get -v github.com/coreos/etcd/clientv3
該命令會將包下載到 $GOPATH/src/github.com/coreos/etcd/clientv3 中,所有相關依賴包會自動下載編譯,包括protobuf、grpc等。
我們主要梳理一下使用 etcd 時經常用到的主要 API 并進行演示。
2. 連接客戶端
用程序訪問 etcd 首先要創建 client ,它需要傳入一個 Config 配置,這里傳了 2 個選項:
Endpoints:etcd的多個節點服務地址;DialTimeout:創建client的首次連接超時時間,這里傳了 5 秒,如果 5 秒都沒有連接成功就會返回err,一旦client創建成功,我們就不用再關心后續底層連接的狀態了,client內部會重連;
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"},// Endpoints: []string{"localhost:2379", "localhost:22379", "localhost:32379"}DialTimeout: 5 * time.Second,
})
返回的client,它的類型具體如下:
type Client struct {ClusterKVLeaseWatcherAuthMaintenance// Username is a user name for authentication.Username string// Password is a password for authentication.Password string// contains filtered or unexported fields
}
類型中的成員是 etcd 客戶端幾何核心功能模塊的具體實現,它們分別用于:
Cluster:向集群里增加etcd服務端節點之類,屬于管理員操作。KV:我們主要使用的功能,即K-V鍵值庫的操作。Lease:租約相關操作,比如申請一個TTL=10秒的租約(應用給key可以實現鍵值的自動過期)。Watcher:觀察訂閱,從而監聽最新的數據變化。Auth:管理etcd的用戶和權限,屬于管理員操作。Maintenance:維護etcd,比如主動遷移etcd的leader節點,屬于管理員操作。
我們需要使用什么功能,就去 client 里獲取對應的成員即可。
Client.KV 是一個 interface ,提供了關于 K-V 操作的所有方法:
type KV interface {Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)// Delete deletes a key, or optionally using WithRange(end), [key, end).Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)// Compact compacts etcd KV history before the given rev.Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)Do(ctx context.Context, op Op) (OpResponse, error)// Txn creates a transaction.Txn(ctx context.Context) Txn
}
我們通過方法clientv3.NewKV()來獲得 KV 接口的實現(實現中內置了錯誤重試機制):
kv := clientv3.NewKV(cli)
接下來,我們將通過kv操作 etcd 中的數據。
3. PUT 設置操作
putResp, err := kv.Put(context.TODO(),"/test/key1", "Hello etcd!")
第一個參數是 goroutine 的上下文 Context 。后面兩個參數分別是 key 和 value ,對于 etcd 來說, key=/test/key1 只是一個字符串而已,但是對我們而言卻可以模擬出目錄層級關系。
Put 函數的聲明如下:
// Put puts a key-value pair into etcd.
// Note that key,value can be plain bytes array and string is
// an immutable representation of that bytes array.
// To get a string of bytes, do string([]byte{0x10, 0x20}).
Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
除了上面例子中的三個的參數,還支持一個變長參數,可以傳遞一些控制項來影響 Put 的行為,例如可以攜帶一個 lease ID 來支持 key 過期。
Put 操作返回的是 PutResponse ,不同的 KV 操作對應不同的 response 結構,所有 KV 操作返回的 response 結構如下:
type (CompactResponse pb.CompactionResponsePutResponse pb.PutResponseGetResponse pb.RangeResponseDeleteResponse pb.DeleteRangeResponseTxnResponse pb.TxnResponse
)
程序代碼里導入 clientv3 后在 VSCode 中可以很快定位到 PutResponse 的定義文件中, PutResponse 只是 pb.PutResponse 的類型別名,通過VSCode跳轉過去后可以看到 PutResponse 的詳細定義。
type PutResponse struct {Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`// if prev_kv is set in the request, the previous key-value pair will be returned.PrevKv *mvccpb.KeyValue `protobuf:"bytes,2,opt,name=prev_kv,json=prevKv" json:"prev_kv,omitempty"`
}
Header 里保存的主要是本次更新的 revision 信息,而 PrevKv 可以返回 Put 覆蓋之前的 value 是什么(目前是 nil ,后面會說原因),把返回的 PutResponse 打印出來看一下:
fmt.Printf("PutResponse: %v, err: %v", putResp, err)
// output
// PutResponse: &{cluster_id:14841639068965178418 member_id:10276657743932975437 revision:3 raft_term:7 <nil>}, err: <nil>%
我們需要判斷 err 來確定操作是否成功。
我們再 Put 其他 2 個 key ,用于后續演示:
kv.Put(context.TODO(),"/test/key2", "Hello World!")
// 再寫一個同前綴的干擾項
kv.Put(context.TODO(), "/testspam", "spam")
現在 /test 目錄下有兩個鍵: key1 和 key2 , 而 /testspam 并不歸屬于 /test 目錄。
代碼示例:
package mainimport ("context""fmt""time""github.com/coreos/etcd/clientv3"
)func main() {config := clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}, // 集群列表DialTimeout: 5 * time.Second,}// 建立一個客戶端client, err := clientv3.New(config)if err != nil {fmt.Println(err)return}// 用于讀寫etcd的鍵值對kv := clientv3.NewKV(client)// clientv3.WithPrevKV() 是一個可選控制項,用于獲取在設置當前鍵值對之前的該鍵的鍵值對// 有了該控制項后,putResp 才有 PrevKv 的屬性,即獲取之前的鍵值對。// context.TODO() 表示當前還不知道用哪個 context 控制該操作,先用該字段占位putResp, err := kv.Put(context.TODO(), "/demo/A/B", "hello", clientv3.WithPrevKV())if err != nil {fmt.Println(err)}fmt.Println("putResp is ", putResp)fmt.Println("Revision:", putResp.Header.Revision)if putResp.PrevKv != nil {fmt.Println("PrevValue:", string(putResp.PrevKv.Value))}}
4. GET 獲取操作
使用 KV 的 Get 方法來讀取給定鍵的值:
getResp, err := kv.Get(context.TODO(), "/test/key1")
其函數聲明如下:
// Get retrieves keys.
// By default, Get will return the value for "key", if any.
// When passed WithRange(end), Get will return the keys in the range [key, end).
// When passed WithFromKey(), Get returns keys greater than or equal to key.
// When passed WithRev(rev) with rev > 0, Get retrieves keys at the given revision;
// if the required revision is compacted, the request will fail with ErrCompacted .
// When passed WithLimit(limit), the number of returned keys is bounded by limit.
// When passed WithSort(), the keys will be sorted.
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
和 Put 類似,函數注釋里提示我們可以傳遞一些控制參數來影響 Get 的行為,比如: WithFromKey 表示讀取從參數 key 開始遞增的所有 key ,而不是讀取單個 key 。
在上面的例子中,我沒有傳遞 opOption ,所以就是獲取 key=/test/key1 的最新版本數據。這里 err 并不能反饋出 key 是否存在(只能反饋出本次操作因為各種原因異常了),我們需要通過 GetResponse (實際上是 pb.RangeResponse )判斷 key 是否存在:
type RangeResponse struct {Header *ResponseHeader `protobuf:"bytes,1,opt,name=header" json:"header,omitempty"`// kvs is the list of key-value pairs matched by the range request.// kvs is empty when count is requested.Kvs []*mvccpb.KeyValue `protobuf:"bytes,2,rep,name=kvs" json:"kvs,omitempty"`// more indicates if there are more keys to return in the requested range.More bool `protobuf:"varint,3,opt,name=more,proto3" json:"more,omitempty"`// count is set to the number of keys within the range when requested.Count int64 `protobuf:"varint,4,opt,name=count,proto3" json:"count,omitempty"`
}
Kvs 字段,保存了本次 Get 查詢到的所有 k-v 對,因為上述例子只 Get 了一個單 key ,所以只需要判斷一下 len(Kvs) 是否等于 1 即可知道 key 是否存在。
RangeResponse.More和Count,當我們使用withLimit()等選項進行Get時會發揮作用,相當于翻頁查詢。
接下來,我們通過給 Get 查詢增加 WithPrefix 選項,獲取 /test 目錄下的所有子元素:
rangeResp, err := kv.Get(context.TODO(), "/test/", clientv3.WithPrefix())
WithPrefix()是指查找以/test/為前綴的所有 key ,因此可以模擬出查找子目錄的效果。
etcd是一個有序的 k-v 存儲,因此 /test/ 為前綴的 key 總是順序排列在一起。
withPrefix()實際上會轉化為范圍查詢,它根據前綴/test/生成了一個前閉后開的key range:[“/test/”, “/test0”),為什么呢?因為比/大的字符是0,所以以/test0作為范圍的末尾,就可以掃描到所有以/test/為前綴的 key 了。
在之前,我們 Put 了一個/testspam鍵值,因為不符合/test/前綴(注意末尾的 / ),所以就不會被這次Get獲取到。但是,如果查詢的前綴是/test,那么/testspam就會被返回,使用時一定要特別注意。
打印 rangeResp.Kvs 可以看到獲得了兩個鍵值:
[key:"/test/key1" create_revision:2 mod_revision:13 version:6 value:"Hello etcd!"
key:"/test/key2" create_revision:5 mod_revision:14 version:4 value:"Hello World!" ]
代碼示例:
package mainimport ("context""fmt""time""github.com/coreos/etcd/clientv3"
)func main() {config := clientv3.Config{Endpoints: []string{"192.168.0.113:2379"}, // 集群列表DialTimeout: 5 * time.Second,}// 建立一個客戶端client, err := clientv3.New(config)if err != nil {fmt.Println(err)return}// 用于讀寫etcd的鍵值對kv := clientv3.NewKV(client)kv.Put(context.TODO(), "/demo/A/B", "BBB", clientv3.WithPrevKV())kv.Put(context.TODO(), "/demo/A/C", "CCC", clientv3.WithPrevKV())// 讀取/demo/A/為前綴的所有key// clientv3.WithPrefix() , clientv3.WithCountOnly() 可以有多個并以 逗號分隔即可getResp, err := kv.Get(context.TODO(), "/demo/A/", clientv3.WithPrefix() /*,clientv3.WithCountOnly()*/)if err != nil {fmt.Println(err)}fmt.Println(getResp.Kvs, getResp.Count)for _, resp := range getResp.Kvs {fmt.Printf("key: %s, value:%s\n", string(resp.Key), string(resp.Value))}
}
輸出結果為:
[key:"/demo/A/B" create_revision:6 mod_revision:22 version:6 value:"BBB"
key:"/demo/A/C" create_revision:7 mod_revision:23 version:12 value:"CCC" ] 2
key: /demo/A/B, value:BBB
key: /demo/A/C, value:CCC
5. Delete 操作
示例代碼:
package mainimport ("context""fmt""time""github.com/coreos/etcd/clientv3"
)func main() {config := clientv3.Config{Endpoints: []string{"192.168.0.113:2379"}, // 集群列表DialTimeout: 5 * time.Second,}// 建立一個客戶端client, err := clientv3.New(config)if err != nil {fmt.Println(err)return}// 用于讀寫etcd的鍵值對kv := clientv3.NewKV(client)kv.Put(context.TODO(), "/demo/A/B1", "BBB", clientv3.WithPrevKV())kv.Put(context.TODO(), "/demo/A/B2", "CCC", clientv3.WithPrevKV())kv.Put(context.TODO(), "/demo/A/B3", "DDD", clientv3.WithPrevKV())/*clientv3.WithFromKey() 表示針對的key操作是大于等于當前給定的keyclientv3.WithPrevKV() 表示返回的 response 中含有之前刪除的值,否則下面的 delResp.PrevKvs 為空*/delResp, err := kv.Delete(context.TODO(), "/demo/A/B",clientv3.WithFromKey(), clientv3.WithPrevKV())if err != nil {fmt.Println(err)}// 查看被刪除的 key 和 value 是什么if delResp.PrevKvs != nil {// if len(delResp.PrevKvs) != 0 {for _, kvpair := range delResp.PrevKvs {fmt.Println("已刪除:", string(kvpair.Key), string(kvpair.Value))}}
}
輸出結果:
已刪除: /demo/A/B1 BBB
已刪除: /demo/A/B2 CCC
已刪除: /demo/A/B3 DDD
6. Lease 租約操作
etcd 客戶端的 Lease 對象可以通過以下的代碼獲取到
lease := clientv3.NewLease(cli)
lease 對象是 Lease 接口的實現, Lease 接口的聲明如下:
type Lease interface {// Grant 創建一個新租約Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)// Revoke 銷毀給定租約ID的租約Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)// TimeToLive retrieves the lease information of the given lease ID.TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)// Leases retrieves all leases.Leases(ctx context.Context) (*LeaseLeasesResponse, error)// KeepAlive keeps the given lease alive forever.KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)// KeepAliveOnce renews the lease once. In most of the cases, KeepAlive// should be used instead of KeepAliveOnce.KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)// Close releases all resources Lease keeps for efficient communication// with the etcd server.Close() error
}
Lease 提供了以下功能:
Grant:分配一個租約;Revoke:釋放一個租約;TimeToLive:獲取剩余TTL時間;Leases:列舉所有etcd中的租約;KeepAlive:自動定時的續約某個租約;KeepAliveOnce:為某個租約續約一次;Close:釋放當前客戶端建立的所有租約;
要想實現 key 自動過期,首先得創建一個租約,下面的代碼創建一個 TTL 為 10 秒的租約:
grantResp, err := lease.Grant(context.TODO(), 10)
返回的 grantResponse 的結構體聲明如下:
// LeaseGrantResponse wraps the protobuf message LeaseGrantResponse.
type LeaseGrantResponse struct {*pb.ResponseHeaderID LeaseIDTTL int64Error string
}
在應用程序代碼中主要使用到的是租約 ID 。
接下來我們用這個 Lease 往 etcd 中存儲一個 10 秒過期的 key :
kv.Put(context.TODO(), "/test/vanish", "vanish in 10s", clientv3.WithLease(grantResp.ID))
這里特別需要注意,有一種情況是在 Put 之前 Lease 已經過期了,那么這個 Put 操作會返回 error ,此時你需要重新分配 Lease 。
當我們實現服務注冊時,需要主動給 Lease 進行續約,通常是以小于 TTL 的間隔循環調用 Lease 的 KeepAliveOnce() 方法對租約進行續期,一旦某個服務節點出錯無法完成租約的續期,等 key 過期后客戶端即無法在查詢服務時獲得對應節點的服務,這樣就通過租約到期實現了服務的錯誤隔離。
keepResp, err := lease.KeepAliveOnce(context.TODO(), grantResp.ID)
或者使用KeepAlive()方法,其會返回<-chan *LeaseKeepAliveResponse只讀通道,每次自動續租成功后會向通道中發送信號。
一般都用KeepAlive()方法, KeepAlive 和 Put 一樣,如果在執行之前 Lease 就已經過期了,那么需要重新分配 Lease 。 etcd 并沒有提供 API 來實現原子的 Put with Lease ,需要我們自己判斷 err 重新分配 Lease 。
示例代碼
package mainimport ("context""fmt""time""github.com/coreos/etcd/clientv3"
)func main() {config := clientv3.Config{Endpoints: []string{"192.168.0.113:2379"}, // 集群列表DialTimeout: 5 * time.Second,}// 建立一個客戶端client, err := clientv3.New(config)if err != nil {fmt.Println(err)return}// 創建一個lease(租約)對象lease := clientv3.NewLease(client)// 申請一個10秒的租約leaseGrantResp, err := lease.Grant(context.TODO(), 10)if err != nil {fmt.Println(err)return}// 拿到租約的IDleaseId := leaseGrantResp.ID// 自動永久續租keepRespChan, err := lease.KeepAlive(context.TODO(), leaseId)if err != nil {fmt.Println(err)return}// 處理續約應答的協程go func() {for {select {case keepResp := <-keepRespChan:if keepResp == nil {fmt.Println("租約已經失效了")goto END} else { // 每秒會續租一次, 所以就會受到一次應答fmt.Println("收到自動續租應答:", keepResp.ID)}}}END:}()// 獲得kv API子集kv := clientv3.NewKV(client)// Put一個KV, 讓它與租約關聯起來, 從而實現10秒后自動過期putResp, err := kv.Put(context.TODO(), "/demo/A/B1", "hello", clientv3.WithLease(leaseId))if err != nil {fmt.Println(err)return}fmt.Println("寫入成功:", putResp.Header.Revision)// 定時的看一下key過期了沒有for {getResp, err := kv.Get(context.TODO(), "/demo/A/B1")if err != nil {fmt.Println(err)return}if getResp.Count == 0 {fmt.Println("kv過期了")break}fmt.Println("還沒過期:", getResp.Kvs)time.Sleep(2 * time.Second)}
}
輸出結果:
收到自動續租應答: 8488292048996991588
寫入成功: 80
還沒過期: [key:"/demo/A/B1" create_revision:80 mod_revision:80 version:1 value:"hello" lease:8488292048996991588 ]
還沒過期: [key:"/demo/A/B1" create_revision:80 mod_revision:80 version:1 value:"hello" lease:8488292048996991588 ]
收到自動續租應答: 8488292048996991588
還沒過期: [key:"/demo/A/B1" create_revision:80 mod_revision:80 version:1 value:"hello" lease:8488292048996991588 ]
還沒過期: [key:"/demo/A/B1" create_revision:80 mod_revision:80 version:1 value:"hello" lease:8488292048996991588 ]
收到自動續租應答: 8488292048996991588
7. Op 獲取設置聯合操作
Op 字面意思就是”操作”, Get 和 Put 都屬于 Op ,只是為了簡化用戶開發而開放的特殊 API 。
KV 對象有一個 Do 方法接受一個 Op :
// Do applies a single Op on KV without a transaction.
// Do is useful when creating arbitrary operations to be issued at a
// later time; the user can range over the operations, calling Do to
// execute them. Get/Put/Delete, on the other hand, are best suited
// for when the operation should be issued at the time of declaration.
Do(ctx context.Context, op Op) (OpResponse, error)
其參數 Op 是一個抽象的操作,可以是 Put/Get/Delete… ;而 OpResponse 是一個抽象的結果,可以是 PutResponse/GetResponse…
可以通過 Client 中定義的一些方法來創建 Op :
- func OpDelete(key string, opts …OpOption) Op
- func OpGet(key string, opts …OpOption) Op
- func OpPut(key, val string, opts …OpOption) Op
- func OpTxn(cmps []Cmp, thenOps []Op, elseOps []Op) Op
其實和直接調用 KV.Put , KV.GET 沒什么區別。下面是一個例子:
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints,DialTimeout: dialTimeout,
})
if err != nil {log.Fatal(err)
}
defer cli.Close()
ops := []clientv3.Op{clientv3.OpPut("put-key", "123"),clientv3.OpGet("put-key"),clientv3.OpPut("put-key", "456")}
for _, op := range ops {if _, err := cli.Do(context.TODO(), op); err != nil {log.Fatal(err)}
}
把 Op 交給 Do 方法執行,返回的 opResp 結構如下:
type OpResponse struct {put *PutResponseget *GetResponsedel *DeleteResponsetxn *TxnResponse
}
你的操作是什么類型,你就用哪個指針來訪問對應的結果。
示例代碼:
package mainimport ("context""fmt""time""github.com/coreos/etcd/clientv3"
)func main() {config := clientv3.Config{Endpoints: []string{"192.168.0.113:2379"}, // 集群列表DialTimeout: 5 * time.Second,}// 建立一個客戶端client, err := clientv3.New(config)if err != nil {fmt.Println(err)return}// 獲得kv API子集kv := clientv3.NewKV(client)// 創建Op: operationputOp := clientv3.OpPut("/demo/A/B1", "BBBBB")// 執行OP // kv.Do(op)opResp, err := kv.Do(context.TODO(), putOp)if err != nil {fmt.Println(err)return}fmt.Println("寫入Revision:", opResp.Put().Header.Revision)// 創建OpgetOp := clientv3.OpGet("/demo/A/B1")// 執行OPopResp, err = kv.Do(context.TODO(), getOp)if err != nil {fmt.Println(err)return}// 打印 create rev == mod revfmt.Println("數據Revision:", opResp.Get().Kvs[0].ModRevision) fmt.Println("數據value:", string(opResp.Get().Kvs[0].Value))
}
輸出結果:
寫入Revision: 105
數據Revision: 105
數據value: BBBBB
8. Txn 事務操作
etcd 中事務是原子執行的,只支持 if … then … else … 這種表達。首先來看一下 Txn 中定義的方法:
type Txn interface {// If takes a list of comparison. If all comparisons passed in succeed,// the operations passed into Then() will be executed. Or the operations// passed into Else() will be executed.If(cs ...Cmp) Txn// Then takes a list of operations. The Ops list will be executed, if the// comparisons passed in If() succeed.Then(ops ...Op) Txn// Else takes a list of operations. The Ops list will be executed, if the// comparisons passed in If() fail.Else(ops ...Op) Txn// Commit tries to commit the transaction.Commit() (*TxnResponse, error)
}
Txn 必須是這樣使用的:If(滿足條件) Then(執行若干Op) Else(執行若干Op)。
If 中支持傳入多個 Cmp 比較條件,如果所有條件滿足,則執行 Then 中的 Op (上一節介紹過Op),否則執行 Else中 的 Op 。
首先,我們需要開啟一個事務,這是通過 KV 對象的方法實現的:
txn := kv.Txn(context.TODO())
下面的測試程序,判斷如果 k1 的值大于 v1 并且 k1 的版本號是 2,則 Put 鍵值 k2 和 k3 ,否則 Put 鍵值 k4 和 k5 。
kv.Txn(context.TODO()).If(clientv3.Compare(clientv3.Value(k1), ">", v1),clientv3.Compare(clientv3.Version(k1), "=", 2)
).Then(clientv3.OpPut(k2,v2), clentv3.OpPut(k3,v3)
).Else(clientv3.OpPut(k4,v4), clientv3.OpPut(k5,v5)
).Commit()
類似于 clientv3.Value() 用于指定 key 屬性的,有這么幾個方法:
- func CreateRevision(key string) Cmp:key=xxx的創建版本必須滿足…
- func LeaseValue(key string) Cmp:key=xxx的Lease ID必須滿足…
- func ModRevision(key string) Cmp:key=xxx的最后修改版本必須滿足…
- func Value(key string) Cmp:key=xxx的創建值必須滿足…
- func Version(key string) Cmp:key=xxx的累計更新次數必須滿足…
package mainimport ("context""fmt""time""github.com/coreos/etcd/clientv3"
)func main() {config := clientv3.Config{Endpoints: []string{"192.168.0.113:2379"}, // 集群列表DialTimeout: 5 * time.Second,}// 建立一個客戶端client, err := clientv3.New(config)if err != nil {fmt.Println(err)return}// lease實現鎖自動過期:// op操作// txn事務: if else then// 1, 上鎖 (創建租約, 自動續租, 拿著租約去搶占一個key)lease := clientv3.NewLease(client)// 申請一個5秒的租約leaseGrantResp, err := lease.Grant(context.TODO(), 5)if err != nil {fmt.Println(err)return}// 拿到租約的IDleaseId := leaseGrantResp.ID// 準備一個用于取消自動續租的contextctx, cancelFunc := context.WithCancel(context.TODO())// 確保函數退出后, 自動續租會停止defer cancelFunc()defer lease.Revoke(context.TODO(), leaseId)// 5秒后會取消自動續租keepRespChan, err := lease.KeepAlive(ctx, leaseId)if err != nil {fmt.Println(err)return}// 處理續約應答的協程go func() {for {select {case keepResp := <-keepRespChan:if keepResp == nil {fmt.Println("租約已經失效了")goto END} else { // 每秒會續租一次, 所以就會受到一次應答fmt.Println("收到自動續租應答:", keepResp.ID)}}}END:}()// if 不存在key, then 設置它, else 搶鎖失敗kv := clientv3.NewKV(client)// 創建事務txn := kv.Txn(context.TODO())// 定義事務// 如果key不存在txn.If(clientv3.Compare(clientv3.CreateRevision("/demo/A/B1"), "=", 0)).Then(clientv3.OpPut("/demo/A/B1", "xxx", clientv3.WithLease(leaseId))).Else(clientv3.OpGet("/demo/A/B1")) // 否則搶鎖失敗// 提交事務txnResp, err := txn.Commit()if err != nil {fmt.Println(err)return // 沒有問題}// 判斷是否搶到了鎖if !txnResp.Succeeded {fmt.Println("鎖被占用:", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value))return}// 2, 處理業務fmt.Println("處理任務")time.Sleep(5 * time.Second)// 3, 釋放鎖(取消自動續租, 釋放租約)// defer 會把租約釋放掉, 關聯的KV就被刪除了
}
輸出結果:
收到自動續租應答: 8488292048996991680
鎖被占用: BBBBB
9. Watch 監聽操作
Watch 用于監聽某個鍵的變化, Watch調用后返回一個WatchChan,它的類型聲明如下:
type WatchChan <-chan WatchResponse
type WatchResponse struct {Header pb.ResponseHeaderEvents []*EventCompactRevision int64Canceled boolCreated bool
}
當監聽的 key 有變化后會向WatchChan發送WatchResponse。
Watch 的典型應用場景是應用于系統配置的熱加載,我們可以在系統讀取到存儲在 etcd key 中的配置后,用 Watch 監聽 key 的變化。在單獨的 goroutine 中接收 WatchChan 發送過來的數據,并將更新應用到系統設置的配置變量中,比如像下面這樣在 goroutine 中更新變量 appConfig ,這樣系統就實現了配置變量的熱加載。
type AppConfig struct {config1 stringconfig2 string
}var appConfig Appconfigfunc watchConfig(clt *clientv3.Client, key string, ss interface{}) {watchCh := clt.Watch(context.TODO(), key)go func() {for res := range watchCh {value := res.Events[0].Kv.Valueif err := json.Unmarshal(value, ss); err != nil {fmt.Println("now", time.Now(), "watchConfig err", err)continue}fmt.Println("now", time.Now(), "watchConfig", ss)}}()
}
watchConfig(client, "config_key", &appConfig)
完整示例代碼:
package mainimport ("context""fmt""time""github.com/coreos/etcd/clientv3""github.com/coreos/etcd/mvcc/mvccpb"
)func main() {config := clientv3.Config{Endpoints: []string{"192.168.0.113:2379"}, // 集群列表DialTimeout: 5 * time.Second,}// 建立一個客戶端client, err := clientv3.New(config)if err != nil {fmt.Println(err)return}// 獲得kv API子集kv := clientv3.NewKV(client)// 模擬etcd中KV的變化go func() {for {kv.Put(context.TODO(), "/demo/A/B1", "i am B1")kv.Delete(context.TODO(), "/demo/A/B1")time.Sleep(1 * time.Second)}}()// 先GET到當前的值,并監聽后續變化getResp, err := kv.Get(context.TODO(), "/demo/A/B1")if err != nil {fmt.Println(err)return}// 現在key是存在的if len(getResp.Kvs) != 0 {fmt.Println("當前值:", string(getResp.Kvs[0].Value))}// 當前etcd集群事務ID, 單調遞增的watchStartRevision := getResp.Header.Revision + 1// 創建一個watcherwatcher := clientv3.NewWatcher(client)// 啟動監聽fmt.Println("從該版本向后監聽:", watchStartRevision)// 創建一個 5s 后取消的上下文ctx, cancelFunc := context.WithCancel(context.TODO())time.AfterFunc(5*time.Second, func() {cancelFunc()})// 該監聽動作在 5s 后取消watchRespChan := watcher.Watch(ctx, "/demo/A/B1", clientv3.WithRev(watchStartRevision))// 處理kv變化事件for watchResp := range watchRespChan {for _, event := range watchResp.Events {switch event.Type {case mvccpb.PUT:fmt.Println("修改為:", string(event.Kv.Value), "Revision:", event.Kv.CreateRevision, event.Kv.ModRevision)case mvccpb.DELETE:fmt.Println("刪除了", "Revision:", event.Kv.ModRevision)}}}}
輸出結果:
從該版本向后監聽: 94
修改為: i am B1 Revision: 94 94
刪除了 Revision: 95
修改為: i am B1 Revision: 96 96
刪除了 Revision: 97
修改為: i am B1 Revision: 98 98
刪除了 Revision: 99
修改為: i am B1 Revision: 100 100
刪除了 Revision: 101
修改為: i am B1 Revision: 102 102
刪除了 Revision: 103
8. 參考資料
https://segmentfault.com/a/1190000020868242?utm_source=tag-newest
https://godoc.org/github.com/coreos/etcd/clientv3
https://pkg.go.dev/go.etcd.io/etcd/clientv3?tab=doc
總結
以上是生活随笔為你收集整理的Go 学习笔记(58)— Go 第三方库之 etcd/clientv3(连接客户端、PUT、GET、Lease、Op、Txn、Watch 基础概念说明)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2022-2028年中国TPE手套行业市
- 下一篇: 2022-2028年中国模胚行业市场研究