Skip to content

Commit 2b11a54

Browse files
committed
WIP
1 parent e1597f7 commit 2b11a54

File tree

4 files changed

+33
-21
lines changed

4 files changed

+33
-21
lines changed

core/src/main/java/org/apache/accumulo/core/fate/Fate.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,8 @@ public void run() {
224224
fe -> fe.getFateOps().equals(fateOps) && fe.getName().equals(fateExecutorName))) {
225225
log.debug("[{}] Adding FateExecutor for {} with {} threads", store.type(), fateOps,
226226
poolSize);
227-
var fateExecutor = new FateExecutor<>(Fate.this, environment, fateOps, poolSize, fateExecutorName);
227+
var fateExecutor =
228+
new FateExecutor<>(Fate.this, environment, fateOps, poolSize, fateExecutorName);
228229
fateExecutors.add(fateExecutor);
229230
fateExecutor.setPartitions(currentPartitions);
230231
}
@@ -575,11 +576,23 @@ public void close() {
575576
store.close();
576577
}
577578

578-
public void setPartitions(Set<FatePartition> partitions) {
579+
public Set<FatePartition> getPartitions() {
580+
synchronized (fateExecutors) {
581+
return currentPartitions;
582+
}
583+
}
584+
585+
public boolean setPartitions(Set<FatePartition> expected, Set<FatePartition> partitions) {
586+
Objects.requireNonNull(expected);
579587
Objects.requireNonNull(partitions);
580588
synchronized (fateExecutors) {
581-
currentPartitions = Set.copyOf(partitions);
582-
fateExecutors.forEach(fe -> fe.setPartitions(currentPartitions));
589+
if (currentPartitions.equals(expected)) {
590+
currentPartitions = Set.copyOf(partitions);
591+
fateExecutors.forEach(fe -> fe.setPartitions(currentPartitions));
592+
return true;
593+
} else {
594+
return false;
595+
}
583596
}
584597
}
585598

server/manager/src/main/java/org/apache/accumulo/manager/Manager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1285,7 +1285,7 @@ protected Fate<FateEnv> initializeFateInstance(ServerContext context, FateStore<
12851285
.scheduleWithFixedDelay(fateCleaner::ageOff, 10, 4 * 60, MINUTES));
12861286

12871287
if (store.type() == FateInstanceType.META) {
1288-
fateInstance.setPartitions(Set.of(FatePartition.all(FateInstanceType.META)));
1288+
fateInstance.setPartitions(Set.of(), Set.of(FatePartition.all(FateInstanceType.META)));
12891289
} // else do not run user transactions for now in the manager... it will have an empty set of
12901290
// partitions
12911291

server/manager/src/main/java/org/apache/accumulo/manager/fate/FateManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public void managerWorkers() throws Exception {
8181
Set<FatePartition> partitions = entry.getValue();
8282
var curr = currentAssignments.getOrDefault(worker, Set.of());
8383
if (!Sets.difference(curr, partitions).isEmpty()) {
84-
// This worker has extra partitions that are not desired, unload those
84+
// This worker has extra partitions that are not desired
8585
var intersection = Sets.intersection(curr, partitions);
8686
if (!setWorkerPartitions(worker, curr, intersection)) {
8787
log.debug("Failed to set partitions for {} to {}", worker, intersection);

server/manager/src/main/java/org/apache/accumulo/manager/fate/FateWorker.java

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,11 @@ public List<TFatePartition> getPartitions(TInfo tinfo, TCredentials credentials)
8080
SecurityErrorCode.PERMISSION_DENIED).asThriftException();
8181
}
8282

83-
synchronized (currentPartitions) {
84-
return currentPartitions.stream().map(FatePartition::toThrift).toList();
83+
var localFate = fate;
84+
if (localFate == null) {
85+
return List.of();
86+
} else {
87+
return localFate.getPartitions().stream().map(FatePartition::toThrift).toList();
8588
}
8689
}
8790

@@ -93,21 +96,17 @@ public boolean setPartitions(TInfo tinfo, TCredentials credentials, List<TFatePa
9396
SecurityErrorCode.PERMISSION_DENIED).asThriftException();
9497
}
9598

96-
var expectedSet = expected.stream().map(FatePartition::from).collect(Collectors.toSet());
97-
synchronized (currentPartitions) {
98-
if (currentPartitions.equals(expectedSet) && fate != null) {
99-
expectedSet.forEach(p -> log.info("old partition {}", p));
100-
currentPartitions.clear();
101-
desired.stream().map(FatePartition::from).forEach(currentPartitions::add);
102-
desired.stream().map(FatePartition::from).forEach(p -> log.info("new partition {}", p));
103-
log.info("Changed partitions from {} to {}", expectedSet, currentPartitions);
104-
fate.setPartitions(Set.copyOf(currentPartitions));
99+
var localFate = fate;
100+
if (localFate != null) {
101+
var expectedSet = expected.stream().map(FatePartition::from).collect(Collectors.toSet());
102+
var desiredSet = desired.stream().map(FatePartition::from).collect(Collectors.toSet());
103+
if (localFate.setPartitions(expectedSet, desiredSet)) {
104+
log.info("Changed partitions from {} to {}", expectedSet, desiredSet);
105105
return true;
106-
} else {
107-
log.info("Did not change partitions to {} because {} != {}", desired, expectedSet,
108-
currentPartitions);
109-
return false;
110106
}
111107
}
108+
109+
log.info("Did not change partitions to {}", desired);
110+
return false;
112111
}
113112
}

0 commit comments

Comments
 (0)