diff --git a/backtrader/feeds/influxfeed.py b/backtrader/feeds/influxfeed.py index ac21ad53f..bfc6d66d2 100644 --- a/backtrader/feeds/influxfeed.py +++ b/backtrader/feeds/influxfeed.py @@ -18,8 +18,7 @@ # along with this program. If not, see . # ############################################################################### -from __future__ import (absolute_import, division, print_function, - unicode_literals) +from __future__ import absolute_import, division, print_function, unicode_literals import backtrader as bt import backtrader.feed as feed @@ -28,72 +27,89 @@ TIMEFRAMES = dict( ( - (bt.TimeFrame.Seconds, 's'), - (bt.TimeFrame.Minutes, 'm'), - (bt.TimeFrame.Days, 'd'), - (bt.TimeFrame.Weeks, 'w'), - (bt.TimeFrame.Months, 'm'), - (bt.TimeFrame.Years, 'y'), + (bt.TimeFrame.Seconds, "s"), + (bt.TimeFrame.Minutes, "m"), + (bt.TimeFrame.Days, "d"), + (bt.TimeFrame.Weeks, "w"), + (bt.TimeFrame.Months, "m"), + (bt.TimeFrame.Years, "y"), ) ) class InfluxDB(feed.DataBase): frompackages = ( - ('influxdb', [('InfluxDBClient', 'idbclient')]), - ('influxdb.exceptions', 'InfluxDBClientError') + ("influxdb", [("InfluxDBClient", "idbclient")]), + ("influxdb.exceptions", "InfluxDBClientError"), ) params = ( - ('host', '127.0.0.1'), - ('port', '8086'), - ('username', None), - ('password', None), - ('database', None), - ('timeframe', bt.TimeFrame.Days), - ('startdate', None), - ('high', 'high_p'), - ('low', 'low_p'), - ('open', 'open_p'), - ('close', 'close_p'), - ('volume', 'volume'), - ('ointerest', 'oi'), + ("host", "localhost"), + ("port", 8086), + ("username", None), + ("password", None), + ("database", None), + ("timeframe", bt.TimeFrame.Days), + ("high", "high"), + ("low", "low"), + ("open", "open"), + ("close", "close"), + ("volume", "volume"), + ("openinterest", "oi"), ) def start(self): super(InfluxDB, self).start() try: - self.ndb = idbclient(self.p.host, self.p.port, self.p.username, - self.p.password, self.p.database) + self.ndb = idbclient( + self.p.host, + self.p.port, + self.p.username, + self.p.password, + self.p.database, + ) except InfluxDBClientError as err: - print('Failed to establish connection to InfluxDB: %s' % err) + print("Failed to establish connection to InfluxDB: %s" % err) - tf = '{multiple}{timeframe}'.format( + tf = "{multiple}{timeframe}".format( multiple=(self.p.compression if self.p.compression else 1), - timeframe=TIMEFRAMES.get(self.p.timeframe, 'd')) + timeframe=TIMEFRAMES.get(self.p.timeframe, "d"), + ) - if not self.p.startdate: - st = '<= now()' + if self.p.fromdate and self.p.todate: + tcond = "time >= '{fromdate}' AND time <= '{todate}'".format( + fromdate=self.p.fromdate, todate=self.p.todate + ) + elif self.p.fromdate: + tcond = "time >= '{fromdate}'".format(fromdate=self.p.fromdate) + elif self.p.todate: + tcond = "time <= '{todate}'".format(todate=self.p.todate) else: - st = '>= \'%s\'' % self.p.startdate - - # The query could already consider parameters like fromdate and todate - # to have the database skip them and not the internal code - qstr = ('SELECT mean("{open_f}") AS "open", mean("{high_f}") AS "high", ' - 'mean("{low_f}") AS "low", mean("{close_f}") AS "close", ' - 'mean("{vol_f}") AS "volume", mean("{oi_f}") AS "openinterest" ' - 'FROM "{dataname}" ' - 'WHERE time {begin} ' - 'GROUP BY time({timeframe}) fill(none)').format( - open_f=self.p.open, high_f=self.p.high, - low_f=self.p.low, close_f=self.p.close, - vol_f=self.p.volume, oi_f=self.p.ointerest, - timeframe=tf, begin=st, dataname=self.p.dataname) + tcond = "time <= now()" + qstr = ( + 'SELECT FIRST("{open_f}") AS "open", MAX("{high_f}") as "high", MIN("{low_f}") as "low", ' + 'LAST("{close_f}") AS "close", SUM("{vol_f}") as "volume", SUM("{oi_f}") as "openinterest" ' + 'FROM "{dataname}" ' + "WHERE {tcond} " + "GROUP BY time({timeframe}) fill(none) " + "ORDER BY time ASC".format( + open_f=self.p.open, + high_f=self.p.high, + low_f=self.p.low, + close_f=self.p.close, + vol_f=self.p.volume, + oi_f=self.p.openinterest, + dataname=self.p.dataname, + tcond=tcond, + timeframe=tf, + ) + ) try: dbars = list(self.ndb.query(qstr).get_points()) except InfluxDBClientError as err: - print('InfluxDB query failed: %s' % err) + print("InfluxDB query failed: %s" % err) + dbars = [] self.biter = iter(dbars) @@ -103,13 +119,15 @@ def _load(self): except StopIteration: return False - self.l.datetime[0] = date2num(dt.datetime.strptime(bar['time'], - '%Y-%m-%dT%H:%M:%SZ')) + self.l.datetime[0] = date2num( + dt.datetime.strptime(bar["time"], "%Y-%m-%dT%H:%M:%SZ") + ) - self.l.open[0] = bar['open'] - self.l.high[0] = bar['high'] - self.l.low[0] = bar['low'] - self.l.close[0] = bar['close'] - self.l.volume[0] = bar['volume'] + self.l.open[0] = bar["open"] + self.l.high[0] = bar["high"] + self.l.low[0] = bar["low"] + self.l.close[0] = bar["close"] + self.l.volume[0] = bar["volume"] if bar["volume"] else 0.0 + self.l.openinterest[0] = bar["openinterest"] if bar["openinterest"] else 0.0 return True