
Commit 3be90bf

[Backport 2.19-dev] [Feature][Enhancement] Enhance patterns command with additional sample_logs output field (#4155) (#4412)
* [Feature][Enhancement] Enhance patterns command with additional sample_logs output field (#4155)
* Enhance patterns command with additional sample_logs output field
* Reorder agg fields for simple_pattern
* Test fix after previous fix to not drop group by list
* Fix tests after merge
* Remove unnecessary change
* Fix unnecessary backport change from previous commit

Signed-off-by: Songkan Tang <[email protected]>
1 parent 028de0b commit 3be90bf
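In practice, the new column shows up in aggregation-mode patterns queries. The fragment below is an illustrative sketch in the style of CalcitePPLPatternsIT (it would live inside that test class, not stand alone); the query prefix "source=%s | patterns message method=brain" is assumed rather than taken from this diff, while mode=aggregation, max_sample_count, executeQuery, verifySchema, schema, and TEST_INDEX_HDFS_LOGS are the parameters, helpers, and fixture already used by the tests in this commit:

    // Illustrative only: the leading part of the PPL query is assumed; the rest
    // mirrors testBrainAggregationModeWithGroupByClause in this commit.
    JSONObject result =
        executeQuery(
            String.format(
                "source=%s | patterns message method=brain mode=aggregation max_sample_count=5",
                TEST_INDEX_HDFS_LOGS));
    verifySchema(
        result,
        schema("patterns_field", "string"),
        schema("pattern_count", "bigint"),
        schema("tokens", "struct"),
        schema("sample_logs", "array")); // new: up to max_sample_count raw logs per pattern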

File tree

9 files changed: 119 additions, 42 deletions


core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 33 additions & 3 deletions
@@ -693,7 +693,19 @@ public RelNode visitPatterns(Patterns node, CalcitePlanContext context) {
               context.relBuilder.field(node.getAlias()),
               context.relBuilder.field(PatternUtils.SAMPLE_LOGS));
           flattenParsedPattern(node.getAlias(), parsedNode, context, false);
-          context.relBuilder.projectExcept(context.relBuilder.field(PatternUtils.SAMPLE_LOGS));
+          // Reorder fields for consistency with Brain's output
+          projectPlusOverriding(
+              List.of(
+                  context.relBuilder.field(node.getAlias()),
+                  context.relBuilder.field(PatternUtils.PATTERN_COUNT),
+                  context.relBuilder.field(PatternUtils.TOKENS),
+                  context.relBuilder.field(PatternUtils.SAMPLE_LOGS)),
+              List.of(
+                  node.getAlias(),
+                  PatternUtils.PATTERN_COUNT,
+                  PatternUtils.TOKENS,
+                  PatternUtils.SAMPLE_LOGS),
+              context);
         } else {
           RexNode parsedNode =
               PPLFuncImpTable.INSTANCE.resolve(
@@ -2347,7 +2359,7 @@ private void flattenParsedPattern(
       String originalPatternResultAlias,
       RexNode parsedNode,
       CalcitePlanContext context,
-      boolean flattenPatternCount) {
+      boolean flattenPatternAggResult) {
     List<RexNode> fattenedNodes = new ArrayList<>();
     List<String> projectNames = new ArrayList<>();
     // Flatten map struct fields
@@ -2363,7 +2375,7 @@ private void flattenParsedPattern(
             true);
     fattenedNodes.add(context.relBuilder.alias(patternExpr, originalPatternResultAlias));
     projectNames.add(originalPatternResultAlias);
-    if (flattenPatternCount) {
+    if (flattenPatternAggResult) {
       RexNode patternCountExpr =
           context.rexBuilder.makeCast(
               context.rexBuilder.getTypeFactory().createSqlType(SqlTypeName.BIGINT),
@@ -2389,6 +2401,24 @@ private void flattenParsedPattern(
             true);
     fattenedNodes.add(context.relBuilder.alias(tokensExpr, PatternUtils.TOKENS));
     projectNames.add(PatternUtils.TOKENS);
+    if (flattenPatternAggResult) {
+      RexNode sampleLogsExpr =
+          context.rexBuilder.makeCast(
+              context
+                  .rexBuilder
+                  .getTypeFactory()
+                  .createArrayType(
+                      context.rexBuilder.getTypeFactory().createSqlType(SqlTypeName.VARCHAR), -1),
+              PPLFuncImpTable.INSTANCE.resolve(
+                  context.rexBuilder,
+                  BuiltinFunctionName.INTERNAL_ITEM,
+                  parsedNode,
+                  context.rexBuilder.makeLiteral(PatternUtils.SAMPLE_LOGS)),
+              true,
+              true);
+      fattenedNodes.add(context.relBuilder.alias(sampleLogsExpr, PatternUtils.SAMPLE_LOGS));
+      projectNames.add(PatternUtils.SAMPLE_LOGS);
+    }
     projectPlusOverriding(fattenedNodes, projectNames, context);
   }
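To make the visitor change above concrete, here is a small standalone sketch (plain Java, not the actual Calcite RelBuilder/RexBuilder code path) of the effect of flattenParsedPattern with flattenPatternAggResult=true followed by the reorder: the struct produced by the pattern aggregation is split into flat columns projected in the same order Brain uses (patterns_field, pattern_count, tokens, sample_logs). The map keys and sample values are taken from the tests in this commit.

    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    // Standalone illustration of flattening the aggregated pattern struct into
    // flat output columns, in the fixed order the visitor now enforces for
    // simple_pattern so that it matches Brain's output.
    class FlattenPatternSketch {
      public static void main(String[] args) {
        Map<String, Object> parsed =
            Map.of(
                "pattern", "PacketResponder failed <token1> blk_<token2>",
                "pattern_count", 2L,
                "tokens", Map.of("<token1>", List.of("for", "for")),
                "sample_logs",
                    List.of(
                        "PacketResponder failed for blk_6996194389878584395",
                        "PacketResponder failed for blk_-1547954353065580372"));

        // patterns_field, pattern_count, tokens, sample_logs -- the fixed order.
        Map<String, Object> flattened = new LinkedHashMap<>();
        flattened.put("patterns_field", parsed.get("pattern"));
        flattened.put("pattern_count", parsed.get("pattern_count"));
        flattened.put("tokens", parsed.get("tokens"));
        flattened.put("sample_logs", parsed.get("sample_logs"));
        flattened.forEach((name, value) -> System.out.println(name + " = " + value));
      }
    }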

core/src/main/java/org/opensearch/sql/calcite/udf/udaf/LogPatternAggFunction.java

Lines changed: 2 additions & 1 deletion
@@ -187,7 +187,8 @@ public Object value(Object... argList) {
                   PatternUtils.PATTERN,
                   parseResult.toTokenOrderString(PatternUtils.WILDCARD_PREFIX),
                   PatternUtils.PATTERN_COUNT, count,
-                  PatternUtils.TOKENS, tokensMap);
+                  PatternUtils.TOKENS, tokensMap,
+                  PatternUtils.SAMPLE_LOGS, sampleLogs);
             })
         .collect(Collectors.toList());
   }
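The aggregator's value() now also emits the SAMPLE_LOGS entry shown above. The sketch below is a generic illustration of the max_sample_count semantics exercised by the tests (the running count keeps growing while only a bounded number of raw messages is retained per pattern); it is an assumed illustration, not the actual LogPatternAggFunction accumulation logic.

    import java.util.ArrayList;
    import java.util.List;

    // Generic sketch of bounded sample retention per pattern; assumed behavior
    // for illustration only, not LogPatternAggFunction itself.
    class BoundedSampleLogs {
      private final int maxSampleCount;
      private final List<String> sampleLogs = new ArrayList<>();
      private long patternCount;

      BoundedSampleLogs(int maxSampleCount) {
        this.maxSampleCount = maxSampleCount;
      }

      void accumulate(String rawLog) {
        patternCount++;                        // every matching log is counted
        if (sampleLogs.size() < maxSampleCount) {
          sampleLogs.add(rawLog);              // but only the first N are kept as samples
        }
      }

      List<String> sampleLogs() {
        return sampleLogs;
      }
    }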

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java

Lines changed: 0 additions & 1 deletion
@@ -22,7 +22,6 @@
 public class CalciteExplainIT extends ExplainIT {
   @Override
   public void init() throws Exception {
-    GlobalPushdownConfig.enabled = false;
     super.init();
     enableCalcite();
     loadIndex(Index.BANK_WITH_STRING_VALUES);

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLPatternsIT.java

Lines changed: 55 additions & 13 deletions
@@ -101,19 +101,22 @@ public void testSimplePatternAggregationMode() throws IOException {
         result,
         schema("pattern_count", "bigint"),
         schema("patterns_field", "string"),
-        schema("tokens", "struct"));
+        schema("tokens", "struct"),
+        schema("sample_logs", "array"));
     verifyDataRows(
         result,
         rows(
-            7,
             "<token1>@<token2>.<token3>",
+            7,
             ImmutableMap.of(
                 "<token1>",
                 ImmutableList.of("amberduke", "hattiebond", "nanettebates"),
                 "<token2>",
                 ImmutableList.of("pyrami", "netagy", "quility"),
                 "<token3>",
-                ImmutableList.of("com", "com", "com"))));
+                ImmutableList.of("com", "com", "com")),
+            ImmutableList.of(
+                "amberduke@pyrami.com", "hattiebond@netagy.com", "nanettebates@quility.com")));
   }
 
   @Test
@@ -181,7 +184,8 @@ public void testBrainAggregationMode() throws IOException {
         result,
         schema("patterns_field", "string"),
         schema("pattern_count", "bigint"),
-        schema("tokens", "struct"));
+        schema("tokens", "struct"),
+        schema("sample_logs", "array"));
     verifyDataRows(
         result,
         rows(
@@ -191,7 +195,10 @@ public void testBrainAggregationMode() throws IOException {
                 "<token1>",
                 ImmutableList.of("for", "for"),
                 "<token2>",
-                ImmutableList.of("-1547954353065580372", "6996194389878584395"))),
+                ImmutableList.of("-1547954353065580372", "6996194389878584395")),
+            ImmutableList.of(
+                "Verification succeeded for blk_-1547954353065580372",
+                "Verification succeeded for blk_6996194389878584395")),
         rows(
             "BLOCK* NameSystem.addStoredBlock: blockMap updated: <token1> is added to blk_<token2>"
                 + " size <token3>",
@@ -202,7 +209,12 @@ public void testBrainAggregationMode() throws IOException {
                 "<token3>",
                 ImmutableList.of("67108864", "67108864"),
                 "<token2>",
-                ImmutableList.of("-7017553867379051457", "-3249711809227781266"))),
+                ImmutableList.of("-7017553867379051457", "-3249711809227781266")),
+            ImmutableList.of(
+                "BLOCK* NameSystem.addStoredBlock: blockMap updated: 10.251.31.85:50010 is added to"
+                    + " blk_-7017553867379051457 size 67108864",
+                "BLOCK* NameSystem.addStoredBlock: blockMap updated: 10.251.107.19:50010 is added"
+                    + " to blk_-3249711809227781266 size 67108864")),
         rows(
             "<token1> NameSystem.allocateBlock:"
                 + " /user/root/sortrand/_temporary/_task_<token2>_<token3>_r_<token4>_<token5>/part<token6>"
@@ -222,15 +234,25 @@ public void testBrainAggregationMode() throws IOException {
                 "<token3>",
                 ImmutableList.of("0002", "0002"),
                 "<token2>",
-                ImmutableList.of("200811092030", "200811092030"))),
+                ImmutableList.of("200811092030", "200811092030")),
+            ImmutableList.of(
+                "BLOCK* NameSystem.allocateBlock:"
+                    + " /user/root/sortrand/_temporary/_task_200811092030_0002_r_000296_0/part-00296."
+                    + " blk_-6620182933895093708",
+                "BLOCK* NameSystem.allocateBlock:"
+                    + " /user/root/sortrand/_temporary/_task_200811092030_0002_r_000318_0/part-00318."
+                    + " blk_2096692261399680562")),
         rows(
             "PacketResponder failed <token1> blk_<token2>",
             2,
             ImmutableMap.of(
                 "<token1>",
                 ImmutableList.of("for", "for"),
                 "<token2>",
-                ImmutableList.of("6996194389878584395", "-1547954353065580372"))));
+                ImmutableList.of("6996194389878584395", "-1547954353065580372")),
+            ImmutableList.of(
+                "PacketResponder failed for blk_6996194389878584395",
+                "PacketResponder failed for blk_-1547954353065580372")));
   }
 
   @Test
@@ -242,20 +264,25 @@ public void testBrainAggregationModeWithGroupByClause() throws IOException {
                 + " mode=aggregation max_sample_count=5"
                 + " variable_count_threshold=2 frequency_threshold_percentage=0.2",
             TEST_INDEX_HDFS_LOGS));
+    System.out.println(result);
     verifySchema(
         result,
         schema("level", "string"),
         schema("patterns_field", "string"),
         schema("pattern_count", "bigint"),
-        schema("tokens", "struct"));
+        schema("tokens", "struct"),
+        schema("sample_logs", "array"));
     verifyDataRows(
         result,
         rows(
             "INFO",
             "Verification succeeded for blk_<token1>",
             2,
             ImmutableMap.of(
-                "<token1>", ImmutableList.of("-1547954353065580372", "6996194389878584395"))),
+                "<token1>", ImmutableList.of("-1547954353065580372", "6996194389878584395")),
+            ImmutableList.of(
+                "Verification succeeded for blk_-1547954353065580372",
+                "Verification succeeded for blk_6996194389878584395")),
         rows(
             "INFO",
             "BLOCK* NameSystem.addStoredBlock: blockMap updated: <token1> is added to blk_<token2>"
@@ -267,7 +294,12 @@ public void testBrainAggregationModeWithGroupByClause() throws IOException {
                 "<token3>",
                 ImmutableList.of("67108864", "67108864"),
                 "<token2>",
-                ImmutableList.of("-7017553867379051457", "-3249711809227781266"))),
+                ImmutableList.of("-7017553867379051457", "-3249711809227781266")),
+            ImmutableList.of(
+                "BLOCK* NameSystem.addStoredBlock: blockMap updated: 10.251.31.85:50010 is added to"
+                    + " blk_-7017553867379051457 size 67108864",
+                "BLOCK* NameSystem.addStoredBlock: blockMap updated: 10.251.107.19:50010 is added"
+                    + " to blk_-3249711809227781266 size 67108864")),
         rows(
             "INFO",
             "BLOCK* NameSystem.allocateBlock:"
@@ -286,13 +318,23 @@ public void testBrainAggregationModeWithGroupByClause() throws IOException {
                 "<token3>",
                 ImmutableList.of("000296", "000318"),
                 "<token2>",
-                ImmutableList.of("0002", "0002"))),
+                ImmutableList.of("0002", "0002")),
+            ImmutableList.of(
+                "BLOCK* NameSystem.allocateBlock:"
+                    + " /user/root/sortrand/_temporary/_task_200811092030_0002_r_000296_0/part-00296."
+                    + " blk_-6620182933895093708",
+                "BLOCK* NameSystem.allocateBlock:"
+                    + " /user/root/sortrand/_temporary/_task_200811092030_0002_r_000318_0/part-00318."
+                    + " blk_2096692261399680562")),
         rows(
             "WARN",
             "PacketResponder failed for blk_<token1>",
             2,
             ImmutableMap.of(
-                "<token1>", ImmutableList.of("6996194389878584395", "-1547954353065580372"))));
+                "<token1>", ImmutableList.of("6996194389878584395", "-1547954353065580372")),
+            ImmutableList.of(
+                "PacketResponder failed for blk_6996194389878584395",
+                "PacketResponder failed for blk_-1547954353065580372")));
   }
 
   @Test

Lines changed: 2 additions & 2 deletions
@@ -1,6 +1,6 @@
 {
   "calcite": {
-    "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(patterns_field=[SAFE_CAST(ITEM($1, 'pattern'))], pattern_count=[SAFE_CAST(ITEM($1, 'pattern_count'))], tokens=[SAFE_CAST(ITEM($1, 'tokens'))])\n LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0}])\n LogicalAggregate(group=[{}], patterns_field=[pattern($0, $1, $2)])\n LogicalProject(email=[$9], $f17=[10], $f18=[100000])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n Uncollect\n LogicalProject(patterns_field=[$cor0.patterns_field])\n LogicalValues(tuples=[[{ 0 }]])\n",
-    "physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..1=[{inputs}], expr#2=['pattern'], expr#3=[ITEM($t1, $t2)], expr#4=[SAFE_CAST($t3)], expr#5=['pattern_count'], expr#6=[ITEM($t1, $t5)], expr#7=[SAFE_CAST($t6)], expr#8=['tokens'], expr#9=[ITEM($t1, $t8)], expr#10=[SAFE_CAST($t9)], patterns_field=[$t4], pattern_count=[$t7], tokens=[$t10])\n EnumerableCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0}])\n EnumerableAggregate(group=[{}], patterns_field=[pattern($0, $1, $2)])\n EnumerableCalc(expr#0=[{inputs}], expr#1=[10], expr#2=[100000], proj#0..2=[{exprs}])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[email]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"email\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n EnumerableUncollect\n EnumerableCalc(expr#0=[{inputs}], expr#1=[$cor0], expr#2=[$t1.patterns_field], patterns_field=[$t2])\n EnumerableValues(tuples=[[{ 0 }]])\n"
+    "logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(patterns_field=[SAFE_CAST(ITEM($1, 'pattern'))], pattern_count=[SAFE_CAST(ITEM($1, 'pattern_count'))], tokens=[SAFE_CAST(ITEM($1, 'tokens'))], sample_logs=[SAFE_CAST(ITEM($1, 'sample_logs'))])\n LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0}])\n LogicalAggregate(group=[{}], patterns_field=[pattern($0, $1, $2)])\n LogicalProject(email=[$9], $f17=[10], $f18=[100000])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])\n Uncollect\n LogicalProject(patterns_field=[$cor0.patterns_field])\n LogicalValues(tuples=[[{ 0 }]])\n",
+    "physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..1=[{inputs}], expr#2=['pattern'], expr#3=[ITEM($t1, $t2)], expr#4=[SAFE_CAST($t3)], expr#5=['pattern_count'], expr#6=[ITEM($t1, $t5)], expr#7=[SAFE_CAST($t6)], expr#8=['tokens'], expr#9=[ITEM($t1, $t8)], expr#10=[SAFE_CAST($t9)], expr#11=['sample_logs'], expr#12=[ITEM($t1, $t11)], expr#13=[SAFE_CAST($t12)], patterns_field=[$t4], pattern_count=[$t7], tokens=[$t10], sample_logs=[$t13])\n EnumerableCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0}])\n EnumerableAggregate(group=[{}], patterns_field=[pattern($0, $1, $2)])\n EnumerableCalc(expr#0=[{inputs}], expr#1=[10], expr#2=[100000], proj#0..2=[{exprs}])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[email]], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"email\"],\"excludes\":[]}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n EnumerableUncollect\n EnumerableCalc(expr#0=[{inputs}], expr#1=[$cor0], expr#2=[$t1.patterns_field], patterns_field=[$t2])\n EnumerableValues(tuples=[[{ 0 }]])\n"
   }
 }
