-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathstream_processing.sql
62 lines (54 loc) · 2 KB
/
stream_processing.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
------------------------------------------------------------------------------------------
-- Open the ksqlDB CLI with: docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
------------------------------------------------------------------------------------------
------------------------------------------------------------------------------------------
--- Creating streams from existing topics ------------------------------------------------
------------------------------------------------------------------------------------------
-- Session setup, teardown of earlier runs, and registration of the source streams.

-- Queries started in this session read their input topics from the earliest offset.
SET 'auto.offset.reset' = 'earliest';

-- Print the streams that are currently registered (informational only).
LIST STREAMS;

-- Tear down in dependency order: the two derived streams go first and their sink
-- topics are deleted with them; the two source streams follow, but without
-- DELETE TOPIC, so their underlying Kafka topics are kept.
DROP STREAM IF EXISTS reviewed_trashed_articles DELETE TOPIC;
DROP STREAM IF EXISTS reviewed_relevant_articles DELETE TOPIC;
DROP STREAM IF EXISTS review_status;
DROP STREAM IF EXISTS pending_review_articles;

-- Register the source streams on top of their topics. No columns are declared
-- here, so the column definitions are expected to come from the Avro schemas
-- registered for each topic (schema inference).
CREATE STREAM pending_review_articles WITH (
    KAFKA_TOPIC = 'pending-review-articles',
    KEY_FORMAT = 'AVRO',
    VALUE_FORMAT = 'AVRO',
    PARTITIONS = 3,
    REPLICAS = 3
);

-- Unlike pending_review_articles, no KEY_FORMAT is given for this stream.
CREATE STREAM review_status WITH (
    KAFKA_TOPIC = 'review-status',
    VALUE_FORMAT = 'AVRO',
    PARTITIONS = 3,
    REPLICAS = 3
);
-- reviewed_relevant_articles: articles whose review marked them as relevant.
-- Each review_status event is inner-joined to the pending_review_articles event
-- with the same doc_id, and only reviews with is_relevant = TRUE are emitted.
CREATE STREAM reviewed_relevant_articles WITH (
    KAFKA_TOPIC = 'reviewed-relevant-articles',
    KEY_FORMAT = 'AVRO',
    VALUE_FORMAT = 'AVRO',
    PARTITIONS = 3,
    REPLICAS = 3
) AS
SELECT
    rs.doc_id,
    pra.folder,
    pra.headline,
    pra.lead_para,
    pra.tail_para,
    pra.lang_id,
    pra.lang_name,
    pra.labels,
    pra.bio_ner,
    pra.ner,
    pra.summary_text
FROM review_status AS rs
INNER JOIN pending_review_articles AS pra
    -- Stream-stream join window: the two events must be at most one day apart;
    -- the grace period admits out-of-order records for a further 12 hours.
    WITHIN 1 DAYS GRACE PERIOD 12 HOURS
    -- The article side is matched on the doc_id field of its struct key (ROWKEY).
    ON rs.doc_id = pra.ROWKEY->doc_id
WHERE rs.is_relevant = TRUE
EMIT CHANGES;
-- reviewed_trashed_articles: articles whose review marked them as not relevant.
-- Each review_status event is inner-joined to the pending_review_articles event
-- with the same doc_id, and only reviews with is_relevant = FALSE are emitted.
--
-- NOTE(review): the sink topic below is 'review-trashed-articles' while the stream
-- is named reviewed_trashed_articles. Confirm whether the missing "ed" is
-- intentional before renaming; consumers may already depend on this topic name.
CREATE STREAM reviewed_trashed_articles WITH (
    KAFKA_TOPIC = 'review-trashed-articles',
    KEY_FORMAT = 'AVRO',
    VALUE_FORMAT = 'AVRO',
    PARTITIONS = 3,
    REPLICAS = 3
) AS
SELECT
    rs.doc_id,
    pra.folder,
    pra.headline,
    pra.lead_para,
    pra.tail_para,
    pra.lang_id,
    pra.lang_name,
    pra.labels,
    pra.bio_ner,
    pra.ner,
    pra.summary_text
FROM review_status AS rs
INNER JOIN pending_review_articles AS pra
    -- Stream-stream join window: the two events must be at most one day apart;
    -- the grace period admits out-of-order records for a further 12 hours.
    WITHIN 1 DAYS GRACE PERIOD 12 HOURS
    -- The article side is matched on the doc_id field of its struct key (ROWKEY).
    ON rs.doc_id = pra.ROWKEY->doc_id
WHERE rs.is_relevant = FALSE
EMIT CHANGES;