
VNPY Source Code (6): BacktesterEngine, the Backtesting Engine


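Snail Blog (蜗牛博客): VNPY source code study series

VNPY Source Code (1): CTP Wrapper and K-line Synthesis
VNPY Source Code (2): Fetching Market Data via the API and script_trader
VNPY Source Code (3): MainEngine
VNPY Source Code (4): DataRecorder
VNPY Source Code (5): CtaEngine, the Live Trading Engine
VNPY Source Code (6): BacktesterEngine, the Backtesting Engine
VNPY Source Code (7): Limit Orders and Stop Orders
VNPY Source Code (8): Data Flow in VNPY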

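Tip:
When backtesting, prefer a script. Backtests started from the UI often fail without printing any message at all, which is maddening.

First, here is the script that drives the backtesting engine: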

from vnpy.app.cta_strategy.backtesting import BacktestingEngine
from vnpy.app.cta_strategy.strategies.boll_channel_strategy import BollChannelStrategy
from datetime import datetime


engine = BacktestingEngine()
engine.set_parameters(
    vt_symbol="000001.SZSE",
    interval="d",
    start=datetime(2018, 3, 23),
    end=datetime(2018, 4, 23),
    rate=0,
    slippage=0,
    size=300,
    pricetick=0.2,
    capital=1_000_000,
)

engine.add_strategy(BollChannelStrategy, {})
engine.load_data()
engine.run_backtesting()
df = engine.calculate_result()
engine.calculate_statistics()
engine.show_chart()

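I. set_parameters
First, to be clear: this is BacktestingEngine, defined in vnpy/app/cta_strategy/backtesting.py. There is also a BacktesterEngine in vnpy/app/cta_backtester/engine.py, and the two are easy to mix up.

There is nothing special about this function; it just assigns the arguments to attributes.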
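One detail worth illustrating: the script passes a single vt_symbol, while load_data later uses self.symbol and self.exchange separately, so the combined name presumably gets split somewhere during parameter setup. A minimal sketch of such a split follows. The helper name split_vt_symbol is hypothetical, and the exchange is kept as a plain string here, whereas the excerpts below suggest VNPY uses an Exchange enum.

```python
from typing import Tuple


def split_vt_symbol(vt_symbol: str) -> Tuple[str, str]:
    """Split 'SYMBOL.EXCHANGE' on the last dot (hypothetical helper)."""
    symbol, exchange = vt_symbol.rsplit(".", 1)
    return symbol, exchange


symbol, exchange = split_vt_symbol("000001.SZSE")
print(symbol, exchange)
```

Splitting on the last dot rather than the first is a design choice of this sketch; it keeps symbols that themselves contain a dot intact.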

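II. add_strategy
It takes the arguments and instantiates a strategy. For the script above this is equivalent to calling BollChannelStrategy(engine, strategy_name, vt_symbol, setting), where engine is the backtesting engine itself and strategy_name is the class name.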

def add_strategy(self, strategy_class: type, setting: dict):
    """"""
    self.strategy_class = strategy_class
    self.strategy = strategy_class(
        self, strategy_class.__name__, self.vt_symbol, setting
    )

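III. load_data
The end result: DbBarData (or DbTickData) records are fetched through the database ORM, each record is converted into a bar or a tick by its to_bar or to_tick method, and the results are collected in self.history_data.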

def load_data(self):
    """"""
    self.output("开始加载历史数据")

    if not self.end:
        self.end = datetime.now()

    if self.start >= self.end:
        self.output("起始日期必须小于结束日期")    
        return        

    self.history_data.clear()       # Clear previously loaded history data

    # Load 30 days of data each time and allow for progress update
    progress_delta = timedelta(days=30)
    total_delta = self.end - self.start

    start = self.start
    end = self.start + progress_delta
    progress = 0

    while start < self.end:
        end = min(end, self.end)  # Make sure end time stays within set range
        
        if self.mode == BacktestingMode.BAR:
            data = load_bar_data(
                self.symbol,
                self.exchange,
                self.interval,
                start,
                end
            )
        else:
            data = load_tick_data(
                self.symbol,
                self.exchange,
                start,
                end
            )

        self.history_data.extend(data)
        
        progress += progress_delta / total_delta
        progress = min(progress, 1)
        progress_bar = "#" * int(progress * 10)
        self.output(f"加载进度:{progress_bar} [{progress:.0%}]")
        
        start = end
        end += progress_delta
    
    self.output(f"历史数据加载完成,数据量:{len(self.history_data)}")
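The windowing logic in load_data is easier to see in isolation. The sketch below reproduces only the date arithmetic (30-day windows, clamped to the end date, with a capped progress value). The generator name iter_windows is made up for illustration, and no database is involved.

```python
from datetime import datetime, timedelta
from typing import Iterator, Tuple


def iter_windows(
    start: datetime, end: datetime, step: timedelta = timedelta(days=30)
) -> Iterator[Tuple[datetime, datetime, float]]:
    """Yield (window_start, window_end, progress) the way load_data walks the range."""
    total = end - start
    win_start = start
    win_end = start + step
    progress = 0.0

    while win_start < end:
        win_end = min(win_end, end)                   # never run past the end date
        progress = min(progress + step / total, 1.0)  # timedelta / timedelta is a float
        yield win_start, win_end, progress
        win_start = win_end
        win_end += step


for w_start, w_end, pct in iter_windows(datetime(2018, 3, 23), datetime(2018, 4, 23)):
    print(w_start.date(), w_end.date(), f"{pct:.0%}")
```

For the 31-day range used in the script this gives two windows, the second only one day long. Consecutive windows share a boundary timestamp, so whether a bar stamped exactly on the boundary is loaded once or twice depends on how the query treats its endpoints.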

1.load_bar_data

@lru_cache(maxsize=999)
def load_bar_data(
    symbol: str,
    exchange: Exchange,
    interval: Interval,
    start: datetime,
    end: datetime
):
    """"""
    return database_manager.load_bar_data(
        symbol, exchange, interval, start, end
    )
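The @lru_cache decorator means that repeating a call with the same arguments is answered from memory instead of hitting the database again. A small stand-alone sketch of that behaviour, using a fake loader (load_rows and the calls list are illustrative names, not VNPY code):

```python
from functools import lru_cache

calls = []  # records every time the "database" is really queried


@lru_cache(maxsize=999)
def load_rows(symbol: str, start: int, end: int) -> tuple:
    """Pretend to query a database; arguments must be hashable to be cached."""
    calls.append((symbol, start, end))
    return tuple(range(start, end))


load_rows("000001", 0, 3)
load_rows("000001", 0, 3)  # same arguments: served from the cache
load_rows("000001", 3, 6)  # different arguments: a real query

print(len(calls))
print(load_rows.cache_info())
```

One thing to keep in mind with this pattern: the cache hands back the very same object on every hit, so a cached list that the caller mutates stays mutated for later callers. The sketch returns a tuple to sidestep that.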

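load_bar_data simply delegates to the load_bar_data method of database_manager. The top of the file has from vnpy.trader.database import database_manager, yet there is no database_manager module under vnpy.trader.database. The name only shows up in __init__.py, in this line: database_manager: "BaseDatabaseManager" = init(settings=settings). According to the official explanation:

database_manager is defined inside the database package's own code. Based on the database configuration in GlobalSetting it creates a differently configured object, and the init function is what returns that object.

The following code can then be found in vnpy\trader\database\database_mongo.py.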

class MongoManager(BaseDatabaseManager):

    def load_bar_data(
        self,
        symbol: str,
        exchange: Exchange,
        interval: Interval,
        start: datetime,
        end: datetime,
    ) -> Sequence[BarData]:
        s = DbBarData.objects(
            symbol=symbol,
            exchange=exchange.value,
            interval=interval.value,
            datetime__gte=start,
            datetime__lte=end,
        )
        data = [db_bar.to_bar() for db_bar in s]
        return data

The .objects call here is the query syntax of MongoEngine, the object-document mapper used to talk to MongoDB.

2.to_bar function:

def to_bar(self):
    """
    Generate BarData object from DbBarData.
    """
    bar = BarData(
        symbol=self.symbol,
        exchange=Exchange(self.exchange),
        datetime=self.datetime,
        interval=Interval(self.interval),
        volume=self.volume,
        open_interest=self.open_interest,
        open_price=self.open_price,
        high_price=self.high_price,
        low_price=self.low_price,
        close_price=self.close_price,
        gateway_name="DB",
    )
    return bar

Now the to_tick function:

    def to_tick(self):
        """
        Generate TickData object from DbTickData.
        """
        tick = TickData(
            symbol=self.symbol,
            exchange=Exchange(self.exchange),
            datetime=self.datetime,
            name=self.name,
            volume=self.volume,
            open_interest=self.open_interest,
            last_price=self.last_price,
            last_volume=self.last_volume,
            limit_up=self.limit_up,
            limit_down=self.limit_down,
            open_price=self.open_price,
            high_price=self.high_price,
            low_price=self.low_price,
            pre_close=self.pre_close,
            bid_price_1=self.bid_price_1,
            ask_price_1=self.ask_price_1,
            bid_volume_1=self.bid_volume_1,
            ask_volume_1=self.ask_volume_1,
            gateway_name="DB",
        )

        if self.bid_price_2:
            tick.bid_price_2 = self.bid_price_2
            tick.bid_price_3 = self.bid_price_3
            tick.bid_price_4 = self.bid_price_4
            tick.bid_price_5 = self.bid_price_5

            tick.ask_price_2 = self.ask_price_2
            tick.ask_price_3 = self.ask_price_3
            tick.ask_price_4 = self.ask_price_4
            tick.ask_price_5 = self.ask_price_5

            tick.bid_volume_2 = self.bid_volume_2
            tick.bid_volume_3 = self.bid_volume_3
            tick.bid_volume_4 = self.bid_volume_4
            tick.bid_volume_5 = self.bid_volume_5

            tick.ask_volume_2 = self.ask_volume_2
            tick.ask_volume_3 = self.ask_volume_3
            tick.ask_volume_4 = self.ask_volume_4
            tick.ask_volume_5 = self.ask_volume_5

        return tick

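IV. run_backtesting
This function initializes the strategy and then iterates over the previously loaded history_data. For each item it matches limit orders, matches stop orders, and then calls the strategy's on_bar.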

def run_backtesting(self):
    """"""
    if self.mode == BacktestingMode.BAR:
        func = self.new_bar
    else:
        func = self.new_tick

    self.strategy.on_init()

    # Use the first [days] of history data for initializing strategy
    day_count = 0
    ix = 0
    
    for ix, data in enumerate(self.history_data):
        if self.datetime and data.datetime.day != self.datetime.day:
            day_count += 1
            if day_count >= self.days:
                break

        self.datetime = data.datetime
        self.callback(data)

    self.strategy.inited = True
    self.output("策略初始化完成")

    self.strategy.on_start()
    self.strategy.trading = True
    self.output("开始回放历史数据")

    # Use the rest of history data for running backtesting
    for data in self.history_data[ix:]:
        func(data)

    self.output("历史数据回放结束")

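1.history_data
history_data here is what engine.load_data() produced. Note that ix = 0 is only the initial value: the enumerate loop rebinds ix on every iteration. When the loop breaks after self.days days of data have been fed to the initialization callback, ix is the index of the first item that was not consumed, so history_data[ix:] is the remainder of the data rather than all of it.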
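To check how the split index behaves, the initialization loop can be replayed on made-up timestamps. The function split_index below mirrors the loop in run_backtesting but only tracks the index; the name and the sample data are illustrative.

```python
from datetime import datetime, timedelta


def split_index(timestamps, days):
    """Return the index where replay would start, mirroring the init loop."""
    current = None
    day_count = 0
    ix = 0

    for ix, ts in enumerate(timestamps):
        # A change in day-of-month counts as one finished day.
        if current and ts.day != current.day:
            day_count += 1
            if day_count >= days:
                break
        current = ts

    return ix


# Two bars per day (10:00 and 14:00) for five consecutive days.
stamps = [
    datetime(2018, 3, 26, hour) + timedelta(days=d)
    for d in range(5)
    for hour in (10, 14)
]

print(split_index(stamps, 2))   # two days go to initialization
print(split_index(stamps, 10))  # more days requested than available
```

With days=2 the loop stops at index 4, so four bars are used for initialization and replay starts from the fifth. When more days are requested than the data contains, the loop never breaks and ix ends on the last index, which means only the final bar is replayed.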

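2.new_bar function
It first matches limit orders, then stop orders, then runs the strategy's on_bar so the strategy can make its decisions, and finally calls update_daily_close with the bar's close price, presumably to keep track of each day's closing price.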

def new_bar(self, bar: BarData):
    """"""
    self.bar = bar
    self.datetime = bar.datetime

    # Match the previously submitted orders first.
    self.cross_limit_order()
    self.cross_stop_order()
    self.strategy.on_bar(bar)

    self.update_daily_close(bar.close_price)

3.Order matching


    def cross_limit_order(self):
        """
        Cross limit order with last bar/tick data. These are limit orders, e.g. buy at or below a given price.
        """
        if self.mode == BacktestingMode.BAR:
            # A buy limit order fills once the price is at or below the order price, so it is tested against the bar's low.
            long_cross_price = self.bar.low_price
            short_cross_price = self.bar.high_price
            # Edge case: the order says buy below 98, but the next bar opens at 95 and keeps falling. The fill price is then the open price, not the low.
            long_best_price = self.bar.open_price
            short_best_price = self.bar.open_price
        else:
            long_cross_price = self.tick.ask_price_1
            short_cross_price = self.tick.bid_price_1
            long_best_price = long_cross_price
            short_best_price = short_cross_price

        for order in list(self.active_limit_orders.values()):
            # Push order update with status "not traded" (pending).
            # order comes from self.active_limit_orders, a dict initialized as {} at the top of the class.
            if order.status == Status.SUBMITTING:
                order.status = Status.NOTTRADED
                self.strategy.on_order(order)

            # Check whether limit orders can be filled.
            # A long limit order fills when: 1. the order is long; 2. the bar's low (long_cross_price) is less than or equal to the order price; 3. the bar's low (long_cross_price) is greater than 0.
            long_cross = (
                order.direction == Direction.LONG 
                and order.price >= long_cross_price 
                and long_cross_price > 0
            )

            short_cross = (
                order.direction == Direction.SHORT 
                and order.price <= short_cross_price 
                and short_cross_price > 0
            )

            if not long_cross and not short_cross:
                continue

            # Push order udpate with status "all traded" (filled).
            order.traded = order.volume
            order.status = Status.ALLTRADED
            self.strategy.on_order(order)

            self.active_limit_orders.pop(order.vt_orderid)

            # Push trade update
            # Build the trade record.
            self.trade_count += 1

            if long_cross:
                # Fill price of the order.
                trade_price = min(order.price, long_best_price)
                pos_change = order.volume
            else:
                trade_price = max(order.price, short_best_price)
                pos_change = -order.volume

            trade = TradeData(
                symbol=order.symbol,
                exchange=order.exchange,
                orderid=order.orderid,
                tradeid=str(self.trade_count),
                direction=order.direction,
                offset=order.offset,
                price=trade_price,
                volume=order.volume,
                time=self.datetime.strftime("%H:%M:%S"),
                gateway_name=self.gateway_name,
            )
            trade.datetime = self.datetime

            self.strategy.pos += pos_change
            self.strategy.on_trade(trade)

            self.trades[trade.vt_tradeid] = trade
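The price rules in cross_limit_order for bar mode boil down to two checks: does the bar's range reach the order price, and if so, is the open already better than the order price? A minimal sketch under those rules. The function limit_fill_price and the string directions are simplifications made for this example, not VNPY's API.

```python
from typing import Optional


def limit_fill_price(
    direction: str,
    order_price: float,
    bar_open: float,
    bar_high: float,
    bar_low: float,
) -> Optional[float]:
    """Return the simulated fill price of a limit order on one bar, or None."""
    if direction == "long":
        # A buy fills if the bar traded at or below the order price.
        crossed = order_price >= bar_low and bar_low > 0
        # If the bar gapped down, the open is the better (lower) price.
        return min(order_price, bar_open) if crossed else None

    # A sell fills if the bar traded at or above the order price.
    crossed = order_price <= bar_high and bar_high > 0
    # If the bar gapped up, the open is the better (higher) price.
    return max(order_price, bar_open) if crossed else None


print(limit_fill_price("long", 98, bar_open=95, bar_high=96, bar_low=90))
print(limit_fill_price("long", 98, bar_open=100, bar_high=101, bar_low=97))
print(limit_fill_price("long", 98, bar_open=100, bar_high=101, bar_low=99))
print(limit_fill_price("short", 102, bar_open=105, bar_high=106, bar_low=104))
```

The first call is the gap-down case from the comment in the source: the buy at 98 fills at the open of 95. The second fills at the order price 98, the third does not fill because the low never reaches 98, and the last is the mirror-image gap-up for a sell.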

V. buy, sell
The strategy's on_bar calls buy and sell. A look at CtaTemplate shows that buy, sell, short, and cover all call send_order.

    def buy(self, price: float, volume: float, stop: bool = False):
        """
        Send buy order to open a long position.
        """
        return self.send_order(Direction.LONG, Offset.OPEN, price, volume, stop)

So let's look at send_order:

    def send_order(
        self,
        direction: Direction,
        offset: Offset,
        price: float,
        volume: float,
        stop: bool = False,
        lock: bool = False
    ):
        """
        Send a new order.
        """
        if self.trading:
            vt_orderids = self.cta_engine.send_order(
                self, direction, offset, price, volume, stop, lock
            )
            return vt_orderids
        else:
            return []

Reading on, we find self.cta_engine = cta_engine, so next comes the engine's own send_order. This is the live CtaEngine version:

    def send_order(
        self,
        strategy: CtaTemplate,
        direction: Direction,
        offset: Offset,
        price: float,
        volume: float,
        stop: bool,
        lock: bool
    ):
        """
        """
        contract = self.main_engine.get_contract(strategy.vt_symbol)
        if not contract:
            self.write_log(f"委托失败,找不到合约:{strategy.vt_symbol}", strategy)
            return ""

        if stop:
            if contract.stop_supported:
                return self.send_server_stop_order(strategy, contract, direction, offset, price, volume, lock)
            else:
                return self.send_local_stop_order(strategy, direction, offset, price, volume, lock)
        else:
            return self.send_limit_order(strategy, contract, direction, offset, price, volume, lock)

Looking at send_limit_order and send_server_stop_order, both turn out to call send_server_order.

    def send_server_order(
        self,
        strategy: CtaTemplate,
        contract: ContractData,
        direction: Direction,
        offset: Offset,
        price: float,
        volume: float,
        type: OrderType,
        lock: bool
    ):
        """
        Send a new order to server.
        """
        # Create request and send order.
        original_req = OrderRequest(
            symbol=contract.symbol,
            exchange=contract.exchange,
            direction=direction,
            offset=offset,
            type=type,
            price=price,
            volume=volume,
        )

        # Convert with offset converter
        req_list = self.offset_converter.convert_order_request(original_req, lock)

        # Send Orders
        vt_orderids = []

        for req in req_list:
            vt_orderid = self.main_engine.send_order(
                req, contract.gateway_name)
            vt_orderids.append(vt_orderid)

            self.offset_converter.update_order_request(req, vt_orderid)
            
            # Save relationship between orderid and strategy.
            self.orderid_strategy_map[vt_orderid] = strategy
            self.strategy_orderid_map[strategy.strategy_name].add(vt_orderid)

        return vt_orderids

And send_order in MainEngine:

    def send_order(self, req: OrderRequest, gateway_name: str):
        """
        Send new order request to a specific gateway.
        """
        gateway = self.get_gateway(gateway_name)
        if gateway:
            return gateway.send_order(req)
        else:
            return ""

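In other words, the live call chain is as follows (the original flow chart is not reproduced here): buy/sell/short/cover → CtaTemplate.send_order → CtaEngine.send_order → send_limit_order or send_server_stop_order → send_server_order → MainEngine.send_order → gateway.send_order.

Backtesting runs the same on_bar of the strategy, so where do buy and sell find out whether they are in a backtest or in live trading? The answer appears to be in BacktestingEngine.add_strategy: the self passed as the first constructor argument is the backtesting engine, so it becomes the strategy's cta_engine, and self.cta_engine.send_order lands in the backtesting engine instead of CtaEngine.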

    def add_strategy(self, strategy_class: type, setting: dict):
        """"""
        self.strategy_class = strategy_class
        self.strategy = strategy_class(
            self, strategy_class.__name__, self.vt_symbol, setting
        )
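This is plain dependency injection: the strategy never checks which mode it runs in, it just calls whatever engine it was constructed with. A toy sketch of the idea. All three classes are made up for illustration and are far simpler than the real CtaTemplate, CtaEngine, and BacktestingEngine.

```python
class ToyStrategy:
    """Stands in for a CtaTemplate subclass; only knows 'its' engine."""

    def __init__(self, cta_engine, strategy_name: str):
        self.cta_engine = cta_engine
        self.strategy_name = strategy_name
        self.trading = True

    def buy(self, price: float, volume: float):
        if not self.trading:
            return []
        return self.cta_engine.send_order(self, "long", price, volume)


class ToyLiveEngine:
    """Pretends to forward the order to a gateway."""

    def send_order(self, strategy, direction, price, volume):
        return [f"GATEWAY.{direction}@{price}"]


class ToyBacktestEngine:
    """Keeps the order in a local dict so a later bar can match it."""

    def __init__(self):
        self.active_limit_orders = {}
        self.order_count = 0

    def send_order(self, strategy, direction, price, volume):
        self.order_count += 1
        orderid = f"BACKTESTING.{self.order_count}"
        self.active_limit_orders[orderid] = (direction, price, volume)
        return [orderid]


backtest = ToyBacktestEngine()
print(ToyStrategy(backtest, "demo").buy(98, 1))
print(ToyStrategy(ToyLiveEngine(), "demo").buy(98, 1))
```

The same buy call ends up in a local order book in one case and at a (pretend) gateway in the other, which is why the strategy code needs no changes between backtest and live trading.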

Finally, one more function: send_local_stop_order.

    def send_local_stop_order(
        self,
        strategy: CtaTemplate,
        direction: Direction,
        offset: Offset,
        price: float,
        volume: float,
        lock: bool
    ):
        """
        Create a new local stop order.
        """
        self.stop_order_count += 1
        stop_orderid = f"{STOPORDER_PREFIX}.{self.stop_order_count}"

        stop_order = StopOrder(
            vt_symbol=strategy.vt_symbol,
            direction=direction,
            offset=offset,
            price=price,
            volume=volume,
            stop_orderid=stop_orderid,
            strategy_name=strategy.strategy_name,
            lock=lock
        )

        self.stop_orders[stop_orderid] = stop_order

        vt_orderids = self.strategy_orderid_map[strategy.strategy_name]
        vt_orderids.add(stop_orderid)

        self.call_strategy_func(strategy, strategy.on_stop_order, stop_order)
        self.put_stop_order_event(stop_order)

        return stop_orderid

VI. calculate_result
This function computes the daily mark-to-market profit and loss and returns a DataFrame called daily_df.

def calculate_result(self):
    """"""
    self.output("开始计算逐日盯市盈亏")

    if not self.trades:
        self.output("成交记录为空,无法计算")
        return

    # Add trade data into daily reuslt.
    for trade in self.trades.values():
        d = trade.datetime.date()
        daily_result = self.daily_results[d]
        daily_result.add_trade(trade)

    # Calculate daily result by iteration.
    pre_close = 0
    start_pos = 0

    for daily_result in self.daily_results.values():
        daily_result.calculate_pnl(
            pre_close, start_pos, self.size, self.rate, self.slippage
        )

        pre_close = daily_result.close_price
        start_pos = daily_result.end_pos

    # Generate dataframe
    results = defaultdict(list)

    for daily_result in self.daily_results.values():
        for key, value in daily_result.__dict__.items():
            results[key].append(value)

    self.daily_df = DataFrame.from_dict(results).set_index("date")

    self.output("逐日盯市盈亏计算完成")
    return self.daily_df        
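The "Generate dataframe" step turns a list of per-day objects into column lists by walking each object's attribute dict. A stand-alone sketch of that transposition without pandas follows. The DayResult class and its three fields are a cut-down stand-in for illustration, not VNPY's DailyResult.

```python
from collections import defaultdict


class DayResult:
    """Cut-down stand-in for one day's result."""

    def __init__(self, date: str, close_price: float, net_pnl: float):
        self.date = date
        self.close_price = close_price
        self.net_pnl = net_pnl


days = [
    DayResult("2018-03-23", 10.0, 0.0),
    DayResult("2018-03-26", 10.5, 150.0),
]

# One list per attribute name; rows become columns.
columns = defaultdict(list)
for day in days:
    for key, value in vars(day).items():  # vars(obj) is obj.__dict__
        columns[key].append(value)

print(dict(columns))
```

A dict of equal-length lists like this is exactly the shape DataFrame.from_dict accepts. Because every attribute is copied, any list-valued attribute on the day object also becomes a column.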

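1.self.trades
self.trades appears here. It is a dict, and it gets its entries in cross_limit_order and cross_stop_order. Those two functions are fairly complex and hard to follow, so I plan to explore them separately in the next chapter.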

2.add_trade
Then there is add_trade, which is very simple: little more than a wrapper around append.

    def add_trade(self, trade: TradeData):
        """"""
        self.trades.append(trade)

3.calculate_pnl

def calculate_pnl(
    self,
    pre_close: float,
    start_pos: float,
    size: int,
    rate: float,
    slippage: float,
):
    """"""
    self.pre_close = pre_close

    # Holding pnl is the pnl from holding position at day start
    self.start_pos = start_pos
    self.end_pos = start_pos
    self.holding_pnl = self.start_pos * \
        (self.close_price - self.pre_close) * size

    # Trading pnl is the pnl from new trade during the day
    self.trade_count = len(self.trades)

    for trade in self.trades:
        if trade.direction == Direction.LONG:
            pos_change = trade.volume
        else:
            pos_change = -trade.volume

        turnover = trade.price * trade.volume * size

        self.trading_pnl += pos_change * \
            (self.close_price - trade.price) * size
        self.end_pos += pos_change
        self.turnover += turnover
        self.commission += turnover * rate
        self.slippage += trade.volume * size * slippage

    # Net pnl takes account of commission and slippage cost
    self.total_pnl = self.trading_pnl + self.holding_pnl
    self.net_pnl = self.total_pnl - self.commission - self.slippage
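A worked example makes the two pnl components concrete. The function below repeats the arithmetic of calculate_pnl on plain values. The name daily_pnl, the tuple format for trades, and the sample numbers are all made up for illustration.

```python
def daily_pnl(pre_close, close, start_pos, trades, size, rate, slippage):
    """Recompute one day's pnl; trades is a list of (direction, price, volume)."""
    # Holding pnl: the overnight position marked from yesterday's close to today's.
    holding_pnl = start_pos * (close - pre_close) * size

    trading_pnl = turnover = commission = slippage_cost = 0.0
    end_pos = start_pos

    for direction, price, volume in trades:
        pos_change = volume if direction == "long" else -volume
        trade_turnover = price * volume * size

        # Trading pnl: each new trade marked from its fill price to today's close.
        trading_pnl += pos_change * (close - price) * size
        end_pos += pos_change
        turnover += trade_turnover
        commission += trade_turnover * rate
        slippage_cost += volume * size * slippage

    total_pnl = holding_pnl + trading_pnl
    return {
        "holding_pnl": holding_pnl,
        "trading_pnl": trading_pnl,
        "end_pos": end_pos,
        "turnover": turnover,
        "commission": commission,
        "slippage": slippage_cost,
        "net_pnl": total_pnl - commission - slippage_cost,
    }


result = daily_pnl(
    pre_close=100, close=102, start_pos=1,
    trades=[("long", 101, 1)], size=10, rate=0.0001, slippage=0.2,
)
print(result)
```

Here one contract held overnight gains (102 - 100) * 10 = 20, and one more bought at 101 gains (102 - 101) * 10 = 10 by the close. Costs are 1010 * 0.0001 in commission plus 1 * 10 * 0.2 in slippage, so net pnl comes out slightly below 28.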

VII. calculate_statistics
This function computes the strategy's statistics and returns them as a dict:

def calculate_statistics(self, df: DataFrame = None, output=True):
    """"""
    self.output("开始计算策略统计指标")

    # Check DataFrame input exterior
    if df is None:
        df = self.daily_df
    
    # Check for init DataFrame 
    if df is None:
        # Set all statistics to 0 if no trade.
        start_date = ""
        end_date = ""
        total_days = 0
        ......
        return_drawdown_ratio = 0
    else:
        # Calculate balance related time series data
        df["balance"] = df["net_pnl"].cumsum() + self.capital
        df["return"] = np.log(df["balance"] / df["balance"].shift(1)).fillna(0)
        df["highlevel"] = (
            df["balance"].rolling(
                min_periods=1, window=len(df), center=False).max()
        )
        df["drawdown"] = df["balance"] - df["highlevel"]
        df["ddpercent"] = df["drawdown"] / df["highlevel"] * 100

        # Calculate statistics value
        start_date = df.index[0]
        end_date = df.index[-1]

        total_days = len(df)
        profit_days = len(df[df["net_pnl"] > 0])
        loss_days = len(df[df["net_pnl"] < 0])

        end_balance = df["balance"].iloc[-1]
        max_drawdown = df["drawdown"].min()
        max_ddpercent = df["ddpercent"].min()

        total_net_pnl = df["net_pnl"].sum()
        daily_net_pnl = total_net_pnl / total_days

        total_commission = df["commission"].sum()
        daily_commission = total_commission / total_days

        total_slippage = df["slippage"].sum()
        daily_slippage = total_slippage / total_days

        total_turnover = df["turnover"].sum()
        daily_turnover = total_turnover / total_days

        total_trade_count = df["trade_count"].sum()
        daily_trade_count = total_trade_count / total_days

        total_return = (end_balance / self.capital - 1) * 100
        annual_return = total_return / total_days * 240
        daily_return = df["return"].mean() * 100
        return_std = df["return"].std() * 100

        if return_std:
            sharpe_ratio = daily_return / return_std * np.sqrt(240)
        else:
            sharpe_ratio = 0

        return_drawdown_ratio = -total_return / max_ddpercent

    # Output
    if output:
        self.output("-" * 30)
        self.output(f"首个交易日:\t{start_date}")
        self.output(f"最后交易日:\t{end_date}")

        self.output(f"总交易日:\t{total_days}")
        self.output(f"盈利交易日:\t{profit_days}")
        self.output(f"亏损交易日:\t{loss_days}")

        self.output(f"起始资金:\t{self.capital:,.2f}")
        self.output(f"结束资金:\t{end_balance:,.2f}")

        self.output(f"总收益率:\t{total_return:,.2f}%")
        self.output(f"年化收益:\t{annual_return:,.2f}%")
        self.output(f"最大回撤: \t{max_drawdown:,.2f}")
        self.output(f"百分比最大回撤: {max_ddpercent:,.2f}%")

        self.output(f"总盈亏:\t{total_net_pnl:,.2f}")
        self.output(f"总手续费:\t{total_commission:,.2f}")
        self.output(f"总滑点:\t{total_slippage:,.2f}")
        self.output(f"总成交金额:\t{total_turnover:,.2f}")
        self.output(f"总成交笔数:\t{total_trade_count}")

        self.output(f"日均盈亏:\t{daily_net_pnl:,.2f}")
        self.output(f"日均手续费:\t{daily_commission:,.2f}")
        self.output(f"日均滑点:\t{daily_slippage:,.2f}")
        self.output(f"日均成交金额:\t{daily_turnover:,.2f}")
        self.output(f"日均成交笔数:\t{daily_trade_count}")

        self.output(f"日均收益率:\t{daily_return:,.2f}%")
        self.output(f"收益标准差:\t{return_std:,.2f}%")
        self.output(f"Sharpe Ratio:\t{sharpe_ratio:,.2f}")
        self.output(f"收益回撤比:\t{return_drawdown_ratio:,.2f}")

    statistics = {
        "start_date": start_date,
        "end_date": end_date,
        "total_days": total_days,
        "profit_days": profit_days,
        "loss_days": loss_days,
        "capital": self.capital,
        "end_balance": end_balance,
        "max_drawdown": max_drawdown,
        "max_ddpercent": max_ddpercent,
        "total_net_pnl": total_net_pnl,
        "daily_net_pnl": daily_net_pnl,
        "total_commission": total_commission,
        "daily_commission": daily_commission,
        "total_slippage": total_slippage,
        "daily_slippage": daily_slippage,
        "total_turnover": total_turnover,
        "daily_turnover": daily_turnover,
        "total_trade_count": total_trade_count,
        "daily_trade_count": daily_trade_count,
        "total_return": total_return,
        "annual_return": annual_return,
        "daily_return": daily_return,
        "return_std": return_std,
        "sharpe_ratio": sharpe_ratio,
        "return_drawdown_ratio": return_drawdown_ratio,
    }

    return statistics
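To see what a few of these numbers mean, the core formulas can be redone on a tiny series with only the standard library. This is a sketch of the same arithmetic (balance, log returns, drawdown against the running high, a 240-day year), not a replacement for the pandas code. The function name summarize and the sample pnl values are made up.

```python
import math
from statistics import mean, stdev


def summarize(net_pnls, capital, days_per_year=240):
    """Recompute a few statistics from daily net pnl (needs at least 2 days)."""
    # Running balance, like df["net_pnl"].cumsum() + capital.
    balances = []
    running = capital
    for pnl in net_pnls:
        running += pnl
        balances.append(running)

    # Log returns; the first day has no previous balance, so it is 0 (the fillna(0)).
    returns = [0.0] + [math.log(b / a) for a, b in zip(balances, balances[1:])]

    # Drawdown is measured against the running maximum of the balance.
    high = balances[0]
    max_drawdown = 0.0
    max_ddpercent = 0.0
    for balance in balances:
        high = max(high, balance)
        max_drawdown = min(max_drawdown, balance - high)
        max_ddpercent = min(max_ddpercent, (balance - high) / high * 100)

    total_days = len(balances)
    total_return = (balances[-1] / capital - 1) * 100
    daily_return = mean(returns) * 100
    return_std = stdev(returns) * 100  # sample standard deviation (n - 1)
    sharpe = daily_return / return_std * math.sqrt(days_per_year) if return_std else 0.0

    return {
        "end_balance": balances[-1],
        "max_drawdown": max_drawdown,
        "max_ddpercent": max_ddpercent,
        "total_return": total_return,
        "annual_return": total_return / total_days * days_per_year,
        "sharpe_ratio": sharpe,
    }


stats = summarize([100, -50, 80], capital=1000)
print(stats)
```

For balances of 1100, 1050, and 1130 the maximum drawdown is -50 (from the high of 1100) and the total return is about 13%. Note that the annualized figure is a simple linear scaling of total return, which is how the source computes it as well.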

VIII. show_chart
Finally, the function that displays the charts.

def show_chart(self, df: DataFrame = None):
    """"""
    # Check DataFrame input exterior        
    if df is None:
        df = self.daily_df

    # Check for init DataFrame        
    if df is None:
        return

    plt.figure(figsize=(10, 16))

    balance_plot = plt.subplot(4, 1, 1)  # 4 rows x 1 column, first plot
    balance_plot.set_title("Balance")
    df["balance"].plot(legend=True)

    drawdown_plot = plt.subplot(4, 1, 2)
    drawdown_plot.set_title("Drawdown")
    drawdown_plot.fill_between(range(len(df)), df["drawdown"].values)

    pnl_plot = plt.subplot(4, 1, 3)
    pnl_plot.set_title("Daily Pnl")
    df["net_pnl"].plot(kind="bar", legend=False, grid=False, xticks=[])

    distribution_plot = plt.subplot(4, 1, 4)
    distribution_plot.set_title("Daily Pnl Distribution")
    df["net_pnl"].hist(bins=50)

    plt.show()      