
Commit 3d5ffbf

Merge pull request #96 from trocco-io/added_merge_mode
Added merge mode
2 parents: e430911 + 8e2104e

6 files changed: +249 −47 lines changed

.github/workflows/main.yml

Lines changed: 1 addition & 0 deletions
@@ -43,6 +43,7 @@ jobs:
     needs: [ test ]
     if: ${{ github.event_name == 'workflow_dispatch' || contains(github.ref, 'refs/tags/') }}
     steps:
+      - uses: actions/checkout@v2
       - name: push gem
         uses: trocco-io/push-gem-to-gpr-action@v1
         with:
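
Note: the added checkout step is presumably what this workflow fix is about — push-gem-to-gpr-action needs the repository contents (gemspec and sources) in the job workspace, and without actions/checkout the publish job would start from an empty directory.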

src/main/java/org/embulk/output/bigquery_java/BigqueryClient.java

Lines changed: 211 additions & 24 deletions
@@ -1,18 +1,33 @@
 package org.embulk.output.bigquery_java;
 
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-import java.nio.channels.Channels;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.UUID;
-
-import com.google.cloud.bigquery.*;
+import com.google.auth.oauth2.ServiceAccountCredentials;
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryException;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.cloud.bigquery.Clustering;
+import com.google.cloud.bigquery.CopyJobConfiguration;
+import com.google.cloud.bigquery.Dataset;
+import com.google.cloud.bigquery.DatasetInfo;
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.FieldValue;
+import com.google.cloud.bigquery.FormatOptions;
+import com.google.cloud.bigquery.Job;
+import com.google.cloud.bigquery.JobId;
+import com.google.cloud.bigquery.JobInfo;
+import com.google.cloud.bigquery.JobStatistics;
+import com.google.cloud.bigquery.LegacySQLTypeName;
+import com.google.cloud.bigquery.QueryJobConfiguration;
+import com.google.cloud.bigquery.StandardSQLTypeName;
+import com.google.cloud.bigquery.StandardTableDefinition;
+import com.google.cloud.bigquery.Table;
+import com.google.cloud.bigquery.TableDataWriteChannel;
+import com.google.cloud.bigquery.TableDefinition;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TableInfo;
+import com.google.cloud.bigquery.TableResult;
+import com.google.cloud.bigquery.TimePartitioning;
+import com.google.cloud.bigquery.WriteChannelConfiguration;
+import com.google.common.annotations.VisibleForTesting;
 import org.embulk.output.bigquery_java.config.BigqueryColumnOption;
 import org.embulk.output.bigquery_java.config.BigqueryTimePartitioning;
 import org.embulk.output.bigquery_java.config.PluginTask;
@@ -30,15 +45,27 @@
 import org.embulk.spi.type.TimestampType;
 import org.embulk.spi.type.Type;
 import org.embulk.spi.util.RetryExecutor;
-
-import static org.embulk.spi.util.RetryExecutor.retryExecutor;
-
-import com.google.auth.oauth2.ServiceAccountCredentials;
-import com.google.common.annotations.VisibleForTesting;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.embulk.spi.util.RetryExecutor.retryExecutor;
 
 public class BigqueryClient {
     private final Logger logger = LoggerFactory.getLogger(BigqueryClient.class);
@@ -158,7 +185,7 @@ public TimePartitioning buildTimePartitioning(BigqueryTimePartitioning bigqueryT
         return timePartitioningBuilder.build();
     }
 
-    public JobStatistics.LoadStatistics load(Path loadFile, String table, JobInfo.WriteDisposition writeDestination) throws BigqueryException {
+    public JobStatistics.LoadStatistics load(Path loadFile, String table, JobInfo.WriteDisposition writeDisposition) throws BigqueryException {
         String dataset = this.dataset;
         int retries = this.task.getRetries();
         PluginTask task = this.task;
@@ -191,7 +218,7 @@ public JobStatistics.LoadStatistics call() {
                     WriteChannelConfiguration writeChannelConfiguration =
                             WriteChannelConfiguration.newBuilder(tableId)
                                     .setFormatOptions(FormatOptions.json())
-                                    .setWriteDisposition(writeDestination)
+                                    .setWriteDisposition(writeDisposition)
                                     .setMaxBadRecords(task.getMaxBadRecords())
                                     .setIgnoreUnknownValues(task.getIgnoreUnknownValues())
                                     .setSchema(buildSchema(schema, columnOptions))
@@ -247,7 +274,7 @@ public void onGiveup(Exception firstException, Exception lastException) throws R
     public JobStatistics.CopyStatistics copy(String sourceTable,
                                              String destinationTable,
                                              String destinationDataset,
-                                             JobInfo.WriteDisposition writeDestination) throws BigqueryException {
+                                             JobInfo.WriteDisposition writeDisposition) throws BigqueryException {
         String dataset = this.dataset;
         int retries = this.task.getRetries();
 
@@ -265,7 +292,7 @@ public JobStatistics.CopyStatistics call() {
                     TableId srcTableId = TableId.of(dataset, sourceTable);
 
                     CopyJobConfiguration copyJobConfiguration = CopyJobConfiguration.newBuilder(destTableId, srcTableId)
-                            .setWriteDisposition(writeDestination)
+                            .setWriteDisposition(writeDisposition)
                             .build();
 
                     Job job = bigquery.create(JobInfo.newBuilder(copyJobConfiguration).setJobId(JobId.of(jobId)).build());
@@ -308,6 +335,167 @@ public void onGiveup(Exception firstException, Exception lastException) throws R
         }
     }
 
+    public JobStatistics.QueryStatistics merge(String sourceTable, String targetTable, List<String> mergeKeys, List<String> mergeRule) {
+        StringBuilder sb = new StringBuilder();
+        sb.append("MERGE ");
+        sb.append(quoteIdentifier(dataset));
+        sb.append(".");
+        sb.append(quoteIdentifier(targetTable));
+        sb.append(" T");
+        sb.append(" USING ");
+        sb.append(quoteIdentifier(dataset));
+        sb.append(".");
+        sb.append(quoteIdentifier(sourceTable));
+        sb.append(" S");
+        sb.append(" ON ");
+        appendMergeKeys(sb, mergeKeys.isEmpty() ? getMergeKeys(targetTable) : mergeKeys);
+        sb.append(" WHEN MATCHED THEN");
+        sb.append(" UPDATE SET ");
+        appendMergeRule(sb, mergeRule, schema);
+        sb.append(" WHEN NOT MATCHED THEN");
+        sb.append(" INSERT (");
+        appendColumns(sb, schema);
+        sb.append(") VALUES (");
+        appendColumns(sb, schema);
+        sb.append(")");
+        String query = sb.toString();
+        logger.info(String.format("embulk-output-bigquery: Execute query... %s", query));
+        return executeQuery(query);
+    }
+
+    private List<String> getMergeKeys(String table) {
+        String query =
+                "SELECT" +
+                " KCU.COLUMN_NAME " +
+                "FROM " +
+                quoteIdentifier(dataset) + ".INFORMATION_SCHEMA.KEY_COLUMN_USAGE KCU " +
+                "JOIN " +
+                quoteIdentifier(dataset) + ".INFORMATION_SCHEMA.TABLE_CONSTRAINTS TC " +
+                "ON" +
+                " KCU.CONSTRAINT_CATALOG = TC.CONSTRAINT_CATALOG AND" +
+                " KCU.CONSTRAINT_SCHEMA = TC.CONSTRAINT_SCHEMA AND" +
+                " KCU.CONSTRAINT_NAME = TC.CONSTRAINT_NAME AND" +
+                " KCU.TABLE_CATALOG = TC.TABLE_CATALOG AND" +
+                " KCU.TABLE_SCHEMA = TC.TABLE_SCHEMA AND" +
+                " KCU.TABLE_NAME = TC.TABLE_NAME " +
+                "WHERE" +
+                " TC.TABLE_NAME = '" + table + "' AND" +
+                " TC.CONSTRAINT_TYPE = 'PRIMARY KEY' " +
+                "ORDER BY" +
+                " KCU.ORDINAL_POSITION";
+        return stream(runQuery(query).iterateAll())
+                .flatMap(BigqueryClient::stream)
+                .map(FieldValue::getStringValue)
+                .collect(Collectors.toList());
+    }
+
+    public TableResult runQuery(String query) {
+        int retries = task.getRetries();
+        try {
+            return retryExecutor()
+                    .withRetryLimit(retries)
+                    .withInitialRetryWait(2 * 1000)
+                    .withMaxRetryWait(10 * 1000)
+                    .runInterruptible(new RetryExecutor.Retryable<TableResult>() {
+                        @Override
+                        public TableResult call() throws Exception {
+                            QueryJobConfiguration configuration =
+                                    QueryJobConfiguration.newBuilder(query)
+                                            .setUseLegacySql(false)
+                                            .build();
+                            String job = String.format("embulk_query_job_%s", UUID.randomUUID());
+                            JobId.Builder builder = JobId.newBuilder().setJob(job);
+                            if (location != null) {
+                                builder.setLocation(location);
+                            }
+                            return bigquery.query(configuration, builder.build());
+                        }
+                        @Override
+                        public boolean isRetryableException(Exception exception) {
+                            return exception instanceof BigqueryBackendException
+                                    || exception instanceof BigqueryRateLimitExceededException
+                                    || exception instanceof BigqueryInternalException;
+                        }
+                        @Override
+                        public void onRetry(Exception exception, int retryCount, int retryLimit, int retryWait) {
+                            String message = String.format("embulk-output-bigquery: Query job failed. Retrying %d/%d after %d seconds. Message: %s",
+                                    retryCount, retryLimit, retryWait / 1000, exception.getMessage());
+                            if (retryCount % retries == 0) {
+                                logger.warn(message, exception);
+                            } else {
+                                logger.warn(message);
+                            }
+                        }
+                        @Override
+                        public void onGiveup(Exception firstException, Exception lastException) {
+                            logger.error("embulk-output-bigquery: Give up retrying for Query job");
+                        }
+                    });
+        } catch (RetryExecutor.RetryGiveupException e) {
+            if (e.getCause() instanceof BigqueryException) {
+                throw (BigqueryException) e.getCause();
+            }
+            throw new RuntimeException(e);
+        } catch (InterruptedException e) {
+            throw new BigqueryException("interrupted");
+        }
+    }
+
+    private static <T> Stream<T> stream(Iterable<T> iterable) {
+        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterable.iterator(), Spliterator.ORDERED), false);
+    }
+
+    private static StringBuilder appendMergeKeys(StringBuilder sb, List<String> mergeKeys) {
+        if (mergeKeys.isEmpty()) {
+            throw new RuntimeException("merge key or primary key is required");
+        }
+        for (int i = 0; i < mergeKeys.size(); i++) {
+            if (i != 0) { sb.append(" AND "); }
+            String mergeKey = quoteIdentifier(mergeKeys.get(i));
+            sb.append("T.");
+            sb.append(mergeKey);
+            sb.append(" = S.");
+            sb.append(mergeKey);
+        }
+        return sb;
+    }
+
+    private static StringBuilder appendMergeRule(StringBuilder sb, List<String> mergeRule, Schema schema) {
+        return mergeRule.isEmpty() ? appendMergeRule(sb, schema) : appendMergeRule(sb, mergeRule);
+    }
+
+    private static StringBuilder appendMergeRule(StringBuilder sb, List<String> mergeRule) {
+        for (int i = 0; i < mergeRule.size(); i++) {
+            if (i != 0) { sb.append(", "); }
+            sb.append(mergeRule.get(i));
+        }
+        return sb;
+    }
+
+    private static StringBuilder appendMergeRule(StringBuilder sb, Schema schema) {
+        for (int i = 0; i < schema.getColumnCount(); i++) {
+            if (i != 0) { sb.append(", "); }
+            String column = quoteIdentifier(schema.getColumnName(i));
+            sb.append("T.");
+            sb.append(column);
+            sb.append(" = S.");
+            sb.append(column);
+        }
+        return sb;
+    }
+
+    private static StringBuilder appendColumns(StringBuilder sb, Schema schema) {
+        for (int i = 0; i < schema.getColumnCount(); i++) {
+            if (i != 0) { sb.append(", "); }
+            sb.append(quoteIdentifier(schema.getColumnName(i)));
+        }
+        return sb;
+    }
+
+    private static String quoteIdentifier(String identifier) {
+        return "`" + identifier + "`";
+    }
+
     public JobStatistics.QueryStatistics executeQuery(String query) {
         int retries = this.task.getRetries();
         String location = this.location;
@@ -372,7 +560,6 @@ public void onGiveup(Exception firstException, Exception lastException) throws R
         }
     }
 
-
     public boolean deleteTable(String table) {
         return deleteTable(table, null);
    }
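
To make the generated statement concrete: for a hypothetical dataset my_dataset, target table accounts (primary key id, columns id and name) and temp table accounts_temp — all names invented for this sketch — merge() with empty merge keys and merge rule would log and execute the following (emitted as a single line; wrapped here for readability):

    MERGE `my_dataset`.`accounts` T USING `my_dataset`.`accounts_temp` S
    ON T.`id` = S.`id`
    WHEN MATCHED THEN UPDATE SET T.`id` = S.`id`, T.`name` = S.`name`
    WHEN NOT MATCHED THEN INSERT (`id`, `name`) VALUES (`id`, `name`)

When no merge keys are given, getMergeKeys() derives the ON clause from the target table's primary key via INFORMATION_SCHEMA, and appendMergeKeys() throws if neither source yields a key; when no merge rule is given, the UPDATE branch rewrites every schema column. In the INSERT branch, BigQuery resolves the unqualified VALUES columns against the source (S) side, so unmatched rows are copied as-is.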

src/main/java/org/embulk/output/bigquery_java/BigqueryJavaOutputPlugin.java

Lines changed: 21 additions & 15 deletions
@@ -1,18 +1,5 @@
 package org.embulk.output.bigquery_java;
 
-import java.io.File;
-import java.math.BigInteger;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.stream.Collectors;
-
-import com.google.cloud.bigquery.Dataset;
 import com.google.cloud.bigquery.JobInfo;
 import com.google.cloud.bigquery.JobStatistics;
 import com.google.cloud.bigquery.Table;
@@ -28,10 +15,22 @@
 import org.embulk.spi.OutputPlugin;
 import org.embulk.spi.Schema;
 import org.embulk.spi.TransactionalPageOutput;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.math.BigInteger;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
 public class BigqueryJavaOutputPlugin
         implements OutputPlugin {
     private final Logger logger = LoggerFactory.getLogger(BigqueryJavaOutputPlugin.class);
@@ -63,6 +62,7 @@ public ConfigDiff transaction(ConfigSource config,
         client.createTableIfNotExist(task.getTable(), task.getDataset());
 
         switch (task.getMode()){
+            case "merge":
             case "append":
             case "replace":
             case "delete_in_advance":
@@ -111,7 +111,9 @@ public ConfigDiff transaction(ConfigSource config,
         }
 
         if (task.getTempTable().isPresent()) {
-            if (task.getMode().equals("append")) {
+            if (task.getMode().equals("merge")) {
+                client.merge(task.getTempTable().get(), task.getTable(), task.getMergeKeys().orElse(Collections.emptyList()), task.getMergeRule().orElse(Collections.emptyList()));
+            } else if (task.getMode().equals("append")) {
                 client.copy(task.getTempTable().get(), task.getTable(), task.getDataset(), JobInfo.WriteDisposition.WRITE_APPEND);
             } else {
                 client.copy(task.getTempTable().get(), task.getTable(), task.getDataset(), JobInfo.WriteDisposition.WRITE_TRUNCATE);
@@ -185,6 +187,10 @@ protected void autoCreate(PluginTask task, BigqueryClient client){
                 client.createTableIfNotExist(task.getTempTable().get(), task.getDataset());
                 // TODO: create table to support partition
                 break;
+            case "merge":
+                client.createTableIfNotExist(task.getTempTable().get(), task.getDataset());
+                client.createTableIfNotExist(task.getTable()); // needed when task['table'] is a partition
+                break;
             case "replace_backup":
                 client.createTableIfNotExist(task.getTemplateTable().get());
                 client.createTableIfNotExist(task.getTable());
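
For context, a minimal sketch of an Embulk output config exercising the new mode. The merge_keys/merge_rule option names are inferred from the task.getMergeKeys()/task.getMergeRule() accessors (the PluginTask change is one of the six files in this commit but is not shown above), and the type name and all values are illustrative, not confirmed by this diff:

    out:
      type: bigquery_java              # assumed plugin name
      mode: merge                      # new mode added by this PR
      dataset: my_dataset              # hypothetical
      table: accounts                  # hypothetical
      temp_table: accounts_temp        # merge runs only when a temp table is present
      auto_create_table: true          # merge requires this (see the validator change below)
      merge_keys: [id]                 # optional; defaults to the target's primary key
      merge_rule: ["T.name = S.name"]  # optional; defaults to updating every column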

src/main/java/org/embulk/output/bigquery_java/config/BigqueryConfigValidator.java

Lines changed: 4 additions & 4 deletions
@@ -13,17 +13,17 @@ public static void validate(PluginTask task) {
 
     public static void validateMode(PluginTask task) throws ConfigException {
         // TODO: append_direct delete_in_advance replace_backup
-        String[] modes = {"replace", "append", "delete_in_advance", "append_direct"};
+        String[] modes = {"replace", "append", "merge", "delete_in_advance", "append_direct"};
         if (!Arrays.asList(modes).contains(task.getMode().toLowerCase())) {
-            throw new ConfigException("replace, append, delete_in_advance and append_direct are supported. Stay tuned!");
+            throw new ConfigException("replace, append, merge, delete_in_advance and append_direct are supported. Stay tuned!");
         }
     }
 
     public static void validateModeAndAutoCreteTable(PluginTask task) throws ConfigException {
         // TODO: modes are append replace delete_in_advance replace_backup and !task['auto_create_table']
-        String[] modes = {"replace", "append", "delete_in_advance"};
+        String[] modes = {"replace", "append", "merge", "delete_in_advance"};
         if (Arrays.asList(modes).contains(task.getMode().toLowerCase()) && !task.getAutoCreateTable()) {
-            throw new ConfigException("replace, append and delete_in_advance are supported. Stay tuned!");
+            throw new ConfigException("replace, append, merge and delete_in_advance are supported. Stay tuned!");
         }
     }
 