Home >  > VNPY源码(五)CtaEngine实盘引擎

VNPY源码(五)CtaEngine实盘引擎

0

VNPY源码学习系列文章:

VNPY源码(一)CTP封装及K线合成
VNPY源码(二)API获取行情和script_trader
VNPY源码(三)主引擎MainEngine
VNPY源码(四)DataRecorder
VNPY源码(五)CtaEngine实盘引擎
VNPY源码(六)BacktesterEngine回测引擎
VNPY源码(七)限价单与停止单
VNPY源码(八)VNPY的数据流

一、概述

CtaEngine引擎是策略的容器,它启动的时候会将所有的策略都加载进来。

在no_ui的run.py里面,

SETTINGS["log.active"] = True
SETTINGS["log.level"] = INFO
SETTINGS["log.console"] = True


ctp_setting = {
    "用户名": "",
    "密码": "",
    "经纪商代码": "",
    "交易服务器": "",
    "行情服务器": "",
    "产品名称": "",
    "授权编码": "",
    "产品信息": ""
}


def run_child():
    """
    Running in the child process.
    """
    SETTINGS["log.file"] = True

    event_engine = EventEngine()
    main_engine = MainEngine(event_engine)
    main_engine.add_gateway(CtpGateway)
    cta_engine = main_engine.add_app(CtaStrategyApp)
    main_engine.write_log("主引擎创建成功")

    log_engine = main_engine.get_engine("log")
    event_engine.register(EVENT_CTA_LOG, log_engine.process_log_event)
    main_engine.write_log("注册日志事件监听")

    main_engine.connect(ctp_setting, "CTP")
    main_engine.write_log("连接CTP接口")

    sleep(10)

    cta_engine.init_engine()    #从rqdata获得数据,加载所有策略文件,加载setting文件中设定的策略,加载cta_strategy_data.json文件中的数据,注册tick,order等事件。
    main_engine.write_log("CTA策略初始化完成")

    cta_engine.init_all_strategies()  #订阅行情
    sleep(60)   # Leave enough time to complete strategy initialization
    main_engine.write_log("CTA策略全部初始化")

    cta_engine.start_all_strategies()  #设定strategy.trading = True,启动策略
    main_engine.write_log("CTA策略全部启动")

    while True:
        sleep(1)

有一句:

cta_engine = main_engine.add_app(CtaStrategyApp)

其实就相当于相当于CtaEngine(MainEngine(),event_engine)。

我们先来看一下CtaStrategyApp是哪里来的,它是在vnpy/app/cta_strategy/__init__.py这个文件里面定义的。

class CtaStrategyApp(BaseApp):
    """"""

    app_name = APP_NAME
    app_module = __module__
    app_path = Path(__file__).parent
    display_name = "CTA策略"
    engine_class = CtaEngine
    widget_name = "CtaManager"
    icon_name = "cta.ico"

再来看一下这个add_app
它传入的参数是BaseApp。
这个函数的作用相当于:实例化传入的BaseApp(CtaStrategyApp),并添加到self.apps字典里,再添加CtaEngine(传入了MainEngine(),event_engine()), 返回了一个实例化的CtaEngine

def add_app(self, app_class: Type[BaseApp]):    
    """
    Add app.
    """
    app = app_class()                           #实例化类
    self.apps[app.app_name] = app               #将类对像添加到 self.apps字典里。

    engine = self.add_engine(app.engine_class)  #这里app.engine_class就是CtaStrategyApp中的engine_class = CtaEngine
    return engine

#再看add_engine,参数形式为:add_engine(CtaEngine),

def add_engine(self, engine_class: Any):
    """
    Add function engine.(添加功能引擎)
    """
    engine = engine_class(self, self.event_engine)  #self指当前类的实例,相当于CtaEngine(MainEngine(),event_engine()),即将主引擎,事件引擎传入了CtaEngine。
    self.engines[engine.engine_name] = engine       #将实例化后的CtaEngine存入到self.engines字典里面,CtaEngine继承自BaseEngine,BaseEngine中有engine_name

    return engine                                   #返回实例化后的引擎

顺便说一下mainEngine,它的作用就是负责对所有引擎实例化。

二、CtaEngine

在vnpy/app/cta_strategy/__init__.py里面,有from .engine import CtaEngine这样的语句,其实就是从在vnpy/app/cta_strategy/engine.py这个文件里面导入CtaEngine。下面我们来看这个CtaEngine。

首先,它是继承自BaseEngine,然后初始化的时候,需要传入两个engine.

