# Notes: learning pairs trading.
import pandas as pd
from pandas_datareader import data as web

# Download the formation-period quotes for Bank of China (601988.SS) and
# Shanghai Pudong Development Bank (600000.SS).
PAf = web.get_data_yahoo('601988.SS', start='2014-01-01', end='2015-01-01')
PBf = web.get_data_yahoo('600000.SS', start='2014-01-01', end='2015-01-01')
# Alternative data source (tushare):
#   ts.get_k_data(code='601988', start='2014-01-01', end='2015-01-01')

# Keep only the adjusted close of each stock.
PAf = PAf['Adj Close']
PBf = PBf['Adj Close']

# Combine the two price series into a single DataFrame.
pairf = pd.concat([PAf, PBf], axis=1)

# Length of the formation period (printed so it is visible when the file is
# run as a script rather than in a notebook).
print(len(pairf))

# Minimum-distance method
# The SSD function (sum of squared differences between the standardized
# prices) is implemented further down as PairTrading.SSD.
# Cointegration model
import matplotlib.pyplot as plt
import numpy as np
from arch.unitroot import ADF

# Check that both log-price series are integrated of order one:
# the levels should have a unit root, the first differences should not.
PAflog = np.log(PAf)
PBflog = np.log(PBf)

# Unit-root test on the log prices.
adfA = ADF(PAflog)
print(adfA.summary().as_text())
adfB = ADF(PBflog)
print(adfB.summary().as_text())

# First difference of the log prices (log returns); the first row is dropped
# because diff() leaves it as NaN.
retA = PAflog.diff()[1:]
retB = PBflog.diff()[1:]
adfretA = ADF(retA)
print(adfretA.summary().as_text())
adfretB = ADF(retB)
print(adfretB.summary().as_text())

# Time-series plot of the two log-price series.
plt.rcParams['font.sans-serif'] = ['SimHei']
# With a CJK font selected the Unicode minus glyph is often missing, so fall
# back to the ASCII hyphen for negative tick labels.
plt.rcParams['axes.unicode_minus'] = False
PAflog.plot(label='601988ZGYH', style='--')
# 600000.SS is Pudong Development Bank; the original label suffix 'ZGLT'
# did not match the stock being plotted.
PBflog.plot(label='600000PFYH', style='-')
plt.legend(loc='upper left')
plt.title('中國銀行與浦發銀行的對數價格時序圖')
plt.show()  # was `plot.show()`, which raised NameError

# Time-series plot of the differenced log prices.
retA.plot(label='601988ZGYH')
retB.plot(label='600000PFYH')
plt.legend(loc='lower left')
plt.title('中國銀行與浦發銀行的對數價格差分(收益率)')
plt.show()

# Regression analysis
# Dependent variable: log price of Pudong Development Bank (B).
# Independent variable: log price of Bank of China (A).
# Regress B's log price on A's log price, then test the regression residual
# (the spread) for stationarity.
import statsmodels.api as sm

# The fit itself was missing from the script, leaving `results` undefined.
# The orientation (B on A) follows the spread formula below.
results = sm.OLS(PBflog, sm.add_constant(PAflog)).fit()

# Extract the intercept and the slope. `.iloc` is used because `params` is a
# label-indexed Series and integer `[]` lookups on it are deprecated.
alpha = results.params.iloc[0]
beta = results.params.iloc[1]

# Residual of the regression, i.e. the spread.
spread = PBflog - beta * PAflog - alpha
spread.plot()
plt.title('價差序列')
plt.show()

# Unit-root test on the spread. The residual has zero mean by construction,
# so no deterministic term is included. Recent arch releases spell this
# option 'n'; older releases used 'nc'.
adfspread = ADF(spread, trend='n')
print(adfspread.summary().as_text())

# Minimum-distance (SSD) trading strategy
import numpy as np

