Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
c209f9f
Next development iteration
worryg0d Oct 17, 2025
56c09a6
Implemented xxHash64 wrapper
worryg0d Oct 22, 2025
b64bf48
Added all supported algorithms to HashServiceTest
worryg0d Oct 22, 2025
4a0db60
Increased chunk size for hashing to 4096
worryg0d Oct 28, 2025
517f6fa
Parallel computing sstables hashes
worryg0d Nov 5, 2025
b4ec1b7
Expose snapshots map in Snapshots class
worryg0d Nov 5, 2025
70f3505
Deffer sstable components hash computation
worryg0d Nov 6, 2025
b7d36f4
ParallelHashService for hashing manifest entries
worryg0d Nov 6, 2025
91a5709
Use Runtime.availableProcessors() as a default value for concurrentCo…
worryg0d Nov 6, 2025
06a6fa2
Use 50% of available cpus by default instead
worryg0d Nov 6, 2025
08b6c82
Pass list of manifest entries instead of stream
worryg0d Nov 7, 2025
d41aca5
Parallel hashing of downloaded files during restoration import phase
worryg0d Nov 7, 2025
9479001
Address review comments
worryg0d Nov 10, 2025
fd483ea
Hash only manifest entries of the snapshot we're aiming for
worryg0d Nov 10, 2025
5480bbc
First iteration of fast fail on verifyAll manifests during import pha…
worryg0d Nov 10, 2025
cba7a9c
Code cleanup and comments
worryg0d Nov 11, 2025
ade8b3e
Hash operation interruption in the mid
worryg0d Nov 11, 2025
baec383
Addressed review suggestions by Stefan
worryg0d Nov 11, 2025
321f5be
Removed redundant validation method for concurrentConnections
worryg0d Nov 11, 2025
4b0e914
Change output format to hex for XXHasher.getHash method to align with…
worryg0d Nov 12, 2025
588b75b
Added test file and precomputed hashes for testing
worryg0d Nov 12, 2025
2693525
logs adjustment
worryg0d Nov 12, 2025
794f281
Bump Cassandra version to 5.0.6
worryg0d Nov 12, 2025
924fc25
CRCHasher test
worryg0d Nov 12, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
- m2-{{ checksum "pom.xml" }}
- m2-

- run: (echo "${google_application_credentials}" > /tmp/gcp.json) && mvn clean install -PsnapshotRepo,rpm,deb,cassandra5 -DoutputDirectory=/tmp/artifacts -Dcassandra4.version=4.1.10 -Dcassandra5.version=5.0.5
- run: (echo "${google_application_credentials}" > /tmp/gcp.json) && mvn clean install -PsnapshotRepo,rpm,deb,cassandra5 -DoutputDirectory=/tmp/artifacts -Dcassandra4.version=4.1.10 -Dcassandra5.version=5.0.6

- save_cache:
paths:
Expand Down
4 changes: 2 additions & 2 deletions azure/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
<parent>
<groupId>com.instaclustr</groupId>
<artifactId>esop-parent</artifactId>
<version>4.0.1</version>
<version>4.0.2-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

<artifactId>esop-azure</artifactId>
<version>4.0.1</version>
<version>4.0.2-SNAPSHOT</version>

<name>esop-azure</name>
<description>Backup and restoration tooling for Cassandra for Azure</description>
Expand Down
11 changes: 9 additions & 2 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
<parent>
<groupId>com.instaclustr</groupId>
<artifactId>esop-parent</artifactId>
<version>4.0.1</version>
<version>4.0.2-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

<artifactId>esop-core</artifactId>
<version>4.0.1</version>
<version>4.0.2-SNAPSHOT</version>

<name>esop-core</name>
<description>Core of backup and restoration tooling for Cassandra</description>
Expand All @@ -25,6 +25,7 @@
<commons-io.version>2.20.0</commons-io.version>
<awaitility.version>3.1.6</awaitility.version>
<jackson.bom.version>2.19.2</jackson.bom.version>
<lz4.version>1.8.0</lz4.version>

<slf4j.version>2.0.17</slf4j.version>
<logback.version>1.5.19</logback.version>
Expand Down Expand Up @@ -120,6 +121,12 @@
<version>${awaitility.version}</version>
</dependency>

<dependency>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
<version>${lz4.version}</version>
</dependency>

