一、利用Iqunt
(一)运用策略
1.确认并下载 1 分钟历史数据(必须!)
在 iQuant 客户端 操作 → 数据管理 中:
品种:000001.SZ
周期:1分钟
时间范围:覆盖 2024-01-01 至 2024-01-10
点击下载,等待完成。

2.运行策略
导出的文件在D:\iquant_data\
#encoding:gbk
import pandas as pd
import os
def init(ContextInfo):
# ====== 配置参数 ======
ContextInfo.stock_code = '000001.SZ' # 股票代码
ContextInfo.start_date = '20250101' # 开始日期 yyyymmdd
ContextInfo.end_date = '20260110' # 结束日期 yyyymmdd
ContextInfo.output_dir = r'D:\iquant_data' # 输出目录
ContextInfo.export_done = False
def after_init(ContextInfo):
if ContextInfo.export_done:
return
ContextInfo.export_done = True
stock = ContextInfo.stock_code
start = ContextInfo.start_date
end = ContextInfo.end_date
print(f"开始下载 {stock} 从 {start} 到 {end} 的1分钟K线数据...")
stock_name = ContextInfo.get_stock_name(stock)
print(f"股票名称: {stock_name}")
data_dict = ContextInfo.get_market_data_ex(
fields=['open', 'high', 'low', 'close', 'volume', 'amount'],
stock_code=[stock],
period='1m',
start_time=start,
end_time=end,
count=-1,
dividend_type='front',
fill_data=False,
subscribe=False
)
df = data_dict.get(stock)
if df is None or df.empty:
print("未获取到任何数据,请检查日期范围或数据是否已下载。")
return
print(f"共获取 {len(df)} 条数据")
df = df.copy()
df.reset_index(inplace=True)
time_col = 'stime' if 'stime' in df.columns else 'time'
if time_col not in df.columns:
print("错误:未找到时间列")
return
# ---------- 时间转换 ----------
time_str = df[time_col].astype(str)
sample = time_str.iloc[0] if len(time_str) > 0 else ''
if sample.isdigit() and len(sample) == 14:
df['datetime'] = pd.to_datetime(time_str, format='%Y%m%d%H%M%S', errors='coerce')
elif sample.isdigit() and len(sample) == 8:
df['datetime'] = pd.to_datetime(time_str, format='%Y%m%d', errors='coerce')
else:
df[time_col] = pd.to_numeric(df[time_col], errors='coerce')
df = df[df[time_col] > 0]
df['datetime'] = pd.to_datetime(df[time_col], unit='ms', errors='coerce')
df = df.dropna(subset=['datetime'])
print(f"有效数据行数: {len(df)}")
if len(df) == 0:
print("时间转换后无有效数据,请检查数据")
return
# 检查数据实际起止日期
actual_start = df['datetime'].min().strftime('%Y-%m-%d')
actual_end = df['datetime'].max().strftime('%Y-%m-%d')
print(f"实际数据范围: {actual_start} 至 {actual_end}")
if actual_start > start:
print(f"警告:本地数据最早只到 {actual_start},请补充 {start} 至 {actual_start} 的历史数据")
# ---------- 格式化输出 ----------
# 日期和时间分别格式化为 yyyy-mm-dd 和 HH:MM:SS 字符串
df['日期'] = df['datetime'].dt.strftime('%Y-%m-%d')
df['时间'] = df['datetime'].dt.strftime('%H:%M:%S')
# 价格字段保留2位小数
for col in ['open', 'high', 'low', 'close']:
if col in df.columns:
df[col] = df[col].round(2)
# 成交额一般也保留2位小数(可选)
if 'amount' in df.columns:
df.rename(columns={'amount': '成交额'}, inplace=True)
df['成交额'] = df['成交额'].round(2)
else:
df['成交额'] = 0.0
df['股票代码'] = stock
df['股票名称'] = stock_name
columns_order = ['日期', '时间', '股票代码', '股票名称',
'open', 'high', 'low', 'close', 'volume', '成交额']
existing_cols =
df = df[existing_cols]
# 保存 CSV(注意:strftime 输出的已经是格式化字符串,所以不会自动变回日期对象)
if not os.path.exists(ContextInfo.output_dir):
os.makedirs(ContextInfo.output_dir)
file_name = stock.replace('.', '_') + '.csv'
file_path = os.path.join(ContextInfo.output_dir, file_name)
df.to_csv(file_path, index=False, encoding='gbk')
print(f"数据已保存至: {file_path}")
print("\n数据预览:")
print(df.head())
def handlebar(ContextInfo):
pass
3.核对数据无误

二、通达信导出:
选择标的(螺纹钢主连)-> 选择K线级别(1分钟)-> 导出为txt或excel[reference:0]
按代码:34或者在菜单中选择:


2.使用iquant