Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,19 @@ public RelNode visitPatterns(Patterns node, CalcitePlanContext context) {
context.relBuilder.field(node.getAlias()),
context.relBuilder.field(PatternUtils.SAMPLE_LOGS));
flattenParsedPattern(node.getAlias(), parsedNode, context, false);
context.relBuilder.projectExcept(context.relBuilder.field(PatternUtils.SAMPLE_LOGS));
// Reorder fields for consistency with Brain's output
projectPlusOverriding(
List.of(
context.relBuilder.field(node.getAlias()),
context.relBuilder.field(PatternUtils.PATTERN_COUNT),
context.relBuilder.field(PatternUtils.TOKENS),
context.relBuilder.field(PatternUtils.SAMPLE_LOGS)),
List.of(
node.getAlias(),
PatternUtils.PATTERN_COUNT,
PatternUtils.TOKENS,
PatternUtils.SAMPLE_LOGS),
context);
} else {
RexNode parsedNode =
PPLFuncImpTable.INSTANCE.resolve(
Expand Down Expand Up @@ -2258,7 +2270,7 @@ private void flattenParsedPattern(
String originalPatternResultAlias,
RexNode parsedNode,
CalcitePlanContext context,
boolean flattenPatternCount) {
boolean flattenPatternAggResult) {
List<RexNode> fattenedNodes = new ArrayList<>();
List<String> projectNames = new ArrayList<>();
// Flatten map struct fields
Expand All @@ -2274,7 +2286,7 @@ private void flattenParsedPattern(
true);
fattenedNodes.add(context.relBuilder.alias(patternExpr, originalPatternResultAlias));
projectNames.add(originalPatternResultAlias);
if (flattenPatternCount) {
if (flattenPatternAggResult) {
RexNode patternCountExpr =
context.rexBuilder.makeCast(
context.rexBuilder.getTypeFactory().createSqlType(SqlTypeName.BIGINT),
Expand All @@ -2300,6 +2312,24 @@ private void flattenParsedPattern(
true);
fattenedNodes.add(context.relBuilder.alias(tokensExpr, PatternUtils.TOKENS));
projectNames.add(PatternUtils.TOKENS);
if (flattenPatternAggResult) {
RexNode sampleLogsExpr =
context.rexBuilder.makeCast(
context
.rexBuilder
.getTypeFactory()
.createArrayType(
context.rexBuilder.getTypeFactory().createSqlType(SqlTypeName.VARCHAR), -1),
PPLFuncImpTable.INSTANCE.resolve(
context.rexBuilder,
BuiltinFunctionName.INTERNAL_ITEM,
parsedNode,
context.rexBuilder.makeLiteral(PatternUtils.SAMPLE_LOGS)),
true,
true);
fattenedNodes.add(context.relBuilder.alias(sampleLogsExpr, PatternUtils.SAMPLE_LOGS));
projectNames.add(PatternUtils.SAMPLE_LOGS);
}
projectPlusOverriding(fattenedNodes, projectNames, context);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ public Object value(Object... argList) {
PatternUtils.PATTERN,
parseResult.toTokenOrderString(PatternUtils.WILDCARD_PREFIX),
PatternUtils.PATTERN_COUNT, count,
PatternUtils.TOKENS, tokensMap);
PatternUtils.TOKENS, tokensMap,
PatternUtils.SAMPLE_LOGS, sampleLogs);
})
.collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,19 +88,22 @@ public void testSimplePatternAggregationMode() throws IOException {
result,
schema("pattern_count", "bigint"),
schema("patterns_field", "string"),
schema("tokens", "struct"));
schema("tokens", "struct"),
schema("sample_logs", "array"));
verifyDataRows(
result,
rows(
7,
"<token1>@<token2>.<token3>",
7,
ImmutableMap.of(
"<token1>",
ImmutableList.of("amberduke", "hattiebond", "nanettebates"),
"<token2>",
ImmutableList.of("pyrami", "netagy", "quility"),
"<token3>",
ImmutableList.of("com", "com", "com"))));
ImmutableList.of("com", "com", "com")),
ImmutableList.of(
"[email protected]", "[email protected]", "[email protected]")));
}

