from __future__ import (absolute_import, division, print_function,
unicode_literals)
import zmq
import collections
from datetime import datetime
import threading
from mql5_zmq_backtrader.adapter import PositionAdapter, OrderAdapter, BalanceAdapter
import backtrader as bt
from backtrader.metabase import MetaParams
from backtrader.utils.py3 import queue, with_metaclass
import sys
[docs]class MTraderError(Exception):
def __init__(self, *args, **kwargs):
default = 'Meta Trader 5 ERROR'
if not (args or kwargs):
args = (default)
super(MTraderError, self).__init__(*args, **kwargs)
[docs]class ServerConfigError(MTraderError):
def __init__(self, *args, **kwargs):
super(self.__class__, self).__init__(*args, **kwargs)
[docs]class ServerDataError(MTraderError):
def __init__(self, *args, **kwargs):
super(self.__class__, self).__init__(*args, **kwargs)
[docs]class TimeFrameError(MTraderError):
def __init__(self, *args, **kwargs):
super(self.__class__, self).__init__(*args, **kwargs)
[docs]class StreamError(MTraderError):
def __init__(self, *args, **kwargs):
super(self.__class__, self).__init__(*args, **kwargs)
[docs]class MTraderAPI:
"""
This class implements Python side for MQL5 JSON API
See https://github.com/khramkov/MQL5-JSON-API for docs
"""
# TODO: unify error handling
def __init__(self, host=None):
self.HOST = host or 'localhost'
self.SYS_PORT = 15555 # REP/REQ port
self.DATA_PORT = 15556 # PUSH/PULL port
self.LIVE_PORT = 15557 # PUSH/PULL port
self.EVENTS_PORT = 15558 # PUSH/PULL port
# ZeroMQ timeout in miliseconds
self.SYS_TIMEOUT = 1000
self.DATA_TIMEOUT = 10000
self.REQUEST_RETRIES = 3 # Lazy Pirate implementation
self.sequence = 0 # Lazy Pirate request sequence
# initialise ZMQ context
self.context = zmq.Context()
# connect to server sockets
try:
self.sys_socket = self.context.socket(zmq.REQ)
# set port timeout
self.sys_socket.RCVTIMEO = self.SYS_TIMEOUT
self.sys_socket.connect(
'tcp://{}:{}'.format(self.HOST, self.SYS_PORT))
# Lazy Pirate implementation
self.poll = zmq.Poller()
self.poll.register(self.sys_socket, zmq.POLLIN)
self.data_socket = self.context.socket(zmq.PULL)
# set port timeout
self.data_socket.RCVTIMEO = self.DATA_TIMEOUT
self.data_socket.connect(
'tcp://{}:{}'.format(self.HOST, self.DATA_PORT))
except zmq.ZMQError:
raise zmq.ZMQBindError("E: Binding ports ERROR")
def _send_request(self, data: dict) -> None:
"""Send request to server via ZeroMQ System socket
Lazy Pirate implementation.
"""
# ram Caller's name
print("I: Caller 2 ", sys._getframe(2).f_code.co_name)
try:
# ram sequence = 0
retries_left = self.REQUEST_RETRIES
while retries_left:
self.sequence += 1
request = str(self.sequence).encode()
print("I: Sending (%s)" % self.sequence)
print("data ", data)
self.sys_socket.send_json(data)
expect_reply = True
while expect_reply:
socks = dict(self.poll.poll(self.SYS_TIMEOUT))
if socks.get(self.sys_socket) == zmq.POLLIN:
msg = self.sys_socket.recv_string()
if not msg:
break
# terminal received the request
if str(msg) == 'OK':
print("I: Server replied %s" % msg)
retries_left = 0
expect_reply = False
else:
print("E: Malformed reply from server: %s" % msg)
else:
print("W: No response from server, retrying…")
# Socket is confused. Close and remove it.
self.sys_socket.setsockopt(zmq.LINGER, 0)
self.sys_socket.close()
self.poll.unregister(self.sys_socket)
retries_left -= 1
if retries_left == 0:
print("E: Server seems to be offline, abandoning")
break
print("I: Reconnecting and resending (%s)" %
self.sequence)
# Create new connection
self.sys_socket = self.context.socket(zmq.REQ)
self.sys_socket.RCVTIMEO = self.SYS_TIMEOUT
self.sys_socket.connect(
'tcp://{}:{}'.format(self.HOST, self.SYS_PORT))
self.poll.register(self.sys_socket, zmq.POLLIN)
self.sys_socket.send_json(data)
# ram self.context.term()
except zmq.ZMQError:
raise zmq.NotDone("E: Sending request ERROR")
def _pull_reply(self):
# Get reply from server via Data socket with timeout
try:
msg = self.data_socket.recv_json()
#ram except zmq.ZMQError:
#ram raise zmq.NotDone('Data socket timeout ERROR')
except zmq.Again as e:
return None
except zmq.ZMQError as e:
logger.debug("W: Strange ZMQ behaviour during node-to-node message receiving, experienced {}".format(e))
return msg
[docs] def live_socket(self, context=None):
"""Connect to socket in a ZMQ context"""
try:
context = context or zmq.Context.instance()
socket = context.socket(zmq.PULL)
socket.connect('tcp://{}:{}'.format(self.HOST, self.LIVE_PORT))
except zmq.ZMQError:
raise zmq.ZMQBindError("E: Live port connection ERROR")
return socket
[docs] def streaming_socket(self, context=None):
"""Connect to socket in a ZMQ context"""
try:
context = context or zmq.Context.instance()
socket = context.socket(zmq.PULL)
socket.connect('tcp://{}:{}'.format(self.HOST, self.EVENTS_PORT))
except zmq.ZMQError:
raise zmq.ZMQBindError("E: Data port connection ERROR")
return socket
[docs] def construct_and_send(self, **kwargs) -> dict:
"""Construct a request dictionary from default and send it to server"""
# default dictionary
request = {
"action": None,
"actionType": None,
"symbol": None,
"chartTF": None,
"fromDate": None,
"toDate": None,
"id": None,
"magic": 1234,
"volume": None,
"price": None,
"stoploss": None,
"takeprofit": None,
"expiration": None,
"deviation": None,
"comment": None
}
# update dict values if exist
for key, value in kwargs.items():
if key in request:
request[key] = value
else:
raise KeyError('E: Unknown key in **kwargs ERROR')
# send dict to server
self._send_request(request)
# return server reply
return self._pull_reply()
[docs]class MTraderStore(with_metaclass(MetaSingleton, object)):
"""
Singleton class wrapping to control the connections to MetaTrader.
Balance update occurs at the beginning and after each
transaction registered by '_t_streaming_events'.
"""
# TODO: implement stop_limit
# TODO: Check position ticket
BrokerCls = None # broker class will autoregister
DataCls = None # data class will auto register
params = ()
# The Unix epoch (or Unix time or POSIX time or Unix timestamp)
_DTEPOCH = datetime(1970, 1, 1)
# MTrader supported granularities
_GRANULARITIES = {
# (bt.TimeFrame.Ticks, 1): 'Ticks',
(bt.TimeFrame.Minutes, 1): 'M1',
(bt.TimeFrame.Minutes, 2): 'M2',
(bt.TimeFrame.Minutes, 3): 'M3',
(bt.TimeFrame.Minutes, 4): 'M4',
(bt.TimeFrame.Minutes, 5): 'M5',
(bt.TimeFrame.Minutes, 6): 'M6',
(bt.TimeFrame.Minutes, 10): 'M10',
(bt.TimeFrame.Minutes, 12): 'M12',
(bt.TimeFrame.Minutes, 15): 'M15',
(bt.TimeFrame.Minutes, 20): 'M20',
(bt.TimeFrame.Minutes, 30): 'M30',
(bt.TimeFrame.Minutes, 60): 'H1',
(bt.TimeFrame.Minutes, 120): 'H2',
(bt.TimeFrame.Minutes, 180): 'H3',
(bt.TimeFrame.Minutes, 240): 'H4',
(bt.TimeFrame.Minutes, 360): 'H6',
(bt.TimeFrame.Minutes, 480): 'H8',
(bt.TimeFrame.Minutes, 720): 'H12',
(bt.TimeFrame.Days, 1): 'D1',
(bt.TimeFrame.Weeks, 1): 'W1',
(bt.TimeFrame.Months, 1): 'MN1',
}
# Order type matching with MetaTrader 5
_ORDEREXECS = {
# Market Buy order
(bt.Order.Market, 'buy'): 'ORDER_TYPE_BUY',
# Market Sell order
(bt.Order.Market, 'sell'): 'ORDER_TYPE_SELL',
# Buy Limit pending order
(bt.Order.Limit, 'buy'): 'ORDER_TYPE_BUY_LIMIT',
# Sell Limit pending order
(bt.Order.Limit, 'sell'): 'ORDER_TYPE_SELL_LIMIT',
# Buy Stop pending order
(bt.Order.Stop, 'buy'): 'ORDER_TYPE_BUY_STOP',
# Sell Stop pending order
(bt.Order.Stop, 'sell'): 'ORDER_TYPE_SELL_STOP',
# Upon reaching the order price, a pending Buy Limit
(bt.Order.StopLimit, 'buy'): 'ORDER_TYPE_BUY_STOP_LIMIT',
# order is placed at the StopLimit price
# Upon reaching the order price, a pending Sell Limit
(bt.Order.StopLimit, 'sell'): 'ORDER_TYPE_SELL_STOP_LIMIT',
# order is placed at the StopLimit price
}
[docs] @classmethod
def getdata(cls, *args, **kwargs):
"""Returns `DataCls` with args, kwargs"""
return cls.DataCls(*args, **kwargs)
[docs] @classmethod
def getbroker(cls, *args, **kwargs):
"""Returns broker with *args, **kwargs from registered `BrokerCls`"""
return cls.BrokerCls(*args, **kwargs)
def __init__(self, host='localhost'):
super(MTraderStore, self).__init__()
self.notifs = collections.deque() # store notifications for cerebro
self._env = None # reference to cerebro for general notifications
self.broker = None # broker instance
self.datas = list() # datas that have registered over start
self._orders = collections.OrderedDict() # map order.ref to oid
self._ordersrev = collections.OrderedDict() # map oid to order.ref
self._orders_type = dict() # keeps order types
self.oapi = MTraderAPI(host)
self._cash = 0.0
self._value = 0.0
self.q_livedata = queue.Queue()
self._cancel_flag = False
self.debug = True
[docs] def start(self, data=None, broker=None):
# Datas require some processing to kickstart data reception
if data is None and broker is None:
self.cash = None
return
if data is not None:
self._env = data._env
# For datas simulate a queue with None to kickstart co
self.datas.append(data)
if self.broker is not None:
self.broker.data_started(data)
elif broker is not None:
self.broker = broker
self.broker_threads()
self.streaming_events()
[docs] def stop(self):
# signal end of thread
if self.broker is not None:
self.q_ordercreate.put(None)
self.q_orderclose.put(None)
[docs] def put_notification(self, msg, *args, **kwargs):
self.notifs.append((msg, args, kwargs))
[docs] def get_notifications(self):
"""Return the pending "store" notifications"""
self.notifs.append(None) # put a mark / threads could still append
return [x for x in iter(self.notifs.popleft, None)]
[docs] def get_positions(self):
positions = self.oapi.construct_and_send(action="POSITIONS")
# Error handling
# if positions["error"]:
# raise ServerDataError(positions)
pos_list = positions.get('positions', [])
if self.debug:
print('Open positions: {}.'.format(pos_list))
return [PositionAdapter(o) for o in pos_list]
[docs] def get_granularity(self, timeframe, compression):
granularity = self._GRANULARITIES.get((timeframe, compression), None)
if granularity is None:
raise ValueError("W: Metatrader 5 doesn't support frame %s with compression %s" %
(bt.TimeFrame.getname(timeframe), compression))
return granularity
[docs] def get_cash(self):
return self._cash
[docs] def get_value(self):
return self._value
[docs] def get_balance(self):
try:
bal = self.oapi.construct_and_send(action="BALANCE")
except Exception as e:
self.put_notification(e)
# TODO: error handling
# if bal['error']:
# self.put_notification(bal)
# continue
try:
self._cash = float(bal["balance"])
self._value = float(bal["equity"])
except KeyError as e:
#ram
self.put_notification(e)
pass
[docs] def streaming_events(self):
t = threading.Thread(target=self._t_livedata, daemon=True)
t.start()
t = threading.Thread(target=self._t_streaming_events, daemon=True)
t.start()
def _t_livedata(self):
# create socket connection for the Thread
socket = self.oapi.live_socket()
while True:
try:
last_candle = socket.recv_json()
except zmq.ZMQError:
raise zmq.NotDone("Live data ERROR")
self.q_livedata.put(last_candle)
def _t_streaming_events(self):
# create socket connection for the Thread
socket = self.oapi.streaming_socket()
while True:
try:
transaction = socket.recv_json()
except zmq.ZMQError:
raise zmq.NotDone("E: Streaming data ERROR")
self._transaction(transaction)
[docs] def broker_threads(self):
self.q_ordercreate = queue.Queue()
t = threading.Thread(target=self._t_order_create, daemon=True)
t.start()
self.q_orderclose = queue.Queue()
t = threading.Thread(target=self._t_order_cancel, daemon=True)
t.start()
[docs] def order_create(self, order, stopside=None, takeside=None, **kwargs):
"""Creates an order"""
okwargs = dict()
okwargs['action'] = 'TRADE'
side = 'buy' if order.isbuy() else 'sell'
order_type = self._ORDEREXECS.get((order.exectype, side), None)
if order_type is None:
raise ValueError("W: Wrong order type: %s or side: %s" %
(order.exectype, side))
okwargs['actionType'] = order_type
okwargs['symbol'] = order.data._dataname
okwargs['volume'] = abs(order.created.size)
if order.exectype != bt.Order.Market:
okwargs['price'] = format(order.created.price)
if order.valid is None:
okwargs['expiration'] = 0 # good to cancel
else:
okwargs['expiration'] = order.valid # good to date
if order.exectype == bt.Order.StopLimit:
okwargs['price'] = order.created.pricelimit
# TODO: implement StopTrail
# if order.exectype == bt.Order.StopTrail:
# okwargs['distance'] = order.trailamount
okwargs['comment'] = dict()
if stopside is not None and stopside.price is not None:
okwargs['stoploss'] = stopside.price
okwargs['comment']['stopside'] = stopside.ref
if takeside is not None and takeside.price is not None:
okwargs['takeprofit'] = takeside.price
okwargs['comment']['takeside'] = takeside.ref
# set store backtrader order ref as MT5 order magic number
try:
okwargs['magic'] = order.info["magic"] #Ram Magic number must be inmutable
except KeyError:
print(KeyError)
okwargs.update(**kwargs) # anything from the user
self.q_ordercreate.put((order.ref, okwargs,))
# notify orders of being submitted
self.broker._submit(order.ref)
if stopside is not None and stopside.price is not None:
self.broker._submit(stopside.ref)
if takeside is not None and takeside.price is not None:
self.broker._submit(takeside.ref)
return order
def _t_order_create(self):
while True:
msg = self.q_ordercreate.get()
if msg is None:
break
oref, okwargs = msg
try:
o = self.oapi.construct_and_send(**okwargs)
except Exception as e:
self.put_notification(e)
self.broker._reject(oref)
return
if self.debug:
print(o)
if o['error']:
self.put_notification(o['description'])
self.broker._reject(oref)
return
else:
oid = o['order']
self._orders[oref] = oid
self.broker._submit(oref)
# keeps orders types
self._orders_type[oref] = okwargs['actionType']
# maps ids to backtrader order
self._ordersrev[oid] = oref
[docs] def order_cancel(self, order):
self.q_orderclose.put(order.ref)
return order
def _t_order_cancel(self):
while True:
oref = self.q_orderclose.get()
if oref is None:
break
oid = self._orders.get(oref, None)
if oid is None:
continue # the order is no longer there
# get symbol name
order = self.broker.orders[oref]
symbol = order.data._dataname
# get order type
order_type = self._orders_type.get(oref, None)
try:
if order_type in ['ORDER_TYPE_BUY', 'ORDER_TYPE_SELL']:
self.close_position(oid, symbol)
else:
self.cancel_order(oid, symbol)
except Exception as e:
self.put_notification(
"Order not cancelled: {}, {}".format(oid, e))
continue
self._cancel_flag = True
self.broker._cancel(oref)
[docs] def candles(self, dataname, dtbegin, dtend, timeframe, compression, include_first=False):
tf = self.get_granularity(timeframe, compression)
begin = end = None
if dtbegin:
begin = int((dtbegin - self._DTEPOCH).total_seconds())
if dtend:
end = int((dtbegin - self._DTEPOCH).total_seconds())
if self.debug:
print('Fetching: {}, Timeframe: {}, Fromdate: {}'.format(
dataname, tf, dtbegin))
data = self.oapi.construct_and_send(action="HISTORY", actionType="DATA", symbol=dataname,
chartTF=tf, fromDate=begin, toDate=end)
candles = data['data']
# Remove last unclosed candle
if not include_first:
try:
del candles[-1]
except:
pass
q = queue.Queue()
for c in candles:
q.put(c)
q.put({})
return q
'''ram
def config_server(self, symbol: str, timeframe: str) -> None:
"""Set server terminal symbol and time frame"""
conf = self.oapi.construct_and_send(action="CONFIG", symbol=symbol, chartTF=timeframe)
# TODO Error
# Error handling
if conf["error"]:
print(conf)
if conf["description"] == "Wrong symbol dosn't exist":
raise ServerConfigError("Symbol dosn't exist")
self.put_notification(conf["description"])
'''
[docs] def check_account(self) -> None:
"""Get MetaTrader 5 account settings"""
# ram Caller's name
print("I: Caller 3 ", sys._getframe(2).f_code.co_name)
conf = self.oapi.construct_and_send(action="ACCOUNT")
# Error handling
if conf["error"]:
raise ServerDataError(conf)
for key, value in conf.items():
print(key, value, sep=' - ')
[docs] def close_position(self, oid, symbol):
if self.debug:
print('Closing position: {}, on symbol: {}'.format(oid, symbol))
conf = self.oapi.construct_and_send(
action="TRADE", actionType='POSITION_CLOSE_ID', symbol=symbol, id=oid)
print(conf)
# Error handling
if conf["error"]:
raise ServerDataError(conf)
[docs] def cancel_order(self, oid, symbol):
if self.debug:
print('Cancelling order: {}, on symbol: {}'.format(oid, symbol))
conf = self.oapi.construct_and_send(
action="TRADE", actionType='ORDER_CANCEL', symbol=symbol, id=oid)
print(conf)
# Error handling
if conf["error"]:
raise ServerDataError(conf)
def _transaction(self, trans):
# Invoked from Streaming Events. May actually receive an event for an
# oid which has not yet been returned after creating an order. Hence
# store if not yet seen, else forward to processer
oid = oref = None
try:
request, reply = trans.values()
except KeyError:
raise KeyError(trans)
# Update balance after transaction
# self.get_balance()
if self.debug:
print(request, reply, sep='\n')
if request['action'] == 'TRADE_ACTION_DEAL':
# get order id (matches transaction id)
oid = request['order']
elif request['action'] == 'TRADE_ACTION_PENDING':
oid = request['order']
elif request['action'] == 'TRADE_ACTION_SLTP':
pass
elif request['action'] == 'TRADE_ACTION_MODIFY':
pass
elif request['action'] == 'TRADE_ACTION_REMOVE':
pass
elif request['action'] == 'TRADE_ACTION_CLOSE_BY':
pass
else:
return
# try:
# oref = self._ordersrev.pop(oid)
# except KeyError:
# raise KeyError(oid)
if oid in self._orders.values():
# when an order id exists process transaction
self._process_transaction(oid, request, reply)
else:
# external order created this transaction
if self._cancel_flag and reply['result'] == 'TRADE_RETCODE_DONE':
self._cancel_flag = False
size = float(reply['volume'])
price = float(reply['price'])
if request['type'].endswith('_SELL'):
size = -size
for data in self.datas:
if data._name == request['symbol']:
self.broker._fill_external(data, size, price)
break
def _process_transaction(self, oid, request, reply):
try:
# get a reference to a backtrader order based on the order id / trade id
oref = self._ordersrev[oid]
except KeyError:
return
if request['action'] == 'TRADE_ACTION_PENDING':
pass
if reply['result'] == 'TRADE_RETCODE_DONE':
size = float(reply['volume'])
price = float(reply['price'])
if request['type'].endswith('_SELL'):
size = -size
self.broker._fill(oref, size, price, reason=request['type'])