Home >  > 反复强化学习与股票交易

反复强化学习与股票交易

0

一、简介
本项目是对论文《Stock Trading with Recurrent Reinforcement Learning (RRL)》的代码重现。
论文URL:http://cs229.stanford.edu/proj2006/Molina-StockTradingWithRecurrentReinforcementLearning.pdf

二、结论
先看结论吧:

这种方法的主要困难在于某些股票事件没有显示结构。从上面的第二个示例中可以看出,强化学习者无法预测股价的急剧下跌,并且像人类一样脆弱。如果与预测这种急剧下降的机制结合使用,可能会更有效。该模型还可以在其他方面作出修改,比如将成交量包含进来,作为预测涨跌的特征。

另外,可以尝试使用固定交易成本以及降低交易频率。例如,可以创建一个模型,该模型从长时间的数据中学习,但只能定期做出决策。这将反映一个散户交易者以固定交易成本参与较小数量交易的情况。因为对于散户而言,以固定的交易成本在每个期间进行交易都太昂贵了,所以具有定期交易策略的模型对于此类用户更为可行。

三、小知识
1.Sharp夏普比率的计算方法

四、代码

# -*- coding: utf-8 -*-
import time
import pickle
import numpy as np
import pandas as pd
from datetime import datetime as dt
import matplotlib.pyplot as plt


def main():

    fname = "../data/USDJPY30.csv"
    init_t = 6000

    T = 1000
    M = 200
    mu = 10000
    sigma = 0.04
    rho = 1.0
    n_epoch = 10000

    # RRL agent with initial weight.
    ini_rrl = TradingRRL(T, M, init_t, mu, sigma, rho, n_epoch)
    ini_rrl.load_csv(fname)
    ini_rrl.set_t_p_r()
    ini_rrl.calc_dSdw()
    # RRL agent for training 
    rrl = TradingRRL(T, M, init_t,  mu, sigma, rho, n_epoch)
    rrl.all_t = ini_rrl.all_t
    rrl.all_p = ini_rrl.all_p
    rrl.set_t_p_r()
    rrl.fit()

    # Plot results.
    # Training for initial term T.
    plt.plot(range(len(rrl.epoch_S)),rrl.epoch_S)
    plt.title("Sharp's ratio optimization")
    plt.xlabel("Epoch times")
    plt.ylabel("Sharp's ratio")
    plt.grid(True)
    plt.savefig("sharp's ratio optimization.png", dpi=300)
    plt.close

    fig, ax = plt.subplots(nrows=3, figsize=(15, 10))
    t = np.linspace(1, rrl.T, rrl.T)[::-1]
    ax[0].plot(t, rrl.p[:rrl.T])
    ax[0].set_xlabel("time")
    ax[0].set_ylabel("USDJPY")
    ax[0].grid(True)

    ax[1].plot(t, ini_rrl.F[:rrl.T], color="blue", label="With initial weights")
    ax[1].plot(t, rrl.F[:rrl.T], color="red", label="With optimized weights")
    ax[1].set_xlabel("time")
    ax[1].set_ylabel("F")
    ax[1].legend(loc="upper left")
    ax[1].grid(True)

    ax[2].plot(t, ini_rrl.sumR, color="blue", label="With initial weights")
    ax[2].plot(t, rrl.sumR, color="red", label="With optimized weights")
    ax[2].set_xlabel("time")
    ax[2].set_ylabel("Sum of reward[yen]")
    ax[2].legend(loc="upper left")
    ax[2].grid(True)
    plt.savefig("rrl_train.png", dpi=300)
    fig.clear()


    # Prediction for next term T with optimized weight.
    # RRL agent with initial weight.
    ini_rrl_f = TradingRRL(T, M, init_t-T, mu, sigma, rho, n_epoch)
    ini_rrl_f.all_t = ini_rrl.all_t
    ini_rrl_f.all_p = ini_rrl.all_p
    ini_rrl_f.set_t_p_r()
    ini_rrl_f.calc_dSdw()
    # RRL agent with optimized weight.
    rrl_f = TradingRRL(T, M, init_t-T, mu, sigma, rho, n_epoch)
    rrl_f.all_t = ini_rrl.all_t
    rrl_f.all_p = ini_rrl.all_p
    rrl_f.set_t_p_r()
    rrl_f.w = rrl.w
    rrl_f.calc_dSdw()

    fig, ax = plt.subplots(nrows=3, figsize=(15, 10))
    t_f = np.linspace(rrl.T+1, rrl.T+rrl.T, rrl.T)[::-1]
    ax[0].plot(t_f, rrl_f.p[:rrl_f.T])
    ax[0].set_xlabel("time")
    ax[0].set_ylabel("USDJPY")
    ax[0].grid(True)

    ax[1].plot(t_f, ini_rrl_f.F[:rrl_f.T], color="blue", label="With initial weights")
    ax[1].plot(t_f, rrl_f.F[:rrl_f.T], color="red", label="With optimized weights")
    ax[1].set_xlabel("time")
    ax[1].set_ylabel("F")
    ax[1].legend(loc="lower right")
    ax[1].grid(True)

    ax[2].plot(t_f, ini_rrl_f.sumR, color="blue", label="With initial weights")
    ax[2].plot(t_f, rrl_f.sumR, color="red", label="With optimized weights")
    ax[2].set_xlabel("time")
    ax[2].set_ylabel("Sum of reward[yen]")
    ax[2].legend(loc="lower right")
    ax[2].grid(True)
    plt.savefig("rrl_prediction.png", dpi=300)
    fig.clear()

