Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Uncovertruth feature/support dax #785

Open
wants to merge 35 commits into
base: master
Choose a base branch
from
Open
Changes from 33 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
2fbdc0b
add dax client
Feb 26, 2018
cb32fdb
add dax_endpoints params
Feb 27, 2018
d54c2d1
fix connection
opapy Feb 27, 2018
6899e9f
rmeove connection ppling
opapy Feb 27, 2018
a81605b
fix rebase problem
opapy Mar 1, 2018
eb6fe2b
rebase
opapy Mar 1, 2018
c67a34a
keep connection pool
opapy Mar 1, 2018
11ceb25
fix setup.py
opapy Mar 5, 2018
a891206
add write and read endpoints
opapy Mar 6, 2018
b878b50
fix write and read endpoints parameter
opapy Mar 6, 2018
33f08d6
fix TableConnection
opapy Mar 6, 2018
850e8e0
add write and read client
opapy Mar 6, 2018
c72e107
add OP_READ
opapy Mar 6, 2018
da357ca
fix merge dict
opapy Mar 6, 2018
09fd44c
fix get client
opapy Mar 8, 2018
45f6645
remove debug message
opapy Mar 8, 2018
fc4ad4f
performance tunning
opapy Mar 8, 2018
b9aa012
fix operation name map
opapy Mar 8, 2018
7eb7a20
remove session argument
opapy Mar 12, 2018
cc7435d
remove session arguments
opapy Mar 12, 2018
65d672e
move api call method
opapy Mar 22, 2018
52afadf
add docs
opapy Mar 22, 2018
7d72267
add amazon-dax-client for requirements-dev.txt
opapy Mar 22, 2018
fb649df
minor-fix to support fallback to dynamodb
vedavidhbudimuri May 5, 2018
950e0bd
only install amazon-dax-cient when python_version>=2.7 and python_ver…
opapy Jul 10, 2018
ac8e6f1
add amazon-dax-client>=1.0.5
opapy Jul 10, 2018
d8eff6d
amazon-dax-client==1.0.5
opapy Jan 28, 2019
da4aa23
amazon-dax-client==1.0.5
opapy Jan 28, 2019
bfca679
upgrade dynamodb-dax-clioent
opapy Jan 28, 2019
5ed46cc
remove amazon-dax-client when py26 test
opapy Jan 28, 2019
de2e01e
goodbye python2.6
opapy Jan 28, 2019
38cd04c
goodbye python2.6
opapy Jan 28, 2019
51e8ee7
Merge pull request #1 from pynamodb/master
vedavidhbudimuri May 11, 2019
d74b70f
Merge branch 'feature/support-dax' of https://github.com/uncovertruth…
vedavidhbudimuri May 17, 2020
e8c17a9
updated constants.pyi
vedavidhbudimuri May 17, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions docs/settings.rst
Original file line number Diff line number Diff line change
@@ -81,6 +81,24 @@ capacity information in responses. If ``False``, scans will fail
should the server not return consumed capacity information in an
effort to prevent unintentional capacity usage..

dax_write_endpoints
------------------

Default: ``[]``

Connect to DAX endpoints when write operations.

PutItem, DeleteItem, UpdateItem, BatchWriteItem These operations are supported.

dax_read_endpoints
------------------

Default: ``[]``

Connect to DAX endpoints when read operations.

GetItem, Scan, BatchGetItem, Query These operations are supported.

Overriding settings
~~~~~~~~~~~~~~~~~~~

55 changes: 52 additions & 3 deletions pynamodb/connection/base.py
Original file line number Diff line number Diff line change
@@ -27,6 +27,11 @@

