VNPY源码学习系列文章:
VNPY源码(一)CTP封装及K线合成
VNPY源码(二)API获取行情和script_trader
VNPY源码(三)主引擎MainEngine
VNPY源码(四)DataRecorder
VNPY源码(五)CtaEngine实盘引擎
VNPY源码(六)BacktesterEngine回测引擎
VNPY源码(七)限价单与停止单
VNPY源码(八)VNPY的数据流
一、源码
"""
注册EVENT_TICK、EVENT_CONTRACT两个事件:收到EVENT_TICK时,调用process_tick_event函数(其内部调用record_tick函数),
将task put到self.queue;收到EVENT_CONTRACT时,调用process_contract_event函数,对已在记录列表中的合约进行订阅。
通过run函数,从self.queue获得task(Tick、Bar),调用database_manager的方法储存数据
"""
from threading import Thread
from queue import Queue, Empty
from copy import copy
from vnpy.event import Event, EventEngine
from vnpy.trader.engine import BaseEngine, MainEngine
from vnpy.trader.object import (
SubscribeRequest,
TickData,
BarData,
ContractData
)
from vnpy.trader.event import EVENT_TICK, EVENT_CONTRACT
from vnpy.trader.utility import load_json, save_json, BarGenerator
from vnpy.trader.database import database_manager
# Engine name handed to BaseEngine.__init__ when RecorderEngine is constructed.
APP_NAME = "DataRecorder"
# Event type of the log events published by RecorderEngine.write_log.
EVENT_RECORDER_LOG = "eRecorderLog"
# Event type of the events published by RecorderEngine.put_event, which carry
# the sorted lists of symbols currently recorded as ticks and as bars.
EVENT_RECORDER_UPDATE = "eRecorderUpdate"
class RecorderEngine(BaseEngine):
    """Record tick and bar data for selected contracts into the database.

    Event handlers push ``("tick" | "bar", data)`` tasks onto ``self.queue``;
    a worker thread (``run``) pops them and hands them to ``database_manager``.
    The sets of recorded symbols are persisted in ``setting_filename``.
    """

    setting_filename = "data_recorder_setting.json"

    def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
        """Load settings, register event handlers and start the worker thread."""
        super().__init__(main_engine, event_engine, APP_NAME)

        # Tasks waiting to be written to the database by the worker thread.
        self.queue = Queue()
        self.thread = Thread(target=self.run)
        self.active = False

        # vt_symbol -> {"symbol", "exchange", "gateway_name"} for each recording.
        self.tick_recordings = {}
        self.bar_recordings = {}
        # vt_symbol -> BarGenerator, created lazily by get_bar_generator.
        self.bar_generators = {}

        self.load_setting()
        self.register_event()
        self.start()
        self.put_event()

    def load_setting(self) -> None:
        """Read the tick/bar recording tables from the JSON setting file."""
        setting = load_json(self.setting_filename)
        self.tick_recordings = setting.get("tick", {})
        self.bar_recordings = setting.get("bar", {})

    def save_setting(self) -> None:
        """Write the tick/bar recording tables to the JSON setting file."""
        setting = {
            "tick": self.tick_recordings,
            "bar": self.bar_recordings,
        }
        save_json(self.setting_filename, setting)

    def run(self) -> None:
        """Worker loop: pop tasks from the queue and save them via database_manager.

        Runs until ``self.active`` becomes False. The 1-second timeout on
        ``queue.get`` lets the loop re-check ``self.active`` while idle.
        """
        while self.active:
            try:
                task_type, data = self.queue.get(timeout=1)
            except Empty:
                continue

            if task_type == "tick":
                database_manager.save_tick_data([data])
            elif task_type == "bar":
                database_manager.save_bar_data([data])

    def close(self) -> None:
        """Stop the worker loop and wait for the thread to finish."""
        self.active = False

        # Thread.isAlive() was removed in Python 3.9; is_alive() is the
        # supported spelling on every Python 3 version.
        if self.thread.is_alive():
            self.thread.join()

    def start(self) -> None:
        """Mark the engine active and start the worker thread."""
        self.active = True
        self.thread.start()

    def add_bar_recording(self, vt_symbol: str) -> None:
        """Start recording bars for ``vt_symbol``.

        Logs and returns if the symbol is already recorded or its contract
        cannot be found. Otherwise stores the contract info in
        ``bar_recordings``, subscribes to the contract, saves the settings
        and publishes an update event.
        """
        if vt_symbol in self.bar_recordings:
            self.write_log(f"已在K线记录列表中:{vt_symbol}")
            return

        contract = self.main_engine.get_contract(vt_symbol)
        if not contract:
            self.write_log(f"找不到合约:{vt_symbol}")
            return

        self.bar_recordings[vt_symbol] = {
            "symbol": contract.symbol,
            "exchange": contract.exchange.value,
            "gateway_name": contract.gateway_name,
        }

        self.subscribe(contract)
        self.save_setting()
        self.put_event()

        self.write_log(f"添加K线记录成功:{vt_symbol}")

    def add_tick_recording(self, vt_symbol: str) -> None:
        """Start recording ticks for ``vt_symbol``.

        Logs and returns if the symbol is already recorded or its contract
        cannot be found. Otherwise stores the contract info in
        ``tick_recordings``, subscribes to the contract, saves the settings
        and publishes an update event.
        """
        if vt_symbol in self.tick_recordings:
            self.write_log(f"已在Tick记录列表中:{vt_symbol}")
            return

        contract = self.main_engine.get_contract(vt_symbol)
        if not contract:
            self.write_log(f"找不到合约:{vt_symbol}")
            return

        self.tick_recordings[vt_symbol] = {
            "symbol": contract.symbol,
            "exchange": contract.exchange.value,
            "gateway_name": contract.gateway_name,
        }

        self.subscribe(contract)
        self.save_setting()
        self.put_event()

        self.write_log(f"添加Tick记录成功:{vt_symbol}")

    def remove_bar_recording(self, vt_symbol: str) -> None:
        """Stop recording bars for ``vt_symbol`` and persist the change."""
        if vt_symbol not in self.bar_recordings:
            self.write_log(f"不在K线记录列表中:{vt_symbol}")
            return

        self.bar_recordings.pop(vt_symbol)
        self.save_setting()
        # Publish EVENT_RECORDER_UPDATE so listeners see the new symbol lists.
        self.put_event()

        self.write_log(f"移除K线记录成功:{vt_symbol}")

    def remove_tick_recording(self, vt_symbol: str) -> None:
        """Stop recording ticks for ``vt_symbol`` and persist the change."""
        if vt_symbol not in self.tick_recordings:
            self.write_log(f"不在Tick记录列表中:{vt_symbol}")
            return

        self.tick_recordings.pop(vt_symbol)
        self.save_setting()
        self.put_event()

        self.write_log(f"移除Tick记录成功:{vt_symbol}")

    def register_event(self) -> None:
        """Register the tick and contract handlers with the event engine."""
        self.event_engine.register(EVENT_TICK, self.process_tick_event)
        self.event_engine.register(EVENT_CONTRACT, self.process_contract_event)

    def process_tick_event(self, event: Event) -> None:
        """Handle EVENT_TICK: queue the tick and/or feed it to a bar generator.

        A tick whose symbol is in ``tick_recordings`` is queued for saving;
        one whose symbol is in ``bar_recordings`` is passed to that symbol's
        BarGenerator.
        """
        tick = event.data

        if tick.vt_symbol in self.tick_recordings:
            self.record_tick(tick)

        if tick.vt_symbol in self.bar_recordings:
            bg = self.get_bar_generator(tick.vt_symbol)
            bg.update_tick(tick)

    def process_contract_event(self, event: Event) -> None:
        """Handle EVENT_CONTRACT: subscribe if the contract is being recorded."""
        contract = event.data
        vt_symbol = contract.vt_symbol

        if vt_symbol in self.tick_recordings or vt_symbol in self.bar_recordings:
            self.subscribe(contract)

    def write_log(self, msg: str) -> None:
        """Publish ``msg`` as an EVENT_RECORDER_LOG event."""
        event = Event(EVENT_RECORDER_LOG, msg)
        self.event_engine.put(event)

    def put_event(self) -> None:
        """Publish the sorted tick/bar symbol lists as EVENT_RECORDER_UPDATE."""
        data = {
            "tick": sorted(self.tick_recordings),
            "bar": sorted(self.bar_recordings),
        }
        event = Event(EVENT_RECORDER_UPDATE, data)
        self.event_engine.put(event)

    def record_tick(self, tick: TickData) -> None:
        """Queue a copy of ``tick`` for the worker thread to save.

        The task goes onto ``self.queue`` rather than through ``put_event``
        because ``put_event`` only publishes the symbol lists on the event
        engine, while ``self.queue`` is what ``run`` consumes to perform the
        database write in the worker thread.
        """
        task = ("tick", copy(tick))
        self.queue.put(task)

    def record_bar(self, bar: BarData) -> None:
        """Queue a copy of ``bar`` for the worker thread to save."""
        task = ("bar", copy(bar))
        self.queue.put(task)

    def get_bar_generator(self, vt_symbol: str):
        """Return the BarGenerator for ``vt_symbol``, creating it on first use.

        New generators are built with ``self.record_bar`` as their callback
        (presumably invoked for each completed bar; confirm against
        BarGenerator).
        """
        bg = self.bar_generators.get(vt_symbol, None)

        if not bg:
            bg = BarGenerator(self.record_bar)
            self.bar_generators[vt_symbol] = bg

        return bg

    def subscribe(self, contract: ContractData) -> None:
        """Send a SubscribeRequest for ``contract`` through its gateway."""
        req = SubscribeRequest(
            symbol=contract.symbol,
            exchange=contract.exchange,
        )
        self.main_engine.subscribe(req, contract.gateway_name)
二、database_manager
在DataRecorder模块有下面的代码
from vnpy.trader.database import database_manager
if task_type == "tick":
database_manager.save_tick_data([data])
elif task_type == "bar":
database_manager.save_bar_data([data])
可是在vnpy.trader.database下面没有找到database_manager的定义,
只在该包的__init__.py中找到下面这两句:
from vnpy.trader.database.database import BaseDatabaseManager
database_manager: "BaseDatabaseManager" = init(settings=settings)
原来它调用的是initialize中的init方法,返回的是BaseDatabaseManager对象
它的save_tick_data方法是在C:\vnstudio\Lib\site-packages\vnpy\trader\database\database.py文件中BaseDatabaseManager这个类定义的。
然后在上面的import中,将这个类import了进来。
@abstractmethod
def save_tick_data(self, datas: Sequence["TickData"]):
    """Abstract hook for saving a sequence of ticks; this stub does nothing."""
参考:https://www.vnpy.com/forum/topic/805-databaseyuan-ma-yue-du-bi-ji-+pei-zhi-jiao-cheng