class TradingRRL(object):
    def __init__(self, T=1000, M=200, init_t=10000, mu=10000, sigma=0.04, rho=1.0, n_epoch=10000):
        self.T = T
        self.M = M
        self.init_t = init_t
        self.mu = mu
        self.sigma = sigma
        self.rho = rho
        self.all_t = None
        self.all_p = None
        self.t = None
        self.p = None
        self.r = None
        self.x = np.zeros([T, M+2])
        self.F = np.zeros(T+1)
        self.R = np.zeros(T)
        self.w = np.ones(M+2)
        self.w_opt = np.ones(M+2)
        self.epoch_S = np.empty(0)
        self.n_epoch = n_epoch
        self.progress_period = 100
        self.q_threshold = 0.7

    def load_csv(self, fname):
        tmp = pd.read_csv(fname, header=None)
        tmp_tstr = tmp[0] +" " + tmp[1]
        tmp_t = [dt.strptime(tmp_tstr[i], '%Y.%m.%d %H:%M') for i in range(len(tmp_tstr))]
        tmp_p = list(tmp[5])
        self.all_t = np.array(tmp_t[::-1])
        self.all_p = np.array(tmp_p[::-1])

    def quant(self, f):
        fc = f.copy()
        fc[np.where(np.abs(fc) < self.q_threshold)] = 0
        return np.sign(fc)

    def set_t_p_r(self):
        self.t = self.all_t[self.init_t:self.init_t+self.T+self.M+1]
        self.p = self.all_p[self.init_t:self.init_t+self.T+self.M+1]
        self.r = -np.diff(self.p)

    def set_x_F(self):
        for i in range(self.T-1, -1 ,-1):
            self.x[i] = np.zeros(self.M+2)
            self.x[i][0] = 1.0
            self.x[i][self.M+2-1] = self.F[i+1]
            for j in range(1, self.M+2-1, 1):
                self.x[i][j] = self.r[i+j-1]
            self.F[i] = np.tanh(np.dot(self.w, self.x[i]))

    def calc_R(self):
        self.R = self.mu * (self.F[1:] * self.r[:self.T] - self.sigma * np.abs(-np.diff(self.F)))

    def calc_sumR(self):
        self.sumR  = np.cumsum(self.R[::-1])[::-1]
        self.sumR2  = np.cumsum((self.R**2)[::-1])[::-1]

    def calc_dSdw(self):
        self.set_x_F()
        self.calc_R()
        self.calc_sumR()
        self.A      =  self.sumR[0] / self.T
        self.B      =  self.sumR2[0] / self.T
        self.S      =  self.A / np.sqrt(self.B - self.A**2)
        self.dSdA   =  self.S * (1 + self.S**2) / self.A
        self.dSdB   = -self.S**3 / 2 / self.A**2
        self.dAdR   =  1.0 / self.T
        self.dBdR   =  2.0 / self.T * self.R
        self.dRdF   = -self.mu * self.sigma * np.sign(-np.diff(self.F))
        self.dRdFp  =  self.mu * self.r[:self.T] + self.mu * self.sigma * np.sign(-np.diff(self.F))
        self.dFdw   = np.zeros(self.M+2)
        self.dFpdw  = np.zeros(self.M+2)
        self.dSdw   = np.zeros(self.M+2)
        for i in range(self.T-1, -1 ,-1):
            if i != self.T-1:
                self.dFpdw = self.dFdw.copy()
            self.dFdw  = (1 - self.F[i]**2) * (self.x[i] + self.w[self.M+2-1] * self.dFpdw)
            self.dSdw += (self.dSdA * self.dAdR + self.dSdB * self.dBdR[i]) * (self.dRdF[i] * self.dFdw + self.dRdFp[i] * self.dFpdw)

    def update_w(self):
        self.w += self.rho * self.dSdw

    def fit(self):
        
        pre_epoch_times = len(self.epoch_S)

        self.calc_dSdw()
        print("Epoch loop start. Initial sharp's ratio is " + str(self.S) + ".")
        self.S_opt = self.S
        
        tic = time.clock()
        for e_index in range(self.n_epoch):
            self.calc_dSdw()
            if self.S > self.S_opt:
                self.S_opt = self.S
                self.w_opt = self.w.copy()
            self.epoch_S = np.append(self.epoch_S, self.S)
            self.update_w()
            if e_index % self.progress_period  == self.progress_period-1:
                toc = time.clock()
                print("Epoch: " + str(e_index + pre_epoch_times + 1) + "/" + str(self.n_epoch + pre_epoch_times) +". Shape's ratio: " + str(self.S) + ". Elapsed time: " + str(toc-tic) + " sec.")
        toc = time.clock()
        print("Epoch: " + str(e_index + pre_epoch_times + 1) + "/" + str(self.n_epoch + pre_epoch_times) +". Shape's ratio: " + str(self.S) + ". Elapsed time: " + str(toc-tic) + " sec.")
        self.w = self.w_opt.copy()
        self.calc_dSdw()
        print("Epoch loop end. Optimized sharp's ratio is " + str(self.S_opt) + ".")

    def save_weight(self):
        pd.DataFrame(self.w).to_csv("w.csv", header=False, index=False)
        pd.DataFrame(self.epoch_S).to_csv("epoch_S.csv", header=False, index=False)
        
    def load_weight(self):
        tmp = pd.read_csv("w.csv", header=None)
        self.w = tmp.T.values[0]


def plot_hist(n_tick, R):
    rnge = max(R) - min(R)
    tick = rnge / n_tick
    tick_min = [min(R) - tick * 0.5 + i * tick for i in range(n_tick)]
    tick_max = [min(R) + tick * 0.5 + i * tick for i in range(n_tick)]
    tick_center = [min(R) + i * tick for i in range(n_tick)]
    tick_val = [0.0] * n_tick
    for i in range(n_tick ):
        tick_val[i] = len(set(np.where(tick_min[i] < np.array(R))[0].tolist()).intersection(np.where(np.array(R) <= tick_max[i])[0]))
    plt.bar(tick_center, tick_val, width=tick)
    plt.grid()
    plt.show()


if __name__ == "__main__":
    main()

运行截图:

可参考:
https://zhuanlan.zhihu.com/p/36632686

代码地址:https://github.com/darden1/tradingrrl

本文暂无标签

发表评论

*

*