diff --git a/README.md b/README.md
index 6f70a47..44151ba 100644
--- a/README.md
+++ b/README.md
@@ -249,34 +249,13 @@ list for currently supported runtime options.
 
 ### Cross-language example
 An example is showing simple usage of [cross-language](https://beam.apache.org/roadmap/portability/) by writing objects into Snowflake and reading them from Snowflake.
-Currently, cross-language is supporting only by [Apache Flink](https://flink.apache.org/) as a runner in a stable manner but plans are to support all runners.
-For more information about cross-language please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
-and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
+For more information about cross-language, please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/).
 
 #### Extra setup:
-Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for a setup. The specific setup for current version of snowflake is following:
-1. Setup a Flink cluster by following the Flink [Setup Quickstart](https://ci.apache.org/projects/flink/flink-docs-release-1.10/getting-started/tutorials/local_setup.html)
-or [Setting up Apache Flink on Mac OS X](https://streambench.wordpress.com/2017/10/26/setting-up-apache-flink-on-mac-os-x/)
-2. Download Job server image:
+1. Follow the steps from the previous section `Setup required by all examples`.
+1. Download and install the Apache Beam Python SDK:
 ```
-docker pull gcr.io/snowflake-poli/apachebeam_flink1.10_job_server:snowflake
-```
-3. Download Apache Beam Java SDK image:
-```
-docker pull gcr.io/snowflake-poli/apachebeam_java_sdk:2.20.0.dev
-```
-4. Change tag of downloaded Java SDK image to make the whole setup work:
-```
-docker tag gcr.io/snowflake-poli/apachebeam_java_sdk:2.20.0.dev apache/beam_java_sdk:2.20.0.dev
-```
-5. Start Job server:
-```
-docker run -p 8099:8099 -p 8098:8098 -p 8097:8097 gcr.io/snowflake-poli/apachebeam_flink1.10_job_server:snowflake
-```
-6. Download [Apache Beam Python SDK](https://storage.cloud.google.com/snowflake_artifacts/apachebeam_snowflake.whl?_ga=2.54472813.-471657054.1583857613).
-7. Install python Apache Beam Python SDK using Python 2.7
-```
-python -m pip install apachebeam_snowflake.whl
+pip install apache-beam
 ```
 
 #### Executing:
@@ -288,16 +267,18 @@ python -m pip install apachebeam_snowflake.whl
    SCHEMA =
    DATABASE =
    STAGING_BUCKET_NAME =
    STORAGE_INTEGRATION =
    TABLE =
+
+   OPTIONS =[
+       "--runner=DataflowRunner",
+       "--project=",
+       "--staging_location=gs:///tmp/",
+       "--region=",
+       "--temp_location=gs:///tmp/"
+   ]
 ```
 2. Run xlang_example.py:
 ```
 python xlang_example.py
 ```
-2. [Go to Flink console](http://localhost:8081/)
-   ![Xlang Flink result](./images/xlang_flink_result.png)
-3. Go to GCS bucket to check saved files:
-   ![Xlang GCS result](./images/xlang_gcs_result.png)
-4. Check console
-   ![Xlang console result](./images/xlang_console_result.png)
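The `OPTIONS` added above are ordinary Beam command-line flags packed into a Python list. As a minimal sketch of how such a list is consumed, assuming placeholder project, bucket, and region values that are not part of this repo:

```
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder values are assumptions for illustration only; fill in your
# own GCP project, staging bucket, and region before running.
OPTIONS = [
    "--runner=DataflowRunner",
    "--project=my-gcp-project",
    "--staging_location=gs://my-bucket/tmp/",
    "--region=us-central1",
    "--temp_location=gs://my-bucket/tmp/",
]

# PipelineOptions parses these flag strings the same way a command line
# would be parsed, so beam.Pipeline(options=PipelineOptions(OPTIONS))
# picks up the runner, project, and GCS locations from the list.
options = PipelineOptions(OPTIONS)
```

Keeping the flags in a plain list rather than hard-coding them in the pipeline makes it easy to swap runners without touching the pipeline code, which is exactly what the change to xlang_example.py below does.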
diff --git a/xlang_example.py b/xlang_example.py
index 9563aec..a16e8f4 100644
--- a/xlang_example.py
+++ b/xlang_example.py
@@ -2,6 +2,7 @@
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.io.external.generate_sequence import GenerateSequence
 from apache_beam.io.external.snowflake import ReadFromSnowflake, WriteToSnowflake
+import logging
 
 SERVER_NAME =
 USERNAME =
@@ -11,7 +12,6 @@
 STAGING_BUCKET_NAME =
 STORAGE_INTEGRATION =
 TABLE =
-EXPANSION_SERVICE = 'localhost:8097'
 SCHEMA_STRING = """
 {"schema":[
 {"dataType":{"type":"text","length":null},"name":"text_column","nullable":true},
@@ -21,10 +21,11 @@
 """
 
 OPTIONS =[
-    "--runner=FlinkRunner",
-    "--flink_version=1.10",
-    "--flink_master=localhost:8081",
-    "--environment_type=LOOPBACK"
+    "--runner=DataflowRunner",
+    "--project=",
+    "--staging_location=gs:///tmp/",
+    "--region=",
+    "--temp_location=gs:///tmp/"
 ]
 
 class Row(object):
@@ -50,7 +51,7 @@ def user_data_mapper(test_row):
 
     p = beam.Pipeline(options=PipelineOptions(OPTIONS))
     (p
-        | GenerateSequence(start=1, stop=3, expansion_service=EXPANSION_SERVICE)
+        | GenerateSequence(start=1, stop=3)
         | beam.Map(lambda num: Row("test" + str(num), num, True))
        | "Writing into Snowflake" >> WriteToSnowflake(
            server_name=SERVER_NAME,
@@ -65,8 +66,7 @@ def user_data_mapper(test_row):
            table_schema=SCHEMA_STRING,
            user_data_mapper=user_data_mapper,
            table=TABLE,
-           query=None,
-           expansion_service=EXPANSION_SERVICE)
+           query=None)
     )
     result = p.run()
     result.wait_until_finish()
@@ -91,18 +91,20 @@ def print_row(row):
            schema=SCHEMA,
            database=DATABASE,
            staging_bucket_name=STAGING_BUCKET_NAME,
-           storage_integration=STORAGE_INTEGRATION,
+           storage_integration_name=STORAGE_INTEGRATION,
            csv_mapper=csv_mapper,
-           table=TABLE,
-           expansion_service=EXPANSION_SERVICE)
+           table=TABLE)
        | "Print" >> beam.Map(print_row)
     )
     result = p.run()
     result.wait_until_finish()
+
 
 def run():
     run_write()
     run_read()
+
 
 if __name__ == '__main__':
+    logging.getLogger().setLevel(logging.INFO)
     run()
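Note that `user_data_mapper` and `csv_mapper` are referenced in the hunks above, but their bodies sit outside the changed lines. For readers without the full file, here is a rough, self-contained sketch of the contract they fulfil, assuming the three-column layout implied by `Row("test" + str(num), num, True)` and the `text_column` entry of `SCHEMA_STRING`; the actual bodies in xlang_example.py may differ:

```
# Sketch only: attribute and column names are assumptions based on the
# Row constructor and SCHEMA_STRING fragments visible in the diff.
class Row(object):
    def __init__(self, text_column, number_column, boolean_column):
        self.text_column = text_column
        self.number_column = number_column
        self.boolean_column = boolean_column


def user_data_mapper(test_row):
    # WriteToSnowflake stages data as CSV files in the GCS bucket before
    # loading Snowflake, so each element is mapped to one list of column
    # values serialized as text.
    return [
        test_row.text_column,
        str(test_row.number_column),
        str(test_row.boolean_column),
    ]


def csv_mapper(strings_array):
    # ReadFromSnowflake goes the other way: each staged CSV record arrives
    # as a list of strings, one per column, to be rebuilt into a Row.
    return Row(
        strings_array[0],
        int(strings_array[1]),
        strings_array[2].lower() == 'true',
    )
```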