python 读取mysql大量数据处理_python使用多线程快速把大量mysql数据导入elasticsearch...
使用python多線程,運行時自定義線程數,自動計算每個線程處理的數據量,連接mysql讀取數據,處理成需要的字段入到elasticsearch。
運行效果圖:
10個線程 運行會在這里實時刷新,方便查看每個線程處理到的id數:[root@localhost shw]# python put_album.py
{"0": "2105success ", "1": "196723success ", "2": "392557null", "3": "587819null", "4": "782519null", "5": "977482null", "6": "1172186null", "7": "1366897null", "8": "1561614null", "9": "1754368success "} "}
代碼如下:
#!/usr/bin/python
#coding:utf-8
import requests,re,os,time,urllib,urllib2,random
import json,MySQLdb,sys,math,threading
reload(sys)
sys.setdefaultencoding('utf-8')
ES_CFG = {"host":"127.0.0.1","port":"9200","user":"elastic","password":"123456","index_name":"es_album","doc":"zhuanji"}
ES_OBJ = {}
ES_OBJ['album_url'] = "http://"+ES_CFG['host']+":"+str(ES_CFG['port']+"/"+ES_CFG['index_name']+'/'+ES_CFG['doc'])
MSG = {}
# Open a fresh MySQL connection for the calling thread (one per worker).
def db_obj():
    conn = MySQLdb.connect(
        '127.0.0.1', 'root', '123456', 'dbname',
        port=3308, charset="utf8")
    return conn
def api_post(url, data):
    """POST *data* (serialized to JSON) to *url* with ES basic auth.

    Returns the requests Response object (connection already released).
    """
    payload = json.dumps(data)
    request_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Trident/7.0; rv:11.0) like Gecko',
        "Content-Type": "application/json",
    }
    response = requests.post(url, data=payload,
                             auth=(ES_CFG['user'], ES_CFG['password']),
                             headers=request_headers)
    # Body is already buffered; close just releases the underlying connection.
    response.close()
    return response
def strip_tags(html):
    """Remove HTML tags and newline runs from *html*; return stripped text.

    Returns '' when *html* is None.  On any unexpected error the input is
    printed and the process exits (original script's fail-fast behavior).
    """
    if html is None:
        return ''
    try:
        # FIX: the original pattern r']+)>' was garbled (unbalanced paren),
        # so re.compile raised and every call hit exit(0).  Reconstructed as
        # the standard "match any HTML tag" expression.
        tag_re = re.compile(r'<[^>]+>', re.S)
        html = tag_re.sub('', html)
        # Collapse any runs of newlines left behind by the removed tags.
        html = re.sub(r'[\n]+', '', html, flags=re.S)
        return html.strip()
    except Exception as e:
        print(e)
        print(html)
        exit(0)
def pretreat(html):
    """Strip markup from *html*, then squeeze out whitespace and quote chars.

    Produces the cleaned text that gets indexed into Elasticsearch.
    """
    text = strip_tags(html)
    # Remove every character class the index should never contain:
    # newlines/tabs, both quote kinds, plain spaces, NBSP and the
    # ideographic space escape used by the original regex substitutions.
    for unwanted in ('\n', '\t', '"', "'", ' ', '\xa0', '\u3000'):
        text = text.replace(unwanted, '')
    return text
def create_index(start, end, msg_id):
    """Index albums with ids in [start, end) into Elasticsearch.

    start/end -- half-open id range handled by this worker thread.
    msg_id    -- slot in the shared MSG dict used for progress reporting.
    """
    db = db_obj()
    url = ES_OBJ['album_url']
    try:
        for album_id in range(start, end):
            time.sleep(0.01)  # light throttle so MySQL/ES are not hammered
            cur = db.cursor()
            try:
                # FIX: parameterized query instead of "%s" % i string
                # interpolation (avoids SQL injection / quoting bugs).
                cur.execute(
                    "select id,name,keywords,desption from shwcms_album where id = %s",
                    (album_id,))
                row = cur.fetchone()
            finally:
                cur.close()  # FIX: cursors were never closed before
            if row is None:
                MSG[msg_id] = str(album_id) + "null"
                continue
            data = {
                'itemid': row[0],
                'album_name': pretreat(row[1]),
                'keywords': pretreat(row[2]),
                'desption': pretreat(row[3]),  # column name kept as-is (sic)
            }
            res = api_post(url, data)
            # FIX: requests never returns None (it raises on transport
            # errors), so the old "res != None" was always true.  Judge
            # success by the HTTP status code instead.
            if res is not None and res.status_code < 300:
                MSG[msg_id] = str(album_id) + 'success '
            else:
                MSG[msg_id] = str(album_id) + "fail"
    finally:
        db.close()  # FIX: close the connection even if the loop raises
max_threading = 10   # number of worker threads (source comment wrongly said 100)
max_id = 1947072     # highest album id to process (inclusive)
start_id = 1         # lowest album id to process
# FIX: ceil division so the trailing ids are not dropped.  The original used
# int(math.ceil(a/b)) with integer operands, which floors under Python 2 and
# silently skipped the last (total % max_threading) ids.
total = max_id - start_id + 1
size = (total + max_threading - 1) // max_threading
for i in range(0, max_threading):
    MSG[i] = ""
    # FIX: ranges now honor start_id (old code started thread 0 at id 0)
    # and are clamped so the final thread stops exactly at max_id.
    lo = start_id + i * size
    hi = min(lo + size, max_id + 1)
    threading.Thread(target=create_index, args=(lo, hi, i)).start()
# Progress monitor: redraw the shared MSG map on one terminal line forever.
while True:
    sys.stdout.write("\r %s" % json.dumps(MSG))
    sys.stdout.flush()
    time.sleep(1)
非特殊說明,本博所有文章均為博主原創。
最新文章
總結
以上是生活随笔為你收集整理的python 读取mysql大量数据处理_python使用多线程快速把大量mysql数据导入elasticsearch...的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 大疆手持云台和华为mate20_告别手残
- 下一篇: matlab中response函数,ma