Home >  > VNPY源码(八)VNPY的数据流

VNPY源码(八)VNPY的数据流

0

VNPY源码学习系列文章:

VNPY源码(一)CTP封装及K线合成
VNPY源码(二)API获取行情和script_trader
VNPY源码(三)主引擎MainEngine
VNPY源码(四)DataRecorder
VNPY源码(五)CtaEngine实盘引擎
VNPY源码(六)BacktesterEngine回测引擎
VNPY源码(七)限价单与停止单
VNPY源码(八)VNPY的数据流

我觉得这张图是最清晰明了的:

策略通过CtaEngine向mdApi发送订阅行情的请求,返回tick交能eventEngine处理。
策略通过CtaEngine向tdApi发送订单相关的请求,返回order交能eventEngine处理。
至于eventEngine处理这些事件的原理,请看Vn.py学习记录十(事件驱动引擎)

尝试写一下自己理解的吧:

一、接收Tick数据到执行策略的流程:

1.ctaEngine对象向eventEngine中注册EVENT_TICK类型事件的处理函数句柄ctaEngine.processTickEvent

2.CTP的OnRtnDepthMarketData返回tick数据。

3.VNPY在OnRtnDepthMarketData中进行了处理,将data里的数据读取并转化成VtTickData对象,并调用ctpGateway.onTick函数,注册EVENT_TICK事件。
(1)OnRtnDepthMarketData
位于cnpy\gateway\ctp-tageway.py里面的class CtpMdApi(MdApi):

def onRtnDepthMarketData(self, data: dict):
    """
    Callback of tick data update.
    """
    symbol = data["InstrumentID"]
    exchange = symbol_exchange_map.get(symbol, "")
    if not exchange:
        return
    
    timestamp = f"{data['ActionDay']} {data['UpdateTime']}.{int(data['UpdateMillisec']/100)}"
    
    tick = TickData(
        symbol=symbol,
        exchange=exchange,
        datetime=datetime.strptime(timestamp, "%Y%m%d %H:%M:%S.%f"),
        name=symbol_name_map[symbol],
        volume=data["Volume"],
        open_interest=data["OpenInterest"],
        last_price=data["LastPrice"],
        limit_up=data["UpperLimitPrice"],
        limit_down=data["LowerLimitPrice"],
        open_price=data["OpenPrice"],
        high_price=data["HighestPrice"],
        low_price=data["LowestPrice"],
        pre_close=data["PreClosePrice"],
        bid_price_1=data["BidPrice1"],
        ask_price_1=data["AskPrice1"],
        bid_volume_1=data["BidVolume1"],
        ask_volume_1=data["AskVolume1"],
        gateway_name=self.gateway_name
    )
    self.gateway.on_tick(tick)  

(2)在ctpGateway里面没有onTick函数,它是在BaseGateway基类里面定义的。

class BaseGateway(ABC):
    """
    Abstract gateway class for creating gateways connection
    to different trading systems.

    # How to implement a gateway:

    ---
    ## Basics
    A gateway should satisfies:
    * this class should be thread-safe:
        * all methods should be thread-safe
        * no mutable shared properties between objects.
    * all methods should be non-blocked
    * satisfies all requirements written in docstring for every method and callbacks.
    * automatically reconnect if connection lost.

    ---
    ## methods must implements:
    all @abstractmethod

    ---
    ## callbacks must response manually:
    * on_tick
    * on_trade
    * on_order
    * on_position
    * on_account
    * on_contract

    All the XxxData passed to callback should be constant, which means that
        the object should not be modified after passing to on_xxxx.
    So if you use a cache to store reference of data, use copy.copy to create a new object
    before passing that data into on_xxxx



    """

    # Fields required in setting dict for connect function.
    default_setting = {}

    # Exchanges supported in the gateway.
    exchanges = []

    def __init__(self, event_engine: EventEngine, gateway_name: str):
        """"""
        self.event_engine = event_engine
        self.gateway_name = gateway_name

    def on_event(self, type: str, data: Any = None):
        """
        General event push.
        """
        event = Event(type, data)
        self.event_engine.put(event)

    def on_tick(self, tick: TickData):
        """
        Tick event push.
        Tick event of a specific vt_symbol is also pushed.
        """
        self.on_event(EVENT_TICK, tick)
        self.on_event(EVENT_TICK + tick.vt_symbol, tick)

4.ctpGateway.onTick函数将VtTickData对象包装成类型为EVENT_TICK的行情事件对象Event,并调用eventEngine.put函数,放入事件引擎的缓冲队列

5.事件引擎的工作线程,从缓冲队列中读取出最新的行情事件后,根据EVENT_TICK事件类型去查找缓存在内部字典中的处理函数列表,并将事件对象作为入参,遍历调用到列表中的处理函数ctaEngine.process_tick_event。

6.执行ctaEngine中的process_tick_event函数,通过Tick的代码vtSymbol,调用交易该代码合约的策略对象strategy.onTick函数,最终去运行策略中的逻辑

#vnpy/app/cta_strategy/engine.py
def process_tick_event(self, event: Event):
    """"""
    tick = event.data

    strategies = self.symbol_strategy_map[tick.vt_symbol]   #symbol_strategy_map是defaultdict,是vt_symbol: strategy list的形式。
    if not strategies:
        return

    self.check_stop_order(tick)

    for strategy in strategies:
        if strategy.inited:
            self.call_strategy_func(strategy, strategy.on_tick, tick)

二、主动订阅:
1.用户通过mainEngine.subscribe函数,发起订阅。
2.mainEngine.subscribe其实又是调用ctpGateway.subscribe函数
3.ctpGateway.subscribe中调用ctpMdApi.subscrbie函数
4.ctpMdApi.subscribe中调用C++封装的MdApi.subscribeMarketData函数,将订阅行情的请求最终通过底层C++ CTP API发出

三、流程描述
整个流程下来,不考虑stoporder,是ctaTemplate -> CtaEngine->mainEngine ->ctpgateway ->CtpTdApi, 传到C++封装的接口。返回的就是vtOrderID; 因为存在平昨,平今还有锁仓,反手等拆分情况,返回的可能是一组。

比如:
策略中的buy -> CTATemplate中的buy -> 其实是执行CTATemplate中的send_order -> 其实是调用self.cta_engine.send_order -> 根据限价单、停止单执行不同逻辑,如果是限价单,执行send_limit_order函数 —> 其实是调用send_server_order -> 调用main_engine.send_order -> 其实是调用gateway.send_order

最后,附一张官方推荐的图:

原载:蜗牛博客
网址:http://www.snailtoday.com
尊重版权,转载时务必以链接形式注明作者和原始出处及本声明。

暧昧帖

本文暂无标签

发表评论

*

*