python如何批量发布数据并如何定时更换token
有時我們會遇到一種情況,就是用同一個接口去post不同的數(shù)據(jù),即大量的數(shù)據(jù)。
而開發(fā)在收據(jù)和發(fā)數(shù)據(jù)采用的方法是分頁。
對于這種情況,
- 測試模擬分頁的方式就是用多線程去post數(shù)據(jù)。
- 而定時更換令牌,采用的是定時任務的方法,將獲取的令牌存入文檔里,然后定時更新文檔里的令牌,post數(shù)據(jù)時,從文檔里獲取最新的令牌即可。
多線程post大量數(shù)據(jù)
代碼如下:
import json,requests,os,socket from threading import Thread,Timer import datetime#防止請求接口超時返回錯誤 socket.setdefaulttimeout(100) #文件基礎文件夾目錄,使用時修改 bathpath = r'C:xxxxxxxx' #API 的url,使用時修改 url = "xxxxx" #使用時修改,尾數(shù)文件都會被分配到最后一個線程 threadNum = 10# 獲取文件夾里面的所有的文本 filenames=os.listdir(bathpath)#統(tǒng)計用的 filecount=0 sucfilecount=0 failfilecount=0 updatetimes = 0 #目標函數(shù) def postSDAP(filename):global filecount,sucfilecount,failfilecount,updatetimesfilecount += 1filepath = os.path.join(bathpath, filename)# 讀取tokenwith open('token.txt', 'r', encoding='utf-8') as f:lines = f.readlines()# 讀第一行,token,文件只有一行token = lines[0]with open(filepath, 'r', encoding='utf-8') as load_f:load_dict = json.load(load_f)# 將dict數(shù)據(jù)轉(zhuǎn)化成json數(shù)據(jù),以便去postload_json = json.dumps(load_dict, ensure_ascii=False, indent=4).encode('utf-8')headers = {'Content-Type': 'text/plain; charset =UTF-8',"Authorization": token,"User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.120 Safari/537.36","keep-alive" : "False","Sec - Fetch - Mode": "cors"}print('正在發(fā)送文件數(shù)據(jù)--------------' + filename + '-----------------')#不打印requests提示信息requests.packages.urllib3.disable_warnings()requests.adapters.DEFAULT_RETRIES = 5try:r = requests.post(url, data=load_json, headers=headers,verify=False,timeout=300)nowTime = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f') # 現(xiàn)在 含微秒的日期時間,來源 比特量化print('當前時間是:'+str(nowTime)+'------文件' + filename + '-------返回code-------' + str(r))print('文件' + filename + '-------返回content-----------' + str(r.text))print('文件' + filename + '-------響應時間-----------' + str(r.elapsed.total_seconds()))print('文件' + filename + '-------返回header---------' + str(r.headers))print('文件' + filename + '-------request size ---------' + str(len(load_json)))sucfilecount += 1r.close()except requests.ConnectionError as e:failfilecount += 1print('文件' + filename +'----連接出錯'+str(e))except requests.HTTPError as e:failfilecount += 1print('文件' + filename +'----HTTP出錯啦'+str(e))except requests.ConnectTimeout as e:failfilecount += 1print('文件' + filename + '----連接遠程服務器超時' + str(e))except requests.ReadTimeout as e:failfilecount += 1print('文件' + filename + '----讀取超時,遠程服務器無返回' + str(e))print('------------已發(fā)送文件個數(shù)為'+str(filecount))print('------------發(fā)送成功文件個數(shù)為' + str(sucfilecount))print('------------發(fā)送失敗文件個數(shù)為' + str(failfilecount)) #處理多個dir,將作為線程的target函數(shù) def mulitPost(filenames):for i in filenames:postSDAP(i)threadsPool = [] #文件角標起始位置 begin = 0#每個線程處理的文件個數(shù)(根據(jù)總文件個數(shù)去計算 n/10 取整,把尾數(shù)文件分攤放在最后一個線程 num = len(filenames)//threadNum print(num,type(num))# 靈活配置 文件處理個數(shù),開啟 指定個數(shù)的線程。通過range控制 for i in range(threadNum):if i+1 == threadNum:# 處理尾數(shù)的文件,'最后一個線程的起',begin# 注意args 后面的逗號threadsPool.append(Thread(target=mulitPost, args=(filenames[begin:], )))else:threadsPool.append(Thread(target=mulitPost, args=(filenames[begin:begin+num], )))begin = begin+numfor thread in threadsPool:thread.start()for thread in threadsPool:thread.join()需要注意的是: 元組如果只有一個元素的時候,元素后面一定要加逗號,來完成元組的聲明。如果不添加逗號,那么變量的類型是字符串。
上面的代碼只解決了post大批量數(shù)據(jù)的問題,但是沒有解決token問題。
實際情況中,我們經(jīng)常會遇到token從另一個api獲得。
代幣獲取
代碼如下:
''' 遇到問題沒人解答?小編創(chuàng)建了一個Python學習交流QQ群:857662006 尋找有志同道合的小伙伴,互幫互助,群里還有不錯的視頻學習教程和PDF電子書! ''' def getoken():#get token的url url = "xxxx"payload="xxxx"headers = {'Content-Type': "application/x-www-form-urlencoded","User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.120 Safari/537.36"}requests.packages.urllib3.disable_warnings()requests.adapters.DEFAULT_RETRIES = 5response = requests.request("POST", url, data=payload, headers=headers,verify=False,timeout=300)d = json.loads(response.text)token = d['access_token']token = 'Bearer '+ tokenresponse.close()# 將token存入txt文件里with open('uat_token.txt', 'w', encoding='utf-8') as f:nowTime = datetime.datetime.now()f.write(token)print('token更新啦。。。當前時間是',nowTime)return token#執(zhí)行一次gettoken 好讓token文檔里有初始的token gettoken()上述方法解決了,從api直接獲取最新的令牌,但是沒有解決定時更新。
這樣采用了線程在里面的計時器方法。使用變量變量去控制獲取令牌方法的次數(shù),即token.txt文件里的令牌會被更新對應的次數(shù)。
定時獲取令牌
代碼更新如下:
count = 0 # 實時獲取最新的token def getoken():global countcount += 1#使用timer 做定時任務,過1800秒后執(zhí)行 (遞歸),定時執(zhí)行10次,用count 控制if count < 10:global timertimer = Timer(1800, getoken)timer.start()#get token的url url = "xxxx"payload="xxxx"headers = {'Content-Type': "application/x-www-form-urlencoded","User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.120 Safari/537.36"}requests.packages.urllib3.disable_warnings()requests.adapters.DEFAULT_RETRIES = 5response = requests.request("POST", url, data=payload, headers=headers,verify=False,timeout=300)d = json.loads(response.text)token = d['xxxx'] # token在json里面名字token = 'Bearer '+ tokenresponse.close()# 將token存入txt文件里with open('token.txt', 'w', encoding='utf-8') as f:nowTime = datetime.datetime.now()f.write(token)print('token更新啦。。。當前時間是',nowTime)return token#執(zhí)行一次gettoken 好讓token文檔里有初始的token gettoken()但是這個方法會導致,post大量數(shù)據(jù)都結束了,整個程序一直都是執(zhí)行狀態(tài),沒有正常退出。這是因為gettoken這個方法,里面的定時器定時任務,設置的執(zhí)行次數(shù)和間隔時間比較長。
但是又不能確定post方法到底執(zhí)行多久,因為這個是根據(jù)數(shù)據(jù)量的大小去確定的。
最后終于想到一個方法,當post方法執(zhí)行完成,即線程結束時,結束Timer任務,采用的方法是Timer里面的cancle方法。
最后的代碼
代碼更新如下:
import json,requests,os,socket from threading import Thread,Timer import datetime#防止請求接口超時返回錯誤 socket.setdefaulttimeout(100) #文件基礎文件夾目錄,使用時修改 bathpath = r'C:xxxxxxxx' #API 的url,使用時修改 url = "xxxxx" #使用時修改,尾數(shù)文件都會被分配到最后一個線程 threadNum = 10# 獲取文件夾里面的所有的文本 filenames=os.listdir(bathpath)count = 0 # 實時獲取最新的token def getoken():global countcount += 1#使用timer 做定時任務,過1800秒后執(zhí)行 (遞歸),定時執(zhí)行10次,用count 控制if count < 10:global timertimer = Timer(1800, getoken)timer.start()#get token的url url = "xxxx"payload="xxxx"headers = {'Content-Type': "application/x-www-form-urlencoded","User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.120 Safari/537.36"}requests.packages.urllib3.disable_warnings()requests.adapters.DEFAULT_RETRIES = 5response = requests.request("POST", url, data=payload, headers=headers,verify=False,timeout=300)d = json.loads(response.text)token = d['xxxx'] # token在json里面名字token = 'Bearer '+ tokenresponse.close()# 將token存入txt文件里with open('token.txt', 'w', encoding='utf-8') as f:nowTime = datetime.datetime.now()f.write(token)print('token更新啦。。。當前時間是',nowTime)return token#執(zhí)行一次gettoken 好讓token文檔里有初始的token gettoken()#統(tǒng)計用的 filecount=0 sucfilecount=0 failfilecount=0 updatetimes = 0 #目標函數(shù) def postSDAP(filename):global filecount,sucfilecount,failfilecount,updatetimesfilecount += 1filepath = os.path.join(bathpath, filename)# 讀取tokenwith open('token.txt', 'r', encoding='utf-8') as f:lines = f.readlines()# 讀第一行,token,文件只有一行token = lines[0]with open(filepath, 'r', encoding='utf-8') as load_f:load_dict = json.load(load_f)# 將dict數(shù)據(jù)轉(zhuǎn)化成json數(shù)據(jù),以便去postload_json = json.dumps(load_dict, ensure_ascii=False, indent=4).encode('utf-8')headers = {'Content-Type': 'text/plain; charset =UTF-8',"Authorization": token,"User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.120 Safari/537.36","keep-alive" : "False","Sec - Fetch - Mode": "cors"}print('正在發(fā)送文件數(shù)據(jù)--------------' + filename + '-----------------')#不打印requests提示信息requests.packages.urllib3.disable_warnings()requests.adapters.DEFAULT_RETRIES = 5try:r = requests.post(url, data=load_json, headers=headers,verify=False,timeout=300)nowTime = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f') # 現(xiàn)在 含微秒的日期時間,來源 比特量化print('當前時間是:'+str(nowTime)+'------文件' + filename + '-------返回code-------' + str(r))print('文件' + filename + '-------返回content-----------' + str(r.text))print('文件' + filename + '-------響應時間-----------' + str(r.elapsed.total_seconds()))print('文件' + filename + '-------返回header---------' + str(r.headers))print('文件' + filename + '-------request size ---------' + str(len(load_json)))sucfilecount += 1r.close()except requests.ConnectionError as e:failfilecount += 1print('文件' + filename +'----連接出錯'+str(e))except requests.HTTPError as e:failfilecount += 1print('文件' + filename +'----HTTP出錯啦'+str(e))except requests.ConnectTimeout as e:failfilecount += 1print('文件' + filename + '----連接遠程服務器超時' + str(e))except requests.ReadTimeout as e:failfilecount += 1print('文件' + filename + '----讀取超時,遠程服務器無返回' + str(e))print('------------已發(fā)送文件個數(shù)為'+str(filecount))print('------------發(fā)送成功文件個數(shù)為' + str(sucfilecount))print('------------發(fā)送失敗文件個數(shù)為' + str(failfilecount)) #處理多個dir,將作為線程的target函數(shù) def mulitPost(filenames):for i in filenames:postSDAP(i)threadsPool = [] #文件角標起始位置 begin = 0#每個線程處理的文件個數(shù)(根據(jù)總文件個數(shù)去計算 n/10 取整,把尾數(shù)文件分攤放在最后一個線程 num = len(filenames)//threadNum print(num,type(num))# 靈活配置 文件處理個數(shù),開啟 指定個數(shù)的線程。通過range控制 for i in range(threadNum):if i+1 == threadNum:# 處理尾數(shù)的文件,'最后一個線程的起',begin# 注意args 后面的逗號threadsPool.append(Thread(target=mulitPost, args=(filenames[begin:], )))else:threadsPool.append(Thread(target=mulitPost, args=(filenames[begin:begin+num], )))begin = begin+numfor thread in threadsPool:thread.start()for thread in threadsPool:thread.join() # 當主任務結束時,直接結束定時任務,避免post都結束了,但是整個程序還沒有結束 timer.cancel()需要注意的是,將timer定義成變量,才能在方法外調(diào)用。
總結
以上是生活随笔為你收集整理的python如何批量发布数据并如何定时更换token的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python得到一个10位随机数的方法及
- 下一篇: 四种高性能数据类型,Python col