Skip to content

Commit 28bee3a

Browse files
authored
Merge pull request #359 from ydb-platform/logging-improvements
Logging improvements for topic reader & writer
2 parents 4271d4a + b169df1 commit 28bee3a

File tree

6 files changed

+122
-106
lines changed

6 files changed

+122
-106
lines changed

topic/src/main/java/tech/ydb/topic/impl/GrpcStreamRetrier.java

+15-4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package tech.ydb.topic.impl;
22

3-
import java.util.UUID;
3+
import java.util.Random;
44
import java.util.concurrent.CompletableFuture;
55
import java.util.concurrent.RejectedExecutionException;
66
import java.util.concurrent.ScheduledExecutorService;
@@ -22,6 +22,9 @@ public abstract class GrpcStreamRetrier {
2222
private static final int EXP_BACKOFF_BASE_MS = 256;
2323
private static final int EXP_BACKOFF_CEILING_MS = 40000; // 40 sec (max delays would be 40-80 sec)
2424
private static final int EXP_BACKOFF_MAX_POWER = 7;
25+
private static final int ID_LENGTH = 6;
26+
private static final char[] ID_ALPHABET = "abcdefghijklmnopqrstuvwxyzABSDEFGHIJKLMNOPQRSTUVWXYZ1234567890"
27+
.toCharArray();
2528

2629
protected final String id;
2730
protected final AtomicBoolean isReconnecting = new AtomicBoolean(false);
@@ -31,14 +34,22 @@ public abstract class GrpcStreamRetrier {
3134

3235
protected GrpcStreamRetrier(ScheduledExecutorService scheduler) {
3336
this.scheduler = scheduler;
34-
this.id = UUID.randomUUID().toString();
37+
this.id = generateRandomId(ID_LENGTH);
3538
}
3639

3740
protected abstract Logger getLogger();
3841
protected abstract String getStreamName();
3942
protected abstract void onStreamReconnect();
4043
protected abstract void onShutdown(String reason);
4144

45+
protected static String generateRandomId(int length) {
46+
return new Random().ints(0, ID_ALPHABET.length)
47+
.limit(length)
48+
.map(charId -> ID_ALPHABET[charId])
49+
.collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
50+
.toString();
51+
}
52+
4253
private void tryScheduleReconnect() {
4354
int currentReconnectCounter = reconnectCounter.get() + 1;
4455
if (MAX_RECONNECT_COUNT > 0 && currentReconnectCounter > MAX_RECONNECT_COUNT) {
@@ -49,8 +60,8 @@ private void tryScheduleReconnect() {
4960
shutdownImpl(errorMessage);
5061
return;
5162
} else {
52-
getLogger().debug("[{}] Maximum retry count ({}}) exceeded. But {} is already shut down.", id,
53-
MAX_RECONNECT_COUNT, getStreamName());
63+
getLogger().info("[{}] Maximum retry count ({}}) exceeded. Need to shutdown {} but it's already " +
64+
"shut down.", id, MAX_RECONNECT_COUNT, getStreamName());
5465
}
5566
}
5667
if (isReconnecting.compareAndSet(false, true)) {

topic/src/main/java/tech/ydb/topic/read/impl/CommitterImpl.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,8 @@ public CompletableFuture<Void> commit() {
2929

3030
public CompletableFuture<Void> commitImpl(boolean fromCommitter) {
3131
if (logger.isDebugEnabled()) {
32-
logger.debug("[{}] partition session {} (partition {}): committing {} message(s), offsets" +
33-
" [{},{})" + (fromCommitter ? " from Committer" : ""), partitionSession.getPath(),
34-
partitionSession.getId(), partitionSession.getPartitionId(), messageCount,
35-
offsetsToCommit.getStart(), offsetsToCommit.getEnd());
32+
logger.debug("[{}] Committing {} message(s), offsets [{},{})" + (fromCommitter ? " from Committer" : ""),
33+
partitionSession.getFullId(), messageCount, offsetsToCommit.getStart(), offsetsToCommit.getEnd());
3634
}
3735
return partitionSession.commitOffsetRange(offsetsToCommit);
3836
}

topic/src/main/java/tech/ydb/topic/read/impl/DeferredCommitterImpl.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,9 @@ private void add(OffsetsRange offsetRange) {
4141
rangesLock.unlock();
4242
}
4343
} catch (RuntimeException exception) {
44-
String errorMessage = "Error adding new offset range to DeferredCommitter for partition session " +
45-
partitionSession.getId() + " (partition " + partitionSession.getPartitionId() + "): " +
46-
exception.getMessage();
44+
String errorMessage = "[" + partitionSession.getFullId() + "] Error adding new offset range to " +
45+
"DeferredCommitter for partition session " + partitionSession.getId() + " (partition " +
46+
partitionSession.getPartitionId() + "): " + exception.getMessage();
4747
logger.error(errorMessage);
4848
throw new RuntimeException(errorMessage, exception);
4949
}

0 commit comments

Comments
 (0)