-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy patha-story-about-airflow-and-data-engineering-using-how-to-use-python-to-catch-up-with-latest-comics-as-an-example.html
2346 lines (2137 loc) · 161 KB
/
a-story-about-airflow-and-data-engineering-using-how-to-use-python-to-catch-up-with-latest-comics-as-an-example.html
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
<!DOCTYPE html>
<!--[if lt IE 9 ]><html class="no-js oldie" lang="zh-hant-tw"> <![endif]-->
<!--[if IE 9 ]><html class="no-js oldie ie9" lang="zh-hant-tw"> <![endif]-->
<!--[if (gte IE 9)|!(IE)]><!-->
<html class="no-js" lang="zh-hant-tw">
<!--<![endif]-->
<head>
<!--- basic page needs
================================================== -->
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<meta name="author" content="Lee Meng" />
<title>LeeMeng - 一段 Airflow 與資料工程的故事:談如何用 Python 追漫畫連載</title>
<!--- article-specific meta data
================================================== -->
<meta name="description" content="Airflow 是一個以 Python 開發的工作流管理系統,也是資料工程不可或缺的利器之一。近年不管是資料科學家、資料工程師還是任何需要處理數據的軟體工程師,Airflow 都是他們用來建構 ETL 以及處理批量資料的首選之一。這篇文章希望以一個簡易的漫畫連載通知 App 作為引子,讓讀者直觀地了解 Airflow 背後的運作原理、建立資料工程的知識基礎,並在閱讀本文後發揮自己的創意,實際應用 Airflow 來解決並自動化自己及企業的數據問題。" />
<meta name="keywords" content="Python, Airflow, 資料工程, Selenium, Slack" />
<meta name="tags" content="Python" />
<meta name="tags" content="Airflow" />
<meta name="tags" content="資料工程" />
<meta name="tags" content="Selenium" />
<meta name="tags" content="Slack" />
<!--- Open Graph Object metas
================================================== -->
<meta property="og:image" content="https://leemeng.tw/theme/images/background/raj-eiamworakul-603747-unsplash.jpg" />
<meta property="og:type" content="article" />
<meta property="og:url" content="https://leemeng.tw/a-story-about-airflow-and-data-engineering-using-how-to-use-python-to-catch-up-with-latest-comics-as-an-example.html" />
<meta property="og:title" content="一段 Airflow 與資料工程的故事:談如何用 Python 追漫畫連載" />
<meta property="og:description" content="Airflow 是一個以 Python 開發的工作流管理系統,也是資料工程不可或缺的利器之一。近年不管是資料科學家、資料工程師還是任何需要處理數據的軟體工程師,Airflow 都是他們用來建構 ETL 以及處理批量資料的首選之一。這篇文章希望以一個簡易的漫畫連載通知 App 作為引子,讓讀者直觀地了解 Airflow 背後的運作原理、建立資料工程的知識基礎,並在閱讀本文後發揮自己的創意,實際應用 Airflow 來解決並自動化自己及企業的數據問題。" />
<!-- mobile specific metas
================================================== -->
<meta name="viewport" content="width=device-width, initial-scale=1">
<!-- CSS
================================================== -->
<!--for customized css in individual page-->
<link rel="stylesheet" type="text/css" href="https://leemeng.tw/theme/css/bootstrap.min.css">
<!--for showing toc navigation which slide in from left-->
<link rel="stylesheet" type="text/css" href="https://leemeng.tw/theme/css/toc-nav.css">
<!--for responsive embed youtube video-->
<link rel="stylesheet" type="text/css" href="https://leemeng.tw/theme/css/embed_youtube.css">
<!--for prettify dark-mode result-->
<link rel="stylesheet" type="text/css" href="https://leemeng.tw/theme/css/darkmode.css">
<link rel="stylesheet" type="text/css" href="https://leemeng.tw/theme/css/base.css">
<link rel="stylesheet" type="text/css" href="https://leemeng.tw/theme/css/vendor.css">
<link rel="stylesheet" type="text/css" href="https://leemeng.tw/theme/css/main.css">
<link rel="stylesheet" type="text/css" href="https://leemeng.tw/theme/css/ipython.css">
<link rel="stylesheet" type="text/css" href='https://leemeng.tw/theme/css/progress-bar.css' />
<!--TiqueSearch-->
<link href="https://fonts.googleapis.com/css?family=Roboto:100,300,400">
<link rel="stylesheet" href="https://leemeng.tw/theme/tipuesearch/css/normalize.css">
<link rel="stylesheet" href="https://leemeng.tw/theme/tipuesearch/css/tipuesearch.css">
<!-- script
================================================== -->
<script src="https://leemeng.tw/theme/js/modernizr.js"></script>
<script src="https://leemeng.tw/theme/js/pace.min.js"></script>
<!-- favicons
================================================== -->
<link rel="shortcut icon" href="../theme/images/favicon.ico" type="image/x-icon"/>
<link rel="icon" href="../theme/images/favicon.ico" type="image/x-icon"/>
<!-- Global Site Tag (gtag.js) - Google Analytics -->
<script async src="https://www.googletagmanager.com/gtag/js?id=UA-106559980-1"></script>
<script>
window.dataLayer = window.dataLayer || [];
function gtag(){dataLayer.push(arguments)};
gtag('js', new Date());
gtag('config', 'UA-106559980-1');
</script>
</head>
<body id="top">
<!-- header
================================================== -->
<header class="s-header">
<div class="header-logo">
<a class="site-logo" href="../index.html"><img src="https://leemeng.tw/theme/images/logo.png" alt="Homepage"></a>
</div>
<!--navigation bar ref: http://jinja.pocoo.org/docs/2.10/tricks/-->
<nav class="header-nav-wrap">
<ul class="header-nav">
<li>
<a href="../index.html#home">Home</a>
</li>
<li>
<a href="../index.html#about">About</a>
</li>
<li>
<a href="../index.html#projects">Projects</a>
</li>
<li class="current">
<a href="../blog.html">Blog</a>
</li>
<li>
<a href="https://demo.leemeng.tw">Demo</a>
</li>
<li>
<a href="../books.html">Books</a>
</li>
<li>
<a href="../index.html#contact">Contact</a>
</li>
</ul>
<!--<div class="search-container">-->
<!--<form action="../search.html">-->
<!--<input type="text" placeholder="Search.." name="search">-->
<!--<button type="submit"><i class="im im-magnifier" aria-hidden="true"></i></button>-->
<!--</form>-->
<!--</div>-->
</nav>
<a class="header-menu-toggle" href="#0"><span>Menu</span></a>
</header> <!-- end s-header -->
<!--TOC navigation displayed when clicked from left-navigation button-->
<div id="tocNav" class="overlay" onclick="closeTocNav()">
<div class="overlay-content">
<div id="toc"><ul><li><a class="toc-href" href="#" title="一段 Airflow 與資料工程的故事:談如何用 Python 追漫畫連載">一段 Airflow 與資料工程的故事:談如何用 Python 追漫畫連載</a><ul><li><a class="toc-href" href="#追連載:一個-Airflow-的輕鬆使用案例" title="追連載:一個 Airflow 的輕鬆使用案例">追連載:一個 Airflow 的輕鬆使用案例</a></li><li><a class="toc-href" href="#章節傳送門" title="章節傳送門">章節傳送門</a></li><li><a class="toc-href" href="#所以為何要這-App-?" title="所以為何要這 App ?">所以為何要這 App ?</a></li><li><a class="toc-href" href="#工作流概念-&-Airflow" title="工作流概念 & Airflow">工作流概念 & Airflow</a></li><li><a class="toc-href" href="#Python-&-Airflow-實作" title="Python & Airflow 實作">Python & Airflow 實作</a><ul><li><a class="toc-href" href="#建置-Airflow-環境" title="建置 Airflow 環境">建置 Airflow 環境</a></li><li><a class="toc-href" href="#Airflow-基本概念" title="Airflow 基本概念">Airflow 基本概念</a></li><li><a class="toc-href" href="#App-版本一:大鍋炒" title="App 版本一:大鍋炒">App 版本一:大鍋炒</a><ul><li><a class="toc-href" href="#輕鬆排程" title="輕鬆排程">輕鬆排程</a></li><li><a class="toc-href" href="#Operator:將實作邏輯跟-DAG-排程分離" title="Operator:將實作邏輯跟 DAG 排程分離">Operator:將實作邏輯跟 DAG 排程分離</a></li><li><a class="toc-href" href="#測試開發-Airflow-工作" title="測試開發 Airflow 工作">測試開發 Airflow 工作</a></li></ul></li><li><a class="toc-href" href="#App-版本二:模組化_1" title="App 版本二:模組化">App 版本二:模組化</a><ul><li><a class="toc-href" href="#Airflow-排程器" title="Airflow 排程器">Airflow 排程器</a></li><li><a class="toc-href" href="#手動觸發-DAG" title="手動觸發 DAG">手動觸發 DAG</a></li><li><a class="toc-href" href="#定義工作流程" title="定義工作流程">定義工作流程</a></li><li><a class="toc-href" href="#Airflow-變數以及-Jinja-模板" title="Airflow 變數以及 Jinja 模板">Airflow 變數以及 Jinja 模板</a></li><li><a class="toc-href" href="#執行日期:排程最重要的概念" title="執行日期:排程最重要的概念">執行日期:排程最重要的概念</a></li><li><a class="toc-href" href="#正式排程" title="正式排程">正式排程</a></li></ul></li><li><a class="toc-href" href="#App-版本三:填填樂_1" title="App 版本三:填填樂">App 版本三:填填樂</a><ul><li><a class="toc-href" href="#重複利用-Python-函式" title="重複利用 Python 函式">重複利用 Python 函式</a></li><li><a class="toc-href" href="#Xcom:工作之間的訊息交換" title="Xcom:工作之間的訊息交換">Xcom:工作之間的訊息交換</a></li><li><a class="toc-href" href="#在工作流程內加入條件分支" title="在工作流程內加入條件分支">在工作流程內加入條件分支</a></li></ul></li></ul></li><li><a class="toc-href" href="#如何建立你自己的連載通知-App(懶人法)_2" title="如何建立你自己的連載通知 App(懶人法)">如何建立你自己的連載通知 App(懶人法)</a></li><li><a class="toc-href" href="#結語" title="結語">結語</a></li></ul></li></ul></div>
</div>
</div>
<!--custom images with icon shown on left nav-->
<!--the details are set in `pelicanconf.py` as `LEFT_NAV_IMAGES`-->
<article class="blog-single">
<!-- page header/blog hero, use custom cover image if available
================================================== -->
<div class="page-header page-header--single page-hero" style="background-image:url(https://leemeng.tw/theme/images/background/raj-eiamworakul-603747-unsplash.jpg)">
<div class="row page-header__content narrow">
<article class="col-full">
<div class="page-header__info">
<div class="page-header__cat">
<a href="https://leemeng.tw/tag/python.html" rel="tag">Python</a>
<a href="https://leemeng.tw/tag/airflow.html" rel="tag">Airflow</a>
<a href="https://leemeng.tw/tag/zi-liao-gong-cheng.html" rel="tag">資料工程</a>
<a href="https://leemeng.tw/tag/selenium.html" rel="tag">Selenium</a>
<a href="https://leemeng.tw/tag/slack.html" rel="tag">Slack</a>
</div>
</div>
<h1 class="page-header__title">
<a href="https://leemeng.tw/a-story-about-airflow-and-data-engineering-using-how-to-use-python-to-catch-up-with-latest-comics-as-an-example.html" title="">
一段 Airflow 與資料工程的故事:談如何用 Python 追漫畫連載
</a>
</h1>
<ul class="page-header__meta">
<li class="date">2018-08-21 (Tue)</li>
<li class="page-view">
58,011 views
</li>
</ul>
</article>
</div>
</div> <!-- end page-header -->
<div class="KW_progressContainer">
<div class="KW_progressBar"></div>
</div>
<div class="row blog-content" style="position: relative">
<div id="left-navigation">
<div id="search-wrap">
<i class="im im-magnifier" aria-hidden="true"></i>
<div id="search">
<form action="../search.html">
<div class="tipue_search_right"><input type="text" name="q" id="tipue_search_input" pattern=".{2,}" title="想搜尋什麼呢?(請至少輸入兩個字)" required></div>
</form>
</div>
</div>
<div id="toc-wrap">
<a title="顯示/隱藏 文章章節">
<i class="im im-menu" aria-hidden="true" onclick="toggleTocNav()"></i>
</a>
</div>
<div id="social-wrap" style="cursor: pointer">
<a class="open-popup" title="訂閱最新文章">
<i class="im im-newspaper-o" aria-hidden="true"></i>
</a>
</div>
<div id="social-wrap">
<a href="https://www.facebook.com/sharer/sharer.php?u=https%3A//leemeng.tw/a-story-about-airflow-and-data-engineering-using-how-to-use-python-to-catch-up-with-latest-comics-as-an-example.html" target="_blank" title="分享到 Facebook">
<i class="im im-facebook" aria-hidden="true"></i>
</a>
</div>
<div id="social-wrap">
<a href="https://www.linkedin.com/shareArticle?mini=true&url=https%3A//leemeng.tw/a-story-about-airflow-and-data-engineering-using-how-to-use-python-to-catch-up-with-latest-comics-as-an-example.html&title=%E4%B8%80%E6%AE%B5%20Airflow%20%E8%88%87%E8%B3%87%E6%96%99%E5%B7%A5%E7%A8%8B%E7%9A%84%E6%95%85%E4%BA%8B%EF%BC%9A%E8%AB%87%E5%A6%82%E4%BD%95%E7%94%A8%20Python%20%E8%BF%BD%E6%BC%AB%E7%95%AB%E9%80%A3%E8%BC%89&summary=Airflow%20%E6%98%AF%E4%B8%80%E5%80%8B%E4%BB%A5%20Python%20%E9%96%8B%E7%99%BC%E7%9A%84%E5%B7%A5%E4%BD%9C%E6%B5%81%E7%AE%A1%E7%90%86%E7%B3%BB%E7%B5%B1%EF%BC%8C%E4%B9%9F%E6%98%AF%E8%B3%87%E6%96%99%E5%B7%A5%E7%A8%8B%E4%B8%8D%E5%8F%AF%E6%88%96%E7%BC%BA%E7%9A%84%E5%88%A9%E5%99%A8%E4%B9%8B%E4%B8%80%E3%80%82%E8%BF%91%E5%B9%B4%E4%B8%8D%E7%AE%A1%E6%98%AF%E8%B3%87%E6%96%99%E7%A7%91%E5%AD%B8%E5%AE%B6%E3%80%81%E8%B3%87%E6%96%99%E5%B7%A5%E7%A8%8B%E5%B8%AB%E9%82%84%E6%98%AF%E4%BB%BB%E4%BD%95%E9%9C%80%E8%A6%81%E8%99%95%E7%90%86%E6%95%B8%E6%93%9A%E7%9A%84%E8%BB%9F%E9%AB%94%E5%B7%A5%E7%A8%8B%E5%B8%AB%EF%BC%8CAirflow%20%E9%83%BD%E6%98%AF%E4%BB%96%E5%80%91%E7%94%A8%E4%BE%86%E5%BB%BA%E6%A7%8B%20ETL%20%E4%BB%A5%E5%8F%8A%E8%99%95%E7%90%86%E6%89%B9%E9%87%8F%E8%B3%87%E6%96%99%E7%9A%84%E9%A6%96%E9%81%B8%E4%B9%8B%E4%B8%80%E3%80%82%E9%80%99%E7%AF%87%E6%96%87%E7%AB%A0%E5%B8%8C%E6%9C%9B%E4%BB%A5%E4%B8%80%E5%80%8B%E7%B0%A1%E6%98%93%E7%9A%84%E6%BC%AB%E7%95%AB%E9%80%A3%E8%BC%89%E9%80%9A%E7%9F%A5%20App%20%E4%BD%9C%E7%82%BA%E5%BC%95%E5%AD%90%EF%BC%8C%E8%AE%93%E8%AE%80%E8%80%85%E7%9B%B4%E8%A7%80%E5%9C%B0%E4%BA%86%E8%A7%A3%20Airflow%20%E8%83%8C%E5%BE%8C%E7%9A%84%E9%81%8B%E4%BD%9C%E5%8E%9F%E7%90%86%E3%80%81%E5%BB%BA%E7%AB%8B%E8%B3%87%E6%96%99%E5%B7%A5%E7%A8%8B%E7%9A%84%E7%9F%A5%E8%AD%98%E5%9F%BA%E7%A4%8E%EF%BC%8C%E4%B8%A6%E5%9C%A8%E9%96%B1%E8%AE%80%E6%9C%AC%E6%96%87%E5%BE%8C%E7%99%BC%E6%8F%AE%E8%87%AA%E5%B7%B1%E7%9A%84%E5%89%B5%E6%84%8F%EF%BC%8C%E5%AF%A6%E9%9A%9B%E6%87%89%E7%94%A8%20Airflow%20%E4%BE%86%E8%A7%A3%E6%B1%BA%E4%B8%A6%E8%87%AA%E5%8B%95%E5%8C%96%E8%87%AA%E5%B7%B1%E5%8F%8A%E4%BC%81%E6%A5%AD%E7%9A%84%E6%95%B8%E6%93%9A%E5%95%8F%E9%A1%8C%E3%80%82&source=https%3A//leemeng.tw/a-story-about-airflow-and-data-engineering-using-how-to-use-python-to-catch-up-with-latest-comics-as-an-example.html" target="_blank" title="分享到 LinkedIn">
<i class="im im-linkedin" aria-hidden="true"></i>
</a>
</div>
<div id="social-wrap">
<a href="https://twitter.com/intent/tweet?text=%E4%B8%80%E6%AE%B5%20Airflow%20%E8%88%87%E8%B3%87%E6%96%99%E5%B7%A5%E7%A8%8B%E7%9A%84%E6%95%85%E4%BA%8B%EF%BC%9A%E8%AB%87%E5%A6%82%E4%BD%95%E7%94%A8%20Python%20%E8%BF%BD%E6%BC%AB%E7%95%AB%E9%80%A3%E8%BC%89&url=https%3A//leemeng.tw/a-story-about-airflow-and-data-engineering-using-how-to-use-python-to-catch-up-with-latest-comics-as-an-example.html&hashtags=python,airflow,zi-liao-gong-cheng,selenium,slack" target="_blank" title="分享到 Twitter">
<i class="im im-twitter" aria-hidden="true"></i>
</a>
</div>
<!--custom images with icon shown on left nav-->
</div>
<div class="col-full blog-content__main">
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<blockquote>
<p>
這是一篇當初我在入門資料工程以及 Airflow 時希望有人能為我寫好的文章。
<br/>
<br/>
</p>
</blockquote>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<p><a href="https://airflow.apache.org/">Airflow</a> 是一個從 Airbnb 誕生並開源,以 <a href="https://www.python.org/">Python</a> 寫成的<a href="https://zh.wikipedia.org/wiki/%E5%B7%A5%E4%BD%9C%E6%B5%81%E7%AE%A1%E7%90%86%E7%B3%BB%E7%BB%9F">工作流程管理系統(Workflow Management System)</a>,也是<a href="https://github.com/apache/incubator-airflow#who-uses-airflow">各大企業</a>的資料工程環節中不可或缺的利器之一。</p>
<p>近年不管是資料科學家、資料工程師還是任何需要處理數據的軟體工程師,Airflow 都是他們用來建構可靠的 ETL 以及定期處理批量資料的首選之一。(事實上在 <a href="https://www.smartnews.com/en/">SmartNews</a>,除了 DS/DE,會使用 Airflow 的軟體工程師也不在少數)</p>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<center>
<img src="https://leemeng.tw/images/journal/smartnews-dmp.png" style="mix-blend-mode: initial;"/>
</center>
<center>
在「資料科學家 L 的奇幻旅程(1):新人不得不問的 2 個問題」一文提到 SmartNews 如何利用 Airflow 建立資料管道並管理各種 ETL
(<a href="https://leemengtaiwan.github.io/journey-of-data-scientist-L-part-1-two-must-ask-questions-when-on-board.html#%E5%84%80%E8%A1%A8%E6%9D%BF%E4%B8%8A%E7%9A%84-KPI-%E6%98%AF%E6%80%8E%E9%BA%BC%E7%94%A2%E7%94%9F%E7%9A%84%EF%BC%9F" target="_blank">圖片來源</a>)
<br/>
<br/>
</center>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<p>儘管它的方便以及強大,在完全熟悉 Airflow 之前,因為有些專業術語以及資料工程概念的存在,不少初學者(包含當時的我)在剛開始的時候容易四處撞壁。另外如果一開始就以 ETL 當作 Airflow 的入門的話,未免難度過高且缺少共鳴。</p>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<h2 id="追連載:一個-Airflow-的輕鬆使用案例">追連載:一個 Airflow 的輕鬆使用案例<a class="anchor-link" href="#追連載:一個-Airflow-的輕鬆使用案例">¶</a></h2><p>這篇文章希望以一個簡易的漫畫連載通知 App 作為引子,讓完全沒有資料工程經驗的讀者也能夠透過這個 App 的例子,輕鬆地理解工作流程的概念、自動化排程以及 Airflow 的使用方式。閱讀完本文,你將對 Airflow 以及自動排程工作有更深的理解,並學會如何建立多個能在 Airflow 上穩定運行的工作流程。更重要的,我相信你能利用這些學到的基礎,開始自動化自己生活中以及企業的數據處理 pipeline。</p>
<p>如果你對資料工程有興趣,不太熟悉如 Airflow 這種工作流程管理系統,但有基本的 Python 程式基礎的話(或是純粹對用 Python 寫一個漫畫連載通知 App 有興趣),我相信這篇文章應該會很適合你。</p>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<center>
<img src="https://leemeng.tw/images/airflow/app.jpg" style="mix-blend-mode: initial;width:70%;"/>
</center>
<center>
Slack 截圖:追漫畫應該要是件輕鬆的事情。我們將利用 Airflow 來實作一個像這樣會每天從 Slack 推送最新漫畫連載的 App
<br/>
<br/>
</center>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<p>想重新複習 ETL 概念的讀者可以參考先前的文章:<a href="https://leemengtaiwan.github.io/why-you-need-to-learn-data-engineering-as-a-data-scientist.html#%E8%B3%87%E6%96%99%E7%AE%A1%E9%81%93">資料科學家為何需要了解資料工程</a>。</p>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<h2 id="章節傳送門">章節傳送門<a class="anchor-link" href="#章節傳送門">¶</a></h2><ul>
<li><a href="#所以為何要這-App-?">了解需求:所以為何要這 App ?</a></li>
<li><a href="#工作流概念-&-Airflow">工作流概念 & Airflow</a></li>
<li><a href="#Python-&-Airflow-實作">Python 實作 & Airflow 操作</a><ul>
<li><a href="#建置-Airflow-環境">建置 Airflow 環境</a></li>
<li><a href="#Airflow-基本概念">Airflow 基本概念</a></li>
<li><a href="#App-版本一:大鍋炒">App 版本一:大鍋炒</a></li>
<li><a href="#app-v2">App 版本二:模組化</a></li>
<li><a href="#app-v3">App 版本三:填填樂</a></li>
</ul>
</li>
<li><a href="#結語">結語</a></li>
</ul>
<p>為讓讀者完整了解開發這個 App 的背景脈絡、此 App 的執行邏輯以及使用 Airflow 來定期執行 App 的原因,在我們實際開始寫 Python 之前有兩小節的解說。</p>
<p>如果你已經有 Airflow 及工作流程的基礎知識,且迫不及待想看 Python 程式碼,可以直接跳到 <a href="#Python-&-Airflow-實作">Python 實作 & Airflow 操作</a>章節之後再回來查看前面段落。</p>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<center>
<img src="https://leemeng.tw/images/airflow/toc-demo.jpg" style="mix-blend-mode: initial;"/>
</center>
<center>
這篇文章章節不少,你有時可能會需要回到前面章節回顧一些內容。活用左側放大鏡按鈕下面的章節傳送門能讓你更輕鬆地徜徉在本文的 Airflow 世界
<br/>
<br/>
</center>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<h2 id="所以為何要這-App-?">所以為何要這 App ?<a class="anchor-link" href="#所以為何要這-App-?">¶</a></h2>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<p>平常有在網路上追漫畫連載的讀者們應該都了解,市面上的漫畫網站通常都不是會員制的。更不用說「在新連載出的時候自動通知您!」這種推送功能(Push Notification)了。也因為這樣,導致我常常三不五時上去這些漫畫網站,看每個關注的漫畫到底出了最新一話了沒。可想而知,答案通常是否定的。(一週出一次每天檢查也沒用啊啊啊)</p>
<p>如果你只看海賊王一個漫畫(索隆好帥!),這或許沒什麼負擔。但就像上面 Slack 截圖顯示的,我不只關注海賊王,還看很多其他漫畫。讓事情更糟的是,到最後你會發現:</p>
<ul>
<li>不記得自己到底在追哪些漫畫</li>
<li>每一部漫畫最後到底是看到第幾話</li>
<li>上一話是什麼時候出的</li>
<li>有幾話是新出而你還沒看的 </li>
</ul>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<center>
<img src="https://leemeng.tw/images/airflow/tim-gouw-68319-unsplash.jpg"/>
</center>
<center>
手動追最新連載經常讓我追到懷疑人生
<br/>
<br/>
</center>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<p>追漫畫連載應該要是個輕鬆且享受的事情。在一個人人會寫 code 的時代,何不自己做個 App 幫我們自動檢查新連載呢?</p>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<h2 id="工作流概念-&-Airflow">工作流概念 & Airflow<a class="anchor-link" href="#工作流概念-&-Airflow">¶</a></h2>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<p>概念上我們可以把此 App 需要做的工作按照「先後順序」由上往下列出來:</p>
<ul>
<li>取得使用者的閱讀紀錄</li>
<li>去漫畫網站看有沒有新的章節</li>
<li>跟紀錄比較,有沒有新連載?<ul>
<li>沒有:<ul>
<li>什麼都不幹,結束</li>
</ul>
</li>
<li>有:<ul>
<li>寄 Slack 通知</li>
<li>更新閱讀紀錄</li>
</ul>
</li>
</ul>
</li>
</ul>
<p>想像上述的工作清單由上往下流動,就形成了一個工作流程(Workflow):前一個工作如寄 Slack 通知就是下一個工作:更新閱讀紀錄的上游工作(Upstream Task)。</p>
<p>反過來說,更新閱讀紀錄則是寄 Slack 通知的下游工作(Downstream Task)。</p>
<p>定義出工作之間的上下游關係的好處是什麼?</p>
<p>可以讓我們確保工作之間的相依性(Dependencies)並讓如 Airflow 這種工作流程管理系統幫我們管理工作流程。一般而言,下游工作只能在上游「成功」完成之後被執行;如果上游工作失敗的話,下游工作應該被終止,通常也沒有繼續執行的意義(例:如果 App 在執行上游工作「取得使用者閱讀紀錄」時就失敗的話,不需要也不應該執行下游的「更新閱讀紀錄」工作)。</p>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<center>
<img src="https://leemeng.tw/images/airflow/www-headsmartmedia-com-179929-unsplash.jpg"/>
</center>
<center>
我們的 App 實際上就是一個完整的工作流程。App 從工作 A 執行到工作 B 就像是水從上游 A 流動到下游 B 一樣。
<br/>
<br/>
</center>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<p>我知道你在想什麼。</p>
<p>屏除剛剛介紹的工作流程概念,要實作這 App 的邏輯一點都不難。事實上我們只需要寫個 Python script,把每個工作各別用一個函式(Function)實作後再按照順序呼叫它們就好(你甚至可以只用一個函式實現所有邏輯!),為何需要 Airflow?</p>
<p>在你往下滑前給個提示:我們這個 App 不是每一秒鐘都在執行。</p>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<img src="https://leemeng.tw/images/airflow/thought-2123970_1280.jpg" style="width:70%;"/>
<br/>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<p>對!顯而易見的,因為這個 App / 工作流程設計的方式不是即時工作(Realtime Job),而是批次工作,執行一次以後就結束它的生命了。</p>
<p>我們可不希望它只在明天早上(比方說早上 9 點)去檢查新連載。我們希望它明天、下個月或是明年的今天早上都在運作。這也是為何我們需要一個像是 Airflow 的工作流程管理系統:</p>
<ol>
<li>定期執行工作流程</li>
<li>維護相依性,確保工作流程從上游到下游執行,不會在上游沒完成前執行到下游</li>
<li>各個工作失敗時自動重試(<a href="https://zh.wikipedia.org/wiki/%E6%91%A9%E8%8F%B2%E5%AE%9A%E7%90%86">墨菲定律</a>,所有你認為邏輯上萬無一失的工作都會因為各種無法預期的情況給你失敗的驚喜)</li>
<li>簡單易懂的 <a href="https://airflow.apache.org/">Web UI</a> 方便管理工作流程</li>
</ol>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<blockquote>
<p>
Airflow 非常適合用來管理相依性複雜,且具批次處理性質的工作流程。
<br/>
<br/>
</p>
</blockquote>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<p>小提醒:暗色模式為方便讀者閱讀,會用與真實 AirFlow UI 不同的顏色來呈現,但概念是一模一樣的。</p>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<center>
<img src="https://leemeng.tw/images/airflow/airflow.gif" style="mix-blend-mode: initial;"/>
</center>
<center>
Airflow 的 Web UI 讓我們能更輕鬆地管理及排程工作流程(後面我們會實際利用此 UI 管理並開發 App)
<br/>
<br/>
</center>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<p>事實上我們也可以透過 Linux 排程工具 <a href="https://zh.wikipedia.org/wiki/Cron">Cron</a> 來定期執行我們的 App。但 Cron 本身沒有工作流程的概念,沒辦法管理上下游工作的相依性、失敗時無法自動重跑、當然也沒有易懂的 Web UI。因此以 2, 3, 4 項的角度來看,Airflow 是一個比較好的選擇。</p>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<p>到此為止,我們已經了解</p>
<ul>
<li>為何要做這個 App</li>
<li>此 App 的工作流程以及工作流程(Workflow)的基本概念</li>
<li>為何要使用 Airflow 來幫我們管理 App 的工作流程</li>
</ul>
<p>接著只差用 Python 將 App 的邏輯以 Airflow 工作流程的方式實現了,讓我們開始實作吧!</p>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<center>
<img src="https://leemeng.tw/images/airflow/show-me-the-code.jpg" style="width:80%;"/>
</center>
<center>
[Warning] 接下來不只給你 Python 程式碼,而是給你大量的 Python 程式碼
<br/>
<br/>
</center>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<h2 id="Python-&-Airflow-實作">Python & Airflow 實作<a class="anchor-link" href="#Python-&-Airflow-實作">¶</a></h2><p>程式碼都會放在這個 <a href="https://github.com/leemengtaiwan/airflow-tutorials">Github Repo</a> 裡頭供你在閱讀完文章後參考。但如果你正在用電腦瀏覽的話且想趕快熟悉 Airflow 開發的話,可以 <code>git clone</code> 下來以後跟著文章走。</p>
<p>開啟一個新的 terminal,移動到你平常放新專案的資料夾,然後輸入:</p>
<div class="highlight"><pre><span></span>git<span class="w"> </span>clone<span class="w"> </span>https://github.com/leemengtaiwan/airflow-tutorials.git
<span class="nb">cd</span><span class="w"> </span>airflow-tutorials
</pre></div>
<p>之後沒特別明說的話,指令都會是在 <code>airflow-tutorials</code> 資料夾底下執行。</p>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<h3 id="建置-Airflow-環境">建置 Airflow 環境<a class="anchor-link" href="#建置-Airflow-環境">¶</a></h3><p>雖然 production 環境需要很多調整,以建構測試環境來說,基本上參考官方的 <a href="https://airflow.apache.org/start.html#quick-start">Quick Start</a> 就可以很輕鬆地完成。因為 Airflow 是以 Python 實作的,我們可以很輕易地用 <code>pip install</code> 來安裝所有需要的東西。用 <a href="https://anaconda.org/">Anaconda</a> 則是能讓你事後管理不同專案的環境時輕鬆不少:</p>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<div class="highlight"><pre><span></span>conda<span class="w"> </span>create<span class="w"> </span>-n<span class="w"> </span>airflow-tutorials<span class="w"> </span><span class="nv">python</span><span class="o">=</span><span class="m">3</span>.6<span class="w"> </span>-y
<span class="nb">source</span><span class="w"> </span>activate<span class="w"> </span>airflow-tutorials
pip<span class="w"> </span>install<span class="w"> </span><span class="s2">"apache-airflow[crypto, slack]"</span>
<span class="nb">export</span><span class="w"> </span><span class="nv">AIRFLOW_HOME</span><span class="o">=</span><span class="s2">"</span><span class="k">$(</span><span class="nb">pwd</span><span class="k">)</span><span class="s2">"</span>
airflow<span class="w"> </span>initdb
</pre></div>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<p>以上的指令幫我們:</p>
<ul>
<li>建立並啟動一個新的 Anaconda 環境</li>
<li>在此環境下安裝 Airflow 以及<a href="https://airflow.apache.org/installation.html#extra-packages">支援 Slack 功能的額外函式庫</a></li>
<li>設定專用路徑以讓 Airflow 之後知道要在哪找檔案、存 log</li>
<li>初始化 Airflow Metadata DB。此 DB 被用來記錄所有工作流程的執行狀況</li>
</ul>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<p>理想上把 <code>AIRFLOW_HOME</code> 加入到 <code>~/.bash_profile</code> 裡頭之後會比較輕鬆,不過現在不做也沒關係。</p>
<p>【2018/08/27 加註】如果沒有設定 <code>export AIRFLOW_HOME="$(pwd)"</code> 就執行 <code>airflow initdb</code>的話,會讓 Airflow 使用作者當初測試時使用的路徑,而不是你 <code>git clone</code> 下來的 repo 的路徑而造成問題,務必記得設定。</p>
<p>在環境都搞定之後,我們可以啟動 Airflow 的網頁伺服器:</p>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<div class="highlight"><pre><span></span>airflow<span class="w"> </span>webserver<span class="w"> </span>-p<span class="w"> </span><span class="m">8080</span>
</pre></div>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<p>接著在瀏覽器輸入 <code>localhost:8080</code> 就能看到 Airflow 簡潔的 Web UI 了:</p>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<center>
<img src="https://leemeng.tw/images/airflow/first-impression-of-airflow-web-ui.jpg" style="mix-blend-mode: initial;"/>
</center>
<center>
Airflow Web UI 首頁:顯示所有已定義的工作流程(DAG)。圖中的 3 個 DAG 就對應到我們接下來逐漸改善 App 時產生的三個 App 版本
<br/>
<br/>
</center>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<h3 id="Airflow-基本概念">Airflow 基本概念<a class="anchor-link" href="#Airflow-基本概念">¶</a></h3><p>這邊值得注意的是 Airflow 利用 <a href="https://airflow.apache.org/concepts.html#dags">DAG</a> 一詞來代表一種特殊的工作流程(Workflow)。如工作流程一樣,DAG 定義了我們有什麼工作、工作之間的執行順序以及依賴關係。DAG 的最終目標是將所有工作依照上下游關係全部執行,而不是關注個別的工作實際上是怎麼被實作的(這點在後面的 <a href="#Operator:將實作邏輯跟-DAG-排程分離">Operator</a> 章節會有詳細解釋)。</p>
<p>另外從它的全名<a href="http://www.csie.ntnu.edu.tw/~u91029/DirectedAcyclicGraph.html">有向無環圖(<strong>D</strong>irected <strong>A</strong>cyclic <strong>G</strong>raph)</a>你可以看出它具備兩個特色:「有向」及「無環」。事實上我們的 App 邏輯就是一個理想的 DAG。首先,裡頭包含多個邏輯上的工作:</p>
<ul>
<li>取得使用者的閱讀紀錄</li>
<li>去漫畫網站看有沒有新的章節</li>
<li>跟紀錄比較,有沒有新連載?<ul>
<li>沒有:<ul>
<li>什麼都不幹,結束</li>
</ul>
</li>
<li>有:<ul>
<li>寄 Slack 通知</li>
<li>更新閱讀紀錄</li>
</ul>
</li>
</ul>
</li>
</ul>
<p>很明顯地, App 是從上而下地執行每個工作,即為「有向」;同時 App 不會在更新閱讀紀錄以後(下游工作),還跑回去漫畫網站看有沒有新的章節(上游工作):上游會指向下游,但下游不會指回上游,此即「無環」。</p>
<p>有了這個理解以後,我們的目標就很明顯了:將 App 的工作流程轉換成一個能在 Airflow 上執行的 DAG,然後排程它,就能讓它每天去找新連載!</p>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<h3 id="App-版本一:大鍋炒">App 版本一:大鍋炒<a class="anchor-link" href="#App-版本一:大鍋炒">¶</a></h3>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<p>在 Airflow 世界裡,一個 DAG 是由一個 Python script 所定義的。</p>
<p>以下是我們 App 的第一個版本,也是最簡單的 DAG <code>comic_app_v1</code> 的程式碼(<code>airflow-tutorials/dags</code> 資料夾底下的 <code>comic_app_v1.py</code>):</p>
<div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">time</span>
<span class="kn">from</span> <span class="nn">datetime</span> <span class="kn">import</span> <span class="n">datetime</span><span class="p">,</span> <span class="n">timedelta</span>
<span class="kn">from</span> <span class="nn">airflow</span> <span class="kn">import</span> <span class="n">DAG</span>
<span class="kn">from</span> <span class="nn">airflow.operators.python_operator</span> <span class="kn">import</span> <span class="n">PythonOperator</span>
<span class="n">default_args</span> <span class="o">=</span> <span class="p">{</span>
<span class="s1">'owner'</span><span class="p">:</span> <span class="s1">'Meng Lee'</span><span class="p">,</span>
<span class="s1">'start_date'</span><span class="p">:</span> <span class="n">datetime</span><span class="p">(</span><span class="mi">2100</span><span class="p">,</span> <span class="mi">1</span><span class="p">,</span> <span class="mi">1</span><span class="p">,</span> <span class="mi">0</span><span class="p">,</span> <span class="mi">0</span><span class="p">),</span>
<span class="s1">'schedule_interval'</span><span class="p">:</span> <span class="s1">'@daily'</span><span class="p">,</span>
<span class="s1">'retries'</span><span class="p">:</span> <span class="mi">2</span><span class="p">,</span>
<span class="s1">'retry_delay'</span><span class="p">:</span> <span class="n">timedelta</span><span class="p">(</span><span class="n">minutes</span><span class="o">=</span><span class="mi">1</span><span class="p">)</span>
<span class="p">}</span>
<span class="k">def</span> <span class="nf">fn_superman</span><span class="p">():</span>
<span class="nb">print</span><span class="p">(</span><span class="s2">"取得使用者的閱讀紀錄"</span><span class="p">)</span>
<span class="nb">print</span><span class="p">(</span><span class="s2">"去漫畫網站看有沒有新的章節"</span><span class="p">)</span>
<span class="nb">print</span><span class="p">(</span><span class="s2">"跟紀錄比較,有沒有新連載?"</span><span class="p">)</span>
<span class="c1"># Murphy's Law</span>
<span class="n">accident_occur</span> <span class="o">=</span> <span class="n">time</span><span class="o">.</span><span class="n">time</span><span class="p">()</span> <span class="o">%</span> <span class="mi">2</span> <span class="o">></span> <span class="mi">1</span>
<span class="k">if</span> <span class="n">accident_occur</span><span class="p">:</span>
<span class="nb">print</span><span class="p">(</span><span class="s2">"</span><span class="se">\n</span><span class="s2">天有不測風雲,人有旦夕禍福"</span><span class="p">)</span>
<span class="nb">print</span><span class="p">(</span><span class="s2">"工作遇到預期外狀況被中斷</span><span class="se">\n</span><span class="s2">"</span><span class="p">)</span>
<span class="k">return</span>
<span class="n">new_comic_available</span> <span class="o">=</span> <span class="n">time</span><span class="o">.</span><span class="n">time</span><span class="p">()</span> <span class="o">%</span> <span class="mi">2</span> <span class="o">></span> <span class="mi">1</span>
<span class="k">if</span> <span class="n">new_comic_available</span><span class="p">:</span>
<span class="nb">print</span><span class="p">(</span><span class="s2">"寄 Slack 通知"</span><span class="p">)</span>
<span class="nb">print</span><span class="p">(</span><span class="s2">"更新閱讀紀錄"</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="nb">print</span><span class="p">(</span><span class="s2">"什麼都不幹,工作順利結束"</span><span class="p">)</span>
<span class="k">with</span> <span class="n">DAG</span><span class="p">(</span><span class="s1">'comic_app_v1'</span><span class="p">,</span> <span class="n">default_args</span><span class="o">=</span><span class="n">default_args</span><span class="p">)</span> <span class="k">as</span> <span class="n">dag</span><span class="p">:</span>
<span class="n">superman_task</span> <span class="o">=</span> <span class="n">PythonOperator</span><span class="p">(</span>
<span class="n">task_id</span><span class="o">=</span><span class="s1">'superman_task'</span><span class="p">,</span>
<span class="n">python_callable</span><span class="o">=</span><span class="n">fn_superman</span>
<span class="p">)</span>
</pre></div>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<p>為了讓你能專注在 Airflow 及 DAG 最核心的概念,讓我先用 <code>print()</code> 假裝我們已經在一個函式 <code>fn_superman</code> 裡頭實作所有工作的邏輯了。在修改完代表一個 DAG 的 Python script 後,要確保 Airflow 能正確地將其視為一個 DAG,最基本的檢查就是用 Python 直接執行該 script。</p>
<p>你目前的 terminal 應該正被 Airflow 的網頁伺服器所使用。如果你還沒有把 <code>AIRFLOW_HOME</code> 加到 <code>~/.bash_profile</code> 裡頭的話,開啟一個新的 terminal,重新進入 <code>airflow-tutorials</code> 資料夾以後執行:</p>
<div class="highlight"><pre><span></span><span class="nb">source</span><span class="w"> </span>activate<span class="w"> </span>airflow-tutorials
<span class="nb">export</span><span class="w"> </span><span class="nv">AIRFLOW_HOME</span><span class="o">=</span><span class="s2">"</span><span class="k">$(</span><span class="nb">pwd</span><span class="k">)</span><span class="s2">"</span>
</pre></div>
<p>這邊我們為新的 terminal 啟動 Anaconda 環境,並告訴 Airflow 在 <code>airflow-tutorials</code> 資料夾底下找所有它要的東西。(之後要打開新的 terminal 也要做一樣的事情)</p>
<p>接著我們就可以用 Python 測試 script 的正確性:</p>
<div class="highlight"><pre><span></span>python<span class="w"> </span>dags/comic_app_v1.py
</pre></div>
<p>沒有特別設定的話, Airflow 會去 <code>AIRFLOW_HOME</code> 路徑底下的 <code>dags</code> 子資料夾找 DAG,這也是為何我們在上面路徑有個 <code>dags</code>。(你可以去 <a href="https://github.com/leemengtaiwan/airflow-tutorials">Repo</a> 確定檔案的路徑。)</p>
<p>如果沒有任何錯誤跑出來,恭喜!Airflow 能將其視為一個正常的 DAG 並顯示在 Web UI 上。之後只要你有修改 DAG 裡頭的程式碼,都應該做這個檢查。</p>
<p>這個 DAG 的程式碼雖不長,卻隱含了一些非常重要的概念。</p>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<h4 id="輕鬆排程">輕鬆排程<a class="anchor-link" href="#輕鬆排程">¶</a></h4><div class="highlight"><pre><span></span><span class="k">with</span> <span class="n">DAG</span><span class="p">(</span><span class="s1">'comic_app_v1'</span><span class="p">,</span> <span class="n">default_args</span><span class="o">=</span><span class="n">default_args</span><span class="p">)</span> <span class="k">as</span> <span class="n">dag</span><span class="p">:</span>
<span class="o">...</span>
</pre></div>
<p>靠近 <a href="#App-版本一:大鍋炒">Script</a> 尾端的這行實際上就定義了我們的 DAG 並將它命名為 <code>comic_app_v1</code>。而此 DAG 的排程(Scheduling)設定如</p>
<ul>
<li><code>'start_date': datetime(2100, 1, 1, 0, 0)</code> 代表從西元 2100 年開始第一次執行此 DAG </li>
<li>每次執行之間間隔多久。<code>'schedule_interval': '@daily'</code> 代表每天執行一次</li>
<li><code>'retries': 2</code> 則允許 Airflow 在 DAG 失敗時重試 2 次 </li>
<li>DAG 失敗後等多久後開始重試(<code>'retry_delay': timedelta(minutes=1)</code> 代表等一分鐘)</li>
<li>更多更多 ...</li>
</ul>
<p>乍看之下沒什麼了不起的,就是些設定。</p>
<p>但如果你有自己從頭實作過資料管道的經驗或者使用過 <a href="https://zh.wikipedia.org/wiki/Cron">Cron</a> 排程 ETL,就能體會 Airflow 這樣的「Configuration as Code」有多麽的強大:你只做一些設定(Config),Airflow 就幫你自動建立可靠、失敗時會自動重試的工作流程。</p>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<center>
<img src="https://leemeng.tw/images/airflow/utah-mechanical-contractors-1103725_1280.jpg"/>
</center>
<center>
按幾個按鈕就能做出可靠的工作流程,將自動化、失敗重試、相依性管理全部交給 Airflow
<br/>
<br/>
</center>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<p>這些排程設定為了方便管理,一般都另外定義在 <code>default_args</code> 變數並放在 script 的最上面。</p>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<h4 id="Operator:將實作邏輯跟-DAG-排程分離">Operator:將實作邏輯跟 DAG 排程分離<a class="anchor-link" href="#Operator:將實作邏輯跟-DAG-排程分離">¶</a></h4><p>最有趣的是我們使用 <code>with</code> 關鍵字來定義一個只屬於 <code>comic_app_v1</code> DAG 的領域。在這裡頭我們則定義了唯一一個工作 <code>superman_task</code> 處理所有事情(你應該能猜到為何它被這樣命名):</p>
<div class="highlight"><pre><span></span><span class="k">with</span> <span class="n">DAG</span><span class="p">(</span><span class="s1">'comic_app_v1'</span><span class="p">,</span> <span class="o">...</span>
<span class="n">superman_task</span> <span class="o">=</span> <span class="n">PythonOperator</span><span class="p">(</span>
<span class="n">task_id</span><span class="o">=</span><span class="s1">'superman_task'</span><span class="p">,</span>
<span class="n">python_callable</span><span class="o">=</span><span class="n">fn_superman</span>
<span class="p">)</span>
</pre></div>
<p>這段程式碼用白話翻譯的話,就是說:</p>
<ul>
<li>在 DAG <code>comic_app_v1</code> 裡頭,利用 <code>PythonOperator</code> 建立一個名為 <code>superman_task</code> 的工作,而實際執行這個工作的時候,呼叫 <code>fn_superman</code> 函式。</li>
</ul>
<p>一個非常重要且需要你搞懂的概念是,現在說的工作(Task),是指那些實際透過程式碼宣告,在 DAG 裡頭被定義出來的工作,如 <code>superman_task</code>。</p>
<p>前面我們提到,App 概念上本身就包含了多個工作(步驟):</p>
<ul>
<li>取得使用者的閱讀紀錄</li>
<li>去漫畫網站看有沒有新的章節</li>
<li>跟紀錄比較,有沒有新連載?<ul>
<li>沒有:<ul>
<li>什麼都不幹,結束</li>
</ul>
</li>
<li>有:<ul>
<li>寄 Slack 通知</li>
<li>更新閱讀紀錄</li>
</ul>
</li>
</ul>
</li>
</ul>
<p>這些是「邏輯上」的工作,而在 <code>comic_app_v1</code> DAG 裡頭,為了方便說明,我們將它們全部包起來,定義成唯一一個 Airflow 工作: <code>superman_task</code>。(在 <a href="#app-v2">App 版本二:模組化</a>章節裡,我們則會分別為這些「邏輯工作」建立他們自己的 Airflow 工作)。</p>
<p>回到 Opeartor 的話題。在 Airflow 裡頭,DAG 只知道有哪些工作以及這些工作之間的執行順序。而實際上這些工作要怎麼被完成,其實作邏輯則是由各種 <a href="https://airflow.apache.org/code.html#operators">Operator</a> 負責。</p>
<p>你可以想像 <a href="https://airflow.apache.org/code.html#airflow.operators">Opeartors</a> 就是幫我們完成特定種類工作的小幫手,像是一些常見的例子:</p>
<ul>
<li><a href="https://airflow.apache.org/code.html#airflow.operators.PythonOperator">PythonOperator</a> 執行一個 Python 函式</li>
<li><a href="https://airflow.apache.org/code.html#airflow.operators.BashOperator">BashOperator</a> 執行 Bash 指令</li>
<li><a href="https://airflow.apache.org/code.html#airflow.operators.S3KeySensor">S3KeySensor</a> 監測 S3 上的檔案存不存在</li>
<li><a href="https://airflow.apache.org/code.html#airflow.operators.SlackAPIPostOperator">SlackAPIPostOperator</a> 送訊息給 Slack</li>
<li>...</li>
</ul>
<p>要建立一個 DAG 裡的工作(Task)就是依照你想要它完成的特定目標,來選擇合適的 Operator。比方說上面的 <code>superman_task</code> 就是透過 <code>PythonOperator</code> 來執行特定的 Python 函式 <code>fn_superman</code>,而該函式則把 App 裡頭所有的「邏輯工作」實作了。</p>
<p><code>PythonOperator</code> 可以說是 Airflow 裡最基本也最強大的 <a href="https://airflow.apache.org/code.html#airflow.operators">Opeartors</a> 之一。學會使用方法以後,你可以將任何你定義的 Python 函式變成一個 Airflow 工作。</p>
<p>基本的使用方法非常簡單,你只要指定一個可呼叫的 Python 函式給 <code>python_callable</code> 參數以及設定一個工作名稱(task_id)即可:</p>
<div class="highlight"><pre><span></span><span class="n">superman_task</span> <span class="o">=</span> <span class="n">PythonOperator</span><span class="p">(</span>
<span class="n">task_id</span><span class="o">=</span><span class="s1">'superman_task'</span><span class="p">,</span>
<span class="n">python_callable</span><span class="o">=</span><span class="n">fn_superman</span>
<span class="p">)</span>
</pre></div>
<p>在後面的 <a href="#Airflow-變數以及-Jinja-模板">Airflow 變數以及 Jinja 模板</a>章節,我們則會看到如何使用其他 Operator 如 <a href="https://airflow.apache.org/code.html#airflow.operators.SlackAPIPostOperator">SlackAPIPostOperator</a> 來新增一個可以幫我們送 Slack 訊息的工作。</p>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<h4 id="測試開發-Airflow-工作">測試開發 Airflow 工作<a class="anchor-link" href="#測試開發-Airflow-工作">¶</a></h4>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<p>你現在應該已經理解 DAG 本身關注的是有哪些工作以及他們的相依性,而不是各個工作的實作邏輯。(雖然在 <code>comic_app_v1</code> DAG 裡頭只有一個工作所以不存在相依性問題)</p>
<p>我們用 <code>python dags/comic_app_v1.py</code> 確保 DAG 本身沒有語法問題以後,接著就是要確保裡頭每個工作(Task)的執行結果如我們預期。</p>
<p>在 <code>comic_app_v1</code> DAG 裡頭,我們只有一個工作 <code>superman_task</code> (其透過一個函式 <code>fn_superman</code> 幫我們做所有邏輯上的工作):</p>
<div class="highlight"><pre><span></span><span class="k">def</span> <span class="nf">fn_superman</span><span class="p">():</span>
<span class="nb">print</span><span class="p">(</span><span class="s2">"取得使用者的閱讀紀錄"</span><span class="p">)</span>
<span class="nb">print</span><span class="p">(</span><span class="s2">"去漫畫網站看有沒有新的章節"</span><span class="p">)</span>
<span class="nb">print</span><span class="p">(</span><span class="s2">"跟紀錄比較,有沒有新連載?"</span><span class="p">)</span>
<span class="c1"># Murphy's Law</span>
<span class="n">accident_occur</span> <span class="o">=</span> <span class="n">time</span><span class="o">.</span><span class="n">time</span><span class="p">()</span> <span class="o">%</span> <span class="mi">2</span> <span class="o">></span> <span class="mi">1</span>
<span class="k">if</span> <span class="n">accident_occur</span><span class="p">:</span>
<span class="nb">print</span><span class="p">(</span><span class="s2">"</span><span class="se">\n</span><span class="s2">天有不測風雲,人有旦夕禍福"</span><span class="p">)</span>
<span class="nb">print</span><span class="p">(</span><span class="s2">"工作遇到預期外狀況被中斷</span><span class="se">\n</span><span class="s2">"</span><span class="p">)</span>
<span class="k">return</span>
<span class="n">new_comic_available</span> <span class="o">=</span> <span class="n">time</span><span class="o">.</span><span class="n">time</span><span class="p">()</span> <span class="o">%</span> <span class="mi">2</span> <span class="o">></span> <span class="mi">1</span>
<span class="k">if</span> <span class="n">new_comic_available</span><span class="p">:</span>
<span class="nb">print</span><span class="p">(</span><span class="s2">"寄 Slack 通知"</span><span class="p">)</span>
<span class="nb">print</span><span class="p">(</span><span class="s2">"更新閱讀紀錄"</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="nb">print</span><span class="p">(</span><span class="s2">"什麼都不幹,工作順利結束"</span><span class="p">)</span>
<span class="k">with</span> <span class="n">DAG</span><span class="p">(</span><span class="s1">'comic_app_v1'</span><span class="p">,</span> <span class="n">default_args</span><span class="o">=</span><span class="n">default_args</span><span class="p">)</span> <span class="k">as</span> <span class="n">dag</span><span class="p">:</span>
<span class="n">superman_task</span> <span class="o">=</span> <span class="n">PythonOperator</span><span class="p">(</span>
<span class="n">task_id</span><span class="o">=</span><span class="s1">'superman_task'</span><span class="p">,</span>
<span class="n">python_callable</span><span class="o">=</span><span class="n">fn_superman</span>
<span class="p">)</span>
</pre></div>
<p>這樣的設計有什麼優點?</p>
<p>一般來說 DAG 跟工作是一對多的關係(一個工作流程裡有多個小工作要做):要讓一個 DAG 順利跑完,理所當然所有工作都要順利執行完畢。但 <code>comic_app_v1</code> DAG 是個特例,它裡頭只有一個工作,一人吃全家飽。只要測試且確保 <code>superman_task</code> 工作的執行結果如我們預期,就代表 DAG <code>comic_app_v1</code> 能順利完成,簡單易懂!</p>
<p>我們可以使用 Airflow 的 <code>test</code> 指令來幫我們測試這個工作:</p>
<div class="highlight"><pre><span></span>airflow<span class="w"> </span><span class="nb">test</span><span class="w"> </span>comic_app_v1<span class="w"> </span>superman_task<span class="w"> </span><span class="m">2018</span>-08-18
</pre></div>
<p>這行指令是讓 Airflow 幫我們測試 <code>comic_app_v1</code> DAG 裡頭的 <code>superman_task</code> 工作,並假設這個工作是在 <code>2018-08-18</code> 這個日期被執行。在我們的 App 例子中,<code>superman_task</code> 工作的執行結果基本上不會受到執行日期的影響,可以隨便你改。</p>
<p>但想像一個每天 24 點 0 分準備被啟動,從資料庫撈出數據並計算「當天」使用者數目的工作。其 SQL 查詢可能長這樣:</p>
<div class="highlight"><pre><span></span><span class="k">SELECT</span><span class="w"> </span><span class="k">COUNT</span><span class="p">(</span><span class="n">user_id</span><span class="p">)</span><span class="w"> </span><span class="k">AS</span><span class="w"> </span><span class="n">num_new_users</span>
<span class="k">FROM</span><span class="w"> </span><span class="n">user_activities</span>
<span class="k">WHERE</span><span class="w"> </span><span class="n">dt</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">'{execute_date}'</span>
</pre></div>
<p>因為此工作的結果會受到執行日期的影響,在測試的時候,你就得仔細選擇執行日期(execute_date)。</p>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<p>拉回 <code>superman_task</code> 工作的測試。</p>
<p>從上面 <code>fn_superman</code> 函式的程式碼你可能已經注意到,我埋了個小彩蛋,每次執行都會有不同的執行結果。</p>
<p>幸運的話你會得到下面這種:</p>
<div class="highlight"><pre><span></span>airflow<span class="w"> </span><span class="nb">test</span><span class="w"> </span>comic_app_v1<span class="w"> </span>superman_task<span class="w"> </span><span class="m">2018</span>-08-01
取得使用者的閱讀紀錄
去漫畫網站看有沒有新的章節
跟紀錄比較,有沒有新連載?
什麼都不幹,工作順利結束
</pre></div>
<p>喔耶!這執行結果如我們預期,可以讓 DAG 上線定期執行了!</p>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<p>不過別高興得太早。多執行幾次看看。如果墨菲定律發生,你會得到失敗的結果:</p>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<div class="highlight"><pre><span></span>airflow<span class="w"> </span><span class="nb">test</span><span class="w"> </span>comic_app_v1<span class="w"> </span>superman_task<span class="w"> </span><span class="m">2018</span>-08-01
取得使用者的閱讀紀錄
去漫畫網站看有沒有新的章節
跟紀錄比較,有沒有新連載?
天有不測風雲,人有旦夕禍福
工作遇到預期外狀況被中斷
</pre></div>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<p>假設此執行結果不是我們預期的結果,該怎麼辦?</p>
<p>如果你反應夠快,會說:</p>
<p>「那又怎麼樣?墨菲定律不會每次發生,而且就算遇到而導致工作失敗的話, Airflow 不是會自己幫我們重試嗎?」</p>
<p>的確,這是我們在前面<a href="#輕鬆排程">輕鬆排程</a>章節提到 Airflow 的強處。畢竟我們這 App 只是在檢查最新連載,不是做什麼很複雜的運算。基本上就算 DAG 裡頭這唯一一個工作 <code>superman_task</code> 失敗了導致整個 DAG 要重跑,Airflow 也可以應付得來。</p>
<p>但問題在於,企業在運行資料管道的時候,常常需要分成很多步驟,某些步驟可能需要龐大的計算資源跟時間(像是將每天使用者使用 App 的幾億筆紀錄做匯總存入資料庫),有些則很輕量(如存取一個外部 API 取得外匯比例)。</p>
<p>現在假設你無視這些不同步驟的性質差異,將它們全部放在一個 <code>fn_superman</code> 函式裡頭並只建立一個 Airflow 工作,當該 Airflow 工作裡頭任何一個輕量的步驟失敗,Airflow 得重跑整個工作,導致所有龐大計算的步驟也得跟著重新執行,重試的時間/計算成本會大到你哭出來。</p>
</div>
</div>
</div>
<div class="cell border-box-sizing code_cell rendered">
<div class="input">
<div class="inner_cell">
<div class="input_area">
<div class="highlight hl-ipython3"><blockquote><p>雞蛋不要放在同個籃子裡。為邏輯上獨立的工作/步驟分別建立 Airflow 工作,可以讓 Airflow 只從失敗的工作開始重新做起。</p></blockquote></div>
</div>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<p>因此一個比較好的 Airflow DAG 設計模式是為我們 App 裡頭每個邏輯上獨立的工作:</p>
<ul>
<li>取得使用者的閱讀紀錄</li>
<li>去漫畫網站看有沒有新的章節</li>
<li>跟紀錄比較,有沒有新連載?<ul>
<li>沒有:<ul>
<li>什麼都不幹,結束</li>
</ul>
</li>
<li>有:<ul>
<li>寄 Slack 通知</li>
<li>更新閱讀紀錄</li>
</ul>
</li>
</ul>
</li>
</ul>
<p>都分別建立如同 <code>superman_task</code> 的 Airflow 工作,並定義好它們之間的相依性(Dependencies)。而這將是我們下一節的重點。</p>
<p>題外話:你可能會納悶為何我們只測試 <code>superman_task</code> 工作而沒測試整個 <code>comic_app_v1</code> DAG。當然「一人吃全家飽」是個理由:只要確定 DAG 裡頭唯一一個工作正確運作,我們就能保證此 DAG 沒問題。</p>
<p>事實上還有一個原因:<code>airflow test</code> 指令實際上只能用來測試單一工作,而不能測試整個 DAG。關於 DAG 的測試我們在後面的 <a href="#Airflow-排程器">Airflow 排程器</a> 章節會詳細說明。</p>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<h3 id="App-版本二:模組化_1"><a name="app-v2"></a>App 版本二:模組化<a class="anchor-link" href="#App-版本二:模組化">¶</a></h3>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<p>所以現在我們要做的改善(Refactoring)很簡單:</p>
<ul>
<li>將 App 邏輯從 <code>comic_app_v1</code> DAG 中的函式 <code>fn_superman</code> 中拿出來</li>
<li>為 App 的每個步驟分別定義一個 Python 函式</li>
<li>在 DAG 裡頭利用 <code>PythonOperator</code> 建立多個 Airflow 工作並分別呼叫這些函式</li>
<li>定義這些工作的執行順序</li>
</ul>
<p>版本二的 App 完整的程式碼如下:</p>
</div>
</div>
</div>
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">time</span>
<span class="kn">from</span> <span class="nn">datetime</span> <span class="kn">import</span> <span class="n">datetime</span><span class="p">,</span> <span class="n">timedelta</span>
<span class="kn">from</span> <span class="nn">airflow</span> <span class="kn">import</span> <span class="n">DAG</span>
<span class="kn">from</span> <span class="nn">airflow.operators.python_operator</span> <span class="kn">import</span> <span class="n">PythonOperator</span><span class="p">,</span> <span class="n">BranchPythonOperator</span>
<span class="kn">from</span> <span class="nn">airflow.operators.dummy_operator</span> <span class="kn">import</span> <span class="n">DummyOperator</span>
<span class="kn">from</span> <span class="nn">airflow.operators.slack_operator</span> <span class="kn">import</span> <span class="n">SlackAPIPostOperator</span>
<span class="n">default_args</span> <span class="o">=</span> <span class="p">{</span>
<span class="s1">'owner'</span><span class="p">:</span> <span class="s1">'Meng Lee'</span><span class="p">,</span>
<span class="s1">'start_date'</span><span class="p">:</span> <span class="n">datetime</span><span class="p">(</span><span class="mi">2100</span><span class="p">,</span> <span class="mi">1</span><span class="p">,</span> <span class="mi">1</span><span class="p">,</span> <span class="mi">0</span><span class="p">,</span> <span class="mi">0</span><span class="p">),</span>
<span class="s1">'schedule_interval'</span><span class="p">:</span> <span class="s1">'@daily'</span><span class="p">,</span>
<span class="s1">'retries'</span><span class="p">:</span> <span class="mi">2</span><span class="p">,</span>
<span class="s1">'retry_delay'</span><span class="p">:</span> <span class="n">timedelta</span><span class="p">(</span><span class="n">minutes</span><span class="o">=</span><span class="mi">1</span><span class="p">)</span>
<span class="p">}</span>
<span class="k">def</span> <span class="nf">process_metadata</span><span class="p">(</span><span class="n">mode</span><span class="p">,</span> <span class="o">**</span><span class="n">context</span><span class="p">):</span>
<span class="k">if</span> <span class="n">mode</span> <span class="o">==</span> <span class="s1">'read'</span><span class="p">:</span>
<span class="nb">print</span><span class="p">(</span><span class="s2">"取得使用者的閱讀紀錄"</span><span class="p">)</span>
<span class="k">elif</span> <span class="n">mode</span> <span class="o">==</span> <span class="s1">'write'</span><span class="p">:</span>
<span class="nb">print</span><span class="p">(</span><span class="s2">"更新閱讀紀錄"</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">check_comic_info</span><span class="p">(</span><span class="o">**</span><span class="n">context</span><span class="p">):</span>
<span class="n">all_comic_info</span> <span class="o">=</span> <span class="n">context</span><span class="p">[</span><span class="s1">'task_instance'</span><span class="p">]</span><span class="o">.</span><span class="n">xcom_pull</span><span class="p">(</span><span class="n">task_ids</span><span class="o">=</span><span class="s1">'get_read_history'</span><span class="p">)</span>
<span class="nb">print</span><span class="p">(</span><span class="s2">"去漫畫網站看有沒有新的章節"</span><span class="p">)</span>
<span class="n">anything_new</span> <span class="o">=</span> <span class="n">time</span><span class="o">.</span><span class="n">time</span><span class="p">()</span> <span class="o">%</span> <span class="mi">2</span> <span class="o">></span> <span class="mi">1</span>
<span class="k">return</span> <span class="n">anything_new</span><span class="p">,</span> <span class="n">all_comic_info</span>
<span class="k">def</span> <span class="nf">decide_what_to_do</span><span class="p">(</span><span class="o">**</span><span class="n">context</span><span class="p">):</span>
<span class="n">anything_new</span><span class="p">,</span> <span class="n">all_comic_info</span> <span class="o">=</span> <span class="n">context</span><span class="p">[</span><span class="s1">'task_instance'</span><span class="p">]</span><span class="o">.</span><span class="n">xcom_pull</span><span class="p">(</span><span class="n">task_ids</span><span class="o">=</span><span class="s1">'check_comic_info'</span><span class="p">)</span>
<span class="nb">print</span><span class="p">(</span><span class="s2">"跟紀錄比較,有沒有新連載?"</span><span class="p">)</span>
<span class="k">if</span> <span class="n">anything_new</span><span class="p">:</span>
<span class="k">return</span> <span class="s1">'yes_generate_notification'</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="s1">'no_do_nothing'</span>
<span class="k">def</span> <span class="nf">generate_message</span><span class="p">(</span><span class="o">**</span><span class="n">context</span><span class="p">):</span>
<span class="n">_</span><span class="p">,</span> <span class="n">all_comic_info</span> <span class="o">=</span> <span class="n">context</span><span class="p">[</span><span class="s1">'task_instance'</span><span class="p">]</span><span class="o">.</span><span class="n">xcom_pull</span><span class="p">(</span><span class="n">task_ids</span><span class="o">=</span><span class="s1">'check_comic_info'</span><span class="p">)</span>
<span class="nb">print</span><span class="p">(</span><span class="s2">"產生要寄給 Slack 的訊息內容並存成檔案"</span><span class="p">)</span>
<span class="k">with</span> <span class="n">DAG</span><span class="p">(</span><span class="s1">'comic_app_v2'</span><span class="p">,</span> <span class="n">default_args</span><span class="o">=</span><span class="n">default_args</span><span class="p">)</span> <span class="k">as</span> <span class="n">dag</span><span class="p">:</span>
<span class="n">get_read_history</span> <span class="o">=</span> <span class="n">PythonOperator</span><span class="p">(</span>
<span class="n">task_id</span><span class="o">=</span><span class="s1">'get_read_history'</span><span class="p">,</span>
<span class="n">python_callable</span><span class="o">=</span><span class="n">process_metadata</span><span class="p">,</span>
<span class="n">op_args</span><span class="o">=</span><span class="p">[</span><span class="s1">'read'</span><span class="p">]</span>
<span class="p">)</span>
<span class="n">check_comic_info</span> <span class="o">=</span> <span class="n">PythonOperator</span><span class="p">(</span>
<span class="n">task_id</span><span class="o">=</span><span class="s1">'check_comic_info'</span><span class="p">,</span>
<span class="n">python_callable</span><span class="o">=</span><span class="n">check_comic_info</span><span class="p">,</span>
<span class="n">provide_context</span><span class="o">=</span><span class="kc">True</span>
<span class="p">)</span>
<span class="n">decide_what_to_do</span> <span class="o">=</span> <span class="n">BranchPythonOperator</span><span class="p">(</span>
<span class="n">task_id</span><span class="o">=</span><span class="s1">'new_comic_available'</span><span class="p">,</span>
<span class="n">python_callable</span><span class="o">=</span><span class="n">decide_what_to_do</span><span class="p">,</span>
<span class="n">provide_context</span><span class="o">=</span><span class="kc">True</span>
<span class="p">)</span>
<span class="n">update_read_history</span> <span class="o">=</span> <span class="n">PythonOperator</span><span class="p">(</span>
<span class="n">task_id</span><span class="o">=</span><span class="s1">'update_read_history'</span><span class="p">,</span>
<span class="n">python_callable</span><span class="o">=</span><span class="n">process_metadata</span><span class="p">,</span>
<span class="n">op_args</span><span class="o">=</span><span class="p">[</span><span class="s1">'write'</span><span class="p">],</span>
<span class="n">provide_context</span><span class="o">=</span><span class="kc">True</span>
<span class="p">)</span>
<span class="n">generate_notification</span> <span class="o">=</span> <span class="n">PythonOperator</span><span class="p">(</span>
<span class="n">task_id</span><span class="o">=</span><span class="s1">'yes_generate_notification'</span><span class="p">,</span>
<span class="n">python_callable</span><span class="o">=</span><span class="n">generate_message</span><span class="p">,</span>
<span class="n">provide_context</span><span class="o">=</span><span class="kc">True</span>
<span class="p">)</span>
<span class="n">send_notification</span> <span class="o">=</span> <span class="n">SlackAPIPostOperator</span><span class="p">(</span>
<span class="n">task_id</span><span class="o">=</span><span class="s1">'send_notification'</span><span class="p">,</span>