# Standardized prices of the two stocks: cumulative product of (1 + simple
# return). These definitions were missing from the script; they use the same
# formula as PairTrading.SSDSpread, with the undefined first return dropped.
simpleRetA = (PAf - PAf.shift(1)) / PAf.shift(1)
simpleRetB = (PBf - PBf.shift(1)) / PBf.shift(1)
standardA = (1 + simpleRetA[1:]).cumprod()
standardB = (1 + simpleRetB[1:]).cumprod()

# Spread between the two standardized price series.
SSD_pair = standardB - standardA
print(SSD_pair.head())

# Open-position thresholds: mean +/- 1.2 standard deviations of the spread.
meanSSD_pair = np.mean(SSD_pair)
sdSSD_pair = np.std(SSD_pair)
thresholdUp = meanSSD_pair + 1.2 * sdSSD_pair
thresholdDown = meanSSD_pair - 1.2 * sdSSD_pair

SSD_pair.plot()
plt.title('中國銀行與浦發銀行標準化價差序列(形成期)')
plt.axhline(y=meanSSD_pair, color='black')
plt.axhline(y=thresholdUp, color='green')
plt.axhline(y=thresholdDown, color='green')
plt.show()

# Build the PairTrading class
import re

import numpy as np
import pandas as pd


class PairTrading:
    """Pair-selection helpers for the minimum-distance and cointegration methods.

    Every method takes two price series, ``priceX`` and ``priceY``. Methods
    that take a period expect a string of the form
    ``'YYYY-MM-DD:YYYY-MM-DD'`` and use its two halves to slice the series by
    label, so those series need a date-like index.
    """

    _PERIOD_RE = re.compile(r'\d{4}-\d{2}-\d{2}:\d{4}-\d{2}-\d{2}')

    @staticmethod
    def _check_prices(priceX, priceY):
        """Raise ``ValueError`` if either price series is missing."""
        if priceX is None or priceY is None:
            raise ValueError('缺少價格序列')

    @classmethod
    def _split_period(cls, period, message):
        """Validate a ``'start:end'`` period and return ``(start, end)``.

        Raises ``ValueError(message)`` when ``period`` is not a string in the
        ``YYYY-MM-DD:YYYY-MM-DD`` format.
        """
        if not isinstance(period, str) or not cls._PERIOD_RE.fullmatch(period):
            raise ValueError(message)
        start, end = period.split(':')
        return start, end

    @staticmethod
    def _standardize(price):
        """Return the cumulative product of (1 + simple return) of ``price``.

        The first element is NaN because the first return is undefined; the
        result keeps the index of ``price``.
        """
        simple_return = (price - price.shift(1)) / price.shift(1)
        return (1 + simple_return).cumprod()

    def SSD(self, priceX, priceY):
        """Return the sum of squared differences of the standardized prices.

        Raises:
            ValueError: if either series is ``None``.
        """
        self._check_prices(priceX, priceY)
        gap = self._standardize(priceX) - self._standardize(priceY)
        # The leading NaN is skipped by the pandas-backed sum.
        return np.sum(gap ** 2)

    def SSDSpread(self, priceX, priceY):
        """Return standardized ``priceY`` minus standardized ``priceX``.

        The first element of the result is NaN (no return on the first day).

        Raises:
            ValueError: if either series is ``None``.
        """
        self._check_prices(priceX, priceY)
        return self._standardize(priceY) - self._standardize(priceX)

    def cointegration(self, priceX, priceY):
        """Regress log ``priceY`` on log ``priceX`` and ADF-test the residual.

        Prints the ADF p-value and the regression coefficients.

        Returns:
            ``(intercept, beta)`` when the ADF p-value of the residual is
            below 0.05, otherwise ``None``.

        Raises:
            ValueError: if either series is ``None``.
        """
        self._check_prices(priceX, priceY)
        # Imported here so the distance-method helpers above stay usable
        # when the econometrics packages are not installed.
        import statsmodels.api as sm
        from arch.unitroot import ADF

        log_x = np.log(priceX)
        log_y = np.log(priceY)
        results = sm.OLS(log_y, sm.add_constant(log_x)).fit()
        adfSpread = ADF(results.resid)
        # np.asarray gives positional access whether params is a Series or
        # an ndarray.
        intercept, beta = np.asarray(results.params)[:2]

        cointegrated = adfSpread.pvalue < 0.05
        headline = '交易價格具有協整關系,' if cointegrated else '交易價格不具有協整關系,'
        report = (
            headline + '\n'
            'P-value of ADF test: %f\n'
            'Coefficients of regression:\n'
            'Intercept: %f\n'
            'Beta: %f\n'
        )
        print(report % (adfSpread.pvalue, intercept, beta))
        return (intercept, beta) if cointegrated else None

    def CointegrationSpread(self, priceX, priceY, formPeriod, tradePeriod):
        """Return the cointegration spread over ``tradePeriod``.

        The coefficients are estimated on ``formPeriod`` and applied to the
        log prices in ``tradePeriod``:
        ``log(priceY) - intercept - beta * log(priceX)``.

        Returns:
            The spread series, or ``None`` (after printing a notice) when the
            formation-period prices are not cointegrated.

        Raises:
            ValueError: if a series is ``None`` or either period is malformed.
        """
        self._check_prices(priceX, priceY)
        # Both periods must be well formed; the original `or` only complained
        # when both were malformed.
        form_start, form_end = self._split_period(
            formPeriod, '形成期交易期格式錯誤.')
        trade_start, trade_end = self._split_period(
            tradePeriod, '形成期交易期格式錯誤.')

        coefficients = self.cointegration(
            priceX[form_start:form_end], priceY[form_start:form_end])
        if coefficients is None:
            print('未形成協整關系,無法配對.')
            return None

        intercept, beta = coefficients
        return (np.log(priceY[trade_start:trade_end])
                - intercept
                - beta * np.log(priceX[trade_start:trade_end]))

    def calBound(self, priceX, priceY, method, formPeriod, width=1.5):
        """Return ``(UpperBound, LowerBound)`` of the formation-period spread.

        The bounds are the spread mean plus/minus ``width`` times its
        standard deviation. ``method`` is ``'SSD'`` or ``'Cointegration'``.

        Raises:
            ValueError: if ``formPeriod`` is malformed, ``method`` is unknown,
                or the cointegration method finds no cointegration.
        """
        # Only the formation period is an argument here; the original also
        # referenced an undefined `tradePeriod`.
        form_start, form_end = self._split_period(formPeriod, '形成期格式錯誤.')

        if method == 'SSD':
            spread = self.SSDSpread(priceX[form_start:form_end],
                                    priceY[form_start:form_end])
        elif method == 'Cointegration':
            spread = self.CointegrationSpread(priceX, priceY,
                                              formPeriod, formPeriod)
            if spread is None:
                raise ValueError('未形成協整關系,無法配對.')
        else:
            raise ValueError('不存在該方法,請選擇"SSD"或是"Cointegration".')

        mu = np.mean(spread)
        sd = np.std(spread)
        return (mu + width * sd, mu - width * sd)


