VNPY源码学习系列文章:
VNPY源码(一)CTP封装及K线合成
VNPY源码(二)API获取行情和script_trader
VNPY源码(三)主引擎MainEngine
VNPY源码(四)DataRecorder
VNPY源码(五)CtaEngine实盘引擎
VNPY源码(六)BacktesterEngine回测引擎
VNPY源码(七)限价单与停止单
VNPY源码(八)VNPY的数据流
一、概述
CtaEngine引擎是策略的容器,它启动的时候会将所有的策略都加载进来。

在no_ui的run.py里面,
SETTINGS["log.active"] = True
SETTINGS["log.level"] = INFO
SETTINGS["log.console"] = True
ctp_setting = {
"用户名": "",
"密码": "",
"经纪商代码": "",
"交易服务器": "",
"行情服务器": "",
"产品名称": "",
"授权编码": "",
"产品信息": ""
}
def run_child():
"""
Running in the child process.
"""
SETTINGS["log.file"] = True
event_engine = EventEngine()
main_engine = MainEngine(event_engine)
main_engine.add_gateway(CtpGateway)
cta_engine = main_engine.add_app(CtaStrategyApp)
main_engine.write_log("主引擎创建成功")
log_engine = main_engine.get_engine("log")
event_engine.register(EVENT_CTA_LOG, log_engine.process_log_event)
main_engine.write_log("注册日志事件监听")
main_engine.connect(ctp_setting, "CTP")
main_engine.write_log("连接CTP接口")
sleep(10)
cta_engine.init_engine() #从rqdata获得数据,加载所有策略文件,加载setting文件中设定的策略,加载cta_strategy_data.json文件中的数据,注册tick,order等事件。
main_engine.write_log("CTA策略初始化完成")
cta_engine.init_all_strategies() #订阅行情
sleep(60) # Leave enough time to complete strategy initialization
main_engine.write_log("CTA策略全部初始化")
cta_engine.start_all_strategies() #设定strategy.trading = True,启动策略
main_engine.write_log("CTA策略全部启动")
while True:
sleep(1)
有一句:
cta_engine = main_engine.add_app(CtaStrategyApp)
其实就相当于相当于CtaEngine(MainEngine(),event_engine)。
我们先来看一下CtaStrategyApp是哪里来的,它是在vnpy/app/cta_strategy/__init__.py这个文件里面定义的。
class CtaStrategyApp(BaseApp):
""""""
app_name = APP_NAME
app_module = __module__
app_path = Path(__file__).parent
display_name = "CTA策略"
engine_class = CtaEngine
widget_name = "CtaManager"
icon_name = "cta.ico"
再来看一下这个add_app
它传入的参数是BaseApp。
这个函数的作用相当于:实例化传入的BaseApp(CtaStrategyApp),并添加到self.apps字典里,再添加CtaEngine(传入了MainEngine(),event_engine()), 返回了一个实例化的CtaEngine
def add_app(self, app_class: Type[BaseApp]):
"""
Add app.
"""
app = app_class() #实例化类
self.apps[app.app_name] = app #将类对像添加到 self.apps字典里。
engine = self.add_engine(app.engine_class) #这里app.engine_class就是CtaStrategyApp中的engine_class = CtaEngine
return engine
#再看add_engine,参数形式为:add_engine(CtaEngine),
def add_engine(self, engine_class: Any):
"""
Add function engine.(添加功能引擎)
"""
engine = engine_class(self, self.event_engine) #self指当前类的实例,相当于CtaEngine(MainEngine(),event_engine()),即将主引擎,事件引擎传入了CtaEngine。
self.engines[engine.engine_name] = engine #将实例化后的CtaEngine存入到self.engines字典里面,CtaEngine继承自BaseEngine,BaseEngine中有engine_name
return engine #返回实例化后的引擎
顺便说一下mainEngine,它的作用就是负责对所有引擎实例化。
二、CtaEngine
在vnpy/app/cta_strategy/__init__.py里面,有from .engine import CtaEngine这样的语句,其实就是从在vnpy/app/cta_strategy/engine.py这个文件里面导入CtaEngine。下面我们来看这个CtaEngine。
首先,它是继承自BaseEngine,然后初始化的时候,需要传入两个engine.
class CtaEngine(BaseEngine):
""""""
engine_type = EngineType.LIVE # live trading engine
setting_filename = "cta_strategy_setting.json"
data_filename = "cta_strategy_data.json"
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
""""""
super(CtaEngine, self).__init__(
main_engine, event_engine, APP_NAME)
在run.py里面,使用了它的下面三个功能:
cta_engine.init_engine()
main_engine.write_log("CTA策略初始化完成")
cta_engine.init_all_strategies()
sleep(60) # Leave enough time to complete strategy initialization
main_engine.write_log("CTA策略全部初始化")
cta_engine.start_all_strategies()
main_engine.write_log("CTA策略全部启动")
(一).init_engine()
def init_engine(self):
"""
"""
self.init_rqdata()
self.load_strategy_class() #载入策略
self.load_strategy_setting()
self.load_strategy_data()
self.register_event()
self.write_log("CTA策略引擎初始化成功")
1.init_rqdata()
初始化rqdata,可以看到有from vnpy.trader.rqdata import rqdata_client,说明这里对米筐进行了封装,因为我自己之前用的米筐是直接使用rq.init()。
2.load_strategy_class()
其实最终的结果就是执行了“self.classes[value.__name__] = value”这一句。
def load_strategy_class(self):
"""
Load strategy class from source code.
"""
path1 = Path(__file__).parent.joinpath("strategies") #就是变成“strategies\”这样的形式
self.load_strategy_class_from_folder(
path1, "vnpy.app.cta_strategy.strategies")
path2 = Path.cwd().joinpath("strategies") #就是变成 “C:\Users\Kevin\Desktop\py2\strategies” 这样的形式
self.load_strategy_class_from_folder(path2, "strategies")
def load_strategy_class_from_folder(self, path: Path, module_name: str = ""):
"""
Load strategy class from certain folder.根据传入的文件夹,以及module_name,遍历方位夹内的文件,找到.py文件,拼接成module_name.double_ma这样的形式,再传入到load_strategy_class_from_module这个函数。
"""
for dirpath, dirnames, filenames in os.walk(str(path)):
for filename in filenames: #遍历strategies下所有的文件
if filename.endswith(".py"): #如果是.py文件
strategy_module_name = ".".join( #拼接成 strategies.double_ma_strategy这样的形式
[module_name, filename.replace(".py", "")])
self.load_strategy_class_from_module(strategy_module_name) #执行load_strategy_class_from_module(strategies.double_ma_strategy)
def load_strategy_class_from_module(self, module_name: str):
"""
Load strategy class from module file.
"""
try:
module = importlib.import_module(module_name) #绝对导入,其中的module_name为strategies.double_ma_strategy。
for name in dir(module): #遍历double_ma_strategy的所有属性、方法
value = getattr(module, name)
#只有当name是__class__的时候,isinstance(value, type)才是True,issubclass(value, CtaTemplate) 调试不出来。
if (isinstance(value, type) and issubclass(value, CtaTemplate) and value is not CtaTemplate):
self.classes[value.__name__] = value #将value放到self.classes这个字典里,value.__name__应该就是策略的class_name。
except: # noqa
msg = f"策略文件{module_name}加载失败,触发异常:\n{traceback.format_exc()}"
self.write_log(msg)
知识点:
(1).os.walk()方法
os.walk方法,主要用来遍历一个目录内各个子目录和子文件。
os.walk(top, topdown=True, onerror=None, followlinks=False)
可以得到一个三元tupple(dirpath, dirnames, filenames),
第一个为起始路径,第二个为起始路径下的文件夹,第三个是起始路径下的文件。
dirpath 是一个string,代表目录的路径,
dirnames 是一个list,包含了dirpath下所有子目录的名字。
filenames 是一个list,包含了非目录文件的名字。
这些名字不包含路径信息,如果需要得到全路径,需要使用os.path.join(dirpath, name).
(2).import importlib
一个函数运行需要根据不同项目的配置,动态导入对应的配置文件运行。
import importlib
# 绝对导入
a = importlib.import_module("clazz.a")
a.show()
# show A
# 相对导入
b = importlib.import_module(".b", "clazz")
b.show()
# show B
#注意,相对导入有个一点., 类似路径
(3).dir()
返回模块的属性列表
class A(object): name = "xxx" bar = 1 def fu(sefl): pass a = A() for xxx in dir(a): print(xxx)

