
Commit 0f4fde8

feat(aws): Supporting batching and streaming, doesn't support templates
1 parent 3e48a15 commit 0f4fde8

7 files changed: +92 −37 lines

README.md

Lines changed: 30 additions & 9 deletions
@@ -29,16 +29,26 @@ with Google Cloud Platform as a cloud provider.
 5. [Create Google Cloud Platform account](https://cloud.google.com/free).
 6. [Create a new GCP project](https://cloud.google.com/resource-manager/docs/creating-managing-projects).
 7. [Create GCP bucket](https://cloud.google.com/storage/docs/creating-buckets)
-8. Create storage integration object in Snowflake using the following command:
+8. Depending on using GCS or S3 as file system execute one of the following commands to create storage integration object in Snowflake:
 ```
 CREATE OR REPLACE STORAGE INTEGRATION <INTEGRATION NAME>
 TYPE = EXTERNAL_STAGE
 STORAGE_PROVIDER = GCS
 ENABLED = TRUE
 STORAGE_ALLOWED_LOCATIONS = ('gcs://<BUCKET NAME>/');
 ```
+```
+CREATE STORAGE INTEGRATION aws_integration
+TYPE = EXTERNAL_STAGE
+STORAGE_PROVIDER = S3
+ENABLED = TRUE
+STORAGE_AWS_ROLE_ARN = '<ARN ROLE NAME>'
+STORAGE_ALLOWED_LOCATIONS = ('s3://<BUCKET NAME>/')
+```
 Please note that `gcs` prefix is used here, not `gs`.
-9. Authorize Snowflake to operate on your bucket by following [Step 3. Grant the Service Account Permissions to Access Bucket Objects](https://docs.snowflake.com/en/user-guide/data-load-gcs-config.html#step-3-grant-the-service-account-permissions-to-access-bucket-objects)
+9. Authorize Snowflake to operate on your bucket
+    1. For GCS follow [Step 3. Grant the Service Account Permissions to Access Bucket Objects](https://docs.snowflake.com/en/user-guide/data-load-gcs-config.html#step-3-grant-the-service-account-permissions-to-access-bucket-objects)
+    1. For S3 follow [Configuring a Snowflake Storage Integration](https://docs.snowflake.com/en/user-guide/data-load-s3-config.html#option-1-configuring-a-snowflake-storage-integration)
 10. Setup gcloud on your computer by following [Using the Google Cloud SDK installer](https://cloud.google.com/sdk/docs/downloads-interactive)
 11. [Install gradle](https://gradle.org/install/)
 12. Run following command to set gradle wrapper
@@ -64,27 +74,30 @@ An example consists of two pipelines:
 ```
 ./gradlew run -PmainClass=batching.WordCountExample --args=" \
 --inputFile=gs://apache-beam-samples/shakespeare/* \
---output=gs://<GCS BUCKET NAME>/counts \
+--output=gs://<GCS OR S3 BUCKET NAME>/counts \
 --serverName=<SNOWFLAKE SERVER NAME> \
 --username=<SNOWFLAKE USERNAME> \
 --password=<SNOWFLAKE PASSWORD> \
 --database=<SNOWFLAKE DATABASE> \
 --schema=<SNOWFLAKE SCHEMA> \
 --tableName=<SNOWFLAKE TABLE NAME> \
 --storageIntegrationName=<SNOWFLAKE STORAGE INTEGRATION NAME> \
---stagingBucketName=<GCS BUCKET NAME> \
+--stagingBucketName=<GCS OR S3 BUCKET NAME> \
 --runner=<DirectRunner/DataflowRunner> \
 --project=<FOR DATAFLOW RUNNER: GCP PROJECT NAME> \
 --gcpTempLocation=<FOR DATAFLOW RUNNER: GCS TEMP LOCATION STARTING> \
 --region=<FOR DATAFLOW RUNNER: GCP REGION> \
+--awsRegion=<OPTIONAL: AWS REGION IN CASE OF USING S3> \
+--awsAccessKey=<OPTIONAL: AWS ACCESS KEY IN CASE OF USING S3>\
+--awsSecretKey=<OPTIONAL: AWS SECRET KEY IN CASE OF USING S3>\
 --appName=<OPTIONAL: DATAFLOW JOB NAME PREFIX>"
 ```
 2. Go to Snowflake console to check saved counts:
 ```
 select from <DATABASE NAME>.<SCHEMA NAME>.<TABLE NAME>;
 ```
 ![Batching snowflake result](./images/batching_snowflake_result.png)
-3. Go to GCS bucket to check saved files:
+3. Go to GCS or S3 bucket to check saved files:
 ![Batching gcs result](./images/batching_gcs_result.png)
 4. Go to DataFlow to check submitted jobs:
 ![Batching DataFlow result](./images/batching_dataflow_result.png)
@@ -102,12 +115,17 @@ An example is streaming taxi rides from PubSub into Snowflake.
 lat double
 );
 ```
-2. [Create Snowflake stage](https://docs.snowflake.com/en/sql-reference/sql/create-stage.html)
+2. Depending on using GCS or S3 execute one of the following commands to [create Snowflake stage](https://docs.snowflake.com/en/sql-reference/sql/create-stage.html)
 ```
 create or replace stage <STAGE NAME>
 url = 'gcs://<GCS BUCKET NAME>/data/'
 storage_integration = <INTEGRATION NAME>;
 ```
+```
+create stage <STAGE NAME>
+url = 'S3://<S3 BUCKET NAME>/data/'
+storage_integration = <INTEGRATION NAME>;
+```
 note: SnowflakeIO requires that url must have /data/ as a sufix
 3. [Create Key/Pair](https://docs.snowflake.com/en/user-guide/snowsql-start.html#using-key-pair-authentication)
 for authentication process.
@@ -133,10 +151,13 @@ for authentication process.
 --schema=<SNOWFLAKE SCHEMA> \
 --snowPipe=<SNOWFLAKE SNOWPIPE NAME> \
 --storageIntegrationName=<SNOWFLAKE STORAGE INTEGRATION NAME> \
---stagingBucketName=<GCS BUCKET NAME> \
+--stagingBucketName=<GCS OR S3 BUCKET NAME> \
 --runner=<DirectRunner/DataflowRunner> \
 --project=<FOR DATAFLOW RUNNER: GCP PROJECT NAME> \
 --region=<FOR DATAFLOW RUNNER: GCP REGION> \
+--awsRegion=<OPTIONAL: AWS REGION IN CASE OF USING S3> \
+--awsAccessKey=<OPTIONAL: AWS ACCESS KEY IN CASE OF USING S3>\
+--awsSecretKey=<OPTIONAL: AWS SECRET KEY IN CASE OF USING S3>\
 --appName=<OPTIONAL: DATAFLOW JOB NAME PREFIX>"
 ```
 2. Go to Snowflake console to check saved taxi rides:
@@ -166,7 +187,7 @@ list for currently supported runtime options.
 --templateLocation=gs://<GCS BUCKET NAME>/templates/<TEMPLATE NAME>\
 --region=<GCP REGION>\
 --storageIntegrationName=<SNOWFLAKE STORAGE INTEGRATION NAME> \
---stagingBucketName=gs://<GCS BUCKET NAME>/ \
+--stagingBucketName=gs://<GCS OR S3 BUCKET NAME>/ \
 --username=<SNOWFLAKE USERNAME>\
 --database=<SNOWFLAKE DATABASE> \
 --schema=<SNOWFLAKE SCHEMA> \
@@ -297,7 +318,7 @@ python -m pip install apachebeam_snowflake.whl
 ```
 2. [Go to Flink console](http://localhost:8081/)
 ![Xlang Flink result](./images/xlang_flink_result.png)
-3. Go to GCS bucket to check saved files:
+3. Go to GCS or S3 bucket to check saved files:
 ![Xlang GCS result](./images/xlang_gcs_result.png)
 4. Check console
 ![Xlang console result](./images/xlang_console_result.png)

src/main/java/batching/SnowflakeWordCountOptions.java

Lines changed: 2 additions & 13 deletions
@@ -5,11 +5,12 @@
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.Validation.Required;
 import org.apache.beam.sdk.io.aws.options.S3Options;
+import util.AwsOptions;

 /**
  * Supported PipelineOptions used in provided examples.
  */
-public interface SnowflakeWordCountOptions extends SnowflakePipelineOptions, S3Options {
+public interface SnowflakeWordCountOptions extends SnowflakePipelineOptions, AwsOptions, S3Options {

   @Description("Path of the file to read from")
   @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
@@ -22,16 +23,4 @@ public interface SnowflakeWordCountOptions extends SnowflakePipelineOptions, S3O
   String getOutput();

   void setOutput(String value);
-
-  @Description("AWS Access Key")
-  @Required
-  String getAwsAccessKey();
-
-  void setAwsAccessKey(String awsAccessKey);
-
-  @Description("AWS secret key")
-  @Required
-  String getAwsSecretKey();
-
-  void setAwsSecretKey(String awsSecretKey);
 }

src/main/java/batching/WordCountExample.java

Lines changed: 2 additions & 11 deletions
@@ -23,9 +23,7 @@
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
-import com.amazonaws.auth.AWSCredentials;
-import com.amazonaws.auth.AWSStaticCredentialsProvider;
-import com.amazonaws.auth.BasicAWSCredentials;
+import util.AwsOptionsParser;

 /**
  * An example that contains batch writing and reading from Snowflake. Inspired by Apache Beam/WordCount-example(https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/WordCount.java)
@@ -38,19 +36,12 @@ public static void main(String[] args) {
     SnowflakeWordCountOptions options =
         PipelineOptionsFactory.fromArgs(args).withValidation().as(SnowflakeWordCountOptions.class);

-    options = parseAwsOptions(options);
+    AwsOptionsParser.format(options);

     runWritingToSnowflake(options);
     runReadingFromSnowflake(options);
   }

-  private static SnowflakeWordCountOptions parseAwsOptions(SnowflakeWordCountOptions options) {
-    AWSCredentials awsCredentials = new BasicAWSCredentials(options.getAwsAccessKey(), options.getAwsSecretKey());
-    options.setAwsCredentialsProvider(new AWSStaticCredentialsProvider(awsCredentials));
-
-    return options;
-  }
-
   private static void runWritingToSnowflake(SnowflakeWordCountOptions options) {
     Pipeline p = Pipeline.create(options);

src/main/java/streaming/TaxiRidesExample.java

Lines changed: 6 additions & 4 deletions
@@ -5,10 +5,10 @@
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
 import org.apache.beam.sdk.io.snowflake.SnowflakeIO;
-import org.apache.beam.sdk.io.snowflake.SnowflakePipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.ToString;
 import org.joda.time.Duration;
+import util.AwsOptionsParser;

 import java.util.UUID;

@@ -23,8 +23,10 @@ public class TaxiRidesExample {
   private static final String PUBSUB_TAX_RIDES = "projects/pubsub-public-data/topics/taxirides-realtime";

   public static void main(String[] args) {
-    SnowflakePipelineOptions options =
-        PipelineOptionsFactory.fromArgs(args).withValidation().as(SnowflakePipelineOptions.class);
+    TaxiRidesOptions options =
+        PipelineOptionsFactory.fromArgs(args).withValidation().as(TaxiRidesOptions.class);
+
+    AwsOptionsParser.format(options);

     Pipeline p = Pipeline.create(options);

@@ -51,7 +53,7 @@ public static void main(String[] args) {
     p.run();
   }

-  public static SnowflakeIO.DataSourceConfiguration createSnowflakeConfiguration(SnowflakePipelineOptions options) {
+  public static SnowflakeIO.DataSourceConfiguration createSnowflakeConfiguration(TaxiRidesOptions options) {
     return SnowflakeIO.DataSourceConfiguration.create()
         .withKeyPairRawAuth(options.getUsername(), options.getRawPrivateKey(), options.getPrivateKeyPassphrase())
         .withKeyPairPathAuth(options.getUsername(), options.getPrivateKeyPath(), options.getPrivateKeyPassphrase())
src/main/java/streaming/TaxiRidesOptions.java

Lines changed: 11 additions & 0 deletions
@@ -0,0 +1,11 @@
+package streaming;
+
+import org.apache.beam.sdk.io.aws.options.S3Options;
+import org.apache.beam.sdk.io.snowflake.SnowflakePipelineOptions;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.Validation.Required;
+import util.AwsOptions;
+
+public interface TaxiRidesOptions extends SnowflakePipelineOptions, AwsOptions, S3Options {
+}

src/main/java/util/AwsOptions.java

Lines changed: 23 additions & 0 deletions
@@ -0,0 +1,23 @@
+package util;
+
+import org.apache.beam.sdk.io.aws.options.S3Options;
+import org.apache.beam.sdk.io.snowflake.SnowflakePipelineOptions;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.options.ValueProvider;
+
+public interface AwsOptions extends SnowflakePipelineOptions, S3Options {
+
+  @Description("AWS Access Key")
+  @Default.String("access_key")
+  String getAwsAccessKey();
+
+  void setAwsAccessKey(String awsAccessKey);
+
+  @Description("AWS secret key")
+  @Default.String("secret_key")
+  String getAwsSecretKey();
+
+  void setAwsSecretKey(String awsSecretKey);
+}
src/main/java/util/AwsOptionsParser.java

Lines changed: 18 additions & 0 deletions
@@ -0,0 +1,18 @@
+package util;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+
+public class AwsOptionsParser {
+
+  private static final String AWS_S3_PREFIX = "s3";
+
+  public static void format(AwsOptions options) {
+    if (options.getStagingBucketName().get().toLowerCase().startsWith(AWS_S3_PREFIX)) {
+      options.setAwsCredentialsProvider(
+          new AWSStaticCredentialsProvider(
+              new BasicAWSCredentials(options.getAwsAccessKey(), options.getAwsSecretKey())));
+    }
+  }
+
+}
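
For orientation, the sketch below shows how the pieces introduced by this commit are intended to be wired at pipeline startup, mirroring the main() methods in WordCountExample and TaxiRidesExample above. It is a minimal illustration, not part of the commit: the class name AwsUsageSketch is hypothetical, while SnowflakeWordCountOptions, AwsOptionsParser, and the --awsAccessKey / --awsSecretKey flags come from the diffs in this commit.

```java
package util;

import batching.SnowflakeWordCountOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Hypothetical usage sketch (not part of this commit): parse an options
// interface that extends AwsOptions + S3Options, then let AwsOptionsParser
// install static AWS credentials before the pipeline is created.
public class AwsUsageSketch {
  public static void main(String[] args) {
    SnowflakeWordCountOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(SnowflakeWordCountOptions.class);

    // No-op for gs:// staging buckets; for s3:// buckets this copies
    // --awsAccessKey / --awsSecretKey into the S3Options credentials provider.
    AwsOptionsParser.format(options);

    Pipeline p = Pipeline.create(options);
    // ... attach the read/write transforms exactly as in the examples above ...
    p.run().waitUntilFinish();
  }
}
```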

0 commit comments
