diff --git a/pom.xml b/pom.xml index 96f07ab012..0d72253884 100644 --- a/pom.xml +++ b/pom.xml @@ -189,6 +189,9 @@ 3.25.8 1.77.1 + + 1.59.0 + 1.40.0 true _ @@ -398,7 +401,35 @@ jakarta.annotation-api ${jakarta.annotation.version} + + + + io.opentelemetry + opentelemetry-api + ${opentelemetry.version} + + + io.opentelemetry + opentelemetry-sdk + ${opentelemetry.version} + + + io.opentelemetry + opentelemetry-sdk-testing + ${opentelemetry.version} + + + io.opentelemetry.semconv + opentelemetry-semconv + ${opentelemetry-semconv.version} + + + io.opentelemetry + opentelemetry-context + ${opentelemetry.version} + + diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java index 36c0b3937f..d2146a521f 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java @@ -128,6 +128,7 @@ static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(RaftClientReques Optional.ofNullable(request.getSlidingWindowEntry()).ifPresent(b::setSlidingWindowEntry); Optional.ofNullable(request.getRoutingTable()).map(RoutingTable::toProto).ifPresent(b::setRoutingTable); + Optional.ofNullable(request.getSpanContext()).ifPresent(b::setSpanContext); return b.setCallId(request.getCallId()) .setToLeader(request.isToLeader()) @@ -188,6 +189,9 @@ static RaftClientRequest toRaftClientRequest(RaftClientRequestProto p) { if (request.hasSlidingWindowEntry()) { b.setSlidingWindowEntry(request.getSlidingWindowEntry()); } + if (request.hasSpanContext()) { + b.setSpanContext(request.getSpanContext()); + } return b.setClientId(ClientId.valueOf(request.getRequestorId())) .setGroupId(ProtoUtils.toRaftGroupId(request.getRaftGroupId())) .setCallId(request.getCallId()) diff --git a/ratis-common/pom.xml b/ratis-common/pom.xml index f6bc0ee413..87f1289cf4 100644 --- a/ratis-common/pom.xml +++ b/ratis-common/pom.xml @@ -38,6 +38,31 @@ slf4j-api + + io.opentelemetry + opentelemetry-api + + + + io.opentelemetry + opentelemetry-sdk + + + + io.opentelemetry.semconv + opentelemetry-semconv + + + + io.opentelemetry + opentelemetry-context + + + + io.opentelemetry + opentelemetry-sdk-testing + + org.junit.jupiter junit-jupiter-api diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java index ed41f1ea2c..b04402fe15 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java @@ -24,6 +24,7 @@ import org.apache.ratis.proto.RaftProtos.ReadRequestTypeProto; import org.apache.ratis.proto.RaftProtos.ReplicationLevel; import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry; +import org.apache.ratis.proto.RaftProtos.SpanContextProto; import org.apache.ratis.proto.RaftProtos.StaleReadRequestTypeProto; import org.apache.ratis.proto.RaftProtos.WatchRequestTypeProto; import org.apache.ratis.proto.RaftProtos.WriteRequestTypeProto; @@ -305,6 +306,7 @@ public static class Builder { private SlidingWindowEntry slidingWindowEntry; private RoutingTable routingTable; private long timeoutMs; + private SpanContextProto spanContext; public RaftClientRequest build() { return new RaftClientRequest(this); @@ -366,6 +368,11 @@ public Builder setTimeoutMs(long timeoutMs) { this.timeoutMs = timeoutMs; return this; } + + public Builder setSpanContext(SpanContextProto spanContext) { + this.spanContext = spanContext; + return this; + } } public static Builder newBuilder() { @@ -397,6 +404,8 @@ public static RaftClientRequest toWriteRequest(RaftClientRequest r, Message mess private final boolean toLeader; + private final SpanContextProto spanContext; + /** Construct a request for sending to the given server. */ protected RaftClientRequest(ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId, Type type) { this(newBuilder() @@ -429,6 +438,7 @@ private RaftClientRequest(Builder b) { this.slidingWindowEntry = b.slidingWindowEntry; this.routingTable = b.routingTable; this.timeoutMs = b.timeoutMs; + this.spanContext = b.spanContext; } @Override @@ -472,6 +482,10 @@ public long getTimeoutMs() { return timeoutMs; } + public SpanContextProto getSpanContext() { + return spanContext; + } + @Override public String toString() { return super.toString() + ", seq=" + ProtoUtils.toString(slidingWindowEntry) + ", " diff --git a/ratis-common/src/main/java/org/apache/ratis/trace/ClientRequestSpanBuilder.java b/ratis-common/src/main/java/org/apache/ratis/trace/ClientRequestSpanBuilder.java new file mode 100644 index 0000000000..6ff99bca1d --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/trace/ClientRequestSpanBuilder.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.trace; + +import static org.apache.ratis.trace.RatisAttributes.ATTR_CALL_ID; +import static org.apache.ratis.trace.RatisAttributes.ATTR_CLIENT_INVOCATION_ID; +import static org.apache.ratis.trace.RatisAttributes.ATTR_MEMBER_ID; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.context.Context; +import org.apache.ratis.proto.RaftProtos.SpanContextProto; +import org.apache.ratis.protocol.RaftClientRequest; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Supplier; + +/** + * Construct {@link Span} instances originating from the client request. + */ +public class ClientRequestSpanBuilder implements Supplier { + + private String name; + private Context remoteContext; + private final Map, Object> attributes = new HashMap<>(); + + @Override + public Span get() { + return build(); + } + + public ClientRequestSpanBuilder setAttributes(final RaftClientRequest request, final String memberId) { + remoteContext = TraceUtils.extractContextFromProto(request.getSpanContext()); + setRequestAttributes(attributes, request, memberId); + return this; + } + + public ClientRequestSpanBuilder setSpanName(final String spanName) { + this.name = spanName; + return this; + } + + public ClientRequestSpanBuilder setRemoteContext(final SpanContextProto spanContextProto) { + remoteContext = TraceUtils.extractContextFromProto(spanContextProto); + return this; + } + + + @SuppressWarnings("unchecked") + public Span build() { + final SpanBuilder builder = TraceUtils.getGlobalTracer().spanBuilder(name).setParent(remoteContext) + .setSpanKind(SpanKind.SERVER); + attributes.forEach((k, v) -> builder.setAttribute((AttributeKey) k, v)); + return builder.startSpan(); + } + + /** + * Static utility method that performs the primary logic of this builder. It is visible to other + * classes in this package so that other builders can use this functionality as a mix-in. + * @param attributes the attributes map to be populated. + */ + static void setRequestAttributes(final Map, Object> attributes, + final RaftClientRequest request, final String memberId) { + attributes.put(ATTR_CLIENT_INVOCATION_ID, String.valueOf(request.getClientId())); + attributes.put(ATTR_CALL_ID, String.valueOf(request.getCallId())); + attributes.put(ATTR_MEMBER_ID, memberId); + } + +} diff --git a/ratis-common/src/main/java/org/apache/ratis/trace/RatisAttributes.java b/ratis-common/src/main/java/org/apache/ratis/trace/RatisAttributes.java new file mode 100644 index 0000000000..948d57a341 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/trace/RatisAttributes.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.trace; + +import io.opentelemetry.api.common.AttributeKey; + +/** + * The constants in this class correspond with the guidance outlined by the OpenTelemetry Semantic + * Conventions. + */ +public final class RatisAttributes { + public static final AttributeKey ATTR_CLIENT_INVOCATION_ID = + AttributeKey.stringKey("raft.client.invocation.id"); + public static final AttributeKey ATTR_MEMBER_ID = AttributeKey.stringKey("raft.member.id"); + public static final AttributeKey ATTR_CALL_ID = AttributeKey.stringKey("raft.call.id"); + + + private RatisAttributes() { + } +} diff --git a/ratis-common/src/main/java/org/apache/ratis/trace/TraceUtils.java b/ratis-common/src/main/java/org/apache/ratis/trace/TraceUtils.java new file mode 100644 index 0000000000..37c48eda96 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/trace/TraceUtils.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.trace; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.context.propagation.TextMapPropagator; +import io.opentelemetry.context.propagation.TextMapGetter; +import org.apache.ratis.proto.RaftProtos.SpanContextProto; +import org.apache.ratis.util.FutureUtils; +import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.function.CheckedSupplier; +import org.apache.ratis.util.VersionInfo; + +import java.util.Map; +import java.util.Optional; +import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; + +public final class TraceUtils { + + private static final Tracer TRACER = GlobalOpenTelemetry.getTracer("org.apache.ratis", + VersionInfo.getSoftwareInfoVersion()); + + private TraceUtils() { + } + + public static Tracer getGlobalTracer() { + return TRACER; + } + + /** + * Trace an asynchronous operation represented by a {@link CompletableFuture}. + * The returned future will complete with the same result or error as the original future, + * but the provided {@code span} will be ended when the future completes. + */ + public static CompletableFuture traceAsyncMethod( + CheckedSupplier, THROWABLE> action, Supplier spanSupplier) throws THROWABLE { + final Span span = spanSupplier.get(); + try (Scope ignored = span.makeCurrent()) { + final CompletableFuture future; + try { + future = action.get(); + } catch (RuntimeException | Error e) { + setError(span, e); + span.end(); + throw e; + } catch (Throwable t) { + setError(span, t); + span.end(); + throw JavaUtils.cast(t); + } + endSpan(future, span); + return future; + } + } + + private static void endSpan(CompletableFuture future, Span span) { + FutureUtils.addListener(future, (resp, error) -> { + if (error != null) { + setError(span, error); + } else { + span.setStatus(StatusCode.OK); + } + span.end(); + }); + } + + public static void setError(Span span, Throwable error) { + span.recordException(error); + span.setStatus(StatusCode.ERROR); + } + + private static final TextMapPropagator PROPAGATOR = + GlobalOpenTelemetry.getPropagators().getTextMapPropagator(); + + public static SpanContextProto injectContextToProto(Context context) { + Map carrier = new TreeMap<>(); + PROPAGATOR.inject(context, carrier, (map, key, value) -> map.put(key, value)); + return SpanContextProto.newBuilder().putAllContext(carrier).build(); + } + + public static Context extractContextFromProto(SpanContextProto proto) { + if (proto == null || proto.getContextMap().isEmpty()) { + return Context.current(); + } + final TextMapGetter getter = SpanContextGetter.INSTANCE; + return PROPAGATOR.extract(Context.current(), proto, getter); + } +} + +class SpanContextGetter implements TextMapGetter { + static final SpanContextGetter INSTANCE = new SpanContextGetter(); + + @Override + public Iterable keys(SpanContextProto carrier) { + return carrier.getContextMap().keySet(); + } + + @Override + public String get(SpanContextProto carrier, String key) { + return Optional.ofNullable(carrier).map(SpanContextProto::getContextMap) + .map(map -> map.get(key)).orElse(null); + } + +} \ No newline at end of file diff --git a/ratis-common/src/main/java/org/apache/ratis/util/FutureUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/FutureUtils.java new file mode 100644 index 0000000000..bcb802829f --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/util/FutureUtils.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.util; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.function.BiConsumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Helper class for processing futures. + */ +public final class FutureUtils { + + private static final Logger LOG = LoggerFactory.getLogger(FutureUtils.class); + + private FutureUtils() { + } + + /** + * This is method is used when you just want to add a listener to the given future. We will call + * {@link CompletableFuture#whenComplete(BiConsumer)} to register the {@code action} to the + * {@code future}. Ignoring the return value of a Future is considered as a bad practice as it may + * suppress exceptions thrown from the code that completes the future, and this method will catch + * all the exception thrown from the {@code action} to catch possible code bugs. + *

+ * And the error phone check will always report FutureReturnValueIgnored because every method in + * the {@link CompletableFuture} class will return a new {@link CompletableFuture}, so you always + * have one future that has not been checked. So we introduce this method and add a suppress + * warnings annotation here. + */ + @SuppressWarnings("FutureReturnValueIgnored") + public static void addListener(CompletableFuture future, + BiConsumer action) { + future.whenComplete((resp, error) -> { + try { + // See this post on stack overflow(shorten since the url is too long), + // https://s.apache.org/completionexception + // For a chain of CompletableFuture, only the first child CompletableFuture can get the + // original exception, others will get a CompletionException, which wraps the original + // exception. So here we unwrap it before passing it to the callback action. + action.accept(resp, unwrapCompletionException(error)); + } catch (Throwable t) { + LOG.error("Unexpected error caught when processing CompletableFuture", t); + } + }); + } + + /** + * Get the cause of the {@link Throwable} if it is a {@link CompletionException}. + */ + public static Throwable unwrapCompletionException(Throwable error) { + if (error instanceof CompletionException) { + Throwable cause = error.getCause(); + if (cause != null) { + return cause; + } + } + return error; + } + +} + diff --git a/ratis-common/src/main/java/org/apache/ratis/util/VersionInfo.java b/ratis-common/src/main/java/org/apache/ratis/util/VersionInfo.java index 07136e9a40..4f24879f87 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/VersionInfo.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/VersionInfo.java @@ -158,6 +158,14 @@ static void printSystemProperties(BiConsumer out) { sortedMap.forEach(out); } + /** + * Get the current ratis version. + * @return the current ratis version string. + */ + public static String getSoftwareInfoVersion() { + return VersionInfo.load(VersionInfo.class).softwareInfos.getOrDefault(SoftwareInfo.VERSION); + } + public static void main(String[] args) { printSystemProperties((key, value) -> System.out.printf("%-40s = %s%n", key, value)); diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto index 6dbfdb15a5..eba5de3b74 100644 --- a/ratis-proto/src/main/proto/Raft.proto +++ b/ratis-proto/src/main/proto/Raft.proto @@ -117,6 +117,7 @@ message RaftRpcRequestProto { uint64 callId = 4; bool toLeader = 5; + SpanContextProto spanContext = 11; repeated uint64 repliedCallIds = 12; // The call ids of the replied requests uint64 timeoutMs = 13; RoutingTableProto routingTable = 14; @@ -569,3 +570,8 @@ message LogInfoProto { TermIndexProto committed = 3; TermIndexProto lastEntry = 4; } + +// The attribute map for opentelemetry trace +message SpanContextProto { + map context = 1; +} diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index d9dd09d966..25ca612eb3 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -17,6 +17,7 @@ */ package org.apache.ratis.server.impl; +import io.opentelemetry.api.trace.Span; import org.apache.ratis.client.impl.ClientProtoUtils; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.metrics.Timekeeper; @@ -100,6 +101,8 @@ import org.apache.ratis.statemachine.impl.TransactionContextImpl; import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException; +import org.apache.ratis.trace.ClientRequestSpanBuilder; +import org.apache.ratis.trace.TraceUtils; import org.apache.ratis.util.CodeInjectionForTesting; import org.apache.ratis.util.CollectionUtils; import org.apache.ratis.util.ConcurrentUtils; @@ -943,6 +946,17 @@ CompletableFuture executeSubmitClientRequestAsync(RaftClientReq @Override public CompletableFuture submitClientRequestAsync( RaftClientRequest request) throws IOException { + final Supplier supplier = new ClientRequestSpanBuilder() + .setSpanName("raft.server.submitClientRequestAsync") + .setRemoteContext(request.getSpanContext()) + .setAttributes(request, getMemberId().toString()); + return TraceUtils.traceAsyncMethod( + () -> submitClientRequestAsyncInternal(request), + supplier); + } + + private CompletableFuture submitClientRequestAsyncInternal( + RaftClientRequest request) throws IOException { assertLifeCycleState(LifeCycle.States.RUNNING); LOG.debug("{}: receive client request({})", getMemberId(), request); final Timekeeper timer = raftServerMetrics.getClientRequestTimer(request.getType()); diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerImplTracingTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerImplTracingTests.java new file mode 100644 index 0000000000..5ebe12cc65 --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerImplTracingTests.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.impl; + +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.testing.junit5.OpenTelemetryExtension; +import io.opentelemetry.sdk.trace.data.SpanData; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.RaftGroup; +import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.protocol.exceptions.ServerNotReadyException; +import org.apache.ratis.server.storage.RaftStorage; +import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing; +import org.apache.ratis.trace.TraceUtils; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.mock; + +public class RaftServerImplTracingTests { + + @RegisterExtension + private static final OpenTelemetryExtension openTelemetryExtension = + OpenTelemetryExtension.create(); + + @Test + public void testSubmitClientRequestAsync() throws Exception { + RaftGroup group = RaftGroup.emptyGroup(); + StateMachine sm = new SimpleStateMachine4Testing(); + RaftServerProxy proxy = mock(RaftServerProxy.class); + when(proxy.getId()).thenReturn(RaftPeerId.valueOf("peer1")); + when(proxy.getProperties()).thenReturn(new RaftProperties()); + when(proxy.getThreadGroup()).thenReturn(new ThreadGroup("test")); + + RaftServerImpl server = new RaftServerImpl(group, sm, proxy, RaftStorage.StartupOption.FORMAT); + + // build a minimal RaftClientRequest with a client span context; + final RaftClientRequest r = newRaftClientRequest(RaftClientRequest.writeRequestType()); + final String testSpanName = "test-appendEntries_emitsSpan"; + + // invoke submitClientRequestAsync + Span span = openTelemetryExtension + .getOpenTelemetry().getTracer("test").spanBuilder(testSpanName) + .setSpanKind(SpanKind.INTERNAL) + .startSpan(); + try { + server.submitClientRequestAsync(r); + } catch (ServerNotReadyException ignored) { + // ignore the server is not running, because we're just trying to verify the span is emitted, + // and the server not running is expected in this test + } finally { + span.end(); + } + + List spans = openTelemetryExtension.getSpans(); + assertEquals(3, spans.size()); + assertTrue( + spans.stream().anyMatch(s -> s.getKind() == SpanKind.CLIENT && s.getName().equals("client-span")), + "Expected at least one span with SpanKind.CLIENT" + ); + assertTrue( + spans.stream().anyMatch(s -> s.getKind() == SpanKind.SERVER + && s.getName().equals("raft.server.submitClientRequestAsync")), + "Expected at least one span with SpanKind.SERVER" + ); + assertTrue( + spans.stream().anyMatch(s -> s.getKind() == SpanKind.INTERNAL + && s.getName().equals(testSpanName)), + "Expected at least one span with SpanKind.INTERNAL" + ); + + } + + private static RaftClientRequest newRaftClientRequest(RaftClientRequest.Type type) { + final Span clientSpan = + openTelemetryExtension.getOpenTelemetry().getTracer("test") + .spanBuilder("client-span") + .setSpanKind(SpanKind.CLIENT) + .startSpan(); + try { + final Context clientContext = Context.current().with(clientSpan); + return RaftClientRequest.newBuilder() + .setClientId(ClientId.randomId()) + .setServerId(RaftPeerId.valueOf("s0")) + .setGroupId(RaftGroupId.randomId()) + .setCallId(1L) + .setType(type) + .setSpanContext(TraceUtils.injectContextToProto(clientContext)) + .build(); + } finally { + clientSpan.end(); + } + } +} +