Skip to content
This repository has been archived by the owner on Nov 27, 2023. It is now read-only.

Commit

Permalink
feat: integrate compare supports
Browse files Browse the repository at this point in the history
Signed-off-by: tison <[email protected]>
  • Loading branch information
tisonkun committed May 14, 2023
1 parent 6817b5b commit 5537590
Showing 1 changed file with 55 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ private DeleteRangeResponse deleteRange(WriteTxn writeTxn, DeleteRangeRequest re
private boolean applyCompares(ReadTxn readTxn, List<Compare> compareList, long revision)
throws InvalidProtocolBufferException {
for (Compare compare : compareList) {
final RangeRequest req = RangeRequest.newBuilder().setKey(compare.getKey()).build();
final RangeRequest req =
RangeRequest.newBuilder().setKey(compare.getKey()).build();
final RangeResponse resp = range(readTxn, req, revision);
if (!applyCompare(compare, resp.getKvsList())) {
return false;
Expand All @@ -179,21 +180,25 @@ private boolean applyCompare(Compare compare, List<KeyValue> kvs) {

final Comparator<ByteString> comparator = ByteString.unsignedLexicographicalComparator();
for (KeyValue kv : kvs) {
final int result = switch (compare.getTarget()) {
case VERSION -> Long.compare(kv.getVersion(), compare.getVersion());
case VALUE -> comparator.compare(kv.getValue(), compare.getValue());
default -> throw new UnsupportedOperationException(compare.getTarget().name());
};

final boolean match = switch (compare.getResult()) {
case EQUAL -> result == 0;
case GREATER -> result > 0;
case LESS -> result < 0;
case NOT_EQUAL -> result != 0;
case GREATER_OR_EQUAL -> result >= 0;
case LESS_OR_EQUAL -> result <= 0;
default -> throw new UnsupportedOperationException(compare.getResult().name());
};
final int result =
switch (compare.getTarget()) {
case VERSION -> Long.compare(kv.getVersion(), compare.getVersion());
case VALUE -> comparator.compare(kv.getValue(), compare.getValue());
default -> throw new UnsupportedOperationException(
compare.getTarget().name());
};

final boolean match =
switch (compare.getResult()) {
case EQUAL -> result == 0;
case GREATER -> result > 0;
case LESS -> result < 0;
case NOT_EQUAL -> result != 0;
case GREATER_OR_EQUAL -> result >= 0;
case LESS_OR_EQUAL -> result <= 0;
default -> throw new UnsupportedOperationException(
compare.getResult().name());
};

if (!match) {
return false;
Expand All @@ -205,15 +210,25 @@ private boolean applyCompare(Compare compare, List<KeyValue> kvs) {

@Override
public CompletableFuture<Message> query(Message request) {
final List<RequestOp> requestList;
final TxnRequest txnRequest;
try {
final TxnRequest.Builder req = TxnRequest.newBuilder();
req.mergeFrom(request.getContent());
requestList = req.getSuccessList();
final TxnRequest.Builder builder = TxnRequest.newBuilder();
builder.mergeFrom(request.getContent());
txnRequest = builder.build();
} catch (InvalidProtocolBufferException e) {
return CompletableFuture.failedFuture(e);
}

final TermIndex termIndex = getLastAppliedTermIndex();

final boolean success;
try (final ReadTxn readTxn = backend.readTxn()) {
success = applyCompares(readTxn, txnRequest.getCompareList(), termIndex.getIndex());
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
final List<RequestOp> requestList = success ? txnRequest.getSuccessList() : txnRequest.getFailureList();

for (RequestOp requestOp : requestList) {
if (!requestOp.hasRequestRange()) {
final String message = "readonly message contains mutations: " + requestOp.getRequestCase();
Expand All @@ -222,7 +237,6 @@ public CompletableFuture<Message> query(Message request) {
}

final List<ResponseOp> responseOps = new ArrayList<>();
final TermIndex termIndex = getLastAppliedTermIndex();

try (final ReadTxn readTxn = backend.readTxn()) {
for (RequestOp requestOp : requestList) {
Expand All @@ -235,24 +249,34 @@ public CompletableFuture<Message> query(Message request) {
return CompletableFuture.failedFuture(e);
}

final TxnResponse resp =
TxnResponse.newBuilder().addAllResponses(responseOps).build();
final TxnResponse resp = TxnResponse.newBuilder()
.setSucceeded(success)
.addAllResponses(responseOps)
.build();
return CompletableFuture.completedFuture(Message.valueOf(resp));
}

@Override
public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
final RaftProtos.LogEntryProto entry = trx.getLogEntry();

final List<RequestOp> requestList;
final TxnRequest txnRequest;
try {
final TxnRequest.Builder req = TxnRequest.newBuilder();
req.mergeFrom(entry.getStateMachineLogEntry().getLogData());
requestList = req.getSuccessList();
final TxnRequest.Builder builder = TxnRequest.newBuilder();
builder.mergeFrom(entry.getStateMachineLogEntry().getLogData());
txnRequest = builder.build();
} catch (InvalidProtocolBufferException e) {
return CompletableFuture.failedFuture(e);
}

final boolean success;
try (final ReadTxn readTxn = backend.readTxn()) {
success = applyCompares(readTxn, txnRequest.getCompareList(), entry.getIndex());
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
final List<RequestOp> requestList = success ? txnRequest.getSuccessList() : txnRequest.getFailureList();

final List<ResponseOp> responseOps = new ArrayList<>();
final AtomicLong sub = new AtomicLong();
try (final WriteTxn writeTxn = backend.writeTxn()) {
Expand Down Expand Up @@ -294,8 +318,10 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
}

updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex());
final TxnResponse resp =
TxnResponse.newBuilder().addAllResponses(responseOps).build();
final TxnResponse resp = TxnResponse.newBuilder()
.setSucceeded(success)
.addAllResponses(responseOps)
.build();
return CompletableFuture.completedFuture(Message.valueOf(resp));
}
}

0 comments on commit 5537590

Please sign in to comment.