Skip to content

Commit 2b0ddee

Browse files
committed
[improve] [log] Print source client addr when enabled haProxyProtocolEnabled (apache#22686)
(cherry picked from commit d77c5de)
1 parent 6e499fe commit 2b0ddee

File tree

7 files changed

+67
-27
lines changed

7 files changed

+67
-27
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -934,7 +934,7 @@ public KeySharedMeta getKeySharedMeta() {
934934
public String toString() {
935935
if (subscription != null && cnx != null) {
936936
return MoreObjects.toStringHelper(this).add("subscription", subscription).add("consumerId", consumerId)
937-
.add("consumerName", consumerName).add("address", this.cnx.clientAddress()).toString();
937+
.add("consumerName", consumerName).add("address", this.cnx.toString()).toString();
938938
} else {
939939
return MoreObjects.toStringHelper(this).add("consumerId", consumerId)
940940
.add("consumerName", consumerName).toString();

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -666,7 +666,7 @@ public Map<String, String> getMetadata() {
666666

667667
@Override
668668
public String toString() {
669-
return MoreObjects.toStringHelper(this).add("topic", topic).add("client", cnx.clientAddress())
669+
return MoreObjects.toStringHelper(this).add("topic", topic).add("client", cnx.toString())
670670
.add("producerName", producerName).add("producerId", producerId).toString();
671671
}
672672

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1201,7 +1201,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
12011201
remoteAddress, getPrincipal());
12021202
}
12031203

1204-
log.info("[{}] Subscribing on topic {} / {}. consumerId: {}", this.ctx().channel().toString(),
1204+
log.info("[{}] Subscribing on topic {} / {}. consumerId: {}", this.toString(),
12051205
topicName, subscriptionName, consumerId);
12061206
try {
12071207
Metadata.validateMetadata(metadata,
@@ -1921,7 +1921,7 @@ protected void handleAck(CommandAck ack) {
19211921
if (log.isDebugEnabled()) {
19221922
log.debug("Consumer future is not complete(not complete or error), but received command ack. so discard"
19231923
+ " this command. consumerId: {}, cnx: {}, messageIdCount: {}", ack.getConsumerId(),
1924-
this.ctx().channel().toString(), ack.getMessageIdsCount());
1924+
this.toString(), ack.getMessageIdsCount());
19251925
}
19261926
}
19271927
}
@@ -2267,7 +2267,7 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
22672267
@Override
22682268
public String toString() {
22692269
return String.format("ServerCnx [%s] get largest batch index when possible",
2270-
ServerCnx.this.ctx.channel());
2270+
ServerCnx.this.toString());
22712271
}
22722272
}, null);
22732273

@@ -3301,7 +3301,7 @@ private void disableTcpNoDelayIfNeeded(String topic, String producerName) {
33013301
}
33023302
} catch (Throwable t) {
33033303
log.warn("[{}] [{}] Failed to remove TCP no-delay property on client cnx {}", topic, producerName,
3304-
ctx.channel());
3304+
this.toString());
33053305
}
33063306
}
33073307
}
@@ -3364,6 +3364,31 @@ public SocketAddress getRemoteAddress() {
33643364
return remoteAddress;
33653365
}
33663366

