【掘金使用技巧5】实时模式下利用tick合成变化的bar

发布于: 雪球转发:0回复:0喜欢:0

对应场景:

在看盘时,经常会看到不断伸缩变化的k线。由于每时每刻都会有新数据推送过来,这些长周期的k线需要不断更新,才能反映出标的最新状态。变化的k线其本质上就是不断用更新、更短周期的数据合成长周期的k线。

举个例子:

假设我的策略是以5分钟bar为基础运行的,在终端里,只有这5分钟结束后对应的bar数据才会返回。但我觉得等5分钟bar返回时再进入买卖逻辑时黄花菜都凉了,不如每返回一个tick就更新一次5分钟bar,这样我得到的5分钟bar虽然是个“残缺的”bar,但它涵盖了标的最新信息。

怎样利用掘金现有接口实现?

以用tick合成不断更新的900sbar为例说明。

1. 订阅tick数据和900sbar数据

tick数据是合成长周期bar的原料,因此要订阅tick数据合成bar。每返回一个tick,就要把tick储存到本地中,为了后续计算。

订阅900sbar主要有2个原因。一方面是想要通过订阅获取历史数据,一方面是因为想要在每个完整bar出现以后,将储存的tick数据清空,合成新的bar,即初始化。

2. 计算“残缺的”bar

历史tick数据储存完了以后,利用这些数据进行合成。需要储存的数据为“price”最新价。

残缺的bar最高价 = tick最新价中最大值
残缺的bar最低价 = tick最新价中最小值
残缺的bar收盘价 = tick的收盘价

Tips:为什么不用tick的最高、最低价合成bar的最高价和最低价呢?

因为tick数据的最高价和最低价是日线的最高和最低,并不是当前tick的最高和最低。

代码实现

tick合成 900s bar 数据

# coding=utf-8
from __future__ import print_function, absolute_import, unicode_literals
from gm.api import *
import numpy as np
import pandas as pd
from gm.api import *

'''
每个tick合成一次900s的bar
'''

def init(context):
    # 定义标的代码
    context.goods_futures = ['CZCE.MA101', 'SHFE.rb2101', 'SHFE.fu2101', 'DCE.pp2101']
    # 订阅bar数据
    subscribe(symbols=context.goods_futures, frequency='900s', count=100)
    # 缓存tick数据
    context.future_tick_data = get_all_tick(context)
    # 订阅tick数据
    subscribe(symbols=context.goods_futures, frequency='tick')


def on_tick(context, tick):
    # 将每个新返回的tick添加到本地
    context.future_tick_data[tick['symbol']] = np.append(context.future_tick_data[tick['symbol']], tick['price'])
    symbol = tick['symbol']
    close = tick['price']
    
    # 取31个历史900s的bar,all_bar表示历史的31个bar + "残缺的"bar
    data = context.data(symbol=symbol, frequency='900s', count=31, fields='close, low, high, eob')
    all_bar = data.append([{'close': close, 'low': context.future_tick_data[symbol].min(),
                             'high': context.future_tick_data[symbol].max(), 'eob': tick['created_at']}],
                           ignore_index=True)



def on_bar(context, bars):
    # 每达到整900s,初始化一次缓存的tick数据
    if bars[0]['frequency'] == '900s':
        context.future_tick_data[bars[0]['symbol']] = np.array([])


# 缓存策略tick数据(从上一个bar结束到当前时间点的数据)
def get_all_tick(context):
    # 定义局部变量
    future_tick = locals()
    for symbol in context.goods_futures:
        # 定义获取tick数据的时间:从上一个bar结束时间 到 当前时间
        start_time = context.data(symbol=symbol, frequency='900s', count=1)['eob']
        data = history(symbol=symbol, frequency='tick', start_time=start_time, end_time=context.now, adjust=ADJUST_PREV,
                       adjust_end_time=context.now.date(), df=True)
    # 判断数据是否为0
        if data.empty:
            future_tick[symbol] = np.array([])
        else:
            future_tick[symbol] = data['price'].to_numpy()

    return future_tick


if __name__ == '__main__':
    '''
    strategy_id策略ID,由系统生成
    filename文件名,请与本文件名保持一致
    mode实时模式:MODE_LIVE回测模式:MODE_BACKTEST
    token绑定计算机的ID,可在系统设置-密钥管理中生成
    '''
    run(strategy_id='在这里输入策略id',
        filename='main.py',
        mode=MODE_LIVE,
        token='在这里输入token')

来源:掘金量化    原文链接:网页链接