拉取并且mongo保存聚宽的期货分钟线数据
聚寬服務
在聚寬官網申請賬號,然后閱讀相關 api 文檔。
https://www.joinquant.com/view/user/floor?type=mainFloor
安裝 SDK 并且登錄
使用 pip install jqdatasdk 進行安裝, 已經安裝之后進行更新安裝。
登錄
def login(user_name, password):try:auth(user_name, password)except Exception as e:logger.info(e)return False# 判斷是否登錄成功if not is_auth():return Falsereturn True在官網申請之后, user_name 一般是手機號, password 默認是手機號的后 6 位。 當前使用期 15 天內,每天有 100w 條的數據權限,100w 條數到了的時候給出提醒,自動升級到 1000 w 條。 需要更多的話可能即需要進行付費。
調用 get_price 接口
對調用聚寬 sdk 的 get_price 接口進行封裝。首先查看一下接口的說明,可以參照官網文檔,或直接點進代碼調用的地方進行查看。
相關說明如下:
@assert_auth def get_price(security, start_date=None, end_date=None, frequency='daily',fields=None, skip_paused=False, fq='pre', count=None, panel=True, fill_paused=True):"""獲取一支或者多只證券的行情數據:param security 一支證券代碼或者一個證券代碼的list:param count 與 start_date 二選一,不可同時使用.數量, 返回的結果集的行數, 即表示獲取 end_date 之前幾個 frequency 的數據:param start_date 與 count 二選一,不可同時使用. 字符串或者 datetime.datetime/datetime.date 對象, 開始時間:param end_date 格式同上, 結束時間, 默認是'2015-12-31', 包含此日期.:param frequency 單位時間長度, 幾天或者幾分鐘, 現在支持'Xd','Xm', 'daily'(等同于'1d'), 'minute'(等同于'1m'), X是一個正整數, 分別表示X天和X分鐘:param fields 字符串list, 默認是None(表示['open', 'close', 'high', 'low', 'volume', 'money']這幾個標準字段), 支持以下屬性 ['open', 'close', 'low', 'high', 'volume', 'money', 'factor', 'high_limit', 'low_limit', 'avg', 'pre_close', 'paused']:param skip_paused 是否跳過不交易日期(包括停牌, 未上市或者退市后的日期). 如果不跳過, 停牌時會使用停牌前的數據填充, 上市前或者退市后數據都為 nan:param panel: 當傳入一個標的列表的時候,是否返回一個panel對象,默認為True,表示返回一哥panel對象注意:當security為一個標的列表,且panel=False的時候,會返回一個dataframe對象,在這個對象中額外多出code、time兩個字段,分別表示該條數據對應的標的、時間:param fill_paused : False 表示使用NAN填充停牌的數據,True表示用close價格填充,默認True:return 如果是一支證券, 則返回pandas.DataFrame對象, 行索引是datetime.datetime對象, 列索引是行情字段名字; 如果是多支證券, 則返回pandas.Panel對象, 里面是很多pandas.DataFrame對象, 索引是行情字段(open/close/…), 每個pandas.DataFrame的行索引是datetime.datetime對象, 列索引是證券代號."""因為不同的期貨一段時間內的根數是不一致的,所以是決定 security 參數每次只傳入一只期貨。 因為我們需要的是分鐘線,所以傳入的 frequency 是 “1m”, 對應的字段 fields 沒有傳入, 默認是期貨的全部字段,在這里是
[‘open’, ‘close’, ‘low’, ‘high’, ‘volume’, ‘money’]
因為是免費賬號,調用是計算次數的,為了避免次數用完,我們可以將數據先在 csv 文件中進行備份,然后下次直接讀取 cvs 文件,避免浪費掉調用次數。缺點就是這個csv 文件也是有點大的,我是保存在本地,然后 IDE 就卡住了。所以根據自己的需求對關鍵數據進行保存。
保存的時候,要指明這是哪一只合約的哪段時間的數據。
相關的代碼:
def jz_get_price(security, start_date: datetime.datetime, end_date: datetime.datetime,frequency='1m'):# 調用聚寬的接口df = get_price(security, start_date, end_date, frequency)if save_csv:# 將結果寫入 csv 文件 (因為聚寬每天的條數是有限制的)dt_format = "%Y-%m-%d-%H-%M-%S"file_name = "_".join([security, start_date.strftime(dt_format), end_date.strftime(dt_format)])file_name = os.path.join("./csv", file_name)# 保存df.to_csv(file_name, index=True, sep=',')return dfdef read_df_from_csv(csv_file):# index_col = 0 的意思是直接使用第一列作為索引df = pd.read_csv(csv_file, index_col=0)return df關于 mongo 數據庫
在我們的 mongo 數據庫里面已經存在了一個叫做 info 的集合,可以利用它篩選出每天未過期的期貨合約。
mongo 數據庫區別一下,可能是線上的,也可能是本地的。所以寫了兩個 get_coll 函數:
統一合約代碼的格式
聚寬的合約代碼是有 交易所后綴的,我們自己的需求是不需要的。從 info 數據庫中篩選出來的合約要經過轉換才能作為聚寬 get_price 接口的參數。
def _jq_code_format(code):# 轉換合約代碼為聚寬要求的后綴模式CON_EXCHANGE_DICT = {'SH': 'XSHG', 'SZ': 'XSHE', 'IX': 'INDX', 'SF': 'XSGE', 'DF': 'XDCE','ZF': 'XZCE', 'CF': 'CCFX', 'IF': 'XINE'}exchange, id = code[:2], code[2:]assert exchange in ("CF", "DF", "SF", "ZF", "IF")con_exchange = CON_EXCHANGE_DICT.get(exchange, "")if con_exchange:return ".".join([id, con_exchange])根據要求對每一行數據進行修正
在拿到的 df 結構中,時間是索引。但是在插入的 mongo 數據中, 時間是作為每一條數據中的一個字段的。首先我們就要把索引字段作為一個正常的列加入 df 中。
另外,聚寬的分鐘線標識規則是 用 9:01 這個時間點來標識 9:00 - 9:01 這個時間段內的分鐘線數據,我們自己的規則是用 9:00 來標識同樣的一段分鐘線。
所以,對于聚寬的索引要進行減 1 min 的操作。
同時,我們拿聚寬某一天的分鐘線,就應該是從 00:01 到下一天的 00:00 , 對應于我們系統的 00:00到同一天的 00:59 。
最后,聚寬的字段是: [‘open’, ‘close’, ‘low’, ‘high’, ‘volume’, ‘money’]
我們系統的字段是: [‘open’, ‘close’, ‘low’, ‘high’, ‘volume’, ‘amount’]
也就是說我們要把 money 字段改名為 amount 字段插入數據庫中。
最后注意我們還要把字符串形式的時間轉換 datetime ,這樣我們使用 python 的庫進行插入的時候,才會自動轉換為 ISODate 類型。
def generate_inserts(df: pd.DataFrame):# 將 money 列重命名為 amountdf = df.rename(columns={'money': 'amount'})# 將索引轉換為其中的一列df['time'] = pd.to_datetime(df.index)# 將聚寬的時間整體減1mindf['time'] = df['time'].map(lambda dt: dt - pd.Timedelta(minutes=1))# 轉換時間格式df.time = df.time.astype(str)inserts = list(json.loads(df.to_json(orient="index", date_format="iso")).values())for insert in inserts:insert.update({"time": datetime.datetime.strptime(insert['time'], "%Y-%m-%d %H:%M:%S")})# 生成待插入的列表return inserts插入數據
數據準備好之后,就要向 mongo 數據庫中進行插入了。
首先我們要確定好數據庫的索引。在本例中,很顯然,我們要使用 code 和 time 作為聯合索引。
創建 mongo 索引可以直接在終端進行,也可以使用 pymongo 進行。
終端方式:
> db.price.ensureIndex({"code": 1, "time": 1}, {unique: true}) {"createdCollectionAutomatically" : true,"numIndexesBefore" : 1,"numIndexesAfter" : 2,"ok" : 1 }主程序
將以上步驟整合起來即可:
導出數據庫文件
完整代碼: https://github.com/furuiyang0715/JQFuturesLoader/blob/master/loader.py
更新時間: 2019-11-22
總結
以上是生活随笔為你收集整理的拉取并且mongo保存聚宽的期货分钟线数据的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 雄强惨淡 奇宕潇洒
- 下一篇: 【Idea】人工智能编程他来了,Idea