Paper Review and Implementation - Does Trend Following Work on Stocks? (2005)

냥냥펀치데스 2022. 1. 19. 14:37
Authors

Cole Wilcox, Eric Crittenden

Introduction

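Behavioral biases (investor herding, under- and overreaction, and so on) produce non-normal return distributions in financial markets. A trend-following system cuts off the left tail of this long-tailed distribution. That property gives trend-following systems better risk/return characteristics than various buy-and-hold approaches.

A minimum stock price filter is used to avoid penny stocks, and a minimum daily liquidity filter is used to avoid stocks that are not sufficiently liquid.

Core Theory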

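  • Apply a minimum stock price filter and a minimum daily liquidity filter
  • The portfolio is equally weighted and rebalanced daily
  • Entry condition: a new all-time high
  • Exit condition: ATR trailing stop
  • A transaction cost of 0.5% per round turn is deducted from each trade to account for estimated commissions and slippage.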
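
The entry rule and the cost assumption above can be sketched in a few lines of plain Python. This is a minimal sketch, not the paper's code: the function names `new_high_flags` and `net_trade_return` are made up for illustration, a "new high" is taken in the strict sense (a tie with the previous high does not count), and the 0.5% round-turn cost is modeled as a single deduction from the gross return of the trade.

```python
def new_high_flags(closes):
    """Flag each bar whose close is strictly above every earlier close.

    The first bar is always flagged because there is no earlier close.
    """
    flags = []
    running_max = float('-inf')
    for price in closes:
        flags.append(price > running_max)
        running_max = max(running_max, price)
    return flags


def net_trade_return(entry_price, exit_price, round_turn_cost=0.005):
    """Gross return of a long trade minus an assumed round-turn cost (0.5% by default)."""
    gross = exit_price / entry_price - 1.0
    return gross - round_turn_cost


closes = [10.0, 11.0, 10.5, 11.0, 12.0]  # invented prices
print(new_high_flags(closes))
print(net_trade_return(100.0, 110.0))
```

A trade bought at 100 and sold at 110 earns 10% gross, so the assumed cost leaves about 9.5% net.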

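Implementation

I implemented the strategy in Python, applying it to cryptocurrencies (daily candles from Binance) instead of stocks.

Libraries used: ccxt, backtrader, empyrical, pandas, numpy, matplotlib

Environment: Colab

Environment Setup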

import datetime
from datetime import datetime as dt

import backtrader as bt
import ccxt
import coinmarketcapapi
import empyrical as ep
import numpy as np
import pandas as pd

Filter Setup

  1. Minimum price filter (top 20 by market capitalization)
  2. Minimum liquidity filter (top 20 by 24-hour trading volume)
cmc = coinmarketcapapi.CoinMarketCapAPI('{Your API}')
data_listing = cmc.cryptocurrency_listings_latest()
# Note: the name `filter` shadows the Python built-in; it is kept because later cells use it
filter = pd.DataFrame(data_listing.data, columns=['symbol', 'quote'])

# Pull the USD market cap and 24h volume out of the nested quote dict
filter['market_cap'] = filter['quote'].apply(lambda q: q['USD']['market_cap'])
filter['volume'] = filter['quote'].apply(lambda q: q['USD']['volume_24h'])

filter['rank_volume'] = filter['volume'].rank(ascending=False)
filter['rank_market_cap'] = filter['market_cap'].rank(ascending=False)

# Keep coins that are in the top 20 by both market cap and volume
filter = filter[(filter['rank_market_cap'] <= 20) & (filter['rank_volume'] <= 20)]

# Exclude stablecoins
filter = filter[~filter['symbol'].isin(['USDT', 'USDC', 'BUSD'])]
# Reset the index so the symbols can be accessed by position (0..n-1) later
filter = filter['symbol'].reset_index(drop=True)
filter
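
The same selection can be tried offline, without an API key. The sketch below uses a made-up list of listings (the symbols and numbers are invented for illustration) and a hypothetical helper `top_n_by_both`. It keeps the symbols that rank in the top `n` on both market cap and volume, then drops the stablecoins.

```python
def top_n_by_both(listings, n, excluded=()):
    """Keep symbols ranked in the top n by market cap AND by volume, minus excluded ones."""
    by_cap = sorted(listings, key=lambda row: row['market_cap'], reverse=True)[:n]
    by_vol = sorted(listings, key=lambda row: row['volume_24h'], reverse=True)[:n]
    vol_symbols = {row['symbol'] for row in by_vol}
    return [row['symbol'] for row in by_cap
            if row['symbol'] in vol_symbols and row['symbol'] not in excluded]


# Invented sample data, not real market values
listings = [
    {'symbol': 'AAA', 'market_cap': 900, 'volume_24h': 85},
    {'symbol': 'USDT', 'market_cap': 800, 'volume_24h': 90},
    {'symbol': 'BBB', 'market_cap': 700, 'volume_24h': 10},
    {'symbol': 'CCC', 'market_cap': 100, 'volume_24h': 80},
    {'symbol': 'DDD', 'market_cap': 600, 'volume_24h': 70},
]

selected = top_n_by_both(listings, n=3, excluded={'USDT', 'USDC', 'BUSD'})
print(selected)
```

Here `BBB` passes the market-cap cut but not the volume cut, and `CCC` the other way round, so only `AAA` survives once `USDT` is excluded.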

Data Preparation

def get_data(place, coin):
    # Start of the sample, as a millisecond timestamp
    start_date = int(datetime.datetime(2020, 1, 1, 0, 0).timestamp() * 1000)

    if place == 'binance':
        binance = ccxt.binance()
        # Daily candles; limit=365 covers roughly one year from start_date
        ohlcvs = binance.fetch_ohlcv(coin, timeframe='1d', since=start_date, limit=365)
    else:
        raise ValueError(f'unsupported exchange: {place}')

    # Each row is [timestamp in ms, open, high, low, close, volume]
    df = pd.DataFrame(ohlcvs, columns=['Time', 'Open', 'High', 'Low', 'Close', 'Volume'])
    df['Time'] = pd.to_datetime(df['Time'], unit='ms')  # UTC timestamps
    df.set_index('Time', inplace=True)

    return df
    
datas = {}

# Fetch one DataFrame per filtered symbol, quoted in USDT
for symbol in filter:
    datas[symbol] = get_data('binance', f'{symbol}/USDT')
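
Because a single request is capped by `limit`, the call above returns at most 365 daily candles starting from 2020-01-01. A longer history needs several requests. The helper below is a hedged sketch of that loop: `fetch_all_ohlcv` and its `fetch_page` argument are hypothetical names, and `fetch_page(since, limit)` stands in for something like `lambda since, limit: binance.fetch_ohlcv(coin, timeframe='1d', since=since, limit=limit)`. It is exercised here with a fake data source so that it runs without network access.

```python
DAY_MS = 24 * 60 * 60 * 1000  # length of one daily candle, in milliseconds
START_MS = 1577836800000      # 2020-01-01 00:00:00 UTC


def fetch_all_ohlcv(fetch_page, since, limit=365, step_ms=DAY_MS):
    """Call fetch_page(since, limit) repeatedly until a short or empty page comes back."""
    rows = []
    while True:
        page = fetch_page(since, limit)
        rows.extend(page)
        if len(page) < limit:
            break
        # Continue right after the last candle received
        since = page[-1][0] + step_ms
    return rows


def fake_fetch_page(since, limit, total_days=800):
    """Stand-in for an exchange call: serves 800 synthetic daily candles from START_MS."""
    end = START_MS + total_days * DAY_MS
    ts = max(since, START_MS)
    page = []
    while ts < end and len(page) < limit:
        page.append([ts, 1.0, 1.0, 1.0, 1.0, 0.0])  # [time, open, high, low, close, volume]
        ts += DAY_MS
    return page


candles = fetch_all_ohlcv(fake_fetch_page, since=START_MS)
print(len(candles))
```

Against a real exchange, rate limits and gaps in the data would also have to be handled; the sketch leaves both out.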

Indicator Setup

class Highest(bt.Indicator):
    # Running all-time high of the input line (here: the close)
    lines = ('highest',)

    def next(self):
        self.lines.highest[0] = max(self.data[0], self.lines.highest[-1])
          
class TrailStop(bt.Indicator):
    lines = (   'trailatr',
                'trailstop',
                'dyntrailstop',
                'trailstoplong',
                'trailstopshort',
                )
    
    params = (  ('direction', 'Short'),        # initial direction
                ('tradeopen', False),         # For dynamic behaviour from the strategy
                ('atr_period', 8),
                ('trail_mult', 3),
                )

    plotinfo = dict(subplot=False)
    plotlines = dict(   trailstop       =   dict( ls='',    color='green',  marker='_', _plotskip=True,),
                        dyntrailstop    =   dict( ls='',    color='blue',   marker='_', _plotskip=False,),
                        trailatr        =   dict( ls='-',   color='black',              _plotskip=True),
                        trailstoplong   =   dict( ls='',    color='green',  marker='_', _plotskip=False,),
                        trailstopshort  =   dict( ls='',    color='red',    marker='_', _plotskip=False,),
                        )

    def __init__(self):
        self.l.trailatr = bt.indicators.AverageTrueRange(period=self.p.atr_period)

    def prenext(self):
        self.l.trailstop[0] = 0
        self.l.trailstoplong[0] = 0
        self.l.trailstopshort[0] = 0
        
    def next(self):

        if len(self.datas[0]) == 1:                 # first datapoint
            
            self.l.trailstopshort[0]    =   100000  # Very large number
            self.l.trailstoplong[0]     =   0       # smallest number
        
        else:

            if self.p.direction == 'Long':
                
                self.l.trailstopshort[0]    =   100000
                self.l.trailstoplong[0]     =   max(self.datas[0].close[0] - self.p.trail_mult * self.l.trailatr[0], self.l.trailstoplong[-1],)
                self.l.trailstop[0]         =   self.l.trailstoplong[0]
                
                if self.datas[0].close[0] < self.l.trailstoplong[0]:
                    self.p.direction = 'Short'

            else:

                self.l.trailstoplong[0]     =   0              
                self.l.trailstopshort[0]    =   min(self.datas[0].close[0] + self.p.trail_mult * self.l.trailatr[0], self.l.trailstopshort[-1],)
                self.l.trailstop[0]         =   self.l.trailstopshort[0]
                
                if self.datas[0].close[0] > self.l.trailstopshort[0]:
                    self.p.direction = 'Long' 

        if self.p.tradeopen == True:  # Dynamic stop long direction only
            self.l.dyntrailstop[0]  =   max(self.datas[0].close[0] - self.p.trail_mult * self.l.trailatr[0], self.l.dyntrailstop[-1],)
        else:
            self.l.dyntrailstop[0]  =   0
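
To make the long-side logic of the indicator easier to follow, here is a dependency-free sketch of the same idea: compute the true range, average it into an ATR, and let the stop `close - trail_mult * ATR` only ever move up. The names `true_ranges`, `simple_atr` and `long_trailing_stop` are illustrative. The ATR here is a plain moving average of the true range, chosen for brevity; backtrader's `AverageTrueRange` smooths differently by default as far as I know, so the numbers will not match the indicator exactly.

```python
def true_ranges(highs, lows, closes):
    """True range per bar; the first bar has no previous close, so it uses high - low."""
    out = []
    for i in range(len(closes)):
        if i == 0:
            out.append(highs[i] - lows[i])
        else:
            prev = closes[i - 1]
            out.append(max(highs[i] - lows[i], abs(highs[i] - prev), abs(lows[i] - prev)))
    return out


def simple_atr(tr, period):
    """Simple moving average of the true range; None until `period` bars are available."""
    return [None if i + 1 < period else sum(tr[i + 1 - period:i + 1]) / period
            for i in range(len(tr))]


def long_trailing_stop(closes, atr, trail_mult=3):
    """Stop that ratchets upward: max(close - trail_mult * ATR, previous stop)."""
    stops, prev = [], 0.0
    for close, a in zip(closes, atr):
        if a is not None:
            prev = max(close - trail_mult * a, prev)
        stops.append(prev)
    return stops


# Invented price bars
highs = [11.0, 12.0, 13.0, 12.5, 12.0]
lows = [9.0, 10.0, 11.0, 10.5, 10.0]
closes = [10.0, 11.0, 12.0, 11.0, 10.5]

atr = simple_atr(true_ranges(highs, lows, closes), period=2)
stops = long_trailing_stop(closes, atr, trail_mult=1)
print(stops)
```

Once the close starts falling, the stop stays at its highest level instead of following the price down, which is what eventually triggers the exit.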

Strategy Setup

class Test_Strategy(bt.Strategy):
    params = dict(
        using=0.05  # fraction of portfolio value allocated per position (5%)
    )

    def __init__(self):
        # Keep the indicators in a dictionary keyed by data feed so that the
        # strategy works with any number of data feeds.
        self.inds = dict()
        self.holding = dict()  # holding period (in bars) per data feed

        for i, d in enumerate(self.datas):
            self.inds[d] = dict()
            self.inds[d]['Highest'] = Highest(d.close)
            # One TrailStop instance per data feed; keep the indicator itself
            # so its tradeopen parameter can be switched from next()
            self.inds[d]['trail'] = TrailStop(d, tradeopen=False)

            if i > 0:  # plot every later feed on the chart of the first feed
                d.plotinfo.plotmaster = self.datas[0]

    def next(self):
        today = self.datetime.date()

        for d in self.datas:
            trail = self.inds[d]['trail']
            pos = self.getposition(d).size
            if not pos:  # no open position in this data feed
                # Orders are allowed only after the first 30 days
                if today > datetime.date(2020, 1, 30):
                    # Enter when the close sets a new all-time high
                    if d.close[0] == self.inds[d]['Highest'][0]:
                        self.order_target_percent(d, target=self.p.using)
                        self.holding[d] = 0
                        # Switch on the dynamic stop. The indicator only sees this
                        # when Cerebro runs bar by bar: cerebro.run(runonce=False)
                        trail.p.tradeopen = True
                        print('{} {} Buy'.format(today, d._name))
            else:
                # Exit when the close falls below the ATR trailing stop
                if d.close[0] < trail.dyntrailstop[0]:
                    self.close(data=d)
                    trail.p.tradeopen = False
                else:
                    self.holding[d] += 1

Running Backtrader

cerebro = bt.Cerebro()
# Add one data feed per coin, named after its symbol
for symbol, data in datas.items():
    cerebro.adddata(bt.feeds.PandasData(dataname=data), name=symbol)

cerebro.addanalyzer(bt.analyzers.PyFolio)
cerebro.addstrategy(Test_Strategy)

# Set our desired cash start
cerebro.broker.setcash(1000000)

# Add a FixedSize sizer according to the stake
# cerebro.addsizer(bt.sizers.FixedSize, stake=5)

# Set the commission. It is 0 here; the paper assumes a 0.5% round-turn cost,
# which would correspond to roughly commission=0.0025 per side.
cerebro.broker.setcommission(commission=0.0)

# Print out the starting conditions
print('Starting Portfolio Value: %.2f' % cerebro.broker.getvalue())

# Run over everything. runonce=False makes Cerebro step bar by bar, which
# indicators need in order to react to parameters changed by the strategy.
results = cerebro.run(runonce=False)
strat = results[0]

# Print out the final result
print('Final Portfolio Value: %.2f' % cerebro.broker.getvalue())

Printing the Results

strat = results[0]
pyfoliozer = strat.analyzers.getbyname('pyfolio')
returns, positions, transactions, gross_lev = pyfoliozer.get_pf_items()

SIMPLE_STAT_FUNCS = [
    ep.annual_return,
    ep.cum_returns_final,
    ep.annual_volatility,
    ep.sharpe_ratio,
    ep.calmar_ratio,
    ep.stability_of_timeseries,
    ep.max_drawdown,
    ep.omega_ratio,
    ep.sortino_ratio,
    ep.tail_ratio,
]

STAT_FUNC_NAMES = {
    'annual_return': 'Annual return',
    'cum_returns_final': 'Cumulative returns',
    'annual_volatility': 'Annual volatility',
    'sharpe_ratio': 'Sharpe ratio',
    'calmar_ratio': 'Calmar ratio',
    'stability_of_timeseries': 'Stability',
    'max_drawdown': 'Max drawdown',
    'omega_ratio': 'Omega ratio',
    'sortino_ratio': 'Sortino ratio',
    'skew': 'Skew',
    'kurtosis': 'Kurtosis',
    'tail_ratio': 'Tail ratio',
    'common_sense_ratio': 'Common sense ratio',
    'value_at_risk': 'Daily value at risk',
    'alpha': 'Alpha',
    'beta': 'Beta',
}

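# Only these statistics are fractions that make sense as percentages;
# the ratios (Sharpe, Calmar, ...) are reported as plain numbers.
PERCENT_STATS = {'annual_return', 'cum_returns_final', 'annual_volatility', 'max_drawdown'}

stats = pd.Series(dtype=object)
for stat_func in SIMPLE_STAT_FUNCS:
    value = stat_func(returns)
    if stat_func.__name__ in PERCENT_STATS:
        value = str(np.round(value * 100, 3)) + '%'
    else:
        value = str(np.round(value, 3))
    stats[STAT_FUNC_NAMES[stat_func.__name__]] = value

perf_stats = stats.to_frame('Backtest')
print(perf_stats)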
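
For readers who want to see what a few of these statistics measure, the sketch below recomputes cumulative return, maximum drawdown and an annualized Sharpe ratio from a list of daily returns using only the standard library. It is an illustration under simplifying assumptions (zero risk-free rate, sample standard deviation, and a `periods_per_year` argument that I set to 365 because crypto trades every day), not a reimplementation of empyrical, whose defaults may differ. The return values are made up.

```python
import math
import statistics


def cumulative_return(returns):
    """Compound a sequence of simple period returns into one total return."""
    growth = 1.0
    for r in returns:
        growth *= 1.0 + r
    return growth - 1.0


def max_drawdown(returns):
    """Largest peak-to-trough decline of the compounded equity curve (negative or 0)."""
    equity, peak, worst = 1.0, 1.0, 0.0
    for r in returns:
        equity *= 1.0 + r
        peak = max(peak, equity)
        worst = min(worst, equity / peak - 1.0)
    return worst


def sharpe_ratio(returns, periods_per_year=365):
    """Annualized mean over standard deviation of returns, assuming a zero risk-free rate."""
    return statistics.mean(returns) / statistics.stdev(returns) * math.sqrt(periods_per_year)


daily = [0.10, -0.50, 0.20, 0.00]  # invented daily returns
print(cumulative_return(daily))
print(max_drawdown(daily))
print(sharpe_ratio(daily))
```

With these inputs the equity curve goes 1.10, 0.55, 0.66, 0.66, so the total return is about -34% and the worst drawdown from the 1.10 peak is about -50%.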

Results

Reference