一、结果


经核对,基本上都对得上,有的有0.001的差距。
二、代码
(一)iqunt
#coding:gbk
import numpy as np
import pandas as pd
# ========== 目标 ETF ==========
target_etfs = [
"513100.SH", # 纳指ETF
"159509.SZ", # 纳指科技ETF
"513520.SH", # 日经ETF
"513030.SH", # 德国ETF
]
g_data_printed = False
def init(ContextInfo):
global g_data_printed
g_data_printed = False
print("策略初始化完成,等待数据获取...")
def handlebar(ContextInfo):
global g_data_printed
if g_data_printed:
return
g_data_printed = True
print("\n========== 开始获取 2025年1月 ETF 收盘价 ==========")
try:
hist_dict = ContextInfo.get_market_data_ex(
fields=['close'],
stock_code=target_etfs,
period='1d',
start_time='20250101',
end_time='20250130',
dividend_type='front',
fill_data=True,
subscribe=False
)
first_etf = target_etfs[0]
if hist_dict is None or first_etf not in hist_dict or hist_dict[first_etf].empty:
print("错误:未获取到任何数据")
if hist_dict is not None:
print(f"返回的字典键: {list(hist_dict.keys())}")
return
# 获取日期索引,兼容字符串和 datetime
raw_index = hist_dict[first_etf].index
# 如果索引是字符串,保留原样;如果是 datetime,转为字符串
dates = []
for d in raw_index:
if isinstance(d, str):
dates.append(d) # 已经是 '2025-01-06' 格式
else:
dates.append(d.strftime('%Y-%m-%d'))
print(f"共获取到 {len(dates)} 个交易日数据")
# 打印表头
header = f"{'日期':<12}" + "".join([f"{etf:>12}" for etf in target_etfs])
print(header)
# 逐行打印
for i, date_str in enumerate(dates):
row_str = f"{date_str:<12}"
for etf in target_etfs:
if etf in hist_dict and i < len(hist_dict[etf]):
# 使用位置索引获取收盘价,避免 loc 兼容问题
price = hist_dict[etf].iloc[i]['close']
if pd.isna(price):
row_str += f"{'无数据':>12}"
else:
row_str += f"{price:>12.3f}"
else:
row_str += f"{'无数据':>12}"
print(row_str)
print("=" * 80)
except Exception as e:
print(f"获取数据时发生异常: {e}")
import traceback
traceback.print_exc()
(二)聚宽
"""
聚宽量化策略:获取并显示多只ETF的2025年1月收盘价
适用环境:回测环境(策略研究也可运行)
"""
import pandas as pd
def get_etf_close_prices(etf_codes, start_date, end_date):
"""
获取多只ETF在指定日期范围内的收盘价数据
返回: DataFrame,列为各ETF代码,索引为日期
"""
all_data = []
log.info(f"正在获取 {start_date} 至 {end_date} 的ETF收盘价数据...")
for code in etf_codes:
try:
# 获取日线收盘价(前复权)
df = get_price(
code,
start_date=start_date,
end_date=end_date,
frequency='daily',
fields=['close'],
skip_paused=True,
fq='pre'
)
df.rename(columns={'close': code}, inplace=True)
all_data.append(df)
log.info(f"✓ {code} 获取成功,共{len(df)}个交易日")
except Exception as e:
log.error(f"✗ {code} 获取失败:{str(e)}")
if not all_data:
log.warn("未获取到任何数据,请检查代码和日期范围。")
return None
result_df = pd.concat(all_data, axis=1)
result_df.index.name = '日期'
result_df = result_df.round(4).sort_index()
return result_df
def display_etf_summary(df):
"""显示ETF数据的统计摘要(通过log.info输出)"""
if df is None:
return
log.info("="*60)
log.info("ETF 收盘价数据统计摘要")
log.info("="*60)
for col in df.columns:
start_price = df[col].iloc[0]
end_price = df[col].iloc[-1]
change = end_price - start_price
change_pct = (change / start_price) * 100
log.info(f"\n{col}:")
log.info(f" 期初价格: {start_price:.4f}")
log.info(f" 期末价格: {end_price:.4f}")
log.info(f" 期间涨跌: {change:.4f} ({change_pct:+.2f}%)")
log.info(f" 最高价: {df[col].max():.4f}")
log.info(f" 最低价: {df[col].min():.4f}")
log.info(f" 均值: {df[col].mean():.4f}")
def initialize(context):
"""
策略初始化:在回测开始时执行一次
"""
# 1. 定义要获取的ETF列表(聚宽标准格式)
etf_list = [
"513100.XSHG", # 纳指ETF
"159509.XSHE", # 纳指科技ETF
"513520.XSHG", # 日经ETF
"513030.XSHG", # 德国ETF
]
# 2. 获取指定日期区间的数据
start_date = '2025-01-01'
end_date = '2025-01-30'
etf_data = get_etf_close_prices(etf_list, start_date, end_date)
# 3. 输出到日志(回测日志中可见)
if etf_data is not None:
log.info("\n" + "="*60)
log.info(f"ETF 收盘价数据表 ({start_date} 至 {end_date})")
log.info("="*60)
# DataFrame转字符串输出,避免日志显示异常
log.info("\n" + etf_data.to_string())
# 输出统计摘要
display_etf_summary(etf_data)
# 4. 任务完成后停止策略,避免空跑
log.info("\n数据获取与展示完成,策略结束运行。")
# 通过抛出异常或设置运行标记来终止后续handle_data调用
# 简单做法:设置一个全局标记,在handle_data中直接返回
g.task_done = True
def handle_data(context, data):
"""
每日数据回调:这里不执行任何操作,只是占位
"""
# 如果任务已完成,不再输出任何信息
if g.task_done:
return