# Copyright 2016-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
import sys
from awsglue.utils import getResolvedOptions
from awsglue.transforms import *
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark import SparkConf
from awsglue.dynamicframe import DynamicFrame
from awsglue.gluetypes import Field, IntegerType, TimestampType, StructType
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
######################################## test connection options ########################################
## Pick and customize the connection options that match your connector type before testing.
## If you are testing with a large data set, consider using column partitioning to parallelize data reading for better performance.
# DataSourceTest - configure according to your connector type and options
options_dataSourceTest_jdbc = {
"query": "select NumberOfEmployees, CreatedDate from Account",
"className" : "partner.jdbc.some.Driver",
# test parameters
"url": "jdbc:some:url:SecurityToken=abc;",
"user": "user",
"password": "password",
}
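## Note: this variant passes the credentials directly in the connection options; the SecretsManagerTest
## below shows the same read with credentials resolved from AWS Secrets Manager via "secretId" instead.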
# ColumnPartitioningTest
# for JDBC connector only
options_columnPartitioningTest = {
"query": "select NumberOfEmployees, CreatedDate from Account where ",
"url": "jdbc:some:url:user=${user};Password=${Password};SecurityToken=${SecurityToken};",
"secretId": "test-partner-driver",
"className" : "partner.jdbc.some.Driver",
# test parameters
"partitionColumn" : "RecordId__c",
"lowerBound" : "0",
"upperBound" : "13",
"numPartitions" : "2",
}
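## Rough sketch of how column partitioning is expected to behave (exact predicates depend on the connector/driver):
## the range [lowerBound, upperBound] of partitionColumn is split into numPartitions sub-ranges, and one range
## predicate per partition is appended to the trailing "where" in the query above, conceptually:
##   select NumberOfEmployees, CreatedDate from Account where RecordId__c < <boundary>    (partition 1)
##   select NumberOfEmployees, CreatedDate from Account where RecordId__c >= <boundary>   (partition 2)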
# DataTypeMappingTest
# for JDBC connector only
options_dataTypeMappingTest = {
"query" : "select NumberOfEmployees, CreatedDate from Account where ",
"url" : "jdbc:some:url:user=${user};Password=${Password};SecurityToken=${SecurityToken};",
"secretId" : "test-partner-driver",
"className" : "partner.jdbc.some.Driver",
# test parameter
"dataTypeMapping": {"INTEGER" : "STRING"}
}
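## Note (assumption based on the Glue custom/marketplace connector options): dataTypeMapping overrides the
## default JDBC-to-Glue type mapping, so {"INTEGER": "STRING"} should read JDBC INTEGER columns back as strings.
## Verify the effect by checking the schema of the resulting DynamicFrame after the read.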
# DbtableQueryTest
# for JDBC connector only
options_dbtableQueryTest = {
"url" : "jdbc:some:url:user=${user};Password=${Password};SecurityToken=${SecurityToken};",
"secretId" : "test-partner-driver",
"className" : "partner.jdbc.some.Driver",
# test parameter
"query": "select NumberOfEmployees, CreatedDate from Account"
# "dbTable" : "Account"
}
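## Note: "query" and "dbTable" are alternative ways to select data - supply only one of them.
## Uncomment "dbTable" (and remove "query") to test reading a whole table instead of a custom query.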
# JDBCUrlTest - tests an extra JDBC connection parameter (UseBulkAPI) appended to the url
# for JDBC connector only
options_JDBCUrlTest = {
"query": "select NumberOfEmployees, CreatedDate from Account",
"secretId": "test-partner-driver",
"className" : "partner.jdbc.some.Driver",
# test parameter
"url": "jdbc:some:url:user=${user};Password=${Password};SecurityToken=${SecurityToken};UseBulkAPI=true",
}
# SecretsManagerTest - configure according to your connector type and options
options_secretsManagerTest = {
"query": "select NumberOfEmployees, CreatedDate from Account",
"url": "jdbc:some:url:user=${user};Password=${Password};SecurityToken=${SecurityToken};",
"className" : "partner.jdbc.some.Driver",
# test parameter
"secretId": "test-partner-driver"
}
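## Note (assumption about the secret layout): the secret referenced by "secretId" is expected to hold the
## credential key/value pairs (for example user, Password, SecurityToken) that fill the matching ${...}
## placeholders in "url", so no plaintext credentials need to appear in the job script.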
# FilterPredicateTest
# for JDBC connector only
options_filterPredicateTest = {
"query": "select NumberOfEmployees, CreatedDate from Account where",
"url": "jdbc:some:url:user=${user};Password=${Password};SecurityToken=${SecurityToken};",
"secretId": "test-partner-driver",
"className" : "partner.jdbc.some.Driver",
# test parameter
"filterPredicate": "BillingState='CA'"
}
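## Note: filterPredicate is intended to be pushed down to the data source - the predicate should be appended
## to the trailing "where" in the query above, so only matching rows (BillingState='CA') are read.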
##################################### read data from data source ######################################
datasource0 = glueContext.create_dynamic_frame_from_options(
connection_type = "marketplace.jdbc",
    connection_options = options_secretsManagerTest)  # pick the test connection options you want to validate
######################################## validate data reading ########################################
## validate data schema and count
# more Glue data types: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-types.html
expected_schema = StructType([Field("NumberOfEmployees", IntegerType()), Field("CreatedDate", TimestampType())])
expected_count = 2
assert datasource0.schema() == expected_schema
print("expected schema: " + str(expected_schema.jsonValue()))
print("result schema: " + str(datasource0.schema().jsonValue()))
print("result schema in tree structure: ")
datasource0.printSchema()
## validate that the record count equals the expected count
assert datasource0.count() == expected_count
print("expected record count: " + str(expected_count))
print("result record count: " + str(datasource0.count()))
######################################## write data to s3 ########################################
datasource0.write(
connection_type="s3",
connection_options = {"path": "s3://your/output/path/"},
format="json"
)
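## Optional sketch: read the JSON files back from S3 and confirm the record count survived the write.
## The output path below is the same placeholder used above - adjust it before enabling this check.
# s3_readback = glueContext.create_dynamic_frame_from_options(
#     connection_type = "s3",
#     connection_options = {"paths": ["s3://your/output/path/"]},
#     format = "json")
# assert s3_readback.count() == expected_count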
######################################## DataSinkTest ########################################
## Create a DynamicFrame on the fly
jsonStrings = ['{"Name":"Andrew"}']
rdd = sc.parallelize(jsonStrings)
sql_df = spark.read.json(rdd)
df = DynamicFrame.fromDF(sql_df, glueContext, "new_dynamic_frame")
## DataSinkTest options
options_dataSinkTest = {
"secretId": "test-partner-driver",
"dbtable" : "Account",
"className" : "partner.jdbc.some.Driver",
"url": "jdbc:some:url:user=${user};Password=${Password};SecurityToken=${SecurityToken};"
}
## Write to data target
glueContext.write_dynamic_frame.from_options(frame = df,
connection_type = "marketplace.jdbc",
connection_options = options_dataSinkTest)
## write validation
# You can check the written data on the database side.
# You can also reuse the 'read data from data source' and 'validate data reading' sections above to compose your own validation logic.
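## Optional sketch (assumes the connector supports reading the target table back): read the sink table with
## the same connection info and check the record count after the write. Note that read options may expect
## the key "dbTable" rather than the "dbtable" key used for the sink.
# readback_options = {
#     "secretId": "test-partner-driver",
#     "dbTable" : "Account",
#     "className" : "partner.jdbc.some.Driver",
#     "url": "jdbc:some:url:user=${user};Password=${Password};SecurityToken=${SecurityToken};"
# }
# datasink_readback = glueContext.create_dynamic_frame_from_options(
#     connection_type = "marketplace.jdbc",
#     connection_options = readback_options)
# print("record count in target table after write: " + str(datasink_readback.count()))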
job.commit()