https://www.cnblogs.com/aademeng/articles/7259986.html
(4)getattr()
getattr() 函数用于返回一个对象属性值。最好的理解是通过示例:
>>>class A(object): ... bar = 1 ... >>> a = A() >>> getattr(a, 'bar') # 获取属性 bar 值 1
(5)isinstance
isinstance() 函数来判断一个对象是否是一个已知的类型,类似 type()
>>>a = 2 >>> isinstance (a,int) True
class A():
test = "xxx"
bar = 1
def fu(sefl):
pass
a = A()
for i in dir(a):
print("i是{}".format(i))
value = getattr(a, i)
print("value是{}".format(value))
print(isinstance(value, type))
print("*"*80)

3.load_strategy_setting
加载users/你的用户名/.vntrader/下面的策略,并且通过add_strategy实例化多个策略类。
def load_strategy_setting(self):
"""
Load setting file.
"""
self.strategy_setting = load_json(self.setting_filename) #加载cta_strategy_setting.json
for strategy_name, strategy_config in self.strategy_setting.items(): #对字典进行遍历。
self.add_strategy(
strategy_config["class_name"], #策略的类名
strategy_name, #你自己输入的具体策略名称,比如rb1910螺纹,因为一个策略可以跑多个品种,多个周期。
strategy_config["vt_symbol"],
strategy_config["setting"] #从下图可以看出,这里存的就是classname:oneMinuteStrategy
)