@Test
Expand Down Expand Up @@ -168,7 +171,8 @@ public void testBrainAggregationMode() throws IOException {
result,
schema("patterns_field", "string"),
schema("pattern_count", "bigint"),
schema("tokens", "struct"));
schema("tokens", "struct"),
schema("sample_logs", "array"));
verifyDataRows(
result,
rows(
Expand All @@ -178,7 +182,10 @@ public void testBrainAggregationMode() throws IOException {
"<token1>",
ImmutableList.of("for", "for"),
"<token2>",
ImmutableList.of("-1547954353065580372", "6996194389878584395"))),
ImmutableList.of("-1547954353065580372", "6996194389878584395")),
ImmutableList.of(
"Verification succeeded for blk_-1547954353065580372",
"Verification succeeded for blk_6996194389878584395")),
rows(
"BLOCK* NameSystem.addStoredBlock: blockMap updated: <token1> is added to blk_<token2>"
+ " size <token3>",
Expand All @@ -189,7 +196,12 @@ public void testBrainAggregationMode() throws IOException {
"<token3>",
ImmutableList.of("67108864", "67108864"),
"<token2>",
ImmutableList.of("-7017553867379051457", "-3249711809227781266"))),
ImmutableList.of("-7017553867379051457", "-3249711809227781266")),
ImmutableList.of(
"BLOCK* NameSystem.addStoredBlock: blockMap updated: 10.251.31.85:50010 is added to"
+ " blk_-7017553867379051457 size 67108864",
"BLOCK* NameSystem.addStoredBlock: blockMap updated: 10.251.107.19:50010 is added"
+ " to blk_-3249711809227781266 size 67108864")),
rows(
"<token1> NameSystem.allocateBlock:"
+ " /user/root/sortrand/_temporary/_task_<token2>_<token3>_r_<token4>_<token5>/part<token6>"
Expand All @@ -209,15 +221,25 @@ public void testBrainAggregationMode() throws IOException {
"<token3>",
ImmutableList.of("0002", "0002"),
"<token2>",
ImmutableList.of("200811092030", "200811092030"))),
ImmutableList.of("200811092030", "200811092030")),
ImmutableList.of(
"BLOCK* NameSystem.allocateBlock:"
+ " /user/root/sortrand/_temporary/_task_200811092030_0002_r_000296_0/part-00296."
+ " blk_-6620182933895093708",
"BLOCK* NameSystem.allocateBlock:"
+ " /user/root/sortrand/_temporary/_task_200811092030_0002_r_000318_0/part-00318."
+ " blk_2096692261399680562")),
rows(
"PacketResponder failed <token1> blk_<token2>",
2,
ImmutableMap.of(
"<token1>",
ImmutableList.of("for", "for"),
"<token2>",
ImmutableList.of("6996194389878584395", "-1547954353065580372"))));
ImmutableList.of("6996194389878584395", "-1547954353065580372")),
ImmutableList.of(
"PacketResponder failed for blk_6996194389878584395",
"PacketResponder failed for blk_-1547954353065580372")));
}

