-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathone-table.snowflake.sql
354 lines (313 loc) · 11.4 KB
/
one-table.snowflake.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
/*
SQL Experiments for Typing and Normalizing AirbyteRecords in 1 table
Run me on Snowflake
Schema:
{
"id": "number",
"first_name": ["string", null],
"age": ["number", null],
"address": [null, {
"street": "string",
"zip": "string"
}],
"updated_at": timestamp
}
KNOWN LIMITATIONS
* Only one error type shown per row, the first one sequentially
* There's a full table scan used for de-duplication. This can be made more efficient...
* It would be better to show the actual error message from the DB, not custom "this column is bad" strings
*/
-- Set up the Experiment
-- Assumption: We can build the table at the start of the sync based only on the schema we get from the source/configured catalog
DROP TABLE IF EXISTS PUBLIC.USERS;
DROP TABLE IF EXISTS Z_AIRBYTE.USERS_RAW;

-- Typed (final) table: one declared column per schema property plus the Airbyte bookkeeping columns.
CREATE TABLE PUBLIC.USERS (
    "id" int PRIMARY KEY,                        -- PK cannot be null, but after raw insert and before typing, row will be null
    "first_name" text,
    "age" int,
    "address" variant,
    "updated_at" timestamp,
    "_airbyte_meta" variant NOT NULL,            -- Airbyte column, cannot be null
    "_airbyte_raw_id" VARCHAR(36) NOT NULL,      -- Airbyte column, cannot be null
    "_airbyte_extracted_at" timestamp NOT NULL   -- Airbyte column, cannot be null
);
-------------------------------------
--------- TYPE AND DEDUPE -----------
-------------------------------------

-- Idempotently create the raw landing schema and table; safe to call at the start of every sync.
CREATE OR REPLACE PROCEDURE PUBLIC._AIRBYTE_PREPARE_RAW_TABLE()
RETURNS TEXT LANGUAGE SQL AS $$
BEGIN
    CREATE SCHEMA IF NOT EXISTS Z_AIRBYTE;
    CREATE TABLE IF NOT EXISTS Z_AIRBYTE.USERS_RAW (
        "_airbyte_raw_id" VARCHAR(36) NOT NULL PRIMARY KEY, -- Airbyte column, cannot be null
        "_airbyte_data" variant NOT NULL,                   -- Airbyte column, cannot be null
        "_airbyte_extracted_at" timestamp NOT NULL,         -- Airbyte column, cannot be null
        "_airbyte_loaded_at" timestamp                      -- Airbyte column; NULL until the row has been typed
    );
    RETURN 'SUCCESS';