# testing
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from arch.unitroot import ADF
from pandas_datareader import data as web
import fix_yahoo_finance as yf
import tushare as ts

formPeriod = '2014-01-01:2015-01-01'
tradePeriod = '2015-01-01:2015-06-30'

# Adjusted close over the formation and trading periods combined.
priceA = web.get_data_yahoo('601988.SS', start='2014-01-01', end='2015-06-30')
priceB = web.get_data_yahoo('600000.SS', start='2014-01-01', end='2015-06-30')
priceA = priceA['Adj Close']
priceB = priceB['Adj Close']

# Split each series into its formation (f) and trading (t) part.
formStart, formEnd = formPeriod.split(':')
periodStart, periodEnd = tradePeriod.split(':')
priceAf = priceA[formStart:formEnd]
priceBf = priceB[formStart:formEnd]
priceAt = priceA[periodStart:periodEnd]
priceBt = priceB[periodStart:periodEnd]

pt = PairTrading()
SSD = pt.SSD(priceAf, priceBf)
SSDspread = pt.SSDSpread(priceAf, priceBf)
print(SSDspread.describe())
print(SSDspread.head())

coefficients = pt.cointegration(priceAf, priceBf)
CoSpreadF = pt.CointegrationSpread(priceA, priceB, formPeriod, formPeriod)
CoSpreadTr = pt.CointegrationSpread(priceA, priceB, formPeriod, tradePeriod)
bound = pt.calBound(priceA, priceB, 'Cointegration', formPeriod, width=1.2)

