 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;

  * This class implements a read-process-write application.
  */
 public class ExactlyOnceMessageProcessor extends Thread implements ConsumerRebalanceListener, AutoCloseable {
+    private static final int MAX_RETRIES = 5;
+
     private final String bootstrapServers;
     private final String inputTopic;
     private final String outputTopic;
@@ -103,19 +106,21 @@ public ExactlyOnceMessageProcessor(String threadName,

     @Override
     public void run() {
+        int retries = 0;
         int processedRecords = 0;
         long remainingRecords = Long.MAX_VALUE;
+
         // it is recommended to have a relatively short txn timeout in order to clear pending offsets faster
         int transactionTimeoutMs = 10_000;
         // consumer must be in read_committed mode, which means it won't be able to read uncommitted data
         boolean readCommitted = true;
+
         try (KafkaProducer<Integer, String> producer = new Producer("processor-producer", bootstrapServers, outputTopic,
                 true, transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer();
              KafkaConsumer<Integer, String> consumer = new Consumer("processor-consumer", bootstrapServers, inputTopic,
                  "processor-group", Optional.of(groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) {
             // called first and once to fence zombies and abort any pending transaction
             producer.initTransactions();
-
             consumer.subscribe(singleton(inputTopic), this);

             Utils.printOut("Processing new records");
@@ -140,6 +145,7 @@ public void run() {
                         // commit the transaction including offsets
                         producer.commitTransaction();
                         processedRecords += records.count();
+                        retries = 0;
                     }
                 } catch (AuthorizationException | UnsupportedVersionException | ProducerFencedException
                          | FencedInstanceIdException | OutOfOrderSequenceException | SerializationException e) {
@@ -151,18 +157,21 @@ public void run() {
                     Utils.printOut("Invalid or no offset found, using latest");
                     consumer.seekToEnd(emptyList());
                     consumer.commitSync();
+                    retries = 0;
                 } catch (KafkaException e) {
-                    // abort the transaction and try to continue
-                    Utils.printOut("Aborting transaction: %s", e);
+                    // abort the transaction
+                    Utils.printOut("Aborting transaction: %s", e.getMessage());
                     producer.abortTransaction();
+                    retries = maybeRetry(retries, consumer);
                 }
+
                 remainingRecords = getRemainingRecords(consumer);
                 if (remainingRecords != Long.MAX_VALUE) {
                     Utils.printOut("Remaining records: %d", remainingRecords);
                 }
             }
         } catch (Throwable e) {
-            Utils.printOut("Unhandled exception");
+            Utils.printErr("Unhandled exception");
             e.printStackTrace();
         }
         Utils.printOut("Processed %d records", processedRecords);
@@ -215,6 +224,44 @@ private long getRemainingRecords(KafkaConsumer<Integer, String> consumer) {
         }).sum();
     }

+    /**
+     * When we get a generic {@code KafkaException} while processing records, we retry up to {@code MAX_RETRIES} times.
+     * If we exceed this threshold, we log an error and move on to the next batch of records.
+     * In a real world application you may want to send these records to a dead letter topic (DLT) for further processing.
+     *
+     * @param retries  Current number of retries
+     * @param consumer Consumer instance
+     * @return Updated number of retries
+     */
+    private int maybeRetry(int retries, KafkaConsumer<Integer, String> consumer) {
+        if (retries < 0) {
+            Utils.printErr("The number of retries must be greater than zero");
+            shutdown();
+        }
+
+        if (retries < MAX_RETRIES) {
+            // retry: reset fetch offset
+            // the consumer fetch position needs to be restored to the committed offset before the transaction started
+            Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(consumer.assignment());
+            consumer.assignment().forEach(tp -> {
+                OffsetAndMetadata offsetAndMetadata = committed.get(tp);
+                if (offsetAndMetadata != null) {
+                    consumer.seek(tp, offsetAndMetadata.offset());
+                } else {
+                    consumer.seekToBeginning(Collections.singleton(tp));
+                }
+            });
+            retries++;
+        } else {
+            // continue: skip records
+            // the consumer fetch position needs to be committed as if records were processed successfully
+            Utils.printErr("Skipping records after %d retries", MAX_RETRIES);
+            consumer.commitSync();
+            retries = 0;
+        }
+        return retries;
+    }
+
     @Override
     public void close() throws Exception {
         if (producer != null) {
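
The Javadoc of the new maybeRetry() method notes that a real-world application might route repeatedly failing records to a dead letter topic (DLT) instead of skipping them. The sketch below is not part of this change and only illustrates that idea: the topic name "processor-dlt", the helper name, and the assumption that the failed batch is still available in the caller are all hypothetical. It reuses the same transactional producer so the DLT writes and the offset commit stay atomic (sendOffsetsToTransaction with group metadata requires brokers >= 2.5), and it assumes the usual producer/consumer imports already present in the class.

// Hypothetical sketch, not part of this PR: forward a failed batch to a dead
// letter topic and commit its offsets in a single transaction instead of
// silently skipping the records. Topic name and method are illustrative only.
private void sendToDeadLetterTopic(KafkaProducer<Integer, String> producer,
                                   KafkaConsumer<Integer, String> consumer,
                                   ConsumerRecords<Integer, String> records) {
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    producer.beginTransaction();
    for (ConsumerRecord<Integer, String> record : records) {
        // forward the original record unchanged to the dead letter topic
        producer.send(new ProducerRecord<>("processor-dlt", record.key(), record.value()));
        // remember the next offset to consume for this partition
        offsets.put(new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1));
    }
    // commit the consumer offsets atomically with the DLT writes
    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
    producer.commitTransaction();
}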