NasdaqKafkaAvroConsumer.py
import datetime
import logging
import sys

from confluent_kafka import TopicPartition, OFFSET_INVALID, OFFSET_END

import ncdssdk.src.main.python.ncdsclient.internal.utils.ConsumerConfig as config
from ncdssdk.src.main.python.ncdsclient.internal.KafkaAvroConsumer import KafkaAvroConsumer
from ncdssdk.src.main.python.ncdsclient.internal.ReadSchemaTopic import ReadSchemaTopic
from ncdssdk.src.main.python.ncdsclient.internal.utils import IsItPyTest, SeekToMidnight
from ncdssdk.src.main.python.ncdsclient.internal.utils.AuthenticationConfigLoader import AuthenticationConfigLoader
from ncdssdk.src.main.python.ncdsclient.internal.utils.KafkaConfigLoader import KafkaConfigLoader
from ncdssdk.src.main.python.ncdsclient.internal.utils.Oauth import Oauth

class NasdaqKafkaAvroConsumer:
    """
    Creates a Kafka consumer for Avro-encoded messages, using the schema
    resolved from the config arguments.

    Attributes:
        security_cfg (dict): JSON config dict with the authentication properties set
        kafka_cfg (dict): JSON config dict with the Kafka properties set
    """
    def __init__(self, security_cfg, kafka_cfg):
        """
        Initializes security_cfg and kafka_cfg, and sets the variables for
        client_ID, security_props, kafka_props, and read_schema_topic.
        """
        self.security_cfg = security_cfg
        self.kafka_cfg = kafka_cfg
        self.client_ID = None
        self.security_props = None
        self.kafka_props = None
        self.read_schema_topic = ReadSchemaTopic()
        self.logger = logging.getLogger(__name__)

        kafka_config_loader = KafkaConfigLoader()
        auth_config_loader = AuthenticationConfigLoader()

        # Fall back to the test config when running under pytest; otherwise a
        # Kafka configuration is required.
        if self.kafka_cfg is None:
            if IsItPyTest.is_py_test():
                self.kafka_props = kafka_config_loader.load_test_config()
            else:
                raise Exception("Kafka Configuration not defined")
        else:
            self.kafka_props = self.kafka_cfg
            kafka_config_loader.validate_and_add_specific_properties(
                self.kafka_props)

        # Without a security config, stub the client id for unit tests;
        # otherwise wire the OAuth token callback into the Kafka properties.
        if self.security_cfg is None:
            self.security_props = {auth_config_loader.OAUTH_CLIENT_ID: "unit-test"}
        else:
            self.security_props = self.security_cfg
            oauth = Oauth(self.security_cfg)
            self.kafka_props["oauth_cb"] = oauth.oauth_cb

        self.read_schema_topic.set_security_props(self.security_props)
        self.read_schema_topic.set_kafka_props(self.kafka_props)
        self.client_ID = auth_config_loader.get_client_id(self.security_props)
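
    # Note on config shapes (illustrative; the exact keys follow the SDK's
    # README and are assumptions here, not defined in this file):
    #
    #     security_cfg = {"oauth.token.endpoint.uri": "https://.../token",
    #                     "oauth.client.id": "<client id>",
    #                     "oauth.client.secret": "<client secret>"}
    #     kafka_cfg = {"bootstrap.servers": "<host>:9092",
    #                  "auto.offset.reset": "earliest"}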

    def get_kafka_consumer(self, stream_name, timestamp=None):
        """
        Returns a Kafka consumer assigned to the stream's topic partition,
        optionally seeking to the offset for the given timestamp.

        Args:
            stream_name (str): Kafka message series topic name
            timestamp (int): milliseconds since the UNIX epoch; when omitted,
                positioning falls back to the ``auto.offset.reset`` config

        :rtype: `confluent_kafka.Consumer <https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.Consumer>`_
        """
        kafka_schema = self.read_schema_topic.read_schema(stream_name)
        if kafka_schema is None:
            raise Exception(
                "Kafka Schema not found for stream: " + stream_name)

        kafka_consumer = self.get_consumer(kafka_schema, stream_name)
        topic_partition = TopicPartition(
            topic=stream_name + ".stream", partition=0, offset=OFFSET_END)
        self.logger.debug(
            f"Assigning kafka consumer to topic partition: {topic_partition}")
        kafka_consumer.assign([topic_partition])
        self.logger.debug(f"Assignment: {kafka_consumer.assignment()}")

        if timestamp is None:
            self.logger.debug("Timestamp is None")
            auto_offset_cfg = self.kafka_props.get(
                config.AUTO_OFFSET_RESET_CONFIG)
            if auto_offset_cfg in ("earliest", "smallest", "beginning"):
                self.logger.debug(
                    f"Auto offset reset config set to: {auto_offset_cfg}")
                return SeekToMidnight.seek_to_midnight_at_past_day(
                    kafka_consumer, topic_partition, 0)
            return kafka_consumer

        self.logger.debug(f"Timestamp is not None: {timestamp}")
        try:
            # offsets_for_times() interprets the offset field of each
            # TopicPartition as a timestamp in milliseconds and returns the
            # corresponding offsets.
            topic_partition.offset = timestamp
            self.logger.debug(
                f"offset: {topic_partition.offset}, timestamp: {timestamp}")
            offsets_for_times = kafka_consumer.offsets_for_times(
                [topic_partition], timeout=5)
        except Exception as e:
            self.logger.exception(e)
            sys.exit(1)

        self.logger.debug(
            f"topic partition after offsets_for_times: {offsets_for_times}")
        if offsets_for_times and offsets_for_times[0].offset != OFFSET_INVALID:
            topic_partition.offset = offsets_for_times[0].offset
            self.logger.debug(
                f"Successfully received offset {topic_partition.offset}")
            kafka_consumer.seek(topic_partition)
        else:
            self.logger.warning("No available offset. Continuing without seek")
        return kafka_consumer
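
    # Timestamp seeking, sketched (hypothetical values): pass milliseconds
    # since the UNIX epoch, and offsets_for_times() maps the timestamp to the
    # earliest offset whose message timestamp is at or after it, e.g.
    #
    #     ts_ms = int(datetime.datetime(2023, 1, 2, 14, 30).timestamp() * 1000)
    #     consumer = nkac.get_kafka_consumer("EXAMPLE_STREAM", ts_ms)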

    def get_consumer(self, avro_schema, stream_name):
        """
        Args:
            avro_schema: Avro schema for the topic
            stream_name (str): Kafka message series topic name, used to build
                a default ``group.id``

        Returns:
            a :class:`.KafkaAvroConsumer` instance with a key and value
            deserializer set through the avro_schema parameter
        """
        # Default to reading from the earliest offset and to a per-client,
        # per-stream, per-day consumer group.
        if config.AUTO_OFFSET_RESET_CONFIG not in self.kafka_props:
            self.kafka_props[config.AUTO_OFFSET_RESET_CONFIG] = "earliest"
        if config.GROUP_ID_CONFIG not in self.kafka_props:
            self.kafka_props[config.GROUP_ID_CONFIG] = (
                f"{self.client_ID}_{stream_name}_{datetime.datetime.today().day}"
            )
        return KafkaAvroConsumer(self.kafka_props, avro_schema)
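
    # For example (illustrative): with client id "abc", stream "EXAMPLE", and
    # today being the 5th of the month, the default group.id is "abc_EXAMPLE_5".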

    def get_schema_for_topic(self, topic):
        """
        Retrieves the schema for the given topic.

        Args:
            topic (str): topic name

        Returns:
            the Avro schema read from the schema topic for the given topic
        """
        return self.read_schema_topic.read_schema(topic)

    def get_topics(self):
        """
        Returns all topics.
        """
        return self.read_schema_topic.get_topics()
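

# ---------------------------------------------------------------------------
# Usage sketch (not part of the SDK file): wires the consumer up end to end.
# The endpoint, credentials, broker, and stream name are placeholders, and
# poll() is assumed to pass through to the underlying confluent_kafka consumer.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)

    security_cfg = {
        "oauth.token.endpoint.uri": "https://auth.example.com/token",  # placeholder
        "oauth.client.id": "example-client",                           # placeholder
        "oauth.client.secret": "example-secret",                       # placeholder
    }
    kafka_cfg = {
        "bootstrap.servers": "kafka.example.com:9092",  # placeholder broker
        "auto.offset.reset": "earliest",
    }

    nkac = NasdaqKafkaAvroConsumer(security_cfg, kafka_cfg)
    print(nkac.get_topics())

    # Consume one record from a hypothetical stream, starting from midnight.
    consumer = nkac.get_kafka_consumer("EXAMPLE_STREAM")  # placeholder stream
    msg = consumer.poll(10)
    if msg is not None and msg.error() is None:
        print(msg.value())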