from pynamodb.compat import NullHandler
from pynamodb.connection.util import pythonic
from pynamodb.connection.dax import (
OP_READ,
OP_WRITE,
DaxClient,
)
from pynamodb.constants import (
RETURN_CONSUMED_CAPACITY_VALUES, RETURN_ITEM_COLL_METRICS_VALUES, COMPARISON_OPERATOR_VALUES,
RETURN_ITEM_COLL_METRICS, RETURN_CONSUMED_CAPACITY, RETURN_VALUES_VALUES, ATTR_UPDATE_ACTIONS,
@@ -227,14 +232,23 @@ class Connection(object):
A higher level abstraction over botocore
"""

def __init__(self, region=None, host=None,
def __init__(self, region=None, host=None, max_retry_attempts=None,
base_backoff_ms=None, dax_write_endpoints=None, dax_read_endpoints=None,
fall_back_to_dynamodb=False,
read_timeout_seconds=None, connect_timeout_seconds=None,
max_retry_attempts=None, base_backoff_ms=None,
max_pool_connections=None, extra_headers=None):
self._tables = {}
self.host = host
if not dax_write_endpoints:
dax_write_endpoints = []
if not dax_read_endpoints:
dax_read_endpoints = []
self.dax_write_endpoints = dax_write_endpoints
self.dax_read_endpoints = dax_read_endpoints
self._local = local()
self._client = None
self._dax_write_client = None
self._dax_read_client = None
if region:
self.region = region
else:
@@ -260,6 +274,11 @@ def __init__(self, region=None, host=None,
else:
self._base_backoff_ms = get_settings_value('base_backoff_ms')

if fall_back_to_dynamodb is not None:
self._fall_back_to_dynamodb = fall_back_to_dynamodb
else:
self._fall_back_to_dynamodb = get_settings_value('fall_back_to_dynamodb')

if max_pool_connections is not None:
self._max_pool_connections = max_pool_connections
else:
@@ -313,7 +332,9 @@ def dispatch(self, operation_name, operation_kwargs):
req_uuid = uuid.uuid4()

self.send_pre_boto_callback(operation_name, req_uuid, table_name)

data = self._make_api_call(operation_name, operation_kwargs)

self.send_post_boto_callback(operation_name, req_uuid, table_name)

if data and CONSUMED_CAPACITY in data:
@@ -341,10 +362,20 @@ def _make_api_call(self, operation_name, operation_kwargs):
1. It's faster to avoid using botocore's response parsing
2. It provides a place to monkey patch requests for unit testing
"""
from amazondax.DaxError import DaxClientError
try:
if operation_name in OP_WRITE.keys() and self.dax_write_endpoints:
return self.dax_write_client.dispatch(operation_name, operation_kwargs)
elif operation_name in OP_READ.keys() and self.dax_read_endpoints:
return self.dax_read_client.dispatch(operation_name, operation_kwargs)
except DaxClientError as err:
if not self._fall_back_to_dynamodb:
raise err

operation_model = self.client._service_model.operation_model(operation_name)
request_dict = self.client._convert_to_request_dict(
operation_kwargs,
operation_model,
operation_model
)

for i in range(0, self._max_retry_attempts_exception + 1):
@@ -507,6 +538,24 @@ def client(self):
self._client = self.session.create_client(SERVICE_NAME, self.region, endpoint_url=self.host, config=config)
return self._client

@property
def dax_write_client(self):
if self._dax_write_client is None:
self._dax_write_client = DaxClient(
endpoints=self.dax_write_endpoints,
region_name=self.region
)
return self._dax_write_client

@property
def dax_read_client(self):
if self._dax_read_client is None:
self._dax_read_client = DaxClient(
endpoints=self.dax_read_endpoints,
region_name=self.region
)
return self._dax_read_client

def get_meta_table(self, table_name, refresh=False):
"""
Returns a MetaTable
33 changes: 33 additions & 0 deletions pynamodb/connection/dax.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# coding: utf-8
from amazondax import AmazonDaxClient


OP_WRITE = {
'PutItem': 'put_item',
'DeleteItem': 'delete_item',
'UpdateItem': 'update_item',
'BatchWriteItem': 'batch_write_item',
}

OP_READ = {
'GetItem': 'get_item',
'Scan': 'scan',
'BatchGetItem': 'batch_get_item',
'Query': 'query',
}

OP_NAME_TO_METHOD = OP_WRITE.copy()
OP_NAME_TO_METHOD.update(OP_READ)


class DaxClient(object):

def __init__(self, endpoints, region_name):
self.connection = AmazonDaxClient(
endpoints=endpoints,
region_name=region_name
)

def dispatch(self, operation_name, kwargs):
method = getattr(self.connection, OP_NAME_TO_METHOD[operation_name])
return method(**kwargs)
16 changes: 14 additions & 2 deletions pynamodb/connection/table.py
Original file line number Diff line number Diff line change
@@ -21,7 +21,15 @@ def __init__(self,
max_pool_connections=None,
extra_headers=None,
aws_access_key_id=None,
aws_secret_access_key=None):
aws_secret_access_key=None,
dax_write_endpoints=None,
dax_read_endpoints=None,
fall_back_to_dynamodb=False):
if not dax_read_endpoints:
dax_read_endpoints = []
if not dax_write_endpoints:
dax_write_endpoints = []

self._hash_keyname = None
self._range_keyname = None
self.table_name = table_name
@@ -32,7 +40,11 @@ def __init__(self,
max_retry_attempts=max_retry_attempts,
base_backoff_ms=base_backoff_ms,
max_pool_connections=max_pool_connections,
extra_headers=extra_headers)
extra_headers=extra_headers,
base_backoff_ms=base_backoff_ms,
dax_write_endpoints=dax_write_endpoints,
dax_read_endpoints=dax_read_endpoints,
fall_back_to_dynamodb=fall_back_to_dynamodb)

if aws_access_key_id and aws_secret_access_key:
self.connection.session.set_credentials(aws_access_key_id,
8 changes: 7 additions & 1 deletion pynamodb/models.py
Original file line number Diff line number Diff line change
@@ -194,6 +194,10 @@ def __init__(cls, name, bases, attrs):
setattr(attr_obj, 'aws_access_key_id', None)
if not hasattr(attr_obj, 'aws_secret_access_key'):
setattr(attr_obj, 'aws_secret_access_key', None)
if not hasattr(attr_obj, 'dax_write_endpoints'):
setattr(attr_obj, 'dax_write_endpoints', get_settings_value('dax_write_endpoints'))
if not hasattr(attr_obj, 'dax_read_endpoints'):
setattr(attr_obj, 'dax_read_endpoints', get_settings_value('dax_read_endpoints'))
elif issubclass(attr_obj.__class__, (Index, )):
attr_obj.Meta.model = cls
if not hasattr(attr_obj.Meta, "index_name"):
@@ -1301,7 +1305,9 @@ def _get_connection(cls):
max_pool_connections=cls.Meta.max_pool_connections,
extra_headers=cls.Meta.extra_headers,
aws_access_key_id=cls.Meta.aws_access_key_id,
aws_secret_access_key=cls.Meta.aws_secret_access_key)
aws_secret_access_key=cls.Meta.aws_secret_access_key,
dax_write_endpoints=cls.Meta.dax_write_endpoints,
dax_read_endpoints=cls.Meta.dax_read_endpoints)
return cls._connection

def _deserialize(self, attrs):
3 changes: 3 additions & 0 deletions pynamodb/settings.py
Original file line number Diff line number Diff line change
@@ -17,6 +17,9 @@
'max_pool_connections': 10,
'extra_headers': None,
'allow_rate_limited_scan_without_consumed_capacity': False,
'dax_write_endpoints': [],
'dax_read_endpoints': [],
'fall_back_to_dynamodb': False
}

OVERRIDE_SETTINGS_PATH = getenv('PYNAMODB_CONFIG', '/etc/pynamodb/global_default_settings.py')
1 change: 1 addition & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@
# we didn't want to bump the actual dependency of the library for consumers as it would effectively
# be a breaking change. As a result, we use the 1.6.0 dependency for development here for the
# purpose of integration tests, even though requirements.txt still has 1.2.0.
amazon-dax-client==1.0.6
botocore==1.11.4
six==1.10.0
coverage==4.5.3
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
amazon-dax-client==1.0.6
botocore==1.2.0
six==1.9.0
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@ def find_stubs(package):

install_requires = [
'six',
'amazon-dax-client>=1.0.6;python_version>="2.7" and python_version>="3.5"',
'botocore>=1.11.0',
'python-dateutil>=2.1,<3.0.0',
]
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@
exclude = .tox,docs,build

[tox]
envlist = py26,py27,py33,py34,py35,py36,pypy
envlist = py27,py33,py34,py35,py36,pypy

[testenv]
deps = -rrequirements-dev.txt