Skip to content

Commit 19f3f61

Browse files
committed
Update transaction examples
1 parent 01a767f commit 19f3f61

File tree

3 files changed

+19
-15
lines changed

3 files changed

+19
-15
lines changed

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,6 @@
88
**/.idea/
99
**/*.iml
1010
**/*.iws
11-
ydb-cookbook/logs/app.log
11+
12+
# Log file
13+
*.log

ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionReadAsync.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ public class TransactionReadAsync extends SimpleExample {
4343
private static final int MESSAGES_COUNT = 5;
4444

4545
private final CompletableFuture<Void> messageReceivedFuture = new CompletableFuture<>();
46-
private long lastSeqNo = -1;
4746
private TableClient tableClient;
4847
private AsyncReader reader;
4948

@@ -80,6 +79,14 @@ protected void run(GrpcTransport transport, String pathPrefix) {
8079
}
8180
}
8281

82+
public static void analyzeCommitStatus(Status status) {
83+
if (status.isSuccess()) {
84+
logger.info("Transaction committed successfully");
85+
} else {
86+
logger.error("Failed to commit transaction: {}", status);
87+
}
88+
}
89+
8390
private class Handler extends AbstractReadEventHandler {
8491
private final AtomicInteger messageCounter = new AtomicInteger(0);
8592

@@ -122,13 +129,6 @@ public void onMessages(DataReceivedEvent event) {
122129
} else {
123130
logger.info("Message received. SeqNo={}, offset={}", message.getSeqNo(), message.getOffset());
124131
}
125-
if (lastSeqNo > message.getSeqNo()) {
126-
logger.error("Received a message with seqNo {}. Previously got a message with seqNo {}",
127-
message.getSeqNo(), lastSeqNo);
128-
messageReceivedFuture.complete(null);
129-
} else {
130-
lastSeqNo = message.getSeqNo();
131-
}
132132

133133
// creating session and transaction
134134
Result<Session> sessionResult = tableClient.createSession(Duration.ofSeconds(10)).join();
@@ -154,7 +154,9 @@ public void onMessages(DataReceivedEvent event) {
154154
return; // retry or shutdown
155155
}
156156

157-
transaction.commit().join();
157+
Status commitStatus = transaction.commit().join();
158+
analyzeCommitStatus(commitStatus);
159+
158160
if (messageCounter.incrementAndGet() >= MESSAGES_COUNT) {
159161
logger.info("{} messages committed in transaction. Finishing reading.", MESSAGES_COUNT);
160162
messageReceivedFuture.complete(null);

ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionReadSync.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import tech.ydb.topic.read.Message;
1919
import tech.ydb.topic.read.SyncReader;
2020
import tech.ydb.topic.settings.ReaderSettings;
21+
import tech.ydb.topic.settings.ReceiveSettings;
2122
import tech.ydb.topic.settings.TopicReadSettings;
2223

2324
/**
@@ -63,12 +64,10 @@ protected void run(GrpcTransport transport, String pathPrefix) {
6364
session.executeDataQuery("SELECT 1", TxControl.id(transaction)).join();
6465
// analyzeQueryResultIfNeeded();
6566

66-
// Set transaction for reader.
67-
// Further messages will be committed automatically with this transaction
68-
reader.setTransaction(transaction);
69-
7067
//Session session
71-
Message message = reader.receive();
68+
Message message = reader.receive(ReceiveSettings.newBuilder()
69+
.setTransaction(transaction)
70+
.build());
7271
byte[] messageData;
7372
try {
7473
messageData = message.getData();
@@ -79,6 +78,7 @@ protected void run(GrpcTransport transport, String pathPrefix) {
7978
logger.info("Message received: {}", new String(messageData, StandardCharsets.UTF_8));
8079

8180
transaction.commit().join();
81+
// analyze commit status
8282
}
8383
} catch (InterruptedException exception) {
8484
logger.error("Interrupted exception while waiting for message: ", exception);

0 commit comments

Comments
 (0)