@@ -364,16 +364,16 @@ def create_dataset(dataset = nil, reference: nil)
364364 end
365365 body = {
366366 dataset_reference : {
367- project_id : @project ,
367+ project_id : @destination_project ,
368368 dataset_id : dataset ,
369369 } ,
370370 } . merge ( hint )
371371 if @location
372372 body [ :location ] = @location
373373 end
374374 opts = { }
375- Embulk . logger . debug { "embulk-output-bigquery: insert_dataset(#{ @project } , #{ dataset } , #{ @location_for_log } , #{ body } , #{ opts } )" }
376- with_network_retry { client . insert_dataset ( @project , body , **opts ) }
375+ Embulk . logger . debug { "embulk-output-bigquery: insert_dataset(#{ @destination_project } , #{ dataset } , #{ @location_for_log } , #{ body } , #{ opts } )" }
376+ with_network_retry { client . insert_dataset ( @destination_project , body , **opts ) }
377377 rescue Google ::Apis ::ServerError , Google ::Apis ::ClientError , Google ::Apis ::AuthorizationError => e
378378 if e . status_code == 409 && /Already Exists:/ =~ e . message
379379 # ignore 'Already Exists' error
@@ -382,7 +382,7 @@ def create_dataset(dataset = nil, reference: nil)
382382
383383 response = { status_code : e . status_code , message : e . message , error_class : e . class }
384384 Embulk . logger . error {
385- "embulk-output-bigquery: insert_dataset(#{ @project } , #{ body } , #{ opts } ), response:#{ response } "
385+ "embulk-output-bigquery: insert_dataset(#{ @destination_project } , #{ body } , #{ opts } ), response:#{ response } "
386386 }
387387 raise Error , "failed to create dataset #{ @destination_project } :#{ dataset } in #{ @location_for_log } , response:#{ response } "
388388 end
@@ -554,15 +554,15 @@ def patch_description(fields, column_options)
554554 fields = patch_description ( table . schema . fields , @task [ 'column_options' ] )
555555 table . schema . update! ( fields : fields )
556556 table_id = Helper . chomp_partition_decorator ( @task [ 'table' ] )
557- with_network_retry { client . patch_table ( @project , @dataset , table_id , table ) }
557+ with_network_retry { client . patch_table ( @destination_project , @dataset , table_id , table ) }
558558 end
559559 end
560560
561561 def merge ( source_table , target_table , merge_keys , merge_rule )
562562 columns = @schema . map { |column | column [ :name ] }
563563 query = <<~EOD
564- MERGE `#{ @dataset } `.`#{ target_table } ` T
565- USING `#{ @dataset } `.`#{ source_table } ` S
564+ MERGE `#{ @destination_project } `.`#{ @dataset } `.`#{ target_table } ` T
565+ USING `#{ @destination_project } `.`#{ @dataset } `.`#{ source_table } ` S
566566 ON #{ join_merge_keys ( merge_keys . empty? ? merge_keys ( target_table ) : merge_keys ) }
567567 WHEN MATCHED THEN
568568 UPDATE SET #{ join_merge_rule_or_columns ( merge_rule , columns ) }
@@ -579,9 +579,9 @@ def merge_keys(table)
579579 SELECT
580580 KCU.COLUMN_NAME
581581 FROM
582- `#{ @dataset } `.INFORMATION_SCHEMA.KEY_COLUMN_USAGE KCU
582+ `#{ @destination_project } `.`#{ @dataset } `.INFORMATION_SCHEMA.KEY_COLUMN_USAGE KCU
583583 JOIN
584- `#{ @dataset } `.INFORMATION_SCHEMA.TABLE_CONSTRAINTS TC
584+ `#{ @destination_project } `.`#{ @dataset } `.INFORMATION_SCHEMA.TABLE_CONSTRAINTS TC
585585 ON
586586 KCU.CONSTRAINT_CATALOG = TC.CONSTRAINT_CATALOG AND
587587 KCU.CONSTRAINT_SCHEMA = TC.CONSTRAINT_SCHEMA AND
0 commit comments