45 changes: 13 additions & 32 deletions README.md
@@ -249,34 +249,13 @@ list for currently supported runtime options.
### Cross-language example
This example shows simple usage of [cross-language](https://beam.apache.org/roadmap/portability/) transforms by writing objects into Snowflake and reading them back from Snowflake.

Currently, cross-language is supported in a stable manner only by [Apache Flink](https://flink.apache.org/) as a runner, but there are plans to support all runners.
For more information about cross-language, please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/)
and [Beam on top of Flink](https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html) articles.
For more information about cross-language, please see [multi sdk efforts](https://beam.apache.org/roadmap/connectors-multi-sdk/).

#### Extra setup:
Please see [Apache Beam with Flink runner](https://beam.apache.org/documentation/runners/flink/) for setup instructions. The specific setup for the current version of Snowflake is as follows:
1. Set up a Flink cluster by following the Flink [Setup Quickstart](https://ci.apache.org/projects/flink/flink-docs-release-1.10/getting-started/tutorials/local_setup.html)
or [Setting up Apache Flink on Mac OS X](https://streambench.wordpress.com/2017/10/26/setting-up-apache-flink-on-mac-os-x/)
2. Download the Job server image:
1. Follow the steps from the previous section `Setup required by all examples`
1. Download and install the Apache Beam Python SDK
```
docker pull gcr.io/snowflake-poli/apachebeam_flink1.10_job_server:snowflake
```
3. Download Apache Beam Java SDK image:
```
docker pull gcr.io/snowflake-poli/apachebeam_java_sdk:2.20.0.dev
```
4. Change the tag of the downloaded Java SDK image to make the whole setup work:
```
docker tag gcr.io/snowflake-poli/apachebeam_java_sdk:2.20.0.dev apache/beam_java_sdk:2.20.0.dev
```
5. Start Job server:
```
docker run -p 8099:8099 -p 8098:8098 -p 8097:8097 gcr.io/snowflake-poli/apachebeam_flink1.10_job_server:snowflake
```
6. Download [Apache Beam Python SDK](https://storage.cloud.google.com/snowflake_artifacts/apachebeam_snowflake.whl?_ga=2.54472813.-471657054.1583857613).
7. Install the Apache Beam Python SDK using Python 2.7
```
python -m pip install apachebeam_snowflake.whl
pip install apache-beam
```
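Optionally, confirm that the SDK is importable:
```
python -c "import apache_beam; print(apache_beam.__version__)"
```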

#### Executing:
@@ -288,16 +267,18 @@ python -m pip install apachebeam_snowflake.whl
SCHEMA = <SNOWFLAKE SCHEMA>
DATABASE = <SNOWFLAKE DATABASE>
STAGING_BUCKET_NAME = <SNOWFLAKE STAGING BUCKET NAME>
STORAGE_INTEGRATION = <SNOWFLAKE STORAGE INTEGRATION NAME>
STORAGE_INTEGRATION_NAME = <SNOWFLAKE STORAGE INTEGRATION NAME>
TABLE = <SNOWFLAKE TABLE NAME>

OPTIONS =[
"--runner=DataflowRunner",
"--project=<GCP PROJECT ID>",
"--staging_location=gs://<BUCKET NAME>/tmp/",
"--region=<REGION>",
"--temp_location=gs://<BUCKET NAME>/tmp/"
]
```
2. Run xlang_example.py:
```
python xlang_example.py
```
2. [Go to Flink console](http://localhost:8081/)
![Xlang Flink result](./images/xlang_flink_result.png)
3. Go to the GCS bucket to check the saved files:
![Xlang GCS result](./images/xlang_gcs_result.png)
4. Check the console output:
![Xlang console result](./images/xlang_console_result.png)
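For orientation, below is a condensed sketch of how the variables above are wired into the pipeline. It mirrors `xlang_example.py` shown next; the credential parameters that sit in the collapsed regions of the diff (`USERNAME`, `PASSWORD`) are assumptions.
```
# Condensed sketch of the read side; see xlang_example.py for the full example.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.external.snowflake import ReadFromSnowflake

def csv_mapper(strings_array):
    # Turns one CSV record (a list of strings) staged by Snowflake into
    # whatever object the rest of the pipeline expects.
    return strings_array

p = beam.Pipeline(options=PipelineOptions(OPTIONS))
(p
 | "Read from Snowflake" >> ReadFromSnowflake(
        server_name=SERVER_NAME,
        username=USERNAME,          # assumed to be defined like the variables above
        password=PASSWORD,          # assumed to be defined like the variables above
        schema=SCHEMA,
        database=DATABASE,
        staging_bucket_name=STAGING_BUCKET_NAME,
        storage_integration_name=STORAGE_INTEGRATION_NAME,
        csv_mapper=csv_mapper,
        table=TABLE)
 | "Print" >> beam.Map(print))
p.run().wait_until_finish()
```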
26 changes: 14 additions & 12 deletions xlang_example.py
@@ -2,6 +2,7 @@
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.external.generate_sequence import GenerateSequence
from apache_beam.io.external.snowflake import ReadFromSnowflake, WriteToSnowflake
import logging

SERVER_NAME = <SNOWFLAKE SERVER NAME>
USERNAME = <SNOWFLAKE USERNAME>
@@ -11,7 +12,6 @@
STAGING_BUCKET_NAME = <SNOWFLAKE STAGING BUCKET NAME>
STORAGE_INTEGRATION = <SNOWFLAKE STORAGE INTEGRATION NAME>
TABLE = <SNOWFLAKE TABLE NAME>
EXPANSION_SERVICE = 'localhost:8097'
SCHEMA_STRING = """
{"schema":[
{"dataType":{"type":"text","length":null},"name":"text_column","nullable":true},
@@ -21,10 +21,11 @@
"""

OPTIONS =[
"--runner=FlinkRunner",
"--flink_version=1.10",
"--flink_master=localhost:8081",
"--environment_type=LOOPBACK"
"--runner=DataflowRunner",
"--project=<GCP PROJECT ID>",
"--staging_location=gs://<BUCKET NAME>/tmp/",
"--region=<REGION>",
"--temp_location=gs://<BUCKET NAME>/tmp/"
]

class Row(object):
@@ -50,7 +51,7 @@ def user_data_mapper(test_row):

p = beam.Pipeline(options=PipelineOptions(OPTIONS))
(p
| GenerateSequence(start=1, stop=3, expansion_service=EXPANSION_SERVICE)
| GenerateSequence(start=1, stop=3)
| beam.Map(lambda num: Row("test" + str(num), num, True))
| "Writing into Snowflake" >> WriteToSnowflake(
server_name=SERVER_NAME,
@@ -65,8 +66,7 @@ def user_data_mapper(test_row):
table_schema=SCHEMA_STRING,
user_data_mapper=user_data_mapper,
table=TABLE,
query=None,
expansion_service=EXPANSION_SERVICE)
query=None)
)
result = p.run()
result.wait_until_finish()
@@ -91,18 +91,20 @@ def print_row(row):
schema=SCHEMA,
database=DATABASE,
staging_bucket_name=STAGING_BUCKET_NAME,
storage_integration=STORAGE_INTEGRATION,
storage_integration_name=STORAGE_INTEGRATION,
csv_mapper=csv_mapper,
table=TABLE,
expansion_service=EXPANSION_SERVICE)
table=TABLE)
| "Print" >> beam.Map(print_row)
)
result = p.run()
result.wait_until_finish()


def run():
run_write()
# run_write()
run_read()


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
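The bodies of `user_data_mapper` and `csv_mapper` are collapsed in this diff. Below is a minimal sketch of the contract they need to satisfy, assuming the three columns implied by `SCHEMA_STRING` and the `Row` constructor; the exact attribute names and the boolean parsing are assumptions.
```
def user_data_mapper(test_row):
    # Write side: map one Row to a list of strings, one entry per column;
    # the connector serializes each list as a single CSV record staged in GCS.
    return [test_row.text_column, str(test_row.number_column), str(test_row.boolean_column)]

def csv_mapper(strings_array):
    # Read side: map one CSV record (a list of strings) read back from
    # Snowflake into a Row object.
    return Row(strings_array[0], int(strings_array[1]), strings_array[2] == 'true')  # boolean parsing is an assumption
```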