Skip to content

Commit 6e690df

Browse files
authored
Merge pull request #65 from trocco-io/fix-misc
fix misc
2 parents b1a8480 + 0fdd1e5 commit 6e690df

File tree

3 files changed

+21
-18
lines changed

3 files changed

+21
-18
lines changed

src/main/java/org/embulk/output/bigquery_java/BigqueryJavaOutputPlugin.java

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -153,26 +153,24 @@ public TransactionalPageOutput open(TaskSource taskSource, Schema schema, int ta
153153
}
154154

155155
protected void autoCreate(PluginTask task, BigqueryClient client){
156-
if (task.getAutoCreateDataset()){
157-
client.createDataset(task.getDataset());
158-
}else{
159-
Dataset dataset = client.getDataset(task.getDataset());
160-
if (dataset == null){
161-
throw new BigqueryException(String.format("dataset %s is not found", task.getDataset()));
162-
}
163-
}
164-
165-
if (task.getMode().equals("replace_backup") && !task.getOldDataset().get().equals(task.getDataset())){
156+
if (client.getDataset(task.getDataset()) == null){
166157
if (task.getAutoCreateDataset()){
167158
client.createDataset(task.getDataset());
168159
}else{
169-
Dataset dataset = client.getDataset(task.getOldDataset().get());
170-
if (dataset == null){
171-
throw new BigqueryException(String.format("dataset %s is not found", task.getDataset()));
172-
}
160+
throw new BigqueryException(String.format("dataset %s is not found", task.getDataset()));
173161
}
174162
}
175163

164+
if (task.getMode().equals("replace_backup") && task.getOldDataset().isPresent() && !task.getOldDataset().get().equals(task.getDataset())){
165+
if (client.getDataset(task.getOldDataset().get()) == null){
166+
if (task.getAutoCreateDataset()){
167+
client.createDataset(task.getDataset());
168+
}else{
169+
throw new BigqueryException(String.format("dataset %s is not found", task.getDataset()));
170+
}
171+
}
172+
}
173+
176174
switch (task.getMode()){
177175
case "delete_in_advance":
178176
client.deleteTableOrPartition(task.getTable());

src/main/java/org/embulk/output/bigquery_java/BigqueryJobRunner.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,16 @@ public BigqueryJobRunner(PluginTask task, Schema schema, Path path) {
2424
@Override
2525
public JobStatistics call() throws Exception {
2626
client = new BigqueryClient(this.task, this.schema);
27+
String tableName;
28+
// append_direct use table name
29+
if (task.getMode().equals("append_direct")){
30+
tableName = task.getTable();
31+
}else{
32+
tableName = task.getTempTable().get();
33+
}
2734

2835
return client.load(this.path,
29-
this.task.getTempTable().get(),
36+
tableName,
3037
JobInfo.WriteDisposition.WRITE_APPEND);
3138
}
3239
}

src/main/java/org/embulk/output/bigquery_java/config/BigqueryTaskBuilder.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,10 @@ protected static void setFileExt(PluginTask task) {
5050
@VisibleForTesting
5151
protected static void setTempTable(PluginTask task) {
5252
// TODO: support replace_backup
53-
String[] modeForTempTable = {"replace", "append"};
53+
String[] modeForTempTable = {"replace", "append", "delete_in_advance"};
5454
if (Arrays.asList(modeForTempTable).contains(task.getMode())) {
5555
String tempTable = task.getTempTable().orElse(String.format("LOAD_TEMP_%s_%s", uniqueName, task.getTable()));
5656
task.setTempTable(Optional.of(tempTable));
57-
} else {
58-
task.setTempTable(Optional.of(null));
5957
}
6058
}
6159

0 commit comments

Comments
 (0)