Home >  > VNPY源码(五)CtaEngine实盘引擎

VNPY源码(五)CtaEngine实盘引擎

0

CtaEngine引擎是策略的容器,它启动的时候会将所有的策略都加载进来。

在no_ui的run.py里面,

SETTINGS["log.active"] = True
SETTINGS["log.level"] = INFO
SETTINGS["log.console"] = True


ctp_setting = {
    "用户名": "",
    "密码": "",
    "经纪商代码": "",
    "交易服务器": "",
    "行情服务器": "",
    "产品名称": "",
    "授权编码": "",
    "产品信息": ""
}


def run_child():
    """
    Running in the child process.
    """
    SETTINGS["log.file"] = True

    event_engine = EventEngine()
    main_engine = MainEngine(event_engine)
    main_engine.add_gateway(CtpGateway)
    cta_engine = main_engine.add_app(CtaStrategyApp)
    main_engine.write_log("主引擎创建成功")

    log_engine = main_engine.get_engine("log")
    event_engine.register(EVENT_CTA_LOG, log_engine.process_log_event)
    main_engine.write_log("注册日志事件监听")

    main_engine.connect(ctp_setting, "CTP")
    main_engine.write_log("连接CTP接口")

    sleep(10)

    cta_engine.init_engine()    #从rqdata获得数据,加载所有策略文件,加载setting文件中设定的策略,加载cta_strategy_data.json文件中的数据,注册tick,order等事件。
    main_engine.write_log("CTA策略初始化完成")

    cta_engine.init_all_strategies()  #订阅行情
    sleep(60)   # Leave enough time to complete strategy initialization
    main_engine.write_log("CTA策略全部初始化")

    cta_engine.start_all_strategies()  #设定strategy.trading = True,启动策略
    main_engine.write_log("CTA策略全部启动")

    while True:
        sleep(1)

有一句:

cta_engine = main_engine.add_app(CtaStrategyApp)

其实就相当于相当于CtaEngine(MainEngine(),event_engine)。

我们先来看一下CtaStrategyApp是哪里来的,它是在vnpy/app/cta_strategy/__init__.py这个文件里面定义的。

class CtaStrategyApp(BaseApp):
    """"""

    app_name = APP_NAME
    app_module = __module__
    app_path = Path(__file__).parent
    display_name = "CTA策略"
    engine_class = CtaEngine
    widget_name = "CtaManager"
    icon_name = "cta.ico"

再来看一下这个add_app

def add_app(self, app_class: Type[BaseApp]):
    """
    Add app.
    """
    app = app_class()  #实例化类
    self.apps[app.app_name] = app  #将类对像添加到 self.apps字典里。

    engine = self.add_engine(app.engine_class)  这里app.engine_class就是CtaEngine,engine_class = CtaEngine
    return engine

#再看add_engine
def add_engine(self, engine_class: Any):
    """
    Add function engine.
    """
    engine = engine_class(self, self.event_engine)  #相当于CtaEngine(MainEngine(),event_engine)
    self.engines[engine.engine_name] = engine
    return engine

在vnpy/app/cta_strategy/__init__.py里面,有from .engine import CtaEngine这样的语句,其实就是从在vnpy/app/cta_strategy/engine.py这个文件里面导入CtaEngine。下面我们来看这个CtaEngine。

首先,它是继承自BaseEngine,然后初始化的时候,需要传入两个engine.

class CtaEngine(BaseEngine):
    """"""

    engine_type = EngineType.LIVE  # live trading engine

    setting_filename = "cta_strategy_setting.json"
    data_filename = "cta_strategy_data.json"

    def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
        """"""
        super(CtaEngine, self).__init__(
            main_engine, event_engine, APP_NAME)

在run.py里面,使用了它的下面三个功能:

    cta_engine.init_engine()
    main_engine.write_log("CTA策略初始化完成")

    cta_engine.init_all_strategies()
    sleep(60)   # Leave enough time to complete strategy initialization
    main_engine.write_log("CTA策略全部初始化")

    cta_engine.start_all_strategies()
    main_engine.write_log("CTA策略全部启动")

一.init_engine()

    def init_engine(self):
        """
        """
        self.init_rqdata()
        self.load_strategy_class()
        self.load_strategy_setting()
        self.load_strategy_data()
        self.register_event()
        self.write_log("CTA策略引擎初始化成功")

1.init_rqdata()
初始化rqdata,可以看到有from vnpy.trader.rqdata import rqdata_client,说明这里对米筐进行了封装,因为我自己之前用的米筐是直接使用rq.init()。

