蜗牛博客VNPY源码学习系列文章:
VNPY源码(一)CTP封装及K线合成
VNPY源码(二)API获取行情和script_trader
VNPY源码(三)主引擎MainEngine
VNPY源码(四)DataRecorder
VNPY源码(五)CtaEngine实盘引擎
VNPY源码(六)BacktesterEngine回测引擎
VNPY源码(七)限价单与停止单
VNPY源码(八)VNPY的数据流
MainEngine(主引擎)负责所有功能引擎的实例化,并统一管理底层接口(gateway)和上层应用(app)。

一、源码
位于C:\vnstudio\Lib\site-packages\vnpy\trader\engine.py下面。这个文件下面有:MainEngine、BaseEngine(ABC)、LogEngine(BaseEngine)、OmsEngine(BaseEngine)、EmailEngine(BaseEngine)共5个Engine。我们先学习MainEngine。
所有的gateway都放在self.gateways字典里面,对应vnpy UI界面的连接菜单的内容。
Subscribe逻辑:
1.add_gateway将传入的CtpGateway类实例化,并以gateway_name为键存入self.gateways字典
2.调用get_gateway函数取出CtpGateway实例
3.调用CtpGateway实例的subscribe函数
4.底层API通过symbol、exchange的形式subscribe
class MainEngine:
    """
    Acts as the core of VN Trader.

    Holds the event engine plus registries of the gateways, function engines
    and apps that have been added, and forwards requests (connect, subscribe,
    orders, history queries) to the gateway registered under a given name.
    """

    def __init__(self, event_engine: EventEngine = None):
        """
        Create the main engine and start its event engine.

        event_engine: event engine to use; a new EventEngine is created
        when none is supplied.
        """
        if event_engine is not None:
            self.event_engine = event_engine
        else:
            self.event_engine = EventEngine()
        self.event_engine.start()

        self.gateways = {}      # gateway_name -> gateway instance
        self.engines = {}       # engine_name -> function engine instance
        self.apps = {}          # app_name -> app instance
        self.exchanges = []     # exchanges collected from added gateways

        os.chdir(TRADER_DIR)    # Change working directory
        self.init_engines()     # Initialize function engines

    def add_engine(self, engine_class: Any):
        """
        Add function engine.

        Instantiates engine_class with this main engine and the event engine,
        registers the instance under its engine_name and returns it.
        """
        engine = engine_class(self, self.event_engine)
        self.engines[engine.engine_name] = engine
        return engine

    def add_gateway(self, gateway_class: Type[BaseGateway]):
        """
        Add gateway.

        Instantiates the given gateway class (e.g. CtpGateway) with the event
        engine, stores the instance in self.gateways under its gateway_name
        and returns the instance.
        """
        # gateway_class is a concrete gateway class such as CtpGateway; the
        # instance receives the event engine held by this MainEngine.
        gateway = gateway_class(self.event_engine)

        # The instance's gateway_name is the dictionary key that
        # get_gateway() later uses to look the gateway up.
        self.gateways[gateway.gateway_name] = gateway

        # Add gateway supported exchanges into engine, skipping any exchange
        # that is already known.
        for exchange in gateway.exchanges:
            if exchange not in self.exchanges:
                self.exchanges.append(exchange)

        return gateway

    def add_app(self, app_class: Type[BaseApp]):
        """
        Add app.

        Instantiates the app, registers it under its app_name, then adds the
        app's engine_class as a function engine and returns that engine.
        """
        app = app_class()
        self.apps[app.app_name] = app

        engine = self.add_engine(app.engine_class)
        return engine

    def init_engines(self):
        """
        Init all engines.
        """
        self.add_engine(LogEngine)
        self.add_engine(OmsEngine)
        self.add_engine(EmailEngine)

    def write_log(self, msg: str, source: str = ""):
        """
        Put log event with specific message.

        msg: log text.
        source: stored in the LogData's gateway_name field.
        """
        log = LogData(msg=msg, gateway_name=source)
        event = Event(EVENT_LOG, log)
        self.event_engine.put(event)

    def get_gateway(self, gateway_name: str):
        """
        Return gateway object by name.

        Looks the name (e.g. "CTP") up in self.gateways. When it is not
        registered, a log message is written and None is returned.
        """
        gateway = self.gateways.get(gateway_name)
        if gateway is None:
            self.write_log(f"找不到底层接口:{gateway_name}")
        return gateway

    def get_engine(self, engine_name: str):
        """
        Return engine object by name.

        When the name is not registered, a log message is written and None
        is returned.
        """
        engine = self.engines.get(engine_name)
        if engine is None:
            self.write_log(f"找不到引擎:{engine_name}")
        return engine

    def get_default_setting(self, gateway_name: str):
        """
        Get default setting dict of a specific gateway.

        Returns None when the gateway is not registered.
        """
        gateway = self.get_gateway(gateway_name)
        if gateway is not None:
            return gateway.get_default_setting()
        return None

    def get_all_gateway_names(self):
        """
        Get all names of gateways added in main engine.
        """
        return list(self.gateways.keys())

    def get_all_apps(self):
        """
        Get all app objects.
        """
        return list(self.apps.values())

    def get_all_exchanges(self):
        """
        Get all exchanges.

        Returns the engine's own list object, not a copy.
        """
        return self.exchanges

    def connect(self, setting: dict, gateway_name: str):
        """
        Start connection of a specific gateway.

        Does nothing (beyond the log written by get_gateway) when the gateway
        is not registered.
        """
        gateway = self.get_gateway(gateway_name)
        if gateway is not None:
            gateway.connect(setting)

    def subscribe(self, req: SubscribeRequest, gateway_name: str):
        """
        Subscribe tick data update of a specific gateway.

        Fetches the gateway instance by name via get_gateway() and passes the
        request on to it.
        """
        gateway = self.get_gateway(gateway_name)
        if gateway is not None:
            # Delegate to the gateway's own subscribe().
            # NOTE(review): per the original author's note, for CTP this ends
            # up in the gateway's market-data API using the request's
            # symbol/exchange - not visible from this class, confirm there.
            gateway.subscribe(req)

    def send_order(self, req: OrderRequest, gateway_name: str):
        """
        Send new order request to a specific gateway.

        Returns whatever the gateway's send_order() returns, or an empty
        string when the gateway is not registered.
        """
        gateway = self.get_gateway(gateway_name)
        if gateway is not None:
            return gateway.send_order(req)
        return ""

    def cancel_order(self, req: CancelRequest, gateway_name: str):
        """
        Send cancel order request to a specific gateway.
        """
        gateway = self.get_gateway(gateway_name)
        if gateway is not None:
            gateway.cancel_order(req)

    def send_orders(self, reqs: Sequence[OrderRequest], gateway_name: str):
        """
        Send a batch of new order requests to a specific gateway.

        Returns whatever the gateway's send_orders() returns, or a list with
        one empty string per request when the gateway is not registered.
        """
        gateway = self.get_gateway(gateway_name)
        if gateway is not None:
            return gateway.send_orders(reqs)
        return ["" for _ in reqs]

    def cancel_orders(self, reqs: Sequence[CancelRequest], gateway_name: str):
        """
        Send a batch of cancel order requests to a specific gateway.
        """
        gateway = self.get_gateway(gateway_name)
        if gateway is not None:
            gateway.cancel_orders(reqs)

    def query_history(self, req: HistoryRequest, gateway_name: str):
        """
        Send history data query request to a specific gateway.

        Returns whatever the gateway's query_history() returns, or None when
        the gateway is not registered.
        """
        gateway = self.get_gateway(gateway_name)
        if gateway is not None:
            return gateway.query_history(req)
        return None

    def close(self):
        """
        Make sure every gateway and app is closed properly before
        programme exit.
        """
        # Stop event engine first to prevent new timer event.
        self.event_engine.stop()

        for engine in self.engines.values():
            engine.close()

        for gateway in self.gateways.values():
            gateway.close()
