Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for self-joins with sub-query #220

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,16 @@
import java.util.stream.StreamSupport;
import org.hypertrace.core.documentstore.commons.DocStoreConstants;
import org.hypertrace.core.documentstore.expression.impl.AggregateExpression;
import org.hypertrace.core.documentstore.expression.impl.AliasedIdentifierExpression;
import org.hypertrace.core.documentstore.expression.impl.ConstantExpression;
import org.hypertrace.core.documentstore.expression.impl.FunctionExpression;
import org.hypertrace.core.documentstore.expression.impl.IdentifierExpression;
import org.hypertrace.core.documentstore.expression.impl.KeyExpression;
import org.hypertrace.core.documentstore.expression.impl.LogicalExpression;
import org.hypertrace.core.documentstore.expression.impl.RelationalExpression;
import org.hypertrace.core.documentstore.expression.impl.SubQueryJoinExpression;
import org.hypertrace.core.documentstore.expression.impl.UnnestExpression;
import org.hypertrace.core.documentstore.expression.operators.AggregationOperator;
import org.hypertrace.core.documentstore.expression.operators.FunctionOperator;
import org.hypertrace.core.documentstore.expression.operators.RelationalOperator;
import org.hypertrace.core.documentstore.expression.type.FilterTypeExpression;
Expand Down Expand Up @@ -3478,6 +3481,94 @@ public void testToLowerCaseMongoFunctionOperator(String dataStoreName) throws Ex
dataStoreName, resultDocs, "query/case_insensitive_exact_match_response.json", 2);
}

@ParameterizedTest
@ArgumentsSource(MongoProvider.class)
void testSelfJoinWithSubQuery(String dataStoreName) throws IOException {
  /*
  Exercises a self-join whose right-hand side is an aggregating sub-query.
  The query under test is equivalent to:
    SELECT item, quantity, date
    FROM <implicit_collection>
    JOIN (
      SELECT item, MAX(date) AS latest_date
      FROM <implicit_collection>
      GROUP BY item
    ) latest
    ON item = latest.item
    AND date = latest.latest_date
    ORDER BY `item` ASC;
  */
  final Collection collection = getCollection(dataStoreName);

  // Right-hand side of the join:
  //   SELECT item, MAX(date) AS latest_date FROM <implicit_collection> GROUP BY item
  final SelectionSpec itemSelection = SelectionSpec.of(IdentifierExpression.of("item"));
  final SelectionSpec latestDateSelection =
      SelectionSpec.of(
          AggregateExpression.of(AggregationOperator.MAX, IdentifierExpression.of("date")),
          "latest_date");
  final Query subQuery =
      Query.builder()
          .addSelection(itemSelection)
          .addSelection(latestDateSelection)
          .addAggregation(IdentifierExpression.of("item"))
          .build();

  // Columns of the sub-query, addressed through its alias "latest".
  final AliasedIdentifierExpression latestItem =
      AliasedIdentifierExpression.builder().name("item").alias("latest").build();
  final AliasedIdentifierExpression latestDate =
      AliasedIdentifierExpression.builder().name("latest_date").alias("latest").build();

  // Join condition: item = latest.item AND date = latest.latest_date
  final FilterTypeExpression sameItem =
      RelationalExpression.of(IdentifierExpression.of("item"), RelationalOperator.EQ, latestItem);
  final FilterTypeExpression sameDate =
      RelationalExpression.of(IdentifierExpression.of("date"), RelationalOperator.EQ, latestDate);

  // FROM <implicit_collection> JOIN (<subQuery>) latest ON <join condition>
  final SubQueryJoinExpression subQueryJoinExpression =
      SubQueryJoinExpression.builder()
          .subQuery(subQuery)
          .subQueryAlias("latest")
          .joinCondition(LogicalExpression.and(sameItem, sameDate))
          .build();

  // Top-level query:
  //   SELECT item, quantity, date FROM <subQueryJoinExpression> ORDER BY `item` ASC
  final Query mainQuery =
      Query.builder()
          .addSelection(IdentifierExpression.of("item"))
          .addSelection(IdentifierExpression.of("quantity"))
          .addSelection(IdentifierExpression.of("date"))
          .addFromClause(subQueryJoinExpression)
          .addSort(IdentifierExpression.of("item"), ASC)
          .build();

  final Iterator<Document> results = collection.aggregate(mainQuery);
  // NOTE(review): the preceding test loads its expectation from a "query/" sub-directory;
  // confirm this resource path (no directory prefix) matches where the JSON file lives.
  assertDocsAndSizeEqual(dataStoreName, results, "self_join_with_sub_query_response.json", 4);
}

/**
 * Convenience overload that looks up the collection named {@code COLLECTION_NAME} in the given
 * data store by delegating to the two-argument {@code getCollection}.
 */