2.load_strategy_class()
其实最终的结果就是执行了“self.classes[value.__name__] = value”这一句。


def load_strategy_class(self):
    """
    Load strategy class from source code.
    """
    path1 = Path(__file__).parent.joinpath("strategies")  #就是变成“strategies”这样的形式
    self.load_strategy_class_from_folder(
        path1, "vnpy.app.cta_strategy.strategies")

    path2 = Path.cwd().joinpath("strategies")  #就是变成  “当前文件夹\strategies”  这样的形式
    self.load_strategy_class_from_folder(path2, "strategies")

def load_strategy_class_from_folder(self, path: Path, module_name: str = ""):
    """
    Load strategy class from certain folder.根据传入的文件夹,以及module_name,遍历方位夹内的文件,找到.py文件,拼接成module_name.double_ma这样的形式,再传入到load_strategy_class_from_module这个函数。
    """
    for dirpath, dirnames, filenames in os.walk(str(path)):
        for filename in filenames:
            if filename.endswith(".py"):
                strategy_module_name = ".".join(
                    [module_name, filename.replace(".py", "")])
                self.load_strategy_class_from_module(strategy_module_name)


def load_strategy_class_from_module(self, module_name: str):
    """
    Load strategy class from module file.
    """
    try:
        module = importlib.import_module(module_name)

        for name in dir(module):
            value = getattr(module, name)
            if (isinstance(value, type) and issubclass(value, CtaTemplate) and value is not CtaTemplate):
                self.classes[value.__name__] = value  #将value放到self.classes这个字典里
    except:  # noqa
        msg = f"策略文件{module_name}加载失败,触发异常:\n{traceback.format_exc()}"
        self.write_log(msg)

知识点:
(1).os.walk()方法

os.walk方法,主要用来遍历一个目录内各个子目录和子文件。


os.walk(top, topdown=True, onerror=None, followlinks=False) 

可以得到一个三元tupple(dirpath, dirnames, filenames),

第一个为起始路径,第二个为起始路径下的文件夹,第三个是起始路径下的文件。

dirpath 是一个string,代表目录的路径,

dirnames 是一个list,包含了dirpath下所有子目录的名字。

filenames 是一个list,包含了非目录文件的名字。

这些名字不包含路径信息,如果需要得到全路径,需要使用os.path.join(dirpath, name).

(2).import importlib
一个函数运行需要根据不同项目的配置,动态导入对应的配置文件运行。

(3).dir()
返回模块的属性列表

(4)getattr()
getattr() 函数用于返回一个对象属性值。最好的理解是通过示例:

>>>class A(object):
...     bar = 1
... 
>>> a = A()
>>> getattr(a, 'bar')        # 获取属性 bar 值
1

(5)isinstance
isinstance() 函数来判断一个对象是否是一个已知的类型,类似 type()


>>>a = 2
>>> isinstance (a,int)
True

3.load_strategy_setting
加载users/你的用户名/.vntrader/下面的策略,并且add_strategy。

def load_strategy_setting(self):
    """
    Load setting file.
    """
    self.strategy_setting = load_json(self.setting_filename)

    for strategy_name, strategy_config in self.strategy_setting.items():
        self.add_strategy(
            strategy_config["class_name"], 
            strategy_name,
            strategy_config["vt_symbol"], 
            strategy_config["setting"]
        )           

通过load_json加载了cta_strategy_setting.json文件,并执行了add_strategy。

备注:cta_strategy_setting.json这个文件是在安装了vnpy之后,在users/你的用户名/.vntrader/下面生成的。

它和你打开vnpy的UI界面,然后执行“功能-CTA策略”打开的窗口看到的已添加策略是对应的。比如在这个界面将rb1910螺纹删除,那么cta_strategy_setting.json这个文件就会变成一个空文件。

同时生成的还有cta_strtegy_data.json文件。

再看看add_strategy

def add_strategy(
    self, class_name: str, strategy_name: str, vt_symbol: str, setting: dict
):
    """
    Add a new strategy.
    """
    if strategy_name in self.strategies:
        self.write_log(f"创建策略失败,存在重名{strategy_name}")
        return

    strategy_class = self.classes.get(class_name, None)
    if not strategy_class:
        self.write_log(f"创建策略失败,找不到策略类{class_name}")
        return

    strategy = strategy_class(self, strategy_name, vt_symbol, setting)
    self.strategies[strategy_name] = strategy

    # Add vt_symbol to strategy map.
    strategies = self.symbol_strategy_map[vt_symbol]  #symbol_strategy_map是一个symbol、strategy映射字典
    strategies.append(strategy)

    # Update to setting file.
    self.update_strategy_setting(strategy_name, setting)

    self.put_strategy_event(strategy)

