Skip to content

Commit 81829e0

Browse files
committed
support clustering
1 parent 849553c commit 81829e0

File tree

11 files changed

+921
-2
lines changed

11 files changed

+921
-2
lines changed

src/main/java/org/embulk/output/bigquery_java/BigqueryClient.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,11 @@ public void createTableIfNotExist(String table, String dataset) {
110110
if (this.task.getTimePartitioning().isPresent()) {
111111
tableDefinitionBuilder.setTimePartitioning(buildTimePartitioning(this.task.getTimePartitioning().get()));
112112
}
113+
if (this.task.getClustering().isPresent()){
114+
tableDefinitionBuilder.setClustering(
115+
Clustering.newBuilder().setFields(this.task.getClustering().get().getFields().get()).build()
116+
);
117+
}
113118
TableDefinition tableDefinition = tableDefinitionBuilder.build();
114119

115120
try{

src/main/java/org/embulk/output/bigquery_java/BigqueryClient.java.orig

Lines changed: 491 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package org.embulk.output.bigquery_java.config;
2+
3+
import org.embulk.config.Config;
4+
import org.embulk.config.ConfigDefault;
5+
import org.embulk.config.Task;
6+
7+
import java.util.List;
8+
import java.util.Optional;
9+
10+
public interface BigqueryClustering extends Task {
11+
@Config("fields")
12+
@ConfigDefault("null")
13+
public Optional<List<String>> getFields();
14+
}

src/main/java/org/embulk/output/bigquery_java/config/BigqueryConfigValidator.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,12 @@ public static void validateTimePartitioning(PluginTask task) throws ConfigExcept
3333
}
3434
}
3535
}
36+
37+
public static void validateClustering(PluginTask task) throws ConfigException{
38+
if (task.getClustering().isPresent()){
39+
if (!task.getClustering().get().getFields().isPresent()){
40+
throw new ConfigException("`clustering` must have `fields` key");
41+
}
42+
}
43+
}
3644
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package org.embulk.output.bigquery_java.config;
2+
3+
import org.embulk.config.ConfigException;
4+
5+
import java.util.Arrays;
6+
7+
public class BigqueryConfigValidator {
8+
public static void validate(PluginTask task) {
9+
validateMode(task);
10+
validateModeAndAutoCreteTable(task);
11+
}
12+
13+
public static void validateMode(PluginTask task) throws ConfigException {
14+
// TODO: append_direct delete_in_advance replace_backup
15+
String[] modes = {"replace", "append", "delete_in_advance", "append_direct"};
16+
if (!Arrays.asList(modes).contains(task.getMode().toLowerCase())) {
17+
throw new ConfigException("replace, append, delete_in_advance and append_direct are supported. Stay tuned!");
18+
}
19+
}
20+
21+
public static void validateModeAndAutoCreteTable(PluginTask task) throws ConfigException {
22+
// TODO: modes are append replace delete_in_advance replace_backup and !task['auto_create_table']
23+
String[] modes = {"replace", "append", "delete_in_advance"};
24+
if (Arrays.asList(modes).contains(task.getMode().toLowerCase()) && !task.getAutoCreateTable()) {
25+
throw new ConfigException("replace, append and delete_in_advance are supported. Stay tuned!");
26+
}
27+
}
28+
29+
public static void validateTimePartitioning(PluginTask task) throws ConfigException {
30+
if (task.getTimePartitioning().isPresent()) {
31+
if (!task.getTimePartitioning().get().getType().toUpperCase().equals("DAY")) {
32+
throw new ConfigException(String.format("`time_partitioning.type: %s` requires `time_partitioning.type: DAY`", task.getMode()));
33+
}
34+
}
35+
}
36+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
***************
2+
*** 9,14 ****
3+
validateMode(task);
4+
validateModeAndAutoCreteTable(task);
5+
validateProject(task);
6+
}
7+
8+
public static void validateMode(PluginTask task) throws ConfigException {
9+
--- 9,15 ----
10+
validateMode(task);
11+
validateModeAndAutoCreteTable(task);
12+
validateProject(task);
13+
+ validateClustering(task);
14+
}
15+
16+
public static void validateMode(PluginTask task) throws ConfigException {
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
package org.embulk.output.bigquery_java.config;
2+
3+
import java.util.List;
4+
import java.util.Optional;
5+
6+
import com.google.common.annotations.VisibleForTesting;
7+
import org.embulk.config.Config;
8+
import org.embulk.config.ConfigDefault;
9+
import org.embulk.config.Task;
10+
11+
12+
public interface PluginTask
13+
extends Task {
14+
15+
@Config("mode")
16+
@ConfigDefault("\"append\"")
17+
String getMode();
18+
19+
@VisibleForTesting
20+
void setMode(String mode);
21+
22+
// TODO: default should be application default
23+
@Config("auth_method")
24+
@ConfigDefault("\"service_account\"")
25+
String getAuthMethod();
26+
27+
@Config("json_keyfile")
28+
String getJsonKeyfile();
29+
30+
@Config("dataset")
31+
String getDataset();
32+
33+
@Config("table")
34+
String getTable();
35+
36+
@Config("old_dataset")
37+
@ConfigDefault("null")
38+
Optional<String> getOldDataset();
39+
40+
@Config("old_table")
41+
@ConfigDefault("null")
42+
Optional<String> getOldTable();
43+
44+
@Config("location")
45+
@ConfigDefault("null")
46+
Optional<String> getLocation();
47+
48+
@Config("column_options")
49+
@ConfigDefault("null")
50+
Optional<List<BigqueryColumnOption>> getColumnOptions();
51+
52+
@Config("compression")
53+
@ConfigDefault("\"NONE\"")
54+
String getCompression();
55+
56+
@Config("source_format")
57+
String getSourceFormat();
58+
59+
@Config("path_prefix")
60+
@ConfigDefault("null")
61+
Optional<String> getPathPrefix();
62+
63+
void setPathPrefix(Optional<String> pathPrefix);
64+
65+
@Config("default_timezone")
66+
@ConfigDefault("\"UTC\"")
67+
String getDefaultTimezone();
68+
69+
@Config("default_timestamp_format")
70+
@ConfigDefault("\"%Y-%m-%d %H:%M:%S.%6N %:z\"")
71+
String getDefaultTimestampFormat();
72+
73+
//TODO: make this optional
74+
@Config("file_ext")
75+
@ConfigDefault("null")
76+
Optional<String> getFileExt();
77+
78+
void setFileExt(Optional<String> fileExt);
79+
80+
@Config("encoding")
81+
@ConfigDefault("\"UTF-8\"")
82+
String getEncoding();
83+
84+
@Config("auto_create_dataset")
85+
@ConfigDefault("false")
86+
boolean getAutoCreateDataset();
87+
88+
@Config("auto_create_table")
89+
@ConfigDefault("true")
90+
boolean getAutoCreateTable();
91+
92+
@VisibleForTesting
93+
void setAutoCreateTable(boolean autoCreateTable);
94+
95+
@Config("max_bad_records")
96+
@ConfigDefault("0")
97+
int getMaxBadRecords();
98+
99+
@Config("ignore_unknown_values")
100+
@ConfigDefault("false")
101+
boolean getIgnoreUnknownValues();
102+
103+
@Config("allow_quoted_newlines")
104+
@ConfigDefault("false")
105+
boolean getAllowQuotedNewlines();
106+
107+
@Config("template_table")
108+
@ConfigDefault("null")
109+
Optional<String> getTemplateTable();
110+
111+
@Config("job_status_polling_interval")
112+
@ConfigDefault("10")
113+
long getJobStatusPollingInterval();
114+
115+
@Config("job_status_max_polling_time")
116+
@ConfigDefault("3600")
117+
long getJobStatusMaxPollingTime();
118+
119+
@Config("temp_table")
120+
@ConfigDefault("null")
121+
Optional<String> getTempTable();
122+
123+
void setTempTable(Optional<String> tempTable);
124+
125+
@Config("delete_from_local_when_job_end")
126+
@ConfigDefault("true")
127+
boolean getDeleteFromLocalWhenJobEnd();
128+
129+
@Config("is_skip_job_result_check")
130+
@ConfigDefault("false")
131+
boolean getIsSkipJobResultCheck();
132+
133+
@Config("abort_on_error")
134+
@ConfigDefault("null")
135+
Optional<Boolean> getAbortOnError();
136+
137+
void setAbortOnError(Optional<Boolean> abortOnError);
138+
139+
// TODO: this is not corresponding to before_load SQL syntax
140+
@Config("enable_standard_sql")
141+
@ConfigDefault("false")
142+
boolean getEnableStandardSQL();
143+
144+
@Config("retries")
145+
@ConfigDefault("5")
146+
int getRetries();
147+
148+
@Config("before_load")
149+
@ConfigDefault("null")
150+
Optional<String> getBeforeLoad();
151+
152+
@Config("time_partitioning")
153+
@ConfigDefault("null")
154+
Optional<BigqueryTimePartitioning> getTimePartitioning();
155+
156+
void setTimePartitioning(Optional<BigqueryTimePartitioning> bigqueryTimePartitioning);
157+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
***************
2+
*** 175,178 ****
3+
@Config("auto_create_gcs_bucket")
4+
@ConfigDefault("false")
5+
Optional<String> getAutoCreateGcsBucket();
6+
}
7+
--- 175,182 ----
8+
@Config("auto_create_gcs_bucket")
9+
@ConfigDefault("false")
10+
Optional<String> getAutoCreateGcsBucket();
11+
+
12+
+ @Config("clustering")
13+
+ @ConfigDefault("null")
14+
+ Optional<BigqueryClustering> getClustering();
15+
}