class CtaEngine(BaseEngine):
    """"""

    engine_type = EngineType.LIVE  # live trading engine

    setting_filename = "cta_strategy_setting.json"
    data_filename = "cta_strategy_data.json"

    def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
        """"""
        super(CtaEngine, self).__init__(
            main_engine, event_engine, APP_NAME)

在run.py里面,使用了它的下面三个功能:

    cta_engine.init_engine()
    main_engine.write_log("CTA策略初始化完成")

    cta_engine.init_all_strategies()
    sleep(60)   # Leave enough time to complete strategy initialization
    main_engine.write_log("CTA策略全部初始化")

    cta_engine.start_all_strategies()
    main_engine.write_log("CTA策略全部启动")

(一).init_engine()

    def init_engine(self):
        """
        """
        self.init_rqdata()
        self.load_strategy_class()     #载入策略  
        self.load_strategy_setting()
        self.load_strategy_data()
        self.register_event()
        self.write_log("CTA策略引擎初始化成功")

1.init_rqdata()
初始化rqdata,可以看到有from vnpy.trader.rqdata import rqdata_client,说明这里对米筐进行了封装,因为我自己之前用的米筐是直接使用rq.init()。

2.load_strategy_class()
其实最终的结果就是执行了“self.classes[value.__name__] = value”这一句。


def load_strategy_class(self):
    """
    Load strategy class from source code.
    """
    path1 = Path(__file__).parent.joinpath("strategies")  #就是变成“strategies\”这样的形式
    self.load_strategy_class_from_folder(
        path1, "vnpy.app.cta_strategy.strategies")

    path2 = Path.cwd().joinpath("strategies")  #就是变成  “C:\Users\Kevin\Desktop\py2\strategies”  这样的形式
    self.load_strategy_class_from_folder(path2, "strategies")

def load_strategy_class_from_folder(self, path: Path, module_name: str = ""):
    """
    Load strategy class from certain folder.根据传入的文件夹,以及module_name,遍历方位夹内的文件,找到.py文件,拼接成module_name.double_ma这样的形式,再传入到load_strategy_class_from_module这个函数。
    """
    for dirpath, dirnames, filenames in os.walk(str(path)):
        for filename in filenames:                          #遍历strategies下所有的文件
            if filename.endswith(".py"):                    #如果是.py文件
                strategy_module_name = ".".join(            #拼接成 strategies.double_ma_strategy这样的形式
                    [module_name, filename.replace(".py", "")])
                self.load_strategy_class_from_module(strategy_module_name)   #执行load_strategy_class_from_module(strategies.double_ma_strategy)


def load_strategy_class_from_module(self, module_name: str):
    """
    Load strategy class from module file.
    """
    try:
        module = importlib.import_module(module_name)        #绝对导入,其中的module_name为strategies.double_ma_strategy。

        for name in dir(module):                             #遍历double_ma_strategy的所有属性、方法
            value = getattr(module, name)
            #只有当name是__class__的时候,isinstance(value, type)才是True,issubclass(value, CtaTemplate) 调试不出来。
            if (isinstance(value, type) and issubclass(value, CtaTemplate) and value is not CtaTemplate):  
                self.classes[value.__name__] = value  #将value放到self.classes这个字典里,value.__name__应该就是策略的class_name。
    except:  # noqa
        msg = f"策略文件{module_name}加载失败,触发异常:\n{traceback.format_exc()}"
        self.write_log(msg)

知识点:
(1).os.walk()方法

os.walk方法,主要用来遍历一个目录内各个子目录和子文件。


os.walk(top, topdown=True, onerror=None, followlinks=False) 

可以得到一个三元tupple(dirpath, dirnames, filenames),

第一个为起始路径,第二个为起始路径下的文件夹,第三个是起始路径下的文件。

dirpath 是一个string,代表目录的路径,

dirnames 是一个list,包含了dirpath下所有子目录的名字。

filenames 是一个list,包含了非目录文件的名字。

这些名字不包含路径信息,如果需要得到全路径,需要使用os.path.join(dirpath, name).

(2).import importlib
一个函数运行需要根据不同项目的配置,动态导入对应的配置文件运行。

import importlib

# 绝对导入
a = importlib.import_module("clazz.a")
a.show()
# show A

# 相对导入
b = importlib.import_module(".b", "clazz")
b.show()
# show B

#注意,相对导入有个一点., 类似路径

(3).dir()
返回模块的属性列表

class A(object):
	name = "xxx"
	bar = 1
	def fu(sefl):
		pass
 
a = A()
for xxx in dir(a):
	print(xxx)


https://www.cnblogs.com/aademeng/articles/7259986.html

(4)getattr()
getattr() 函数用于返回一个对象属性值。最好的理解是通过示例:

