2222 config = yaml .load (yaml_file )
2323now = datetime .now ()
2424_time_stamp = str (datetime .timestamp (now ))
25+ _topic = 'kafka_connect_upgrade'
26+ _connector = 'kafka_connect'
27+ _connector_ack = 'kafka_connect_ack'
2528
2629
2730def start_old_connector ():
@@ -41,47 +44,53 @@ def start_old_connector():
4144
4245def generate_kafka_events (num ):
4346 # Generate message data
44- topics = ["kafka_data_gen" ]
47+ topics = [_topic ]
4548 connector_content = {
46- "name" : "kafka_connect" ,
49+ "name" : _connector ,
4750 "config" : {
4851 "connector.class" : "com.splunk.kafka.connect.SplunkSinkConnector" ,
4952 "tasks.max" : "1" ,
5053 "splunk.indexes" : config ["splunk_index" ],
51- "topics" : "kafka_data_gen" ,
54+ "topics" : _topic ,
5255 "splunk.hec.ack.enabled" : "false" ,
5356 "splunk.hec.uri" : config ["splunk_hec_url" ],
5457 "splunk.hec.ssl.validate.certs" : "false" ,
55- "splunk.hec.token" : config ["splunk_token" ]
58+ "splunk.hec.token" : config ["splunk_token" ],
59+ "splunk.sources" : _connector
5660 }
5761 }
5862 create_kafka_connector (config , connector_content )
5963 connector_content_ack = {
60- "name" : "kafka_connect_ack" ,
64+ "name" : _connector_ack ,
6165 "config" : {
6266 "connector.class" : "com.splunk.kafka.connect.SplunkSinkConnector" ,
6367 "tasks.max" : "1" ,
6468 "splunk.indexes" : config ["splunk_index" ],
65- "topics" : "kafka_data_gen" ,
69+ "topics" : _topic ,
6670 "splunk.hec.ack.enabled" : "true" ,
6771 "splunk.hec.uri" : config ["splunk_hec_url" ],
6872 "splunk.hec.ssl.validate.certs" : "false" ,
69- "splunk.hec.token" : config ["splunk_token_ack" ]
73+ "splunk.hec.token" : config ["splunk_token_ack" ],
74+ "splunk.sources" : _connector_ack
7075 }
7176 }
7277 create_kafka_connector (config , connector_content_ack )
73- create_kafka_topics (config , topics )
78+ client = KafkaAdminClient (bootstrap_servers = config ["kafka_broker_url" ], client_id = 'test' )
79+ broker_topics = client .list_topics ()
80+ logger .info (broker_topics )
81+ if _topic not in broker_topics :
82+ create_kafka_topics (config , topics )
7483 producer = KafkaProducer (bootstrap_servers = config ["kafka_broker_url" ],
7584 value_serializer = lambda v : json .dumps (v ).encode ('utf-8' ))
7685
7786 for _ in range (num ):
7887 msg = {"timestamp" : _time_stamp }
79- producer .send ("kafka_data_gen" , msg )
88+ producer .send (_topic , msg )
8089 time .sleep (0.05 )
8190 producer .flush ()
8291
8392
84- def upgrade_connector ():
93+ def upgrade_connector_plugin ():
8594 cmds = ["sudo kill $(sudo lsof -t -i:8083) && sleep 2" ,
8695 "sudo rm {}/{} && sleep 2" .format (config ["connector_path" ], config ["old_connector_name" ]),
8796 "sudo cp {0}/splunk-kafka-connect*.jar {1} && sleep 2" .format (config ["connector_build_target" ],
@@ -96,10 +105,50 @@ def upgrade_connector():
96105 stderr = subprocess .STDOUT )
97106 output , error = proc .communicate ()
98107 logger .info (output )
108+ time .sleep (2 )
109+ update_kafka_connectors ()
99110 except OSError as e :
100111 logger .error (e )
101112
102113
114+ def update_kafka_connectors ():
115+ logger .info ("Update kafka connectors ..." )
116+ connector_content = {
117+ "name" : _connector ,
118+ "config" : {
119+ "connector.class" : "com.splunk.kafka.connect.SplunkSinkConnector" ,
120+ "tasks.max" : "1" ,
121+ "splunk.indexes" : config ["splunk_index" ],
122+ "topics" : _topic ,
123+ "splunk.hec.ack.enabled" : "false" ,
124+ "splunk.hec.uri" : config ["splunk_hec_url" ],
125+ "splunk.hec.ssl.validate.certs" : "false" ,
126+ "splunk.hec.token" : config ["splunk_token" ],
127+ "splunk.sources" : _connector ,
128+ "splunk.hec.json.event.formatted" : "true" ,
129+ "splunk.hec.raw" : True
130+ }
131+ }
132+ create_kafka_connector (config , connector_content )
133+ connector_content_ack = {
134+ "name" : _connector_ack ,
135+ "config" : {
136+ "connector.class" : "com.splunk.kafka.connect.SplunkSinkConnector" ,
137+ "tasks.max" : "1" ,
138+ "splunk.indexes" : config ["splunk_index" ],
139+ "topics" : _topic ,
140+ "splunk.hec.ack.enabled" : "true" ,
141+ "splunk.hec.uri" : config ["splunk_hec_url" ],
142+ "splunk.hec.ssl.validate.certs" : "false" ,
143+ "splunk.hec.token" : config ["splunk_token_ack" ],
144+ "splunk.sources" : _connector_ack ,
145+ "splunk.hec.json.event.formatted" : "true" ,
146+ "splunk.hec.raw" : True
147+ }
148+ }
149+ create_kafka_connector (config , connector_content_ack )
150+
151+
103152if __name__ == '__main__' :
104153 logger .info ("Start old Kafka connector ..." )
105154 thread_old_connect = threading .Thread (target = start_old_connector , daemon = True )
@@ -110,15 +159,26 @@ def upgrade_connector():
110159 thread_gen .start ()
111160 time .sleep (50 )
112161 logger .info ("Upgrade Kafka connector ..." )
113- thread_upgrade = threading .Thread (target = upgrade_connector , daemon = True )
162+ thread_upgrade = threading .Thread (target = upgrade_connector_plugin , daemon = True )
114163 thread_upgrade .start ()
115164 time .sleep (100 )
116- search_query = "index={0} | search timestamp=\" {1}\" " .format (config ['splunk_index' ], _time_stamp )
117- logger .info (search_query )
118- events = check_events_from_splunk (start_time = "-15m@m" ,
165+ search_query_1 = "index={0} | search timestamp=\" {1}\" source::{2}" .format (config ['splunk_index' ], _time_stamp ,
166+ _connector )
167+ logger .info (search_query_1 )
168+ events_1 = check_events_from_splunk (start_time = "-15m@m" ,
119169 url = config ["splunkd_url" ],
120170 user = config ["splunk_user" ],
121- query = ["search {}" .format (search_query )],
171+ query = ["search {}" .format (search_query_1 )],
122172 password = config ["splunk_password" ])
123- logger .info ("Splunk received %s events in the last 15m" , len (events ))
124- assert len (events ) == 4000
173+ logger .info ("Splunk received %s events in the last 15m" , len (events_1 ))
174+ assert len (events_1 ) == 2000
175+ search_query_2 = "index={0} | search timestamp=\" {1}\" source::{2}" .format (config ['splunk_index' ], _time_stamp ,
176+ _connector_ack )
177+ logger .info (search_query_2 )
178+ events_2 = check_events_from_splunk (start_time = "-15m@m" ,
179+ url = config ["splunkd_url" ],
180+ user = config ["splunk_user" ],
181+ query = ["search {}" .format (search_query_2 )],
182+ password = config ["splunk_password" ])
183+ logger .info ("Splunk received %s events in the last 15m" , len (events_2 ))
184+ assert len (events_2 ) == 2000
0 commit comments