一、建立你的第一个策略
直接以最简单的双均线为例。
# encoding:gbk import numpy as np class State: pass A = State() # 存储策略状态 def init(C): """初始化策略参数""" A.acct = '您的账号' # 替换为实际账号 A.acct_type = 'STOCK' # 账号类型 A.symbol = '600000.SH' # 格式必须为"代码.交易所" A.ma_fast = 5 # 快线周期(5日均线) A.ma_slow = 10 # 慢线周期(10日均线) A.volume = 100 # 每次交易100股 print(f"双均线策略启动:交易标的 {A.symbol}") def handlebar(C): """K线处理逻辑""" # 1. 使用get_market_data_ex获取数据(新版接口) print(f"当前回测K线时间:{timetag_to_datetime(C.get_bar_timetag(C.barpos), '%Y%m%d')}") try: data = C.get_market_data_ex( ['close'], [A.symbol], end_time=timetag_to_datetime(C.get_bar_timetag(C.barpos), '%Y%m%d'), period='1d', count=A.ma_slow + 1 ) except Exception as e: print(f"获取行情数据失败:{str(e)}") return # 2. 检查数据有效性 if A.symbol not in data or len(data[A.symbol]['close']) < A.ma_slow + 1: print(f"数据异常:{A.symbol} 不存在或长度不足(需要{A.ma_slow+1}天,实际{len(data[A.symbol]['close']) if A.symbol in data else 0}天)") return close_data = data[A.symbol]['close'] # 3. 计算均线值 fast_ma = np.mean(close_data[-A.ma_fast:]) # 5日均线 slow_ma = np.mean(close_data[-A.ma_slow:]) # 10日均线 prev_fast = np.mean(close_data[-A.ma_fast-1:-1]) # 上一日5日均线 prev_slow = np.mean(close_data[-A.ma_slow-1:-1]) # 上一日10日均线 print(f"日期:{timetag_to_datetime(C.get_bar_timetag(C.barpos),'%Y%m%d')} | " f"收盘价:{close_data[-1]:.2f} | " f"5日均线:{fast_ma:.2f} | " f"10日均线:{slow_ma:.2f}") # 4. 获取持仓 holdings = get_trade_detail_data(A.acct, A.acct_type, 'position') hold_vol = 0 for pos in holdings: if pos.m_strInstrumentID + '.' + pos.m_strExchangeID == A.symbol: hold_vol = pos.m_nVolume break # 5. 交易信号 # 金叉信号(5日上穿10日)且无持仓 -> 买入 if prev_fast < prev_slow and fast_ma > slow_ma and hold_vol == 0: msg = f"{A.symbol} 金叉买入 {A.volume}股 @ {close_data[-1]:.2f}" passorder(23, 1101, A.acct, A.symbol, 14, -1, A.volume, '双均线策略', 1, msg, C) print(msg) # 死叉信号(5日下穿10日)且有持仓 -> 卖出 elif prev_fast > prev_slow and fast_ma < slow_ma and hold_vol > 0: msg = f"{A.symbol} 死叉卖出 {hold_vol}股 @ {close_data[-1]:.2f}" passorder(24, 1101, A.acct, A.symbol, 14, -1, hold_vol, '双均线策略', 1, msg, C) print(msg) # 6. 打印账户状态 acc_info = get_trade_detail_data(A.acct, A.acct_type, 'account') if acc_info: print(f"可用资金:{acc_info[0].m_dAvailable:.2f}")
写好策略之后,点击“运行”。
如果没有显示报错信息就代表策略没问题了。
二、回测
1.点击“回测”按钮就可以看到回测结果了
2.设置回测时间。
3.查看交易明细
三、验证
说明:
下载历史数据。
这一步不是必需的,好像不需要。
四、几个比较坑的地方:
1.日志输出那里无法复制。
2.程序明明有bug,可是就是不报错,在日志那里啥都不显示。
好多次都是这样的情况。
我之前让AI写了代码,写的是:
def initialize(context):
结果程序也不提示我有问题,也没有报错,后来在另外的地方报错了。
才发现原来这里有问题。
正确的写法应该是:
def init(ContextInfo):
3.有个TACCOUT函数,软件自带的函数库中明明有,我在程序中用了,又提示这个函数没有defined。
4.盘中运行程序的时候,明明策略产生了信号,在程序的界面也有显示。
但是就是在成交记录中没有,资金也没有减少。
参考:
https://www3.guosen.com.cn/guosen/newxwfiles/20190902/1567416998056.pdf
https://kakawanyifan.com/20501