>>>class A(object):
...     bar = 1
... 
>>> a = A()
>>> getattr(a, 'bar')        # 获取属性 bar 值
1

(5)isinstance
isinstance() 函数来判断一个对象是否是一个已知的类型,类似 type()


>>>a = 2
>>> isinstance (a,int)
True

class A():
	test = "xxx"
	bar = 1
	def fu(sefl):
		pass
 
a = A()
for i in dir(a):
	print("i是{}".format(i))
	value = getattr(a, i)
	print("value是{}".format(value))
	print(isinstance(value, type))
	print("*"*80)	

3.load_strategy_setting
加载users/你的用户名/.vntrader/下面的策略,并且通过add_strategy实例化多个策略类。

def load_strategy_setting(self):
    """
    Load setting file.
    """
    self.strategy_setting = load_json(self.setting_filename)   #加载cta_strategy_setting.json

    for strategy_name, strategy_config in self.strategy_setting.items():  #对字典进行遍历。
        self.add_strategy(
            strategy_config["class_name"],         #策略的类名
            strategy_name,                         #你自己输入的具体策略名称,比如rb1910螺纹,因为一个策略可以跑多个品种,多个周期。
            strategy_config["vt_symbol"], 
            strategy_config["setting"]             #从下图可以看出,这里存的就是classname:oneMinuteStrategy
        )           


通过load_json加载了cta_strategy_setting.json文件,并执行了add_strategy。

备注:cta_strategy_setting.json这个文件是在安装了vnpy之后,在users/你的用户名/.vntrader/下面生成的。

它和你打开vnpy的UI界面,然后执行“功能-CTA策略”打开的窗口看到的已添加策略是对应的。比如在这个界面将rb1910螺纹删除,那么cta_strategy_setting.json这个文件就会变成一个空文件。

同时生成的还有cta_strtegy_data.json文件。

再看看add_strategy
它做的事情就是:添加一个实例化的类,并将这个类放到self.strategies这个字典里面,最后建立合约与实例化的策略类的映射,更新cta_strategy_setting.json文件,再将event put出去。
它要传入的参数是这样的:add_strategy("oneMinuteStrategy","rb1910螺纹","rb1910.SHFE",{"class_name":"oneMinuteStrategy"})

def add_strategy(
    self, class_name: str, strategy_name: str, vt_symbol: str, setting: dict
):
    """
    Add a new strategy.
    """
    if strategy_name in self.strategies:       #如果策略名已经在self.strategies字典中了。见下面的示例。
        self.write_log(f"创建策略失败,存在重名{strategy_name}")
        return

    strategy_class = self.classes.get(class_name, None)    #通过get从self.classes字典找到相应的策略类。
    if not strategy_class:
        self.write_log(f"创建策略失败,找不到策略类{class_name}")
        return

    #实例化一个策略类,查看策略类源码,可以发现需要传入的参数为def __init__(self, cta_engine, strategy_name, vt_symbol, setting),下面的self就表示前当的引擎CtaEngine。
    strategy = strategy_class(self, strategy_name, vt_symbol, setting)  
    self.strategies[strategy_name] = strategy  #将实例化后的类存到self.strategies这个字典里面                            

    # Add vt_symbol to strategy map.   将rb1910添加到合约与策略的映射。
    strategies = self.symbol_strategy_map[vt_symbol]  #symbol_strategy_map是一个symbol、strategy映射字典
    strategies.append(strategy)

    # Update to setting file.
    self.update_strategy_setting(strategy_name, setting)   更新cta_strategy_setting.json文件

    self.put_strategy_event(strategy)

比如判断某值是否在字典中:

aa = {'a':"ccc",'b':"cc",'c':"333"}
b = "a"
print(aa[b])

if "a" in aa:
	print("a在字典中")

from collections import defaultdict

strategy="teststrategy"
vt_symbol="rb1910.SHFE"
symbol_strategy_map = defaultdict(list)  
strategies = symbol_strategy_map[vt_symbol]
print(strategies)
print(symbol_strategy_map)
strategies.append(strategy)
print(strategies)
print(symbol_strategy_map)

再看update_strategy_setting这个函数,它的作用就是将json数据存到setting_filename(即cta_strategy_setting.json)文件。

    def update_strategy_setting(self, strategy_name: str, setting: dict):
        """
        Update setting file.
        """
        strategy = self.strategies[strategy_name]

        self.strategy_setting[strategy_name] = {
            "class_name": strategy.__class__.__name__,
            "vt_symbol": strategy.vt_symbol,
            "setting": setting,
        }
        save_json(self.setting_filename, self.strategy_setting)

