# Mean reversion Spread Trading with Linear Regression
#
# Deniz Turan, (denizstij AT gmail DOT com), 19-Jan-2014
import numpy as np
from scipy.stats import linregress
R_P = 1 # refresh period in days
W_L = 30 # window length in days
def initialize(context):
context.y=sid(14517) # EWC
context.x=sid(14516) # EWA
# for long and shorting
context.max_notional = 1000000
context.min_notional = -1000000.0
# set a fixed slippage
set_slippage(slippage.FixedSlippage(spread=0.01))
context.long=False;
context.short=False;
def handle_data(context, data):
xpx=data[context.x].price
ypx=data[context.y].price
retVal=linearRegression(data,context)
# lets dont do anything if we dont have enough data yet
if retVal is None:
return None
hedgeRatio,intercept=retVal;
spread=ypx-hedgeRatio*xpx
data[context.y]['spread'] = spread
record(ypx=ypx,spread=spread,xpx=xpx)
# find moving average
rVal=getMeanStd(data, context)
# lets dont do anything if we dont have enough data yet
if rVal is None:
return
meanSpread,stdSpread = rVal
# zScore is the number of unit
zScore=(spread-meanSpread)/stdSpread;
QTY=1000
qtyX=-hedgeRatio*QTY*xpx;
qtyY=QTY*ypx;
entryZscore=1;
exitZscore=0;
if zScore < -entryZscore and canEnterLong(context):
# enter long the spread
order(context.y, qtyY)
order(context.x, qtyX)
context.long=True
context.short=False
if zScore > entryZscore and canEnterShort(context):
# enter short the spread
order(context.y, -qtyY)
order(context.x, -qtyX)
context.short=True
context.long=False
record(cash=context.portfolio.cash, stock=context.portfolio.positions_value)
@batch_transform(window_length=W_L, refresh_period=R_P)
def linearRegression(datapanel, context):
xpx = datapanel['price'][context.x]
ypx = datapanel['price'][context.y]
beta, intercept, r, p, stderr = linregress(ypx, xpx)
# record(beta=beta, intercept=intercept)
return (beta, intercept)
@batch_transform(window_length=W_L, refresh_period=R_P)
def getMeanStd(datapanel, context):
spread = datapanel['spread'][context.y]
meanSpread=spread.mean()
stdSpread=spread.std()
if meanSpread is not None and stdSpread is not None :
return (meanSpread, stdSpread)
else:
return None
def canEnterLong(context):
notional=context.portfolio.positions_value
if notional < context.max_notional and not context.long: # and not context.short:
return True
else:
return False
def canEnterShort(context):
notional=context.portfolio.positions_value
if notional > context.max_notional and not context.short: #and not context.short:
return True
else:
return False
Sunday, January 19, 2014
Mean reversion with Linear Regression and Bollinger Band for Spread Trading within Python
Following code demonstrates how to utilize to linear regression to estimate hedge ratio and Bollinger band for spread trading. The code can be back tested at Quantopian.com
Mean reversion with Kalman Filter as Dynamic Linear Regression for Spread Trading within Python
Following code demonstrates how to utilize to kalman filter to estimate hedge ratio for spread trading. The code can be back tested at Quantopian.com
# Mean reversion with Kalman Filter as Dynamic Linear Regression
#
# Following algorithm trades based on mean reversion logic of spread
# between cointegrated securities by using Kalman Filter as
# Dynamic Linear Regression. Kalman filter is used here to estimate hedge (beta)
#
# Kalman Filter structure
#
# - measurement equation (linear regression):
# y= beta*x+err # err is a guassian noise
#
# - Prediction model:
# beta(t) = beta(t-1) + w(t-1) # w is a guassian noise
# Beta is here our hedge unit.
#
# - Prediction section
# beta_hat(t|t-1)=beta_hat(t-1|t-1) # beta_hat is expected value of beta
# P(t|t-1)=P(t-1|t-1) + V_w # prediction error, which is cov(beta-beta_hat)
# y_hat(t)=beta_hat(t|t-1)*x(t) # measurement prediction
# err(t)=y(t)-y_hat(t) # forecast error
# Q(t)=x(t)'*P(t|t-1)*x(t) + V_e # variance of forecast error, var(err(t))
#
# - Update section
# K(t)=R(t|t-1)*x(t)/Q(t) # Kalman filter between 0 and 1
# beta_hat(t|t)=beta_hat(t|t-1)+ K*err(t) # State update
# P(t|t)=P(t|t-1)(1-K*x(t)) # State covariance update
#
# Deniz Turan, (denizstij AT gmail DOT com), 19-Jan-2014
#
import numpy as np
# Initialization logic
def initialize(context):
context.x=sid(14517) # EWC
context.y=sid(14516) # EWA
# for long and shorting
context.max_notional = 1000000
context.min_notional = -1000000.0
# set a fixed slippage
set_slippage(slippage.FixedSlippage(spread=0.01))
# between 0 and 1 where 1 means fastes change in beta,
#whereas small values indicates liniar regression
delta = 0.0001
context.Vw=delta/(1-delta)*np.eye(2);
# default peridiction error variance
context.Ve=0.001;
# beta, holds slope and intersection
context.beta=np.zeros((2,1));
context.postBeta=np.zeros((2,1)); # previous beta
# covariance of error between projected beta and beta
# cov (beta-priorBeta) = E[(beta-priorBeta)(beta-priorBeta)']
context.P=np.zeros((2,2));
context.priorP=np.ones((2,2));
context.started=False;
context.warmupPeriod=3
context.warmupCount=0
context.long=False;
context.short=False;
# Will be called on every trade event for the securities specified.
def handle_data(context, data):
##########################################
# Prediction
##########################################
if context.started:
# state prediction
context.beta=context.postBeta;
#prior P prediction
context.priorP=context.P+context.Vw
else:
context.started=True;
xpx=np.mat([[1,data[context.x].price]])
ypx=data[context.y].price
# projected y
yhat=np.dot(xpx,context.beta)[0,0]
# prediction error
err=(ypx-yhat);
# variance of err, var(err)
Q=(np.dot(np.dot(xpx,context.priorP),xpx.T)+context.Ve)[0,0]
# Kalman gain
K=(np.dot(context.priorP,xpx.T)/Q)[0,0]
##########################################
# Update section
##########################################
context.postBeta=context.beta + np.dot(K,err)
context.warmupCount+=1
if context.warmupPeriod > context.warmupCount:
return
#order(sid(24), 50)
message='started: {st}, xprice: {xpx}, yprice: {ypx},\
yhat:{yhat} beta: {b}, postBeta: {pBeta} err: {e}, Q: {Q}, K: {K}'
message= message.format(st=context.started,xpx=xpx,ypx=ypx,\
yhat=yhat, b=context.beta, \
pBeta=context.postBeta, e=err, Q=Q, K=K)
log.info(message)
# record(xpx=data[context.x].price, ypx=data[context.y].price,err=err, yhat=yhat, beta=context.beta[1,0])
##########################################
# Trading section
# Spread (y-beta*x) is traded
##########################################
QTY=1000
qtyX=-context.beta[1,0]*xpx[0,1]*QTY;
qtyY=ypx*QTY;
# similar to zscore in bollinger band
stdQ=np.sqrt(Q)
if err < -stdQ and canEnterLong(context):
# enter long the spread
order(context.y, qtyY)
order(context.x, qtyX)
context.long=True
if err > -stdQ and canExitLong(context):
# exit long the spread
order(context.y, -qtyY)
order(context.x, -qtyX)
context.long=False
if err > stdQ and canEnterShort(context):
# enter short the spread
order(context.y, -qtyY)
order(context.x, -qtyX)
context.short=True
if err < stdQ and canExitShort(context):
# exit short the spread
order(context.y,qtyY)
order(context.x,qtyX)
context.short=False
record(cash=context.portfolio.cash, stock=context.portfolio.positions_value)
def canEnterLong(context):
notional=context.portfolio.positions_value
if notional < context.max_notional \
and not context.long and not context.short:
return True
else:
return False
def canExitLong(context):
if context.long and not context.short:
return True
else:
return False
def canEnterShort(context):
notional=context.portfolio.positions_value
if notional > context.max_notional \
and not context.long and not context.short:
return True
else:
return False
def canExitShort(context):
if context.short and not context.long:
return True
else:
return False
Subscribe to:
Comments (Atom)