#encoding:gbk
"""iQuant: timing strategy based on the KDJ indicator (multiple stocks)."""
import pandas as pd
import numpy as np
import talib


def init(ContextInfo):
    """Set up the stock pool, trading account and KDJ parameters.

    Everything is stored as attributes on ``ContextInfo`` so that
    ``handlebar`` can read it back on every bar.
    """
    # Stock pool: a few of the largest-cap stocks on the Shanghai and
    # Shenzhen exchanges.
    ContextInfo.trade_code_list = [
        '601398.SH',
        '601857.SH',
        '601288.SH',
        '002415.SZ',
        '000002.SZ',
    ]
    ContextInfo.set_universe(ContextInfo.trade_code_list)
    ContextInfo.accID = '6000000058'

    # KDJ parameters (defaults must be set).
    ContextInfo.fastk_period = 9
    ContextInfo.slowk_period = 3
    ContextInfo.slowd_period = 3

    # KDJ zone thresholds: above `up` is the sell zone, below `down` the buy zone.
    ContextInfo.up = 80
    ContextInfo.down = 20


def handlebar(ContextInfo):
    """Evaluate the KDJ signal for every stock in the pool and place orders.

    For each stock with a complete 30-bar daily history, the stochastic
    oscillator is computed on all bars except the most recent one; the most
    recent close is only used as the price for position sizing.

    * Sell the whole position when %D is above ``ContextInfo.up`` and %K is
      below %D.
    * Buy ``cash_per_stock`` worth of shares (rounded down to a multiple of
      100) when %D is below ``ContextInfo.down`` and %K is above %D.
    """
    bars = 30
    # NOTE(review): each result is used as a mapping code -> price sequence;
    # confirm against the platform documentation.
    h_close = ContextInfo.get_history_data(bars, '1d', 'close')
    h_high = ContextInfo.get_history_data(bars, '1d', 'high')
    h_low = ContextInfo.get_history_data(bars, '1d', 'low')
    h_open = ContextInfo.get_history_data(bars, '1d', 'open')

    # Current positions (in lots) and available cash, split evenly per stock.
    holdings = get_holdings(ContextInfo.accID, 'STOCK')
    totalvalue = get_totalvalue(ContextInfo.accID, 'STOCK')
    cash_per_stock = totalvalue / len(ContextInfo.trade_code_list)

    for stk in ContextInfo.trade_code_list:
        # Skip stocks that lack a full window in any of the four series.
        histories = (h_close, h_open, h_high, h_low)
        if not all(stk in hist and len(hist[stk]) == bars for hist in histories):
            continue

        # Convert to ndarray, leaving out the most recent bar.
        c = np.array(h_close[stk][:-1], dtype=np.float64)
        h = np.array(h_high[stk][:-1], dtype=np.float64)
        l = np.array(h_low[stk][:-1], dtype=np.float64)

        # Compute the KDJ (stochastic) lines.
        slowk, slowd = talib.STOCH(
            h, l, c,
            fastk_period=ContextInfo.fastk_period,
            slowk_period=ContextInfo.slowk_period,
            slowk_matype=0,
            slowd_period=ContextInfo.slowd_period,
            slowd_matype=0,
        )

        # Latest price; a zero, negative or NaN price cannot be used for
        # sizing, so no buy is possible (a sell is still allowed).
        last_close = h_close[stk][-1]
        if last_close > 0:
            # Round down to a multiple of 100 shares.
            buynum = int((cash_per_stock / last_close) // 100 * 100)
        else:
            buynum = 0

        held_lots = holdings.get(stk, 0)

        # Sell signal (only when at least one full lot is held).
        if slowd[-1] > ContextInfo.up and slowk[-1] < slowd[-1] and held_lots > 0:
            print(f"{stk} 满足卖出条件,执行卖出")
            order_shares(stk, -held_lots * 100, ContextInfo, ContextInfo.accID)
        # Buy signal.
        elif slowd[-1] < ContextInfo.down and slowk[-1] > slowd[-1] and buynum >= 100:
            print(f"{stk} 满足买入条件,买入 {buynum} 股")
            order_shares(stk, buynum, ContextInfo, ContextInfo.accID)


def get_holdings(accountid, datatype):
    """Return the account's positions as ``{ "code.market": lots }``.

    Each position's ``m_nVolume`` is floor-divided by 100, so the stored
    value counts whole lots and any remainder below 100 is dropped.
    Returns an empty dict when there are no positions.
    """
    holdinglist = {}
    resultlist = get_trade_detail_data(accountid, datatype, "POSITION")
    for obj in resultlist:
        code = obj.m_strInstrumentID + "." + obj.m_strExchangeID
        # Store per code; assigning to `holdinglist` itself would replace the
        # dict with an int and break the `in` lookups done by the caller.
        holdinglist[code] = obj.m_nVolume // 100
    return holdinglist


def get_totalvalue(accountid, datatype):
    """Return the available cash of the first account record, or 0.0 if none."""
    resultlist = get_trade_detail_data(accountid, datatype, "ACCOUNT")
    for obj in resultlist:
        return obj.m_dAvailable  # available amount
    return 0.0
# 回测结果:
# 回测区间为2017年1月至2025年5月。
def get_holdings(accountid, datatype):
    """Return the account's positions as ``{ "code.market": lots }``.

    Prints the instrument code, exchange and raw ``m_nVolume`` of every
    position record for debugging. The stored value is ``m_nVolume // 100``,
    so any remainder below 100 is dropped. Returns an empty dict when there
    are no positions.
    """
    holdinglist = {}
    resultlist = get_trade_detail_data(accountid, datatype, "POSITION")
    for obj in resultlist:
        print("代码:", obj.m_strInstrumentID)
        print("市场:", obj.m_strExchangeID)
        print("持仓手数(总):", obj.m_nVolume)
        code = obj.m_strInstrumentID + "." + obj.m_strExchangeID
        # Store per code; assigning to `holdinglist` itself would replace the
        # dict with an int, so callers could no longer look positions up.
        holdinglist[code] = obj.m_nVolume // 100
    return holdinglist


def get_totalvalue(accountid, datatype):
    """Return the available cash of the first account record, or 0.0 if none.

    Prints the available cash, balance and margin of that record for
    debugging before returning.
    """
    resultlist = get_trade_detail_data(accountid, datatype, "ACCOUNT")
    for obj in resultlist:
        print("账户可用资金:", obj.m_dAvailable)
        print("账户总资产:", obj.m_dBalance)
        print("账户保证金:", obj.m_dMargin)
        return obj.m_dAvailable  # available amount
    return 0.0