Skip to content

Commit 20c4e6e

Browse files
authored
Merge pull request #357 from alex268/master
Unite trace logs between transport and topics
2 parents f6b9cbf + 9c98639 commit 20c4e6e

File tree

7 files changed

+93
-79
lines changed

7 files changed

+93
-79
lines changed

core/src/main/java/tech/ydb/core/impl/call/ReadWriteStreamCall.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ public CompletableFuture<Status> start(Observer<R> observer) {
8888
public void sendNext(W message) {
8989
synchronized (call) {
9090
if (flush()) {
91+
if (logger.isTraceEnabled()) {
92+
String msg = TextFormat.shortDebugString((Message) message);
93+
logger.trace("ReadWriteStreamCall[{}] --> {}", traceId, msg);
94+
}
9195
call.sendMessage(message);
9296
} else {
9397
messagesQueue.add(message);
@@ -103,7 +107,8 @@ private boolean flush() {
103107
}
104108

105109
if (logger.isTraceEnabled()) {
106-
logger.trace("ReadWriteStreamCall[{}] --> {}", traceId, TextFormat.shortDebugString((Message) next));
110+
String msg = TextFormat.shortDebugString((Message) next);
111+
logger.trace("ReadWriteStreamCall[{}] --> {}", traceId, msg);
107112
}
108113
call.sendMessage(next);
109114
}

topic/src/main/java/tech/ydb/topic/TopicRpc.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,13 @@ CompletableFuture<Result<YdbTopic.DescribeConsumerResult>> describeConsumer(
7575
CompletableFuture<Status> updateOffsetsInTransaction(YdbTopic.UpdateOffsetsInTransactionRequest request,
7676
GrpcRequestSettings settings);
7777

78-
GrpcReadWriteStream<YdbTopic.StreamWriteMessage.FromServer, YdbTopic.StreamWriteMessage.FromClient> writeSession();
78+
GrpcReadWriteStream<YdbTopic.StreamWriteMessage.FromServer, YdbTopic.StreamWriteMessage.FromClient> writeSession(
79+
String traceId
80+
);
7981

80-
GrpcReadWriteStream<YdbTopic.StreamReadMessage.FromServer, YdbTopic.StreamReadMessage.FromClient> readSession();
82+
GrpcReadWriteStream<YdbTopic.StreamReadMessage.FromServer, YdbTopic.StreamReadMessage.FromClient> readSession(
83+
String traceId
84+
);
8185

8286
ScheduledExecutorService getScheduler();
8387
}

topic/src/main/java/tech/ydb/topic/impl/GrpcTopicRpc.java

+12-12
Original file line numberDiff line numberDiff line change
@@ -91,21 +91,21 @@ public CompletableFuture<Status> updateOffsetsInTransaction(YdbTopic.UpdateOffse
9191
}
9292

9393
@Override
94-
public GrpcReadWriteStream<
95-
YdbTopic.StreamWriteMessage.FromServer,
96-
YdbTopic.StreamWriteMessage.FromClient
97-
> writeSession() {
98-
return transport.readWriteStreamCall(TopicServiceGrpc.getStreamWriteMethod(),
99-
GrpcRequestSettings.newBuilder().build());
94+
public GrpcReadWriteStream<YdbTopic.StreamWriteMessage.FromServer, YdbTopic.StreamWriteMessage.FromClient>
95+
writeSession(String streamId) {
96+
return transport.readWriteStreamCall(
97+
TopicServiceGrpc.getStreamWriteMethod(),
98+
GrpcRequestSettings.newBuilder().withTraceId(streamId).build()
99+
);
100100
}
101101

102102
@Override
103-
public GrpcReadWriteStream<
104-
YdbTopic.StreamReadMessage.FromServer,
105-
YdbTopic.StreamReadMessage.FromClient
106-
> readSession() {
107-
return transport.readWriteStreamCall(TopicServiceGrpc.getStreamReadMethod(),
108-
GrpcRequestSettings.newBuilder().build());
103+
public GrpcReadWriteStream<YdbTopic.StreamReadMessage.FromServer, YdbTopic.StreamReadMessage.FromClient>
104+
readSession(String streamId) {
105+
return transport.readWriteStreamCall(
106+
TopicServiceGrpc.getStreamReadMethod(),
107+
GrpcRequestSettings.newBuilder().withTraceId(streamId).build()
108+
);
109109
}
110110

111111
@Override

topic/src/main/java/tech/ydb/topic/read/impl/ReadSession.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,11 @@
1515
public abstract class ReadSession extends SessionBase<FromServer, FromClient> {
1616
private static final Logger logger = LoggerFactory.getLogger(ReadSession.class);
1717

18-
public ReadSession(TopicRpc rpc) {
19-
super(rpc.readSession());
18+
protected final String streamId;
19+
20+
public ReadSession(TopicRpc rpc, String streamId) {
21+
super(rpc.readSession(streamId));
22+
this.streamId = streamId;
2023
}
2124

2225
@Override

0 commit comments

Comments
 (0)