diff --git a/happybase/__init__.py b/happybase/__init__.py index 978c1d6..158fad9 100644 --- a/happybase/__init__.py +++ b/happybase/__init__.py @@ -15,3 +15,4 @@ from .table import Table # noqa from .batch import Batch # noqa from .pool import ConnectionPool, NoConnectionsAvailable # noqa +from .counter_batch import CounterBatch diff --git a/happybase/counter_batch.py b/happybase/counter_batch.py new file mode 100644 index 0000000..122fbb3 --- /dev/null +++ b/happybase/counter_batch.py @@ -0,0 +1,46 @@ +import six +from Hbase_thrift import Hbase, ColumnDescriptor, TIncrement +from collections import defaultdict + + +class CounterBatch(object): + def __init__(self, table, batch_size=None): + self.table = table + self.batch_size = batch_size + self.batch = defaultdict(int) + + def counter_inc(self, row, column, value=1): + self.batch[(row, column)] += value + self._check_send() + + def counter_dec(self, row, column, value=1): + self.counter_inc(row, column, -value) + + def send(self): + increment_rows = [ + TIncrement(table=self.table.name, row=key[0], column=key[1], ammount=value) + for key, value in six.iteritems(self.batch) + ] + self.table.connection.client.incrementRows(increment_rows) + self.batch.clear() + + def _check_send(self): + if len(self.batch) >= self.batch_size: + self.send() + + # + # Context manager methods + # + + def __enter__(self): + """Called upon entering a ``with`` block""" + return self + + def __exit__(self, exc_type, exc_value, traceback): + """Called upon exiting a ``with`` block""" + # TODO: Examine the exception and decide whether or not to send + # For now we always send + if exc_type is not None: + pass + + self.send() diff --git a/happybase/table.py b/happybase/table.py index 4456211..97257cc 100644 --- a/happybase/table.py +++ b/happybase/table.py @@ -12,6 +12,7 @@ from .util import thrift_type_to_dict, bytes_increment, OrderedDict from .batch import Batch +from .counter_batch import CounterBatch logger = logging.getLogger(__name__) @@ -525,6 +526,23 @@ def batch(self, timestamp=None, batch_size=None, transaction=False, del kwargs['self'] return Batch(table=self, **kwargs) + def counter_batch(self, batch_size=None): + """Create a new batch of counter operation for this table. + + This method returns a new :py:class:`CounterBatch` instance that can be used + for mass counter manipulation. + + If given, the `batch_size` argument specifies the maximum batch size + after which the batch should send the mutations to the server. By + default this is unbounded. + + :param int batch_size: batch size (optional) + + :return: CounterBatch instance + :rtype: :py:class:`CounterBatch` + """ + return CounterBatch(table=self, batch_size=batch_size) + # # Atomic counters #