This repository was archived by the owner on Jun 26, 2024. It is now read-only.

optimize metric aggregations order by #68

Draft. Wants to merge 27 commits into base: main.
Changes from all commits (27 commits)
2e91930
chore: using data fetcher node instead of selection and filter node
skjindal93 Dec 30, 2020
c1d9f0f
added unit tests
skjindal93 Dec 31, 2020
df2fd47
added debug logs and comments
skjindal93 Dec 31, 2020
5faec00
optimize filter and order by single source
skjindal93 Jan 4, 2021
5390792
remove redundant method
skjindal93 Jan 4, 2021
2fb6bbf
remove redundant method
skjindal93 Jan 4, 2021
f206201
fix optimization for QS
skjindal93 Jan 4, 2021
8e5d49b
remove redundant method
skjindal93 Jan 4, 2021
f708809
remove redundant utils method
skjindal93 Jan 4, 2021
cfa6df7
Merge branch 'optimize+filter+orderby' into fix+total+query
skjindal93 Jan 5, 2021
7eedad7
revert back entity id equals filter
skjindal93 Jan 5, 2021
2a2c85e
Merge branch 'optimize+filter+orderby' into fix+total+query
skjindal93 Jan 5, 2021
9339eb4
remove entity id equals filter
skjindal93 Jan 5, 2021
120e81e
Merge branch 'main' into fix+total+query
skjindal93 Jan 13, 2021
f234da7
remove redundant code
skjindal93 Jan 13, 2021
a3c3b3e
added entity response
skjindal93 Jan 13, 2021
ff4c013
added unit tests
skjindal93 Jan 15, 2021
5e76e95
data fetcher node is paginated for non null limit and offset
skjindal93 Jan 15, 2021
cd2f403
Merge branch 'main' into fix+total+query
skjindal93 Jan 15, 2021
06226f9
got rid of the total entities request
skjindal93 Jan 15, 2021
69f0f52
Merge branch 'fix+total+query' of github.com:hypertrace/gateway-servi…
skjindal93 Jan 15, 2021
22f2a1e
set equals in unit test method
skjindal93 Jan 15, 2021
b320ef6
removed redundant total entities test
skjindal93 Jan 15, 2021
0c4d602
refactor(entity-fetcher): get rid of aggregated metrics query
skjindal93 Jan 17, 2021
9c43c6f
Merge branch 'main' into remove+get+aggregated+metrics
skjindal93 Jan 17, 2021
2a2068a
added unit tests
skjindal93 Jan 17, 2021
7c57ce4
optimize metric aggregations order by
skjindal93 Jan 17, 2021
Files changed
---
@@ -24,6 +24,10 @@ public void mapAliasToFunctionExpression(String alias, FunctionExpression functionExpression) {
     aliasToFunctionExpressionMap.put(alias, functionExpression);
   }
 
+  public boolean containsFunctionExpression(String alias) {
+    return aliasToFunctionExpressionMap.containsKey(alias);
+  }
+
   public FunctionExpression getFunctionExpressionByAlias(String alias) {
     return aliasToFunctionExpressionMap.get(alias);
   }
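Note on the new helper: containsFunctionExpression is what later lets a single getEntities pass tell aggregate columns apart from plain attribute columns while walking a result row (see buildEntity in the query-service fetcher diff below). A minimal, runnable sketch of that dispatch; the class and the aliases are stand-ins, not the service's real types:

import java.util.HashMap;
import java.util.Map;

class AliasRegistrySketch {
  // Stand-in for the request context's aliasToFunctionExpressionMap.
  private final Map<String, Object> aliasToFunctionExpression = new HashMap<>();

  void mapAliasToFunctionExpression(String alias, Object functionExpression) {
    aliasToFunctionExpression.put(alias, functionExpression);
  }

  boolean containsFunctionExpression(String alias) {
    return aliasToFunctionExpression.containsKey(alias);
  }

  public static void main(String[] args) {
    AliasRegistrySketch ctx = new AliasRegistrySketch();
    ctx.mapAliasToFunctionExpression("AVG_API.duration", new Object()); // made-up aggregate alias
    // A column whose name maps to a function expression is a metric;
    // anything else is treated as a plain entity attribute.
    System.out.println(ctx.containsFunctionExpression("AVG_API.duration")); // true
    System.out.println(ctx.containsFunctionExpression("API.name"));         // false
  }
}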
---
@@ -169,12 +169,6 @@ public EntityFetcherResponse getEntities(
     return new EntityFetcherResponse(entityBuilders);
   }
 
-  @Override
-  public EntityFetcherResponse getAggregatedMetrics(
-      EntitiesRequestContext requestContext, EntitiesRequest entitiesRequest) {
-    throw new UnsupportedOperationException("Fetching aggregated metrics not supported by EDS");
-  }
-
   @Override
   public EntityFetcherResponse getTimeAggregatedMetrics(
       EntitiesRequestContext requestContext, EntitiesRequest entitiesRequest) {
---
@@ -27,16 +27,6 @@ public interface IEntityFetcher {
   EntityFetcherResponse getEntities(
       EntitiesRequestContext requestContext, EntitiesRequest entitiesRequest);
 
-  /**
-   * Get aggregated metrics
-   *
-   * @param requestContext Additional context for the incoming request
-   * @param entitiesRequest encapsulates the aggregated metrics query (selection, filter, order)
-   * @return Map of the Entity Builders keyed by the EntityId
-   */
-  EntityFetcherResponse getAggregatedMetrics(
-      EntitiesRequestContext requestContext, EntitiesRequest entitiesRequest);
-
   /**
    * Get time series data
    *
---
@@ -157,7 +157,10 @@ public EntityFetcherResponse getEntities(
         i++) {
       ColumnMetadata metadata = chunk.getResultSetMetadata().getColumnMetadata(i);
       org.hypertrace.core.query.service.api.Value columnValue = row.getColumn(i);
-      addEntityAttribute(entityBuilder,
+      buildEntity(
+          entityBuilder,
+          requestContext,
+          entitiesRequest,
           metadata,
           columnValue,
           attributeMetadataMap,
@@ -169,97 +172,6 @@ public EntityFetcherResponse getEntities(
     return new EntityFetcherResponse(entityBuilders);
   }
 
-  @Override
-  public EntityFetcherResponse getAggregatedMetrics(
-      EntitiesRequestContext requestContext, EntitiesRequest entitiesRequest) {
-    // Only supported filter is entityIds IN ["id1", "id2", "id3"]
-    Map<String, AttributeMetadata> attributeMetadataMap =
-        attributeMetadataProvider.getAttributesMetadata(
-            requestContext, entitiesRequest.getEntityType());
-    entitiesRequestValidator.validate(entitiesRequest, attributeMetadataMap);
-
-    List<org.hypertrace.gateway.service.v1.common.Expression> aggregates =
-        ExpressionReader.getFunctionExpressions(entitiesRequest.getSelectionList().stream());
-    if (aggregates.isEmpty()) {
-      return new EntityFetcherResponse();
-    }
-
-    List<String> entityIdAttributes =
-        AttributeMetadataUtil.getIdAttributeIds(
-            attributeMetadataProvider, entityIdColumnsConfigs, requestContext, entitiesRequest.getEntityType());
-
-    QueryRequest.Builder builder =
-        constructSelectionQuery(requestContext, entitiesRequest, entityIdAttributes, aggregates);
-    adjustLimitAndOffset(builder, entitiesRequest.getLimit(), entitiesRequest.getOffset());
-
-    QueryRequest request = builder.build();
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Sending Aggregated Metrics Request to Query Service ======== \n {}", request);
-    }
-
-    Iterator<ResultSetChunk> resultSetChunkIterator =
-        queryServiceClient.executeQuery(request, requestContext.getHeaders(),
-            requestTimeout);
-
-    // We want to retain the order as returned from the respective source. Hence using a
-    // LinkedHashMap
-    Map<EntityKey, Builder> entityMap = new LinkedHashMap<>();
-
-    while (resultSetChunkIterator.hasNext()) {
-      ResultSetChunk chunk = resultSetChunkIterator.next();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Received chunk: " + chunk.toString());
-      }
-
-      if (chunk.getRowCount() < 1) {
-        break;
-      }
-
-      if (!chunk.hasResultSetMetadata()) {
-        LOG.warn("Chunk doesn't have result metadata so couldn't process the response.");
-        break;
-      }
-
-      for (Row row : chunk.getRowList()) {
-        // Construct the EntityKey from the EntityId attributes columns
-        EntityKey entityKey =
-            EntityKey.of(
-                IntStream.range(0, entityIdAttributes.size())
-                    .mapToObj(value -> row.getColumn(value).getString())
-                    .toArray(String[]::new));
-        Builder entityBuilder = entityMap.computeIfAbsent(entityKey, k -> Entity.newBuilder());
-        entityBuilder.setEntityType(entitiesRequest.getEntityType());
-        entityBuilder.setId(entityKey.toString());
-        // Always include the id in entity since that's needed to make follow up queries in
-        // optimal fashion. If this wasn't really requested by the client, it should be removed
-        // as post processing.
-        for (int i = 0; i < entityIdAttributes.size(); i++) {
-          entityBuilder.putAttribute(
-              entityIdAttributes.get(i),
-              Value.newBuilder()
-                  .setString(entityKey.getAttributes().get(i))
-                  .setValueType(ValueType.STRING)
-                  .build());
-        }
-
-        for (int i = entityIdAttributes.size();
-            i < chunk.getResultSetMetadata().getColumnMetadataCount();
-            i++) {
-          ColumnMetadata metadata = chunk.getResultSetMetadata().getColumnMetadata(i);
-          org.hypertrace.core.query.service.api.Value columnValue = row.getColumn(i);
-          addAggregateMetric(entityBuilder,
-              requestContext,
-              entitiesRequest,
-              metadata,
-              columnValue,
-              attributeMetadataMap);
-        }
-      }
-    }
-    return new EntityFetcherResponse(entityMap);
-  }
-
   private void adjustLimitAndOffset(QueryRequest.Builder builder, int limit, int offset) {
     // If there is more than one groupBy column, we cannot set the same limit that came
     // in the request since that might return less entities than needed when the same
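The comment above is cut off by the collapsed diff, but the gist is a pagination-correctness concern: with a composite entity id (more than one groupBy column), a row-level limit can cut one logical entity in half. A hedged illustration of the failure mode; the numbers and the widening factor are made up, not the fetcher's actual constants:

public class LimitWideningSketch {
  public static void main(String[] args) {
    // Suppose the entity id is (serviceName, apiName), so one entity can
    // contribute several grouped rows. With LIMIT 2, both returned rows may
    // belong to the same entity, yielding 1 distinct entity instead of 2.
    int requestedLimit = 2;
    int groupByColumnCount = 2; // composite id (hypothetical)
    // Widen the source-side limit so enough rows come back to assemble the
    // requested number of distinct entities; exact pagination is re-applied
    // after rows are grouped into entities.
    int sourceLimit = groupByColumnCount > 1 ? requestedLimit * 1000 : requestedLimit;
    System.out.println(sourceLimit); // 2000 here; the real factor differs
  }
}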
@@ -336,28 +248,51 @@ private QueryRequest.Builder constructSelectionQuery(EntitiesRequestContext requestContext,
     return builder;
   }
 
-  private void addEntityAttribute(Entity.Builder entityBuilder,
+  private void buildEntity(
+      Entity.Builder entityBuilder,
+      QueryRequestContext requestContext,
+      EntitiesRequest entitiesRequest,
       ColumnMetadata metadata,
       org.hypertrace.core.query.service.api.Value columnValue,
       Map<String, AttributeMetadata> attributeMetadataMap,
       boolean isSkipCountColumn) {
 
     // Ignore the count column since we introduced that ourselves into the query
-    if (isSkipCountColumn &&
-        StringUtils.equalsIgnoreCase(COUNT_COLUMN_NAME, metadata.getColumnName())) {
+    if (isSkipCountColumn
+        && StringUtils.equalsIgnoreCase(COUNT_COLUMN_NAME, metadata.getColumnName())) {
       return;
     }
 
+    // aggregate
+    if (requestContext.containsFunctionExpression(metadata.getColumnName())) {
+      addAggregateMetric(
+          entityBuilder,
+          requestContext,
+          entitiesRequest,
+          metadata,
+          columnValue,
+          attributeMetadataMap);
+    } else {
+      // attribute
+      addEntityAttribute(entityBuilder, metadata, columnValue, attributeMetadataMap);
+    }
+  }
+
+  private void addEntityAttribute(
+      Entity.Builder entityBuilder,
+      ColumnMetadata metadata,
+      org.hypertrace.core.query.service.api.Value columnValue,
+      Map<String, AttributeMetadata> attributeMetadataMap) {
+
     String attributeName = metadata.getColumnName();
     entityBuilder.putAttribute(
         attributeName,
         QueryAndGatewayDtoConverter.convertToGatewayValue(
-            attributeName,
-            columnValue,
-            attributeMetadataMap));
+            attributeName, columnValue, attributeMetadataMap));
   }
 
-  private void addAggregateMetric(Entity.Builder entityBuilder,
+  private void addAggregateMetric(
+      Entity.Builder entityBuilder,
       QueryRequestContext requestContext,
       EntitiesRequest entitiesRequest,
       ColumnMetadata metadata,
---
@@ -206,14 +206,18 @@ public void removePendingSelectionSource(String source) {
     pendingSelectionSources.remove(source);
   }
 
-  public void removePendingMetricAggregationSources(String source) {
+  public void removePendingMetricAggregationSource(String source) {
     pendingMetricAggregationSources.remove(source);
   }
 
   public void removePendingSelectionSourceForOrderBy(String source) {
     pendingSelectionSourcesForOrderBy.remove(source);
   }
 
+  public void removePendingMetricAggregationSourceForOrderBy(String source) {
+    pendingMetricAggregationSourcesForOrderBy.remove(source);
+  }
+
   public Map<String, List<Expression>> getSourceToFilterExpressionMap() {
     return sourceToFilterExpressionMap;
   }
---
@@ -197,7 +197,7 @@ QueryNode buildExecutionTree(ExecutionContext executionContext, QueryNode filterTree) {
         new SelectionNode.Builder(rootNode)
             .setAggMetricSelectionSources(metricSourcesForOrderBy)
             .build();
-    metricSourcesForOrderBy.forEach(executionContext::removePendingMetricAggregationSources);
+    metricSourcesForOrderBy.forEach(executionContext::removePendingMetricAggregationSource);
   }
 
   // Try adding SortAndPaginateNode
---
@@ -58,6 +58,13 @@ public Void visit(DataFetcherNode dataFetcherNode) {
     executionContext.removePendingSelectionSource(source);
     // TODO: Currently, assumes that the order by attribute is also present in the selection set
     executionContext.removePendingSelectionSourceForOrderBy(source);
+    // TODO: Remove redundant attributes for metric aggregation source for order by
+    // The current metric aggregation source is only QS
+
+    // The order by on metric aggregations will also be added in the selections list of
+    // DataFetcherNode, so that the order by metric aggregations can be fetched before
+    // and only the required data set is paginated
+    executionContext.removePendingMetricAggregationSourceForOrderBy(source);
 
     // set of attributes which were fetched from the source
     Map<String, Set<String>> sourceToSelectionAttributeMap =
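These comments carry the core of the optimization: when the order by refers to a metric aggregation, the aggregation expression is folded into the DataFetcherNode's own selections, so the source sorts and paginates before anything else runs, and that source no longer owes a separate aggregation fetch for the order by. A rough, runnable sketch of the bookkeeping; the set and source names are stand-ins for ExecutionContext's fields:

import java.util.HashSet;
import java.util.Set;

public class PendingSourcesSketch {
  public static void main(String[] args) {
    // Before the visit, source "QS" still has a pending aggregation fetch
    // for the order by (hypothetical setup mirroring ExecutionContext).
    Set<String> pendingMetricAggregationSourcesForOrderBy = new HashSet<>(Set.of("QS"));

    // Visiting the DataFetcherNode for "QS" folds the order-by aggregations
    // into its selections, so nothing remains pending for that source.
    pendingMetricAggregationSourcesForOrderBy.remove("QS");

    System.out.println(pendingMetricAggregationSourcesForOrderBy.isEmpty()); // true
  }
}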
---
@@ -6,18 +6,18 @@

 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import com.google.common.collect.Sets;
 import org.hypertrace.gateway.service.common.datafetcher.EntityFetcherResponse;
@@ -41,6 +41,7 @@
 import org.hypertrace.gateway.service.v1.common.Filter;
 import org.hypertrace.gateway.service.v1.common.LiteralConstant;
 import org.hypertrace.gateway.service.v1.common.Operator;
+import org.hypertrace.gateway.service.v1.common.OrderByExpression;
 import org.hypertrace.gateway.service.v1.common.Value;
 import org.hypertrace.gateway.service.v1.common.ValueType;
 import org.hypertrace.gateway.service.v1.entity.EntitiesRequest;
@@ -145,17 +146,32 @@ public EntityResponse visit(DataFetcherNode dataFetcherNode) {
             executionContext.getTimestampAttributeId(),
             executionContext.getRequestHeaders());
 
+    // fetching both attribute selections and metric order by selections for
+    // optimized pagination
+    List<Expression> attributeSelections =
+        executionContext
+            .getSourceToSelectionExpressionMap()
+            .getOrDefault(source, executionContext.getEntityIdExpressions());
+    List<Expression> metricOrderBySelections =
+        executionContext
+            .getSourceToMetricOrderByExpressionMap()
+            .getOrDefault(source, Collections.emptyList())
+            .stream()
+            .map(OrderByExpression::getExpression)
+            .collect(Collectors.toList());
+    List<Expression> selections = Stream.of(attributeSelections, metricOrderBySelections)
+        .flatMap(Collection::stream)
+        .collect(Collectors.toList());
 
     EntitiesRequest.Builder requestBuilder =
         EntitiesRequest.newBuilder(entitiesRequest)
             .clearSelection()
             .clearTimeAggregation()
             .clearFilter()
             .clearOrderBy()
-            .addAllSelection(
-                executionContext
-                    .getSourceToSelectionExpressionMap()
-                    .getOrDefault(source, executionContext.getEntityIdExpressions()))
+            .clearLimit()
+            .clearOffset()
+            .addAllSelection(selections)
             .setFilter(dataFetcherNode.getFilter());
 
     if (dataFetcherNode.getLimit() != null) {
@@ -243,6 +259,7 @@ public EntityResponse visit(SelectionNode selectionNode) {
         EntitiesRequest request =
             EntitiesRequest.newBuilder(executionContext.getEntitiesRequest())
                 .clearSelection()
+                .clearTimeAggregation()
                 .clearFilter()
                 // TODO: Should we push order by, limit and offset down to the data source?
                 // If we want to push the order by down, we would also have to divide order by into
@@ -273,6 +290,7 @@ public EntityResponse visit(SelectionNode selectionNode) {
         EntitiesRequest request =
             EntitiesRequest.newBuilder(executionContext.getEntitiesRequest())
                 .clearSelection()
+                .clearTimeAggregation()
                 .clearFilter()
                 .clearOrderBy()
                 .clearOffset()
@@ -290,7 +308,7 @@ public EntityResponse visit(SelectionNode selectionNode) {
                           request.getEntityType(),
                           executionContext.getTimestampAttributeId(),
                           executionContext.getRequestHeaders());
-                  return entityFetcher.getAggregatedMetrics(context, request);
+                  return entityFetcher.getEntities(context, request);
                 })
             .collect(Collectors.toList()));
     resultMapList.addAll(
@@ -299,6 +317,7 @@
             source -> {
               EntitiesRequest request =
                   EntitiesRequest.newBuilder(executionContext.getEntitiesRequest())
+                      .clearSelection()
                       .clearTimeAggregation()
                       .clearFilter()
                       .clearOrderBy()
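The visit(DataFetcherNode) change above is where the pushdown lands: attribute selections and the order-by metric expressions are merged into one selection list for the per-source request, and the old getAggregatedMetrics call collapses into getEntities. A runnable sketch of the merge, with plain strings standing in for Expression protos:

import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SelectionMergeSketch {
  public static void main(String[] args) {
    // Stand-in values; the real code maps OrderByExpression::getExpression
    // over getSourceToMetricOrderByExpressionMap() for the source.
    List<String> attributeSelections = List.of("API.id", "API.name");
    List<String> metricOrderBySelections = List.of("AVG(API.duration)");

    List<String> selections =
        Stream.of(attributeSelections, metricOrderBySelections)
            .flatMap(Collection::stream)
            .collect(Collectors.toList());

    // The merged list becomes the per-source selection set, so the order-by
    // metric comes back alongside the attributes in a single paginated query.
    System.out.println(selections); // [API.id, API.name, AVG(API.duration)]
  }
}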
---
@@ -115,6 +115,13 @@ public static AggregatedMetricValue getAggregatedMetricValue(FunctionType functionType,
         .build();
   }
 
+  public static AggregatedMetricValue getAggregatedMetricValue(FunctionType functionType, long value) {
+    return AggregatedMetricValue.newBuilder()
+        .setFunction(functionType)
+        .setValue(Value.newBuilder().setLong(value).setValueType(ValueType.LONG))
+        .build();
+  }
+
   public static Expression getLiteralExpression(long value) {
     return Expression.newBuilder()
         .setLiteral(
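A short usage sketch for the new long overload, assuming a static import of the helper and the generated FunctionType/ValueType enums; the COUNT value is illustrative:

AggregatedMetricValue count = getAggregatedMetricValue(FunctionType.COUNT, 100L);
// count.getValue().getValueType() == ValueType.LONG
// count.getValue().getLong() == 100L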
---
@@ -176,17 +176,6 @@ public void test_getEntities_WithoutPagination() {
         entityDataServiceEntityFetcher.getEntities(entitiesRequestContext, entitiesRequest).size());
   }
 
-  @Test
-  public void test_getAggregatedMetrics() {
-    assertThrows(
-        UnsupportedOperationException.class,
-        () -> {
-          entityDataServiceEntityFetcher.getAggregatedMetrics(
-              new EntitiesRequestContext(TENANT_ID, 0, 1, "API", "API.startTime", Map.of()),
-              EntitiesRequest.newBuilder().build());
-        });
-  }
-
   @Test
   public void test_getTimeAggregatedMetrics() {
     assertThrows(