蜗牛博客VNPY源码学习系列文章:
VNPY源码(一)CTP封装及K线合成
VNPY源码(二)API获取行情和script_trader
VNPY源码(三)主引擎MainEngine
VNPY源码(四)DataRecorder
VNPY源码(五)CtaEngine实盘引擎
VNPY源码(六)BacktesterEngine回测引擎
VNPY源码(七)限价单与停止单
VNPY源码(八)VNPY的数据流
一、init_cli_trading
init_cli_trading位于C:\vnstudio\Lib\site-packages\vnpy\app\script_trader下面的cli.py文件中,这个文件夹下面还有其他的文件,但是通过 from vnpy.app.script_trader import init_cli_trading就可以导入进来了。
在C:\vnstudio\Lib\site-packages\vnpy\gateway\ctptest新建test.py文件。
from vnpy.gateway.ctp.ctp_gateway import CtpGateway
from vnpy.app.script_trader import init_cli_trading
# Connection settings handed to the CTP gateway. The user name and password
# below are the article's masked placeholders -- replace them with real
# credentials before running.
setting = {
    "用户名": "1xx11",
    "密码": "axx4",
    "经纪商代码": "9999",
    "交易服务器": "180.168.146.187:10130",
    # The stray trailing ';' was removed so this address has the same
    # "host:port" form as the trade server address above.
    "行情服务器": "180.168.146.187:10131",
    "产品名称": "simnow_client_test",
    "授权编码": "0000000000000000",
    "产品信息": "11111",
}

# Build the command-line script engine with the CTP gateway registered,
# then connect that gateway using the settings above.
engine = init_cli_trading([CtpGateway])
engine.connect_gateway(setting, "CTP")
成果展示:

然后在jupyter中运行:
# 查询所有合约
engine.get_all_contracts(use_df=True)

其他命令:
# Query account funds
engine.get_all_accounts(use_df=True)
# Query positions
engine.get_all_positions(use_df=True)
# Query active orders
engine.get_all_active_orders(use_df=True)
# Subscribe to market data
engine.subscribe(["zn1910.SHFE"])
# Query market data for the subscribed symbol
engine.get_tick("zn1910.SHFE", use_df=True)
# Place a buy order and keep the returned order id.
# NOTE(review): the positional arguments are presumably (symbol, price, volume)
# -- verify against the script engine's buy() signature.
vt_orderid = engine.buy("zn1910.SHFE", 32, 100)
print(vt_orderid)
# Query a specific order by its id
engine.get_order(vt_orderid)
# Cancel the order
engine.cancel_order(vt_orderid)
二、利用vnpy获取行情
from pathlib import Path
from threading import Thread
from time import sleep
from vnpy.api.ctp.vnctpmd import MdApi
# Event type identifiers; they are passed as the ``type`` argument of
# BeeMdApi.on_event. In the code shown here only EVENT_LOG and EVENT_ERROR
# are actually used; the remaining names are defined but not referenced.
EVENT_LOG = "log"
EVENT_CONTRACT = "contract"
EVENT_TICK = "tick"
EVENT_BAR = "bar"
EVENT_ERROR = "error"
EVENT_POSITION = "position"
EVENT_TRADE = "trade"
EVENT_ORDER = "order"
EVENT_ACCOUNT = "account"
EVENT_SHARED = "shared"
EVENT_LAST = "last"
EVENT_INIT_FINISHED = "init"
def _get_trader_dir(temp_name: str) -> tuple:
    """
    Get path where trader is running in.

    Args:
        temp_name: Name of the temp folder to look for (e.g. ".ctpbee").

    Returns:
        A ``(trader_dir, temp_dir)`` pair of ``Path`` objects. If a folder
        called ``temp_name`` exists in the current working directory, the
        pair is ``(cwd, cwd / temp_name)``; otherwise it is
        ``(home, home / temp_name)`` and the folder is created if missing.
    """
    cwd = Path.cwd()
    temp_path = cwd.joinpath(temp_name)

    # A temp folder in the current working directory takes precedence.
    if temp_path.exists():
        return cwd, temp_path

    # Otherwise use the home path of the system, creating the folder on
    # first use. exist_ok avoids a check-then-create race.
    home_path = Path.home()
    temp_path = home_path.joinpath(temp_name)
    temp_path.mkdir(parents=True, exist_ok=True)
    return home_path, temp_path


def get_folder_path(folder_name: str) -> Path:
    """
    Get path for temp folder with folder name.

    Args:
        folder_name: Folder name relative to the ".ctpbee" temp folder. It
            may contain several path components (e.g. "ctp/<userid>").

    Returns:
        The ``Path`` of the folder, which is guaranteed to exist on return.
    """
    _, temp_dir = _get_trader_dir(".ctpbee")
    folder_path = temp_dir.joinpath(folder_name)

    # parents=True is required because callers pass nested names such as
    # "ctp/<userid>"; a plain mkdir() raises FileNotFoundError when the
    # intermediate "ctp" folder does not exist yet.
    folder_path.mkdir(parents=True, exist_ok=True)
    return folder_path
