NoSQL storages use two basic locking mechanisms to handle concurrent writes: optimistic and pessimistic.
Pessimistic locking acquires a write lock before each write operation and releases it afterwards. Optimistic locking is based on entity versioning: in short, you can save an entity only if its version matches the version currently in the storage. See this pseudocode:
do {
    D document = storage.get(key)
    document = update(document)
    storage.save(document)
} while (version already modified)
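The pseudocode above can be sketched in plain Java as follows. This is a minimal illustration and not how Aerospike or Spring Data implement it: the `Storage` class, the `Versioned` record, and the key `user-1` are hypothetical names invented for this sketch, and the storage is just an in-memory map whose `save` succeeds only if the record is unchanged since it was read.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class OptimisticLoopSketch {

    // Hypothetical immutable record: the stored value plus its version.
    record Versioned(List<String> movies, long version) {}

    // Hypothetical in-memory storage (an assumption of this sketch, not a real API).
    static class Storage {
        private final ConcurrentMap<String, Versioned> data = new ConcurrentHashMap<>();

        Versioned get(String key) {
            return data.get(key); // null when the key does not exist yet
        }

        // Returns true only if the stored record is still the one that was read.
        boolean save(String key, Versioned expected, List<String> newMovies) {
            if (expected == null) {
                // create: succeeds only if nobody created the key in the meantime
                return data.putIfAbsent(key, new Versioned(List.copyOf(newMovies), 1)) == null;
            }
            // update: atomic compare-and-set against the previously read record
            Versioned next = new Versioned(List.copyOf(newMovies), expected.version() + 1);
            return data.replace(key, expected, next);
        }
    }

    static void addMovie(Storage storage, String key, String movie) {
        boolean saved;
        do {
            Versioned current = storage.get(key);          // read
            List<String> updated =
                    new ArrayList<>(current == null ? List.of() : current.movies());
            updated.add(movie);                            // update
            saved = storage.save(key, current, updated);   // save if version unchanged
        } while (!saved);                                  // version already modified: retry
    }

    // Runs 20 concurrent additions and returns the final list size.
    static int runDemo() {
        Storage storage = new Storage();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 20; i++) {
            String movie = "Movie " + i;
            pool.submit(() -> addMovie(storage, "user-1", movie));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return storage.get("user-1").movies().size();
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

Because every failed save triggers a fresh read, no addition is overwritten by a writer that was working from stale data.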
Optimistic locking can be enabled for a document by adding a @Version field to it. When saving such a document to the storage, you will get either success or an OptimisticLockingFailureException.
This exception is thrown when the version field in your document is not the same as in the storage. So to save the document reliably, you will need to enable retries for OptimisticLockingFailureException.
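To make the version check concrete, here is a small self-contained sketch of a storage that rejects a save carrying an outdated version. Everything in it is an assumption made for illustration: `StaleVersionException` is a hypothetical stand-in for OptimisticLockingFailureException, and `Storage` and `Doc` are invented names, not Spring Data or Aerospike types.

```java
import java.util.HashMap;
import java.util.Map;

class VersionCheckSketch {

    // Hypothetical stand-in for Spring's OptimisticLockingFailureException.
    static class StaleVersionException extends RuntimeException {
        StaleVersionException(String message) {
            super(message);
        }
    }

    // Hypothetical document: a payload plus the version it was loaded with.
    record Doc(String key, String payload, long version) {}

    // Hypothetical storage that accepts a save only when versions match.
    static class Storage {
        private final Map<String, Doc> data = new HashMap<>();

        synchronized Doc find(String key) {
            return data.get(key);
        }

        synchronized Doc save(Doc doc) {
            Doc stored = data.get(doc.key());
            long storedVersion = stored == null ? 0 : stored.version();
            if (doc.version() != storedVersion) {
                throw new StaleVersionException(
                        "document has version " + doc.version()
                                + " but storage has " + storedVersion);
            }
            // accepted: store the payload under the next version
            Doc next = new Doc(doc.key(), doc.payload(), storedVersion + 1);
            data.put(doc.key(), next);
            return next;
        }
    }

    // Two writers load the same version; only the first save is accepted.
    static boolean secondWriterIsRejected() {
        Storage storage = new Storage();
        storage.save(new Doc("user-1", "initial", 0));

        Doc readByA = storage.find("user-1");
        Doc readByB = storage.find("user-1");

        storage.save(new Doc(readByA.key(), "from A", readByA.version()));
        try {
            // B still carries the version it read before A's save
            storage.save(new Doc(readByB.key(), "from B", readByB.version()));
            return false;
        } catch (StaleVersionException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(secondWriterIsRejected());
    }
}
```

In this sketch writer B has to reload the document and reapply its change, which is the work that a retry performs.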
In this example we are going to use a simple document that will store movies already watched by a user:
@Value
@Builder(toBuilder = true)
@Document
public class WatchedMoviesDocument {

    @Id
    String key;

    @Singular
    List<String> watchedMovies;

    @NonFinal
    @Version // (1)
    Long version;
}
Document explained:
(1) @Version enables optimistic locking, so that concurrent updates are not lost when saving an entity.
Next we need to have a simple repository:
public interface WatchedMoviesDocumentRepository extends AerospikeRepository<WatchedMoviesDocument, String> {
}
Configuration:
@Configuration
@EnableAerospikeRepositories(basePackageClasses = WatchedMoviesDocumentRepository.class)
public class AerospikeConfiguration extends AbstractAerospikeDataConfiguration {
}
@EnableRetry
@Configuration
public class AerospikeRetryConfiguration {
}
The next part is the most interesting as it contains the update logic and retries for handling optimistic lock errors:
@Service
@RequiredArgsConstructor
public class WatchedMoviesService {

    private final WatchedMoviesDocumentRepository repository;

    @Retryable( // (1)
            retryFor = { // (2)
                    QueryTimeoutException.class,
                    TimeoutException.class,
                    TransientDataAccessResourceException.class,
                    OptimisticLockingFailureException.class // (3)
            },
            maxAttempts = 5,
            backoff = @Backoff(
                    delay = 3,
                    multiplier = 2,
                    random = true
            )
    )
    public void addWatchedMovie(String id, String newWatchedMovie) {
        WatchedMoviesDocument watchedMoviesDocument = repository.findById(id) // (4)
                .map(existingDocument -> updateExistingDocument(existingDocument, newWatchedMovie)) // (5)
                .orElseGet(() -> createNewDocumentWithMovie(id, newWatchedMovie)); // (6)
        repository.save(watchedMoviesDocument); // (7)
    }

    private WatchedMoviesDocument createNewDocumentWithMovie(String id, String newWatchedMovie) {
        return WatchedMoviesDocument.builder()
                .key(id)
                .watchedMovie(newWatchedMovie)
                .build();
    }

    private WatchedMoviesDocument updateExistingDocument(WatchedMoviesDocument existingDocument, String newWatchedMovie) {
        // NOTE: we do not create a new document here, but only update the existing one while retaining the version
        return existingDocument.toBuilder()
                .watchedMovie(newWatchedMovie)
                .build();
    }

    public List<String> getWatchedMovies(String id) {
        return repository.findById(id)
                .map(WatchedMoviesDocument::getWatchedMovies)
                .orElseGet(Collections::emptyList);
    }
}
WatchedMoviesService explained:
(1) @Retryable enables retries on the method. For more details on @Retryable, refer to: Basic error handling.
(2) retryFor specifies the exception types that should be retried.
(3) OptimisticLockingFailureException.class enables retries for this exception type. This means that a failed concurrent update will be retried.
(4) repository.findById(id) gets the document from the storage.
(5) .map(existingDocument -> updateExistingDocument(existingDocument, newWatchedMovie)) updates the existing document according to the requirements. In this case, updateExistingDocument only adds newWatchedMovie to the list of watchedMovies. Note that we use Lombok's .toBuilder() method: it copies the existing document without any modifications. This is important because version and the other fields of the document must stay as they are; after that we add only newWatchedMovie to the document.
(6) .orElseGet(() -> createNewDocumentWithMovie(id, newWatchedMovie)) creates a new document if there is no document in the storage for the given key.
(7) repository.save(watchedMoviesDocument) saves the new or updated document with all modifications to the storage.
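For intuition about what a retry with backoff does, the following hand-written loop approximates the idea using the same numbers as the annotation (5 attempts, 3 ms initial delay, multiplier 2). It is only a sketch under stated assumptions: `RetryableFailure` and `retry` are hypothetical names, and the jitter scheme is a simplification that is not claimed to match Spring Retry's actual backoff policy.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class RetrySketch {

    // Hypothetical stand-in for a retryable error such as an optimistic lock failure.
    static class RetryableFailure extends RuntimeException {
        RetryableFailure(String message) {
            super(message);
        }
    }

    // Runs the action until it succeeds or maxAttempts is reached.
    static <T> T retry(int maxAttempts, long delayMs, double multiplier, Supplier<T> action) {
        long ceiling = delayMs;
        for (int attempt = 1; ; attempt++) {
            try {
                return action.get();
            } catch (RetryableFailure e) {
                if (attempt >= maxAttempts) {
                    throw e; // attempts exhausted: surface the last failure
                }
                // simplified jitter: sleep a random time up to the current ceiling,
                // then grow the ceiling for the next round
                sleep(ThreadLocalRandom.current().nextLong(ceiling + 1));
                ceiling = (long) (ceiling * multiplier);
            }
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Counts how many calls are made when the action fails a given number of times first.
    static int attemptsNeeded(int failuresBeforeSuccess) {
        AtomicInteger calls = new AtomicInteger();
        retry(5, 3, 2.0, () -> {
            if (calls.incrementAndGet() <= failuresBeforeSuccess) {
                throw new RetryableFailure("version changed");
            }
            return "saved";
        });
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(attemptsNeeded(2));
    }
}
```

The random component spreads competing writers apart in time, so that threads which collided once are less likely to collide again on the next attempt.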
A concurrent test is not trivial, but it is required to verify the behavior under concurrent updates:
public class WatchedMoviesConcurrentTest extends OptimisticLockingAerospikeDemoApplicationTest {

    private final int NUMBER_OF_TASKS = 10;
    private final CountDownLatch latch = new CountDownLatch(1);
    private ExecutorService executor;

    @Autowired
    WatchedMoviesService watchedMoviesService;

    @BeforeEach
    public void before() {
        int NUMBER_OF_THREADS = 5;
        executor = Executors.newFixedThreadPool(NUMBER_OF_THREADS);
    }

    @AfterEach
    public void after() {
        executor.shutdownNow();
    }

    @Test
    void addWatchedMovie_addsMoviesConcurrently() throws Exception {
        String id = "age::" + UUID.randomUUID();
        runTasksAndWaitForCompletion(() -> watchedMoviesService.addWatchedMovie(id, "Movie " + UUID.randomUUID()));
        List<String> watchedMovies = watchedMoviesService.getWatchedMovies(id);
        assertThat(watchedMovies).hasSize(NUMBER_OF_TASKS);
    }

    @RequiredArgsConstructor
    private class LatchAwareTask implements Runnable {

        private final Runnable task;

        @Override
        public void run() {
            try {
                latch.await();
                task.run();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    private void runTasksAndWaitForCompletion(Runnable runnable) throws Exception {
        // submit tasks
        CompletableFuture[] futures = range(0, NUMBER_OF_TASKS)
                .mapToObj(index -> new LatchAwareTask(runnable))
                .map(task -> CompletableFuture.runAsync(task, executor))
                .toArray(CompletableFuture[]::new);
        // start all tasks simultaneously
        latch.countDown();
        // wait for completion
        CompletableFuture.allOf(futures).get(10, TimeUnit.SECONDS);
    }
}
To see the demo application, go to Optimistic Locking Demo.