再看update_strategy_setting这个函数,它的作用就是将json数据存到setting_filename(即cta_strategy_setting.json)文件。

    def update_strategy_setting(self, strategy_name: str, setting: dict):
        """
        Update setting file.
        """
        strategy = self.strategies[strategy_name]

        self.strategy_setting[strategy_name] = {
            "class_name": strategy.__class__.__name__,
            "vt_symbol": strategy.vt_symbol,
            "setting": setting,
        }
        save_json(self.setting_filename, self.strategy_setting)

再看put_strategy_event函数。这个顾名思议,就是将event put出去。

def put_strategy_event(self, strategy: CtaTemplate):
    """
    Put an event to update strategy status.
    """
    data = strategy.get_data()
    event = Event(EVENT_CTA_STRATEGY, data)
    self.event_engine.put(event)	

这里有一个EVENT_CTA_STRATEGY,又是从base.py里面导入进来的,我们看它的定义,

EVENT_CTA_LOG = "eCtaLog"
EVENT_CTA_STRATEGY = "eCtaStrategy"
EVENT_CTA_STOPORDER = "eCtaStopOrder"

可以知道,它就是一个事件。

其中的get_data()是CtaTemplate的方法,其作用就是获得策略相关的参数。

    def get_data(self):
        """
        Get strategy data.
        """
        strategy_data = {
            "strategy_name": self.strategy_name,
            "vt_symbol": self.vt_symbol,
            "class_name": self.__class__.__name__,
            "author": self.author,
            "parameters": self.get_parameters(),
            "variables": self.get_variables(),
        }
        return strategy_data

4.load_strategy_data函数
这个函数的作用就是从cta_strategy_data.json中取出数据。因为根据之前的定义,data_filename = "cta_strategy_data.json"。
取到的数据在init_all_strategies的时候需要用到。

    def load_strategy_data(self):
        """
        Load strategy data from json file.
        """
        self.strategy_data = load_json(self.data_filename)

我们看看这个json文件长什么样:

二、init_all_strategies()
最终的结果是订阅了symbol的行情,并执行了strategy.on_init(这个on_init是在策略里面定义的,其功能就是self.load_bar(10))。

def init_all_strategies(self):
    """
    """
    for strategy_name in self.strategies.keys():
        self.init_strategy(strategy_name)


def init_strategy(self, strategy_name: str):
    """
    Init a strategy.
    """ 
    self.init_queue.put(strategy_name)  #将strategy_name put进去

    if not self.init_thread:
        self.init_thread = Thread(target=self._init_strategy)
        self.init_thread.start()

这个init_queue是在这个类的init中定义的:self.init_queue = Queue()。
init_thread也是在init中定义的:self.init_thread = None

小知识:
queue.empty:exception Queue.Empty 的异常。

再看一下这个_init_strategy


def _init_strategy(self):
    """
    Init strategies in queue.
    """
    while not self.init_queue.empty():
        strategy_name = self.init_queue.get()
        strategy = self.strategies[strategy_name]

        if strategy.inited:
            self.write_log(f"{strategy_name}已经完成初始化,禁止重复操作")
            continue

        self.write_log(f"{strategy_name}开始执行初始化")

        # Call on_init function of strategy
        self.call_strategy_func(strategy, strategy.on_init)

        # Restore strategy data(variables)
        data = self.strategy_data.get(strategy_name, None)
        if data:
            for name in strategy.variables:
                value = data.get(name, None)
                if value:
                    setattr(strategy, name, value)

        # Subscribe market data
        contract = self.main_engine.get_contract(strategy.vt_symbol)  #获得symbol
        if contract:
            req = SubscribeRequest(
                symbol=contract.symbol, exchange=contract.exchange)   #这里是拼接代码
            self.main_engine.subscribe(req, contract.gateway_name)    #调用主引擎的subscribe,而主引擎的subsribe又是调用Gateway的subscribe.可以看下面的流程图:
        else:
            self.write_log(f"行情订阅失败,找不到合约{strategy.vt_symbol}", strategy)

        # Put event to update init completed status.
        strategy.inited = True
        self.put_strategy_event(strategy)
        self.write_log(f"{strategy_name}初始化完成")
    
    self.init_thread = None

这里的put_strategy_event函数在上面已经有了。
这里又有一个call_strategy_func,其实就是执行strategy.on_init。

