77import com .google .cloud .bigquery .Clustering ;
88import com .google .cloud .bigquery .CopyJobConfiguration ;
99import com .google .cloud .bigquery .Dataset ;
10+ import com .google .cloud .bigquery .DatasetId ;
1011import com .google .cloud .bigquery .DatasetInfo ;
1112import com .google .cloud .bigquery .Field ;
1213import com .google .cloud .bigquery .FieldValue ;
2728import com .google .cloud .bigquery .TableResult ;
2829import com .google .cloud .bigquery .TimePartitioning ;
2930import com .google .cloud .bigquery .WriteChannelConfiguration ;
31+ import java .io .ByteArrayInputStream ;
3032import java .io .IOException ;
3133import java .io .OutputStream ;
3234import java .nio .channels .Channels ;
6163import org .embulk .util .retryhelper .RetryExecutor ;
6264import org .embulk .util .retryhelper .RetryGiveupException ;
6365import org .embulk .util .retryhelper .Retryable ;
66+ import org .json .JSONObject ;
67+ import org .json .JSONTokener ;
6468import org .slf4j .Logger ;
6569import org .slf4j .LoggerFactory ;
6670
6771public class BigqueryClient {
6872 private final Logger logger = LoggerFactory .getLogger (BigqueryClient .class );
6973 private final BigQuery bigquery ;
70- private final String dataset ;
74+ private final String project ;
75+ public final String destinationProject ; // FIXME: should be private
76+ public final String destinationDataset ; // FIXME: should be private
7177 private final String location ;
7278 private final String locationForLog ;
7379 private final PluginTask task ;
@@ -77,7 +83,9 @@ public class BigqueryClient {
7783 public BigqueryClient (PluginTask task , Schema schema ) {
7884 this .task = task ;
7985 this .schema = schema ;
80- dataset = task .getDataset ();
86+ project = task .getProject ().orElse (getProjectIdFromJsonKeyfile ());
87+ destinationProject = task .getDestinationProject ().orElse (project );
88+ destinationDataset = task .getDataset ();
8189 if (task .getLocation ().isPresent ()) {
8290 location = task .getLocation ().get ();
8391 locationForLog = task .getLocation ().get ();
@@ -93,42 +101,73 @@ public BigqueryClient(PluginTask task, Schema schema) {
93101 }
94102 }
95103
104+ private String getProjectIdFromJsonKeyfile () {
105+ return new JSONObject (
106+ new JSONTokener (new ByteArrayInputStream (task .getJsonKeyfile ().getContent ())))
107+ .getString ("project_id" );
108+ }
109+
96110 private BigQuery getBigQueryService () throws IOException {
97111 return BigQueryOptions .newBuilder ()
98112 .setCredentials (new Auth (task ).getCredentials (BigqueryScopes .BIGQUERY ))
113+ .setProjectId (project )
99114 .build ()
100115 .getService ();
101116 }
102117
103- public Dataset createDataset (String datasetId ) {
104- DatasetInfo .Builder builder = DatasetInfo .newBuilder (datasetId );
118+ public Dataset createDataset () {
119+ return createDataset (destinationDataset );
120+ }
121+
122+ public Dataset createDataset (String dataset ) {
123+ return createDataset (destinationProject , dataset );
124+ }
125+
126+ private Dataset createDataset (String project , String dataset ) {
127+ DatasetInfo .Builder builder = DatasetInfo .newBuilder (DatasetId .of (project , dataset ));
105128 if (location != null ) {
106129 builder .setLocation (location );
107130 }
108131 return bigquery .create (builder .build ());
109132 }
110133
111- public Dataset getDataset (String datasetId ) {
112- return bigquery .getDataset (datasetId );
134+ public Dataset getDataset () {
135+ return getDataset (destinationDataset );
136+ }
137+
138+ public Dataset getDataset (String dataset ) {
139+ return getDataset (destinationProject , dataset );
140+ }
141+
142+ private Dataset getDataset (String project , String dataset ) {
143+ return bigquery .getDataset (DatasetId .of (project , dataset ));
113144 }
114145
115146 public Job getJob (JobId jobId ) {
116147 return bigquery .getJob (jobId );
117148 }
118149
119- public Table getTable (String name ) {
120- return getTable (TableId .of (dataset , name ));
150+ public Table getTable (String table ) {
151+ return getTable (table , destinationDataset );
152+ }
153+
154+ private Table getTable (String table , String dataset ) {
155+ return getTable (table , dataset , destinationProject );
121156 }
122157
123- public Table getTable (TableId tableId ) {
124- return bigquery .getTable (tableId );
158+ private Table getTable (String table , String dataset , String project ) {
159+ return bigquery .getTable (TableId . of ( project , dataset , table ) );
125160 }
126161
127162 public void createTableIfNotExist (String table ) {
128- createTableIfNotExist (table , dataset );
163+ createTableIfNotExist (table , destinationDataset );
129164 }
130165
131166 public void createTableIfNotExist (String table , String dataset ) {
167+ createTableIfNotExist (table , dataset , destinationProject );
168+ }
169+
170+ private void createTableIfNotExist (String table , String dataset , String project ) {
132171 StandardTableDefinition .Builder tableDefinitionBuilder = StandardTableDefinition .newBuilder ();
133172 tableDefinitionBuilder .setSchema (buildSchema (schema , columnOptions ));
134173 if (task .getTimePartitioning ().isPresent ()) {
@@ -142,14 +181,17 @@ public void createTableIfNotExist(String table, String dataset) {
142181 TableDefinition tableDefinition = tableDefinitionBuilder .build ();
143182
144183 try {
145- bigquery .create (TableInfo .newBuilder (TableId .of (dataset , table ), tableDefinition ).build ());
184+ bigquery .create (
185+ TableInfo .newBuilder (TableId .of (project , dataset , table ), tableDefinition ).build ());
146186 } catch (BigQueryException e ) {
147187 if (e .getCode () == 409 && e .getMessage ().contains ("Already Exists:" )) {
148188 return ;
149189 }
150- logger .error (String .format ("embulk-out_bigquery: insert_table(%s, %s)" , dataset , table ));
190+ logger .error (
191+ String .format ("embulk-out_bigquery: insert_table(%s:%s.%s)" , project , dataset , table ));
151192 throw new BigqueryException (
152- String .format ("failed to create table %s.%s, response: %s" , dataset , table , e ));
193+ String .format (
194+ "failed to create table %s:%s.%s, response: %s" , project , dataset , table , e ));
153195 }
154196 }
155197
@@ -205,10 +247,11 @@ public JobStatistics.LoadStatistics call() {
205247
206248 if (Files .exists (loadFile )) {
207249 logger .info (
208- "embulk-output-bigquery: Load job starting... job_id:[{}] {} => {}.{} in {}" ,
250+ "embulk-output-bigquery: Load job starting... job_id:[{}] {} => {}:{} .{} in {}" ,
209251 jobId ,
210252 loadFile ,
211- dataset ,
253+ destinationProject ,
254+ destinationDataset ,
212255 table ,
213256 locationForLog );
214257 } else {
@@ -219,7 +262,7 @@ public JobStatistics.LoadStatistics call() {
219262 return null ;
220263 }
221264
222- TableId tableId = TableId .of (dataset , table );
265+ TableId tableId = TableId .of (destinationProject , destinationDataset , table );
223266 WriteChannelConfiguration writeChannelConfiguration =
224267 WriteChannelConfiguration .newBuilder (tableId )
225268 .setFormatOptions (FormatOptions .json ())
@@ -281,12 +324,27 @@ public void onGiveup(Exception firstException, Exception lastException)
281324 }
282325 }
283326
327+ public JobStatistics .CopyStatistics copy (
328+ String sourceTable , String destinationTable , JobInfo .WriteDisposition writeDisposition )
329+ throws BigqueryException {
330+ return copy (sourceTable , destinationTable , destinationDataset , writeDisposition );
331+ }
332+
284333 public JobStatistics .CopyStatistics copy (
285334 String sourceTable ,
286335 String destinationTable ,
287336 String destinationDataset ,
288337 JobInfo .WriteDisposition writeDisposition )
289338 throws BigqueryException {
339+ return copy (
340+ TableId .of (destinationProject , this .destinationDataset , sourceTable ),
341+ TableId .of (destinationProject , destinationDataset , destinationTable ),
342+ writeDisposition );
343+ }
344+
345+ private JobStatistics .CopyStatistics copy (
346+ TableId sourceTable , TableId destinationTable , JobInfo .WriteDisposition writeDisposition )
347+ throws BigqueryException {
290348 int retries = task .getRetries ();
291349
292350 try {
@@ -301,11 +359,9 @@ public JobStatistics.CopyStatistics copy(
301359 public JobStatistics .CopyStatistics call () {
302360 UUID uuid = UUID .randomUUID ();
303361 String jobId = String .format ("embulk_load_job_%s" , uuid );
304- TableId destTableId = TableId .of (destinationDataset , destinationTable );
305- TableId srcTableId = TableId .of (dataset , sourceTable );
306362
307363 CopyJobConfiguration copyJobConfiguration =
308- CopyJobConfiguration .newBuilder (destTableId , srcTableId )
364+ CopyJobConfiguration .newBuilder (destinationTable , sourceTable )
309365 .setWriteDisposition (writeDisposition )
310366 .build ();
311367
@@ -361,12 +417,16 @@ public JobStatistics.QueryStatistics merge(
361417 String sourceTable , String targetTable , List <String > mergeKeys , List <String > mergeRule ) {
362418 StringBuilder sb = new StringBuilder ();
363419 sb .append ("MERGE " );
364- sb .append (quoteIdentifier (dataset ));
420+ sb .append (quoteIdentifier (destinationProject ));
421+ sb .append ("." );
422+ sb .append (quoteIdentifier (destinationDataset ));
365423 sb .append ("." );
366424 sb .append (quoteIdentifier (targetTable ));
367425 sb .append (" T" );
368426 sb .append (" USING " );
369- sb .append (quoteIdentifier (dataset ));
427+ sb .append (quoteIdentifier (destinationProject ));
428+ sb .append ("." );
429+ sb .append (quoteIdentifier (destinationDataset ));
370430 sb .append ("." );
371431 sb .append (quoteIdentifier (sourceTable ));
372432 sb .append (" S" );
@@ -391,10 +451,14 @@ private List<String> getMergeKeys(String table) {
391451 "SELECT"
392452 + " KCU.COLUMN_NAME "
393453 + "FROM "
394- + quoteIdentifier (dataset )
454+ + quoteIdentifier (destinationProject )
455+ + "."
456+ + quoteIdentifier (destinationDataset )
395457 + ".INFORMATION_SCHEMA.KEY_COLUMN_USAGE KCU "
396458 + "JOIN "
397- + quoteIdentifier (dataset )
459+ + quoteIdentifier (destinationProject )
460+ + "."
461+ + quoteIdentifier (destinationDataset )
398462 + ".INFORMATION_SCHEMA.TABLE_CONSTRAINTS TC "
399463 + "ON"
400464 + " KCU.CONSTRAINT_CATALOG = TC.CONSTRAINT_CATALOG AND"
@@ -609,28 +673,31 @@ public void onGiveup(Exception firstException, Exception lastException)
609673 }
610674
611675 public boolean deleteTable (String table ) {
612- return deleteTable (table , null );
676+ return deleteTable (table , destinationDataset );
613677 }
614678
615- public boolean deleteTable (String table , String dataset ) {
616- if (dataset == null ) {
617- dataset = this .dataset ;
618- }
679+ private boolean deleteTable (String table , String dataset ) {
680+ return deleteTable (table , dataset , destinationProject );
681+ }
682+
683+ private boolean deleteTable (String table , String dataset , String project ) {
619684 String chompedTable = BigqueryUtil .chompPartitionDecorator (table );
620- return deleteTableOrPartition (chompedTable , dataset );
685+ return deleteTableOrPartition (chompedTable , dataset , project );
621686 }
622687
623688 public boolean deleteTableOrPartition (String table ) {
624- return deleteTableOrPartition (table , null );
689+ return deleteTableOrPartition (table , destinationDataset );
690+ }
691+
692+ private boolean deleteTableOrPartition (String table , String dataset ) {
693+ return deleteTableOrPartition (table , dataset , destinationProject );
625694 }
626695
627696 // if `table` with a partition decorator is given, a partition is deleted.
628- public boolean deleteTableOrPartition (String table , String dataset ) {
629- if (dataset == null ) {
630- dataset = this .dataset ;
631- }
632- logger .info (String .format ("embulk-output-bigquery: Delete table... %s.%s" , dataset , table ));
633- return bigquery .delete (TableId .of (dataset , table ));
697+ private boolean deleteTableOrPartition (String table , String dataset , String project ) {
698+ logger .info (
699+ String .format ("embulk-output-bigquery: Delete table... %s:%s.%s" , project , dataset , table ));
700+ return bigquery .delete (TableId .of (project , dataset , table ));
634701 }
635702
636703 private JobStatistics waitForLoad (Job job ) throws BigqueryException {
@@ -650,8 +717,7 @@ private com.google.cloud.bigquery.Schema buildSchema(
650717 // TODO: support schema file
651718
652719 if (task .getTemplateTable ().isPresent ()) {
653- TableId tableId = TableId .of (dataset , task .getTemplateTable ().get ());
654- Table table = bigquery .getTable (tableId );
720+ Table table = getTable (destinationProject , destinationDataset , task .getTemplateTable ().get ());
655721 return table .getDefinition ().getSchema ();
656722 }
657723
0 commit comments