通过load_json加载了cta_strategy_setting.json文件,并执行了add_strategy。
备注:cta_strategy_setting.json这个文件是在安装了vnpy之后,在users/你的用户名/.vntrader/下面生成的。
它和你打开vnpy的UI界面,然后执行“功能-CTA策略”打开的窗口看到的已添加策略是对应的。比如在这个界面将rb1910螺纹删除,那么cta_strategy_setting.json这个文件就会变成一个空文件。

同时生成的还有cta_strtegy_data.json文件。
再看看add_strategy
它做的事情就是:添加一个实例化的类,并将这个类放到self.strategies这个字典里面,最后建立合约与实例化的策略类的映射,更新cta_strategy_setting.json文件,再将event put出去。
它要传入的参数是这样的:add_strategy("oneMinuteStrategy","rb1910螺纹","rb1910.SHFE",{"class_name":"oneMinuteStrategy"})
def add_strategy(
self, class_name: str, strategy_name: str, vt_symbol: str, setting: dict
):
"""
Add a new strategy.
"""
if strategy_name in self.strategies: #如果策略名已经在self.strategies字典中了。见下面的示例。
self.write_log(f"创建策略失败,存在重名{strategy_name}")
return
strategy_class = self.classes.get(class_name, None) #通过get从self.classes字典找到相应的策略类。
if not strategy_class:
self.write_log(f"创建策略失败,找不到策略类{class_name}")
return
#实例化一个策略类,查看策略类源码,可以发现需要传入的参数为def __init__(self, cta_engine, strategy_name, vt_symbol, setting),下面的self就表示前当的引擎CtaEngine。
strategy = strategy_class(self, strategy_name, vt_symbol, setting)
self.strategies[strategy_name] = strategy #将实例化后的类存到self.strategies这个字典里面
# Add vt_symbol to strategy map. 将rb1910添加到合约与策略的映射。
strategies = self.symbol_strategy_map[vt_symbol] #symbol_strategy_map是一个symbol、strategy映射字典
strategies.append(strategy)
# Update to setting file.
self.update_strategy_setting(strategy_name, setting) 更新cta_strategy_setting.json文件
self.put_strategy_event(strategy)
比如判断某值是否在字典中:
aa = {'a':"ccc",'b':"cc",'c':"333"}
b = "a"
print(aa[b])
if "a" in aa:
print("a在字典中")
from collections import defaultdict
strategy="teststrategy"
vt_symbol="rb1910.SHFE"
symbol_strategy_map = defaultdict(list)
strategies = symbol_strategy_map[vt_symbol]
print(strategies)
print(symbol_strategy_map)
strategies.append(strategy)
print(strategies)
print(symbol_strategy_map)

