From 6bb5b10a0cca1c5c1645ee9c8069cfc5b1e7d0ff Mon Sep 17 00:00:00 2001 From: philip-sterne Date: Wed, 13 May 2015 09:14:19 +0200 Subject: [PATCH 1/8] Added support for reversing table scans --- happybase/table.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/happybase/table.py b/happybase/table.py index 3cb26b7..4835898 100644 --- a/happybase/table.py +++ b/happybase/table.py @@ -215,7 +215,7 @@ def cells(self, row, column, versions=None, timestamp=None, def scan(self, row_start=None, row_stop=None, row_prefix=None, columns=None, filter=None, timestamp=None, include_timestamp=False, batch_size=1000, scan_batching=None, - limit=None, sorted_columns=False): + limit=None, sorted_columns=False, reversed=False): """Create a scanner for data in the table. This method returns an iterable that can be used for looping over the @@ -270,6 +270,9 @@ def scan(self, row_start=None, row_stop=None, row_prefix=None, * The `sorted_columns` argument is only available when using HBase 0.96 (or up). + * The `reversed` option is only available when using HBase 0.98 + (or up). + .. 
versionadded:: 0.8 `sorted_columns` argument @@ -287,6 +290,7 @@ def scan(self, row_start=None, row_stop=None, row_prefix=None, :param bool scan_batching: server-side scan batching (optional) :param int limit: max number of rows to return :param bool sorted_columns: whether to return sorted columns + :param bool reversed: whether or not to reverse the row ordering :return: generator yielding the rows matching the scan :rtype: iterable of `(row_key, row_data)` tuples @@ -369,6 +373,7 @@ def scan(self, row_start=None, row_stop=None, row_prefix=None, filterString=filter, batchSize=scan_batching, sortColumns=sorted_columns, + reversed=reversed ) scan_id = self.connection.client.scannerOpenWithScan( self.name, scan, {}) From a7f62f3ac9a4b741ddfbee15e45d950eab4af61c Mon Sep 17 00:00:00 2001 From: philip-sterne Date: Wed, 13 May 2015 09:17:20 +0200 Subject: [PATCH 2/8] bump version --- happybase/_version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/happybase/_version.py b/happybase/_version.py index 1105a40..1b154ce 100644 --- a/happybase/_version.py +++ b/happybase/_version.py @@ -5,4 +5,4 @@ setup.py. 
""" -__version__ = '0.9' +__version__ = '0.9-takealot' From b3894d016a0f18409f17c3dfda90ce1b67470b1e Mon Sep 17 00:00:00 2001 From: philip-sterne Date: Wed, 13 May 2015 09:41:33 +0200 Subject: [PATCH 3/8] initial attempt at counter batching --- happybase/__init__.py | 1 + happybase/batchcounters.py | 51 ++++++++++++++++++++++++++++++++++++++ happybase/table.py | 18 ++++++++++++++ 3 files changed, 70 insertions(+) create mode 100644 happybase/batchcounters.py diff --git a/happybase/__init__.py b/happybase/__init__.py index 11e350d..79435a9 100644 --- a/happybase/__init__.py +++ b/happybase/__init__.py @@ -8,6 +8,7 @@ from .connection import DEFAULT_HOST, DEFAULT_PORT, Connection from .table import Table from .batch import Batch +from .batchcounters import BatchCounters from .pool import ConnectionPool, NoConnectionsAvailable # TODO: properly handle errors defined in Thrift specification diff --git a/happybase/batchcounters.py b/happybase/batchcounters.py new file mode 100644 index 0000000..5bd22d5 --- /dev/null +++ b/happybase/batchcounters.py @@ -0,0 +1,51 @@ +from happybase.hbase.ttypes import TIncrement + + +class BatchCounters: + def __init__(self, table, batch_size=None): + self.table = table + self.batch_size = batch_size + self.batch = [] + + def counter_inc(self, row, column, value=1): + self.batch.append({'row': row, 'column': column, 'value': value}) + self._check_send() + + def counter_dec(self, row, column, value=1): + self.batch.append({'row': row, 'column': column, 'value': -value}) + self._check_send() + + def send(self): + increment_rows = [] + for increment in self.batch: + increment_rows.append( + TIncrement( + table=self.table.name, + row=increment['row'], + column=increment['column'], + ammount=increment.get('value', 1), + ) + ) + self.table.connection.client.incrementRows(increment_rows) + + def _check_send(self): + if self.batch_size and (len(self.batch) >= self.batch_size): + self.send() + self.batch = [] + + # + # Context manager methods + # 
+ + def __enter__(self): + """Called upon entering a ``with`` block""" + return self + + def __exit__(self, exc_type, exc_value, traceback): + """Called upon exiting a ``with`` block""" + # TODO: Examine the exception and decide whether or not to send + # For now we always send + if exc_type is not None: + pass + + self.send() diff --git a/happybase/table.py b/happybase/table.py index 4835898..225a49f 100644 --- a/happybase/table.py +++ b/happybase/table.py @@ -10,6 +10,7 @@ from .hbase.ttypes import TScan from .util import thrift_type_to_dict, str_increment, OrderedDict from .batch import Batch +from .batch_increments import BatchCounters logger = logging.getLogger(__name__) @@ -502,6 +503,23 @@ def batch(self, timestamp=None, batch_size=None, transaction=False, del kwargs['self'] return Batch(table=self, **kwargs) + def batch_counters(self, batch_size=None): + """Create a new batch counter operation for this table. + + This method returns a new :py:class:`BatchCounters` instance that can be used + for mass counter manipulation. + + If given, the `batch_size` argument specifies the maximum batch size + after which the batch should send the mutations to the server. By + default this is unbounded. 
+ + :param int batch_size: batch size (optional) + + :return: BatchCounters instance + :rtype: :py:class:`BatchCounters` + """ + return BatchCounters(table=self, batch_size=batch_size) + # # Atomic counters # From 2802e287fca8cc5fcd54b2730070046e6496796e Mon Sep 17 00:00:00 2001 From: philip-sterne Date: Wed, 13 May 2015 09:59:01 +0200 Subject: [PATCH 4/8] suggestions from Carl --- happybase/__init__.py | 2 +- happybase/{batchcounters.py => counter_batch.py} | 2 +- happybase/table.py | 16 ++++++++-------- 3 files changed, 10 insertions(+), 10 deletions(-) rename happybase/{batchcounters.py => counter_batch.py} (98%) diff --git a/happybase/__init__.py b/happybase/__init__.py index 79435a9..91c40eb 100644 --- a/happybase/__init__.py +++ b/happybase/__init__.py @@ -8,7 +8,7 @@ from .connection import DEFAULT_HOST, DEFAULT_PORT, Connection from .table import Table from .batch import Batch -from .batchcounters import BatchCounters +from .counter_batch import CounterBatch from .pool import ConnectionPool, NoConnectionsAvailable # TODO: properly handle errors defined in Thrift specification diff --git a/happybase/batchcounters.py b/happybase/counter_batch.py similarity index 98% rename from happybase/batchcounters.py rename to happybase/counter_batch.py index 5bd22d5..f66f6a6 100644 --- a/happybase/batchcounters.py +++ b/happybase/counter_batch.py @@ -1,7 +1,7 @@ from happybase.hbase.ttypes import TIncrement -class BatchCounters: +class CounterBatch: def __init__(self, table, batch_size=None): self.table = table self.batch_size = batch_size diff --git a/happybase/table.py b/happybase/table.py index 225a49f..55817f7 100644 --- a/happybase/table.py +++ b/happybase/table.py @@ -10,7 +10,7 @@ from .hbase.ttypes import TScan from .util import thrift_type_to_dict, str_increment, OrderedDict from .batch import Batch -from .batch_increments import BatchCounters +from .counter_batch import CounterBatch logger = logging.getLogger(__name__) @@ -374,7 +374,7 @@ def scan(self, 
row_start=None, row_stop=None, row_prefix=None, filterString=filter, batchSize=scan_batching, sortColumns=sorted_columns, - reversed=reversed + reversed=reversed, ) scan_id = self.connection.client.scannerOpenWithScan( self.name, scan, {}) @@ -503,10 +503,10 @@ def batch(self, timestamp=None, batch_size=None, transaction=False, del kwargs['self'] return Batch(table=self, **kwargs) - def batch_counters(self, batch_size=None): - """Create a new batch counter operation for this table. + def counter_batch(self, batch_size=None): + """Create a new batch of counter operations for this table. - This method returns a new :py:class:`BatchCounters` instance that can be used + This method returns a new :py:class:`CounterBatch` instance that can be used for mass counter manipulation. If given, the `batch_size` argument specifies the maximum batch size @@ -515,10 +515,10 @@ def batch_counters(self, batch_size=None): :param int batch_size: batch size (optional) - :return: BatchCounters instance - :rtype: :py:class:`BatchCounters` + :return: CounterBatch instance + :rtype: :py:class:`CounterBatch` """ - return BatchCounters(table=self, batch_size=batch_size) + return CounterBatch(table=self, batch_size=batch_size) # # Atomic counters From c9048bfe16c910ce5a692fb84b6f49d79784a023 Mon Sep 17 00:00:00 2001 From: philip-sterne Date: Fri, 3 Jul 2015 10:43:03 +0200 Subject: [PATCH 5/8] Reverted version number --- happybase/_version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/happybase/_version.py b/happybase/_version.py index 1b154ce..1105a40 100644 --- a/happybase/_version.py +++ b/happybase/_version.py @@ -5,4 +5,4 @@ setup.py. 
""" -__version__ = '0.9-takealot' +__version__ = '0.9' From 6663a880ed535434a75c073e9602339cbe0a1d99 Mon Sep 17 00:00:00 2001 From: philip-sterne Date: Fri, 3 Jul 2015 11:06:17 +0200 Subject: [PATCH 6/8] Fixed old-style-class warning --- happybase/counter_batch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/happybase/counter_batch.py b/happybase/counter_batch.py index f66f6a6..1cab3a1 100644 --- a/happybase/counter_batch.py +++ b/happybase/counter_batch.py @@ -1,7 +1,7 @@ from happybase.hbase.ttypes import TIncrement -class CounterBatch: +class CounterBatch(object): def __init__(self, table, batch_size=None): self.table = table self.batch_size = batch_size From 2d4ed1f8a69557652995978036339485a755a67a Mon Sep 17 00:00:00 2001 From: Carl Scheffler Date: Mon, 26 Oct 2015 15:38:51 +0200 Subject: [PATCH 7/8] Much more efficient counter batching. --- happybase/counter_batch.py | 29 +++++++++++++---------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/happybase/counter_batch.py b/happybase/counter_batch.py index 1cab3a1..fca2a2d 100644 --- a/happybase/counter_batch.py +++ b/happybase/counter_batch.py @@ -1,37 +1,34 @@ from happybase.hbase.ttypes import TIncrement +from collections import defaultdict class CounterBatch(object): def __init__(self, table, batch_size=None): self.table = table self.batch_size = batch_size - self.batch = [] + self.batch = defaultdict(int) + self.batch_count = 0 def counter_inc(self, row, column, value=1): - self.batch.append({'row': row, 'column': column, 'value': value}) + self.batch[(row, column)] += value + self.batch_count += 1 self._check_send() def counter_dec(self, row, column, value=1): - self.batch.append({'row': row, 'column': column, 'value': -value}) - self._check_send() + self.counter_inc(row, column, -value) def send(self): - increment_rows = [] - for increment in self.batch: - increment_rows.append( - TIncrement( - table=self.table.name, - row=increment['row'], - 
column=increment['column'], - ammount=increment.get('value', 1), - ) - ) + increment_rows = [ + TIncrement(table=self.table.name, row=key[0], column=key[1], ammount=value) + for key, value in self.batch.iteritems() + ] self.table.connection.client.incrementRows(increment_rows) + self.batch.clear() + self.batch_count = 0 def _check_send(self): - if self.batch_size and (len(self.batch) >= self.batch_size): + if self.batch_size and (self.batch_count >= self.batch_size): self.send() - self.batch = [] # # Context manager methods From c83991c66b751c37d05b3aa1900af4610cc36f22 Mon Sep 17 00:00:00 2001 From: Gabriel Gavilan Date: Tue, 12 Jun 2018 15:26:56 +0200 Subject: [PATCH 8/8] Use six for iteritems --- happybase/counter_batch.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/happybase/counter_batch.py b/happybase/counter_batch.py index 6bcc8ff..122fbb3 100644 --- a/happybase/counter_batch.py +++ b/happybase/counter_batch.py @@ -1,4 +1,4 @@ - +import six from Hbase_thrift import Hbase, ColumnDescriptor, TIncrement from collections import defaultdict @@ -11,7 +11,6 @@ def __init__(self, table, batch_size=None): def counter_inc(self, row, column, value=1): self.batch[(row, column)] += value - self.batch_count += 1 self._check_send() def counter_dec(self, row, column, value=1): @@ -20,7 +19,7 @@ def counter_dec(self, row, column, value=1): def send(self): increment_rows = [ TIncrement(table=self.table.name, row=key[0], column=key[1], ammount=value) - for key, value in self.batch.iteritems() + for key, value in six.iteritems(self.batch) ] self.table.connection.client.incrementRows(increment_rows) self.batch.clear()