diff --git a/src/clj_headlights/bigquery_io.clj b/src/clj_headlights/bigquery_io.clj
index 8687634..1e00c74 100644
--- a/src/clj_headlights/bigquery_io.clj
+++ b/src/clj_headlights/bigquery_io.clj
@@ -6,9 +6,9 @@
             [clj-headlights.utils :as utils]
             [clojure.tools.logging :as log]
             [clj-headlights.pcollections :as pcollections])
-  (:import (org.apache.beam.sdk.io.gcp.bigquery BigQueryIO$Write BigQueryIO$Write$CreateDisposition BigQueryIO$Write$WriteDisposition BigQueryIO$Read BigQueryIO)
+  (:import (org.apache.beam.sdk.io.gcp.bigquery BigQueryIO$Write BigQueryIO$Write$CreateDisposition BigQueryIO$Write$WriteDisposition BigQueryIO$Read BigQueryIO TableRowJsonCoder)
            (com.google.api.services.bigquery.model TableSchema TableFieldSchema TableRow)
            (org.apache.beam.sdk.transforms SerializableFunction PTransform)))
 
 (def WriteDispostions
   {:write-append BigQueryIO$Write$WriteDisposition/WRITE_APPEND
@@ -77,10 +77,14 @@
   (let [bq-schema (-> (TableSchema.)
                       (.setFields (mapv schema-field->bq-field schema)))
         {:keys [write-disposition create-disposition] :or {write-disposition :write-append create-disposition :create-if-needed}} write-options]
-    (-> pcoll
-        (df/df-map (str "make-table-row-" name) [#'table-row-maker schema])
-        (.apply (str "write-to-bq-" name) (-> (BigQueryIO/writeTableRows)
-                                              (.to output)
-                                              (.withSchema bq-schema)
-                                              (.withWriteDisposition (write-disposition WriteDispostions))
-                                              (.withCreateDisposition (create-disposition CreateDispositions)))))))
+    (df/composite
+     name
+     [pcoll]
+     (fn [pcoll]
+       (-> pcoll
+           (df/df-map-enc "make-table-row" (TableRowJsonCoder/of) [#'table-row-maker schema])
+           (.apply "write-to-bquery" (-> (BigQueryIO/writeTableRows)
+                                         (.to output)
+                                         (.withSchema bq-schema)
+                                         (.withWriteDisposition (write-disposition WriteDispostions))
+                                         (.withCreateDisposition (create-disposition CreateDispositions)))))))))
diff --git a/src/clj_headlights/pipeline.clj b/src/clj_headlights/pipeline.clj
index e35b24a..88b32de 100644
--- a/src/clj_headlights/pipeline.clj
+++ b/src/clj_headlights/pipeline.clj
@@ -64,4 +64,16 @@
   (let [result (pardo/invoke-with-optional-state clj-call value state)]
     (.output context result)))
 
+(defn df-map-enc
+  "Runs `clj-call` over `pcoll` as the step `name` through `pardo/create-and-apply`,
+  wrapping the call with `apply-to-value-and-output`. `encoder` is handed over as the
+  `:main` entry of the `:outputs` option -- presumably the coder of the resulting
+  collection, e.g. `(TableRowJsonCoder/of)`; confirm against `pardo/create-and-apply`."
+  [pcoll name encoder clj-call]
+  (pardo/create-and-apply
+   pcoll
+   name
+   (clj-fn-call/append-argument-to-clj-call #'apply-to-value-and-output (clj-fn-call/to-serializable-clj-call clj-call))
+   {:outputs {:main encoder}}))
+
 (s/defn df-map :- PCollection