再看update_strategy_setting这个函数,它的作用就是将json数据存到setting_filename(即cta_strategy_setting.json)文件。
def update_strategy_setting(self, strategy_name: str, setting: dict):
"""
Update setting file.
"""
strategy = self.strategies[strategy_name]
self.strategy_setting[strategy_name] = {
"class_name": strategy.__class__.__name__,
"vt_symbol": strategy.vt_symbol,
"setting": setting,
}
save_json(self.setting_filename, self.strategy_setting)
再看put_strategy_event函数。这个顾名思议,就是将event put出去。
def put_strategy_event(self, strategy: CtaTemplate):
"""
Put an event to update strategy status.
"""
data = strategy.get_data() #就是获得策略的相关信息
event = Event(EVENT_CTA_STRATEGY, data) #将策略的data传到Event事件引擎
self.event_engine.put(event)
(1).get_data
这个来自doubleMA之类策略的父类CtaTemplate,就是获得策略的相关信息
def get_data(self):
"""
Get strategy data.
"""
strategy_data = {
"strategy_name": self.strategy_name,
"vt_symbol": self.vt_symbol,
"class_name": self.__class__.__name__,
"author": self.author,
"parameters": self.get_parameters(),
"variables": self.get_variables(),
}
return strategy_data
(2).这里有一个EVENT_CTA_STRATEGY,又是从base.py里面导入进来的,我们看它的定义,
EVENT_CTA_LOG = "eCtaLog" EVENT_CTA_STRATEGY = "eCtaStrategy" EVENT_CTA_STOPORDER = "eCtaStopOrder"
可以知道,它就是一个事件。
4.load_strategy_data函数
这个函数的作用就是从cta_strategy_data.json中取出数据。因为根据之前的定义,data_filename = "cta_strategy_data.json"。
取到的数据在init_all_strategies的时候需要用到。
def load_strategy_data(self):
"""
Load strategy data from json file.
"""
self.strategy_data = load_json(self.data_filename)
我们看看这个json文件长什么样:

5.register_event
注册事件引擎
def register_event(self):
""""""
self.event_engine.register(EVENT_TICK, self.process_tick_event)
self.event_engine.register(EVENT_ORDER, self.process_order_event)
self.event_engine.register(EVENT_TRADE, self.process_trade_event)
self.event_engine.register(EVENT_POSITION, self.process_position_event)
二、init_all_strategies()
最终的结果是订阅了symbol的行情,并执行了strategy.on_init(这个on_init是在策略里面定义的,其功能就是self.load_bar(10))。
def init_all_strategies(self):
"""
"""
for strategy_name in self.strategies.keys():
self.init_strategy(strategy_name)
def init_strategy(self, strategy_name: str):
"""
Init a strategy.
"""
self.init_queue.put(strategy_name) #将strategy_name put进去
if not self.init_thread:
self.init_thread = Thread(target=self._init_strategy)
self.init_thread.start()
这个init_queue是在这个类的init中定义的:self.init_queue = Queue()。
init_thread也是在init中定义的:self.init_thread = None
小知识:
queue.empty:exception Queue.Empty 的异常。
再看一下这个_init_strategy
def _init_strategy(self):
"""
Init strategies in queue.
"""
while not self.init_queue.empty():
strategy_name = self.init_queue.get()
strategy = self.strategies[strategy_name]
if strategy.inited:
self.write_log(f"{strategy_name}已经完成初始化,禁止重复操作")
continue
self.write_log(f"{strategy_name}开始执行初始化")
# Call on_init function of strategy
self.call_strategy_func(strategy, strategy.on_init)
# Restore strategy data(variables)
data = self.strategy_data.get(strategy_name, None)
if data:
for name in strategy.variables:
value = data.get(name, None)
if value:
setattr(strategy, name, value)
# Subscribe market data
contract = self.main_engine.get_contract(strategy.vt_symbol) #获得symbol
if contract:
req = SubscribeRequest(
symbol=contract.symbol, exchange=contract.exchange) #这里是拼接代码
self.main_engine.subscribe(req, contract.gateway_name) #调用主引擎的subscribe,而主引擎的subsribe又是调用Gateway的subscribe.可以看下面的流程图:
else:
self.write_log(f"行情订阅失败,找不到合约{strategy.vt_symbol}", strategy)
# Put event to update init completed status.
strategy.inited = True
self.put_strategy_event(strategy)
self.write_log(f"{strategy_name}初始化完成")
self.init_thread = None

