Commit 1de662b

feat(aws): Supporting batching and streaming, doesn't support templates
1 parent 3e48a15 commit 1de662b

File tree

README.md
src/main/java/batching/SnowflakeWordCountOptions.java
src/main/java/batching/WordCountExample.java
src/main/java/streaming/TaxiRidesExample.java
src/main/java/streaming/TaxiRidesOptions.java
src/main/java/util/AwsOptions.java
src/main/java/util/AwsOptionsParser.java

7 files changed: +96 -42 lines changed

README.md

Lines changed: 34 additions & 14 deletions
@@ -28,20 +28,29 @@ with Google Cloud Platform as a cloud provider.
 ```
 5. [Create Google Cloud Platform account](https://cloud.google.com/free).
 6. [Create a new GCP project](https://cloud.google.com/resource-manager/docs/creating-managing-projects).
-7. [Create GCP bucket](https://cloud.google.com/storage/docs/creating-buckets)
-8. Create storage integration object in Snowflake using the following command:
+7. Depending on whether GCS or S3 is used as the file system, execute one of the following commands to create a storage integration object in Snowflake:
 ```
 CREATE OR REPLACE STORAGE INTEGRATION <INTEGRATION NAME>
 TYPE = EXTERNAL_STAGE
 STORAGE_PROVIDER = GCS
 ENABLED = TRUE
 STORAGE_ALLOWED_LOCATIONS = ('gcs://<BUCKET NAME>/');
 ```
+```
+CREATE STORAGE INTEGRATION aws_integration
+TYPE = EXTERNAL_STAGE
+STORAGE_PROVIDER = S3
+ENABLED = TRUE
+STORAGE_AWS_ROLE_ARN = '<ARN ROLE NAME>'
+STORAGE_ALLOWED_LOCATIONS = ('s3://<BUCKET NAME>/')
+```
 Please note that `gcs` prefix is used here, not `gs`.
-9. Authorize Snowflake to operate on your bucket by following [Step 3. Grant the Service Account Permissions to Access Bucket Objects](https://docs.snowflake.com/en/user-guide/data-load-gcs-config.html#step-3-grant-the-service-account-permissions-to-access-bucket-objects)
-10. Setup gcloud on your computer by following [Using the Google Cloud SDK installer](https://cloud.google.com/sdk/docs/downloads-interactive)
-11. [Install gradle](https://gradle.org/install/)
-12. Run following command to set gradle wrapper
+8. Authorize Snowflake to operate on your bucket:
+    1. For GCS, follow [Step 3. Grant the Service Account Permissions to Access Bucket Objects](https://docs.snowflake.com/en/user-guide/data-load-gcs-config.html#step-3-grant-the-service-account-permissions-to-access-bucket-objects)
+    2. For S3, follow [Configuring a Snowflake Storage Integration](https://docs.snowflake.com/en/user-guide/data-load-s3-config.html#option-1-configuring-a-snowflake-storage-integration)
+9. Set up gcloud on your computer by following [Using the Google Cloud SDK installer](https://cloud.google.com/sdk/docs/downloads-interactive)
+10. [Install gradle](https://gradle.org/install/)
+11. Run the following command to set up the gradle wrapper:
 ```
 gradle wrapper
 ```
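
The storage integration and staging bucket configured above are the values SnowflakeIO consumes at write time. A minimal sketch, assuming a `DataSourceConfiguration` and a `UserDataMapper` built as elsewhere in this repo; the table, bucket, and integration names are placeholders:

```
import org.apache.beam.sdk.io.snowflake.SnowflakeIO;

class WriteSketch {
  static SnowflakeIO.Write<String> buildWrite(
      SnowflakeIO.DataSourceConfiguration config,
      SnowflakeIO.UserDataMapper<String> mapper) {
    return SnowflakeIO.<String>write()
        .withDataSourceConfiguration(config)
        .to("WORD_COUNTS")                             // placeholder table name
        .withStagingBucketName("s3://example-bucket/") // or "gs://example-bucket/" for GCS
        .withStorageIntegrationName("aws_integration") // the integration created above
        .withUserDataMapper(mapper);                   // maps each element to a table row
  }
}
```
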
@@ -64,27 +73,30 @@ An example consists of two pipelines:
 ```
 ./gradlew run -PmainClass=batching.WordCountExample --args=" \
 --inputFile=gs://apache-beam-samples/shakespeare/* \
---output=gs://<GCS BUCKET NAME>/counts \
+--output=<gs or s3>://<GCS OR S3 BUCKET NAME>/counts \
 --serverName=<SNOWFLAKE SERVER NAME> \
 --username=<SNOWFLAKE USERNAME> \
 --password=<SNOWFLAKE PASSWORD> \
 --database=<SNOWFLAKE DATABASE> \
 --schema=<SNOWFLAKE SCHEMA> \
 --tableName=<SNOWFLAKE TABLE NAME> \
 --storageIntegrationName=<SNOWFLAKE STORAGE INTEGRATION NAME> \
---stagingBucketName=<GCS BUCKET NAME> \
+--stagingBucketName=<GCS OR S3 BUCKET NAME> \
 --runner=<DirectRunner/DataflowRunner> \
 --project=<FOR DATAFLOW RUNNER: GCP PROJECT NAME> \
 --gcpTempLocation=<FOR DATAFLOW RUNNER: GCS TEMP LOCATION STARTING> \
 --region=<FOR DATAFLOW RUNNER: GCP REGION> \
+--awsRegion=<OPTIONAL: AWS REGION IN CASE OF USING S3> \
+--awsAccessKey=<OPTIONAL: AWS ACCESS KEY IN CASE OF USING S3> \
+--awsSecretKey=<OPTIONAL: AWS SECRET KEY IN CASE OF USING S3> \
 --appName=<OPTIONAL: DATAFLOW JOB NAME PREFIX>"
 ```
 2. Go to Snowflake console to check saved counts:
 ```
 select * from <DATABASE NAME>.<SCHEMA NAME>.<TABLE NAME>;
 ```
 ![Batching snowflake result](./images/batching_snowflake_result.png)
-3. Go to GCS bucket to check saved files:
+3. Go to the GCS or S3 bucket to check saved files:
 ![Batching gcs result](./images/batching_gcs_result.png)
 4. Go to DataFlow to check submitted jobs:
 ![Batching DataFlow result](./images/batching_dataflow_result.png)
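
A single `--output` flag covers both clouds because Beam resolves the filesystem from the path scheme. A hedged sketch of the write step, assuming the formatted counts `PCollection` produced by the example and the matching Beam filesystem module on the classpath:

```
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.values.PCollection;

class OutputSketch {
  // TextIO picks GCS or S3 from the scheme of `output` ("gs://..." or "s3://...").
  static void writeCounts(PCollection<String> formattedCounts, String output) {
    formattedCounts.apply("WriteCounts", TextIO.write().to(output));
  }
}
```
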
@@ -102,12 +114,17 @@ An example is streaming taxi rides from PubSub into Snowflake.
 lat double
 );
 ```
-2. [Create Snowflake stage](https://docs.snowflake.com/en/sql-reference/sql/create-stage.html)
+2. Depending on whether GCS or S3 is used, execute one of the following commands to [create a Snowflake stage](https://docs.snowflake.com/en/sql-reference/sql/create-stage.html):
 ```
 create or replace stage <STAGE NAME>
 url = 'gcs://<GCS BUCKET NAME>/data/'
 storage_integration = <INTEGRATION NAME>;
 ```
+```
+create stage <STAGE NAME>
+url = 's3://<S3 BUCKET NAME>/data/'
+storage_integration = <INTEGRATION NAME>;
+```
 note: SnowflakeIO requires that the url must have /data/ as a suffix
 3. [Create Key/Pair](https://docs.snowflake.com/en/user-guide/snowsql-start.html#using-key-pair-authentication)
 for authentication process.
@@ -133,10 +150,13 @@ for authentication process.
 --schema=<SNOWFLAKE SCHEMA> \
 --snowPipe=<SNOWFLAKE SNOWPIPE NAME> \
 --storageIntegrationName=<SNOWFLAKE STORAGE INTEGRATION NAME> \
---stagingBucketName=<GCS BUCKET NAME> \
+--stagingBucketName=<GCS OR S3 BUCKET NAME> \
 --runner=<DirectRunner/DataflowRunner> \
 --project=<FOR DATAFLOW RUNNER: GCP PROJECT NAME> \
 --region=<FOR DATAFLOW RUNNER: GCP REGION> \
+--awsRegion=<OPTIONAL: AWS REGION IN CASE OF USING S3> \
+--awsAccessKey=<OPTIONAL: AWS ACCESS KEY IN CASE OF USING S3> \
+--awsSecretKey=<OPTIONAL: AWS SECRET KEY IN CASE OF USING S3> \
 --appName=<OPTIONAL: DATAFLOW JOB NAME PREFIX>"
 ```
 2. Go to Snowflake console to check saved taxi rides:
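
The SnowPipe flags above feed a streaming SnowflakeIO write. A minimal sketch under the same assumptions as the batch example; all names are placeholders and the flush interval is illustrative:

```
import org.apache.beam.sdk.io.snowflake.SnowflakeIO;
import org.joda.time.Duration;

class StreamingWriteSketch {
  static SnowflakeIO.Write<String> buildStreamingWrite(
      SnowflakeIO.DataSourceConfiguration config,
      SnowflakeIO.UserDataMapper<String> mapper) {
    return SnowflakeIO.<String>write()
        .withDataSourceConfiguration(config)
        .withSnowPipe("example_snowpipe")                  // --snowPipe
        .withStagingBucketName("s3://example-bucket/")     // --stagingBucketName
        .withStorageIntegrationName("example_integration") // --storageIntegrationName
        .withFlushTimeLimit(Duration.standardSeconds(30))  // illustrative flush interval
        .withUserDataMapper(mapper);
  }
}
```
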
@@ -166,7 +186,7 @@ list for currently supported runtime options.
 --templateLocation=gs://<GCS BUCKET NAME>/templates/<TEMPLATE NAME> \
 --region=<GCP REGION> \
 --storageIntegrationName=<SNOWFLAKE STORAGE INTEGRATION NAME> \
---stagingBucketName=gs://<GCS BUCKET NAME>/ \
+--stagingBucketName=<gs or s3>://<GCS OR S3 BUCKET NAME>/ \
 --username=<SNOWFLAKE USERNAME> \
 --database=<SNOWFLAKE DATABASE> \
 --schema=<SNOWFLAKE SCHEMA> \
@@ -225,7 +245,7 @@ list for currently supported runtime options.
 * --serverName= full server name with account, zone and domain.
 * --username= required for username/password and Private Key authentication.
 * --password= required for username/password authentication only
-* --stagingBucketName= external bucket path ending with `/`. I.e. `gs://bucket/`. Sub-directories are allowed.
+* --stagingBucketName= external bucket path ending with `/`. I.e. `<gs or s3>://bucket/`. Sub-directories are allowed.
 * --rawPrivateKey= raw private key. Required for Private Key authentication only.
 * --privateKeyPassphrase= private Key's passphrase. Required for Private Key authentication only.
 * --storageIntegrationName= storage integration name
@@ -297,7 +317,7 @@ python -m pip install apachebeam_snowflake.whl
 ```
 2. [Go to Flink console](http://localhost:8081/)
 ![Xlang Flink result](./images/xlang_flink_result.png)
-3. Go to GCS bucket to check saved files:
+3. Go to the GCS or S3 bucket to check saved files:
 ![Xlang GCS result](./images/xlang_gcs_result.png)
 4. Check console
 ![Xlang console result](./images/xlang_console_result.png)

src/main/java/batching/SnowflakeWordCountOptions.java

Lines changed: 2 additions & 13 deletions
@@ -5,11 +5,12 @@
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.Validation.Required;
 import org.apache.beam.sdk.io.aws.options.S3Options;
+import util.AwsOptions;

 /**
  * Supported PipelineOptions used in provided examples.
  */
-public interface SnowflakeWordCountOptions extends SnowflakePipelineOptions, S3Options {
+public interface SnowflakeWordCountOptions extends SnowflakePipelineOptions, AwsOptions, S3Options {

   @Description("Path of the file to read from")
   @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
@@ -22,16 +23,4 @@ public interface SnowflakeWordCountOptions extends SnowflakePipelineOptions, S3O
   String getOutput();

   void setOutput(String value);
-
-  @Description("AWS Access Key")
-  @Required
-  String getAwsAccessKey();
-
-  void setAwsAccessKey(String awsAccessKey);
-
-  @Description("AWS secret key")
-  @Required
-  String getAwsSecretKey();
-
-  void setAwsSecretKey(String awsSecretKey);
 }
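
With the AWS getters moved into `util.AwsOptions`, this interface still exposes them through inheritance. A small parsing sketch; the flag values are placeholders, and `withValidation()` is omitted so the required Snowflake flags need not be supplied:

```
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import batching.SnowflakeWordCountOptions;

class ParseSketch {
  public static void main(String[] args) {
    SnowflakeWordCountOptions options = PipelineOptionsFactory.fromArgs(
            "--stagingBucketName=s3://example-bucket/",
            "--awsAccessKey=EXAMPLE_ACCESS_KEY",
            "--awsSecretKey=EXAMPLE_SECRET_KEY")
        .as(SnowflakeWordCountOptions.class);
    System.out.println(options.getAwsAccessKey()); // EXAMPLE_ACCESS_KEY, via util.AwsOptions
  }
}
```
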

src/main/java/batching/WordCountExample.java

Lines changed: 2 additions & 11 deletions
@@ -23,9 +23,7 @@
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
-import com.amazonaws.auth.AWSCredentials;
-import com.amazonaws.auth.AWSStaticCredentialsProvider;
-import com.amazonaws.auth.BasicAWSCredentials;
+import util.AwsOptionsParser;

 /**
  * An example that contains batch writing and reading from Snowflake. Inspired by Apache Beam/WordCount-example(https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/WordCount.java)
@@ -38,19 +36,12 @@ public static void main(String[] args) {
     SnowflakeWordCountOptions options =
         PipelineOptionsFactory.fromArgs(args).withValidation().as(SnowflakeWordCountOptions.class);

-    options = parseAwsOptions(options);
+    AwsOptionsParser.format(options);

     runWritingToSnowflake(options);
     runReadingFromSnowflake(options);
   }

-  private static SnowflakeWordCountOptions parseAwsOptions(SnowflakeWordCountOptions options) {
-    AWSCredentials awsCredentials = new BasicAWSCredentials(options.getAwsAccessKey(), options.getAwsSecretKey());
-    options.setAwsCredentialsProvider(new AWSStaticCredentialsProvider(awsCredentials));
-
-    return options;
-  }
-
   private static void runWritingToSnowflake(SnowflakeWordCountOptions options) {
     Pipeline p = Pipeline.create(options);

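
The removed parseAwsOptions helper installed static AWS credentials unconditionally; the shared AwsOptionsParser.format (added below under src/main/java/util/) performs the same wiring but only when the staging bucket is on S3, so GCS-only runs no longer need AWS flags.
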
src/main/java/streaming/TaxiRidesExample.java

Lines changed: 6 additions & 4 deletions
@@ -5,10 +5,10 @@
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
 import org.apache.beam.sdk.io.snowflake.SnowflakeIO;
-import org.apache.beam.sdk.io.snowflake.SnowflakePipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.ToString;
 import org.joda.time.Duration;
+import util.AwsOptionsParser;

 import java.util.UUID;

@@ -23,8 +23,10 @@ public class TaxiRidesExample {
   private static final String PUBSUB_TAX_RIDES = "projects/pubsub-public-data/topics/taxirides-realtime";

   public static void main(String[] args) {
-    SnowflakePipelineOptions options =
-        PipelineOptionsFactory.fromArgs(args).withValidation().as(SnowflakePipelineOptions.class);
+    TaxiRidesOptions options =
+        PipelineOptionsFactory.fromArgs(args).withValidation().as(TaxiRidesOptions.class);
+
+    AwsOptionsParser.format(options);

     Pipeline p = Pipeline.create(options);

@@ -51,7 +53,7 @@ public static void main(String[] args) {
     p.run();
   }

-  public static SnowflakeIO.DataSourceConfiguration createSnowflakeConfiguration(SnowflakePipelineOptions options) {
+  public static SnowflakeIO.DataSourceConfiguration createSnowflakeConfiguration(TaxiRidesOptions options) {
     return SnowflakeIO.DataSourceConfiguration.create()
         .withKeyPairRawAuth(options.getUsername(), options.getRawPrivateKey(), options.getPrivateKeyPassphrase())
         .withKeyPairPathAuth(options.getUsername(), options.getPrivateKeyPath(), options.getPrivateKeyPassphrase())
src/main/java/streaming/TaxiRidesOptions.java

Lines changed: 11 additions & 0 deletions
@@ -0,0 +1,11 @@
+package streaming;
+
+import org.apache.beam.sdk.io.aws.options.S3Options;
+import org.apache.beam.sdk.io.snowflake.SnowflakePipelineOptions;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.Validation.Required;
+import util.AwsOptions;
+
+public interface TaxiRidesOptions extends SnowflakePipelineOptions, AwsOptions, S3Options {
+}

src/main/java/util/AwsOptions.java

Lines changed: 23 additions & 0 deletions
@@ -0,0 +1,23 @@
+package util;
+
+import org.apache.beam.sdk.io.aws.options.S3Options;
+import org.apache.beam.sdk.io.snowflake.SnowflakePipelineOptions;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.options.ValueProvider;
+
+public interface AwsOptions extends SnowflakePipelineOptions, S3Options {
+
+  @Description("AWS Access Key")
+  @Default.String("access_key")
+  String getAwsAccessKey();
+
+  void setAwsAccessKey(String awsAccessKey);
+
+  @Description("AWS secret key")
+  @Default.String("secret_key")
+  String getAwsSecretKey();
+
+  void setAwsSecretKey(String awsSecretKey);
+}
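
Replacing @Required with @Default.String placeholders is what makes the AWS flags optional: the defaults are only ever read when the staging bucket is on S3 (see AwsOptionsParser below). A micro-sketch of the fallback, with a hypothetical class name:

```
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import util.AwsOptions;

class DefaultsSketch {
  public static void main(String[] args) {
    // No AWS flags supplied: the annotated defaults are returned.
    AwsOptions options = PipelineOptionsFactory.as(AwsOptions.class);
    System.out.println(options.getAwsAccessKey()); // access_key
    System.out.println(options.getAwsSecretKey()); // secret_key
  }
}
```
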
src/main/java/util/AwsOptionsParser.java

Lines changed: 18 additions & 0 deletions
@@ -0,0 +1,18 @@
+package util;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+
+public class AwsOptionsParser {
+
+  private static final String AWS_S3_PREFIX = "s3";
+
+  public static void format(AwsOptions options) {
+    if (options.getStagingBucketName().get().toLowerCase().startsWith(AWS_S3_PREFIX)) {
+      options.setAwsCredentialsProvider(
+          new AWSStaticCredentialsProvider(
+              new BasicAWSCredentials(options.getAwsAccessKey(), options.getAwsSecretKey())));
+    }
+  }
+
+}
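
Putting it together, format() inspects the staging bucket's scheme before wiring credentials. A hedged usage sketch; the class name and flag values are hypothetical:

```
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import util.AwsOptions;
import util.AwsOptionsParser;

class FormatSketch {
  public static void main(String[] args) {
    AwsOptions options = PipelineOptionsFactory.fromArgs(
            "--stagingBucketName=s3://example-bucket/",
            "--awsAccessKey=EXAMPLE_ACCESS_KEY",
            "--awsSecretKey=EXAMPLE_SECRET_KEY")
        .as(AwsOptions.class);

    // The staging bucket starts with "s3", so a static credentials provider is
    // installed; for a gs:// bucket, format() leaves the options untouched.
    AwsOptionsParser.format(options);
    System.out.println(options.getAwsCredentialsProvider().getClass().getSimpleName());
    // -> AWSStaticCredentialsProvider
  }
}
```
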