CoSpreadTr.plot()
plt.axhline(bound[0], color='black')
plt.axhline(bound[1], color='black')
plt.show()

# Take intercept and slope from the formation-period fit just computed, so
# this section does not depend on variables left over from earlier cells.
# `coefficients` is None when the pair is not cointegrated, in which case
# the rest of this walkthrough cannot proceed.
alpha, beta = coefficients

logPBf = np.log(priceBf)
logPAf = np.log(priceAf)
spreadf = logPBf - beta * logPAf - alpha
adfSpread = ADF(spreadf)
print(adfSpread.summary().as_text())

# Trading-period spread built from the formation-period coefficients. The
# original reused the formation-period prices here, which only duplicated
# `spreadf`.
CoSpreadT = np.log(priceBt) - beta * np.log(priceAt) - alpha
mu = np.mean(spreadf)
sd = np.std(spreadf)

# Trading period
tradeStart = '2015-01-01'
tradeEnd = '2015-06-30'

# Plot the spread together with the trading bands.
CoSpreadTr.plot()
plt.title('交易期價差序列(協整配對)')
plt.axhline(y=mu, color='black')
plt.axhline(y=mu + 0.2 * sd, color='blue', ls='-', lw=2)
plt.axhline(y=mu - 0.2 * sd, color='blue', ls='-', lw=2)
plt.axhline(y=mu + 1.5 * sd, color='green', ls='--', lw=2.5)
plt.axhline(y=mu - 1.5 * sd, color='green', ls='--', lw=2.5)
plt.axhline(y=mu + 2.5 * sd, color='red', ls='-.', lw=3)
# Lower outer band; it is one of the `level` cut points below but was
# missing from the plot.
plt.axhline(y=mu - 2.5 * sd, color='red', ls='-.', lw=3)
plt.show()

# Derive the trading strategy from the open/close levels and simulate the
# trading account.
level = (float('-inf'), mu - 2.5 * sd, mu - 1.5 * sd, mu - 0.2 * sd,
         mu + 0.2 * sd, mu + 1.5 * sd, mu + 2.5 * sd, float('inf'))
# pd.cut yields bin codes 0..6; subtracting 3 centres them so that 0 is the
# band around the mean and +/-3 are the outermost bands.
prcLevel = pd.cut(CoSpreadTr, level, labels=False) - 3
print(prcLevel.head())

# Build the trade-signal function
def TradeSig(prcLevel):
    """Turn a sequence of spread band codes into trade signals.

    ``prcLevel`` holds one band code per day (-3..3, 0 being the band around
    the mean). A signal is emitted only when the code moves between specific
    neighbouring bands; all other days get 0.

    Signals:
        -2: 1 -> 2    (crossed above the upper open line)
         2: 1 -> 0    (fell back into the centre band from above)
         3: 2 -> 3    (crossed above the upper outer line)
         1: -1 -> -2  (crossed below the lower open line)
        -1: -1 -> 0   (rose back into the centre band from below)
        -3: -2 -> -3  (crossed below the lower outer line)

    Returns:
        A float ndarray of the same length as ``prcLevel``.
    """
    # Work on a plain array: integer `[]` lookups on a date-indexed Series
    # are deprecated in pandas.
    levels = np.asarray(prcLevel)
    transitions = {
        (1, 2): -2,
        (1, 0): 2,
        (2, 3): 3,
        (-1, -2): 1,
        (-1, 0): -1,
        (-2, -3): -3,
    }
    signal = np.zeros(len(levels))
    for i in range(1, len(levels)):
        signal[i] = transitions.get((levels[i - 1], levels[i]), 0)
    return signal