3367+
/**
3368+
* Demo: [id: 0x2561bcd1, L:/10.0.136.103:6650 ! R:/240.240.0.5:58038] [SR:/240.240.0.5:58038].
3369+
* L: local Address.
3370+
* R: remote address.
3371+
* SR: source remote address. It is the source address when enabled "haProxyProtocolEnabled".
3372+
*/
3373+
@Override
3374+
public String toString() {
3375+
ChannelHandlerContext ctx = ctx();
3376+
// ctx.channel(): 96.
3377+
// clientSourceAddress: 5 + 46(ipv6).
3378+
// state: 19.
3379+
// Len = 166.
3380+
StringBuilder buf = new StringBuilder(166);
3381+
if (ctx == null) {
3382+
buf.append("[ctx: null]");
3383+
} else {
3384+
buf.append(ctx.channel().toString());
3385+
}
3386+
String clientSourceAddr = clientSourceAddress();
3387+
buf.append(" [SR:").append(clientSourceAddr == null ? "-" : clientSourceAddr)
3388+
.append(", state:").append(state).append("]");
3389+
return buf.toString();
3390+
}
3391+
33673392
@Override
33683393
public BrokerService getBrokerService() {
33693394
return service;
@@ -3510,7 +3535,7 @@ public CompletableFuture<Optional<Boolean>> checkConnectionLiveness() {
35103535
ctx.executor().schedule(() -> {
35113536
if (finalConnectionCheckInProgress == connectionCheckInProgress
35123537
&& !finalConnectionCheckInProgress.isDone()) {
3513-
log.warn("[{}] Connection check timed out. Closing connection.", remoteAddress);
3538+
log.warn("[{}] Connection check timed out. Closing connection.", this.toString());
35143539
ctx.close();
35153540
}
35163541
}, connectionLivenessCheckTimeoutMillis, TimeUnit.MILLISECONDS);

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnxThrottleTracker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public void decrementThrottleCount() {
8787
private void changeAutoRead(boolean autoRead) {
8888
if (isChannelActive()) {
8989
if (log.isDebugEnabled()) {
90-
log.debug("[{}] Setting auto read to {}", serverCnx.ctx().channel(), autoRead);
90+
log.debug("[{}] Setting auto read to {}", serverCnx.toString(), autoRead);
9191
}
9292
// change the auto read flag on the channel
9393
serverCnx.ctx().channel().config().setAutoRead(autoRead);

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ public void handleWatchTopicList(NamespaceName namespaceName, long watcherId, lo
131131
} else {
132132
msg += "Pattern longer than maximum: " + maxSubscriptionPatternLength;
133133
}
134-
log.warn("[{}] {} on namespace {}", connection.getRemoteAddress(), msg, namespaceName);
134+
log.warn("[{}] {} on namespace {}", connection.toString(), msg, namespaceName);
135135
connection.getCommandSender().sendErrorResponse(requestId, ServerError.NotAllowedError, msg);
136136
lookupSemaphore.release();
137137
return;
@@ -144,14 +144,14 @@ public void handleWatchTopicList(NamespaceName namespaceName, long watcherId, lo
144144
TopicListWatcher watcher = existingWatcherFuture.getNow(null);
145145
log.info("[{}] Watcher with the same id is already created:"
146146
+ " watcherId={}, watcher={}",
147-
connection.getRemoteAddress(), watcherId, watcher);
147+
connection.toString(), watcherId, watcher);
148148
watcherFuture = existingWatcherFuture;
149149
} else {
150150
// There was an early request to create a watcher with the same watcherId. This can happen when
151151
// client timeout is lower the broker timeouts. We need to wait until the previous watcher
152152
// creation request either completes or fails.
153153
log.warn("[{}] Watcher with id is already present on the connection,"
154-
+ " consumerId={}", connection.getRemoteAddress(), watcherId);
154+
+ " consumerId={}", connection.toString(), watcherId);
155155
ServerError error;
156156
if (!existingWatcherFuture.isDone()) {
157157
error = ServerError.ServiceNotReady;
@@ -179,14 +179,14 @@ public void handleWatchTopicList(NamespaceName namespaceName, long watcherId, lo
179179
if (log.isDebugEnabled()) {
180180
log.debug(
181181
"[{}] Received WatchTopicList for namespace [//{}] by {}",
182-
connection.getRemoteAddress(), namespaceName, requestId);
182+
connection.toString(), namespaceName, requestId);
183183
}
184184
connection.getCommandSender().sendWatchTopicListSuccess(requestId, watcherId, hash, topicList);
185185
lookupSemaphore.release();
186186
})
187187
.exceptionally(ex -> {
188188
log.warn("[{}] Error WatchTopicList for namespace [//{}] by {}",
189-
connection.getRemoteAddress(), namespaceName, requestId);
189+
connection.toString(), namespaceName, requestId);
190190
connection.getCommandSender().sendErrorResponse(requestId,
191191
BrokerServiceException.getClientErrorCode(
192192
new BrokerServiceException.ServerMetadataException(ex)), ex.getMessage());
@@ -213,7 +213,7 @@ public void initializeTopicsListWatcher(CompletableFuture<TopicListWatcher> watc
213213
} else {
214214
if (!watcherFuture.complete(watcher)) {
215215
log.warn("[{}] Watcher future was already completed. Deregistering watcherId={}.",
216-
connection.getRemoteAddress(), watcherId);
216+
connection.toString(), watcherId);
217217
topicResources.deregisterPersistentTopicListener(watcher);
218218
}
219219
}
@@ -232,7 +232,7 @@ public void deleteTopicListWatcher(Long watcherId) {
232232
CompletableFuture<TopicListWatcher> watcherFuture = watchers.get(watcherId);
233233
if (watcherFuture == null) {
234234
log.info("[{}] TopicListWatcher was not registered on the connection: {}",
235-
watcherId, connection.getRemoteAddress());
235+
watcherId, connection.toString());
236236
return;
237237
}
238238

@@ -242,22 +242,22 @@ public void deleteTopicListWatcher(Long watcherId) {
242242
// watcher future as failed and we can tell the client the close operation was successful. When the actual
243243
// create operation will complete, the new watcher will be discarded.
244244
log.info("[{}] Closed watcher before its creation was completed. watcherId={}",
245-
connection.getRemoteAddress(), watcherId);
245+
connection.toString(), watcherId);
246246
watchers.remove(watcherId);
247247
return;
248248
}
249249

250250
if (watcherFuture.isCompletedExceptionally()) {
251251
log.info("[{}] Closed watcher that already failed to be created. watcherId={}",
252-
connection.getRemoteAddress(), watcherId);
252+
connection.toString(), watcherId);
253253
watchers.remove(watcherId);
254254
return;
255255
}
256256

257257
// Proceed with normal watcher close
258258
topicResources.deregisterPersistentTopicListener(watcherFuture.getNow(null));
259259
watchers.remove(watcherId);
260-
log.info("[{}] Closed watcher, watcherId={}", connection.getRemoteAddress(), watcherId);
260+
log.info("[{}] Closed watcher, watcherId={}", connection.toString(), watcherId);
261261
}
262262

