Skip to content

Commit

Permalink
Update use of tracing spans (typedb#5095)
Browse files Browse the repository at this point in the history
- Update the use of ServerTracing spans
- Update to latest build_tools version
  • Loading branch information
Marco Scoppetta authored Apr 8, 2019
1 parent 19b6874 commit dd546ae
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 94 deletions.
2 changes: 1 addition & 1 deletion dependencies/graknlabs/dependencies.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,5 @@ def graknlabs_benchmark():
git_repository(
name = "graknlabs_benchmark",
remote = "https://github.com/graknlabs/benchmark.git",
commit = "ceb5a2ebb71ee526d788fb4b17a104a6669d4b70" # keep in sync with protocol changes
commit = "d5828a2c620abd85bb7128ccd48ae336296e2048" # keep in sync with protocol changes
)
67 changes: 26 additions & 41 deletions server/src/graql/executor/QueryExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,64 +97,53 @@ public QueryExecutor(TransactionOLTP transaction, boolean infer) {

public Stream<ConceptMap> match(MatchClause matchClause) {

ScopedSpan span = null;
if (ServerTracing.tracingActive()) {
span = ServerTracing.startScopedChildSpan("QueryExecutor.match validate pattern");
}

int validatePatternSpanId = ServerTracing.startScopedChildSpan("QueryExecutor.match validate pattern");


//validatePattern
for (Statement statement : matchClause.getPatterns().statements()) {
statement.properties().forEach(property -> validateProperty(property, statement));
}

if (span != null) {
span.finish();
span = ServerTracing.startScopedChildSpan("QueryExecutor.match create stream");
}
ServerTracing.closeScopedChildSpan(validatePatternSpanId);


int createStreamSpanId = ServerTracing.startScopedChildSpan("QueryExecutor.match create stream");


Stream<ConceptMap> answerStream;
try {
if (!infer) {
// time to create the traversal plan
ScopedSpan subSpan = null;
if (span != null) {
subSpan = ServerTracing.startScopedChildSpanWithParentContext("QueryExecutor.match create traversal", span.context());
}

int createTraversalSpanId = ServerTracing.startScopedChildSpanWithParentContext("QueryExecutor.match create traversal", createStreamSpanId);

GraqlTraversal graqlTraversal = GreedyTraversalPlan.createTraversal(matchClause.getPatterns(), transaction);

ServerTracing.closeScopedChildSpan(createTraversalSpanId);

// time to convert plan into a answer stream
if (subSpan != null) {
subSpan.finish();
subSpan = ServerTracing.startScopedChildSpanWithParentContext("QueryExecutor.match traversal to stream", span.context());
}
int traversalToStreamSpanId = ServerTracing.startScopedChildSpanWithParentContext("QueryExecutor.match traversal to stream", createStreamSpanId);

answerStream = traversal(matchClause.getPatterns().variables(), graqlTraversal);

// close this subSpan
if (subSpan != null) {
subSpan.finish();
}
ServerTracing.closeScopedChildSpan(traversalToStreamSpanId);
} else {

ScopedSpan subSpan = null;
if (span != null) {
subSpan = ServerTracing.startScopedChildSpanWithParentContext("QueryExecutor.match disjunction iterator", span.context());
}
int disjunctionSpanId= ServerTracing.startScopedChildSpanWithParentContext("QueryExecutor.match disjunction iterator", createStreamSpanId);

Stream<ConceptMap> stream = new DisjunctionIterator(matchClause, transaction).hasStream();
answerStream = stream.map(result -> result.project(matchClause.getSelectedNames()));

if (subSpan != null) {
subSpan.finish();
}
ServerTracing.closeScopedChildSpan(disjunctionSpanId);
}
} catch (GraqlQueryException e) {
System.err.println(e.getMessage());
answerStream = Stream.empty();
}

if (span != null) {
span.finish();
}
ServerTracing.closeScopedChildSpan(createStreamSpanId);
return answerStream;
}

Expand Down Expand Up @@ -230,10 +219,8 @@ public ConceptMap undefine(GraqlUndefine query) {
}

public Stream<ConceptMap> insert(GraqlInsert query) {
ScopedSpan span = null;
if (ServerTracing.tracingActive()) {
span = ServerTracing.startScopedChildSpan("QueryExecutor.insert create executors");
}
int createExecSpanId= ServerTracing.startScopedChildSpan("QueryExecutor.insert create executors");


Collection<Statement> statements = query.statements().stream()
.flatMap(statement -> statement.innerStatements().stream())
Expand All @@ -246,10 +233,10 @@ public Stream<ConceptMap> insert(GraqlInsert query) {
}
}

if (span != null) {
span.finish();
span = ServerTracing.startScopedChildSpan("QueryExecutor.insert create answer stream");
}
ServerTracing.closeScopedChildSpan(createExecSpanId);

int answerStreamSpanId = ServerTracing.startScopedChildSpan("QueryExecutor.insert create answer stream");


Stream<ConceptMap> answerStream;
if (query.match() != null) {
Expand All @@ -268,9 +255,7 @@ public Stream<ConceptMap> insert(GraqlInsert query) {
answerStream = Stream.of(WriteExecutor.create(transaction, executors.build()).write(new ConceptMap()));
}

if (span != null) {
span.finish();
}
ServerTracing.closeScopedChildSpan(answerStreamSpanId);

return answerStream;
}
Expand Down
26 changes: 11 additions & 15 deletions server/src/graql/executor/WriteExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -230,36 +230,32 @@ ConceptMap write(ConceptMap preExisting) {
concepts.putAll(preExisting.map());

// time to execute writers for properties
ScopedSpan span = null;
if (ServerTracing.tracingActive()) {
span = ServerTracing.startScopedChildSpan("WriteExecutor.write execute writers");
}
int executeWritersSpanId = ServerTracing.startScopedChildSpan("WriteExecutor.write execute writers");


for (Writer writer : sortedWriters()) {
writer.execute(this);
}

ServerTracing.closeScopedChildSpan(executeWritersSpanId);
// time to delete concepts marked for deletion
if (span != null) {
span.finish();
span = ServerTracing.startScopedChildSpan("WriteExecutor.write delete concepts");
}

int deleteConceptsSpanId = ServerTracing.startScopedChildSpan("WriteExecutor.write delete concepts");


for (Concept concept : conceptsToDelete) {
concept.delete();
}

ServerTracing.closeScopedChildSpan(deleteConceptsSpanId);

// time to build concepts
if (span != null) {
span.finish();
span = ServerTracing.startScopedChildSpan("WriteExecutor.write build concepts for answer");
}

int buildConceptsSpanId = ServerTracing.startScopedChildSpan("WriteExecutor.write build concepts for answer");

conceptBuilders.forEach((var, builder) -> buildConcept(var, builder));

if (span != null) {
span.finish();
}
ServerTracing.closeScopedChildSpan(buildConceptsSpanId);

ImmutableMap.Builder<Variable, Concept> allConcepts = ImmutableMap.<Variable, Concept>builder().putAll(concepts);

Expand Down
11 changes: 4 additions & 7 deletions server/src/graql/reasoner/DisjunctionIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,13 @@ public DisjunctionIterator(MatchClause matchClause, TransactionOLTP tx) {
tx.queryCache().clear();


ScopedSpan span = null;
if (ServerTracing.tracingActive()) {
span = ServerTracing.startScopedChildSpan("DisjunctionIterator() create DNF, conjunction iterator");
}

int conjunctionIterSpanId = ServerTracing.startScopedChildSpan("DisjunctionIterator() create DNF, conjunction iterator");

this.conjIterator = matchClause.getPatterns().getNegationDNF().getPatterns().stream().iterator();
answerIterator = conjunctionIterator(conjIterator.next(), tx);

if (span != null) {
span.finish();
}
ServerTracing.closeScopedChildSpan(conjunctionIterSpanId);
}

private Iterator<ConceptMap> conjunctionIterator(Conjunction<Pattern> conj, TransactionOLTP tx) {
Expand Down
18 changes: 9 additions & 9 deletions server/src/server/rpc/SessionService.java
Original file line number Diff line number Diff line change
Expand Up @@ -304,21 +304,21 @@ private void query(SessionProto.Transaction.Query.Req request) {

/* permanent tracing hooks, as performance here varies depending on query and what's in the graph */

ScopedSpan span = null;
if (ServerTracing.tracingActive()) {
span = ServerTracing.startScopedChildSpan("Parsing Graql Query");
}

int parseQuerySpanId = ServerTracing.startScopedChildSpan("Parsing Graql Query");

GraqlQuery query = Graql.parse(request.getQuery());

if (span != null) {
span.finish();
span = ServerTracing.startScopedChildSpan("Creating query stream");
}
ServerTracing.closeScopedChildSpan(parseQuerySpanId);


int createStreamSpanId = ServerTracing.startScopedChildSpan("Creating query stream");

Stream<Transaction.Res> responseStream = tx().stream(query, request.getInfer().equals(Transaction.Query.INFER.TRUE)).map(ResponseBuilder.Transaction.Iter::query);
Transaction.Res response = ResponseBuilder.Transaction.queryIterator(iterators.add(responseStream.iterator()));

if (span != null) { span.finish(); }
ServerTracing.closeScopedChildSpan(createStreamSpanId);

onNextResponse(response);
}

Expand Down
31 changes: 10 additions & 21 deletions server/src/server/session/TransactionOLTP.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package grakn.core.server.session;

import brave.ScopedSpan;
import grakn.benchmark.lib.instrumentation.ServerTracing;
import grakn.core.api.Transaction;
import grakn.core.common.config.ConfigKey;
Expand Down Expand Up @@ -879,10 +878,9 @@ private void closeTransaction(String closedReason) {
private Optional<CommitLog> commitWithLogs() throws InvalidKBException {

/* This method has permanent tracing because commits can take varying lengths of time depending on operations */
ScopedSpan span = null;
if (ServerTracing.tracingActive()) {
span = ServerTracing.startScopedChildSpan("commitWithLogs validate");
}

int validateSpanId = ServerTracing.startScopedChildSpan("commitWithLogs validate");


checkMutationAllowed();
validateGraph();
Expand All @@ -891,10 +889,9 @@ private Optional<CommitLog> commitWithLogs() throws InvalidKBException {
Map<String, Set<ConceptId>> newAttributes = transactionCache.getNewAttributes();
boolean logsExist = !newInstances.isEmpty() || !newAttributes.isEmpty();

if (span != null) {
span.finish();
span = ServerTracing.startScopedChildSpan("commitWithLogs commit");
}
ServerTracing.closeScopedChildSpan(validateSpanId);

int commitSpanId = ServerTracing.startScopedChildSpan("commitWithLogs commit");

// lock on the keyspace cache shared between concurrent tx's to the same keyspace
// force serialized updates, keeping Janus and our KeyspaceCache in sync
Expand All @@ -903,28 +900,20 @@ private Optional<CommitLog> commitWithLogs() throws InvalidKBException {
transactionCache.flushToKeyspaceCache();
}

ServerTracing.closeScopedChildSpan(commitSpanId);

//If we have logs to commit get them and add them
if (logsExist) {

if (span != null) {
span.finish();
span = ServerTracing.startScopedChildSpan("commitWithLogs create log");
}
int createLogSpanId = ServerTracing.startScopedChildSpan("commitWithLogs create log");

Optional logs = Optional.of(CommitLog.create(keyspace(), newInstances, newAttributes));

if (span != null) {
span.finish();
}
ServerTracing.closeScopedChildSpan(createLogSpanId);

return logs;
}

if (span != null) {
span.finish();
}


return Optional.empty();
}

Expand Down

0 comments on commit dd546ae

Please sign in to comment.