AI量化框架的主体是backtrader的事件驱动回测以及实盘能力。外加wxPython的GUI,支持csv、hdf5,Arctic(mongo),qlib的数据存储能力——这里也会把qlib框架整合进来,但不会用它的全流程——我把它的“因子表达式”能力独立拆分出来,不依赖它自有的数据格式,可以兼容csv,hdf5,mongo等。
01 backtrader策略
为了与“向量化回测”整合:qlib的AI导向的框架与backtrader的事件驱动与实盘做对比,使用backtrader来走一下“双均线”的策略流程。
初始化大脑(默认是1万,可以通过broker的setcash把初始资金改为10万)
取初始投资组合总市值
运行大脑
取投资组合总市值
cerebro = bt.Cerebro(maxcpus=1) # 在notebook里运作,maxcpus=1很重要
cerebro.broker.setcash(100000.0) #
print('初始总市值: %.2f' % cerebro.broker.getvalue())
cerebro.run()
print('期末总市值: %.2f' % cerebro.broker.getvalue())
添加数据源,使用pandasdata
使用HDFStore从hdf5存储中读出数据
初始化为bt的格式,只需要多一列openinterest
使用bt.feeds.PandasData封装后,添加给大脑
import pandas as pd
from datetime import datetime
symbol = 'SPX'
start = datetime(2010,1,1)
end = datetime(2022,9,23)
with pd.HDFStore('../../data/hdf5/index.h5') as store:
df = store[symbol]
df['openinterest'] = 0
df = df[['open', 'high', 'low', 'close', 'volume', 'openinterest']]
data = bt.feeds.PandasData(dataname=df, name=symbol, fromdate=start, todate=end)
cerebro.adddata(data)
双均线策略:
class SMAStrategy(bt.Strategy):
def __init__(self):
self.sma_short = bt.indicators.SMA(period=42)
self.sma_long = bt.indicators.SMA(period=252)
#self.sma = btind.SimpleMovingAverage(period=15)
def next(self):
#print(self.data0[0], self.sma_short[0], self.sma_long[0])
if self.sma_short[0] > self.sma_long[0]:
self.buy()
else:
self.close()
02 backtrader通用模板
# encoding:utf8
from datetime import datetime
import backtrader as bt
import pandas as pd
from loguru import logger
class BacktraderEngine:
def __init__(self, init_cash=1000000.0, benchmark='000300.SH', start=datetime(2010, 1, 1),
end=datetime.now().date()):
self.init_cash = init_cash
self.start = start
self.end = end
self.benchmark = benchmark
cerebro = bt.Cerebro()
cerebro.broker.setcash(init_cash)
# 设置手续费
cerebro.broker.setcommission(0.0001)
# 滑点:双边各 0.0001
cerebro.broker.set_slippage_perc(perc=0.0001)
self.cerebro = cerebro
self.cerebro.addanalyzer(bt.analyzers.PyFolio, _name='_PyFolio')
添加数据:
def add_data(self, code):
# 加载数据
df = self.feed.get_df(code)
df = to_backtrader_dataframe(df)
data = bt.feeds.PandasData(dataname=df, name=code, fromdate=self.start, todate=self.end)
self.cerebro.adddata(data)
self.cerebro.addobserver(bt.observers.Benchmark,
data=data)
self.cerebro.addobserver(bt.observers.TimeReturn)
策略模板:
策略里,notify_order, notify_trade两个函数,一个是订阅提交时触发,一个是交易产生时触发,这个在策略调试时特别有用,而且是通用的,直接写在基类里,另外就是基数实现log函数
# encoding: utf8
import backtrader as bt
from loguru import logger
import pandas as pd
class StrategyBase(bt.Strategy):
def log(self, txt, dt=None):
dt = dt or self.datas[0].datetime.date(0)
logger.info('%s, %s' % (dt.isoformat(), txt))
def get_current_dt(self):
return self.datas[0].datetime.date(0)
# 打印订单日志
def notify_order(self, order):
order_status = ['Created', 'Submitted', 'Accepted', 'Partial',
'Completed', 'Canceled', 'Expired', 'Margin', 'Rejected']
# 未被处理的订单
if order.status in [order.Submitted, order.Accepted]:
self.log('未处理订单:订单号:%.0f, 标的: %s, 状态状态: %s' % (order.ref,
order.data._name,
order_status[order.status]))
# 已经处理的订单
if order.status in [order.Partial, order.Completed]:
if order.isbuy():
self.log(
'BUY EXECUTED, 状态: %s, 订单号:%.0f, 标的: %s, 数量: %.2f, 价格: %.2f, 成本: %.2f, 手续费 %.2f' %
(order_status[order.status], # 订单状态
order.ref, # 订单编号
order.data._name, # 股票名称
order.executed.size, # 成交量
order.executed.price, # 成交价
order.executed.value, # 成交额
order.executed.comm)) # 佣金
else: # Sell
self.log(
'SELL EXECUTED, status: %s, ref:%.0f, name: %s, Size: %.2f, Price: %.2f, Cost: %.2f, Comm %.2f' %
(order_status[order.status],
order.ref,
order.data._name,
order.executed.size,
order.executed.price,
order.executed.value,
order.executed.comm))
elif order.status in [order.Canceled, order.Margin, order.Rejected, order.Expired]:
# order.Margin资金不足,订单无法成交
# 订单未完成
self.log('未完成订单,订单号:%.0f, 标的 : %s, 订单状态: %s' % (
order.ref, order.data._name, order_status[order.status]))
self.order = None
def notify_trade(self, trade):
logger.debug('trade......',trade.status)
# 交易刚打开时
if trade.justopened:
self.log('开仓, 标的: %s, 股数: %.2f,价格: %.2f' % (
trade.getdataname(), trade.size, trade.price))
# 交易结束
elif trade.isclosed:
self.log('平仓, 标的: %s, GROSS %.2f, NET %.2f, 手续费 %.2f' % (
trade.getdataname(), trade.pnl, trade.pnlcomm, trade.commission))
# 更新交易状态
else:
self.log('交易更新, 标的: %s, 仓位: %.2f,价格: %.2f' % (
trade.getdataname(), trade.size, trade.price))
这样我们在写一个新策略时,直接继承使用,代码就非常简洁:
class StrategyPickTime(StrategyBase):
params = (
('long', 252),
('short', 42),
)
def __init__(self):
# bt.ind.EMA(self.data, period=self.p.ema_period)
self.long = bt.ind.SMA(period=self.p.long)
self.short = bt.ind.SMA(period=self.p.short)
def next(self):
if self.short[0] > self.long[0]:
self.buy()
else:
self.close()
运行时,可以打印出所有的日志:
03 “模块化算子”
这个才是“积木式”策略开发的精华。
我们连上面的代码都不想写,想直接从“仓库”里复用。
我们把策略逻辑拆解成:选标的、按信号筛选,排序,仓位分配等等。
先从最简单的“等权重买入并持有,每个季度动态再平衡”,这个经典的资本配置策略开始。
这里拆分成“算子”会是:
1、每个季度运行一次(含首次)
2、选标的(选择所有)
3、仓位分配(等权)
我们在策略模板的基础上,再继承实现”算子策略模板“。
传入一个算子列表,在策略next函数里,逐个执行每个algo,若遇到某个algo返回True,则本次运行结果。
class StratgeyAlgoBase(StrategyBase):
def __init__(self, algo_list=None):
self.algos = algo_list
def next(self):
context = {
'strategy': self
}
for algo in self.algos:
if algo(context) is True: # 如果algo返回True,直接不运行,本次不调仓
return
有了这个模板,结合前面说的三个步骤:
我们不需要写代码就可以完成这个策略!
e.cerebro.addstrategy(StratgeyAlgoBase, algo_list=[
RunQuarterly(),
SelectAll(),
WeightEqually
])
我们只需要持续封装算子库,可以最大化减少开发策略所需要的时间,把时间多多用在策略的思想上吧。
后续我们再来介绍每个“算子”如何设计。
所有代码、数据请前往星球下载。
整合:qlib的AI导向的框架与backtrader的事件驱动与实盘