go 语言操作es示例
生活随笔
收集整理的這篇文章主要介紹了
go 语言操作es示例
小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
使用高度封裝的 orm 查詢(xún)
package main
import (
"context"
"fmt"
"log"
"os"
"reflect"
"time"
"github.com/olivere/elastic/v7"
)
type esObj struct {
db *elastic.Client
index string
}
type Blog struct {
Title string `json:"title"`
Author string `json:"author"`
Content string `json:"content"`
PageView int64 `json:"pageview"`
ReleaseTime time.Time `json:"release_time"`
IsDel bool `json:"is_del"`
}
const blogMap = `{
"mappings": {
"properties": {
"title": {
"type": "text",
"analyzer": "ik_max_word",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"author": {
"type": "keyword"
},
"content": {
"type": "text",
"analyzer": "ik_max_word"
},
"pageview": {
"type": "long"
},
"release_time": {
"type": "date"
},
"isdel": {
"type": "boolean"
}
}
}
}`
func main() {
log.SetFlags(log.Ltime | log.Lshortfile)
ips := []string{"http://127.0.0.1:9200", "http://127.0.0.1:9201"}
esDb, err := New(ips, "", "")
if err != nil {
return
}
ctx := context.Background()
if exist, err := esDb.IndexExist(ctx); err != nil {
return
} else if !exist {
log.Printf("index of %v not exist", esDb.index)
if err = esDb.CreateIndex(ctx); err != nil {
return
}
}
log.Println("es 查詢(xún)成功")
defer esDb.DeleteIndex(ctx)
blogs := getBlogDemo()
if err := esDb.InsertPost(ctx, blogs[0]); err != nil {
return
}
_ = esDb.InsertPostList(ctx, blogs)
_ = esDb.QueryMathAll(ctx, false)
_ = esDb.QueryBlogNumbByAuthor(ctx, "李白")
_ = esDb.QueryBlogByContent(ctx, "黃河", false)
_ = esDb.AggsSumPageViesByAuthorName(ctx, "李白")
esDb.BoolQueryBlogByAuthor(ctx, "李白", false)
esDb.BucketAggs(ctx, true)
log.Println("插入成功")
}
func New(ips []string, uname, password string) (es *esObj, err error) {
esOptions := make([]elastic.ClientOptionFunc, 0)
if len(ips) == 0 {
log.Fatalf("ip地址輸入錯(cuò)誤")
}
esOptions = append(esOptions, elastic.SetURL(ips...))
if len(uname) > 0 && len(password) > 0 {
esOptions = append(esOptions, elastic.SetBasicAuth(uname, password))
}
esOptions = append(
// 設(shè)置健康檢查時(shí)間
esOptions, elastic.SetHealthcheckInterval(10*time.Second),
// 啟動(dòng)壓縮算法
elastic.SetGzip(true),
// 設(shè)置錯(cuò)誤日志輸出
elastic.SetErrorLog(log.New(os.Stdout, "es--error ", log.LstdFlags)),
// 設(shè)置info日志輸出
elastic.SetInfoLog(log.New(os.Stdout, "es--info ", log.LstdFlags)),
)
esClient, err := elastic.NewClient(esOptions...)
if err != nil {
log.Printf("es 啟動(dòng)失敗 err is %v", err)
return nil, fmt.Errorf("es 啟動(dòng)失敗 err is %v", err)
}
es = &esObj{
db: esClient,
index: "blog",
}
return es, nil
}
// IndexExist 判斷索引是否存在
func (es *esObj) IndexExist(ctx context.Context) (bool, error) {
exist, err := es.db.IndexExists(es.index).Do(ctx)
if err != nil {
log.Printf("es indexExist fail; index is %s, err is %v", es.index, err)
return false, err
}
return exist, nil
}
// CreateIndex 創(chuàng)建索引
func (es *esObj) CreateIndex(ctx context.Context) (err error) {
result, err := es.db.CreateIndex(es.index).BodyString(blogMap).Do(ctx)
if err != nil {
log.Fatalf("create index fail , index is %s, err is %v", es.index, err)
return err
}
if !result.Acknowledged {
log.Fatalf("not createIndex.Acknowledged ; index is %s", es.index)
return nil
}
log.Printf("es index of %s create success maping is %v", es.index, blogMap)
return nil
}
// DeleteIndex 刪除索引
func (es *esObj) DeleteIndex(ctx context.Context) (bool, error) {
res, err := es.db.DeleteIndex(es.index).Do(ctx)
if err != nil {
log.Fatalf("delete index of %s fail, err is %v", es.index, err)
return false, err
}
if !res.Acknowledged {
log.Printf("delete index of %s not acknowledged", es.index)
return false, nil
}
log.Printf("delet index of %s success", es.index)
return true, nil
}
// InsertPost 添加數(shù)據(jù)
func (es *esObj) InsertPost(ctx context.Context, blog Blog) error {
res, err := es.db.Index().Index(es.index).BodyJson(blog).Do(ctx)
if err != nil {
log.Fatalf("es insert in %s fail, blog is %+v; err is %v", es.index, blog, err)
return err
}
log.Printf("insert result %v", res)
return nil
}
// InsertPostList 批量添加數(shù)據(jù)
func (es *esObj) InsertPostList(ctx context.Context, blogs []Blog) error {
bulkService := es.db.Bulk().Index(es.index).Refresh("true")
for _, blog := range blogs {
bulkService.Add(elastic.NewBulkCreateRequest().Index(es.index).Doc(blog))
}
res, err := bulkService.Do(ctx)
if err != nil {
log.Fatalf("index of %s bulk insert err %v", es.index, err)
return err
}
log.Printf("bulk insert success %v", res.Errors)
return nil
}
// QueryMathAll match_all 查詢(xún)
func (es *esObj) QueryMathAll(ctx context.Context, print bool) error {
res, err := es.db.Search().
Index(es.index).
Query(elastic.NewMatchAllQuery()).
Do(ctx)
if err != nil {
log.Fatalf("queryMathALL fail err is %v", err)
return err
}
log.Printf("查詢(xún)花費(fèi)的時(shí)間 %v", res.TookInMillis)
log.Println("查詢(xún)到的總數(shù)", res.TotalHits())
var ttyp Blog
blogs := make([]Blog, 0)
for _, v := range res.Each(reflect.TypeOf(ttyp)) {
blog := v.(Blog)
blogs = append(blogs, blog)
}
if print {
log.Printf("**********Blogs*****************")
fmt.Println(blogs)
log.Println("*******************")
}
return nil
}
// QueryBlogNumbByAuthor term 查詢(xún)-查詢(xún)某個(gè)作者的作品有多少
func (es *esObj) QueryBlogNumbByAuthor(ctx context.Context, author string) error {
term := elastic.NewTermQuery("author", author)
res, err := es.db.Search().Index(es.index).Query(term).Do(ctx)
if err != nil {
log.Fatalf("QueryBlogNumbByAuthor fail ,author is %v, err is %v", author, err)
return err
}
log.Printf("author of %s all numb blog is %v", author, res.TotalHits())
return nil
}
// QueryBlogByContent 查詢(xún)帶有黃河關(guān)鍵字的博客
func (es *esObj) QueryBlogByContent(ctx context.Context, content string, print bool) (err error) {
match := elastic.NewMatchQuery("content", content)
res, err := es.db.Search().Index(es.index).Query(match).Do(ctx)
if err != nil {
log.Fatalf("es query fail when index is %s ,content is %v; match is %v; err is %v", es.index, content, match, err)
return err
}
log.Printf("sum numb is %d", res.TotalHits())
// 輸出所有的內(nèi)容
var bty Blog
blogs := make([]Blog, 0)
for _, v := range res.Each(reflect.TypeOf(bty)) {
blog := v.(Blog)
blogs = append(blogs, blog)
}
if print {
log.Printf("es打開(kāi) %+v", blogs)
}
return nil
}
func (es *esObj) BoolQueryBlogByAuthor(ctx context.Context, author string, print bool) {
termQuery := elastic.NewTermQuery("author", author)
rangeQuery := elastic.NewRangeQuery("release_time").Gte("0160").Lte("0172").Format("dd/MM/yyyy||yyyy")
boolQuery := elastic.NewBoolQuery().Must(termQuery, rangeQuery)
res, err := es.db.Search().Index(es.index).Query(boolQuery).Do(ctx)
if err != nil {
log.Fatalf("bool query err %v", err)
}
var bty Blog
blogs := make([]Blog, 0)
for _, v := range res.Each(reflect.TypeOf(bty)) {
blog := v.(Blog)
blogs = append(blogs, blog)
}
if print {
log.Printf("es 的 bool 查詢(xún)結(jié)果 %+v", blogs)
}
}
// AggsSumPageViesByAuthorName 聚合索引 李白的書(shū)的總的閱讀量
func (es *esObj) AggsSumPageViesByAuthorName(ctx context.Context, author string) (err error) {
query := elastic.NewTermQuery("author", author)
maxPageViews := elastic.NewMaxAggregation().Field("pageview")
totalPageViews := elastic.NewSumAggregation().Field("pageview")
res, err := es.db.Search().Index(es.index).Query(query).
Aggregation("totalPageViews", totalPageViews).
Aggregation("maxPageViews", maxPageViews).
Size(0).Pretty(true).Do(ctx)
if err != nil {
log.Fatalf("querySumPage fail , err is %v", err)
}
searchRes, find := res.Aggregations.ValueCount("totalPageViews")
if find {
log.Printf("totalPageVies of %s is %f", author, *searchRes.Value)
}
searchRes, find = res.Aggregations.ValueCount("maxPageViews")
if find {
log.Printf("maxPageViews of %s is %f", author, *searchRes.Value)
} else {
log.Printf("maxPageViews of %s not found", author)
}
return nil
}
// BucketAggs 桶聚合
func (es *esObj) BucketAggs(ctx context.Context, print bool) {
maxPageView := elastic.NewMaxAggregation().Field("pageview")
minPageView := elastic.NewMinAggregation().Field("pageview")
everyOneMaxMinPageView := elastic.NewTermsAggregation().Field("author").
SubAggregation("maxPageView", maxPageView).
SubAggregation("minPageView", minPageView)
res, err := es.db.Search(es.index).Query(elastic.NewMatchAllQuery()).
Aggregation("everyOneMaxMinPageView", everyOneMaxMinPageView).
Size(0).
Do(ctx)
if err != nil {
log.Fatalf("buck search fail, err is %v", err)
}
buckRes, find := res.Aggregations.Terms("everyOneMaxMinPageView")
if !find {
log.Fatalf("buck search not find")
}
if !print {
return
}
for _, buck := range buckRes.Buckets {
log.Println(buck.Key)
maxPageViewRes, _ := buck.ValueCount("maxPageView")
log.Printf("maxPageView is %v", *maxPageViewRes.Value)
minPageViewRes, _ := buck.ValueCount("minPageView")
log.Printf("minPageView is %v", *minPageViewRes.Value)
}
}
// 獲取demo數(shù)據(jù)
func getBlogDemo() (blogs []Blog) {
blogs = append(blogs, Blog{
Title: "將進(jìn)酒",
Author: "李白",
Content: `君不見(jiàn),黃河之水天上來(lái),奔流到海不復(fù)回。
君不見(jiàn),高堂明鏡悲白發(fā),朝如青絲暮成雪。
人生得意須盡歡,莫使金樽空對(duì)月。
天生我材必有用,千金散盡還復(fù)來(lái)。
烹羊宰牛且為樂(lè),會(huì)須一飲三百杯。
岑夫子,丹丘生,將進(jìn)酒,杯莫停。
與君歌一曲,請(qǐng)君為我傾耳聽(tīng)。
鐘鼓饌玉不足貴,但愿長(zhǎng)醉不復(fù)醒。
古來(lái)圣賢皆寂寞,惟有飲者留其名。
陳王昔時(shí)宴平樂(lè),斗酒十千恣歡謔。
主人何為言少錢(qián),徑須沽取對(duì)君酌。
五花馬,千金裘,呼兒將出換美酒,與爾同銷(xiāo)萬(wàn)古愁。`,
PageView: 200000,
ReleaseTime: time.Date(168, 1, 20, 0, 0, 0, 0, time.Local),
IsDel: false,
})
blogs = append(blogs, Blog{
Title: "春望",
Author: "杜甫",
Content: `國(guó)破山河在,城春草木深。
感時(shí)花濺淚,恨別鳥(niǎo)驚心。
烽火連三月,家書(shū)抵萬(wàn)金。
白頭搔更短,渾欲不勝簪。`,
PageView: 120000,
ReleaseTime: time.Date(170, 1, 20, 0, 0, 0, 0, time.Local),
IsDel: false,
})
blogs = append(blogs, Blog{
Title: "靜夜思",
Author: "李白",
Content: `床前明月光,疑是地上霜。
舉頭望明月,低頭思故鄉(xiāng)。`,
PageView: 1200000,
ReleaseTime: time.Date(172, 1, 20, 0, 0, 0, 0, time.Local),
IsDel: false,
})
blogs = append(blogs, Blog{
Title: "黃鶴樓送孟浩然之廣陵",
Author: "李白",
Content: `故人西辭黃鶴樓,煙花三月下?lián)P州。
孤帆遠(yuǎn)影碧空盡,唯見(jiàn)長(zhǎng)江天際流。`,
PageView: 180000,
ReleaseTime: time.Date(171, 1, 20, 0, 0, 0, 0, time.Local),
IsDel: false,
})
blogs = append(blogs, Blog{
Title: "望岳",
Author: "杜甫",
Content: `岱宗夫如何?齊魯青未了。
造化鐘神秀,陰陽(yáng)割昏曉。
蕩胸生曾云,決眥入歸鳥(niǎo)。
會(huì)當(dāng)凌絕頂,一覽眾山小。`,
PageView: 190000,
ReleaseTime: time.Date(183, 1, 20, 0, 0, 0, 0, time.Local),
IsDel: false,
})
return blogs
}
使用原始語(yǔ)句查詢(xún) es
package main
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
es7 "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"net/http"
"strings"
)
type H map[string]any
type LH []H
type esService struct {
client *es7.Client
index []string
}
func main() {
es := NewEsClient()
fmt.Println(es)
es.QueryDemo()
}
func NewEsClient() *esService {
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
c := es7.Config{
//如果沒(méi)有安全校驗(yàn),測(cè)試環(huán)境,建議使用 http ,不要使用 https
Addresses: []string{"http://vm.ser.cn:9200"},
//Username: "elastic",
//Password: "tophant_017",
Transport: tr,
}
client, err := es7.NewClient(c)
if err != nil {
panic(err)
}
fmt.Println("esService 連接成功")
fmt.Println(client.Info())
return &esService{
client: client,
index: []string{"kibana_sample_data_logs"},
}
}
func (es *esService) QueryDemo() {
countQuery := H{
"query": H{
"match_all": H{},
},
"size": 10,
}
jsonByte, err := json.Marshal(countQuery)
if err != nil {
panic(err)
}
// go 轉(zhuǎn)成 string 類(lèi)型
jsonString := string(jsonByte)
fmt.Println(jsonString)
req := esapi.SearchRequest{
Index: es.index,
Body: strings.NewReader(jsonString),
Pretty: true, //結(jié)果美化
}
if err != nil {
panic(err)
}
ctx := context.Background()
res, err := req.Do(ctx, es.client)
if err != nil {
panic(err)
}
if res.IsError() {
fmt.Printf("鏈接失敗 %s", res.Status())
fmt.Printf("錯(cuò)誤原因字段話%s", res.String())
return
}
defer res.Body.Close()
fmt.Println(res)
}
總結(jié)
以上是生活随笔為你收集整理的go 语言操作es示例的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Kotlin学习记录2
- 下一篇: ansible使用报错not possi