CassandraRepository should support batch writes #1631

@sankalpn

Description

For multi-row operations, we have the following methods:

<S extends T> List<S> saveAll(Iterable<S> entities)
void deleteAllById(Iterable<? extends ID> ids)
void deleteAll(Iterable<? extends T> entities)
<S extends T> List<S> insert(Iterable<S> entities)

These methods perform the respective operation one row at a time, which means that for N operations with a round-trip time T to the database, the latency is Θ(NT). This is even more significant in multi-datacenter deployments, where the application and the database may end up in different datacenters for load-balancing or compliance reasons. Even within the same datacenter, the latency can be significant for a large number of rows, regardless of whether the writes target the same partition and even when the database is not under heavy load.
We should consider an option to write these rows in batches. Of course, performance would be unpredictable if a batch spans multiple partitions, and that caveat could be made explicit in the documentation.
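
For context, a batched write bundles many statements into a single round trip. The sketch below shows roughly what one chunk of such a write could look like at the DataStax Java driver level; the users table, its columns, and the BatchWriteSketch/writeChunk names are made up for illustration, and this is not the API being proposed here.

  import com.datastax.oss.driver.api.core.CqlSession;
  import com.datastax.oss.driver.api.core.cql.BatchStatement;
  import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder;
  import com.datastax.oss.driver.api.core.cql.DefaultBatchType;
  import com.datastax.oss.driver.api.core.cql.SimpleStatement;
  import java.util.Map;
  import java.util.UUID;

  final class BatchWriteSketch {

    // Writes the whole chunk in one round trip instead of one round trip per row.
    // An unlogged batch is used; letting it span many partitions is exactly what
    // makes performance unpredictable, as noted above.
    static void writeChunk(CqlSession session, Map<UUID, String> chunk) {
      BatchStatementBuilder builder = BatchStatement.builder(DefaultBatchType.UNLOGGED);
      chunk.forEach((id, name) -> builder.addStatement(
          SimpleStatement.newInstance("INSERT INTO users (id, name) VALUES (?, ?)", id, name)));
      session.execute(builder.build());
    }
  }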

I can think of the following possible approaches:

  1. Provide a config option for the batch size that defaults to 1 (no batching), which the above APIs would honor.
  2. Provide explicit saveAllInBatch, insertAllInBatch, and deleteAllInBatch APIs (see the sketch after this list) together with a batch-size config option that has a sensible default, say 500.
  3. Take the batch size as a parameter to the multi-row write APIs, either on the existing methods or via new method names.
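
To make option 2 concrete, the new methods could live on a separate mixin interface that repositories opt into. The interface name, method names, and generics below are only a suggestion, not an existing Spring Data API:

  import java.util.List;

  // Suggestion only: a possible shape for option 2, not an existing Spring Data interface.
  public interface CassandraBatchWriteRepository<T, ID> {

    // Saves the given entities in batches of the configured size.
    <S extends T> List<S> saveAllInBatch(Iterable<S> entities);

    // Inserts the given entities in batches of the configured size.
    <S extends T> List<S> insertAllInBatch(Iterable<S> entities);

    // Deletes the given entities in batches of the configured size.
    void deleteAllInBatch(Iterable<? extends T> entities);
  }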

Any other options are welcome. For now, we have been using a custom repository implementation that is not generic, but it works for our use cases:

  @Override
  public <S extends T> List<S> saveAll(Iterable<S> entities) {
    List<S> entityList = StreamSupport.stream(entities.spliterator(), false).toList();
    // Split the entities into fixed-size chunks and write each chunk as one batch.
    return ListUtils.partition(entityList, CASSANDRA_WRITE_BATCH_SIZE).stream()
        .flatMap(batch -> {
          CassandraBatchOperations batchOps = cassandraTemplate.batchOps();
          batchOps.insert(batch);
          WriteResult writeResult = batchOps.execute();
          // Map any rows returned by the batch execution back to entities.
          Class<?> clazz = batch.get(0).getClass();
          return writeResult.getRows().stream()
              .map(row -> (S) cassandraTemplate.getConverter().read(clazz, row));
        })
        .toList();
  }

  @Override
  public void deleteAll(Iterable<? extends T> entities) {
    List<?> entityList = StreamSupport.stream(entities.spliterator(), false).toList();
    ListUtils.partition(entityList, CASSANDRA_WRITE_BATCH_SIZE)
        .forEach(batch -> {
          CassandraBatchOperations batchOps = cassandraTemplate.batchOps();
          // Delete only the current chunk, not the whole iterable, in each batch.
          batchOps.delete(batch);
          batchOps.execute();
        });
  }
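
Until something like the above is supported out of the box, the workaround can be wired in per repository using Spring Data's fragment convention (a custom interface plus an "Impl" class). A minimal sketch follows; the User entity, the UserBatchWrites names, and the fixed batch size of 500 are illustrative:

  // Hypothetical fragment wiring for the workaround above. In a real project each
  // type would live in its own file; User and the repository names are made up.
  interface UserBatchWrites {
    void saveAllInBatch(List<User> users);
  }

  class UserBatchWritesImpl implements UserBatchWrites {

    private static final int BATCH_SIZE = 500;

    private final CassandraTemplate cassandraTemplate;

    UserBatchWritesImpl(CassandraTemplate cassandraTemplate) {
      this.cassandraTemplate = cassandraTemplate;
    }

    @Override
    public void saveAllInBatch(List<User> users) {
      // Each chunk of BATCH_SIZE entities is written as a single batch, i.e. one round trip.
      for (List<User> chunk : ListUtils.partition(users, BATCH_SIZE)) {
        CassandraBatchOperations batchOps = cassandraTemplate.batchOps();
        batchOps.insert(chunk);
        batchOps.execute();
      }
    }
  }

  // Spring Data picks up UserBatchWritesImpl via the "Impl" naming convention.
  interface UserRepository extends CassandraRepository<User, UUID>, UserBatchWrites {
  }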
