From 7929a089f4930c2d9a35f8b0e21e4e3cc995924a Mon Sep 17 00:00:00 2001
From: eemhu <125959687+eemhu@users.noreply.github.com>
Date: Thu, 7 Nov 2024 17:42:34 +0200
Subject: [PATCH] #385: Add teragrep set|get config (#394)
* pth_03 update to 9.2.0, initial tg set config version.
* tg set config mismatched types test
* apply spotless
* initial TeragrepGetConfigStep
* add tg get config with test
* applied spotless
* set TeragrepGet/SetConfigStep as final class, improved GeneratedDatasource exception message, cleaned up debug ds.show(), removed unnecessary visitor function code from visitT_setParameter()
---
pom.xml | 2 +-
.../TeragrepTransformation.java | 19 ++-
.../steps/teragrep/TeragrepGetConfigStep.java | 86 ++++++++++++++
.../steps/teragrep/TeragrepSetConfigStep.java | 110 ++++++++++++++++++
.../pth10/TeragrepTransformationTest.java | 91 +++++++++++++++
5 files changed, 304 insertions(+), 4 deletions(-)
create mode 100644 src/main/java/com/teragrep/pth10/steps/teragrep/TeragrepGetConfigStep.java
create mode 100644 src/main/java/com/teragrep/pth10/steps/teragrep/TeragrepSetConfigStep.java
diff --git a/pom.xml b/pom.xml
index c5e8965b..1db73f32 100644
--- a/pom.xml
+++ b/pom.xml
@@ -52,7 +52,7 @@
11.0.1
3.1.1
0.4.3
- 9.1.0
+ 9.2.0
3.2.2
4.0.1
1.7.6
diff --git a/src/main/java/com/teragrep/pth10/ast/commands/transformstatement/TeragrepTransformation.java b/src/main/java/com/teragrep/pth10/ast/commands/transformstatement/TeragrepTransformation.java
index 5c47c04e..12fbfc4c 100644
--- a/src/main/java/com/teragrep/pth10/ast/commands/transformstatement/TeragrepTransformation.java
+++ b/src/main/java/com/teragrep/pth10/ast/commands/transformstatement/TeragrepTransformation.java
@@ -115,9 +115,6 @@ public Node visitTeragrepTransformation(DPLParser.TeragrepTransformationContext
* @return CatalystNode
*/
private Node teragrepTransformationEmitCatalyst(DPLParser.TeragrepTransformationContext ctx) {
-
- LOGGER.info("TeragrepTransformation Emit Catalyst");
-
// get zeppelin config
zplnConfig = catCtx.getConfig();
if (zplnConfig != null) {
@@ -164,11 +161,27 @@ public Node visitT_getParameter(DPLParser.T_getParameterContext ctx) {
else if (ctx.t_getArchiveSummaryParameter() != null) {
return visit(ctx.t_getArchiveSummaryParameter());
}
+ else if (ctx.COMMAND_TERAGREP_MODE_CONFIG() != null) {
+ return new StepNode(new TeragrepGetConfigStep(catCtx));
+ }
else {
throw new IllegalArgumentException("Unsupported teragrep command: " + ctx.getText());
}
}
+ @Override
+ public Node visitT_setParameter(DPLParser.T_setParameterContext ctx) {
+ return visitChildren(ctx);
+ }
+
+ @Override
+ public Node visitT_setConfigParameter(DPLParser.T_setConfigParameterContext ctx) {
+ final String key = new UnquotedText(new TextString(ctx.t_configKeyParameter().stringType().getText())).read();
+ final String value = new UnquotedText(new TextString(ctx.t_configValueParameter().stringType().getText()))
+ .read();
+ return new StepNode(new TeragrepSetConfigStep(catCtx, key, value));
+ }
+
@Override
public Node visitT_getArchiveSummaryParameter(DPLParser.T_getArchiveSummaryParameterContext ctx) {
// archive summary
diff --git a/src/main/java/com/teragrep/pth10/steps/teragrep/TeragrepGetConfigStep.java b/src/main/java/com/teragrep/pth10/steps/teragrep/TeragrepGetConfigStep.java
new file mode 100644
index 00000000..7954e092
--- /dev/null
+++ b/src/main/java/com/teragrep/pth10/steps/teragrep/TeragrepGetConfigStep.java
@@ -0,0 +1,86 @@
+/*
+ * Teragrep Data Processing Language (DPL) translator for Apache Spark (pth_10)
+ * Copyright (C) 2019-2024 Suomen Kanuuna Oy
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ *
+ * Additional permission under GNU Affero General Public License version 3
+ * section 7
+ *
+ * If you modify this Program, or any covered work, by linking or combining it
+ * with other code, such other code is not for that reason alone subject to any
+ * of the requirements of the GNU Affero GPL version 3 as long as this Program
+ * is the same Program as licensed from Suomen Kanuuna Oy without any additional
+ * modifications.
+ *
+ * Supplemented terms under GNU Affero General Public License version 3
+ * section 7
+ *
+ * Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
+ * versions must be marked as "Modified version of" The Program.
+ *
+ * Names of the licensors and authors may not be used for publicity purposes.
+ *
+ * No rights are granted for use of trade names, trademarks, or service marks
+ * which are in The Program if any.
+ *
+ * Licensee must indemnify licensors and authors for any liability that these
+ * contractual assumptions impose on licensors and authors.
+ *
+ * To the extent this program is licensed as part of the Commercial versions of
+ * Teragrep, the applicable Commercial License may apply to this file if you as
+ * a licensee so wish it.
+ */
+package com.teragrep.pth10.steps.teragrep;
+
+import com.teragrep.pth10.ast.DPLParserCatalystContext;
+import com.teragrep.pth10.datasources.GeneratedDatasource;
+import com.teragrep.pth10.steps.AbstractStep;
+import com.typesafe.config.ConfigValue;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.streaming.StreamingQueryException;
+
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public final class TeragrepGetConfigStep extends AbstractStep {
+
+ private final DPLParserCatalystContext catCtx;
+
+ public TeragrepGetConfigStep(DPLParserCatalystContext catCtx) {
+ this.catCtx = catCtx;
+ }
+
+ @Override
+ public Dataset get(Dataset dataset) throws StreamingQueryException {
+ List configs = new ArrayList<>();
+
+ for (Map.Entry entry : catCtx.getConfig().entrySet()) {
+ configs.add(entry.getKey().concat(" = ").concat(entry.getValue().unwrapped().toString()));
+ }
+
+ GeneratedDatasource datasource = new GeneratedDatasource(catCtx);
+ try {
+ dataset = datasource.constructStream(configs, "teragrep get config");
+ }
+ catch (InterruptedException | UnknownHostException e) {
+ throw new RuntimeException("Unable to construct 'teragrep get config' dataset", e);
+ }
+
+ return dataset;
+ }
+}
diff --git a/src/main/java/com/teragrep/pth10/steps/teragrep/TeragrepSetConfigStep.java b/src/main/java/com/teragrep/pth10/steps/teragrep/TeragrepSetConfigStep.java
new file mode 100644
index 00000000..1970df48
--- /dev/null
+++ b/src/main/java/com/teragrep/pth10/steps/teragrep/TeragrepSetConfigStep.java
@@ -0,0 +1,110 @@
+/*
+ * Teragrep Data Processing Language (DPL) translator for Apache Spark (pth_10)
+ * Copyright (C) 2019-2024 Suomen Kanuuna Oy
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ *
+ * Additional permission under GNU Affero General Public License version 3
+ * section 7
+ *
+ * If you modify this Program, or any covered work, by linking or combining it
+ * with other code, such other code is not for that reason alone subject to any
+ * of the requirements of the GNU Affero GPL version 3 as long as this Program
+ * is the same Program as licensed from Suomen Kanuuna Oy without any additional
+ * modifications.
+ *
+ * Supplemented terms under GNU Affero General Public License version 3
+ * section 7
+ *
+ * Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
+ * versions must be marked as "Modified version of" The Program.
+ *
+ * Names of the licensors and authors may not be used for publicity purposes.
+ *
+ * No rights are granted for use of trade names, trademarks, or service marks
+ * which are in The Program if any.
+ *
+ * Licensee must indemnify licensors and authors for any liability that these
+ * contractual assumptions impose on licensors and authors.
+ *
+ * To the extent this program is licensed as part of the Commercial versions of
+ * Teragrep, the applicable Commercial License may apply to this file if you as
+ * a licensee so wish it.
+ */
+package com.teragrep.pth10.steps.teragrep;
+
+import com.teragrep.pth10.ast.DPLParserCatalystContext;
+import com.teragrep.pth10.steps.AbstractStep;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValue;
+import com.typesafe.config.ConfigValueFactory;
+import com.typesafe.config.ConfigValueType;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.streaming.StreamingQueryException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class TeragrepSetConfigStep extends AbstractStep {
+
+ private final Logger LOGGER = LoggerFactory.getLogger(TeragrepSetConfigStep.class);
+ private final DPLParserCatalystContext catCtx;
+ private final String key;
+ private final String value;
+
+ public TeragrepSetConfigStep(final DPLParserCatalystContext catCtx, final String key, final String value) {
+ this.catCtx = catCtx;
+ this.key = key;
+ this.value = value;
+ }
+
+ @Override
+ public Dataset get(Dataset dataset) throws StreamingQueryException {
+ if (catCtx == null) {
+ throw new IllegalStateException("DPLParserCatalystContext not set");
+ }
+
+ final Config config = catCtx.getConfig();
+ if (config == null || config.isEmpty()) {
+ throw new IllegalArgumentException("Config is null or empty");
+ }
+
+ if (!config.hasPath(key)) {
+ throw new IllegalArgumentException("Config key " + key + " not found");
+ }
+
+ final ConfigValue oldValue = config.getValue(key);
+ final Config newConfig;
+ if (oldValue.valueType().equals(ConfigValueType.BOOLEAN)) {
+ newConfig = config.withValue(key, ConfigValueFactory.fromAnyRef(Boolean.parseBoolean(value)));
+ }
+ else if (oldValue.valueType().equals(ConfigValueType.NUMBER)) {
+ // getInt(), getLong() will work without decimals
+ newConfig = config.withValue(key, ConfigValueFactory.fromAnyRef(Double.parseDouble(value)));
+ }
+ else if (oldValue.valueType().equals(ConfigValueType.STRING)) {
+ newConfig = config.withValue(key, ConfigValueFactory.fromAnyRef(value));
+ }
+ else {
+ throw new IllegalArgumentException("Unknown config value type: " + oldValue.valueType());
+ }
+
+ LOGGER.info("Set configuration <[{}]> to new value <[{}]>", key, value);
+
+ catCtx.setConfig(newConfig);
+
+ return dataset;
+ }
+}
diff --git a/src/test/java/com/teragrep/pth10/TeragrepTransformationTest.java b/src/test/java/com/teragrep/pth10/TeragrepTransformationTest.java
index c9ee01ac..3343c6f5 100644
--- a/src/test/java/com/teragrep/pth10/TeragrepTransformationTest.java
+++ b/src/test/java/com/teragrep/pth10/TeragrepTransformationTest.java
@@ -45,6 +45,9 @@
*/
package com.teragrep.pth10;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.StructField;
@@ -819,4 +822,92 @@ public void tgForEachBatchTest() {
Assertions.assertEquals(5, ds.count());
});
}
+
+ @Test
+ @DisabledIfSystemProperty(
+ named = "skipSparkTest",
+ matches = "true"
+ )
+ public void tgSetConfigStringTest() {
+ Config fakeConfig = ConfigFactory
+ .defaultApplication()
+ .withValue("dpl.pth_00.dummy.value", ConfigValueFactory.fromAnyRef("oldValue"));
+ streamingTestUtil.getCtx().setConfig(fakeConfig);
+ Assertions.assertEquals("oldValue", streamingTestUtil.getCtx().getConfig().getString("dpl.pth_00.dummy.value"));
+ streamingTestUtil
+ .performDPLTest("index=index_A | teragrep set config dpl.pth_00.dummy.value newValue", testFile, ds -> {
+ Assertions
+ .assertEquals(
+ "newValue",
+ streamingTestUtil.getCtx().getConfig().getString("dpl.pth_00.dummy.value")
+ );
+ });
+ }
+
+ @Test
+ @DisabledIfSystemProperty(
+ named = "skipSparkTest",
+ matches = "true"
+ )
+ public void tgSetConfigLongTest() {
+ Config fakeConfig = ConfigFactory
+ .defaultApplication()
+ .withValue("dpl.pth_00.dummy.value", ConfigValueFactory.fromAnyRef(12345));
+ streamingTestUtil.getCtx().setConfig(fakeConfig);
+ Assertions.assertEquals(12345L, streamingTestUtil.getCtx().getConfig().getLong("dpl.pth_00.dummy.value"));
+ streamingTestUtil
+ .performDPLTest("index=index_A | teragrep set config dpl.pth_00.dummy.value 99999", testFile, ds -> {
+ Assertions
+ .assertEquals(
+ 99999L, streamingTestUtil.getCtx().getConfig().getLong("dpl.pth_00.dummy.value")
+ );
+ });
+ }
+
+ @Test
+ @DisabledIfSystemProperty(
+ named = "skipSparkTest",
+ matches = "true"
+ )
+ public void tgSetConfigMismatchedTypesTest() {
+ Config fakeConfig = ConfigFactory
+ .defaultApplication()
+ .withValue("dpl.pth_00.dummy.value", ConfigValueFactory.fromAnyRef(12345));
+ streamingTestUtil.getCtx().setConfig(fakeConfig);
+ Assertions.assertEquals(12345L, streamingTestUtil.getCtx().getConfig().getLong("dpl.pth_00.dummy.value"));
+ Throwable t = streamingTestUtil
+ .performThrowingDPLTest(
+ NumberFormatException.class,
+ "index=index_A | teragrep set config dpl.pth_00.dummy.value stringValue", testFile, ds -> {
+ }
+ );
+
+ Assertions.assertEquals("For input string: \"stringValue\"", t.getMessage());
+ }
+
+ @Test
+ @DisabledIfSystemProperty(
+ named = "skipSparkTest",
+ matches = "true"
+ )
+ public void tgGetConfigTest() {
+ Config fakeConfig = ConfigFactory
+ .defaultApplication()
+ .withValue("dpl.pth_00.dummy.value", ConfigValueFactory.fromAnyRef(12345))
+ .withValue("dpl.pth_00.another.dummy.value", ConfigValueFactory.fromAnyRef("string_here"));
+ streamingTestUtil.getCtx().setConfig(fakeConfig);
+
+ streamingTestUtil.performDPLTest("index=index_A | teragrep get config", testFile, ds -> {
+ List configs = ds
+ .select("_raw")
+ .collectAsList()
+ .stream()
+ .map(r -> r.getAs(0).toString())
+ .collect(Collectors.toList());
+ Assertions.assertEquals(2, configs.size());
+ Assertions.assertTrue(configs.contains("dpl.pth_00.another.dummy.value = string_here"));
+ Assertions.assertTrue(configs.contains("dpl.pth_00.dummy.value = 12345"));
+ });
+ }
+
}