From d0ca1c2468d23b30d36871606aa8b55a3c3138e4 Mon Sep 17 00:00:00 2001
From: "pawel.urbanowicz"
Date: Wed, 3 Feb 2021 18:04:46 +0100
Subject: [PATCH 1/2] wip

---
 wip.py | 115 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 115 insertions(+)
 create mode 100644 wip.py

diff --git a/wip.py b/wip.py
new file mode 100644
index 0000000..08437a2
--- /dev/null
+++ b/wip.py
@@ -0,0 +1,115 @@
+from __future__ import absolute_import
+
+import apache_beam as beam
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.io.external.generate_sequence import GenerateSequence
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.io.external.snowflake import ReadFromSnowflake, WriteToSnowflake
+import logging
+
+SERVER_NAME = 'polideapartner.europe-west4.gcp.snowflakecomputing.com'
+USERNAME = 'PURBANOWICZ'
+PASSWORD = '12QWASZX34erdfcv!'
+SCHEMA = 'PUBLIC'
+DATABASE = 'TEST_PAWEL'
+STAGING_BUCKET_NAME = 'gcs://iot-beam-snowflake/'
+STORAGE_INTEGRATION = 'iot_beam_snowflake_integration'
+TABLE = 'WORDSxxx'
+SCHEMA_STRING = """
+{"schema":[
+    {"dataType":{"type":"text","length":null},"name":"text_column","nullable":true},
+    {"dataType":{"type":"integer","precision":38,"scale":0},"name":"number_column","nullable":false},
+    {"dataType":{"type":"boolean"},"name":"boolean_column","nullable":false}
+]}
+"""
+
+
+class Row(object):
+    def __init__(self, text_column, number_column, boolean_column):
+        self.text_column = text_column
+        self.number_column = number_column
+        self.boolean_column = boolean_column
+
+    def __eq__(self, other):
+        return self.text_column == other.text_column and \
+               self.number_column == other.number_column and \
+               self.boolean_column == other.boolean_column
+
+    def __str__(self):
+        return self.text_column + " " + str(self.number_column) + " " + str(self.boolean_column)
+
+
+def run_write(pipeline_options):
+    def user_data_mapper(test_row):
+        return [str(test_row.text_column).encode('utf-8'),
+                str(test_row.number_column).encode('utf-8'),
+                str(test_row.boolean_column).encode('utf-8')
+                ]
+
+    p = beam.Pipeline(options=pipeline_options)
+    (p
+        | GenerateSequence(start=1, stop=3)
+        | beam.Map(lambda num: Row("test" + str(num), num, True))
+        | "Writing into Snowflake" >> WriteToSnowflake(
+            server_name=SERVER_NAME,
+            username=USERNAME,
+            password=PASSWORD,
+            schema=SCHEMA,
+            database=DATABASE,
+            staging_bucket_name=STAGING_BUCKET_NAME,
+            storage_integration=STORAGE_INTEGRATION,
+            create_disposition="CREATE_IF_NEEDED",
+            write_disposition="TRUNCATE",
+            table_schema=SCHEMA_STRING,
+            user_data_mapper=user_data_mapper,
+            table=TABLE,
+            query=None)
+    )
+    result = p.run()
+    result.wait_until_finish()
+
+
+def run_read(pipeline_options):
+    def csv_mapper(strings_array):
+        return Row(
+            strings_array[0],
+            int(strings_array[1]),
+            bool(strings_array[2])
+        )
+
+    def print_row(row):
+        logging.error("HELLO")
+        logging.error(row)
+        print(row)
+
+    p = beam.Pipeline(options=pipeline_options)
+    (p
+        | "Reading from Snowflake" >> ReadFromSnowflake(
+            server_name=SERVER_NAME,
+            username=USERNAME,
+            password=PASSWORD,
+            schema=SCHEMA,
+            database=DATABASE,
+            staging_bucket_name=STAGING_BUCKET_NAME,
+            storage_integration_name=STORAGE_INTEGRATION,
+            csv_mapper=csv_mapper,
+            table=TABLE)
+        | "Print" >> beam.Map(print_row)
+    )
+    result = p.run()
+    result.wait_until_finish()
+
+
+def run(argv=None, save_main_session=True):
+
+    # We use the save_main_session option because one or more DoFn's in this
+    # workflow rely on global context (e.g., a module imported at module level).
+    pipeline_options = PipelineOptions(argv)
+    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
+
+    run_write(pipeline_options)
+    run_read(pipeline_options)
+
+if __name__ == '__main__':
+    logging.getLogger().setLevel(logging.INFO)
+    run()
\ No newline at end of file

From 50abdfd9de41bcd7471bbb09f1a91270cfc35622 Mon Sep 17 00:00:00 2001
From: "pawel.urbanowicz"
Date: Thu, 4 Feb 2021 13:43:06 +0100
Subject: [PATCH 2/2] dataflow example

---
 README.md        |  45 ++++++-------------
 wip.py           | 115 -----------------------------------------------
 xlang_example.py |  24 ++++++------
 3 files changed, 26 insertions(+), 158 deletions(-)
 delete mode 100644 wip.py

diff --git a/README.md b/README.md
index 6f70a47..44151ba 100644
--- a/README.md
+++ b/README.md
@@ -249,34 +249,13 @@ list for currently supported runtime options.
 
 ### Cross-language example
 An example is showing simple usage of [cross-language](https://beam.apache.org/roadmap/portability/) by writing objects into Snowflake and reading them from Snowflake.
-Currently, cross-language is supporting only by [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
-For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
-and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+For more information about cross-language, please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/).
 
 #### Extra setup:
-Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup. The specific setup for current version of snowflake is following:
-1. Setup a Flink cluster by following the Flink [Setup Quickstart](https://ci.apache.org/projects/flink/flink-docs-release-1.10/getting-started/tutorials/local_setup.html)
-or [Setting up Apache Flink on Mac OS X](https://streambench.wordpress.com/2017/10/26/setting-up-apache-flink-on-mac-os-x/)
-2. Download Job server image:
+1. Follow the steps from the previous section `Setup required by all examples`
+2. Download and install the Apache Beam Python SDK:
 ```
-docker pull gcr.io/snowflake-poli/apachebeam_flink1.10_job_server:snowflake
-```
-3. Download Apache Beam Java SDK image:
-```
-docker pull gcr.io/snowflake-poli/apachebeam_java_sdk:2.20.0.dev
-```
-4. Change tag of downloaded Java SDK image to make the whole setup work:
-```
-docker tag gcr.io/snowflake-poli/apachebeam_java_sdk:2.20.0.dev apache/beam_java_sdk:2.20.0.dev
-```
-5. Start Job server:
-```
-docker run -p 8099:8099 -p 8098:8098 -p 8097:8097 gcr.io/snowflake-poli/apachebeam_flink1.10_job_server:snowflake
-```
-6. Download [Apache Beam Python SDK](https://storage.cloud.google.com/snowflake_artifacts/apachebeam_snowflake.whl?_ga=2.54472813.-471657054.1583857613).
-7. Install python Apache Beam Python SDK using Python 2.7
-```
-python -m pip install apachebeam_snowflake.whl
+   pip install apache-beam
 ```
 
 #### Executing:
@@ -288,16 +267,18 @@ python -m pip install apachebeam_snowflake.whl
    SCHEMA = 
    DATABASE = 
    STAGING_BUCKET_NAME = 
-   STORAGE_INTEGRATION = 
+   STORAGE_INTEGRATION_NAME = 
    TABLE = 
+
+   OPTIONS =[
+       "--runner=DataflowRunner",
+       "--project=<GCP PROJECT ID>",
+       "--staging_location=gs://<GCS BUCKET NAME>/tmp/",
+       "--region=<GCP REGION>",
+       "--temp_location=gs://<GCS BUCKET NAME>/tmp/"
+   ]
 ```
 2. Run xlang_example.py:
 ```
 python xlang_example.py
 ```
-2. [Go to Flink console](http://localhost:8081/)
-   ![Xlang Flink result](./images/xlang_flink_result.png)
-3. Go to GCS bucket to check saved files:
-   ![Xlang GCS result](./images/xlang_gcs_result.png)
-4. Check console
-   ![Xlang console result](./images/xlang_console_result.png)
diff --git a/wip.py b/wip.py
deleted file mode 100644
index 08437a2..0000000
--- a/wip.py
+++ /dev/null
@@ -1,115 +0,0 @@
-from __future__ import absolute_import
-
-import apache_beam as beam
-from apache_beam.options.pipeline_options import PipelineOptions
-from apache_beam.io.external.generate_sequence import GenerateSequence
-from apache_beam.options.pipeline_options import SetupOptions
-from apache_beam.io.external.snowflake import ReadFromSnowflake, WriteToSnowflake
-import logging
-
-SERVER_NAME = 'polideapartner.europe-west4.gcp.snowflakecomputing.com'
-USERNAME = 'PURBANOWICZ'
-PASSWORD = '12QWASZX34erdfcv!'
-SCHEMA = 'PUBLIC'
-DATABASE = 'TEST_PAWEL'
-STAGING_BUCKET_NAME = 'gcs://iot-beam-snowflake/'
-STORAGE_INTEGRATION = 'iot_beam_snowflake_integration'
-TABLE = 'WORDSxxx'
-SCHEMA_STRING = """
-{"schema":[
-    {"dataType":{"type":"text","length":null},"name":"text_column","nullable":true},
-    {"dataType":{"type":"integer","precision":38,"scale":0},"name":"number_column","nullable":false},
-    {"dataType":{"type":"boolean"},"name":"boolean_column","nullable":false}
-]}
-"""
-
-
-class Row(object):
-    def __init__(self, text_column, number_column, boolean_column):
-        self.text_column = text_column
-        self.number_column = number_column
-        self.boolean_column = boolean_column
-
-    def __eq__(self, other):
-        return self.text_column == other.text_column and \
-               self.number_column == other.number_column and \
-               self.boolean_column == other.boolean_column
-
-    def __str__(self):
-        return self.text_column + " " + str(self.number_column) + " " + str(self.boolean_column)
-
-
-def run_write(pipeline_options):
-    def user_data_mapper(test_row):
-        return [str(test_row.text_column).encode('utf-8'),
-                str(test_row.number_column).encode('utf-8'),
-                str(test_row.boolean_column).encode('utf-8')
-                ]
-
-    p = beam.Pipeline(options=pipeline_options)
-    (p
-        | GenerateSequence(start=1, stop=3)
-        | beam.Map(lambda num: Row("test" + str(num), num, True))
-        | "Writing into Snowflake" >> WriteToSnowflake(
-            server_name=SERVER_NAME,
-            username=USERNAME,
-            password=PASSWORD,
-            schema=SCHEMA,
-            database=DATABASE,
-            staging_bucket_name=STAGING_BUCKET_NAME,
-            storage_integration=STORAGE_INTEGRATION,
-            create_disposition="CREATE_IF_NEEDED",
-            write_disposition="TRUNCATE",
-            table_schema=SCHEMA_STRING,
-            user_data_mapper=user_data_mapper,
-            table=TABLE,
-            query=None)
-    )
-    result = p.run()
-    result.wait_until_finish()
-
-
-def run_read(pipeline_options):
-    def csv_mapper(strings_array):
-        return Row(
-            strings_array[0],
-            int(strings_array[1]),
-            bool(strings_array[2])
-        )
-
-    def print_row(row):
-        logging.error("HELLO")
-        logging.error(row)
-        print(row)
-
-    p = beam.Pipeline(options=pipeline_options)
-    (p
-        | "Reading from Snowflake" >> ReadFromSnowflake(
-            server_name=SERVER_NAME,
-            username=USERNAME,
-            password=PASSWORD,
-            schema=SCHEMA,
-            database=DATABASE,
-            staging_bucket_name=STAGING_BUCKET_NAME,
-            storage_integration_name=STORAGE_INTEGRATION,
-            csv_mapper=csv_mapper,
-            table=TABLE)
-        | "Print" >> beam.Map(print_row)
-    )
-    result = p.run()
-    result.wait_until_finish()
-
-
-def run(argv=None, save_main_session=True):
-
-    # We use the save_main_session option because one or more DoFn's in this
-    # workflow rely on global context (e.g., a module imported at module level).
-    pipeline_options = PipelineOptions(argv)
-    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
-
-    run_write(pipeline_options)
-    run_read(pipeline_options)
-
-if __name__ == '__main__':
-    logging.getLogger().setLevel(logging.INFO)
-    run()
\ No newline at end of file
diff --git a/xlang_example.py b/xlang_example.py
index 9563aec..a16e8f4 100644
--- a/xlang_example.py
+++ b/xlang_example.py
@@ -2,6 +2,7 @@
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.io.external.generate_sequence import GenerateSequence
 from apache_beam.io.external.snowflake import ReadFromSnowflake, WriteToSnowflake
+import logging
 
 SERVER_NAME = 
 USERNAME = 
@@ -11,7 +12,6 @@
 STAGING_BUCKET_NAME = 
 STORAGE_INTEGRATION = 
 TABLE = 
-EXPANSION_SERVICE = 'localhost:8097'
 SCHEMA_STRING = """
 {"schema":[
     {"dataType":{"type":"text","length":null},"name":"text_column","nullable":true},
@@ -21,10 +21,11 @@
 """
 
 OPTIONS =[
-    "--runner=FlinkRunner",
-    "--flink_version=1.10",
-    "--flink_master=localhost:8081",
-    "--environment_type=LOOPBACK"
+    "--runner=DataflowRunner",
+    "--project=<GCP PROJECT ID>",
+    "--staging_location=gs://<GCS BUCKET NAME>/tmp/",
+    "--region=<GCP REGION>",
+    "--temp_location=gs://<GCS BUCKET NAME>/tmp/"
 ]
 
 class Row(object):
@@ -50,7 +51,7 @@ def user_data_mapper(test_row):
 
     p = beam.Pipeline(options=PipelineOptions(OPTIONS))
     (p
-        | GenerateSequence(start=1, stop=3, expansion_service=EXPANSION_SERVICE)
+        | GenerateSequence(start=1, stop=3)
         | beam.Map(lambda num: Row("test" + str(num), num, True))
         | "Writing into Snowflake" >> WriteToSnowflake(
            server_name=SERVER_NAME,
@@ -65,8 +66,7 @@ def user_data_mapper(test_row):
            table_schema=SCHEMA_STRING,
            user_data_mapper=user_data_mapper,
            table=TABLE,
-            query=None,
-            expansion_service=EXPANSION_SERVICE)
+            query=None)
     )
     result = p.run()
     result.wait_until_finish()
@@ -91,18 +91,20 @@ def print_row(row):
            schema=SCHEMA,
            database=DATABASE,
            staging_bucket_name=STAGING_BUCKET_NAME,
-            storage_integration=STORAGE_INTEGRATION,
+            storage_integration_name=STORAGE_INTEGRATION,
            csv_mapper=csv_mapper,
-            table=TABLE,
-            expansion_service=EXPANSION_SERVICE)
+            table=TABLE)
         | "Print" >> beam.Map(print_row)
     )
     result = p.run()
     result.wait_until_finish()
 
+
 def run():
     run_write()
     run_read()
 
+
 if __name__ == '__main__':
+    logging.getLogger().setLevel(logging.INFO)
     run()
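A note on the `csv_mapper` shown in `wip.py` above: it converts the third CSV field with `bool(strings_array[2])`, and in Python `bool()` of any non-empty string (including `"false"`) is `True`, so the boolean column does not round-trip faithfully. Below is a minimal sketch of a safer mapper; it reuses the example's `Row` class and field order, `parse_bool` is only an illustrative helper name, and the exact textual form of the staged boolean depends on how Snowflake unloads the CSV.

```python
def parse_bool(value):
    # The csv_mapper receives the column as text; bool("false") would be True,
    # so compare against the expected textual forms instead.
    return value.strip().lower() in ("true", "1")


def csv_mapper(strings_array):
    # Same field order as in the example: text_column, number_column, boolean_column.
    return Row(
        strings_array[0],
        int(strings_array[1]),
        parse_bool(strings_array[2])
    )
```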