1
+ # Licensed to the Apache Software Foundation (ASF) under one or more
2
+ # contributor license agreements. See the NOTICE file distributed with
3
+ # this work for additional information regarding copyright ownership.
4
+ # The ASF licenses this file to You under the Apache License, Version 2.0
5
+ # (the "License"); you may not use this file except in compliance with
6
+ # the License. You may obtain a copy of the License at
7
+ #
8
+ # http://www.apache.org/licenses/LICENSE-2.0
9
+ #
10
+ # Unless required by applicable law or agreed to in writing, software
11
+ # distributed under the License is distributed on an "AS IS" BASIS,
12
+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
+ # See the License for the specific language governing permissions and
14
+ # limitations under the License.
15
+
16
+ import json
17
+ import os
18
+
19
+ from ducktape .services .background_thread import BackgroundThreadService
20
+
21
+ from kafkatest .directory_layout .kafka_path import KafkaPathResolverMixin
22
+ from kafkatest .services .kafka import TopicPartition
23
+ from kafkatest .services .kafka .util import get_log4j_config_param , get_log4j_config_for_tools
24
+ from kafkatest .services .verifiable_client import VerifiableClientMixin
25
+ from kafkatest .version import DEV_BRANCH
26
+
27
class ShareConsumerState:
    """Lifecycle states recorded for a single verifiable share consumer.

    Started -- the consumer has reported that its startup completed.
    Dead    -- the initial state, and the state after shutdown or an unclean kill.
    """

    Started, Dead = 1, 2
30
+
31
class ShareConsumerEventHandler(object):
    """Accumulates the events reported by one verifiable share consumer.

    The handler records the consumer's lifecycle state and keeps running
    totals, both overall and per topic-partition, of consumed records and of
    successful and failed acknowledgements.
    """

    def __init__(self, node, idx, state=ShareConsumerState.Dead):
        """
        :param node: the node this handler is associated with
        :param idx: the index of the consumer this handler belongs to
        :param state: initial lifecycle state (a ShareConsumerState value)
        """
        self.node = node
        self.idx = idx
        self.total_consumed = 0
        self.total_acknowledged = 0
        self.total_acknowledged_failed = 0
        # Per-partition counters, keyed by TopicPartition.
        self.consumed_per_partition = {}
        self.acknowledged_per_partition = {}
        self.acknowledged_per_partition_failed = {}
        self.state = state

    @staticmethod
    def _add_partition_counts(counters, partitions):
        """Add every partition entry's "count" to ``counters``.

        :param counters: dict mapping TopicPartition to a running count
        :param partitions: iterable of dicts with "topic", "partition" and "count" keys
        """
        for share_partition_data in partitions:
            topic_partition = TopicPartition(share_partition_data["topic"], share_partition_data["partition"])
            counters[topic_partition] = counters.get(topic_partition, 0) + share_partition_data["count"]

    def handle_shutdown_complete(self, node=None, logger=None):
        """Mark the consumer as dead; log only when both node and logger are given."""
        self.state = ShareConsumerState.Dead
        if node is not None and logger is not None:
            logger.debug("Shut down %s" % node.account.hostname)

    def handle_startup_complete(self, node, logger):
        """Mark the consumer as started."""
        self.state = ShareConsumerState.Started
        logger.debug("Started %s" % node.account.hostname)

    def handle_offsets_acknowledged(self, event, node, logger):
        """Record an acknowledgement event.

        Successful and failed acknowledgements are tallied separately, based
        on the event's "success" flag.

        :param event: dict with "success", "count" and "partitions" keys
        """
        if event["success"]:
            self.total_acknowledged += event["count"]
            self._add_partition_counts(self.acknowledged_per_partition, event["partitions"])
            logger.debug("Offsets acknowledged for %s" % (node.account.hostname))
        else:
            self.total_acknowledged_failed += event["count"]
            self._add_partition_counts(self.acknowledged_per_partition_failed, event["partitions"])
            # Only the failure message is logged here; reporting a success for
            # a failed acknowledgement makes the debug log misleading.
            logger.debug("Offset acknowledgement failed for: %s" % (node.account.hostname))

    def handle_records_consumed(self, event, node, logger):
        """Record a consumption event.

        :param event: dict with "count" and "partitions" keys
        """
        self.total_consumed += event["count"]
        self._add_partition_counts(self.consumed_per_partition, event["partitions"])
        logger.debug("Offsets consumed for %s" % (node.account.hostname))

    def handle_kill_process(self, clean_shutdown):
        """Update the state after the consumer process has been signalled."""
        # if the shutdown was clean, then we expect the explicit
        # shutdown event from the share consumer
        if not clean_shutdown:
            self.handle_shutdown_complete()