再看put_strategy_event函数。这个顾名思议,就是将event put出去。

def put_strategy_event(self, strategy: CtaTemplate):
    """
    Put an event to update strategy status.
    """
    data = strategy.get_data()     #就是获得策略的相关信息
    event = Event(EVENT_CTA_STRATEGY, data)  #将策略的data传到Event事件引擎
    self.event_engine.put(event)	

(1).get_data
这个来自doubleMA之类策略的父类CtaTemplate,就是获得策略的相关信息

    def get_data(self):
        """
        Get strategy data.
        """
        strategy_data = {
            "strategy_name": self.strategy_name,
            "vt_symbol": self.vt_symbol,
            "class_name": self.__class__.__name__,
            "author": self.author,
            "parameters": self.get_parameters(),
            "variables": self.get_variables(),
        }
        return strategy_data

(2).这里有一个EVENT_CTA_STRATEGY,又是从base.py里面导入进来的,我们看它的定义,

EVENT_CTA_LOG = "eCtaLog"
EVENT_CTA_STRATEGY = "eCtaStrategy"
EVENT_CTA_STOPORDER = "eCtaStopOrder"

可以知道,它就是一个事件。

4.load_strategy_data函数
这个函数的作用就是从cta_strategy_data.json中取出数据。因为根据之前的定义,data_filename = "cta_strategy_data.json"。
取到的数据在init_all_strategies的时候需要用到。

    def load_strategy_data(self):
        """
        Load strategy data from json file.
        """
        self.strategy_data = load_json(self.data_filename)

我们看看这个json文件长什么样:

5.register_event
注册事件引擎

    def register_event(self):
        """"""
        self.event_engine.register(EVENT_TICK, self.process_tick_event)
        self.event_engine.register(EVENT_ORDER, self.process_order_event)
        self.event_engine.register(EVENT_TRADE, self.process_trade_event)
        self.event_engine.register(EVENT_POSITION, self.process_position_event)

二、init_all_strategies()
最终的结果是订阅了symbol的行情,并执行了strategy.on_init(这个on_init是在策略里面定义的,其功能就是self.load_bar(10))。

def init_all_strategies(self):
    """
    """
    for strategy_name in self.strategies.keys():
        self.init_strategy(strategy_name)


def init_strategy(self, strategy_name: str):
    """
    Init a strategy.
    """ 
    self.init_queue.put(strategy_name)  #将strategy_name put进去

    if not self.init_thread:
        self.init_thread = Thread(target=self._init_strategy)
        self.init_thread.start()

这个init_queue是在这个类的init中定义的:self.init_queue = Queue()。
init_thread也是在init中定义的:self.init_thread = None

小知识:
queue.empty:exception Queue.Empty 的异常。

再看一下这个_init_strategy


def _init_strategy(self):
    """
    Init strategies in queue.
    """
    while not self.init_queue.empty():
        strategy_name = self.init_queue.get()
        strategy = self.strategies[strategy_name]

        if strategy.inited:
            self.write_log(f"{strategy_name}已经完成初始化,禁止重复操作")
            continue

        self.write_log(f"{strategy_name}开始执行初始化")

        # Call on_init function of strategy
        self.call_strategy_func(strategy, strategy.on_init)

        # Restore strategy data(variables)
        data = self.strategy_data.get(strategy_name, None)
        if data:
            for name in strategy.variables:
                value = data.get(name, None)
                if value:
                    setattr(strategy, name, value)

        # Subscribe market data
        contract = self.main_engine.get_contract(strategy.vt_symbol)  #获得symbol
        if contract:
            req = SubscribeRequest(
                symbol=contract.symbol, exchange=contract.exchange)   #这里是拼接代码
            self.main_engine.subscribe(req, contract.gateway_name)    #调用主引擎的subscribe,而主引擎的subsribe又是调用Gateway的subscribe.可以看下面的流程图:
        else:
            self.write_log(f"行情订阅失败,找不到合约{strategy.vt_symbol}", strategy)

        # Put event to update init completed status.
        strategy.inited = True
        self.put_strategy_event(strategy)
        self.write_log(f"{strategy_name}初始化完成")
    
    self.init_thread = None

这里的put_strategy_event函数在上面已经有了。
这里又有一个call_strategy_func,它的解释是“Call function of a strategy and catch any exception raised.”执行策略中的函数,不过需要传入策略,策略的函数两个参数。其实策略也可以不传,但是就无法让后面的strategy.trading = False了。

它的用法是这样的:self.call_strategy_func(strategy, strategy.on_init),其实就是执行strategy.on_init。