signal = TradeSig(prcLevel)

# Convert signals into a held position: 1 / -1 after the corresponding open
# signal, 0 when flat. Each day starts from the previous day's position.
position = [signal[0]]
for i in range(1, len(signal)):
    held = position[-1]
    if signal[i] == 1:
        held = 1
    elif signal[i] == -2:
        held = -1  # was `position[i]==-1`, a comparison with no effect
    elif signal[i] == -1 and position[-1] == 1:
        # Close the +1 position. The original tested signal == 1 again (and
        # misspelled `position`), so this branch could never run.
        held = 0
    elif signal[i] == 2 and position[-1] == -1:
        held = 0
    elif signal[i] in (3, -3):
        held = 0
    position.append(held)

position = pd.Series(position, index=CoSpreadTr.index)
print(position.tail())


def TradeSim(priceX, priceY, position, hedge_ratio=None, size=1000,
             initial_cash=2000):
    """Simulate the account of the pair trade described by ``position``.

    Args:
        priceX, priceY: prices of the two stocks, aligned element by element
            with ``position``.
        position: Series of held positions (1, -1 or 0), one per day.
        hedge_ratio: slope used to size the X leg; defaults to the
            module-level ``beta``.
        size: number of Y shares per unit of position.
        initial_cash: cash balance on the first day.

    Returns:
        DataFrame indexed like ``position`` with columns ``Position``,
        ``ShareY``, ``ShareX``, ``Cash`` and ``Asset``.

    Raises:
        ValueError: if the three inputs differ in length.
    """
    if hedge_ratio is None:
        hedge_ratio = beta  # slope estimated earlier in the script

    px = np.asarray(priceX, dtype=float)
    py = np.asarray(priceY, dtype=float)
    held = np.asarray(position, dtype=float)
    n = len(held)
    if len(px) != n or len(py) != n:
        raise ValueError('priceX, priceY and position must have equal length')

    shareY = size * held
    shareX = np.zeros(n)
    cash = np.zeros(n)
    if n:
        shareX[0] = -hedge_ratio * shareY[0] * py[0] / px[0]
        cash[0] = initial_cash

    # NOTE(review): a direct flip between 1 and -1 matches neither branch
    # below, so holdings are simply carried forward on such a day.
    for i in range(1, n):
        shareX[i] = shareX[i - 1]
        cash[i] = cash[i - 1]
        opening = held[i - 1] == 0 and held[i] in (1, -1)
        closing = held[i - 1] in (1, -1) and held[i] == 0
        if opening:
            # Size the X leg so its value offsets hedge_ratio times the
            # value of the Y leg, and pay for both legs out of cash.
            shareX[i] = -hedge_ratio * shareY[i] * py[i] / px[i]
            cash[i] = cash[i - 1] - (shareY[i] * py[i] + shareX[i] * px[i])
        elif closing:
            # Liquidate yesterday's holdings at today's prices.
            shareX[i] = 0
            cash[i] = cash[i - 1] + (shareY[i - 1] * py[i]
                                     + shareX[i - 1] * px[i])

    asset = cash + shareY * py + shareX * px
    account = pd.DataFrame(
        {
            'Position': held,
            'ShareY': shareY,
            'ShareX': shareX,
            'Cash': cash,
            'Asset': asset,
        },
        index=position.index,
    )
    return account


# The function was originally defined under this misspelled name while being
# called as TradeSim; keep the old spelling available.
TraddeSim = TradeSim

account = TradeSim(priceAt, priceBt, position)
account.iloc[:, 0].plot()
account.iloc[:, 1].plot()
account.iloc[:, 2].plot()
plt.show()

# Summary
# 以上是生活随笔為你收集整理的 #记录 配对交易学习 的全部內容,希望文章能夠幫你解決所遇到的問題。
# - 上一篇: 腾讯企业邮箱 java_使用腾讯企业邮箱
# - 下一篇: HAC集群修改为单机