蜗牛博客VNPY学习记录:
VN.PY 2.0学习记录一(如何回测)
VN.PY 2.0学习记录二(策略开发)
Vn.py学习记录三(米筐教程)
VN.PY 2.0学习记录四(多线程、多进程)
Vn.py学习记录五–交易时间段及Widgets
Vn.py学习记录六(无界面模拟盘)
Vn.py学习记录七(V2.0.5版本)
Vnpy学习记录八(R-Breaker及pickle)
Vn.py学习记录九(事件驱动引擎)
VN.PY学习记录十(源码概述)
VNPY学习记录11(微信+Vscode)
VNPY学习记录12(父子进程、回调函数)
VNPY学习记录13(部署到云服务器,实现自动交易)
这是2019年VNPY量化交易实战教程附源码讲义 30课的学习记录。
一、导入jason文件为字典
jason文件
{ "rqUsername": "", "rqPassword": "", "symbols": ["I99", "IF99", "RB99", "TA99"] }
而且可以通过其他文件来导入,见四的代码。
config = open('config.json') setting = json.load(config)
备注:如果print(config)则显示“<_io.TextIOWrapper name='config.json' mode='r' encoding='cp936'>”
取值:
USERNAME = setting['rqUsername'] PASSWORD = setting['rqPassword']
二、MongoDB replace_one用法
使用replace_one()替代文档,如果未找到匹配的文档,就插入它
olddoc={...} #定义被替换的文档
newdoc={...} #定义要替换的文档
collection.replace_one(olddoc,newdoc,upsert=Ture)
for ix, row in df.iterrows(): bar = generateVtBar(row, symbol) d = bar.__dict__ flt = {'datetime': bar.datetime} cl.replace_one(flt, d, True)
关于df.iterrows(),请看这里
三、判断周末的方法(见四的代码)
四、父子进程
好处是子进程出错,不会影响父进程。
# encoding: UTF-8 import multiprocessing from datetime import datetime, time from time import sleep from dataService import downloadDailyBarBySymbol, setting #---------------------------------------------------------------------- def runChildProcess(): """子进程运行函数""" print "-" * 60 print u"开始更新日K线数据" for symbol in setting["symbols"]: downloadDailyBarBySymbol(symbol) print u"日K线数据更新完成" print "-" * 60 #---------------------------------------------------------------------- def runParentProcess(): """守护进程运行函数""" updateDate = None # 已更新数据的日期 while True: # 每轮检查等待5秒,避免跑满CPU(浪费算力) sleep(5) currentTime = datetime.now().time() print "#" * 60 print u"当前时间为:", currentTime # 过滤交易日 today = datetime.now().date() weekday = datetime.now().weekday() if weekday in [5, 6]: # 从0开始,周六为5,周日为6 continue # 每日5点后开始下载当日数据,通常3:15收盘后(国债)数据提供商需要一定时间清洗数据) if currentTime <= time(17, 0): continue # 每日只需要更新一次数据 if updateDate == today: continue # 启动子进程来更新数据 p = multiprocessing.Process(target=runChildProcess) p.start() print u"启动子进程" p.join() print u"子进程已关闭" # 记录当日数据已经更新 updateDate = today if __name__ == "__main__": runParentProcess()
五、回调函数
最典型的应用是爬虫,试想一下,你要采集一个网站,你是要将这个网站的所有url都全部获取到,再爬取这个url的数据,还是得到一条url,就爬取这个url的数据?如果是后者,那就要使用回调函数了。
from multiprocessing import Pool import os import requests def get_page(url): print('<%s> is getting [%s]'%(os.getpid(),url)) res = requests.get(url).text print('<%s> is done [%s]'%(os.getpid(),url)) return {'url': url, 'res': res} def parse_page(res): print('<%s> parse [%s]'%(os.getpid(),res['url'])) with open('ab.txt', 'a') as f: f.write('%s - %s\n' % (res['url'], len(res['res']))) f.write("==========================="+'\n') if __name__ == '__main__': urls = [ 'https://www.baidu.com', 'http://www.sohu.com', 'https://www.qq.com', 'https://www.163.com/', 'http://www.sina.com.cn/', ] p = Pool(2) for url in urls: #使用回调函数,当get_page下载完后,主线程调用parse_page自动处理get_page下载的结果,节省了parse_page的时间,该场景用于一个函数为耗时操作并且产生数据,另一个函数是非耗时操作,这样就节省了非耗时操作函数的时间 p.apply_async(get_page, args=(url,), callback=parse_page) p.close() p.join() print('主进程结束')