Skip to content

Commit

Permalink
[pinpoint-apm#4472] Propagate trace even when header is not supplied
Browse files Browse the repository at this point in the history
  • Loading branch information
Xylus committed Sep 19, 2018
1 parent 6d8947f commit 39e2f80
Show file tree
Hide file tree
Showing 5 changed files with 273 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ private RabbitMQClientConstants() {
public static final ServiceType RABBITMQ_CLIENT = ServiceTypeFactory.of(8300, "RABBITMQ_CLIENT", QUEUE, RECORD_STATISTICS);
public static final ServiceType RABBITMQ_CLIENT_INTERNAL = ServiceTypeFactory.of(8301, "RABBITMQ_CLIENT_INTERNAL", "RABBITMQ_CLIENT");

public static final String RABBITMQ_SCOPE = "rabbitmqScope";
public static final String RABBITMQ_PRODUCER_SCOPE = "rabbitmqProducerScope";
public static final String RABBITMQ_CONSUMER_SCOPE = "rabbitmqConsumerScope";
public static final String RABBITMQ_FRAME_HANDLER_CREATION_SCOPE = "rabbitmqFrameHandlerCreationScope";
public static final String RABBITMQ_TEMPLATE_API_SCOPE = "rabbitmqTemplateApiScope";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,12 @@ private ChannelTransformCallback(boolean traceProducer, boolean traceConsumer) {
public byte[] doInTransform(Instrumentor instrumentor, ClassLoader loader, String className, Class<?> classBeingRedefined, ProtectionDomain protectionDomain, byte[] classfileBuffer) throws InstrumentException {
InstrumentClass target = instrumentor.getInstrumentClass(loader, className, classfileBuffer);
if (traceProducer) {
// copy AMQP.BasicProperties
target.weave("com.navercorp.pinpoint.plugin.rabbitmq.client.aspect.ChannelAspect");

final InstrumentMethod basicPublish = target.getDeclaredMethod("basicPublish", "java.lang.String", "java.lang.String", "boolean", "boolean", "com.rabbitmq.client.AMQP$BasicProperties", "byte[]");
if (basicPublish != null) {
basicPublish.addScopedInterceptor("com.navercorp.pinpoint.plugin.rabbitmq.client.interceptor.ChannelBasicPublishInterceptor", RabbitMQClientConstants.RABBITMQ_SCOPE);
basicPublish.addScopedInterceptor("com.navercorp.pinpoint.plugin.rabbitmq.client.interceptor.ChannelBasicPublishInterceptor", RabbitMQClientConstants.RABBITMQ_PRODUCER_SCOPE, ExecutionPolicy.BOUNDARY);
}
}
if (traceConsumer) {
Expand Down Expand Up @@ -155,6 +158,18 @@ public byte[] doInTransform(Instrumentor instrumentor, ClassLoader loader, Strin
return target.toBytecode();
}
});
// AMQCommand - for pinpoint header propagation
transformTemplate.transform("com.rabbitmq.client.impl.AMQCommand", new TransformCallback() {
@Override
public byte[] doInTransform(Instrumentor instrumentor, ClassLoader loader, String className, Class<?> classBeingRedefined, ProtectionDomain protectionDomain, byte[] classfileBuffer) throws InstrumentException {
InstrumentClass target = instrumentor.getInstrumentClass(loader, className, classfileBuffer);
InstrumentMethod constructor = target.getConstructor("com.rabbitmq.client.Method", "com.rabbitmq.client.impl.AMQContentHeader", "byte[]");
if (constructor != null) {
constructor.addScopedInterceptor("com.navercorp.pinpoint.plugin.rabbitmq.client.interceptor.AMQCommandConstructInterceptor", RabbitMQClientConstants.RABBITMQ_PRODUCER_SCOPE, ExecutionPolicy.INTERNAL);
}
return target.toBytecode();
}
});
}

private void addAMQChannelEditor(final Filter<String> excludeExchangeFilter) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2018 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.plugin.rabbitmq.client.aspect;

import com.navercorp.pinpoint.bootstrap.instrument.aspect.Aspect;
import com.navercorp.pinpoint.bootstrap.instrument.aspect.JointPoint;
import com.navercorp.pinpoint.bootstrap.instrument.aspect.PointCut;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.MessageProperties;

/**
* Make a copy of {@code AMQP.BasicProperties} to inject pinpoint headers.
*
* @author HyunGil Jeong
*/
@Aspect
public abstract class ChannelAspect {
@PointCut
public void basicPublish(String exchange, String routingKey,
boolean mandatory, boolean immediate,
AMQP.BasicProperties props, byte[] body) {
AMQP.BasicProperties sourceProps = props;
if (sourceProps == null) {
sourceProps = MessageProperties.MINIMAL_BASIC;
}
AMQP.BasicProperties useProps = copy(sourceProps);
__basicPublish(exchange, routingKey, mandatory, immediate, useProps, body);
}

private AMQP.BasicProperties copy(AMQP.BasicProperties source) {
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.contentType(source.getContentType());
builder.contentEncoding(source.getContentEncoding());
builder.headers(source.getHeaders());
builder.deliveryMode(source.getDeliveryMode());
builder.priority(source.getPriority());
builder.correlationId(source.getCorrelationId());
builder.replyTo(source.getReplyTo());
builder.expiration(source.getExpiration());
builder.messageId(source.getMessageId());
builder.timestamp(source.getTimestamp());
builder.type(source.getType());
builder.userId(source.getUserId());
builder.appId(source.getAppId());
builder.clusterId(source.getClusterId());
return builder.build();
}

@JointPoint
abstract void __basicPublish(String exchange, String routingKey,
boolean mandatory, boolean immediate,
AMQP.BasicProperties props, byte[] body);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/*
* Copyright 2018 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.plugin.rabbitmq.client.interceptor;

import com.navercorp.pinpoint.bootstrap.context.Trace;
import com.navercorp.pinpoint.bootstrap.context.TraceContext;
import com.navercorp.pinpoint.bootstrap.context.TraceId;
import com.navercorp.pinpoint.bootstrap.interceptor.AroundInterceptor;
import com.navercorp.pinpoint.bootstrap.interceptor.scope.InterceptorScope;
import com.navercorp.pinpoint.bootstrap.logging.PLogger;
import com.navercorp.pinpoint.bootstrap.logging.PLoggerFactory;
import com.navercorp.pinpoint.common.util.MapUtils;
import com.navercorp.pinpoint.plugin.rabbitmq.client.RabbitMQClientConstants;
import com.navercorp.pinpoint.plugin.rabbitmq.client.field.setter.HeadersFieldSetter;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Method;

import java.util.HashMap;
import java.util.Map;

/**
* <p>{@code AMQContentHeader} received as an argument to the constructor is sharable and can be reused, such as
* {@code MessageProperties.MINIMAL_BASIC}. Any changes made to it (ie injecting pinpoint headers) may have undesirable
* consequences.
*
* <p>Hence, we make a copy via {@code ChannelAspect} and add pinpoint headers to it when propagating trace, and have
* {@code AMQCommand} use this.
*
* @author HyunGil Jeong
*
* @see com.navercorp.pinpoint.plugin.rabbitmq.client.aspect.ChannelAspect
*/
public class AMQCommandConstructInterceptor implements AroundInterceptor {

// AMQP spec
private static final String AMQP_METHOD_TO_INTERCEPT = "basic.publish";

private final PLogger logger = PLoggerFactory.getLogger(this.getClass());
private final boolean isDebug = logger.isDebugEnabled();

private final TraceContext traceContext;
private final InterceptorScope scope;

public AMQCommandConstructInterceptor(TraceContext traceContext, InterceptorScope scope) {
this.traceContext = traceContext;
this.scope = scope;
}

@Override
public void before(Object target, Object[] args) {
if (!validate(target, args)) {
return;
}

if (isDebug) {
logger.beforeInterceptor(target, args);
}

Trace trace = traceContext.currentRawTraceObject();
if (trace == null) {
return;
}

final AMQP.BasicProperties properties = (AMQP.BasicProperties) args[1];
final Map<String, Object> headers = createHeader(properties, trace);
if (headers != null) {
((HeadersFieldSetter) properties)._$PINPOINT$_setHeaders(headers);
}
}

private Map<String, Object> createHeader(AMQP.BasicProperties properties, Trace trace) {
final Map<String, Object> headers = copyHeader(properties);
if (trace.canSampled()) {
TraceId nextId = retrieveNextTraceId();
if (nextId == null) {
return null;
}
headers.put(RabbitMQClientConstants.META_TRACE_ID, nextId.getTransactionId());
headers.put(RabbitMQClientConstants.META_SPAN_ID, Long.toString(nextId.getSpanId()));
headers.put(RabbitMQClientConstants.META_PARENT_SPAN_ID, Long.toString(nextId.getParentSpanId()));
headers.put(RabbitMQClientConstants.META_PARENT_APPLICATION_TYPE, Short.toString(traceContext.getServerTypeCode()));
headers.put(RabbitMQClientConstants.META_PARENT_APPLICATION_NAME, traceContext.getApplicationName());
headers.put(RabbitMQClientConstants.META_FLAGS, Short.toString(nextId.getFlags()));
} else {
headers.put(RabbitMQClientConstants.META_SAMPLED, "1");
}
return headers;
}

private Map<String, Object> copyHeader(AMQP.BasicProperties properties) {
final Map<String, Object> headers = properties.getHeaders();
if (MapUtils.isEmpty(headers)) {
return new HashMap<String, Object>();
}
// headers wrapped as unmodifiable map
return new HashMap<String, Object>(headers);
}

private TraceId retrieveNextTraceId() {
Object attachment = scope.getCurrentInvocation().getAttachment();
if (attachment == null) {
if (isDebug) {
logger.debug("Invalid attachment. Expected {}, but got null", TraceId.class.getName());
}
return null;
}
if (!(attachment instanceof TraceId)) {
if (isDebug) {
logger.debug("Invalid attachment. Expected {}, but got {}", TraceId.class.getName(), attachment.getClass());
}
return null;
}
return (TraceId) attachment;
}

@Override
public void after(Object target, Object[] args, Object result, Throwable throwable) {
if (!validate(target, args)) {
return;
}

if (isDebug) {
logger.afterInterceptor(target, args);
}
}

private boolean validate(Object target, Object[] args) {
if (args == null) {
if (isDebug) {
logger.debug("Expected arguments, but found none.");
}
return false;
}
if (args.length != 3) {
if (isDebug) {
logger.debug("Expected 3 arguments, but found {}", args.length);
}
return false;
}
Object method = args[0];
if (method == null) {
// valid, but this won't be null producer side
return false;
}
if (!(method instanceof Method)) {
if (isDebug) {
logger.debug("Expected args[0] to be {}, but was {}", Method.class.getName(), method.getClass().getName());
}
return false;
}
if (!AMQP_METHOD_TO_INTERCEPT.equals(((Method) method).protocolMethodName())) {
return false;
}
Object contentHeader = args[1];
if (!(contentHeader instanceof AMQP.BasicProperties)) {
// skip header injection for null, or non AMQP.BasicProperties header
return false;
}
if (!(contentHeader instanceof HeadersFieldSetter)) {
if (isDebug) {
logger.debug("Invalid args[1]({}) object. Need field setter({})", contentHeader, HeadersFieldSetter.class.getName());
}
return false;
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,23 @@
import com.navercorp.pinpoint.bootstrap.context.TraceContext;
import com.navercorp.pinpoint.bootstrap.context.TraceId;
import com.navercorp.pinpoint.bootstrap.interceptor.AroundInterceptor;
import com.navercorp.pinpoint.bootstrap.interceptor.scope.InterceptorScope;
import com.navercorp.pinpoint.bootstrap.interceptor.scope.InterceptorScopeInvocation;
import com.navercorp.pinpoint.bootstrap.logging.PLogger;
import com.navercorp.pinpoint.bootstrap.logging.PLoggerFactory;
import com.navercorp.pinpoint.common.util.MapUtils;
import com.navercorp.pinpoint.plugin.rabbitmq.client.RabbitMQClientPluginConfig;
import com.navercorp.pinpoint.plugin.rabbitmq.client.RabbitMQClientConstants;
import com.navercorp.pinpoint.plugin.rabbitmq.client.field.accessor.RemoteAddressAccessor;
import com.navercorp.pinpoint.plugin.rabbitmq.client.field.setter.HeadersFieldSetter;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.impl.AMQConnection;
import com.rabbitmq.client.impl.FrameHandler;

import java.util.HashMap;
import java.util.Map;

/**
* @author Jinkai.Ma
* @author Jiaqi Feng
* @author HyunGil Jeong
*/
public class ChannelBasicPublishInterceptor implements AroundInterceptor {

Expand All @@ -34,11 +32,13 @@ public class ChannelBasicPublishInterceptor implements AroundInterceptor {

private final MethodDescriptor descriptor;
private final TraceContext traceContext;
private final InterceptorScope scope;
private final Filter<String> excludeExchangeFilter;

public ChannelBasicPublishInterceptor(TraceContext traceContext, MethodDescriptor descriptor) {
public ChannelBasicPublishInterceptor(TraceContext traceContext, MethodDescriptor descriptor, InterceptorScope scope) {
this.descriptor = descriptor;
this.traceContext = traceContext;
this.scope = scope;

RabbitMQClientPluginConfig rabbitMQClientPluginConfig = new RabbitMQClientPluginConfig(traceContext.getProfilerConfig());
this.excludeExchangeFilter = rabbitMQClientPluginConfig.getExcludeExchangeFilter();
Expand All @@ -65,8 +65,6 @@ public void before(Object target, Object[] args) {
if (trace == null) {
return;
}
final AMQP.BasicProperties properties = (AMQP.BasicProperties) args[4];
final Map<String, Object> headers = copyHeader(properties);

if (trace.canSampled()) {
SpanEventRecorder recorder = trace.traceBlockBegin();
Expand All @@ -76,34 +74,11 @@ public void before(Object target, Object[] args) {

recorder.recordNextSpanId(nextId.getSpanId());

headers.put(RabbitMQClientConstants.META_TRACE_ID, nextId.getTransactionId());
headers.put(RabbitMQClientConstants.META_SPAN_ID, Long.toString(nextId.getSpanId()));
headers.put(RabbitMQClientConstants.META_PARENT_SPAN_ID, Long.toString(nextId.getParentSpanId()));
headers.put(RabbitMQClientConstants.META_PARENT_APPLICATION_TYPE, Short.toString(traceContext.getServerTypeCode()));
headers.put(RabbitMQClientConstants.META_PARENT_APPLICATION_NAME, traceContext.getApplicationName());
headers.put(RabbitMQClientConstants.META_FLAGS, Short.toString(nextId.getFlags()));
} else {
headers.put(RabbitMQClientConstants.META_SAMPLED, "1");
}

if (properties instanceof HeadersFieldSetter) {
((HeadersFieldSetter) properties)._$PINPOINT$_setHeaders(headers);
InterceptorScopeInvocation invocation = scope.getCurrentInvocation();
invocation.setAttachment(nextId);
}
}

private Map<String, Object> copyHeader(AMQP.BasicProperties properties) {
if (properties == null) {
return new HashMap<String, Object>();
}

final Map<String, Object> headers = properties.getHeaders();
if (MapUtils.isEmpty(headers)) {
return new HashMap<String, Object>();
}

return new HashMap<String, Object>(headers);
}

@Override
public void after(Object target, Object[] args, Object result, Throwable throwable) {
if (isDebug) {
Expand Down

0 comments on commit 39e2f80

Please sign in to comment.