这里的put_strategy_event函数在上面已经有了。
这里又有一个call_strategy_func,它的解释是“Call function of a strategy and catch any exception raised.”执行策略中的函数,不过需要传入策略,策略的函数两个参数。其实策略也可以不传,但是就无法让后面的strategy.trading = False了。
它的用法是这样的:self.call_strategy_func(strategy, strategy.on_init),其实就是执行strategy.on_init。
def call_strategy_func(
self, strategy: CtaTemplate, func: Callable, params: Any = None
):
"""
Call function of a strategy and catch any exception raised.
"""
try:
if params:
func(params)
else:
func()
except Exception:
strategy.trading = False
strategy.inited = False
msg = f"触发异常已停止\n{traceback.format_exc()}"
self.write_log(msg, strategy)
三、start_all_strategies()
这个函数非常短,就是遍历strategies.keys,然后依次执行start_strategy。最终的结果就是执行strategy.on_start(其实on_start啥都没做,就是输出“策略启动”,再就是执行self.put_event())。
我们来看看start_strategy,它的功能很简单,其实就是执行策略中的on_start函数。
def start_strategy(self, strategy_name: str):
"""
Start a strategy.
"""
strategy = self.strategies[strategy_name] #从self.strategies字典中取出键名为“XX”的策略实例 因为add_strategy那里添加的是实例 。
if not strategy.inited:
self.write_log(f"策略{strategy.strategy_name}启动失败,请先初始化")
return
if strategy.trading:
self.write_log(f"{strategy_name}已经启动,请勿重复操作")
return
self.call_strategy_func(strategy, strategy.on_start)
strategy.trading = True
self.put_strategy_event(strategy)
四、其他函数
cta_engine里面有一个check_stop_order函数。
def check_stop_order(self, tick: TickData):
"""
这个函数就是执行strategy的on_stop_order函数。
"""
for stop_order in list(self.stop_orders.values()):
if stop_order.vt_symbol != tick.vt_symbol:
continue
long_triggered = (
stop_order.direction == Direction.LONG and tick.last_price >= stop_order.price
)
short_triggered = (
stop_order.direction == Direction.SHORT and tick.last_price <= stop_order.price
)
if long_triggered or short_triggered:
strategy = self.strategies[stop_order.strategy_name]
# To get excuted immediately after stop order is
# triggered, use limit price if available, otherwise
# use ask_price_5 or bid_price_5
if stop_order.direction == Direction.LONG:
if tick.limit_up:
price = tick.limit_up
else:
price = tick.ask_price_5
else:
if tick.limit_down:
price = tick.limit_down
else:
price = tick.bid_price_5
contract = self.main_engine.get_contract(stop_order.vt_symbol)
vt_orderids = self.send_limit_order(
strategy,
contract,
stop_order.direction,
stop_order.offset,
price,
stop_order.volume,
stop_order.lock
)
# Update stop order status if placed successfully
if vt_orderids:
# Remove from relation map.
self.stop_orders.pop(stop_order.stop_orderid)
strategy_vt_orderids = self.strategy_orderid_map[strategy.strategy_name]
if stop_order.stop_orderid in strategy_vt_orderids:
strategy_vt_orderids.remove(stop_order.stop_orderid)
# Change stop order status to cancelled and update to strategy.
stop_order.status = StopOrderStatus.TRIGGERED
stop_order.vt_orderids = vt_orderids
self.call_strategy_func(
strategy, strategy.on_stop_order, stop_order
)
self.put_stop_order_event(stop_order)
那么这个函数在哪儿被调用呢?
def process_tick_event(self, event: Event):
""""""
tick = event.data
strategies = self.symbol_strategy_map[tick.vt_symbol]
if not strategies:
return
self.check_stop_order(tick)
for strategy in strategies:
if strategy.inited:
self.call_strategy_func(strategy, strategy.on_tick, tick)
这里面的symbol_strategy_map是在CtaEngine的init里面定义的:
self.symbol_strategy_map = defaultdict(list) # vt_symbol: strategy list
再来看看put_stop_order_event函数,它传入的是StopOrder。
def put_stop_order_event(self, stop_order: StopOrder):
"""
Put an event to update stop order status.
"""
event = Event(EVENT_CTA_STOPORDER, stop_order)
self.event_engine.put(event)
最后附上别人画的一张流程图:


process_tick根据symbol分发给相应的策略:

还有这里有一篇文章写得很好,可以参考:
https://www.vnpy.com/forum/topic/1064-ctace-lue-mo-ni-jiao-yi-xue-xi-ji-lu-yi
六、onTrade,onOrder
在单一合约里,onTrade,onOrder是一样原。
在套利的时候,买卖一个跨期的合约,spM1705&m1709,多m1705,空m1709
onOrder会收到一条信息
onTrade会收到两条信息。