def call_strategy_func(
    self, strategy: CtaTemplate, func: Callable, params: Any = None
):
    """
    Call function of a strategy and catch any exception raised.
    """
    try:
        if params:
            func(params)
        else:
            func()
    except Exception:
        strategy.trading = False
        strategy.inited = False

        msg = f"触发异常已停止\n{traceback.format_exc()}"
        self.write_log(msg, strategy)

三、start_all_strategies()
这个函数非常短,就是遍历strategies.keys,然后依次执行start_strategy。最终的结果就是执行strategy.on_start(其实on_start啥都没做,就是输出“策略启动”,再就是执行self.put_event())。
我们来看看start_strategy,它的功能很简单,其实就是执行策略中的on_start函数。

def start_strategy(self, strategy_name: str):
    """
    Start a strategy.
    """
    strategy = self.strategies[strategy_name] #从self.strategies字典中取出键名为“XX”的策略实例 因为add_strategy那里添加的是实例 。
    if not strategy.inited:
        self.write_log(f"策略{strategy.strategy_name}启动失败,请先初始化")
        return

    if strategy.trading:    
        self.write_log(f"{strategy_name}已经启动,请勿重复操作")
        return

    self.call_strategy_func(strategy, strategy.on_start)
    strategy.trading = True

    self.put_strategy_event(strategy)

四、其他函数
cta_engine里面有一个check_stop_order函数。

def check_stop_order(self, tick: TickData):
    """
    这个函数就是执行strategy的on_stop_order函数。
    """
    for stop_order in list(self.stop_orders.values()):
        if stop_order.vt_symbol != tick.vt_symbol:
            continue

        long_triggered = (
            stop_order.direction == Direction.LONG and tick.last_price >= stop_order.price
        )
        short_triggered = (
            stop_order.direction == Direction.SHORT and tick.last_price <= stop_order.price
        )

        if long_triggered or short_triggered:
            strategy = self.strategies[stop_order.strategy_name]

            # To get excuted immediately after stop order is
            # triggered, use limit price if available, otherwise
            # use ask_price_5 or bid_price_5
            if stop_order.direction == Direction.LONG:
                if tick.limit_up:
                    price = tick.limit_up
                else:
                    price = tick.ask_price_5
            else:
                if tick.limit_down:
                    price = tick.limit_down
                else:
                    price = tick.bid_price_5
            
            contract = self.main_engine.get_contract(stop_order.vt_symbol)

            vt_orderids = self.send_limit_order(
                strategy, 
                contract,
                stop_order.direction, 
                stop_order.offset, 
                price, 
                stop_order.volume,
                stop_order.lock
            )

            # Update stop order status if placed successfully
            if vt_orderids:
                # Remove from relation map.
                self.stop_orders.pop(stop_order.stop_orderid)

                strategy_vt_orderids = self.strategy_orderid_map[strategy.strategy_name]
                if stop_order.stop_orderid in strategy_vt_orderids:
                    strategy_vt_orderids.remove(stop_order.stop_orderid)

                # Change stop order status to cancelled and update to strategy.
                stop_order.status = StopOrderStatus.TRIGGERED
                stop_order.vt_orderids = vt_orderids

                self.call_strategy_func(
                    strategy, strategy.on_stop_order, stop_order
                )
                self.put_stop_order_event(stop_order)

那么这个函数在哪儿被调用呢?

def process_tick_event(self, event: Event):
    """"""
    tick = event.data

    strategies = self.symbol_strategy_map[tick.vt_symbol]
    if not strategies:
        return

    self.check_stop_order(tick)

    for strategy in strategies:
        if strategy.inited:
            self.call_strategy_func(strategy, strategy.on_tick, tick)

这里面的symbol_strategy_map是在CtaEngine的init里面定义的:

self.symbol_strategy_map = defaultdict(list)                   # vt_symbol: strategy list

再来看看put_stop_order_event函数,它传入的是StopOrder。

def put_stop_order_event(self, stop_order: StopOrder):
    """
    Put an event to update stop order status.
    """
    event = Event(EVENT_CTA_STOPORDER, stop_order)
    self.event_engine.put(event)

最后附上别人画的一张流程图:

process_tick根据symbol分发给相应的策略:

还有这里有一篇文章写得很好,可以参考:
https://www.vnpy.com/forum/topic/1064-ctace-lue-mo-ni-jiao-yi-xue-xi-ji-lu-yi

六、onTrade,onOrder

在单一合约里,onTrade,onOrder是一样原。
在套利的时候,买卖一个跨期的合约,spM1705&m1709,多m1705,空m1709

onOrder会收到一条信息

onTrade会收到两条信息。

本文暂无标签

发表评论

*

*