#encoding:gbk
"""
iQuant:基于KDJ指标的择时策略(多股票)
"""
import pandas as pd
import numpy as np
import talib
def init(ContextInfo):
# 股票池:沪深各市值最大的前几只
ContextInfo.trade_code_list = ['601398.SH', '601857.SH', '601288.SH', '002415.SZ', '000002.SZ']
ContextInfo.set_universe(ContextInfo.trade_code_list)
ContextInfo.accID = '6000000058'
# KDJ参数(必须设置默认值)
ContextInfo.fastk_period = 9
ContextInfo.slowk_period = 3
ContextInfo.slowd_period = 3
# KDJ区间阈值
ContextInfo.up = 80
ContextInfo.down = 20
def handlebar(ContextInfo):
h_close = ContextInfo.get_history_data(30, '1d', 'close')
h_high = ContextInfo.get_history_data(30, '1d', 'high')
h_low = ContextInfo.get_history_data(30, '1d', 'low')
h_open = ContextInfo.get_history_data(30, '1d', 'open')
# 获取持仓与资金
holdings = get_holdings(ContextInfo.accID, 'STOCK')
totalvalue = get_totalvalue(ContextInfo.accID, 'STOCK')
cash_per_stock = totalvalue / len(ContextInfo.trade_code_list)
for stk in ContextInfo.trade_code_list:
if (
stk in h_close and len(h_close[stk]) == 30
and stk in h_open and len(h_open[stk]) == 30
and stk in h_high and len(h_high[stk]) == 30
and stk in h_low and len(h_low[stk]) == 30
):
# 转换为 ndarray
c = np.array(h_close[stk][:-1], dtype=np.float64)
h = np.array(h_high[stk][:-1], dtype=np.float64)
l = np.array(h_low[stk][:-1], dtype=np.float64)
# 计算KDJ指标
slowk, slowd = talib.STOCH(
h, l, c,
fastk_period=ContextInfo.fastk_period,
slowk_period=ContextInfo.slowk_period,
slowk_matype=0,
slowd_period=ContextInfo.slowd_period,
slowd_matype=0
)
# 最新价格
last_close = h_close[stk][-1]
buynum = int((cash_per_stock / last_close) // 100 * 100) # 按100股取整
# 卖出信号
if slowd[-1] > ContextInfo.up and slowk[-1] < slowd[-1] and stk in holdings:
print(f"{stk} 满足卖出条件,执行卖出")
order_shares(stk, -holdings[stk]*100, ContextInfo, ContextInfo.accID)
# 买入信号
elif slowd[-1] < ContextInfo.down and slowk[-1] > slowd[-1] and buynum >= 100:
print(f"{stk} 满足买入条件,买入 {buynum} 股")
order_shares(stk, buynum, ContextInfo, ContextInfo.accID)
# 获取持仓信息 { code.market: 手数 }
def get_holdings(accountid, datatype):
holdinglist = {}
resultlist = get_trade_detail_data(accountid, datatype, "POSITION")
for obj in resultlist:
code = obj.m_strInstrumentID + "." + obj.m_strExchangeID
holdinglist = obj.m_nVolume // 100
return holdinglist
# 获取账户可用资金
def get_totalvalue(accountid, datatype):
resultlist = get_trade_detail_data(accountid, datatype, "ACCOUNT")
for obj in resultlist:
return obj.m_dAvailable # 可用金额
return 0.0
回测结果:
从2017年1月到2025年5月。

# 获取持仓信息 { code.market: 手数 }
def get_holdings(accountid, datatype):
holdinglist = {}
resultlist = get_trade_detail_data(accountid, datatype, "POSITION")
for obj in resultlist:
print("代码:", obj.m_strInstrumentID)
print("市场:", obj.m_strExchangeID)
print("持仓手数(总):", obj.m_nVolume)
code = obj.m_strInstrumentID + "." + obj.m_strExchangeID
holdinglist = obj.m_nVolume // 100
return holdinglist
# 获取账户可用资金
def get_totalvalue(accountid, datatype):
resultlist = get_trade_detail_data(accountid, datatype, "ACCOUNT")
for obj in resultlist:
print("账户可用资金:", obj.m_dAvailable)
print("账户总资产:", obj.m_dBalance)
print("账户保证金:", obj.m_dMargin)
return obj.m_dAvailable # 可用金额
return 0.0