def call_strategy_func(
    self, strategy: CtaTemplate, func: Callable, params: Any = None
):
    """
    Call function of a strategy and catch any exception raised.
    """
    try:
        if params:
            func(params)
        else:
            func()
    except Exception:
        strategy.trading = False
        strategy.inited = False

        msg = f"触发异常已停止\n{traceback.format_exc()}"
        self.write_log(msg, strategy)

三、start_all_strategies()
这个函数非常短,就是遍历strategies.keys,然后依次执行start_strategy。最终的结果就是执行strategy.on_start(其实on_start啥都没做,就是输出“策略启动”,再就是执行self.put_event())。
我们来看看start_strategy。

def start_strategy(self, strategy_name: str):
    """
    Start a strategy.
    """
    strategy = self.strategies[strategy_name]
    if not strategy.inited:
        self.write_log(f"策略{strategy.strategy_name}启动失败,请先初始化")
        return

    if strategy.trading:    #在call_strategy_func对strategy.trading有操作。
        self.write_log(f"{strategy_name}已经启动,请勿重复操作")
        return

    self.call_strategy_func(strategy, strategy.on_start)
    strategy.trading = True

    self.put_strategy_event(strategy)

四、其他函数
cta_engine里面有一个check_stop_order函数。

def check_stop_order(self, tick: TickData):
    """
    这个函数就是执行strategy的on_stop_order函数。
    """
    for stop_order in list(self.stop_orders.values()):
        if stop_order.vt_symbol != tick.vt_symbol:
            continue

        long_triggered = (
            stop_order.direction == Direction.LONG and tick.last_price >= stop_order.price
        )
        short_triggered = (
            stop_order.direction == Direction.SHORT and tick.last_price <= stop_order.price
        )

        if long_triggered or short_triggered:
            strategy = self.strategies[stop_order.strategy_name]

            # To get excuted immediately after stop order is
            # triggered, use limit price if available, otherwise
            # use ask_price_5 or bid_price_5
            if stop_order.direction == Direction.LONG:
                if tick.limit_up:
                    price = tick.limit_up
                else:
                    price = tick.ask_price_5
            else:
                if tick.limit_down:
                    price = tick.limit_down
                else:
                    price = tick.bid_price_5
            
            contract = self.main_engine.get_contract(stop_order.vt_symbol)

            vt_orderids = self.send_limit_order(
                strategy, 
                contract,
                stop_order.direction, 
                stop_order.offset, 
                price, 
                stop_order.volume,
                stop_order.lock
            )

            # Update stop order status if placed successfully
            if vt_orderids:
                # Remove from relation map.
                self.stop_orders.pop(stop_order.stop_orderid)

                strategy_vt_orderids = self.strategy_orderid_map[strategy.strategy_name]
                if stop_order.stop_orderid in strategy_vt_orderids:
                    strategy_vt_orderids.remove(stop_order.stop_orderid)

                # Change stop order status to cancelled and update to strategy.
                stop_order.status = StopOrderStatus.TRIGGERED
                stop_order.vt_orderids = vt_orderids

                self.call_strategy_func(
                    strategy, strategy.on_stop_order, stop_order
                )
                self.put_stop_order_event(stop_order)

那么这个函数在哪儿被调用呢?

def process_tick_event(self, event: Event):
    """"""
    tick = event.data

    strategies = self.symbol_strategy_map[tick.vt_symbol]
    if not strategies:
        return

    self.check_stop_order(tick)

    for strategy in strategies:
        if strategy.inited:
            self.call_strategy_func(strategy, strategy.on_tick, tick)

这里面的symbol_strategy_map是在CtaEngine的init里面定义的:

self.symbol_strategy_map = defaultdict(list)                   # vt_symbol: strategy list

再来看看put_stop_order_event函数,它传入的是StopOrder。

def put_stop_order_event(self, stop_order: StopOrder):
    """
    Put an event to update stop order status.
    """
    event = Event(EVENT_CTA_STOPORDER, stop_order)
    self.event_engine.put(event)

最后附上别人画的一张流程图:

process_tick根据symbol分发给相应的策略:

还有这里有一篇文章写得很好,可以参考:
https://www.vnpy.com/forum/topic/1064-ctace-lue-mo-ni-jiao-yi-xue-xi-ji-lu-yi

六、onTrade,onOrder

在单一合约里,onTrade,onOrder是一样原。
在套利的时候,买卖一个跨期的合约,spM1705&m1709,多m1705,空m1709

onOrder会收到一条信息

onTrade会收到两条信息。

本文暂无标签

发表评论

*

*