Skip to content

Commit 2f39459

Browse files
author
Eric Muller
committed
Support for versioned optimistic locking a la DynamoDBMapper (#228)
1 parent 9ccd683 commit 2f39459

14 files changed

+754
-19
lines changed

AUTHORS.rst

+2-1
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@ PynamoDB is written and maintained by Jharrod LaFon and numerous contributors:
33
* Craig Bruce
44
* Adam Chainz
55
* Andy Wolfe
6-
* Pior Bastida
6+
* Pior Bastida
7+
* Eric Muller

docs/optimistic_locking.rst

+167
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
==================
2+
Optimistic Locking
3+
==================
4+
5+
Optimistic Locking is a strategy for ensuring that your database writes are not overwritten by the writes of others.
6+
With optimistic locking, each item has an attribute that acts as a version number. If you retrieve an item from a
7+
table, the application records the version number of that item. You can update the item, but only if the version number
8+
on the server side has not changed. If there is a version mismatch, it means that someone else has modified the item
9+
before you did. The update attempt fails, because you have a stale version of the item. If this happens, you simply
10+
try again by retrieving the item and then trying to update it. Optimistic locking prevents you from accidentally
11+
overwriting changes that were made by others. It also prevents others from accidentally overwriting your changes.
12+
13+
.. warning:: - Optimistic locking will not work properly if you use DynamoDB global tables as they use last-write-wins for concurrent updates.
14+
15+
See also:
16+
`DynamoDBMapper Documentation on Optimistic Locking <https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DynamoDBMapper.OptimisticLocking.html>`_.
17+
18+
Version Attribute
19+
-----------------
20+
21+
To enable optimistic locking for a table simply add a ``VersionAttribute`` to your model definition.
22+
23+
.. code-block:: python
24+
25+
class OfficeEmployeeMap(MapAttribute):
26+
office_employee_id = UnicodeAttribute()
27+
person = UnicodeAttribute()
28+
29+
def __eq__(self, other):
30+
return isinstance(other, OfficeEmployeeMap) and self.person == other.person
31+
32+
def __repr__(self):
33+
return str(vars(self))
34+
35+
36+
class Office(Model):
37+
class Meta:
38+
read_capacity_units = 1
39+
write_capacity_units = 1
40+
table_name = 'Office'
41+
host = "http://localhost:8000"
42+
office_id = UnicodeAttribute(hash_key=True)
43+
employees = ListAttribute(of=OfficeEmployeeMap)
44+
name = UnicodeAttribute()
45+
version = VersionAttribute()
46+
47+
The attribute is underpinned by an integer which is initialized with 1 when an item is saved for the first time
48+
and is incremented by 1 with each subsequent write operation.
49+
50+
.. code-block:: python
51+
52+
justin = OfficeEmployeeMap(office_employee_id=str(uuid4()), person='justin')
53+
garrett = OfficeEmployeeMap(office_employee_id=str(uuid4()), person='garrett')
54+
office = Office(office_id=str(uuid4()), name="office", employees=[justin, garrett])
55+
office.save()
56+
assert office.version == 1
57+
58+
# Get a second local copy of Office
59+
office_out_of_date = Office.get(office.office_id)
60+
61+
# Add another employee and persist the change.
62+
office.employees.append(OfficeEmployeeMap(office_employee_id=str(uuid4()), person='lita'))
63+
office.save()
64+
# On subsequent save or update operations the version is also incremented locally to match the persisted value so
65+
# there's no need to refresh between operations when reusing the local copy.
66+
assert office.version == 2
67+
assert office_out_of_date.version == 1
68+
69+
The version checking is implemented using DynamoDB conditional write constraints. Asserting that no value exists
70+
for the version attribute on the initial save and that the persisted value matches the local value on subsequent writes.
71+
72+
73+
Model.{update, save, delete}
74+
----------------------------
75+
These operations will fail if the local object is out-of-date.
76+
77+
.. code-block:: python
78+
79+
@contextmanager
80+
def assert_condition_check_fails():
81+
try:
82+
yield
83+
except (PutError, UpdateError, DeleteError) as e:
84+
assert isinstance(e.cause, ClientError)
85+
assert e.cause_response_code == "ConditionalCheckFailedException"
86+
except TransactWriteError as e:
87+
assert isinstance(e.cause, ClientError)
88+
assert e.cause_response_code == "TransactionCanceledException"
89+
assert "ConditionalCheckFailed" in e.cause_response_message
90+
else:
91+
raise AssertionError("The version attribute conditional check should have failed.")
92+
93+
94+
with assert_condition_check_fails():
95+
office_out_of_date.update(actions=[Office.name.set('new office name')])
96+
97+
office_out_of_date.employees.remove(garrett)
98+
with assert_condition_check_fails():
99+
office_out_of_date.save()
100+
101+
# After refreshing the local copy our write operations succeed.
102+
office_out_of_date.refresh()
103+
office_out_of_date.employees.remove(garrett)
104+
office_out_of_date.save()
105+
assert office_out_of_date.version == 3
106+
107+
with assert_condition_check_fails():
108+
office.delete()
109+
110+
Transactions
111+
------------
112+
113+
Transactions are supported.
114+
115+
Successful
116+
__________
117+
118+
.. code-block:: python
119+
120+
connection = Connection(host='http://localhost:8000')
121+
122+
office2 = Office(office_id=str(uuid4()), name="second office", employees=[justin])
123+
office2.save()
124+
assert office2.version == 1
125+
office3 = Office(office_id=str(uuid4()), name="third office", employees=[garrett])
126+
office3.save()
127+
assert office3.version == 1
128+
129+
with TransactWrite(connection=connection) as transaction:
130+
transaction.condition_check(Office, office.office_id, condition=(Office.name.exists()))
131+
transaction.delete(office2)
132+
transaction.save(Office(office_id=str(uuid4()), name="new office", employees=[justin, garrett]))
133+
transaction.update(
134+
office3,
135+
actions=[
136+
Office.name.set('birdistheword'),
137+
]
138+
)
139+
140+
try:
141+
office2.refresh()
142+
except DoesNotExist:
143+
pass
144+
145+
assert office.version == 2
146+
assert office3.version == 2
147+
148+
Failed
149+
______
150+
151+
.. code-block:: python
152+
153+
with assert_condition_check_fails(), TransactWrite(connection=connection) as transaction:
154+
transaction.save(Office(office.office_id, name='newer name', employees=[]))
155+
156+
with assert_condition_check_fails(), TransactWrite(connection=connection) as transaction:
157+
transaction.update(
158+
Office(office.office_id, name='newer name', employees=[]),
159+
actions=[Office.name.set('Newer Office Name')]
160+
)
161+
162+
with assert_condition_check_fails(), TransactWrite(connection=connection) as transaction:
163+
transaction.delete(Office(office.office_id, name='newer name', employees=[]))
164+
165+
Batch Operations
166+
----------------
167+
*Unsupported* as they do not support conditional writes.

examples/optimistic_locking.py

+140
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
from contextlib import contextmanager
2+
from uuid import uuid4
3+
from botocore.client import ClientError
4+
5+
from pynamodb.connection import Connection
6+
from pynamodb.attributes import ListAttribute, MapAttribute, UnicodeAttribute, VersionAttribute
7+
from pynamodb.exceptions import PutError, UpdateError, TransactWriteError, DeleteError, DoesNotExist
8+
from pynamodb.models import Model
9+
from pynamodb.transactions import TransactWrite
10+
11+
12+
class OfficeEmployeeMap(MapAttribute):
13+
office_employee_id = UnicodeAttribute()
14+
person = UnicodeAttribute()
15+
16+
def __eq__(self, other):
17+
return isinstance(other, OfficeEmployeeMap) and self.person == other.person
18+
19+
def __repr__(self):
20+
return str(vars(self))
21+
22+
23+
class Office(Model):
24+
class Meta:
25+
read_capacity_units = 1
26+
write_capacity_units = 1
27+
table_name = 'Office'
28+
host = "http://localhost:8000"
29+
office_id = UnicodeAttribute(hash_key=True)
30+
employees = ListAttribute(of=OfficeEmployeeMap)
31+
name = UnicodeAttribute()
32+
version = VersionAttribute()
33+
34+
35+
if not Office.exists():
36+
Office.create_table(wait=True)
37+
38+
39+
@contextmanager
40+
def assert_condition_check_fails():
41+
try:
42+
yield
43+
except (PutError, UpdateError, DeleteError) as e:
44+
assert isinstance(e.cause, ClientError)
45+
assert e.cause_response_code == "ConditionalCheckFailedException"
46+
except TransactWriteError as e:
47+
assert isinstance(e.cause, ClientError)
48+
assert e.cause_response_code == "TransactionCanceledException"
49+
assert "ConditionalCheckFailed" in e.cause_response_message
50+
else:
51+
raise AssertionError("The version attribute conditional check should have failed.")
52+
53+
54+
justin = OfficeEmployeeMap(office_employee_id=str(uuid4()), person='justin')
55+
garrett = OfficeEmployeeMap(office_employee_id=str(uuid4()), person='garrett')
56+
office = Office(office_id=str(uuid4()), name="office 3", employees=[justin, garrett])
57+
office.save()
58+
assert office.version == 1
59+
60+
# Get a second local copy of Office
61+
office_out_of_date = Office.get(office.office_id)
62+
# Add another employee and save the changes.
63+
office.employees.append(OfficeEmployeeMap(office_employee_id=str(uuid4()), person='lita'))
64+
office.save()
65+
# After a successful save or update operation the version is set or incremented locally so there's no need to refresh
66+
# between operations using the same local copy.
67+
assert office.version == 2
68+
assert office_out_of_date.version == 1
69+
70+
# Condition check fails for update.
71+
with assert_condition_check_fails():
72+
office_out_of_date.update(actions=[Office.name.set('new office name')])
73+
74+
# Condition check fails for save.
75+
office_out_of_date.employees.remove(garrett)
76+
with assert_condition_check_fails():
77+
office_out_of_date.save()
78+
79+
# After refreshing the local copy the operation will succeed.
80+
office_out_of_date.refresh()
81+
office_out_of_date.employees.remove(garrett)
82+
office_out_of_date.save()
83+
assert office_out_of_date.version == 3
84+
85+
# Condition check fails for delete.
86+
with assert_condition_check_fails():
87+
office.delete()
88+
89+
# Example failed transactions.
90+
connection = Connection(host='http://localhost:8000')
91+
92+
with assert_condition_check_fails(), TransactWrite(connection=connection) as transaction:
93+
transaction.save(Office(office.office_id, name='newer name', employees=[]))
94+
95+
with assert_condition_check_fails(), TransactWrite(connection=connection) as transaction:
96+
transaction.update(
97+
Office(office.office_id, name='newer name', employees=[]),
98+
actions=[
99+
Office.name.set('Newer Office Name'),
100+
]
101+
)
102+
103+
with assert_condition_check_fails(), TransactWrite(connection=connection) as transaction:
104+
transaction.delete(Office(office.office_id, name='newer name', employees=[]))
105+
106+
# Example successful transaction.
107+
office2 = Office(office_id=str(uuid4()), name="second office", employees=[justin])
108+
office2.save()
109+
assert office2.version == 1
110+
office3 = Office(office_id=str(uuid4()), name="third office", employees=[garrett])
111+
office3.save()
112+
assert office3.version == 1
113+
114+
with TransactWrite(connection=connection) as transaction:
115+
transaction.condition_check(Office, office.office_id, condition=(Office.name.exists()))
116+
transaction.delete(office2)
117+
transaction.save(Office(office_id=str(uuid4()), name="new office", employees=[justin, garrett]))
118+
transaction.update(
119+
office3,
120+
actions=[
121+
Office.name.set('birdistheword'),
122+
]
123+
)
124+
125+
try:
126+
office2.refresh()
127+
except DoesNotExist:
128+
pass
129+
else:
130+
raise AssertionError(
131+
"This item should have been deleted, but no DoesNotExist "
132+
"exception was raised when attempting to refresh a local copy."
133+
)
134+
135+
assert office.version == 2
136+
# The version attribute of items which are saved or updated in a transaction are updated automatically to match the
137+
# persisted value.
138+
assert office3.version == 2
139+
office.refresh()
140+
assert office.version == 3

pynamodb/attributes.py

+32
Original file line numberDiff line numberDiff line change
@@ -494,6 +494,38 @@ def deserialize(self, value):
494494
return json.loads(value)
495495

496496

497+
class VersionAttribute(NumberAttribute):
498+
"""
499+
A version attribute
500+
"""
501+
null = True
502+
503+
def __set__(self, instance, value):
504+
"""
505+
Cast assigned value to int.
506+
"""
507+
super(VersionAttribute, self).__set__(instance, int(value))
508+
509+
def __get__(self, instance, owner):
510+
"""
511+
Cast retrieved value to int.
512+
"""
513+
val = super(VersionAttribute, self).__get__(instance, owner)
514+
return int(val) if isinstance(val, float) else val
515+
516+
def serialize(self, value):
517+
"""
518+
Cast value to int then encode as JSON
519+
"""
520+
return super(VersionAttribute, self).serialize(int(value))
521+
522+
def deserialize(self, value):
523+
"""
524+
Decode numbers from JSON and cast to int.
525+
"""
526+
return int(super(VersionAttribute, self).deserialize(value))
527+
528+
497529
class TTLAttribute(Attribute):
498530
"""
499531
A time-to-live attribute that signifies when the item expires and can be automatically deleted.

pynamodb/attributes.pyi

+5
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,11 @@ class NumberAttribute(Attribute[float]):
119119
@overload
120120
def __get__(self, instance: Any, owner: Any) -> float: ...
121121

122+
class VersionAttribute(Attribute[float]):
123+
@overload
124+
def __get__(self: _A, instance: None, owner: Any) -> _A: ...
125+
@overload
126+
def __get__(self, instance: Any, owner: Any) -> int: ...
122127

123128
class TTLAttribute(Attribute[datetime]):
124129
@overload

pynamodb/connection/base.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -849,7 +849,7 @@ def get_operation_kwargs(self,
849849

850850
operation_kwargs[TABLE_NAME] = table_name
851851
operation_kwargs.update(self.get_identifier_map(table_name, hash_key, range_key, key=key))
852-
if attributes:
852+
if attributes and operation_kwargs.get(ITEM) is not None:
853853
attrs = self.get_item_attribute_map(table_name, attributes)
854854
operation_kwargs[ITEM].update(attrs[ITEM])
855855
if attributes_to_get is not None:

0 commit comments

Comments
 (0)