Home >  > QTM与聚宽ETF数据核对

QTM与聚宽ETF数据核对

一、结果


经核对,基本上都对得上,有的有0.001的差距。

二、代码
(一)iqunt

#coding:gbk
import numpy as np
import pandas as pd

# ========== 目标 ETF ==========
target_etfs = [
    "513100.SH",  # 纳指ETF
    "159509.SZ",  # 纳指科技ETF
    "513520.SH",  # 日经ETF
    "513030.SH",  # 德国ETF
]

g_data_printed = False

def init(ContextInfo):
    global g_data_printed
    g_data_printed = False
    print("策略初始化完成,等待数据获取...")

def handlebar(ContextInfo):
    global g_data_printed
    if g_data_printed:
        return
    g_data_printed = True

    print("\n========== 开始获取 2025年1月 ETF 收盘价 ==========")
    try:
        hist_dict = ContextInfo.get_market_data_ex(
            fields=['close'],
            stock_code=target_etfs,
            period='1d',
            start_time='20250101',
            end_time='20250130',
            dividend_type='front',
            fill_data=True,
            subscribe=False
        )

        first_etf = target_etfs[0]
        if hist_dict is None or first_etf not in hist_dict or hist_dict[first_etf].empty:
            print("错误:未获取到任何数据")
            if hist_dict is not None:
                print(f"返回的字典键: {list(hist_dict.keys())}")
            return

        # 获取日期索引,兼容字符串和 datetime
        raw_index = hist_dict[first_etf].index
        # 如果索引是字符串,保留原样;如果是 datetime,转为字符串
        dates = []
        for d in raw_index:
            if isinstance(d, str):
                dates.append(d)          # 已经是 '2025-01-06' 格式
            else:
                dates.append(d.strftime('%Y-%m-%d'))

        print(f"共获取到 {len(dates)} 个交易日数据")

        # 打印表头
        header = f"{'日期':<12}" + "".join([f"{etf:>12}" for etf in target_etfs])
        print(header)

        # 逐行打印
        for i, date_str in enumerate(dates):
            row_str = f"{date_str:<12}"
            for etf in target_etfs:
                if etf in hist_dict and i < len(hist_dict[etf]):
                    # 使用位置索引获取收盘价,避免 loc 兼容问题
                    price = hist_dict[etf].iloc[i]['close']
                    if pd.isna(price):
                        row_str += f"{'无数据':>12}"
                    else:
                        row_str += f"{price:>12.3f}"
                else:
                    row_str += f"{'无数据':>12}"
            print(row_str)
        print("=" * 80)

    except Exception as e:
        print(f"获取数据时发生异常: {e}")
        import traceback
        traceback.print_exc()

(二)聚宽

"""
聚宽量化策略:获取并显示多只ETF的2025年1月收盘价
适用环境:回测环境(策略研究也可运行)
"""

import pandas as pd

def get_etf_close_prices(etf_codes, start_date, end_date):
    """
    获取多只ETF在指定日期范围内的收盘价数据
    返回: DataFrame,列为各ETF代码,索引为日期
    """
    all_data = []
    log.info(f"正在获取 {start_date} 至 {end_date} 的ETF收盘价数据...")
    
    for code in etf_codes:
        try:
            # 获取日线收盘价(前复权)
            df = get_price(
                code,
                start_date=start_date,
                end_date=end_date,
                frequency='daily',
                fields=['close'],
                skip_paused=True,
                fq='pre'
            )
            df.rename(columns={'close': code}, inplace=True)
            all_data.append(df)
            log.info(f"✓ {code} 获取成功,共{len(df)}个交易日")
        except Exception as e:
            log.error(f"✗ {code} 获取失败:{str(e)}")
    
    if not all_data:
        log.warn("未获取到任何数据,请检查代码和日期范围。")
        return None
    
    result_df = pd.concat(all_data, axis=1)
    result_df.index.name = '日期'
    result_df = result_df.round(4).sort_index()
    return result_df


def display_etf_summary(df):
    """显示ETF数据的统计摘要(通过log.info输出)"""
    if df is None:
        return
    
    log.info("="*60)
    log.info("ETF 收盘价数据统计摘要")
    log.info("="*60)
    
    for col in df.columns:
        start_price = df[col].iloc[0]
        end_price = df[col].iloc[-1]
        change = end_price - start_price
        change_pct = (change / start_price) * 100
        
        log.info(f"\n{col}:")
        log.info(f"  期初价格: {start_price:.4f}")
        log.info(f"  期末价格: {end_price:.4f}")
        log.info(f"  期间涨跌: {change:.4f} ({change_pct:+.2f}%)")
        log.info(f"  最高价: {df[col].max():.4f}")
        log.info(f"  最低价: {df[col].min():.4f}")
        log.info(f"  均值: {df[col].mean():.4f}")


def initialize(context):
    """
    策略初始化:在回测开始时执行一次
    """
    # 1. 定义要获取的ETF列表(聚宽标准格式)
    etf_list = [
        "513100.XSHG", # 纳指ETF
        "159509.XSHE", # 纳指科技ETF
        "513520.XSHG", # 日经ETF
        "513030.XSHG", # 德国ETF
    ]
    
    # 2. 获取指定日期区间的数据
    start_date = '2025-01-01'
    end_date = '2025-01-30'
    etf_data = get_etf_close_prices(etf_list, start_date, end_date)
    
    # 3. 输出到日志(回测日志中可见)
    if etf_data is not None:
        log.info("\n" + "="*60)
        log.info(f"ETF 收盘价数据表 ({start_date} 至 {end_date})")
        log.info("="*60)
        # DataFrame转字符串输出,避免日志显示异常
        log.info("\n" + etf_data.to_string())
        
        # 输出统计摘要
        display_etf_summary(etf_data)
    
    # 4. 任务完成后停止策略,避免空跑
    log.info("\n数据获取与展示完成,策略结束运行。")
    # 通过抛出异常或设置运行标记来终止后续handle_data调用
    # 简单做法:设置一个全局标记,在handle_data中直接返回
    g.task_done = True


def handle_data(context, data):
    """
    每日数据回调:这里不执行任何操作,只是占位
    """
    # 如果任务已完成,不再输出任何信息
    if g.task_done:
        return

暧昧帖

本文暂无标签