81
+
82
class VerifiableShareConsumer(KafkaPathResolverMixin, VerifiableClientMixin, BackgroundThreadService):
    """This service wraps org.apache.kafka.tools.VerifiableShareConsumer for use in
    system testing.

    NOTE: this class should be treated as a PUBLIC API. Downstream users use
    this service both directly and through class extension, so care must be
    taken to ensure compatibility.
    """

    PERSISTENT_ROOT = "/mnt/verifiable_share_consumer"
    STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_share_consumer.stdout")
    STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_share_consumer.stderr")
    LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs")
    LOG_FILE = os.path.join(LOG_DIR, "verifiable_share_consumer.log")
    CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "verifiable_share_consumer.properties")

    logs = {
        "verifiable_share_consumer_stdout": {
            "path": STDOUT_CAPTURE,
            "collect_default": False},
        "verifiable_share_consumer_stderr": {
            "path": STDERR_CAPTURE,
            "collect_default": False},
        "verifiable_share_consumer_log": {
            "path": LOG_FILE,
            "collect_default": True}
    }

    def __init__(self, context, num_nodes, kafka, topic, group_id,
                 max_messages=-1, acknowledgement_mode="auto", offset_reset_strategy="",
                 version=DEV_BRANCH, stop_timeout_sec=60, log_level="INFO", jaas_override_variables=None,
                 on_record_consumed=None):
        """
        :param context: test context, forwarded to the base service constructor
        :param num_nodes: number of nodes, forwarded to the base service constructor
        :param kafka: Kafka service; supplies the security config and the bootstrap servers
        :param topic: value passed to the tool as --topic
        :param group_id: value passed to the tool as --group-id
        :param max_messages: passed as --max-messages only when greater than zero
        :param acknowledgement_mode: value passed to the tool as --acknowledgement-mode
        :param offset_reset_strategy: passed as --offset-reset-strategy; omitted when empty
        :param version: version assigned to every node of this service
        :param stop_timeout_sec: how long stop_node waits for a node to stop
        :param log_level: stored on the service as self.log_level
        :param jaas_override_variables: A dict of variables to be used in the jaas.conf template file
        :param on_record_consumed: optional callback invoked as callback(event, node) for every
                                   "record_data" event; setting it also adds --verbose to the command
        """
        super(VerifiableShareConsumer, self).__init__(context, num_nodes)
        self.log_level = log_level
        self.kafka = kafka
        self.topic = topic
        self.group_id = group_id
        self.offset_reset_strategy = offset_reset_strategy
        self.max_messages = max_messages
        self.acknowledgement_mode = acknowledgement_mode
        self.prop_file = ""
        self.stop_timeout_sec = stop_timeout_sec
        self.on_record_consumed = on_record_consumed

        # node -> ShareConsumerEventHandler, populated when a node's worker starts
        self.event_handlers = {}
        self.jaas_override_variables = jaas_override_variables or {}

        # Totals aggregated over all nodes of this service
        self.total_records_consumed = 0
        self.total_records_acknowledged = 0
        self.total_records_acknowledged_failed = 0
        # Sets of "<topic>-<partition>-<offset>" keys, used for the unique counts
        self.consumed_records_offsets = set()
        self.acknowledged_records_offsets = set()
        self.is_offset_reset_strategy_set = False

        for node in self.nodes:
            node.version = version

    def java_class_name(self):
        """Return the name of the Java tool class wrapped by this service."""
        return "VerifiableShareConsumer"

    def create_event_handler(self, idx, node):
        """Create the per-node event handler; subclasses may override this."""
        return ShareConsumerEventHandler(node, idx)

    def _worker(self, idx, node):
        """Set up ``node``, launch the share consumer and process its event stream.

        Every output line that parses as JSON is treated as an event and
        dispatched on its "name" field while holding self.lock.
        """
        with self.lock:
            self.event_handlers[node] = self.create_event_handler(idx, node)
            handler = self.event_handlers[node]

        node.account.ssh("mkdir -p %s" % VerifiableShareConsumer.PERSISTENT_ROOT, allow_fail=False)

        # Create and upload log properties
        log_config = self.render(get_log4j_config_for_tools(node), log_file=VerifiableShareConsumer.LOG_FILE)
        node.account.create_file(get_log4j_config_for_tools(node), log_config)

        # Create and upload config file
        # NOTE(review): self.security_config and self.prop_file are attributes of the
        # service, so they are shared by (and appended to from) every node's worker.
        self.security_config = self.kafka.security_config.client_config(self.prop_file, node,
                                                                        self.jaas_override_variables)
        self.security_config.setup_node(node)
        self.prop_file += str(self.security_config)
        self.logger.info("verifiable_share_consumer.properties:")
        self.logger.info(self.prop_file)
        node.account.create_file(VerifiableShareConsumer.CONFIG_FILE, self.prop_file)
        # NOTE(review): setup_node is invoked a second time here; kept as in the
        # original -- confirm whether the repeat call is required.
        self.security_config.setup_node(node)

        cmd = self.start_cmd(node)
        self.logger.debug("VerifiableShareConsumer %d command: %s" % (idx, cmd))

        for line in node.account.ssh_capture(cmd):
            event = self.try_parse_json(node, line.strip())
            if event is not None:
                with self.lock:
                    name = event["name"]
                    if name == "shutdown_complete":
                        handler.handle_shutdown_complete(node, self.logger)
                    elif name == "startup_complete":
                        handler.handle_startup_complete(node, self.logger)
                    elif name == "offsets_acknowledged":
                        handler.handle_offsets_acknowledged(event, node, self.logger)
                        self._update_global_acknowledged(event)
                    elif name == "records_consumed":
                        handler.handle_records_consumed(event, node, self.logger)
                        self._update_global_consumed(event)
                    elif name == "record_data" and self.on_record_consumed:
                        self.on_record_consumed(event, node)
                    elif name == "offset_reset_strategy_set":
                        self._on_offset_reset_strategy_set()
                    else:
                        # Also reached for "record_data" when no callback is registered.
                        self.logger.debug("%s: ignoring unknown event: %s" % (str(node.account), event))

    @staticmethod
    def _record_offset_keys(seen, partitions):
        """Add a "<topic>-<partition>-<offset>" key to ``seen`` for every reported offset.

        :param seen: set of keys collected so far; updated in place
        :param partitions: iterable of dicts with "topic", "partition" and "offsets" keys
        """
        for share_partition_data in partitions:
            tpkey = str(share_partition_data["topic"]) + "-" + str(share_partition_data["partition"])
            # A set ignores duplicates, so no membership test is needed first.
            seen.update(tpkey + "-" + str(offset) for offset in share_partition_data["offsets"])

    def _update_global_acknowledged(self, acknowledge_event):
        """Fold an "offsets_acknowledged" event into the service-wide totals."""
        if acknowledge_event["success"]:
            self.total_records_acknowledged += acknowledge_event["count"]
        else:
            self.total_records_acknowledged_failed += acknowledge_event["count"]
        self._record_offset_keys(self.acknowledged_records_offsets, acknowledge_event["partitions"])

    def _update_global_consumed(self, consumed_event):
        """Fold a "records_consumed" event into the service-wide totals."""
        self.total_records_consumed += consumed_event["count"]
        self._record_offset_keys(self.consumed_records_offsets, consumed_event["partitions"])

    def _on_offset_reset_strategy_set(self):
        """Remember that an "offset_reset_strategy_set" event was received."""
        self.is_offset_reset_strategy_set = True

    def start_cmd(self, node):
        """Build the shell command that launches the share consumer on ``node``.

        Relies on self.security_config, which is assigned in _worker before
        this method is called.
        """
        cmd = ""
        cmd += "export LOG_DIR=%s;" % VerifiableShareConsumer.LOG_DIR
        cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
        cmd += " export KAFKA_LOG4J_OPTS=\"%s%s\"; " % (get_log4j_config_param(node), get_log4j_config_for_tools(node))
        cmd += self.impl.exec_cmd(node)
        if self.on_record_consumed:
            cmd += " --verbose"

        cmd += " --acknowledgement-mode %s" % self.acknowledgement_mode

        # An empty strategy (the constructor default) would leave the flag without
        # a value, so the option that follows would be taken as its argument.
        # Presumably the tool applies its own default when the flag is omitted --
        # TODO confirm against the tool's argument parser.
        if self.offset_reset_strategy:
            cmd += " --offset-reset-strategy %s" % self.offset_reset_strategy

        cmd += " --bootstrap-server %s" % self.kafka.bootstrap_servers(self.security_config.security_protocol)

        cmd += " --group-id %s --topic %s" % (self.group_id, self.topic)

        if self.max_messages > 0:
            cmd += " --max-messages %s" % str(self.max_messages)

        cmd += " --consumer.config %s" % VerifiableShareConsumer.CONFIG_FILE
        # NOTE(review): stderr is appended to STDOUT_CAPTURE, so STDERR_CAPTURE is
        # never written by this command. Kept unchanged -- confirm whether intended.
        cmd += " 2>> %s | tee -a %s &" % (VerifiableShareConsumer.STDOUT_CAPTURE, VerifiableShareConsumer.STDOUT_CAPTURE)
        return cmd

    def pids(self, node):
        """Return the pids of the share consumer on ``node``, as reported by self.impl."""
        return self.impl.pids(node)

    def try_parse_json(self, node, string):
        """Try to parse a string as json. Return None if not parseable."""
        try:
            return json.loads(string)
        except ValueError:
            self.logger.debug("%s: Could not parse as json: %s" % (str(node.account), str(string)))
            return None

    def stop_all(self):
        """Stop the share consumer on every node of this service."""
        for node in self.nodes:
            self.stop_node(node)

    def kill_node(self, node, clean_shutdown=True, allow_fail=False):
        """Signal every share consumer process on ``node`` and update its handler.

        :param clean_shutdown: selects the signal via self.impl.kill_signal
        :param allow_fail: forwarded to node.account.signal
        """
        sig = self.impl.kill_signal(clean_shutdown)
        for pid in self.pids(node):
            node.account.signal(pid, sig, allow_fail)

        with self.lock:
            # A handler exists only once _worker has run for this node; killing
            # or cleaning a node that never started must not raise KeyError.
            handler = self.event_handlers.get(node)
            if handler is not None:
                handler.handle_kill_process(clean_shutdown)

    def stop_node(self, node, clean_shutdown=True):
        """Kill the share consumer on ``node`` and wait for it to stop.

        :raises AssertionError: if the node does not stop within stop_timeout_sec
        """
        self.kill_node(node, clean_shutdown=clean_shutdown)

        stopped = self.wait_node(node, timeout_sec=self.stop_timeout_sec)
        assert stopped, "Node %s: did not stop within the specified timeout of %s seconds" % \
                        (str(node.account), str(self.stop_timeout_sec))

    def clean_node(self, node):
        """Forcefully kill the share consumer on ``node`` and remove its files."""
        self.kill_node(node, clean_shutdown=False)
        node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False)
        # security_config is only assigned in _worker, so it is absent when a node
        # is cleaned before it has ever been started.
        security_config = getattr(self, "security_config", None)
        if security_config is not None:
            security_config.clean_node(node)

    def total_consumed(self):
        """Return the number of records consumed across all nodes."""
        with self.lock:
            return self.total_records_consumed

    def total_unique_consumed(self):
        """Return the number of distinct topic-partition-offset keys consumed."""
        with self.lock:
            return len(self.consumed_records_offsets)

    def total_unique_acknowledged(self):
        """Return the number of distinct topic-partition-offset keys seen in acknowledgement events."""
        with self.lock:
            return len(self.acknowledged_records_offsets)

    def total_acknowledged(self):
        """Return the number of acknowledged records, successful and failed combined."""
        with self.lock:
            return self.total_records_acknowledged + self.total_records_acknowledged_failed

    def total_successful_acknowledged(self):
        """Return the number of successfully acknowledged records."""
        with self.lock:
            return self.total_records_acknowledged

    def total_failed_acknowledged(self):
        """Return the number of records whose acknowledgement failed."""
        with self.lock:
            return self.total_records_acknowledged_failed

    def total_consumed_for_a_share_consumer(self, node):
        """Return the number of records consumed by the share consumer on ``node``."""
        with self.lock:
            return self.event_handlers[node].total_consumed

    def total_acknowledged_for_a_share_consumer(self, node):
        """Return the acknowledgements (successful and failed) of the share consumer on ``node``."""
        with self.lock:
            return self.event_handlers[node].total_acknowledged + self.event_handlers[node].total_acknowledged_failed

    def total_successful_acknowledged_for_a_share_consumer(self, node):
        """Return the successful acknowledgements of the share consumer on ``node``."""
        with self.lock:
            return self.event_handlers[node].total_acknowledged

    def total_failed_acknowledged_for_a_share_consumer(self, node):
        """Return the failed acknowledgements of the share consumer on ``node``."""
        with self.lock:
            return self.event_handlers[node].total_acknowledged_failed

    def offset_reset_strategy_set(self):
        """Return True once an "offset_reset_strategy_set" event has been received."""
        with self.lock:
            return self.is_offset_reset_strategy_set

    def dead_nodes(self):
        """Return the nodes whose handler is in the Dead state."""
        with self.lock:
            return [handler.node for handler in self.event_handlers.values()
                    if handler.state == ShareConsumerState.Dead]

    def alive_nodes(self):
        """Return the nodes whose handler is in the Started state."""
        with self.lock:
            return [handler.node for handler in self.event_handlers.values()
                    if handler.state == ShareConsumerState.Started]
0 commit comments