class BeeMdApi(MdApi):
    """
    Minimal CTP market-data client built on top of ``MdApi``.

    It connects to a market-data front, logs in, subscribes to symbols and
    prints every event and every received tick to stdout.
    """

    def __init__(self):
        """Initialise connection state; nothing is connected here."""
        super().__init__()

        self.gateway_name = "ctp"
        self.reqid = 0  # incremented before every login request

        self.connect_status = False
        self.login_status = False
        # Symbols requested so far; they are (re)subscribed after each
        # successful login.
        self.subscribed = set()

        self.userid = ""
        self.password = ""
        self.brokerid = ""

    @property
    def md_status(self):
        """Return True once the market-data login has succeeded."""
        return self.login_status

    def on_event(self, type, data):
        """
        Report an event by printing it to stdout.

        ``type`` shadows the builtin, but the name is kept because the
        callbacks in this class pass it by keyword.
        """
        print(f"事件类型{type}, data={data}")

    def onFrontConnected(self):
        """
        Callback when front server is connected; logs in immediately.
        """
        self.connect_status = True
        self.on_event(type=EVENT_LOG, data="行情服务器连接成功")
        self.login()

    def onFrontDisconnected(self, reason: int):
        """
        Callback when front server is disconnected; resets both status flags.
        """
        self.connect_status = False
        self.login_status = False
        self.on_event(type=EVENT_LOG, data=f"行情连接断开,原因{reason}")

    def onRspUserLogin(self, data: dict, error: dict, reqid: int, last: bool):
        """
        Callback when user is logged in.

        On success every symbol in ``self.subscribed`` is subscribed; on
        failure the error dict is reported as an error event.
        """
        # Same guard as onRspSubMarketData: an empty error dict counts as
        # success instead of raising KeyError on the "ErrorID" lookup.
        if error and error.get("ErrorID"):
            error["detail"] = "行情登录失败"
            self.on_event(type=EVENT_ERROR, data=error)
            return

        self.login_status = True
        self.on_event(type=EVENT_LOG, data="行情服务器登录成功")

        for symbol in self.subscribed:
            self.subscribeMarketData(symbol)

    def onRspError(self, error: dict, reqid: int, last: bool):
        """
        Callback when error occurred; reports it as an error event.
        """
        error["detail"] = "行情接口报错"
        self.on_event(type=EVENT_ERROR, data=error)

    def onRspSubMarketData(self, data: dict, error: dict, reqid: int, last: bool):
        """
        Callback of a subscription request; only failures are reported.
        """
        if not error or not error.get("ErrorID"):
            return

        error["detail"] = "行情订阅失败"
        self.on_event(type=EVENT_ERROR, data=error)

    def onRtnDepthMarketData(self, data: dict):
        """
        Callback of tick data update; prints the raw tick dict.
        """
        print(data)

    def connect(self, info: dict):
        """
        Start connection to server.

        Args:
            info: Dict with the keys "userid", "password", "brokerid" and
                "md_address".
        """
        self.userid = info["userid"]
        self.password = info["password"]
        self.brokerid = info["brokerid"]

        # If not connected, then start connection first.
        if not self.connect_status:
            path = get_folder_path(self.gateway_name.lower() + f"/{self.userid}")
            # Build the flow path with pathlib so the platform's separator is
            # used instead of a hard-coded Windows backslash.
            # NOTE(review): "Md" is presumably used by the API as a file-name
            # prefix inside this folder -- confirm against the CTP docs.
            self.createFtdcMdApi(str(path.joinpath("Md")))
            self.registerFront(info["md_address"])
            self.init()
        # If already connected, then login immediately.
        elif not self.login_status:
            self.login()

    def login(self):
        """
        Login onto server with the stored user id, password and broker id.
        """
        req = {
            "UserID": self.userid,
            "Password": self.password,
            "BrokerID": self.brokerid,
        }

        self.reqid += 1
        self.reqUserLogin(req, self.reqid)

    def subscribe(self, symbol):
        """
        Subscribe to tick data update.

        The symbol is always remembered in ``self.subscribed``. The request
        is only sent right away when already logged in; otherwise it is sent
        from onRspUserLogin after the login succeeds.

        Returns:
            The result of ``subscribeMarketData`` if a request was sent,
            otherwise None.
        """
        result = None
        if self.login_status:
            result = self.subscribeMarketData(symbol)
        self.subscribed.add(symbol)
        return result

    def close(self):
        """
        Close the connection if one is currently established.
        """
        if self.connect_status:
            self.exit()
def run_login():
    """
    Connect a BeeMdApi client, subscribe to "ag1912" and keep running.

    Events and ticks are printed by the client's callbacks while this
    function prints a separator line once per second. It loops until it is
    interrupted (e.g. Ctrl-C); the client is closed on the way out.
    """
    gateway = BeeMdApi()
    info = {
        "CONNECT_INFO": {
            # Fill in the real account credentials before running.
            "userid": "",
            "password": "",
            "brokerid": "9999",
            "md_address": "tcp://180.168.146.187:10131",
            "td_address": "tcp://180.168.146.187:10130",
            # Alternative front addresses (disabled):
            # "md_address": "tcp://218.202.237.33:10112",
            # "td_address": "tcp://218.202.237.33:10102",
            "product_info": "",
            "appid": "simnow_client_test",
            "auth_code": "0000000000000000",
        },
        "INTERFACE": "ctp",
        "TD_FUNC": True,
        "MD_FUNC": True,
    }
    gateway.connect(info["CONNECT_INFO"])
    # Issued before login completes; the client remembers the symbol and
    # sends the request once the login succeeds.
    gateway.subscribe("ag1912")

    try:
        while True:
            sleep(1)
            print("-" * 30)
    finally:
        # Release the connection even when the loop is interrupted.
        gateway.close()


# Only start the endless loop when executed as a script or notebook cell,
# not when this module is imported.
if __name__ == "__main__":
    run_login()
成果展示:

可参考:
https://github.com/vnpy/vnpy/blob/master/examples/notebook_trading/demo_notebook.ipynb