Elasticsearch:Elasticsearch 开发入门 - Golang
在本文中,我將分享如何在 Golang 中如何使用?Elasticsearch 來開發(fā)的經(jīng)驗(yàn)。 順便說一句,以防萬一你從未聽說過 Elasticsearch:
Elasticsearch 是一個高度可擴(kuò)展的開源全文本搜索和分析引擎。 它使你可以快速,近乎實(shí)時地存儲,搜索和分析大量數(shù)據(jù)。 它通常用作支持具有復(fù)雜搜索功能和要求的應(yīng)用程序的基礎(chǔ)引擎/技術(shù)。
如果你想了解更多關(guān)于 Elasticsearch 的介紹,你可以參閱我之前的文章 “Elasticsearch 簡介”。
針對 Golang 的 Elasticsearch 支持,你可以訪問 Elastic 的官方 github?https://github.com/elastic/go-elasticsearch。
前提條件
創(chuàng)建一個 Golang?項(xiàng)目
我們在自己的電腦里創(chuàng)建一個如下的目錄:
mkdir go-elasticsearch cd go-elasticsearch接著我們在這個目錄里創(chuàng)建一個叫做 main.go 的文件。你可以使用你喜歡的編輯器,比如:
vi main.go在上面我們使用 vi 編輯器來創(chuàng)建 main.go 文件。
用于 Elasticsearch 的 Golang 驅(qū)動程序(go-elasticsearch)必須安裝在服務(wù)器的 $GOPATH 中。 使用 git 將庫的存儲庫克隆到 $GOPATH 中,如下例所示:
git clone --branch master https://github.com/elastic/go-elasticsearch.git $GOPATH/src/github.com/elastic/go-elasticsearch在編譯 Go 應(yīng)用時,有時遇到庫不能從 github 上下載的錯誤信息。我們需要在 terminal 中打入如下的命令:
export GO111MODULE=on export GOPROXY=https://goproxy.io我們也可以使用如下的方法來達(dá)到安裝的 go-elasticsearch 的目的。我們需要在?go-elasticsearch 目錄下創(chuàng)建一個叫做 go.mod 的文件。它的內(nèi)容如下:
go.mod
require github.com/elastic/go-elasticsearch/v7 master客戶端主要版本與兼容的 Elasticsearch 主要版本相對應(yīng):要連接到 Elasticsearch 7.x,請使用客戶端的 7.x 版本,要連接到Elasticsearch 6.x,請使用客戶端的 6.x 版本。
require github.com/elastic/go-elasticsearch/v7 7.x require github.com/elastic/go-elasticsearch/v7 7.0.0可以在一個項(xiàng)目中使用客戶端的多個版本:
// go.mod github.com/elastic/go-elasticsearch/v6 6.x github.com/elastic/go-elasticsearch/v7 7.x// main.go import (elasticsearch6 "github.com/elastic/go-elasticsearch/v6"elasticsearch7 "github.com/elastic/go-elasticsearch/v7" ) // ... es6, _ := elasticsearch6.NewDefaultClient() es7, _ := elasticsearch7.NewDefaultClient()安裝 Elasticsearch 及 Kibana
如果你之前從來沒有安裝過 Elasticsearch 或 Kibana。你可以閱讀我之前的文章 “Elastic:菜鳥上手指南” 來進(jìn)行安裝。在本練習(xí)中,我們將使用 docker 來安裝 Elasticsearch 及 Kibana。我們首先來創(chuàng)建一個叫做 docker-compose.yml 的文件:
docker-compose.yml
--- version: "3" services:elasticsearch:image: docker.elastic.co/elasticsearch/elasticsearch:7.10.0container_name: es01environment:- node.name=es01- cluster.name=docker-cluster- bootstrap.memory_lock=true- "ES_JAVA_OPTS=-Xms512m -Xmx512m"- discovery.type=single-nodeulimits:memlock:soft: -1hard: -1volumes:- esdata:/usr/share/elasticsearch/dataports:- 9200:9200kibana:image: docker.elastic.co/kibana/kibana:7.10.0ports:- 5601:5601depends_on:- elasticsearchvolumes:esdata:driver: local在上面,我們使用了 Elastic Stack 7.10.0 發(fā)行版作為實(shí)驗(yàn)的版本。在你實(shí)際的使用中,你可以根據(jù)自己的版本需求而進(jìn)行修改。
我們必須先啟動 docker,然后在命令行中執(zhí)行:
docker-compose up上面命令必須執(zhí)行于 docker-compose.yml 文件所在的目錄中。
它將啟動 http://localhost:9200 中的 Elasticsearch 和 http://localhost:5601 中的 Kibana。 你可以通過在瀏覽器中打開鏈接來進(jìn)行驗(yàn)證。
測試 elasticsearch 包
elasticsearch 軟件包將兩個單獨(dú)的軟件包捆綁在一起,分別用于調(diào)用 Elasticsearch API 和通過 HTTP 傳輸數(shù)據(jù):esapi 和 estransport。
使用 elasticsearch.NewDefaultClient() 函數(shù)創(chuàng)建具有默認(rèn)設(shè)置的客戶端。
main.go
package mainimport ("log"// Import the Elasticsearch library packages"github.com/elastic/go-elasticsearch/v7" )func main() {es, err := elasticsearch.NewDefaultClient()if err != nil {log.Fatalf("Error creating the client: %s", err)}res, err := es.Info()if err != nil {log.Fatalf("Error getting response: %s", err)}defer res.Body.Close()log.Println(res) }我們使用如下的命令來運(yùn)行:
go run main.go上面的命令顯示的結(jié)果為:
$ go run main.go go: finding github.com/elastic/go-elasticsearch latest 2020/12/24 10:56:23 [200 OK] {"name" : "es01","cluster_name" : "docker-cluster","cluster_uuid" : "ZYQ9cGOdS06uZvxOvjug8A","version" : {"number" : "7.10.0","build_flavor" : "default","build_type" : "docker","build_hash" : "51e9d6f22758d0374a0f3f5c6e8f3a7997850f96","build_date" : "2020-11-09T21:30:33.964949Z","build_snapshot" : false,"lucene_version" : "8.7.0","minimum_wire_compatibility_version" : "6.8.0","minimum_index_compatibility_version" : "6.0.0-beta1"},"tagline" : "You Know, for Search" }注意:關(guān)閉并使用響應(yīng) body 至關(guān)重要,以便在默認(rèn)的 HTTP 傳輸中重新使用持久性 TCP 連接。 如果你對響應(yīng)正文不感興趣,請調(diào)用 io.Copy(ioutil.Discard,res.Body)。
當(dāng)你 export ELASTICSEARCH_URL環(huán)境變量時,它將用于設(shè)置集群端點(diǎn)。 用逗號分隔多個地址。
要以編程方式設(shè)置集群端點(diǎn),請將配置對象傳遞給 elasticsearch.NewClient() 函數(shù)。
cfg := elasticsearch.Config{Addresses: []string{"http://localhost:9200","http://localhost:9201",},// ... } es, err := elasticsearch.NewClient(cfg)要設(shè)置用戶名和密碼,請將其包括在端點(diǎn) URL 中,或使用相應(yīng)的配置選項(xiàng)。
cfg := elasticsearch.Config{// ...Username: "foo",Password: "bar", }若要設(shè)置用于對群集節(jié)點(diǎn)的證書進(jìn)行簽名的自定義證書頒發(fā)機(jī)構(gòu),請使用 CACert 配置選項(xiàng)。
cert, _ := ioutil.ReadFile(*cacert)cfg := elasticsearch.Config{// ...CACert: cert, }插入文檔到索引
在這個章節(jié)中,我將一步一步地指導(dǎo)如何如何使用 go-elasticsearch 驅(qū)動來把文檔導(dǎo)入到 Elasticsearch 中。
創(chuàng)建一個 Go 腳本并導(dǎo)入包
現(xiàn)在,我們已經(jīng)確保正確安裝和設(shè)置了我們需要的所有內(nèi)容,我們可以開始使用 Go 腳本了。 編輯之前的 main.go 文件,然后將 main 包放在頂部。 請確保導(dǎo)入所有必需的程序包和庫,如以下示例所示:
package mainimport ( "context" "encoding/json" "fmt" "log" "reflect" "strconv" "strings"// Import the Elasticsearch library packages "github.com/elastic/go-elasticsearch/v7" "github.com/elastic/go-elasticsearch/v7/esapi" )在上面,我們使用 v7 版本,它對應(yīng)于 Elastic Stack 7.x 版本的發(fā)布。在之前的部署中,我們使用的版本是 7.10。
為 Elasticsearch 文檔的字段創(chuàng)建結(jié)構(gòu)數(shù)據(jù)類型
我們將使用 Golang struct 數(shù)據(jù)類型為要編制索引的 Elasticsearch 文檔以及索引的相應(yīng)字段創(chuàng)建框架:
// Declare a struct for Elasticsearch fields type ElasticDocs struct {SomeStr stringSomeInt intSomeBool bool }聲明一個將 Elasticsearch 結(jié)構(gòu)數(shù)據(jù)轉(zhuǎn)換為 JSON 字符串的函數(shù)
接下來,讓我們看一個簡單的函數(shù),它將 Elasticsearch struct 文檔實(shí)例轉(zhuǎn)換為 JSON 字符串。 下面顯示的代碼可能看起來有些復(fù)雜,但是實(shí)際上發(fā)生的事情很簡單–所有功能所做的就是將結(jié)構(gòu)轉(zhuǎn)換為字符串文字,然后將該字符串傳遞給 Golang 的 json.Marshal() 方法以使其返回字符串的JSON編碼:
// A function for marshaling structs to JSON string func jsonStruct(doc ElasticDocs) string {// Create struct instance of the Elasticsearch fields struct objectdocStruct := &ElasticDocs{SomeStr: doc.SomeStr,SomeInt: doc.SomeInt,SomeBool: doc.SomeBool,}fmt.Println("\ndocStruct:", docStruct)fmt.Println("docStruct TYPE:", reflect.TypeOf(docStruct))// Marshal the struct to JSON and check for errorsb, err := json.Marshal(docStruct)if err != nil {fmt.Println("json.Marshal ERROR:", err)return string(err.Error())}return string(b) }聲明 main() 函數(shù)并創(chuàng)建一個新的 Elasticsearch Golang 客戶端實(shí)例
在我們的 Go 腳本中,所有 API 方法調(diào)用都必須位于 main() 函數(shù)內(nèi)部或從另一個函數(shù)內(nèi)部進(jìn)行調(diào)用。 讓我們?yōu)?API 調(diào)用創(chuàng)建一個新的上下文對象,并為 Elasticsearch 文檔創(chuàng)建一個 map 對象:
func main() {// Allow for custom formatting of log outputlog.SetFlags(0)// Create a context object for the API callsctx := context.Background()// Create a mapping for the Elasticsearch documentsvar (docMap map[string]interface{})fmt.Println("docMap:", docMap)fmt.Println("docMap TYPE:", reflect.TypeOf(docMap))實(shí)例化 Elasticsearch 客戶端配置和 Golang 客戶端實(shí)例
在這一步中,我們將實(shí)例化一個新的 Elasticsearch 配置對象。 確保將正確的主機(jī)和端口信息以及任何用戶名或密碼傳遞給其 “Adressess” 屬性。
// Declare an Elasticsearch configurationcfg := elasticsearch.Config{Addresses: []string{"http://localhost:9200",},Username: "user",Password: "pass",}// Instantiate a new Elasticsearch client object instanceclient, err := elasticsearch.NewClient(cfg)if err != nil {fmt.Println("Elasticsearch connection error:", err)}檢查用于 Elasticsearch 的 Golang 客戶端在連接到集群時是否返回了任何錯誤
接下來,我們將檢查與 Elasticsearch 的連接是否成功或是否返回了任何錯誤:
// Have the client instance return a responseres, err := client.Info()// Deserialize the response into a map.if err != nil {log.Fatalf("client.Info() ERROR:", err)} else {log.Printf("client response:", res)}創(chuàng)建 Elasticsearch 結(jié)構(gòu)文檔并將其放入數(shù)組
我們將聲明一個空字符串?dāng)?shù)組,以存儲當(dāng)前以 JSON 字符串表示的 Elasticsearch 文檔。 以下代碼顯示了一些將用于索引的 Elasticsearch 文檔示例。 要設(shè)置其字段的值,你需要做的就是修改結(jié)構(gòu)實(shí)例的屬性:
我們會將這些文檔實(shí)例傳遞給我們先前聲明的 jsonStruct() 函數(shù),并使它們返回代表每個文檔的 JSON 字符串。 然后,我們將使用 Golang 的 append() 函數(shù)將 JSON 字符串添加到字符串?dāng)?shù)組中:
迭代 Elasticsearch 文檔數(shù)組并調(diào)用 Golang 客戶端的 IndexRequest() 方法
現(xiàn)在我們已經(jīng)建立了一個文檔數(shù)組,我們將對其進(jìn)行迭代,并在進(jìn)行過程中向 Elasticsearch 集群發(fā)出 API 請求。 這些API調(diào)用將通過調(diào)用 Golang 驅(qū)動程序的 esapi.IndexRequest() 方法來索引文檔:
// Iterate the array of string documentsfor i, bod := range docs {fmt.Println("\nDOC _id:", i+1)fmt.Println(bod)// Instantiate a request objectreq := esapi.IndexRequest {Index: "some_index",DocumentID: strconv.Itoa(i + 1),Body: strings.NewReader(bod),Refresh: "true",}fmt.Println(reflect.TypeOf(req))在上面一定要注意的是:我們設(shè)置 Refresh 為 true。這在實(shí)際的使用中并不建議,原因是每次寫入的時候都會 refresh。當(dāng)我們面對大量的數(shù)據(jù)時,這樣的操作會造成效率的底下。
檢查 IndexRequest() API 方法調(diào)用是否返回任何錯誤
在文檔數(shù)組上進(jìn)行迭代的最后一步是從 API 調(diào)用中獲取響應(yīng),并檢查是否存在錯誤:
// Return an API response object from requestres, err := req.Do(ctx, client)if err != nil {log.Fatalf("IndexRequest ERROR: %s", err)}defer res.Body.Close()在下面顯示的代碼中,如果沒有錯誤返回,我們將解析 API 響應(yīng)返回的結(jié)果對象:
if res.IsError() {log.Printf("%s ERROR indexing document ID=%d", res.Status(), i+1)} else {// Deserialize the response into a map.var resMap map[string]interface{}if err := json.NewDecoder(res.Body).Decode(&resMap); err != nil {log.Printf("Error parsing the response body: %s", err)} else {log.Printf("\nIndexRequest() RESPONSE:")// Print the response status and indexed document version.fmt.Println("Status:", res.Status())fmt.Println("Result:", resMap["result"])fmt.Println("Version:", int(resMap["_version"].(float64)))fmt.Println("resMap:", resMap)fmt.Println("\n")}}} }每個文檔迭代都應(yīng)打印出一個map[string]?interface{}?對象響應(yīng),如下所示:
resMap: map[_id:1 _index:some_index _primary_term:1 _seq_no:32 _shards:map[failed:0 successful:1 total:2] _type:_doc _version:2 forced_refresh:true result:updated]
在上面,我們講了很多代碼。為了方便大家練習(xí),我把整個 main.go 的代碼貼出來:
main.go
package mainimport ( "context" "encoding/json" "fmt" "log" "reflect" "strconv" "strings"// Import the Elasticsearch library packages "github.com/elastic/go-elasticsearch/v7" "github.com/elastic/go-elasticsearch/v7/esapi" )// Declare a struct for Elasticsearch fields type ElasticDocs struct {SomeStr stringSomeInt intSomeBool bool }// A function for marshaling structs to JSON string func jsonStruct(doc ElasticDocs) string {// Create struct instance of the Elasticsearch fields struct objectdocStruct := &ElasticDocs{SomeStr: doc.SomeStr,SomeInt: doc.SomeInt,SomeBool: doc.SomeBool,}fmt.Println("\ndocStruct:", docStruct)fmt.Println("docStruct TYPE:", reflect.TypeOf(docStruct))// Marshal the struct to JSON and check for errorsb, err := json.Marshal(docStruct)if err != nil {fmt.Println("json.Marshal ERROR:", err)return string(err.Error())}return string(b) }func main() {// Allow for custom formatting of log outputlog.SetFlags(0)// Create a context object for the API callsctx := context.Background()// Create a mapping for the Elasticsearch documentsvar (docMap map[string]interface{})fmt.Println("docMap:", docMap)fmt.Println("docMap TYPE:", reflect.TypeOf(docMap))// Declare an Elasticsearch configurationcfg := elasticsearch.Config{Addresses: []string{"http://localhost:9200",},Username: "user",Password: "pass",}// Instantiate a new Elasticsearch client object instanceclient, err := elasticsearch.NewClient(cfg)if err != nil {fmt.Println("Elasticsearch connection error:", err)}// Have the client instance return a responseres, err := client.Info()// Deserialize the response into a map.if err != nil {log.Fatalf("client.Info() ERROR:", err)} else {log.Printf("client response:", res)}// Declare empty array for the document stringsvar docs []string// Declare documents to be indexed using structdoc1 := ElasticDocs{}doc1.SomeStr = "Some Value"doc1.SomeInt = 123456doc1.SomeBool = truedoc2 := ElasticDocs{}doc2.SomeStr = "Another Value"doc2.SomeInt = 42doc2.SomeBool = false // Marshal Elasticsearch document struct objects to JSON stringdocStr1 := jsonStruct(doc1)docStr2 := jsonStruct(doc2)// Append the doc strings to an arraydocs = append(docs, docStr1)docs = append(docs, docStr2)// Iterate the array of string documentsfor i, bod := range docs {fmt.Println("\nDOC _id:", i+1)fmt.Println(bod)// Instantiate a request objectreq := esapi.IndexRequest {Index: "some_index",DocumentID: strconv.Itoa(i + 1),Body: strings.NewReader(bod),Refresh: "true",}fmt.Println(reflect.TypeOf(req))// Return an API response object from requestres, err := req.Do(ctx, client)if err != nil {log.Fatalf("IndexRequest ERROR: %s", err)}defer res.Body.Close()if res.IsError() {log.Printf("%s ERROR indexing document ID=%d", res.Status(), i+1)} else {// Deserialize the response into a map.var resMap map[string]interface{}if err := json.NewDecoder(res.Body).Decode(&resMap); err != nil {log.Printf("Error parsing the response body: %s", err)} else {log.Printf("\nIndexRequest() RESPONSE:")// Print the response status and indexed document version.fmt.Println("Status:", res.Status())fmt.Println("Result:", resMap["result"])fmt.Println("Version:", int(resMap["_version"].(float64)))fmt.Println("resMap:", resMap)fmt.Println("\n")}}} }運(yùn)行上面的代碼,我們將看到如下的輸出:
$ go run main.go go: finding github.com/elastic/go-elasticsearch latest docMap: map[] docMap TYPE: map[string]interface {} client response:%!(EXTRA *esapi.Response=[200 OK] {"name" : "es01","cluster_name" : "docker-cluster","cluster_uuid" : "ZYQ9cGOdS06uZvxOvjug8A","version" : {"number" : "7.10.0","build_flavor" : "default","build_type" : "docker","build_hash" : "51e9d6f22758d0374a0f3f5c6e8f3a7997850f96","build_date" : "2020-11-09T21:30:33.964949Z","build_snapshot" : false,"lucene_version" : "8.7.0","minimum_wire_compatibility_version" : "6.8.0","minimum_index_compatibility_version" : "6.0.0-beta1"},"tagline" : "You Know, for Search" } )docStruct: &{Some Value 123456 true} docStruct TYPE: *main.ElasticDocsdocStruct: &{Another Value 42 false} docStruct TYPE: *main.ElasticDocsDOC _id: 1 {"SomeStr":"Some Value","SomeInt":123456,"SomeBool":true} esapi.IndexRequestIndexRequest() RESPONSE: Status: 200 OK Result: updated Version: 4 resMap: map[_id:1 _index:some_index _primary_term:1 _seq_no:36 _shards:map[failed:0 successful:1 total:2] _type:_doc _version:4 forced_refresh:true result:updated]DOC _id: 2 {"SomeStr":"Another Value","SomeInt":42,"SomeBool":false} esapi.IndexRequestIndexRequest() RESPONSE: Status: 200 OK Result: updated Version: 18 resMap: map[_id:2 _index:some_index _primary_term:1 _seq_no:37 _shards:map[failed:0 successful:1 total:2] _type:_doc _version:18 forced_refresh:true result:updated]我們可以在 Kibana 中使用如下的命令來進(jìn)行查看被導(dǎo)入的文檔:
GET some_index/_search搜索文檔
我們接下來搜索已經(jīng)建立好的文檔。我們接下來搜索在 SomeStr? 這個字段含有 Another 的文檔。在 main.go 里添加如下的代碼:
// Search for the indexed document// Build the request bodyvar buf bytes.Bufferquery := map[string]interface{}{"query": map[string]interface{}{"match": map[string]interface{}{"SomeStr": "Another",},},}if err := json.NewEncoder(&buf).Encode(query); err != nil {log.Fatalf("Error encoding query: %s", err)}// Perform the search request.res, err = client.Search(client.Search.WithContext(context.Background()),client.Search.WithIndex("some_index"),client.Search.WithBody(&buf),client.Search.WithTrackTotalHits(true),client.Search.WithPretty(),)if err != nil {log.Fatalf("Error getting response: %s", err)}defer res.Body.Close()if res.IsError() {var e map[string]interface{}if err := json.NewDecoder(res.Body).Decode(&e); err != nil {log.Fatalf("Error parsing the response body: %s", err)} else {// Print the response status and error information.log.Fatalf("[%s] %s: %s",res.Status(),e["error"].(map[string]interface{})["type"],e["error"].(map[string]interface{})["reason"],)}}var r map[string]interface{}if err := json.NewDecoder(res.Body).Decode(&r); err != nil {log.Fatalf("Error parsing the response body: %s", err)}// Print the response status, number of results, and request duration.log.Printf("[%s] %d hits; took: %dms",res.Status(),int(r["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"].(float64)),int(r["took"].(float64)),)// Print the ID and document source for each hit.for _, hit := range r["hits"].(map[string]interface{})["hits"].([]interface{}) {log.Printf(" * ID=%s, %s", hit.(map[string]interface{})["_id"], hit.(map[string]interface{})["_source"])}同時由于我們使用了 bytes 模塊,我們需要在文檔的開始部分添加:
import ("context""encoding/json""fmt""log""reflect""strconv""strings""bytes"// Import the Elasticsearch library packages"github.com/elastic/go-elasticsearch/v7""github.com/elastic/go-elasticsearch/v7/esapi" )運(yùn)行上面的代碼。我們可以看到如下新添加的結(jié)果:
[200 OK] 1 hits; took: 1ms
* ID=2, map[SomeBool:%!s(bool=false) SomeInt:%!s(float64=42) SomeStr:Another Value]
刪除文檔
刪除一個文檔非常容易。在 main.go 文件中,我們添加如下的代碼來刪除文檔 id 為 1 的文檔:
// Set up the request object.req := esapi.DeleteRequest{Index: "some_index",DocumentID: strconv.Itoa(1),}res, err = req.Do(context.Background(), client)if err != nil {log.Fatalf("Error getting response: %s", err)}重新運(yùn)行 main.go 應(yīng)用。我們再到 Kibana 中去查詢一下:
這次查詢我們會發(fā)現(xiàn)只有一個文檔存在。那個 id 為 2 的文檔雖然也被導(dǎo)入,但是又被刪除了。
為了方便大家的學(xué)習(xí),我把代碼放在 github 上:https://github.com/liu-xiao-guo/go-elasticsearch-demo
總結(jié)
以上是生活随笔為你收集整理的Elasticsearch:Elasticsearch 开发入门 - Golang的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: OpenCV中的图像处理 —— 霍夫线
- 下一篇: Synopsys AXI VIP wst