-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathstock_quote_kafka_producer.py
68 lines (44 loc) · 1.63 KB
/
stock_quote_kafka_producer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
from time import sleep
from kafka import KafkaProducer
import requests
import sys
import json
import os
import datetime
# Number of per-minute offsets the main loop publishes; every offset handed to
# get_stock_details_for_minute must satisfy 0 <= offset < MAX_OFFSET.
# NOTE(review): 390 presumably equals the minutes in a 09:30-16:00 trading
# session (6.5 h * 60) -- confirm.
MAX_OFFSET = 390
# Kafka topic that every record is sent to.
TOPIC_NAME = "stockquotes"
# URL template for the data request; "{0}" is replaced with the requested date
# (the script's default value is "20190418"). The AAPL ticker is hard-coded.
link = "https://api.iextrading.com/1.0/stock/AAPL/chart/date/{0}"
def get_stock_details(date_val, timeout=10):
    """Download and decode the chart data for ``date_val``.

    Args:
        date_val: Value substituted into the ``{0}`` placeholder of the
            module-level ``link`` template (the script's default is
            "20190418").
        timeout: Seconds to wait for the server before giving up. Defaults
            to 10 so a stalled connection cannot hang the script forever.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.HTTPError: The server answered with a 4xx/5xx
            status.
        requests.exceptions.RequestException: The request failed or timed
            out.
        ValueError: The response body is not valid JSON.
    """
    complete_request = link.format(date_val)
    print(complete_request)
    # requests applies no timeout by default; pass one explicitly.
    response = requests.get(complete_request, timeout=timeout)
    # Fail loudly on an HTTP error instead of parsing an error page as data.
    response.raise_for_status()
    return response.json()
def get_stock_details_for_minute(stock_details, offset):
    """Return the record stored at position ``offset`` of ``stock_details``.

    The clock time lying ``offset`` minutes after 09:30 is printed for
    tracing, but the record itself is selected purely by position.

    Args:
        stock_details: Sized, indexable collection of records.
            NOTE(review): presumably one record per minute starting at
            09:30 -- verify against the data source.
        offset: Zero-based position of the wanted record.

    Returns:
        ``stock_details[offset]``.

    Raises:
        IndexError: ``offset`` is negative or not smaller than
            ``len(stock_details)``.
    """
    # A negative index would silently wrap around and return a record from
    # the END of the sequence, so reject it explicitly.
    if not 0 <= offset < len(stock_details):
        raise IndexError(
            "offset {0} is outside the range 0..{1}".format(
                offset, len(stock_details) - 1
            )
        )

    # time objects do not support arithmetic; anchor 09:30 on a dummy date.
    session_open = datetime.datetime.combine(
        datetime.date(1, 1, 1), datetime.time(9, 30)
    )
    required_time = (session_open + datetime.timedelta(minutes=offset)).time()
    print(required_time)

    # Zero-padded "HH:MM" label for the minute being published.
    required_key = required_time.strftime("%H:%M")
    print(required_key)

    required_record = stock_details[offset]
    print(required_record)
    return required_record
if __name__ == "__main__":
    # Optional single CLI argument: the date to fetch; otherwise use the
    # built-in default.
    date_val = sys.argv[1] if len(sys.argv) == 2 else "20190418"

    stock_details = get_stock_details(date_val)
    if stock_details:
        producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda x: json.dumps(x).encode('utf-8'),
        )
        try:
            # Never index past the data actually returned: a response with
            # fewer than MAX_OFFSET records would otherwise raise IndexError.
            for e in range(min(MAX_OFFSET, len(stock_details))):
                data = get_stock_details_for_minute(stock_details, e)
                producer.send(TOPIC_NAME, value=data)
                print("Pushed for offset: {0}".format(e))
                # Pause between records before publishing the next one.
                sleep(2)
        finally:
            # send() only queues the record; flush and close so everything
            # buffered is delivered before the process exits.
            producer.flush()
            producer.close()
    else:
        print("No stock details")