@Test
Expand All @@ -229,20 +251,25 @@ public void testBrainAggregationModeWithGroupByClause() throws IOException {
+ " mode=aggregation max_sample_count=5"
+ " variable_count_threshold=2 frequency_threshold_percentage=0.2",
TEST_INDEX_HDFS_LOGS));
System.out.println(result);
verifySchema(
result,
schema("level", "string"),
schema("patterns_field", "string"),
schema("pattern_count", "bigint"),
schema("tokens", "struct"));
schema("tokens", "struct"),
schema("sample_logs", "array"));
verifyDataRows(
result,
rows(
"INFO",
"Verification succeeded for blk_<token1>",
2,
ImmutableMap.of(
"<token1>", ImmutableList.of("-1547954353065580372", "6996194389878584395"))),
"<token1>", ImmutableList.of("-1547954353065580372", "6996194389878584395")),
ImmutableList.of(
"Verification succeeded for blk_-1547954353065580372",
"Verification succeeded for blk_6996194389878584395")),
rows(
"INFO",
"BLOCK* NameSystem.addStoredBlock: blockMap updated: <token1> is added to blk_<token2>"
Expand All @@ -254,7 +281,12 @@ public void testBrainAggregationModeWithGroupByClause() throws IOException {
"<token3>",
ImmutableList.of("67108864", "67108864"),
"<token2>",
ImmutableList.of("-7017553867379051457", "-3249711809227781266"))),
ImmutableList.of("-7017553867379051457", "-3249711809227781266")),
ImmutableList.of(
"BLOCK* NameSystem.addStoredBlock: blockMap updated: 10.251.31.85:50010 is added to"
+ " blk_-7017553867379051457 size 67108864",
"BLOCK* NameSystem.addStoredBlock: blockMap updated: 10.251.107.19:50010 is added"
+ " to blk_-3249711809227781266 size 67108864")),
rows(
"INFO",
"BLOCK* NameSystem.allocateBlock:"
Expand All @@ -273,13 +305,23 @@ public void testBrainAggregationModeWithGroupByClause() throws IOException {
"<token3>",
ImmutableList.of("000296", "000318"),
"<token2>",
ImmutableList.of("0002", "0002"))),
ImmutableList.of("0002", "0002")),
ImmutableList.of(
"BLOCK* NameSystem.allocateBlock:"
+ " /user/root/sortrand/_temporary/_task_200811092030_0002_r_000296_0/part-00296."
+ " blk_-6620182933895093708",
"BLOCK* NameSystem.allocateBlock:"
+ " /user/root/sortrand/_temporary/_task_200811092030_0002_r_000318_0/part-00318."
+ " blk_2096692261399680562")),
rows(
"WARN",
"PacketResponder failed for blk_<token1>",
2,
ImmutableMap.of(
"<token1>", ImmutableList.of("6996194389878584395", "-1547954353065580372"))));
"<token1>", ImmutableList.of("6996194389878584395", "-1547954353065580372")),
ImmutableList.of(
"PacketResponder failed for blk_6996194389878584395",
"PacketResponder failed for blk_-1547954353065580372")));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"calcite": {
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(patterns_field=[SAFE_CAST(ITEM($1, 'pattern'))], pattern_count=[SAFE_CAST(ITEM($1, 'pattern_count'))], tokens=[SAFE_CAST(ITEM($1, 'tokens'))])\n LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0}])\n LogicalAggregate(group=[{}], patterns_field=[pattern($0, $1, $2)])\n LogicalProject(email=[$9], $f17=[10], $f18=[100000])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n Uncollect\n LogicalProject(patterns_field=[$cor0.patterns_field])\n LogicalValues(tuples=[[{ 0 }]])\n",
"physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..1=[{inputs}], expr#2=['pattern'], expr#3=[ITEM($t1, $t2)], expr#4=[SAFE_CAST($t3)], expr#5=['pattern_count'], expr#6=[ITEM($t1, $t5)], expr#7=[SAFE_CAST($t6)], expr#8=['tokens'], expr#9=[ITEM($t1, $t8)], expr#10=[SAFE_CAST($t9)], patterns_field=[$t4], pattern_count=[$t7], tokens=[$t10])\n EnumerableCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0}])\n EnumerableAggregate(group=[{}], patterns_field=[pattern($0, $1, $2)])\n EnumerableCalc(expr#0=[{inputs}], expr#1=[10], expr#2=[100000], proj#0..2=[{exprs}])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[email]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"email\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n EnumerableUncollect\n EnumerableCalc(expr#0=[{inputs}], expr#1=[$cor0], expr#2=[$t1.patterns_field], patterns_field=[$t2])\n EnumerableValues(tuples=[[{ 0 }]])\n"
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(patterns_field=[SAFE_CAST(ITEM($1, 'pattern'))], pattern_count=[SAFE_CAST(ITEM($1, 'pattern_count'))], tokens=[SAFE_CAST(ITEM($1, 'tokens'))], sample_logs=[SAFE_CAST(ITEM($1, 'sample_logs'))])\n LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0}])\n LogicalAggregate(group=[{}], patterns_field=[pattern($0, $1, $2)])\n LogicalProject(email=[$9], $f17=[10], $f18=[100000])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n Uncollect\n LogicalProject(patterns_field=[$cor0.patterns_field])\n LogicalValues(tuples=[[{ 0 }]])\n",
"physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..1=[{inputs}], expr#2=['pattern'], expr#3=[ITEM($t1, $t2)], expr#4=[SAFE_CAST($t3)], expr#5=['pattern_count'], expr#6=[ITEM($t1, $t5)], expr#7=[SAFE_CAST($t6)], expr#8=['tokens'], expr#9=[ITEM($t1, $t8)], expr#10=[SAFE_CAST($t9)], expr#11=['sample_logs'], expr#12=[ITEM($t1, $t11)], expr#13=[SAFE_CAST($t12)], patterns_field=[$t4], pattern_count=[$t7], tokens=[$t10], sample_logs=[$t13])\n EnumerableCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0}])\n EnumerableAggregate(group=[{}], patterns_field=[pattern($0, $1, $2)])\n EnumerableCalc(expr#0=[{inputs}], expr#1=[10], expr#2=[100000], proj#0..2=[{exprs}])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[email]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"email\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n EnumerableUncollect\n EnumerableCalc(expr#0=[{inputs}], expr#1=[$cor0], expr#2=[$t1.patterns_field], patterns_field=[$t2])\n EnumerableValues(tuples=[[{ 0 }]])\n"
}
}
Loading
Loading