src/test/java/org/embulk/output/bigquery_java/config/TestBigqueryTaskBuilder.java

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77
import org.junit.Rule;
88
import org.junit.Test;
99

10+
import java.util.Arrays;
11+
import java.util.List;
12+
1013
import static org.junit.Assert.assertEquals;
1114

1215
public class TestBigqueryTaskBuilder {
@@ -25,7 +28,22 @@ private static ConfigSource loadYamlResource(TestingEmbulk embulk, String fileNa
2528

2629
@Test
2730
public void setAbortOnError_DefaultMaxBadRecord_True() {
28-
config = loadYamlResource(embulk, "base.yml");
31+
config = embulk.configLoader().fromYamlString(
32+
String.join("\n",
33+
"type: bigquery_java",
34+
"mode: replace",
35+
"auth_method: service_account",
36+
"json_keyfile: json_key.json",
37+
"dataset: dataset",
38+
"table: table",
39+
"source_format: NEWLINE_DELIMITED_JSON",
40+
"compression: GZIP",
41+
"auto_create_dataset: false",
42+
"auto_create_table: true",
43+
"path_prefix: /tmp/bq_compress/bq_",
44+
""
45+
)
46+
);
2947
PluginTask task = config.loadConfig(PluginTask.class);
3048
BigqueryTaskBuilder.setAbortOnError(task);
3149

@@ -36,7 +54,22 @@ public void setAbortOnError_DefaultMaxBadRecord_True() {
3654
// TODO jsonl without compression, csv with/out compression
3755
@Test
3856
public void setFileExt_JSONL_GZIP_JSONL_GZ() {
39-
config = loadYamlResource(embulk, "base.yml");
57+
config = embulk.configLoader().fromYamlString(
58+
String.join("\n",
59+
"type: bigquery_java",
60+
"mode: replace",
61+
"auth_method: service_account",
62+
"json_keyfile: json_key.json",
63+
"dataset: dataset",
64+
"table: table",
65+
"source_format: NEWLINE_DELIMITED_JSON",
66+
"compression: GZIP",
67+
"auto_create_dataset: false",
68+
"auto_create_table: true",
69+
"path_prefix: /tmp/bq_compress/bq_",
70+
""
71+
)
72+
);
4073
PluginTask task = config.loadConfig(PluginTask.class);
4174

4275
BigqueryTaskBuilder.setFileExt(task);
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package org.embulk.output.bigquery_java.config;
2+
3+
import org.embulk.config.ConfigSource;
4+
import org.embulk.output.bigquery_java.BigqueryJavaOutputPlugin;
5+
import org.embulk.spi.OutputPlugin;
6+
import org.embulk.test.TestingEmbulk;
7+
import org.junit.Rule;
8+
import org.junit.Test;
9+
10+
import static org.junit.Assert.assertEquals;
11+
12+
public class TestBigqueryTaskBuilder {
13+
14+
private ConfigSource config;
15+
private static final String BASIC_RESOURCE_PATH = "java/org/embulk/output/bigquery_java/";
16+
17+
private static ConfigSource loadYamlResource(TestingEmbulk embulk, String fileName) {
18+
return embulk.loadYamlResource(BASIC_RESOURCE_PATH + fileName);
19+
}
20+
21+
@Rule
22+
public TestingEmbulk embulk = TestingEmbulk.builder()
23+
.registerPlugin(OutputPlugin.class, "bigquery_java", BigqueryJavaOutputPlugin.class)
24+
.build();
25+
26+
@Test
27+
public void setAbortOnError_DefaultMaxBadRecord_True() {
28+
config = loadYamlResource(embulk, "base.yml");
29+
PluginTask task = config.loadConfig(PluginTask.class);
30+
BigqueryTaskBuilder.setAbortOnError(task);
31+
32+
assertEquals(0, task.getMaxBadRecords());
33+
assertEquals(true, task.getAbortOnError().get());
34+
}
35+
36+
// TODO jsonl without compression, csv with/out compression
37+
@Test
38+
public void setFileExt_JSONL_GZIP_JSONL_GZ() {
39+
config = loadYamlResource(embulk, "base.yml");
40+
PluginTask task = config.loadConfig(PluginTask.class);
41+
42+
BigqueryTaskBuilder.setFileExt(task);
43+
assertEquals("NEWLINE_DELIMITED_JSON", task.getSourceFormat());
44+
assertEquals("GZIP", task.getCompression());
45+
assertEquals(".jsonl.gz", task.getFileExt().get());
46+
}
47+
}

0 commit comments

Comments
 (0)