<!-- logging -->

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void run() {
List<Module> additionalModules = new ArrayList<>(Esop.getStorageSpecificModules());
additionalModules.add(new BackupModule());

Esop.init(this, jmxSpec, hashSpec, additionalModules);
Esop.init(this, jmxSpec, hashSpec, request, additionalModules);

final Operation<?> operation = operationsService.submitOperationRequest(request);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void run() {
List<Module> additionalModules = new ArrayList<>(Esop.getStorageSpecificModules());
additionalModules.add(new CommitlogBackupModule());

Esop.init(this, jmxSpec, hashSpec, additionalModules);
Esop.init(this, jmxSpec, hashSpec, request, additionalModules);

final Operation<?> operation = operationsService.submitOperationRequest(request);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void run() {
List<Module> additionalModules = new ArrayList<>(Esop.getStorageSpecificModules());
additionalModules.add(new RestoreCommitlogModule());

Esop.init(this, null, hashSpec, additionalModules);
Esop.init(this, null, hashSpec, request, additionalModules);

final Operation<?> operation = operationsService.submitOperationRequest(request);

Expand Down
4 changes: 3 additions & 1 deletion core/src/main/java/com/instaclustr/esop/cli/Esop.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.google.inject.Stage;
import com.instaclustr.cassandra.CassandraModule;
import com.instaclustr.esop.SPIModule;
import com.instaclustr.esop.impl.AbstractOperationRequest;
import com.instaclustr.esop.impl.backup.BackupModules.UploadingModule;
import com.instaclustr.esop.impl.hash.HashModule;
import com.instaclustr.esop.impl.hash.HashSpec;
Expand Down Expand Up @@ -70,6 +71,7 @@ public static void main(String[] args, boolean exit) {
static void init(final Runnable command,
final CassandraJMXSpec jmxSpec,
final HashSpec hashSpec,
final AbstractOperationRequest request,
final List<Module> additionalModules) {

final List<Module> modules = new ArrayList<>();
Expand All @@ -92,7 +94,7 @@ protected void configure() {
modules.add(new ExecutorsModule());
modules.add(new UploadingModule());
modules.add(new DownloadingModule());
modules.add(new HashModule(hashSpec));
modules.add(new HashModule(hashSpec, request.concurrentConnections));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't you put concurrent connections into the constructor of HashSpec? Just pass it to hashSpec so you do not need to accommodate the code by passing request everywhere.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if it simplifies things, we use concurrentConnections (--cc) in different places and potentially there could no be HashSpec. Needs deeper investigation

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no luck with this?

modules.addAll(additionalModules);

final Injector injector = Guice.createInjector(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void run() {
List<Module> additionalModules = new ArrayList<>(Esop.getStorageSpecificModules());
additionalModules.add(new ListModule());

Esop.init(this, jmxSpec, new HashSpec(), additionalModules);
Esop.init(this, jmxSpec, new HashSpec(), request, additionalModules);

final Operation<?> operation = operationsService.submitOperationRequest(request);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void run() {
List<Module> additionalModules = new ArrayList<>(Esop.getStorageSpecificModules());
additionalModules.add(new RemoveBackupModule());

Esop.init(this, jmxSpec, new HashSpec(), additionalModules);
Esop.init(this, jmxSpec, new HashSpec(), request, additionalModules);

if (rate.value == 0) {
final Operation<?> operation = operationsService.submitOperationRequest(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void run() {
additionalModules.add(new RestoreModule());
additionalModules.add(new RestorationStrategyModule());

Esop.init(this, jmxSpec, hashSpec, additionalModules);
Esop.init(this, jmxSpec, hashSpec, request, additionalModules);

final Operation<?> operation = operationsService.submitOperationRequest(request);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public abstract class AbstractOperationRequest extends OperationRequest {
converter = StorageLocationTypeConverter.class,
description = "Location to which files will be backed up or restored from, in form " +
"cloudProvider://bucketName/clusterId/datacenterId/nodeId or file:///some/path/bucketName/clusterId/datacenterId/nodeId. " +
"'cloudProvider' is one of 's3', 'oracle', 'ceph', 'minio', 'azure' or 'gcp'.",
"'cloudProvider' is one of 's3', 'azure' or 'gcp'.",
required = true)
@JsonSerialize(using = StorageLocationSerializer.class)
@JsonDeserialize(using = StorageLocationDeserializer.class)
Expand All @@ -47,9 +47,8 @@ public abstract class AbstractOperationRequest extends OperationRequest {
@JsonProperty("retry")
public RetrySpec retry = new RetrySpec();

@Option(names = {"--cc", "--concurrent-connections"},
description = "Number of files (or file parts) to download concurrently. Higher values will increase throughput. Default is 10.",
defaultValue = "10"
@Option(names = {"--cc", "--concurrent-connections", "--parallelism"},
description = "Number of files (or file parts) to download / upload / hash concurrently. Higher values will increase throughput. Default is 50% of available CPUs."
)
@JsonProperty("concurrentConnections")
public Integer concurrentConnections;
Expand All @@ -60,6 +59,8 @@ public abstract class AbstractOperationRequest extends OperationRequest {

public AbstractOperationRequest() {
// for picocli
if (concurrentConnections == null)
concurrentConnections = getDefaultConcurrentConnections();
Copy link
Collaborator

@smiklosovic smiklosovic Nov 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should validate this, what if I put 0 or negative number? Or number bigger than number of cpus I have? We should check some range the value is allowed to be in. There is "validate" method you might maybe move this validation to?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, totally missed this by assumption that users should know what they're doing. But esop could be called by some script or any other automation tool, so it worth to validate if value is valid

}

public AbstractOperationRequest(final StorageLocation storageLocation,
Expand All @@ -74,7 +75,7 @@ public AbstractOperationRequest(final StorageLocation storageLocation,
this.skipBucketVerification = skipBucketVerification;
this.proxySettings = proxySettings;
this.retry = retry == null ? new RetrySpec() : retry;
this.concurrentConnections = concurrentConnections == null ? 10 : concurrentConnections;
this.concurrentConnections = concurrentConnections == null ? getDefaultConcurrentConnections() : concurrentConnections;
this.kmsKeyId = kmsKeyId;
}

Expand All @@ -96,5 +97,21 @@ public void validate(final Set<String> storageProviders) {
if (storageProviders != null && !storageProviders.contains(storageLocation.storageProvider)) {
throw new IllegalStateException(format("Available storage providers: %s", Arrays.toString(storageProviders.toArray())));
}

if (concurrentConnections <= 0) {
throw new IllegalStateException("--parallelism must be greater than 0");
}

if (concurrentConnections > Runtime.getRuntime().availableProcessors()) {
throw new IllegalStateException("--parallelism value cannot be greater than number of available processors: "
+ Runtime.getRuntime().availableProcessors());
}
}

/**
* Get default number of concurrent connections based on 50% of available processors.
*/
private static int getDefaultConcurrentConnections() {
return Runtime.getRuntime().availableProcessors() / 2;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ public class SSTableUtils {
private static final int SSTABLE_PREFIX_IDX = 1;
private static final int SSTABLE_GENERATION_IDX = 2;
private static final Pattern CHECKSUM_RE = Pattern.compile("^([a-zA-Z0-9]+).*");
private static final HashService hashService = new HashServiceImpl(new HashSpec());

public static String sstableHash(Path path) throws IOException {
final Matcher matcher = SSTABLE_RE.matcher(path.getFileName().toString());
Expand Down Expand Up @@ -102,14 +101,11 @@ public static String calculateChecksum(final Path filePath) throws IOException {
public static Map<String, List<ManifestEntry>> getSSTables(String keyspace,
String table,
Path snapshotDirectory,
Path tableBackupPath,
HashSpec hashSpec) throws IOException {
Path tableBackupPath) throws IOException {
if (!Files.exists(snapshotDirectory)) {
return Collections.emptyMap();
}

final HashService hashService = new HashServiceImpl(hashSpec);

return Files.list(snapshotDirectory)
.flatMap(path -> {
if (isCassandra22SecIndex(path)) {
Expand Down Expand Up @@ -148,12 +144,11 @@ public static Map<String, List<ManifestEntry>> getSSTables(String keyspace,
}

backupPath = backupPath.resolve(hash).resolve(manifestComponentFileName.getFileName());
final String hashOfFile = hashService.hash(sstableComponent);

entries.add(new ManifestEntry(backupPath,
sstableComponent,
ManifestEntry.Type.FILE,
hashOfFile,
null,
new KeyspaceTable(keyspace, table),
null));
}
Expand Down
8 changes: 1 addition & 7 deletions core/src/main/java/com/instaclustr/esop/impl/Snapshots.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@

public class Snapshots implements Cloneable {

public static HashSpec hashSpec;

private final Map<String, Snapshot> snapshots = new HashMap<>();

public static Snapshots of(Map<String, Snapshot> snapshots) {
Expand Down Expand Up @@ -569,7 +567,7 @@ public static Table parse(final String keyspace, final String table, final List<
final Path tablePath = Paths.get("data").resolve(Paths.get(keyspace, table));

for (final Path path : value) {
tb.sstables.putAll(SSTableUtils.getSSTables(keyspace, table, path, tablePath, Snapshots.hashSpec));
tb.sstables.putAll(SSTableUtils.getSSTables(keyspace, table, path, tablePath));
}

final Optional<Path> schemaPath = value.stream().map(p -> p.resolve("schema.cql")).filter(Files::exists).findFirst();
Expand Down Expand Up @@ -750,10 +748,6 @@ public static synchronized Snapshots merge(final List<Snapshots> scannedSnapshot
}

public static synchronized Snapshots parse(final Path cassandraDir, final String snapshot) throws Exception {
if (Snapshots.hashSpec == null) {
Snapshots.hashSpec = new HashSpec();
}

final Snapshots snapshots = new Snapshots();
final SnapshotLister lister = new SnapshotLister();
Files.walkFileTree(cassandraDir, lister);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import com.instaclustr.esop.impl.backup.coordination.ClearSnapshotOperation.ClearSnapshotOperationRequest;
import com.instaclustr.esop.impl.backup.coordination.TakeSnapshotOperation.TakeSnapshotOperationRequest;
import com.instaclustr.esop.impl.hash.HashSpec;
import com.instaclustr.esop.impl.hash.ParallelHashService;
import com.instaclustr.esop.impl.hash.ParallelHashServiceImpl;
import com.instaclustr.esop.impl.interaction.CassandraSchemaVersion;
import com.instaclustr.esop.impl.interaction.CassandraTokens;
import com.instaclustr.esop.topology.CassandraClusterTopology;
Expand Down Expand Up @@ -126,8 +128,8 @@ public void coordinate(final Operation<BackupOperationRequest> operation) {
new TakeSnapshotOperationRequest(request.entities, request.snapshotTag),
cassandraVersionProvider).run0();

Snapshots.hashSpec = hashSpec;
final Snapshots snapshots = Snapshots.parse(request.dataDirs, request.snapshotTag);

final Optional<Snapshot> snapshot = snapshots.get(request.snapshotTag);

if (!snapshot.isPresent()) {
Expand All @@ -139,6 +141,11 @@ public void coordinate(final Operation<BackupOperationRequest> operation) {
manifest.setSchemaVersion(request.schemaVersion);
manifest.setTokens(tokens);

// Compute hashes and populate it to manifest entries
try (ParallelHashService parallelHashService = new ParallelHashServiceImpl(hashSpec, request.concurrentConnections)) {
parallelHashService.hashAndPopulate(manifest.getManifestEntries(false));
}

// manifest
final Path localManifestPath = getLocalManifestPath(request.snapshotTag);
manifest.setManifest(getManifestAsManifestEntry(localManifestPath, request));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@
public class HashModule extends AbstractModule {

private final HashSpec hashSpec;
private final int parallelHashingThreads;

public HashModule(final HashSpec hashSpec) {
public HashModule(final HashSpec hashSpec, final int parallelHashingThreads) {
this.hashSpec = hashSpec;
this.parallelHashingThreads = parallelHashingThreads;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you would get this from hashSpec so you do not need to change this constructor

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no luck with simplifying this?

}

@Override
protected void configure() {
bind(HashSpec.class).toInstance(this.hashSpec);
bind(HashService.class).toInstance(new HashServiceImpl(this.hashSpec));
bind(ParallelHashService.class).toProvider(() -> new ParallelHashServiceImpl(this.hashSpec, this.parallelHashingThreads));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public interface HashService {

void verify(Path file, String hash) throws HashVerificationException;

class HashingException extends Exception {
class HashingException extends RuntimeException {

public HashingException(final String message) {
super(message);
Expand All @@ -25,7 +25,7 @@ public HashingException(final String message, final Throwable cause) {
}
}

class HashVerificationException extends Exception {
class HashVerificationException extends RuntimeException {

public HashVerificationException(final String message) {
super(message);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package com.instaclustr.esop.impl.hash;

import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

import com.google.inject.Inject;
import com.instaclustr.esop.impl.ManifestEntry;
Expand Down Expand Up @@ -87,9 +87,9 @@ private String getHash(final File file) throws Exception
{
if (hashSpec.algorithm == HashSpec.HashAlgorithm.NONE)
return null;
try (final InputStream is = new FileInputStream(file)) {
try (final FileChannel ch = FileChannel.open(file.toPath(), StandardOpenOption.READ)) {
logger.info("Getting {} hash of {} ", hashSpec.algorithm.toString(), file.getAbsolutePath());
return hashSpec.algorithm.getHasher().getHash(is);
return hashSpec.algorithm.getHasher().getHash(ch);
}
}
}
Loading
Loading