-
Notifications
You must be signed in to change notification settings - Fork 27
Xxhash integration #96
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 17 commits
c209f9f
56c09a6
b64bf48
4a0db60
517f6fa
b4ec1b7
70f3505
b7d36f4
91a5709
06a6fa2
08b6c82
d41aca5
9479001
fd483ea
5480bbc
cba7a9c
ade8b3e
baec383
321f5be
4b0e914
588b75b
2693525
794f281
924fc25
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,7 +22,7 @@ public abstract class AbstractOperationRequest extends OperationRequest { | |
| converter = StorageLocationTypeConverter.class, | ||
| description = "Location to which files will be backed up or restored from, in form " + | ||
| "cloudProvider://bucketName/clusterId/datacenterId/nodeId or file:///some/path/bucketName/clusterId/datacenterId/nodeId. " + | ||
| "'cloudProvider' is one of 's3', 'oracle', 'ceph', 'minio', 'azure' or 'gcp'.", | ||
| "'cloudProvider' is one of 's3', 'azure' or 'gcp'.", | ||
| required = true) | ||
| @JsonSerialize(using = StorageLocationSerializer.class) | ||
| @JsonDeserialize(using = StorageLocationDeserializer.class) | ||
|
|
@@ -47,9 +47,8 @@ public abstract class AbstractOperationRequest extends OperationRequest { | |
| @JsonProperty("retry") | ||
| public RetrySpec retry = new RetrySpec(); | ||
|
|
||
| @Option(names = {"--cc", "--concurrent-connections"}, | ||
| description = "Number of files (or file parts) to download concurrently. Higher values will increase throughput. Default is 10.", | ||
| defaultValue = "10" | ||
| @Option(names = {"--cc", "--concurrent-connections", "--parallelism"}, | ||
| description = "Number of files (or file parts) to download / upload / hash concurrently. Higher values will increase throughput. Default is 50% of available CPUs." | ||
| ) | ||
| @JsonProperty("concurrentConnections") | ||
| public Integer concurrentConnections; | ||
|
|
@@ -60,6 +59,8 @@ public abstract class AbstractOperationRequest extends OperationRequest { | |
|
|
||
| public AbstractOperationRequest() { | ||
| // for picocli | ||
| if (concurrentConnections == null) | ||
| concurrentConnections = getDefaultConcurrentConnections(); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should validate this, what if I put 0 or negative number? Or number bigger than number of cpus I have? We should check some range the value is allowed to be in. There is "validate" method you might maybe move this validation to?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch, totally missed this by assumption that users should know what they're doing. But esop could be called by some script or any other automation tool, so it worth to validate if value is valid |
||
| } | ||
|
|
||
| public AbstractOperationRequest(final StorageLocation storageLocation, | ||
|
|
@@ -74,7 +75,7 @@ public AbstractOperationRequest(final StorageLocation storageLocation, | |
| this.skipBucketVerification = skipBucketVerification; | ||
| this.proxySettings = proxySettings; | ||
| this.retry = retry == null ? new RetrySpec() : retry; | ||
| this.concurrentConnections = concurrentConnections == null ? 10 : concurrentConnections; | ||
| this.concurrentConnections = concurrentConnections == null ? getDefaultConcurrentConnections() : concurrentConnections; | ||
| this.kmsKeyId = kmsKeyId; | ||
| } | ||
|
|
||
|
|
@@ -96,5 +97,25 @@ public void validate(final Set<String> storageProviders) { | |
| if (storageProviders != null && !storageProviders.contains(storageLocation.storageProvider)) { | ||
| throw new IllegalStateException(format("Available storage providers: %s", Arrays.toString(storageProviders.toArray()))); | ||
| } | ||
|
|
||
| validateConcurrentConnections(); | ||
| } | ||
|
|
||
| /** | ||
| * Get default number of concurrent connections based on 50% of available processors. | ||
| */ | ||
| private static int getDefaultConcurrentConnections() { | ||
| return Runtime.getRuntime().availableProcessors() / 2; | ||
| } | ||
|
|
||
| private void validateConcurrentConnections() { | ||
smiklosovic marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| if (concurrentConnections <= 0) { | ||
| throw new IllegalStateException("--parallelism must be greater than 0"); | ||
| } | ||
|
|
||
| if (concurrentConnections > Runtime.getRuntime().availableProcessors()) { | ||
| throw new IllegalStateException("--parallelism value cannot be greater than number of available processors: " | ||
| + Runtime.getRuntime().availableProcessors()); | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,14 +5,17 @@ | |
| public class HashModule extends AbstractModule { | ||
|
|
||
| private final HashSpec hashSpec; | ||
| private final int parallelHashingThreads; | ||
|
|
||
| public HashModule(final HashSpec hashSpec) { | ||
| public HashModule(final HashSpec hashSpec, final int parallelHashingThreads) { | ||
| this.hashSpec = hashSpec; | ||
| this.parallelHashingThreads = parallelHashingThreads; | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you would get this from hashSpec so you do not need to change this constructor
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no luck with simplifying this? |
||
| } | ||
|
|
||
| @Override | ||
| protected void configure() { | ||
| bind(HashSpec.class).toInstance(this.hashSpec); | ||
| bind(HashService.class).toInstance(new HashServiceImpl(this.hashSpec)); | ||
| bind(ParallelHashService.class).toProvider(() -> new ParallelHashServiceImpl(this.hashSpec, this.parallelHashingThreads)); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can't you put concurrent connections into the constructor of HashSpec? Just pass it to hashSpec so you do not need to accommodate the code by passing request everywhere.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if it simplifies things, we use concurrentConnections (--cc) in different places and potentially there could no be HashSpec. Needs deeper investigation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no luck with this?