private static Collection getCollection(final String dataStoreName) {
return getCollection(dataStoreName, COLLECTION_NAME);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[
{"date":"2015-09-10T08:43:00Z","item":"Comb","quantity":10},
{"date":"2014-03-01T09:00:00Z","item":"Mirror","quantity":1},
{"date":"2014-04-04T11:21:39.736Z","item":"Shampoo","quantity":20},
{"date":"2016-02-06T20:20:13Z","item":"Soap","quantity":5}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package org.hypertrace.core.documentstore.expression.impl;

import com.google.common.base.Preconditions;
import lombok.EqualsAndHashCode;
import lombok.Value;
import org.hypertrace.core.documentstore.parser.SelectTypeExpressionVisitor;

/**
 * Expression representing an identifier/column name qualified by an alias (for example, a column
 * of an aliased sub-query).
 *
 * <p>Example: AliasedIdentifierExpression.builder().name("col1").alias("alias1").build();
 *
 * <p>Equality and hashing take both the alias and the inherited name into account, so that
 * {@code alias.col1} and {@code alias.col2} are distinct expressions.
 */
@Value
@EqualsAndHashCode(callSuper = true)
public class AliasedIdentifierExpression extends IdentifierExpression {
  /** Alias that qualifies the identifier name. */
  String alias;

  private AliasedIdentifierExpression(final String name, final String alias) {
    super(name);
    this.alias = alias;
  }

  /** Dispatches to the visitor overload for {@link AliasedIdentifierExpression}. */
  @Override
  public <T> T accept(final SelectTypeExpressionVisitor visitor) {
    return visitor.visit(this);
  }

  /** Renders the expression as a back-quoted {@code alias.name} pair. */
  @Override
  public String toString() {
    return "`" + getAlias() + "." + getName() + "`";
  }

  /** Creates a builder; both {@code name} and {@code alias} must be set before building. */
  public static AliasedIdentifierExpressionBuilder builder() {
    return new AliasedIdentifierExpressionBuilder();
  }

  /** Builder that validates its inputs when {@link #build()} is called. */
  public static class AliasedIdentifierExpressionBuilder {
    private String name;
    private String alias;

    public AliasedIdentifierExpressionBuilder name(final String name) {
      this.name = name;
      return this;
    }

    public AliasedIdentifierExpressionBuilder alias(final String alias) {
      this.alias = alias;
      return this;
    }

    /**
     * Builds the expression.
     *
     * @return the aliased identifier expression
     * @throws IllegalArgumentException if the name or the alias is null or blank
     */
    public AliasedIdentifierExpression build() {
      Preconditions.checkArgument(
          this.name != null && !this.name.isBlank(), "name is null or blank");
      Preconditions.checkArgument(
          this.alias != null && !this.alias.isBlank(), "alias is null or blank");
      return new AliasedIdentifierExpression(this.name, this.alias);
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Value;
import lombok.experimental.NonFinal;
import org.hypertrace.core.documentstore.expression.type.GroupTypeExpression;
import org.hypertrace.core.documentstore.expression.type.SelectTypeExpression;
import org.hypertrace.core.documentstore.expression.type.SortTypeExpression;
Expand All @@ -17,7 +18,8 @@
* <p>Example: IdentifierExpression.of("col1");
*/
@Value
@AllArgsConstructor(access = AccessLevel.PRIVATE)
@NonFinal
@AllArgsConstructor(access = AccessLevel.PROTECTED)
public class IdentifierExpression
implements GroupTypeExpression, SelectTypeExpression, SortTypeExpression {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.hypertrace.core.documentstore.expression.impl;

import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Value;
import org.hypertrace.core.documentstore.expression.type.FilterTypeExpression;
import org.hypertrace.core.documentstore.expression.type.FromTypeExpression;
import org.hypertrace.core.documentstore.parser.FromTypeExpressionVisitor;
import org.hypertrace.core.documentstore.query.Query;

/**
 * A FROM-clause expression that joins the (implicit) collection with the result of a sub-query.
 * Only self-joins are supported at present, which is why the collection being joined is not
 * specified explicitly.
 */
@Value
@Builder(toBuilder = true)
@AllArgsConstructor(access = AccessLevel.PRIVATE)
public class SubQueryJoinExpression implements FromTypeExpression {
  /** The query forming the right-hand side of the join. */
  Query subQuery;

  /** The alias under which the sub-query is rendered in {@link #toString()}. */
  String subQueryAlias;

  /** The condition rendered after {@code ON} in {@link #toString()}. */
  FilterTypeExpression joinCondition;

  /** Dispatches to the visitor overload for {@link SubQueryJoinExpression}. */
  @Override
  public <T> T accept(FromTypeExpressionVisitor visitor) {
    return visitor.visit(this);
  }

  /** Renders the join in an SQL-like form: {@code JOIN (<subQuery>) AS <alias> ON <condition>}. */
  @Override
  public String toString() {
    return "JOIN (" + subQuery + ") AS " + subQueryAlias + " ON " + joinCondition;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,23 +59,23 @@
@Slf4j
@AllArgsConstructor
public class MongoQueryExecutor {
private static final List<Function<Query, Collection<BasicDBObject>>>
private final List<Function<Query, Collection<BasicDBObject>>>
DEFAULT_AGGREGATE_PIPELINE_FUNCTIONS =
List.of(
query -> singleton(getFilterClause(query, Query::getFilter)),
MongoFromTypeExpressionParser::getFromClauses,
query -> new MongoFromTypeExpressionParser(this).getFromClauses(query),
MongoGroupTypeExpressionParser::getGroupClauses,
query -> singleton(getProjectClause(query)),
query -> singleton(getFilterClause(query, Query::getAggregationFilter)),
query -> singleton(getSortClause(query)),
query -> singleton(getSkipClause(query)),
query -> singleton(getLimitClause(query)));

private static final List<Function<Query, Collection<BasicDBObject>>>
private final List<Function<Query, Collection<BasicDBObject>>>
SORT_OPTIMISED_AGGREGATE_PIPELINE_FUNCTIONS =
List.of(
query -> singleton(getFilterClause(query, Query::getFilter)),
MongoFromTypeExpressionParser::getFromClauses,
query -> new MongoFromTypeExpressionParser(this).getFromClauses(query),
query -> singleton(getNonProjectedSortClause(query)),
query -> singleton(getSkipClause(query)),
query -> singleton(getLimitClause(query)),
Expand Down Expand Up @@ -157,14 +157,7 @@ public MongoCursor<BasicDBObject> aggregate(

Query query = transformAndLog(originalQuery);

List<Function<Query, Collection<BasicDBObject>>> aggregatePipeline =
getAggregationPipeline(query);

List<BasicDBObject> pipeline =
aggregatePipeline.stream()
.flatMap(function -> function.apply(query).stream())
.filter(not(BasicDBObject::isEmpty))
.collect(toUnmodifiableList());
List<BasicDBObject> pipeline = convertToAggregatePipeline(query);

logPipeline(pipeline, queryOptions);

Expand Down Expand Up @@ -220,6 +213,22 @@ public long count(final Query originalQuery, final QueryOptions queryOptions) {
return 0;
}

/** Returns the collection name taken from the namespace of the underlying {@code collection}. */
public String getCollectionName() {
return collection.getNamespace().getCollectionName();
}

/**
 * Converts the given query into an aggregate pipeline.
 *
 * <p>Each function chosen by {@code getAggregationPipeline(query)} is applied to the query in
 * order; the objects they produce are concatenated and the empty ones are dropped.
 *
 * @param query the query to convert
 * @return an unmodifiable list of the non-empty pipeline objects, in order
 */
public List<BasicDBObject> convertToAggregatePipeline(Query query) {
  return getAggregationPipeline(query).stream()
      .map(stageBuilder -> stageBuilder.apply(query))
      .flatMap(Collection::stream)
      .filter(not(BasicDBObject::isEmpty))
      .collect(toUnmodifiableList());
}

private void logClauses(
final Query query,
final Bson projection,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package org.hypertrace.core.documentstore.mongo.query.parser;

import lombok.NoArgsConstructor;
import org.hypertrace.core.documentstore.expression.impl.AliasedIdentifierExpression;

/**
 * Parser that renders an {@link AliasedIdentifierExpression} as its name prefixed with "$". Only
 * the name of the expression is used; the alias does not appear in the output.
 */
@NoArgsConstructor
public final class MongoAliasedIdentifierExpressionParser extends MongoSelectTypeExpressionParser {

  MongoAliasedIdentifierExpressionParser(final MongoSelectTypeExpressionParser baseParser) {
    super(baseParser);
  }

  /** Returns the bare (un-prefixed) name of the expression. */
  String parse(final AliasedIdentifierExpression expression) {
    return expression.getName();
  }

  /** Returns the name of the expression with a single "$" prepended. */
  @SuppressWarnings("unchecked")
  @Override
  public String visit(final AliasedIdentifierExpression expression) {
    final String fieldName = parse(expression);
    return "$" + fieldName;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static org.hypertrace.core.documentstore.mongo.MongoUtils.PREFIX;

import java.util.Optional;
import org.hypertrace.core.documentstore.expression.impl.AliasedIdentifierExpression;
import org.hypertrace.core.documentstore.expression.impl.IdentifierExpression;

final class MongoDollarPrefixingIdempotentParser extends MongoSelectTypeExpressionParser {
Expand All @@ -19,6 +20,15 @@ public String visit(final IdentifierExpression expression) {
.orElse(null);
}

/**
 * Parses the aliased identifier with the base parser and prepends {@code PREFIX} to the result.
 * The prefix is added unconditionally, even if the parsed value already starts with it.
 *
 * @return the prefixed identifier, or {@code null} if the base parser (or the string form of its
 *     result) is {@code null}
 */
@SuppressWarnings("unchecked")
@Override
public String visit(final AliasedIdentifierExpression expression) {
  final Object parsed = baseParser.visit(expression);
  if (parsed == null) {
    return null;
  }

  final String identifier = parsed.toString();
  if (identifier == null) {
    return null;
  }

  return PREFIX + identifier;
}

/** Prepends {@code PREFIX} to the identifier unless the identifier already starts with it. */
private String idempotentPrefix(final String identifier) {
  if (identifier.startsWith(PREFIX)) {
    return identifier;
  }
  return PREFIX + identifier;
}
Expand Down
Loading
Loading