diff --git a/src/main/java/com/teragrep/pth10/steps/teragrep/TeragrepBloomStep.java b/src/main/java/com/teragrep/pth10/steps/teragrep/TeragrepBloomStep.java index e5f34574..d20ae856 100644 --- a/src/main/java/com/teragrep/pth10/steps/teragrep/TeragrepBloomStep.java +++ b/src/main/java/com/teragrep/pth10/steps/teragrep/TeragrepBloomStep.java @@ -157,7 +157,7 @@ private Dataset updateBloomFilter(Dataset dataset) { private Dataset estimateSize(Dataset dataset) { return dataset - .select(functions.col("partition"), functions.explode(functions.col(inputCol)).as("token")) + .select(functions.col("partition"), functions.explode_outer(functions.col(inputCol)).as("token")) .groupBy("partition") .agg(functions.approxCountDistinct("token").as(outputCol)); } diff --git a/src/test/java/com/teragrep/pth10/BloomFilterOperationsTest.java b/src/test/java/com/teragrep/pth10/BloomFilterOperationsTest.java index b2e5321c..0dc53c5b 100644 --- a/src/test/java/com/teragrep/pth10/BloomFilterOperationsTest.java +++ b/src/test/java/com/teragrep/pth10/BloomFilterOperationsTest.java @@ -128,6 +128,34 @@ public void estimateTest() { ); } + @Test + @DisabledIfSystemProperty( + named = "skipSparkTest", + matches = "true" + ) + public void testEstimateOnEmptyArray() { + streamingTestUtil + .performDPLTest( + // index_Empty _raw = "" so tokenizer step will produce an empty array + "index=index_Empty earliest=2020-01-01T00:00:00z latest=2023-01-01T00:00:00z | teragrep exec tokenizer | teragrep exec bloom estimate", + testFile, ds -> { + Assertions + .assertEquals("[partition, estimate(tokens)]", Arrays.toString(ds.columns()), "Batch handler dataset contained an unexpected column arrangement !"); + List results = ds + .select("estimate(tokens)") + .collectAsList() + .stream() + .map(r -> Integer.parseInt(r.get(0).toString())) + .collect(Collectors.toList()); + + // assert that a row is produced and not an empty dataframe + Assertions.assertEquals(1, results.size()); + // assert that estimate is 0 and not empty or null + Assertions.assertEquals(0, results.get(0)); + } + ); + } + @Test @DisabledIfSystemProperty( named = "skipSparkTest", diff --git a/src/test/resources/xmlWalkerTestDataStreaming/bloomTeragrepStep_data.jsonl b/src/test/resources/xmlWalkerTestDataStreaming/bloomTeragrepStep_data.jsonl index 732f4fec..2290d262 100644 --- a/src/test/resources/xmlWalkerTestDataStreaming/bloomTeragrepStep_data.jsonl +++ b/src/test/resources/xmlWalkerTestDataStreaming/bloomTeragrepStep_data.jsonl @@ -1,2 +1,3 @@ {"_time": "2022-09-06 09:00:00.000", "id": 1, "_raw": "one", "index": "index_A", "sourcetype": "stream1", "host": "host", "source": "127.0.0.0", "partition": "1", "offset": 1} -{"_time": "2022-09-06 09:00:00.000", "id": 2, "_raw": "one.two", "index": "index_A", "sourcetype": "stream1", "host": "host", "source": "127.0.0.0", "partition": "2", "offset": 1} \ No newline at end of file +{"_time": "2022-09-06 09:00:00.000", "id": 2, "_raw": "one.two", "index": "index_A", "sourcetype": "stream1", "host": "host", "source": "127.0.0.0", "partition": "2", "offset": 1} +{"_time": "2022-09-06 09:00:00.000", "id": 3, "_raw": "", "index": "index_Empty", "sourcetype": "stream1", "host": "host", "source": "127.0.0.0", "partition": "3", "offset": 1} \ No newline at end of file