二、利用MainEngine
1.订阅行情
其实关键的代码只有4行,但是有一个地方需要注意,就是SETTINGS的语句必须要写,不然在cmd窗口打印不出信息。
from vnpy.event import EventEngine
from vnpy.trader.engine import BaseEngine, MainEngine
from vnpy.gateway.ctp.ctp_gateway import CtpGateway
from vnpy.trader.setting import SETTINGS
from logging import INFO
# Turn logging on and echo it to the console; per the accompanying text,
# nothing is printed in the cmd window without these settings.
SETTINGS["log.active"] = True
SETTINGS["log.level"] = INFO
SETTINGS["log.console"] = True

# Connection parameters handed to the gateway through MainEngine.connect().
# NOTE(review): the market-data server value ends with ";" - confirm that
# this is intended.
setting = {
    "用户名": "",
    "密码": "",
    "经纪商代码": "9999",
    "交易服务器": "180.168.146.187:10130",
    "行情服务器": "180.168.146.187:10131;",
    "产品名称": "simnow_client_test",
    "授权编码": "0000000000000000",
    "产品信息": "11111",
}

# The four key lines: build the engines, register the CTP gateway, connect.
event_engine = EventEngine()
main_engine = MainEngine(event_engine)
main_engine.add_gateway(CtpGateway)
main_engine.connect(setting, "CTP")
成果展示:

