import requests
import pandas as pd
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator


'''To launch: docker-compose up -d'''
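
# Required Airflow setup (names as used below): connections "postgres_localhost",
# "s3_conn", and "redshift_conn", plus a Variable "api_key" holding the CoinAPI
# key, which can be set for example with:
#   airflow variables set api_key <your-coinapi-key>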

default_args = {
    'owner': 'abrook1',
    'retries': 0,
    'retry_delay': timedelta(minutes=5)
}

def api_to_postgres(coin_id, coin, **kwargs):
    '''
    Call the CoinAPI OHLCV endpoint for the latest period, transform the
    response into a dataframe (fixing some column dtypes), and load the
    dataframe into the local Airflow Postgres database.
    '''
    # api parameters
    period = '5MIN'
    api_key = Variable.get("api_key")

    # call api
    url = f'https://rest.coinapi.io/v1/ohlcv/{coin_id}/latest?period_id={period}&limit=1'
    headers = {'X-CoinAPI-Key': api_key}

    response = requests.get(url, headers=headers)
    response.raise_for_status()  # fail the task early on a bad API response
    print(response.headers)  # log response headers for debugging
    response_json = response.json()

    # transform api response to df
    ohlcv_df = pd.DataFrame(response_json)

    # drop timezone info so the timestamps fit plain Postgres timestamp columns
    ohlcv_df['time_period_start'] = pd.to_datetime(ohlcv_df['time_period_start']).dt.tz_convert(None)
    ohlcv_df['time_period_end'] = pd.to_datetime(ohlcv_df['time_period_end']).dt.tz_convert(None)
    ohlcv_df['time_open'] = pd.to_datetime(ohlcv_df['time_open']).dt.tz_convert(None)
    ohlcv_df['time_close'] = pd.to_datetime(ohlcv_df['time_close']).dt.tz_convert(None)
    ohlcv_df['period_date'] = ohlcv_df['time_period_start'].dt.date

    # load df to local postgres database
    hook = PostgresHook(postgres_conn_id="postgres_localhost")
    engine = hook.get_sqlalchemy_engine()
    ohlcv_df.to_sql(name=f"{coin}_prices", con=engine, if_exists='append', index=False)
    return
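
# For reference, each record returned by the OHLCV endpoint carries the fields
# consumed above; illustrative shape only (values are made up, not real data):
#   {"time_period_start": "2023-04-25T00:00:00.0000000Z",
#    "time_period_end":   "2023-04-25T00:05:00.0000000Z",
#    "time_open":  "2023-04-25T00:00:01.0000000Z",
#    "time_close": "2023-04-25T00:04:59.0000000Z",
#    "price_open": 27500.0, "price_high": 27510.5, "price_low": 27490.2,
#    "price_close": 27505.1, "volume_traded": 12.3, "trades_count": 150}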

def postgres_to_s3(coin, ds, ts_nodash, **kwargs):
    # save postgres data to csv file
    hook = PostgresHook(postgres_conn_id="postgres_localhost")
    engine = hook.get_sqlalchemy_engine()
    sql = f'''
        select *
        from {coin}_prices
        where period_date = '{ds}'
    '''
    df = pd.read_sql_query(sql=sql, con=engine, index_col='id')
    csv_name = f'{coin}_price_data_{ts_nodash}.csv'
    filename = f'./{csv_name}'
    df.to_csv(filename, header=False)

    # load csv file to s3
    s3_hook = S3Hook(aws_conn_id="s3_conn")
    s3_hook.load_file(
        filename=filename,
        key=f'{coin}_price_data_{ts_nodash}.csv',
        bucket_name="crypto-price-bucket-abro",
        replace=True
    )
    # the returned file name is pushed to XCom and templated into s3_key below
    return csv_name
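
# `ds` (the run's logical date as YYYY-MM-DD) and `ts_nodash` (the run timestamp
# without dashes or colons) are standard Airflow template context values that
# Airflow injects into the callable's keyword arguments automatically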


with DAG(
    default_args=default_args,
    dag_id='crypto_prices_dag',
    description='dag orchestrating crypto price ETL',
    start_date=datetime(2023, 4, 25),
    schedule_interval='*/5 * * * *',
    catchup=False
) as dag:
    create_btc_postgres_table = PostgresOperator(
        task_id='create_btc_postgres_table',
        postgres_conn_id='postgres_localhost',
        sql='''
            create table if not exists bitcoin_prices (
                id SERIAL NOT NULL,
                time_period_start timestamp,
                time_period_end timestamp,
                time_open timestamp,
                time_close timestamp,
                price_open float, -- float rather than int so prices are not rounded
                price_high float,
                price_low float,
                price_close float,
                volume_traded float,
                trades_count int,
                period_date date,
                PRIMARY KEY(id)
            );
        '''
    )

    create_eth_postgres_table = PostgresOperator(
        task_id='create_eth_postgres_table',
        postgres_conn_id='postgres_localhost',
        sql='''
            create table if not exists ethereum_prices (
                id SERIAL NOT NULL,
                time_period_start timestamp,
                time_period_end timestamp,
                time_open timestamp,
                time_close timestamp,
                price_open float,
                price_high float,
                price_low float,
                price_close float,
                volume_traded float,
                trades_count int,
                period_date date,
                PRIMARY KEY(id)
            );
        '''
    )

    create_xrp_postgres_table = PostgresOperator(
        task_id='create_xrp_postgres_table',
        postgres_conn_id='postgres_localhost',
        sql='''
            create table if not exists ripple_prices (
                id SERIAL NOT NULL,
                time_period_start timestamp,
                time_period_end timestamp,
                time_open timestamp,
                time_close timestamp,
                price_open float,
                price_high float,
                price_low float,
                price_close float,
                volume_traded float,
                trades_count int,
                period_date date,
                PRIMARY KEY(id)
            );
        '''
    )

    btc_load = PythonOperator(
        task_id='api_btc_to_postgres',
        python_callable=api_to_postgres,
        op_kwargs={'coin_id': 'BITSTAMP_SPOT_BTC_USD', 'coin': 'bitcoin'}
    )

    eth_load = PythonOperator(
        task_id='api_eth_to_postgres',
        python_callable=api_to_postgres,
        op_kwargs={'coin_id': 'BITSTAMP_SPOT_ETH_USD', 'coin': 'ethereum'}
    )

    xrp_load = PythonOperator(
        task_id='api_xrp_to_postgres',
        python_callable=api_to_postgres,
        op_kwargs={'coin_id': 'BITSTAMP_SPOT_XRP_USD', 'coin': 'ripple'}
    )

    create_btc_postgres_table >> btc_load
    create_eth_postgres_table >> eth_load
    create_xrp_postgres_table >> xrp_load
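
    # the three coin pipelines share no dependencies, so they run in parallel
    # on each 5-minute schedule tick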


# a distinct variable name so the scheduler picks up both DAGs in this module
with DAG(
    default_args=default_args,
    dag_id='crypto_prices_load_to_s3_redshift',
    description='dag orchestrating postgres table info to s3 and redshift',
    start_date=datetime(2023, 4, 26),
    schedule_interval="@daily",
    catchup=False
) as dag2:
    # coin names are lowercased to match the {coin}_prices table names created above
    btc_to_s3 = PythonOperator(
        task_id='btc_postgres_to_s3',
        python_callable=postgres_to_s3,
        do_xcom_push=True,
        op_kwargs={'coin': 'bitcoin'}
    )

    eth_to_s3 = PythonOperator(
        task_id='eth_postgres_to_s3',
        python_callable=postgres_to_s3,
        do_xcom_push=True,
        op_kwargs={'coin': 'ethereum'}
    )

    xrp_to_s3 = PythonOperator(
        task_id='xrp_postgres_to_s3',
        python_callable=postgres_to_s3,
        do_xcom_push=True,
        op_kwargs={'coin': 'ripple'}
    )

    btc_redshift_table = RedshiftDataOperator(
        task_id="create_table_bitcoin_prices",
        cluster_identifier="crypto-redshift-cluster",
        database="mydb",
        db_user="project_test1",
        aws_conn_id="s3_conn",
        sql="""
            CREATE TABLE IF NOT EXISTS bitcoin_prices(
                id int NOT NULL,
                time_period_start timestamp,
                time_period_end timestamp,
                time_open timestamp,
                time_close timestamp,
                price_open float,
                price_high float,
                price_low float,
                price_close float,
                volume_traded float,
                trades_count int,
                period_date date
            )
        """
    )

    eth_redshift_table = RedshiftDataOperator(
        task_id="create_table_ethereum_prices",
        cluster_identifier="crypto-redshift-cluster",
        database="mydb",
        db_user="project_test1",
        aws_conn_id="s3_conn",
        sql="""
            CREATE TABLE IF NOT EXISTS ethereum_prices(
                id int NOT NULL,
                time_period_start timestamp,
                time_period_end timestamp,
                time_open timestamp,
                time_close timestamp,
                price_open float,
                price_high float,
                price_low float,
                price_close float,
                volume_traded float,
                trades_count int,
                period_date date
            )
        """
    )

    xrp_redshift_table = RedshiftDataOperator(
        task_id="create_table_ripple_prices",
        cluster_identifier="crypto-redshift-cluster",
        database="mydb",
        db_user="project_test1",
        aws_conn_id="s3_conn",
        sql="""
            CREATE TABLE IF NOT EXISTS ripple_prices(
                id int NOT NULL,
                time_period_start timestamp,
                time_period_end timestamp,
                time_open timestamp,
                time_close timestamp,
                price_open float,
                price_high float,
                price_low float,
                price_close float,
                volume_traded float,
                trades_count int,
                period_date date
            )
        """
    )
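
    # note: id is a plain int here because the values are copied over from the
    # Postgres SERIAL column via the CSV files; Redshift has no SERIAL type
    # (auto-generated ids would need an IDENTITY column instead)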

    btc_to_redshift = S3ToRedshiftOperator(
        task_id="transfer_btc_to_redshift",
        redshift_conn_id="redshift_conn",
        aws_conn_id="s3_conn",
        s3_bucket="crypto-price-bucket-abro",
        s3_key="{{ ti.xcom_pull(task_ids='btc_postgres_to_s3') }}",
        schema="public",
        table="bitcoin_prices",
        copy_options=["csv"]
    )

    eth_to_redshift = S3ToRedshiftOperator(
        task_id="transfer_eth_to_redshift",
        redshift_conn_id="redshift_conn",
        aws_conn_id="s3_conn",
        s3_bucket="crypto-price-bucket-abro",
        s3_key="{{ ti.xcom_pull(task_ids='eth_postgres_to_s3') }}",
        schema="public",
        table="ethereum_prices",
        copy_options=["csv"]
    )

    xrp_to_redshift = S3ToRedshiftOperator(
        task_id="transfer_xrp_to_redshift",
        redshift_conn_id="redshift_conn",
        aws_conn_id="s3_conn",
        s3_bucket="crypto-price-bucket-abro",
        s3_key="{{ ti.xcom_pull(task_ids='xrp_postgres_to_s3') }}",
        schema="public",
        table="ripple_prices",
        copy_options=["csv"]
    )
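
    # each transfer's s3_key is templated at run time from the XCom value
    # returned by the matching *_postgres_to_s3 task, and copy_options=["csv"]
    # tells the Redshift COPY command to parse the staged files as CSV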

    btc_to_s3 >> btc_redshift_table >> btc_to_redshift
    eth_to_s3 >> eth_redshift_table >> eth_to_redshift
    xrp_to_s3 >> xrp_redshift_table >> xrp_to_redshift