VNPY源码学习系列文章:
VNPY源码(一)CTP封装及K线合成
VNPY源码(二)API获取行情和script_trader
VNPY源码(三)主引擎MainEngine
VNPY源码(四)DataRecorder
VNPY源码(五)CtaEngine实盘引擎
VNPY源码(六)BacktesterEngine回测引擎
VNPY源码(七)限价单与停止单
VNPY源码(八)VNPY的数据流
我觉得这张图是最清晰明了的:
策略通过CtaEngine向mdApi发送订阅行情的请求,返回tick交能eventEngine处理。
策略通过CtaEngine向tdApi发送订单相关的请求,返回order交能eventEngine处理。
至于eventEngine处理这些事件的原理,请看Vn.py学习记录十(事件驱动引擎)
尝试写一下自己理解的吧:
一、接收Tick数据到执行策略的流程:
1.ctaEngine对象向eventEngine中注册EVENT_TICK类型事件的处理函数句柄ctaEngine.processTickEvent
2.CTP的OnRtnDepthMarketData返回tick数据。
3.VNPY在OnRtnDepthMarketData中进行了处理,将data里的数据读取并转化成VtTickData对象,并调用ctpGateway.onTick函数,注册EVENT_TICK事件。
(1)OnRtnDepthMarketData
位于cnpy\gateway\ctp-tageway.py里面的class CtpMdApi(MdApi):
def onRtnDepthMarketData(self, data: dict): """ Callback of tick data update. """ symbol = data["InstrumentID"] exchange = symbol_exchange_map.get(symbol, "") if not exchange: return timestamp = f"{data['ActionDay']} {data['UpdateTime']}.{int(data['UpdateMillisec']/100)}" tick = TickData( symbol=symbol, exchange=exchange, datetime=datetime.strptime(timestamp, "%Y%m%d %H:%M:%S.%f"), name=symbol_name_map[symbol], volume=data["Volume"], open_interest=data["OpenInterest"], last_price=data["LastPrice"], limit_up=data["UpperLimitPrice"], limit_down=data["LowerLimitPrice"], open_price=data["OpenPrice"], high_price=data["HighestPrice"], low_price=data["LowestPrice"], pre_close=data["PreClosePrice"], bid_price_1=data["BidPrice1"], ask_price_1=data["AskPrice1"], bid_volume_1=data["BidVolume1"], ask_volume_1=data["AskVolume1"], gateway_name=self.gateway_name ) self.gateway.on_tick(tick)
(2)在ctpGateway里面没有onTick函数,它是在BaseGateway基类里面定义的。
class BaseGateway(ABC): """ Abstract gateway class for creating gateways connection to different trading systems. # How to implement a gateway: --- ## Basics A gateway should satisfies: * this class should be thread-safe: * all methods should be thread-safe * no mutable shared properties between objects. * all methods should be non-blocked * satisfies all requirements written in docstring for every method and callbacks. * automatically reconnect if connection lost. --- ## methods must implements: all @abstractmethod --- ## callbacks must response manually: * on_tick * on_trade * on_order * on_position * on_account * on_contract All the XxxData passed to callback should be constant, which means that the object should not be modified after passing to on_xxxx. So if you use a cache to store reference of data, use copy.copy to create a new object before passing that data into on_xxxx """ # Fields required in setting dict for connect function. default_setting = {} # Exchanges supported in the gateway. exchanges = [] def __init__(self, event_engine: EventEngine, gateway_name: str): """""" self.event_engine = event_engine self.gateway_name = gateway_name def on_event(self, type: str, data: Any = None): """ General event push. """ event = Event(type, data) self.event_engine.put(event) def on_tick(self, tick: TickData): """ Tick event push. Tick event of a specific vt_symbol is also pushed. """ self.on_event(EVENT_TICK, tick) self.on_event(EVENT_TICK + tick.vt_symbol, tick)
4.ctpGateway.onTick函数将VtTickData对象包装成类型为EVENT_TICK的行情事件对象Event,并调用eventEngine.put函数,放入事件引擎的缓冲队列
5.事件引擎的工作线程,从缓冲队列中读取出最新的行情事件后,根据EVENT_TICK事件类型去查找缓存在内部字典中的处理函数列表,并将事件对象作为入参,遍历调用到列表中的处理函数ctaEngine.process_tick_event。
6.执行ctaEngine中的process_tick_event函数,通过Tick的代码vtSymbol,调用交易该代码合约的策略对象strategy.onTick函数,最终去运行策略中的逻辑
#vnpy/app/cta_strategy/engine.py def process_tick_event(self, event: Event): """""" tick = event.data strategies = self.symbol_strategy_map[tick.vt_symbol] #symbol_strategy_map是defaultdict,是vt_symbol: strategy list的形式。 if not strategies: return self.check_stop_order(tick) for strategy in strategies: if strategy.inited: self.call_strategy_func(strategy, strategy.on_tick, tick)
二、主动订阅:
1.用户通过mainEngine.subscribe函数,发起订阅。
2.mainEngine.subscribe其实又是调用ctpGateway.subscribe函数
3.ctpGateway.subscribe中调用ctpMdApi.subscrbie函数
4.ctpMdApi.subscribe中调用C++封装的MdApi.subscribeMarketData函数,将订阅行情的请求最终通过底层C++ CTP API发出
三、流程描述
整个流程下来,不考虑stoporder,是ctaTemplate -> CtaEngine->mainEngine ->ctpgateway ->CtpTdApi, 传到C++封装的接口。返回的就是vtOrderID; 因为存在平昨,平今还有锁仓,反手等拆分情况,返回的可能是一组。
比如:
策略中的buy -> CTATemplate中的buy -> 其实是执行CTATemplate中的send_order -> 其实是调用self.cta_engine.send_order -> 根据限价单、停止单执行不同逻辑,如果是限价单,执行send_limit_order函数 —> 其实是调用send_server_order -> 调用main_engine.send_order -> 其实是调用gateway.send_order
最后,附一张官方推荐的图:
原载:蜗牛博客
网址:http://www.snailtoday.com
尊重版权,转载时务必以链接形式注明作者和原始出处及本声明。