Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
5.1
* Option to schedule Paxos cleanup on topology change by keyspace (CASSANDRA-20801)
* Introducing comments and security labels for schema elements (CASSANDRA-20943)
* Extend nodetool tablestats for dictionary memory usage (CASSANDRA-20940)
* Introduce separate GCInspector thresholds for concurrent GC events (CASSANDRA-20980)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,7 @@ public enum CassandraRelevantProperties
PAXOS_LOG_TTL_LINEARIZABILITY_VIOLATIONS("cassandra.paxos.log_ttl_linearizability_violations", "true"),
PAXOS_MODERN_RELEASE("cassandra.paxos.modern_release", "4.1"),
PAXOS_REPAIR_ALLOW_MULTIPLE_PENDING_UNSAFE("cassandra.paxos_repair_allow_multiple_pending_unsafe"),
PAXOS_REPAIR_ON_TOPOLOGY_CHANGE_BY_KEYSPACE("cassandra.paxos_repair_on_topology_change_by_keyspace", "false"),
PAXOS_REPAIR_ON_TOPOLOGY_CHANGE_RETRIES("cassandra.paxos_repair_on_topology_change_retries", "10"),
PAXOS_REPAIR_ON_TOPOLOGY_CHANGE_RETRY_DELAY_SECONDS("cassandra.paxos_repair_on_topology_change_retry_delay_seconds", "10"),
PAXOS_REPAIR_RETRY_TIMEOUT_IN_MS("cassandra.paxos_repair_retry_timeout_millis", "60000"),
Expand Down
10 changes: 8 additions & 2 deletions src/java/org/apache/cassandra/service/StorageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@
import static org.apache.cassandra.config.CassandraRelevantProperties.CONSISTENT_RANGE_MOVEMENT;
import static org.apache.cassandra.config.CassandraRelevantProperties.DRAIN_EXECUTOR_TIMEOUT_MS;
import static org.apache.cassandra.config.CassandraRelevantProperties.JOIN_RING;
import static org.apache.cassandra.config.CassandraRelevantProperties.PAXOS_REPAIR_ON_TOPOLOGY_CHANGE_BY_KEYSPACE;
import static org.apache.cassandra.config.CassandraRelevantProperties.PAXOS_REPAIR_ON_TOPOLOGY_CHANGE_RETRIES;
import static org.apache.cassandra.config.CassandraRelevantProperties.PAXOS_REPAIR_ON_TOPOLOGY_CHANGE_RETRY_DELAY_SECONDS;
import static org.apache.cassandra.config.CassandraRelevantProperties.REPLACE_ADDRESS_FIRST_BOOT;
Expand Down Expand Up @@ -3226,9 +3227,10 @@ public void repairPaxosForTopologyChange(String reason)
}

@VisibleForTesting
public Future<?> startRepairPaxosForTopologyChange(String reason)
public Future<?> startRepairPaxosForTopologyChange(String reason) throws ExecutionException, InterruptedException
{
logger.info("repairing paxos for {}", reason);
boolean scheduleByKeyspace = PAXOS_REPAIR_ON_TOPOLOGY_CHANGE_BY_KEYSPACE.getBoolean();

List<Future<?>> futures = new ArrayList<>();

Expand All @@ -3242,7 +3244,11 @@ public Future<?> startRepairPaxosForTopologyChange(String reason)
continue;

Collection<Range<Token>> ranges = getLocalAndPendingRanges(ksName);
futures.add(ActiveRepairService.instance().repairPaxosForTopologyChange(ksName, ranges, reason));
if (scheduleByKeyspace)
// blocking wait here if scheduling by keyspace to avoid overwhelming the messages with many repairs at once
ActiveRepairService.instance().repairPaxosForTopologyChange(ksName, ranges, reason).get();
else
futures.add(ActiveRepairService.instance().repairPaxosForTopologyChange(ksName, ranges, reason));
}

return FutureCombiner.allOf(futures);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.cassandra.simulator.cluster;

import java.util.concurrent.ExecutionException;

import org.apache.cassandra.distributed.api.IIsolatedExecutor.SerializableRunnable;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.concurrent.Condition;
Expand Down Expand Up @@ -63,7 +65,15 @@ protected static SerializableRunnable invokableTopologyChangeRepair(String reaso
{
return () -> {
Condition condition = newOneTimeCondition();
Future<?> future = StorageService.instance.startRepairPaxosForTopologyChange(reason);
Future<?> future;
try
{
future = StorageService.instance.startRepairPaxosForTopologyChange(reason);
}
catch (ExecutionException | InterruptedException e)
{
throw new RuntimeException(e);
}
future.addListener(condition::signal); // add listener so we don't use Futures.addAllAsList
condition.awaitThrowUncheckedOnInterrupt();
};
Expand Down