263263
/**

pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
122122
cmd.parseFrom(buffer, cmdSize);
123123

124124
if (log.isDebugEnabled()) {
125-
log.debug("[{}] Received cmd {}", ctx.channel().remoteAddress(), cmd.getType());
125+
log.debug("[{}] Received cmd {}", ctx.channel(), cmd.getType());
126126
}
127127
messageReceived();
128128

pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
6767
this.ctx = ctx;
6868

6969
if (log.isDebugEnabled()) {
70-
log.debug("[{}] Scheduling keep-alive task every {} s", ctx.channel(), keepAliveIntervalSeconds);
70+
log.debug("[{}] Scheduling keep-alive task every {} s", this.toString(), keepAliveIntervalSeconds);
7171
}
7272
if (keepAliveIntervalSeconds > 0) {
7373
this.keepAliveTask = ctx.executor()
@@ -85,13 +85,13 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
8585
protected final void handlePing(CommandPing ping) {
8686
// Immediately reply success to ping requests
8787
if (log.isDebugEnabled()) {
88-
log.debug("[{}] Replying back to ping message", ctx.channel());
88+
log.debug("[{}] Replying back to ping message", this.toString());
8989
}
9090
ctx.writeAndFlush(Commands.newPong())
9191
.addListener(future -> {
9292
if (!future.isSuccess()) {
9393
log.warn("[{}] Forcing connection to close since cannot send a pong message.",
94-
ctx.channel(), future.cause());
94+
toString(), future.cause());
9595
ctx.close();
9696
}
9797
});
@@ -107,24 +107,24 @@ private void handleKeepAliveTimeout() {
107107
}
108108

109109
if (!isHandshakeCompleted()) {
110-
log.warn("[{}] Pulsar Handshake was not completed within timeout, closing connection", ctx.channel());
110+
log.warn("[{}] Pulsar Handshake was not completed within timeout, closing connection", this.toString());
111111
ctx.close();
112112
} else if (waitingForPingResponse && ctx.channel().config().isAutoRead()) {
113113
// We were waiting for a response and another keep-alive just completed.
114114
// If auto-read was disabled, it means we stopped reading from the connection, so we might receive the Ping
115115
// response later and thus not enforce the strict timeout here.
116-
log.warn("[{}] Forcing connection to close after keep-alive timeout", ctx.channel());
116+
log.warn("[{}] Forcing connection to close after keep-alive timeout", this.toString());
117117
ctx.close();
118118
} else if (getRemoteEndpointProtocolVersion() >= ProtocolVersion.v1.getValue()) {
119119
// Send keep alive probe to peer only if it supports the ping/pong commands, added in v1
120120
if (log.isDebugEnabled()) {
121-
log.debug("[{}] Sending ping message", ctx.channel());
121+
log.debug("[{}] Sending ping message", this.toString());
122122
}
123123
waitingForPingResponse = true;
124124
sendPing();
125125
} else {
126126
if (log.isDebugEnabled()) {
127-
log.debug("[{}] Peer doesn't support keep-alive", ctx.channel());
127+
log.debug("[{}] Peer doesn't support keep-alive", this.toString());
128128
}
129129
}
130130
}
@@ -134,7 +134,7 @@ protected ChannelFuture sendPing() {
134134
.addListener(future -> {
135135
if (!future.isSuccess()) {
136136
log.warn("[{}] Forcing connection to close since cannot send a ping message.",
137-
ctx.channel(), future.cause());
137+
this.toString(), future.cause());
138138
ctx.close();
139139
}
140140
});
@@ -152,5 +152,20 @@ public void cancelKeepAliveTask() {
152152
*/
153153
protected abstract boolean isHandshakeCompleted();
154154

155+
/**
156+
* Demo: [id: 0x2561bcd1, L:/10.0.136.103:6650 ! R:/240.240.0.5:58038].
157+
* L: local Address.
158+
* R: remote address.
159+
*/
160+
@Override
161+
public String toString() {
162+
ChannelHandlerContext ctx = this.ctx;
163+
if (ctx == null) {
164+
return "[ctx: null]";
165+
} else {
166+
return ctx.channel().toString();
167+
}
168+
}
169+
155170
private static final Logger log = LoggerFactory.getLogger(PulsarHandler.class);
156171
}

0 commit comments

Comments
 (0)