Skip to content

Commit 4df2934

Browse files
Add SpanJoiner based on spanId (#176)
1 parent 3548c83 commit 4df2934

File tree

7 files changed

+422
-0
lines changed

7 files changed

+422
-0
lines changed

hypertrace-core-graphql-span-schema/src/main/java/org/hypertrace/core/graphql/span/SpanSchemaModule.java

+2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import com.google.inject.multibindings.Multibinder;
55
import org.hypertrace.core.graphql.common.request.ResultSetRequestBuilder;
66
import org.hypertrace.core.graphql.span.dao.SpanDaoModule;
7+
import org.hypertrace.core.graphql.span.joiner.SpanJoinerModule;
78
import org.hypertrace.core.graphql.span.request.SpanRequestModule;
89
import org.hypertrace.core.graphql.spi.schema.GraphQlSchemaFragment;
910

@@ -17,5 +18,6 @@ protected void configure() {
1718
requireBinding(ResultSetRequestBuilder.class);
1819
install(new SpanDaoModule());
1920
install(new SpanRequestModule());
21+
install(new SpanJoinerModule());
2022
}
2123
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
package org.hypertrace.core.graphql.span.joiner;
2+
3+
import static com.google.common.collect.ImmutableList.copyOf;
4+
import static com.google.common.collect.Iterables.concat;
5+
import static org.hypertrace.core.graphql.atttributes.scopes.HypertraceCoreAttributeScopeString.SPAN;
6+
import static org.hypertrace.core.graphql.span.joiner.SpanJoin.SPAN_KEY;
7+
8+
import graphql.schema.DataFetchingFieldSelectionSet;
9+
import graphql.schema.SelectedField;
10+
import io.reactivex.rxjava3.core.Observable;
11+
import io.reactivex.rxjava3.core.Single;
12+
import java.util.Collection;
13+
import java.util.Collections;
14+
import java.util.List;
15+
import java.util.Map;
16+
import java.util.Map.Entry;
17+
import java.util.Optional;
18+
import java.util.Set;
19+
import java.util.function.Function;
20+
import java.util.stream.Collectors;
21+
import javax.inject.Inject;
22+
import lombok.AllArgsConstructor;
23+
import lombok.Value;
24+
import lombok.experimental.Accessors;
25+
import org.hypertrace.core.graphql.common.request.AttributeAssociation;
26+
import org.hypertrace.core.graphql.common.request.AttributeRequest;
27+
import org.hypertrace.core.graphql.common.request.FilterRequestBuilder;
28+
import org.hypertrace.core.graphql.common.request.ResultSetRequest;
29+
import org.hypertrace.core.graphql.common.request.ResultSetRequestBuilder;
30+
import org.hypertrace.core.graphql.common.schema.arguments.TimeRangeArgument;
31+
import org.hypertrace.core.graphql.common.schema.attributes.AttributeScope;
32+
import org.hypertrace.core.graphql.common.schema.attributes.arguments.AttributeExpression;
33+
import org.hypertrace.core.graphql.common.schema.id.Identifiable;
34+
import org.hypertrace.core.graphql.common.schema.results.arguments.filter.FilterArgument;
35+
import org.hypertrace.core.graphql.common.schema.results.arguments.filter.FilterOperatorType;
36+
import org.hypertrace.core.graphql.common.schema.results.arguments.filter.FilterType;
37+
import org.hypertrace.core.graphql.common.schema.results.arguments.order.OrderArgument;
38+
import org.hypertrace.core.graphql.context.GraphQlRequestContext;
39+
import org.hypertrace.core.graphql.span.dao.SpanDao;
40+
import org.hypertrace.core.graphql.span.request.SpanRequest;
41+
import org.hypertrace.core.graphql.span.schema.Span;
42+
import org.hypertrace.core.graphql.span.schema.SpanResultSet;
43+
import org.hypertrace.core.graphql.utils.schema.GraphQlSelectionFinder;
44+
import org.hypertrace.core.graphql.utils.schema.SelectionQuery;
45+
46+
public class DefaultSpanJoinerBuilder implements SpanJoinerBuilder {
47+
48+
private static final int ZERO_OFFSET = 0;
49+
50+
private final SpanDao spanDao;
51+
private final GraphQlSelectionFinder selectionFinder;
52+
private final ResultSetRequestBuilder resultSetRequestBuilder;
53+
private final FilterRequestBuilder filterRequestBuilder;
54+
55+
@Inject
56+
DefaultSpanJoinerBuilder(
57+
SpanDao spanDao,
58+
GraphQlSelectionFinder selectionFinder,
59+
ResultSetRequestBuilder resultSetRequestBuilder,
60+
FilterRequestBuilder filterRequestBuilder) {
61+
this.spanDao = spanDao;
62+
this.selectionFinder = selectionFinder;
63+
this.resultSetRequestBuilder = resultSetRequestBuilder;
64+
this.filterRequestBuilder = filterRequestBuilder;
65+
}
66+
67+
@Override
68+
public Single<SpanJoiner> build(
69+
GraphQlRequestContext context,
70+
TimeRangeArgument timeRange,
71+
DataFetchingFieldSelectionSet selectionSet,
72+
List<String> pathToSpanJoin) {
73+
return Single.just(
74+
new DefaultSpanJoiner(
75+
context, timeRange, this.getSelections(selectionSet, pathToSpanJoin)));
76+
}
77+
78+
private List<SelectedField> getSelections(
79+
DataFetchingFieldSelectionSet selectionSet, List<String> pathToSpanJoin) {
80+
List<String> fullPath = copyOf(concat(pathToSpanJoin, List.of(SPAN_KEY)));
81+
return selectionFinder
82+
.findSelections(selectionSet, SelectionQuery.builder().selectionPath(fullPath).build())
83+
.collect(Collectors.toUnmodifiableList());
84+
}
85+
86+
@AllArgsConstructor
87+
private class DefaultSpanJoiner implements SpanJoiner {
88+
89+
private final GraphQlRequestContext context;
90+
private final TimeRangeArgument timeRange;
91+
private final List<SelectedField> selectedFields;
92+
93+
@Override
94+
public <T> Single<Map<T, Span>> joinSpans(
95+
Collection<T> joinSources, SpanIdGetter<T> spanIdGetter) {
96+
return this.buildSourceToIdMap(joinSources, spanIdGetter).flatMap(this::joinSpans);
97+
}
98+
99+
private <T> Single<Map<T, Span>> joinSpans(Map<T, String> sourceToSpanIdMap) {
100+
return this.buildSpanRequest(sourceToSpanIdMap)
101+
.flatMap(spanDao::getSpans)
102+
.map(this::buildSpanIdToSpanMap)
103+
.map(spanIdToSpanMap -> buildSourceToSpanMap(sourceToSpanIdMap, spanIdToSpanMap));
104+
}
105+
106+
private <T> Map<T, Span> buildSourceToSpanMap(
107+
Map<T, String> sourceToSpanIdMap, Map<String, Span> spanIdToSpanMap) {
108+
return sourceToSpanIdMap.entrySet().stream()
109+
.filter(entry -> spanIdToSpanMap.containsKey(entry.getValue()))
110+
.collect(
111+
Collectors.toUnmodifiableMap(
112+
Entry::getKey, entry -> spanIdToSpanMap.get(entry.getValue())));
113+
}
114+
115+
private Map<String, Span> buildSpanIdToSpanMap(SpanResultSet resultSet) {
116+
return resultSet.results().stream()
117+
.collect(Collectors.toUnmodifiableMap(Identifiable::id, Function.identity()));
118+
}
119+
120+
private <T> Single<SpanRequest> buildSpanRequest(Map<T, String> sourceToSpanIdMap) {
121+
Collection<String> spanIds = sourceToSpanIdMap.values();
122+
return buildSpanIdsFilter(spanIds)
123+
.flatMap(filterArguments -> buildSpanRequest(spanIds.size(), filterArguments));
124+
}
125+
126+
private Single<SpanRequest> buildSpanRequest(
127+
int size, List<AttributeAssociation<FilterArgument>> filterArguments) {
128+
return resultSetRequestBuilder
129+
.build(
130+
context,
131+
SPAN,
132+
size,
133+
ZERO_OFFSET,
134+
timeRange,
135+
Collections.emptyList(),
136+
filterArguments,
137+
selectedFields.stream(),
138+
Optional.empty())
139+
.map(spanEventsRequest -> new SpanJoinRequest(context, spanEventsRequest));
140+
}
141+
142+
private Single<List<AttributeAssociation<FilterArgument>>> buildSpanIdsFilter(
143+
Collection<String> spanIds) {
144+
return filterRequestBuilder.build(context, SPAN, Set.of(new SpanIdFilter(spanIds)));
145+
}
146+
147+
private <T> Single<Map<T, String>> buildSourceToIdMap(
148+
Collection<T> joinSources, SpanIdGetter<T> spanIdGetter) {
149+
return Observable.fromIterable(joinSources)
150+
.flatMapSingle(source -> this.maybeBuildMapEntry(source, spanIdGetter))
151+
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
152+
}
153+
154+
private <T> Single<Entry<T, String>> maybeBuildMapEntry(
155+
T source, SpanIdGetter<T> spanIdGetter) {
156+
return spanIdGetter.getSpanId(source).map(id -> Map.entry(source, id));
157+
}
158+
}
159+
160+
@Value
161+
@Accessors(fluent = true)
162+
private static class SpanIdFilter implements FilterArgument {
163+
FilterType type = FilterType.ID;
164+
String key = null;
165+
AttributeExpression keyExpression = null;
166+
FilterOperatorType operator = FilterOperatorType.IN;
167+
Collection<String> value;
168+
AttributeScope idType = null;
169+
String idScope = SPAN;
170+
}
171+
172+
@Value
173+
@Accessors(fluent = true)
174+
private static class SpanJoinRequest implements SpanRequest {
175+
GraphQlRequestContext context;
176+
ResultSetRequest<OrderArgument> spanEventsRequest;
177+
Collection<AttributeRequest> logEventAttributes = Collections.emptyList();
178+
boolean fetchTotal = false;
179+
}
180+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package org.hypertrace.core.graphql.span.joiner;
2+
3+
import graphql.annotations.annotationTypes.GraphQLField;
4+
import graphql.annotations.annotationTypes.GraphQLName;
5+
import org.hypertrace.core.graphql.span.schema.Span;
6+
7+
public interface SpanJoin {
8+
String SPAN_KEY = "span";
9+
10+
@GraphQLField
11+
@GraphQLName(SPAN_KEY)
12+
Span span();
13+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package org.hypertrace.core.graphql.span.joiner;
2+
3+
import io.reactivex.rxjava3.core.Single;
4+
import java.util.Collection;
5+
import java.util.Collections;
6+
import java.util.Map;
7+
import org.hypertrace.core.graphql.span.schema.Span;
8+
9+
public interface SpanJoiner {
10+
11+
/** A NOOP joiner */
12+
SpanJoiner NO_OP_JOINER =
13+
new SpanJoiner() {
14+
@Override
15+
public <T> Single<Map<T, Span>> joinSpans(
16+
Collection<T> joinSources, SpanIdGetter<T> spanIdGetter) {
17+
return Single.just(Collections.emptyMap());
18+
}
19+
};
20+
21+
<T> Single<Map<T, Span>> joinSpans(Collection<T> joinSources, SpanIdGetter<T> spanIdGetter);
22+
23+
@FunctionalInterface
24+
interface SpanIdGetter<T> {
25+
Single<String> getSpanId(T source);
26+
}
27+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package org.hypertrace.core.graphql.span.joiner;
2+
3+
import graphql.schema.DataFetchingFieldSelectionSet;
4+
import io.reactivex.rxjava3.core.Single;
5+
import java.util.List;
6+
import org.hypertrace.core.graphql.common.schema.arguments.TimeRangeArgument;
7+
import org.hypertrace.core.graphql.context.GraphQlRequestContext;
8+
9+
public interface SpanJoinerBuilder {
10+
Single<SpanJoiner> build(
11+
GraphQlRequestContext context,
12+
TimeRangeArgument timeRange,
13+
DataFetchingFieldSelectionSet selectionSet,
14+
List<String> pathToSpanJoin);
15+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package org.hypertrace.core.graphql.span.joiner;
2+
3+
import com.google.inject.AbstractModule;
4+
import org.hypertrace.core.graphql.common.request.FilterRequestBuilder;
5+
import org.hypertrace.core.graphql.common.request.ResultSetRequestBuilder;
6+
import org.hypertrace.core.graphql.span.dao.SpanDao;
7+
import org.hypertrace.core.graphql.utils.schema.GraphQlSelectionFinder;
8+
9+
public class SpanJoinerModule extends AbstractModule {
10+
11+
@Override
12+
protected void configure() {
13+
bind(SpanJoinerBuilder.class).to(DefaultSpanJoinerBuilder.class);
14+
15+
requireBinding(SpanDao.class);
16+
requireBinding(GraphQlSelectionFinder.class);
17+
requireBinding(ResultSetRequestBuilder.class);
18+
requireBinding(FilterRequestBuilder.class);
19+
}
20+
}

0 commit comments

Comments
 (0)