END
$$;
-- Type the pending raw rows into PUBLIC.USERS, dedupe by PK on the cursor, apply CDC deletes,
-- prune the raw table, and stamp "_airbyte_loaded_at". Returns 'SUCCESS' or raises on missing PKs.
CREATE OR REPLACE PROCEDURE PUBLIC._AIRBYTE_TYPE_DEDUPE()
RETURNS TEXT LANGUAGE SQL AS $$
BEGIN
-- Step 1: Validate the incoming data
-- We can't really do this properly in the pure-SQL example here, but we should throw if any row doesn't have a PK
-- A row "has a PK" iff its "id" property casts cleanly to INT; TRY_CAST returns NULL on failure.
let missing_pk_count := 0;
missing_pk_count := (
SELECT COUNT(1)
FROM Z_AIRBYTE.USERS_RAW
WHERE
"_airbyte_loaded_at" IS NULL
AND TRY_CAST("_airbyte_data":"id"::text AS INT) IS NULL
);
IF (missing_pk_count > 0) THEN
RAISE STATEMENT_ERROR; -- TODO: make a custom exception
END IF;
-- Moving the data and deduping happens in a transaction to prevent duplicates from appearing
-- NOTE(review): the transaction is commented out below (BEGIN/COMMIT), so steps 2-6 currently run
-- as separate statements and a failure mid-way can leave duplicates — confirm whether this is intentional.
-- BEGIN
-- Step 2: Move the new data to the typed table
INSERT INTO PUBLIC.USERS
(
"id",
"first_name",
"age",
"updated_at",
"address",
"_airbyte_meta",
"_airbyte_raw_id",
"_airbyte_extracted_at"
)
WITH intermediate_data AS (
-- Cast each schema property; a failed cast yields NULL in the typed column and an entry in "_airbyte_cast_errors".
SELECT
TRY_CAST("_airbyte_data":"id"::text AS INT) as id,
TRY_CAST("_airbyte_data":"first_name"::text AS TEXT) as first_name,
TRY_CAST("_airbyte_data":"age"::text AS INT) as age,
TRY_CAST("_airbyte_data":"updated_at"::text AS TIMESTAMP) as updated_at,
"_airbyte_data":"address" as address, -- TRY_CAST does not work with JSON/VARIANT
(
-- this is annoying. ARRAY_CAT only concatenates 2 arrays for snowflake
-- Each CASE contributes ['Problem with `col`'] only when the property is present but fails its cast.
ARRAY_CAT(
ARRAY_CAT(
ARRAY_CAT(
CASE WHEN "_airbyte_data":"id" IS NOT NULL AND TRY_CAST("_airbyte_data":"id"::text AS INT) IS NULL THEN ['Problem with `id`'] ELSE [] END
, CASE WHEN "_airbyte_data":"first_name" IS NOT NULL AND TRY_CAST("_airbyte_data":"first_name"::text AS TEXT) IS NULL THEN ['Problem with `first_name`'] ELSE [] END
),
CASE WHEN "_airbyte_data":"age" IS NOT NULL AND TRY_CAST("_airbyte_data":"age"::text AS INT) IS NULL THEN ['Problem with `age`'] ELSE [] END
)
-- no TRY_CAST for JSON
, CASE WHEN "_airbyte_data":"updated_at" IS NOT NULL AND TRY_CAST("_airbyte_data":"updated_at"::text AS TIMESTAMP) IS NULL THEN ['Problem with `updated_at`'] ELSE [] END
)
) as "_airbyte_cast_errors",
"_airbyte_raw_id",
"_airbyte_extracted_at"
FROM
Z_AIRBYTE.USERS_RAW
WHERE
"_airbyte_loaded_at" IS NULL -- inserting only new/null values, we can recover from failed previous checkpoints
OR (
-- Temporarily place back an entry for any CDC-deleted record so we can order them properly by cursor. We only need the PK and cursor value
"_airbyte_loaded_at" IS NOT NULL
AND "_airbyte_data":"_ab_cdc_deleted_at" IS NOT NULL
)
)
SELECT
id,
first_name,
age,
updated_at,
address,
-- "_airbyte_meta" always carries an 'errors' array, empty when every cast succeeded.
CASE
WHEN ARRAY_SIZE("_airbyte_cast_errors") = 0 THEN OBJECT_CONSTRUCT('errors', [])
ELSE OBJECT_CONSTRUCT('errors', "_airbyte_cast_errors")
END AS "_airbyte_meta",
"_airbyte_raw_id",
"_airbyte_extracted_at"
FROM intermediate_data
;
-- Step 3: Dedupe and clean the typed table
-- This is a full table scan, but we need to do it this way to merge the new rows with the old to:
-- * Consider the case in which there are multiple entries for the same PK in the new insert batch
-- * Consider the case in which the data in the new batch is older than the data in the typed table, and we only want to keep the newer (pre-existing) data
DELETE FROM PUBLIC.USERS
WHERE
-- Delete any rows which are not the most recent for a given PK
-- "Most recent" = highest cursor ("updated_at"), ties broken by "_airbyte_extracted_at".
"_airbyte_raw_id" IN (
SELECT "_airbyte_raw_id" FROM (
SELECT "_airbyte_raw_id", row_number() OVER (
PARTITION BY "id" ORDER BY "updated_at" DESC, "_airbyte_extracted_at" DESC
) as row_number FROM PUBLIC.USERS
)
WHERE row_number != 1
)
;
-- Step 4: Remove old entries from Raw table
-- Keeps exactly the raw rows still represented in the typed table (runs BEFORE the CDC delete in step 5,
-- so the raw tombstone record itself survives this prune).
DELETE FROM Z_AIRBYTE.USERS_RAW
WHERE
"_airbyte_raw_id" NOT IN (
SELECT "_airbyte_raw_id" FROM PUBLIC.USERS
)
;
-- Step 5: Clean out CDC deletes from final table
-- Only run this step if _ab_cdc_deleted_at is a property of the stream
/*
DELETE FROM testing_evan_2052.users
WHERE _ab_cdc_deleted_at IS NOT NULL
*/
-- the following will always work, even if there is no _ab_cdc_deleted_at column, but it is slower
DELETE FROM PUBLIC.USERS
WHERE
-- Delete rows that have been CDC deleted
"id" IN (
SELECT
TRY_CAST("_airbyte_data":"id"::text AS INT) as id -- based on the PK which we know from the connector catalog
FROM Z_AIRBYTE.USERS_RAW
-- NOTE(review): the second branch compares the VARIANT to the string 'null' — presumably meant to also
-- match a JSON-null deleted_at marker; confirm against Snowflake VARIANT comparison semantics.
WHERE "_airbyte_data":"_ab_cdc_deleted_at" IS NOT NULL
OR "_airbyte_data":"_ab_cdc_deleted_at" = 'null'
)
;
-- Step 6: Apply typed_at timestamp where needed
-- Marks every remaining raw row as processed so the next run only picks up new data.
UPDATE Z_AIRBYTE.USERS_RAW
SET "_airbyte_loaded_at" = CURRENT_TIMESTAMP()
WHERE "_airbyte_loaded_at" IS NULL
;
-- COMMIT;
RETURN 'SUCCESS';
END
$$;
----------------------------
--------- SYNC 1 -----------
----------------------------
CALL PUBLIC._AIRBYTE_PREPARE_RAW_TABLE();
-- Load the raw data
INSERT INTO Z_AIRBYTE.USERS_RAW ("_airbyte_data", "_airbyte_raw_id", "_airbyte_extracted_at") SELECT PARSE_JSON($${
id: 1,
first_name: "Evan",
age: 38,
updated_at: "2020-01-01T00:00:00Z",
address:{
city: "San Francisco",
zip: "94001"
} }$$), UUID_STRING(), CURRENT_TIMESTAMP();
INSERT INTO Z_AIRBYTE.USERS_RAW ("_airbyte_data", "_airbyte_raw_id", "_airbyte_extracted_at") SELECT PARSE_JSON($${
id: 2,
first_name: "Brian",
age: 39,
updated_at: "2020-01-01T00:00:01Z",
address:{
city: "Menlo Park",
zip: "94002"
} }$$), UUID_STRING(), CURRENT_TIMESTAMP();
INSERT INTO Z_AIRBYTE.USERS_RAW ("_airbyte_data", "_airbyte_raw_id", "_airbyte_extracted_at") SELECT PARSE_JSON($${
id: 3,
first_name: "Edward",
age: 40,
updated_at: "2020-01-01T00:00:02Z",
address:{
city: "Sunyvale",
zip: "94003"
} }$$), UUID_STRING(), CURRENT_TIMESTAMP();
-- Joe is missing an age, null OK
-- (fix: the record previously carried age: 40, contradicting this comment and the final
-- validation, which expects Joe to end up with a null age and no errors; the key is omitted
-- entirely so the typing step produces a NULL age without recording a cast error)
INSERT INTO Z_AIRBYTE.USERS_RAW ("_airbyte_data", "_airbyte_raw_id", "_airbyte_extracted_at") SELECT PARSE_JSON($${
id: 4,
first_name: "Joe",
updated_at: "2020-01-01T00:00:03Z",
address:{
city: "Seattle",
zip: "98999"
} }$$), UUID_STRING(), CURRENT_TIMESTAMP();
CALL PUBLIC._AIRBYTE_TYPE_DEDUPE();
----------------------------
--------- SYNC 2 -----------
----------------------------
CALL PUBLIC._AIRBYTE_PREPARE_RAW_TABLE();
-- Load the raw data
-- Age update for Evan (user 1)
-- There is an update for Brian (user 2, new address.zip)
-- There is an update for Edward (user 3, age is invalid)
-- No update for Joe (user 4)
INSERT INTO Z_AIRBYTE.USERS_RAW ("_airbyte_data", "_airbyte_raw_id", "_airbyte_extracted_at") SELECT PARSE_JSON($${
id: 1,
first_name: "Evan",
age: 39,
updated_at: "2020-01-02T00:00:00Z",
address:{
city: "San Francisco",
zip: "94001"
} }$$), UUID_STRING(), CURRENT_TIMESTAMP();
INSERT INTO Z_AIRBYTE.USERS_RAW ("_airbyte_data", "_airbyte_raw_id", "_airbyte_extracted_at") SELECT PARSE_JSON($${
id: 2,
first_name: "Brian",
age: 39,
updated_at: "2020-01-02T00:00:01Z",
address:{
city: "Menlo Park",
zip: "99999"
} }$$), UUID_STRING(), CURRENT_TIMESTAMP();
-- Edward's age is the string "forty": TRY_CAST to INT fails, so typing should null the
-- age and record a 'Problem with `age`' entry in "_airbyte_meta" (see final validation)
INSERT INTO Z_AIRBYTE.USERS_RAW ("_airbyte_data", "_airbyte_raw_id", "_airbyte_extracted_at") SELECT PARSE_JSON($${
id: 3,
first_name: "Edward",
age: "forty",
updated_at: "2020-01-02T00:00:02Z",
address:{
city: "Sunyvale",
zip: "94003"
} }$$), UUID_STRING(), CURRENT_TIMESTAMP();
CALL PUBLIC._AIRBYTE_TYPE_DEDUPE();
----------------------------
--------- SYNC 3 -----------
----------------------------
CALL PUBLIC._AIRBYTE_PREPARE_RAW_TABLE();
-- Load the raw data
-- Delete user 2 (Brian) with CDC: the record carries _ab_cdc_deleted_at, so the typing
-- procedure's step 5 should remove id=2 from the typed table
-- (comment fix: previously said "row 1"; the payload below and the final validation both target user #2)
INSERT INTO Z_AIRBYTE.USERS_RAW ("_airbyte_data", "_airbyte_raw_id", "_airbyte_extracted_at") SELECT PARSE_JSON($${
_ab_cdc_deleted_at: true,
id: 2,
first_name: "Brian",
age: 39,
updated_at: "2020-01-03T00:00:00Z",
address:{
city: "Menlo Park",
zip: "99999"
} }$$), UUID_STRING(), CURRENT_TIMESTAMP();
-- Insert user 5 (Cynthia) three times with an increasing cursor ("updated_at");
-- dedupe should keep only the latest record (age=42)
INSERT INTO Z_AIRBYTE.USERS_RAW ("_airbyte_data", "_airbyte_raw_id", "_airbyte_extracted_at") SELECT PARSE_JSON($${
id: 5,
first_name: "Cynthia",
age: 40,
updated_at: "2020-01-03T00:00:01Z",
address:{
city: "Redwood City",
zip: "98765"
} }$$), UUID_STRING(), CURRENT_TIMESTAMP();
INSERT INTO Z_AIRBYTE.USERS_RAW ("_airbyte_data", "_airbyte_raw_id", "_airbyte_extracted_at") SELECT PARSE_JSON($${
id: 5,
first_name: "Cynthia",
age: 41,
updated_at: "2020-01-03T00:00:02Z",
address:{
city: "Redwood City",
zip: "98765"
} }$$), UUID_STRING(), CURRENT_TIMESTAMP();
INSERT INTO Z_AIRBYTE.USERS_RAW ("_airbyte_data", "_airbyte_raw_id", "_airbyte_extracted_at") SELECT PARSE_JSON($${
id: 5,
first_name: "Cynthia",
age: 42,
updated_at: "2020-01-03T00:00:03Z",
address:{
city: "Redwood City",
zip: "98765"
} }$$), UUID_STRING(), CURRENT_TIMESTAMP();
CALL PUBLIC._AIRBYTE_TYPE_DEDUPE();
----------------------
-- FINAL VALIDATION --
----------------------
/*
You should see 5 RAW records, one for each of the 5 users
You should see 4 TYPED records, one for each user, except user #2, which was CDC deleted
You should have the latest data for each user in the typed final table:
* User #1 (Evan) has the latest data (age=39)
* User #3 (Edward) has a null age [+ error] due to that age being un-typable
* User #4 (Joe) has a null age & no errors
* User #5 (Cynthia) has one entry despite the multiple inserts, with the latest entry (age=42)
*/
SELECT CURRENT_TIMESTAMP();