diff --git a/docs/settings.rst b/docs/settings.rst
index 3ebb1b186..cf265bea3 100644
--- a/docs/settings.rst
+++ b/docs/settings.rst
@@ -81,6 +81,31 @@
 capacity information in responses. If ``False``, scans will fail should the server not return
 consumed capacity information, in an effort to prevent unintentional capacity usage.
 
+dax_write_endpoints
+-------------------
+
+Default: ``[]``
+
+Connect to these DAX endpoints for write operations.
+
+The following operations are supported: ``PutItem``, ``DeleteItem``, ``UpdateItem``, and ``BatchWriteItem``.
+
+dax_read_endpoints
+------------------
+
+Default: ``[]``
+
+Connect to these DAX endpoints for read operations.
+
+The following operations are supported: ``GetItem``, ``Scan``, ``BatchGetItem``, and ``Query``.
+
+fall_back_to_dynamodb
+---------------------
+
+Default: ``False``
+
+If ``True``, operations fall back to DynamoDB when a DAX call fails; if ``False``, the DAX error is raised.
+
 Overriding settings
 ~~~~~~~~~~~~~~~~~~~
 
diff --git a/pynamodb/connection/base.py b/pynamodb/connection/base.py
index 07da27b50..62bb4ec74 100644
--- a/pynamodb/connection/base.py
+++ b/pynamodb/connection/base.py
@@ -22,29 +22,39 @@
 from botocore.exceptions import BotoCoreError
 from botocore.session import get_session
 from botocore.vendored import requests
-from botocore.vendored.requests import Request
 from six.moves import range
 
 from pynamodb.compat import NullHandler
 from pynamodb.connection.util import pythonic
+try:
+    from pynamodb.connection.dax import (
+        OP_READ,
+        OP_WRITE,
+        DaxClient,
+        DaxClientError,
+    )
+except ImportError:
+    # amazon-dax-client is not installed on this interpreter; disable DAX routing.
+    DaxClient = DaxClientError = None
+    OP_READ = OP_WRITE = {}
 from pynamodb.constants import (
     RETURN_CONSUMED_CAPACITY_VALUES, RETURN_ITEM_COLL_METRICS_VALUES, COMPARISON_OPERATOR_VALUES,
     RETURN_ITEM_COLL_METRICS, RETURN_CONSUMED_CAPACITY, RETURN_VALUES_VALUES, ATTR_UPDATE_ACTIONS,
     COMPARISON_OPERATOR, EXCLUSIVE_START_KEY, SCAN_INDEX_FORWARD, SCAN_FILTER_VALUES, ATTR_DEFINITIONS,
     BATCH_WRITE_ITEM, CONSISTENT_READ, ATTR_VALUE_LIST, DESCRIBE_TABLE, KEY_CONDITION_EXPRESSION,
     BATCH_GET_ITEM, DELETE_REQUEST, SELECT_VALUES, RETURN_VALUES, REQUEST_ITEMS, ATTR_UPDATES,
     PROJECTION_EXPRESSION, SERVICE_NAME, DELETE_ITEM, PUT_REQUEST, UPDATE_ITEM, SCAN_FILTER, TABLE_NAME,
     INDEX_NAME, KEY_SCHEMA, ATTR_NAME, ATTR_TYPE, TABLE_KEY, EXPECTED, KEY_TYPE, GET_ITEM, UPDATE,
     PUT_ITEM, SELECT, ACTION, EXISTS, VALUE, LIMIT, QUERY, SCAN, ITEM, LOCAL_SECONDARY_INDEXES,
     KEYS, KEY, EQ, SEGMENT, TOTAL_SEGMENTS, CREATE_TABLE, PROVISIONED_THROUGHPUT, READ_CAPACITY_UNITS,
     WRITE_CAPACITY_UNITS, GLOBAL_SECONDARY_INDEXES, PROJECTION, EXCLUSIVE_START_TABLE_NAME, TOTAL,
     DELETE_TABLE, UPDATE_TABLE, LIST_TABLES, GLOBAL_SECONDARY_INDEX_UPDATES, ATTRIBUTES,
     CONSUMED_CAPACITY, CAPACITY_UNITS, QUERY_FILTER, QUERY_FILTER_VALUES, CONDITIONAL_OPERATOR,
     CONDITIONAL_OPERATORS, NULL, NOT_NULL, SHORT_ATTR_TYPES, DELETE, PUT,
     ITEMS, DEFAULT_ENCODING, BINARY_SHORT, BINARY_SET_SHORT, LAST_EVALUATED_KEY, RESPONSES, UNPROCESSED_KEYS,
     UNPROCESSED_ITEMS, STREAM_SPECIFICATION, STREAM_VIEW_TYPE, STREAM_ENABLED, UPDATE_EXPRESSION,
     EXPRESSION_ATTRIBUTE_NAMES, EXPRESSION_ATTRIBUTE_VALUES, KEY_CONDITION_OPERATOR_MAP,
     CONDITION_EXPRESSION, FILTER_EXPRESSION, FILTER_EXPRESSION_OPERATOR_MAP, NOT_CONTAINS, AND)
 from pynamodb.exceptions import (
     TableError, QueryError, PutError, DeleteError, UpdateError, GetError, ScanError, TableDoesNotExist,
     VerboseClientError
@@ -227,14 +237,21 @@ class Connection(object):
     A higher level abstraction over botocore
     """
 
     def __init__(self, region=None, host=None,
                  read_timeout_seconds=None, connect_timeout_seconds=None,
                  max_retry_attempts=None, base_backoff_ms=None,
-                 max_pool_connections=None, extra_headers=None):
+                 max_pool_connections=None, extra_headers=None,
+                 dax_write_endpoints=None, dax_read_endpoints=None,
+                 fall_back_to_dynamodb=None):
         self._tables = {}
         self.host = host
+        # Fall back to the global default settings when no endpoints are given.
+        self.dax_write_endpoints = dax_write_endpoints or get_settings_value('dax_write_endpoints')
+        self.dax_read_endpoints = dax_read_endpoints or get_settings_value('dax_read_endpoints')
         self._local = local()
         self._client = None
+        self._dax_write_client = None
+        self._dax_read_client = None
         if region:
             self.region = region
         else:
@@ -260,6 +277,11 @@ def __init__(self, region=None, host=None,
         else:
             self._base_backoff_ms = get_settings_value('base_backoff_ms')
 
+        if fall_back_to_dynamodb is not None:
+            self._fall_back_to_dynamodb = fall_back_to_dynamodb
+        else:
+            self._fall_back_to_dynamodb = get_settings_value('fall_back_to_dynamodb')
+
         if max_pool_connections is not None:
             self._max_pool_connections = max_pool_connections
         else:
@@ -313,7 +335,9 @@ def dispatch(self, operation_name, operation_kwargs):
         req_uuid = uuid.uuid4()
 
         self.send_pre_boto_callback(operation_name, req_uuid, table_name)
+
         data = self._make_api_call(operation_name, operation_kwargs)
+
         self.send_post_boto_callback(operation_name, req_uuid, table_name)
 
         if data and CONSUMED_CAPACITY in data:
@@ -341,10 +365,20 @@ def _make_api_call(self, operation_name, operation_kwargs):
         1. It's faster to avoid using botocore's response parsing
        2. It provides a place to monkey patch requests for unit testing
         """
+        try:
+            # Route supported operations through DAX when endpoints are configured.
+            if operation_name in OP_WRITE and self.dax_write_endpoints:
+                return self.dax_write_client.dispatch(operation_name, operation_kwargs)
+            elif operation_name in OP_READ and self.dax_read_endpoints:
+                return self.dax_read_client.dispatch(operation_name, operation_kwargs)
+        except DaxClientError:
+            if not self._fall_back_to_dynamodb:
+                raise
+
         operation_model = self.client._service_model.operation_model(operation_name)
         request_dict = self.client._convert_to_request_dict(
             operation_kwargs,
             operation_model,
         )
 
         for i in range(0, self._max_retry_attempts_exception + 1):
@@ -507,6 +541,24 @@ def client(self):
             self._client = self.session.create_client(SERVICE_NAME, self.region, endpoint_url=self.host, config=config)
         return self._client
 
+    @property
+    def dax_write_client(self):
+        # Lazily construct the DAX client used for write operations.
+        if self._dax_write_client is None:
+            self._dax_write_client = DaxClient(
+                endpoints=self.dax_write_endpoints,
+                region_name=self.region
+            )
+        return self._dax_write_client
+
+    @property
+    def dax_read_client(self):
+        # Lazily construct the DAX client used for read operations.
+        if self._dax_read_client is None:
+            self._dax_read_client = DaxClient(
+                endpoints=self.dax_read_endpoints,
+                region_name=self.region
+            )
+        return self._dax_read_client
+
     def get_meta_table(self, table_name, refresh=False):
         """
         Returns a MetaTable
diff --git a/pynamodb/connection/dax.py b/pynamodb/connection/dax.py
new file mode 100644
index 000000000..23dbebe36
--- /dev/null
+++ b/pynamodb/connection/dax.py
@@ -0,0 +1,37 @@
+# coding: utf-8
+from amazondax import AmazonDaxClient
+# Re-exported so that pynamodb.connection.base can catch DAX failures.
+from amazondax.DaxError import DaxClientError  # noqa: F401
+
+
+# DynamoDB operation names mapped to the corresponding AmazonDaxClient methods.
+OP_WRITE = {
+    'PutItem': 'put_item',
+    'DeleteItem': 'delete_item',
+    'UpdateItem': 'update_item',
+    'BatchWriteItem': 'batch_write_item',
+}
+
+OP_READ = {
+    'GetItem': 'get_item',
+    'Scan': 'scan',
+    'BatchGetItem': 'batch_get_item',
+    'Query': 'query',
+}
+
+OP_NAME_TO_METHOD = OP_WRITE.copy()
+OP_NAME_TO_METHOD.update(OP_READ)
+
+
+class DaxClient(object):
+    """Thin wrapper that dispatches PynamoDB operations to an AmazonDaxClient."""
+
+    def __init__(self, endpoints, region_name):
+        self.connection = AmazonDaxClient(
+            endpoints=endpoints,
+            region_name=region_name
+        )
+
+    def dispatch(self, operation_name, kwargs):
+        method = getattr(self.connection, OP_NAME_TO_METHOD[operation_name])
+        return method(**kwargs)
diff --git a/pynamodb/connection/table.py b/pynamodb/connection/table.py
index 00eab9f23..a0758abd9 100644
--- a/pynamodb/connection/table.py
+++ b/pynamodb/connection/table.py
@@ -21,7 +21,10 @@ def __init__(self,
                  max_pool_connections=None,
                  extra_headers=None,
                  aws_access_key_id=None,
-                 aws_secret_access_key=None):
+                 aws_secret_access_key=None,
+                 dax_write_endpoints=None,
+                 dax_read_endpoints=None,
+                 fall_back_to_dynamodb=None):
         self._hash_keyname = None
         self._range_keyname = None
         self.table_name = table_name
@@ -32,7 +35,10 @@ def __init__(self,
             max_retry_attempts=max_retry_attempts,
             base_backoff_ms=base_backoff_ms,
             max_pool_connections=max_pool_connections,
-            extra_headers=extra_headers)
+            extra_headers=extra_headers,
+            dax_write_endpoints=dax_write_endpoints,
+            dax_read_endpoints=dax_read_endpoints,
+            fall_back_to_dynamodb=fall_back_to_dynamodb)
 
         if aws_access_key_id and aws_secret_access_key:
             self.connection.session.set_credentials(aws_access_key_id,
diff --git a/pynamodb/constants.pyi b/pynamodb/constants.pyi
index e9cd68943..c41e78949 100644
--- a/pynamodb/constants.pyi
+++ b/pynamodb/constants.pyi
@@ -165,3 +165,12 @@ CONDITIONAL_OPERATOR: str
 AND: str
 OR: str
 CONDITIONAL_OPERATORS: Any
+CONDITION_EXPRESSION: str
+UPDATE_EXPRESSION: str
+EXPRESSION_ATTRIBUTE_NAMES: str
+EXPRESSION_ATTRIBUTE_VALUES: str
+PROJECTION_EXPRESSION: str
+FILTER_EXPRESSION: str
+KEY_CONDITION_OPERATOR_MAP: Any
+KEY_CONDITION_EXPRESSION: str
+FILTER_EXPRESSION_OPERATOR_MAP: Any
diff --git a/pynamodb/models.py b/pynamodb/models.py
index e78b51b25..5edccc994 100644
--- a/pynamodb/models.py
+++ b/pynamodb/models.py
@@ -194,6 +194,10 @@ def __init__(cls, name, bases, attrs):
                     setattr(attr_obj, 'aws_access_key_id', None)
                 if not hasattr(attr_obj, 'aws_secret_access_key'):
                     setattr(attr_obj, 'aws_secret_access_key', None)
+                if not hasattr(attr_obj, 'dax_write_endpoints'):
+                    setattr(attr_obj, 'dax_write_endpoints', get_settings_value('dax_write_endpoints'))
+                if not hasattr(attr_obj, 'dax_read_endpoints'):
+                    setattr(attr_obj, 'dax_read_endpoints', get_settings_value('dax_read_endpoints'))
             elif issubclass(attr_obj.__class__, (Index, )):
                 attr_obj.Meta.model = cls
                 if not hasattr(attr_obj.Meta, "index_name"):
@@ -1301,7 +1305,9 @@ def _get_connection(cls):
                                               max_pool_connections=cls.Meta.max_pool_connections,
                                               extra_headers=cls.Meta.extra_headers,
                                               aws_access_key_id=cls.Meta.aws_access_key_id,
-                                              aws_secret_access_key=cls.Meta.aws_secret_access_key)
+                                              aws_secret_access_key=cls.Meta.aws_secret_access_key,
+                                              dax_write_endpoints=cls.Meta.dax_write_endpoints,
+                                              dax_read_endpoints=cls.Meta.dax_read_endpoints)
         return cls._connection
 
     def _deserialize(self, attrs):
diff --git a/pynamodb/settings.py b/pynamodb/settings.py
index cd50dbf08..05eb59160 100644
--- a/pynamodb/settings.py
+++ b/pynamodb/settings.py
@@ -17,6 +17,9 @@
     'max_pool_connections': 10,
     'extra_headers': None,
     'allow_rate_limited_scan_without_consumed_capacity': False,
+    'dax_write_endpoints': [],
+    'dax_read_endpoints': [],
+    'fall_back_to_dynamodb': False,
 }
 
 OVERRIDE_SETTINGS_PATH = getenv('PYNAMODB_CONFIG', '/etc/pynamodb/global_default_settings.py')
diff --git a/requirements-dev.txt b/requirements-dev.txt
index 7adff19db..dc9f7808e 100644
--- a/requirements-dev.txt
+++ b/requirements-dev.txt
@@ -6,6 +6,7 @@
 # we didn't want to bump the actual dependency of the library for consumers as it would effectively
 # be a breaking change. As a result, we use the 1.6.0 dependency for development here for the
 # purpose of integration tests, even though requirements.txt still has 1.2.0.
+amazon-dax-client==1.0.6
 botocore==1.11.4
 six==1.10.0
 coverage==4.5.3
diff --git a/requirements.txt b/requirements.txt
index d6c1997ab..c87c27dee 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,2 +1,3 @@
+amazon-dax-client==1.0.6
 botocore==1.2.0
 six==1.9.0
diff --git a/setup.py b/setup.py
index 0adaed35c..b2944f38c 100644
--- a/setup.py
+++ b/setup.py
@@ -24,6 +24,7 @@ def find_stubs(package):
 
 install_requires = [
     'six',
+    'amazon-dax-client>=1.0.6;python_version=="2.7" or python_version>="3.5"',
     'botocore>=1.11.0',
     'python-dateutil>=2.1,<3.0.0',
 ]
diff --git a/tox.ini b/tox.ini
index d2f1be577..3b17fc641 100644
--- a/tox.ini
+++ b/tox.ini
@@ -2,7 +2,7 @@
 exclude = .tox,docs,build
 
 [tox]
-envlist = py26,py27,py33,py34,py35,py36,pypy
+envlist = py27,py33,py34,py35,py36,pypy
 
 [testenv]
 deps = -rrequirements-dev.txt
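
For reviewers, a minimal usage sketch of what this diff enables follows. The model, attribute names, table name, and cluster endpoint are placeholders of my own; only the ``Meta`` attributes and the settings keys come from the diff itself.

```python
from pynamodb.attributes import UnicodeAttribute
from pynamodb.models import Model

# Hypothetical DAX cluster endpoint -- substitute a real 'host:port' pair.
DAX_ENDPOINT = 'my-cluster.abc123.dax-clusters.us-east-1.amazonaws.com:8111'


class Thread(Model):
    class Meta:
        table_name = 'Thread'
        region = 'us-east-1'
        # Non-empty lists route GetItem/Query/Scan/BatchGetItem and
        # PutItem/UpdateItem/DeleteItem/BatchWriteItem through DaxClient.
        dax_read_endpoints = [DAX_ENDPOINT]
        dax_write_endpoints = [DAX_ENDPOINT]

    forum_name = UnicodeAttribute(hash_key=True)
    subject = UnicodeAttribute(range_key=True)


# Both calls below go through DAX. If 'fall_back_to_dynamodb' is set to True
# in the settings file (see docs/settings.rst), a DaxClientError would fall
# through to a plain DynamoDB call instead of propagating.
Thread('DAX', 'hello world').save()
print(Thread.get('DAX', 'hello world').subject)
```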