Skip to content

Commit 10dcee9

Browse files
committed
WIP
1 parent 2b11a54 commit 10dcee9

File tree

8 files changed

+33
-18
lines changed

8 files changed

+33
-18
lines changed

core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import java.util.concurrent.atomic.AtomicBoolean;
4141
import java.util.concurrent.atomic.AtomicInteger;
4242
import java.util.concurrent.atomic.AtomicLong;
43+
import java.util.function.BooleanSupplier;
4344
import java.util.function.Consumer;
4445
import java.util.function.Predicate;
4546
import java.util.stream.Collectors;
@@ -161,7 +162,7 @@ public FateTxStore<T> reserve(FateId fateId) {
161162
EnumSet.of(TStatus.SUBMITTED, TStatus.FAILED_IN_PROGRESS);
162163

163164
@Override
164-
public void runnable(Set<FatePartition> partitions, AtomicBoolean keepWaiting,
165+
public void runnable(Set<FatePartition> partitions, BooleanSupplier keepWaiting,
165166
Consumer<FateIdStatus> idConsumer) {
166167

167168
if (partitions.isEmpty()) {
@@ -170,7 +171,7 @@ public void runnable(Set<FatePartition> partitions, AtomicBoolean keepWaiting,
170171

171172
AtomicLong seen = new AtomicLong(0);
172173

173-
while (keepWaiting.get() && seen.get() == 0) {
174+
while (keepWaiting.getAsBoolean() && seen.get() == 0) {
174175
final long beforeCount = unreservedRunnableCount.getCount();
175176
final boolean beforeDeferredOverflow = deferredOverflow.get();
176177

@@ -212,8 +213,7 @@ public void runnable(Set<FatePartition> partitions, AtomicBoolean keepWaiting,
212213
}
213214

214215
if (waitTime > 0) {
215-
unreservedRunnableCount.waitFor(count -> count != beforeCount, waitTime,
216-
keepWaiting::get);
216+
unreservedRunnableCount.waitFor(count -> count != beforeCount, waitTime, keepWaiting);
217217
}
218218
}
219219
}

core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.util.concurrent.atomic.AtomicBoolean;
4646
import java.util.concurrent.atomic.AtomicInteger;
4747
import java.util.concurrent.atomic.AtomicReference;
48+
import java.util.function.BooleanSupplier;
4849

4950
import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
5051
import org.apache.accumulo.core.conf.Property;
@@ -316,8 +317,12 @@ private class WorkFinder implements Runnable {
316317
public void run() {
317318
while (fate.getKeepRunning().get() && !isShutdown()) {
318319
try {
319-
// TODO
320-
fate.getStore().runnable(partitions.get(), fate.getKeepRunning(), fateIdStatus -> {
320+
var localPartitions = partitions.get();
321+
// if the set of partitions changes, we should stop looking for work w/ the old set of
322+
// partitions
323+
BooleanSupplier keepRunning =
324+
() -> fate.getKeepRunning().get() && localPartitions == partitions.get();
325+
fate.getStore().runnable(localPartitions, keepRunning, fateIdStatus -> {
321326
// The FateId with the fate operation 'fateOp' is workable by this FateExecutor if
322327
// 1) This FateExecutor is assigned to work on 'fateOp' ('fateOp' is in 'fateOps')
323328
// 2) The transaction was cancelled while NEW. This is an edge case that needs to be
@@ -328,7 +333,7 @@ public void run() {
328333
var fateOp = fateIdStatus.getFateOperation().orElse(null);
329334
if ((fateOp != null && fateOps.contains(fateOp))
330335
|| txCancelledWhileNew(status, fateOp)) {
331-
while (fate.getKeepRunning().get() && !isShutdown()) {
336+
while (keepRunning.getAsBoolean() && !isShutdown()) {
332337
try {
333338
// The reason for calling transfer instead of queueing is avoid rescanning the
334339
// storage layer and adding the same thing over and over. For example if all

core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import java.util.Map;
2525
import java.util.Optional;
2626
import java.util.Set;
27-
import java.util.concurrent.atomic.AtomicBoolean;
27+
import java.util.function.BooleanSupplier;
2828
import java.util.function.Consumer;
2929
import java.util.stream.Stream;
3030

@@ -164,7 +164,7 @@ interface FateIdStatus {
164164
* is found or until the keepWaiting parameter is false. It will return once all runnable ids
165165
* found were passed to the consumer.
166166
*/
167-
void runnable(Set<FatePartition> partitions, AtomicBoolean keepWaiting,
167+
void runnable(Set<FatePartition> partitions, BooleanSupplier keepWaiting,
168168
Consumer<FateIdStatus> idConsumer);
169169

170170
/**

core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import java.util.Optional;
2626
import java.util.Set;
2727
import java.util.concurrent.CompletableFuture;
28-
import java.util.concurrent.atomic.AtomicBoolean;
28+
import java.util.function.BooleanSupplier;
2929
import java.util.function.Consumer;
3030
import java.util.function.Function;
3131
import java.util.stream.Stream;
@@ -142,7 +142,7 @@ public Stream<FateKey> list(FateKey.FateKeyType type) {
142142
}
143143

144144
@Override
145-
public void runnable(Set<FatePartition> partitions, AtomicBoolean keepWaiting,
145+
public void runnable(Set<FatePartition> partitions, BooleanSupplier keepWaiting,
146146
Consumer<FateIdStatus> idConsumer) {
147147
store.runnable(partitions, keepWaiting, idConsumer);
148148
}

core/src/test/java/org/apache/accumulo/core/fate/TestStore.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import java.util.Set;
3131
import java.util.UUID;
3232
import java.util.concurrent.CompletableFuture;
33-
import java.util.concurrent.atomic.AtomicBoolean;
33+
import java.util.function.BooleanSupplier;
3434
import java.util.function.Consumer;
3535
import java.util.stream.Stream;
3636

@@ -273,7 +273,7 @@ public Stream<FateKey> list(FateKey.FateKeyType type) {
273273
}
274274

275275
@Override
276-
public void runnable(Set<FatePartition> partitions, AtomicBoolean keepWaiting,
276+
public void runnable(Set<FatePartition> partitions, BooleanSupplier keepWaiting,
277277
Consumer<FateIdStatus> idConsumer) {
278278
throw new UnsupportedOperationException();
279279
}

server/manager/src/main/java/org/apache/accumulo/manager/fate/FateManager.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@
4747

4848
/**
4949
* Partitions fate across manager assistant processes. This is done by assigning ranges of the fate
50-
* uuid key space to different processes.
50+
* uuid key space to different processes. The partitions are logical and do not correspond to the
51+
* physical partitioning of the fate table.
5152
*/
5253
public class FateManager {
5354

@@ -163,6 +164,10 @@ private Map<HostAndPort,Set<FatePartition>> computeDesiredAssignments(
163164
return desiredAssignments;
164165
}
165166

167+
/**
168+
* Computes a single partition for each worker such that the partitions cover all possible UUIDs
169+
* and evenly divide the UUIDs.
170+
*/
166171
private Set<FatePartition> getDesiredPartitions(int numWorkers) {
167172
Preconditions.checkArgument(numWorkers >= 0);
168173

@@ -184,9 +189,9 @@ private Set<FatePartition> getDesiredPartitions(int numWorkers) {
184189
FateId.from(FateInstanceType.USER, endUuid)));
185190
}
186191

187-
// last one is
188192
long start = ((numWorkers - 1) * jump) << 4;
189193
UUID startUuid = new UUID(start, 0);
194+
// last partition has a special end uuid that is all f nibbles.
190195
UUID endUuid = new UUID(-1, -1);
191196
desired.add(new FatePartition(FateId.from(FateInstanceType.USER, startUuid),
192197
FateId.from(FateInstanceType.USER, endUuid)));

test/src/main/java/org/apache/accumulo/test/MultipleManagerIT.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,19 +51,22 @@ protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSit
5151
public void test() throws Exception {
5252

5353
List<Process> managerWorkers = new ArrayList<>();
54+
// start two fate workers initially
5455
for (int i = 0; i < 2; i++) {
5556
managerWorkers.add(exec(ManagerWorker.class));
5657
}
5758

5859
var executor = Executors.newCachedThreadPool();
5960

61+
// This assigns fate partitions to fate worker processes, run it in a background thread.
6062
var fateMgr = new FateManager(getServerContext());
6163
var future = executor.submit(() -> {
6264
fateMgr.managerWorkers();
6365
return null;
6466
});
6567

6668
Thread.sleep(30_000);
69+
// start more fate workers, should see the partitions be shuffled eventually
6770
for (int i = 0; i < 3; i++) {
6871
managerWorkers.add(exec(ManagerWorker.class));
6972
}
@@ -76,9 +79,11 @@ public void test() throws Exception {
7679
var table = "t" + i;
7780
// TODO seeing in the logs that fate operations for the same table are running on different
7881
// processes, however there is a 5 second delay because there is no notification mechanism
82+
// currently.
7983

8084
// TODO it's hard to find everything related to a table id in the logs, especially when the
81-
// table id is like "b"
85+
// table id is like "b". Was trying to follow a single table across multiple manager worker
86+
// processes.
8287
var tableOpsFuture = executor.submit(() -> {
8388
client.tableOperations().create(table);
8489
log.info("Created table {}", table);

test/src/main/java/org/apache/accumulo/test/fate/FateStoreITBase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ protected void testDeferredOverflow(FateStore<TestEnv> store, ServerContext sctx
201201
// Run and verify all 10 transactions still exist and were not
202202
// run because of the deferral time of all the transactions
203203
future = executor.submit(() -> store.runnable(Set.of(FatePartition.all(store.type())),
204-
keepRunning, fateIdStatus -> transactions.remove(fateIdStatus.getFateId())));
204+
keepRunning::get, fateIdStatus -> transactions.remove(fateIdStatus.getFateId())));
205205
Thread.sleep(2000);
206206
assertEquals(10, transactions.size());
207207
// Setting this flag to false should terminate the task if sleeping
@@ -227,7 +227,7 @@ protected void testDeferredOverflow(FateStore<TestEnv> store, ServerContext sctx
227227
// and removed from the store
228228
keepRunning.set(true);
229229
future = executor.submit(() -> store.runnable(Set.of(FatePartition.all(store.type())),
230-
keepRunning, fateIdStatus -> transactions.remove(fateIdStatus.getFateId())));
230+
keepRunning::get, fateIdStatus -> transactions.remove(fateIdStatus.getFateId())));
231231
Wait.waitFor(transactions::isEmpty);
232232
// Setting this flag to false should terminate the task if sleeping
233233
keepRunning.set(false);

0 commit comments

Comments
 (0)