2.打印Tick
from vnpy.event import EventEngine,Event
from vnpy.trader.engine import BaseEngine, MainEngine
from vnpy.gateway.ctp.ctp_gateway import CtpGateway
from vnpy.trader.event import EVENT_LOG,EVENT_TICK
from vnpy.trader.object import SubscribeRequest,ContractData
from vnpy.trader.constant import Exchange
# Connection parameters handed to the gateway through MainEngine.connect().
# NOTE(review): the market-data server value ends with ";" - confirm that
# this is intended.
setting = {
    "用户名": "",
    "密码": "",
    "经纪商代码": "9999",
    "交易服务器": "180.168.146.187:10130",
    "行情服务器": "180.168.146.187:10131;",
    "产品名称": "simnow_client_test",
    "授权编码": "0000000000000000",
    "产品信息": "11111",
}
def process_tick_event(event: Event):
    """
    Print the tick carried by a tick event, followed by a separator line.

    event: event whose ``data`` attribute holds the tick to print.
    """
    tick = event.data
    print(tick)
    print("--" * 40)
# Contract whose ticks should be printed.
# NOTE(review): name/product are filled with the placeholder string "SHFE" -
# confirm the types ContractData expects for these fields.
contract = ContractData(
    symbol="zn1910",
    exchange=Exchange("SHFE"),
    gateway_name="CTP",
    name="SHFE",
    product="SHFE",
    size=100,
    pricetick=2.0,
)

event_engine = EventEngine()
event_engine.register(EVENT_TICK, process_tick_event)  # register the tick handler

main_engine = MainEngine(event_engine)
main_engine.add_gateway(CtpGateway)
main_engine.connect(setting, "CTP")

# MainEngine.subscribe() is declared to take a SubscribeRequest, so build one
# from the contract's symbol and exchange instead of passing the ContractData.
subscribe_req = SubscribeRequest(symbol=contract.symbol, exchange=contract.exchange)
main_engine.subscribe(subscribe_req, "CTP")  # subscribe to market data
成果展示

小知识:
其他的用法:
def process_log_event(event: Event):
    """
    Print the time and message of the log carried by a log event.

    event: event whose ``data`` attribute exposes ``time`` and ``msg``.
    """
    log = event.data
    print(f"{log.time}\t{log.msg}")
# Attach the log handler to log events.
event_engine.register(EVENT_LOG, process_log_event)
# NOTE(review): the three lines below reference `self`, so they appear to be
# excerpted from inside an engine class that is not shown here; they cannot
# run at module level, and EVENT_ORDER / EVENT_TIMER / EVENT_TRADE are not
# imported by the snippets above. They illustrate registering one handler
# method per event type.
self.event_engine.register(EVENT_ORDER, self.process_order_event)
self.event_engine.register(EVENT_TIMER, self.process_timer_event)
self.event_engine.register(EVENT_TRADE, self.process_trade_event)
三、关于SubscribeRequest
小知识:
__post_init__是dataclass提供的钩子:@dataclass自动生成的__init__在给各个字段赋值之后会自动调用它,所以只有加了@dataclass装饰器的类才会生效。下面的例子用它把symbol和exchange拼接成vt_symbol(形如“代码.交易所”)。
from dataclasses import dataclass
from vnpy.trader.object import SubscribeRequest,ContractData
from vnpy.trader.constant import Exchange
# Sample contract; only its exchange is reused by the demo further down.
contract = ContractData(
    symbol="zn1910",
    exchange=Exchange("SHFE"),
    gateway_name="CTP",
    name="SHFE",
    product="SHFE",
    size=100,
    pricetick=2.0,
)
# NOTE: this local definition shadows the SubscribeRequest imported from
# vnpy.trader.object above; it is repeated here to show how it works.
@dataclass
class SubscribeRequest:
    """
    Request sending to specific gateway for subscribing tick data update.

    ``symbol`` and ``exchange`` are set by the dataclass-generated
    ``__init__``; ``vt_symbol`` is derived from them in ``__post_init__``.
    """

    symbol: str
    exchange: Exchange

    def __post_init__(self):
        """Derive ``vt_symbol`` as "<symbol>.<exchange value>"."""
        self.vt_symbol = f"{self.symbol}.{self.exchange.value}"
# Build a request that reuses the contract's exchange, then show the plain
# symbol and the vt_symbol derived from it, one per line.
a = SubscribeRequest(symbol="600031", exchange=contract.exchange)
print(a.symbol, a.vt_symbol, sep="\n")
执行结果:
600031
600031.SHFE
四、数据流程
问:CtpMdApi和CtpTdApi中的Md和Td分别是什么的缩写?
答:Md是MarketData(行情)的缩写,Td是Trade(交易)的缩写。
https://blog.csdn.net/u011331731/article/details/88946916