python使用elasticsearch维护数据_使用Python对ElasticSearch获取数据及操作
#!/usr/bin/env python#-*- coding: utf-8 -*-
"""@Time : 2018/7/4
@Author : LiuXueWen
@Site :
@File : ElasticSearchOperation.py
@Software: PyCharm
@Description: 對elasticsearch數據的操作,包括獲取數據,發送數據"""
importelasticsearchimportjsonimportUtil_Ini_Operationclasselasticsearch_data():def __init__(self,hosts,username,password,maxsize,is_ssl):#初始化ini操作腳本,獲取配置文件
try:#判斷請求方式是否ssl加密
if is_ssl == "true":#獲取證書地址
cert_pem = Util_Ini_Operation.get_ini("config.ini").get_key_value("certs","certs")
es_ssl=elasticsearch.Elasticsearch(#地址
hosts=hosts,#用戶名密碼
http_auth=(username,password),#開啟ssl
use_ssl=True,#確認有加密證書
verify_certs=True,#對應的加密證書地址
client_cert=cert_pem
)
self.es=es_sslelif is_ssl == "false":#創建普通類型的ES客戶端
es_ordinary = elasticsearch.Elasticsearch(hosts, http_auth=(username, password), maxsize=int(maxsize))
self.es=es_ordinaryexceptException as e:print(e)defquery_data(self,keywords_list,date):
gte= "now-"+str(date)
query_data={#查詢語句
"query": {"bool": {"must": [
{"query_string": {"query": keywords_list,"analyze_wildcard": True
}
},
{"range": {"@timestamp": {"gte": gte,"lte": "now","format": "epoch_millis"}
}
}
],"must_not": []
}
}
}returnquery_data#從es獲取數據
defget_datas_by_query(self,index_name,keywords,param,date):''':param index_name: 索引名稱
:param keywords: 關鍵字詞,數組
:param param: 需要數據條件,例如_source
:param date: 過去時間范圍,字符串格式,例如過去30分鐘內數據,"30m"
:return: all_datas 返回查詢到的所有數據(已經過param過濾)'''all_datas=[]#遍歷所有的查詢條件
for keywords_list inkeywords:#DSL語句
query_data =self.query_data(keywords_list,date)
res=self.es.search(
index=index_name,
body=query_data
)for hit in res['hits']['hits']:#獲取指定的內容
response =hit[param]#添加所有數據到數據集中
all_datas.append(response)#返回所有數據內容
returnall_datas#當索引不存在創建索引
defcreate_index(self,index_name):''':param index_name: 索引名稱
:return:如果創建成功返回創建結果信息,試過已經存在創建新的index失敗返回index的名稱'''
#獲取索引的映射
#index_mapping = IndexMapping.index_mapping
## 判斷索引是否存在
#if self.es.indices.exists(index=index_name) is not True:
## 創建索引
#res = self.es.indices.create(index=index_name,body=index_mapping)
## 返回結果
#return res
#else:
## 返回索引名稱
#return index_name
pass
#插入指定的單條數據內容
definsert_single_data(self,index_name,doc_type,data):''':param index_name: 索引名稱
:param doc_type: 文檔類型
:param data: 需要插入的數據內容
:return: 執行結果'''res= self.es.index(index=index_name,doc_type=doc_type,body=data)returnres#向ES中新增數據,批量插入
definsert_datas(self,index_name):''':desc 通過讀取指定的文件內容獲取需要插入的數據集
:param index_name: 索引名稱
:return: 插入成功的數據條數'''insert_datas=[]#判斷插入數據的索引是否存在
self.createIndex(index_name=index_name)#獲取插入數據的文件地址
data_file_path = self.ini.get_key_value("datafile","datafilepath")#獲取需要插入的數據集
with open(data_file_path,"r+") as data_file:#獲取文件所有數據
data_lines =data_file.readlines()for data_line indata_lines:#string to json
data_line =json.loads(data_line)
insert_datas.append(data_line)#批量處理
res = self.es.bulk(index=index_name,body=insert_datas,raise_on_error=True)returnres#從ES中在指定的索引中刪除指定數據(根據id判斷)
defdelete_data_by_id(self,index_name,doc_type,id):''':param index_name: 索引名稱
:param index_type: 文檔類型
:param id: 唯一標識id
:return: 刪除結果信息'''res= self.es.delete(index=index_name,doc_type=doc_type,id=id)returnres#根據條件刪除數據
defdelete_data_by_query(self,index_name,doc_type,param,gt_time,lt_time):''':param index_name:索引名稱,為空查詢所有索引
:param doc_type:文檔類型,為空查詢所有文檔類型
:param param:過濾條件值
:param gt_time:時間范圍,大于該時間
:param lt_time:時間范圍,小于該時間
:return:執行條件刪除后的結果信息'''
#DSL語句
query_data ={#查詢語句
"query": {"bool": {"must": [
{"query_string": {"query": param,"analyze_wildcard": True
}
},
{"range": {"@timestamp": {"gte": gt_time,"lte": lt_time,"format": "epoch_millis"}
}
}
],"must_not": []
}
}
}
res= self.es.delete_by_query(index=index_name,doc_type=doc_type,body=query_data,_source=True)returnres#指定index中刪除指定時間段內的全部數據
defdelete_all_datas(self,index_name,doc_type,gt_time,lt_time):''':param index_name:索引名稱,為空查詢所有索引
:param doc_type:文檔類型,為空查詢所有文檔類型
:param gt_time:時間范圍,大于該時間
:param lt_time:時間范圍,小于該時間
:return:執行條件刪除后的結果信息'''
#DSL語句
query_data ={#查詢語句
"query": {"bool": {"must": [
{"match_all": {}
},
{"range": {"@timestamp": {"gte": gt_time,"lte": lt_time,"format": "epoch_millis"}
}
}
],"must_not": []
}
}
}
res= self.es.delete_by_query(index=index_name, doc_type=doc_type, body=query_data, _source=True)returnres#修改ES中指定的數據
defupdate_data_by_id(self,index_name,doc_type,id,data):''':param index_name: 索引名稱
:param doc_type: 文檔類型,為空表示所有類型
:param id: 文檔唯一標識編號
:param data: 更新的數據
:return: 更新結果信息'''res= self.es.update(index=index_name,doc_type=doc_type,id=id,body=data)return res
總結
以上是生活随笔為你收集整理的python使用elasticsearch维护数据_使用Python对ElasticSearch获取数据及操作的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 富士康g41怎么刷bios 富士康G41
- 下一篇: 怎么用引导盘装系统 使用引导盘安装系统的