1818package org .apache .flink .connector .kinesis .sink ;
1919
2020import org .apache .flink .annotation .Internal ;
21+ import org .apache .flink .annotation .VisibleForTesting ;
2122import org .apache .flink .api .connector .sink2 .Sink ;
2223import org .apache .flink .connector .aws .util .AWSClientUtil ;
2324import org .apache .flink .connector .aws .util .AWSGeneralUtil ;
3233import org .apache .flink .connector .base .sink .writer .strategy .RateLimitingStrategy ;
3334import org .apache .flink .metrics .Counter ;
3435import org .apache .flink .metrics .groups .SinkWriterMetricGroup ;
36+ import org .apache .flink .util .Preconditions ;
3537
3638import org .slf4j .Logger ;
3739import org .slf4j .LoggerFactory ;
4345import software .amazon .awssdk .services .kinesis .model .PutRecordsResultEntry ;
4446import software .amazon .awssdk .services .kinesis .model .ResourceNotFoundException ;
4547
48+ import java .io .IOException ;
4649import java .util .ArrayList ;
4750import java .util .Collection ;
4851import java .util .Collections ;
52+ import java .util .HashMap ;
4953import java .util .List ;
54+ import java .util .Map ;
5055import java .util .Properties ;
5156import java .util .concurrent .CompletableFuture ;
5257
@@ -96,11 +101,8 @@ class KinesisStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRecord
96101 /* The sink writer metric group */
97102 private final SinkWriterMetricGroup metrics ;
98103
99- /* The asynchronous http client for the asynchronous Kinesis client */
100- private final SdkAsyncHttpClient httpClient ;
101-
102- /* The asynchronous Kinesis client - construction is by kinesisClientProperties */
103- private final KinesisAsyncClient kinesisClient ;
104+ /* Supplies the asynchronous Kinesis client used to send records; closed when this writer is closed */
105+ private KinesisClientProvider kinesisClientProvider ;
104106
105107 /* Flag indicating whether to fail fatally whenever an exception is encountered while persisting records */
106108 private final boolean failOnError ;
@@ -148,6 +150,36 @@ class KinesisStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRecord
148150 String streamArn ,
149151 Properties kinesisClientProperties ,
150152 Collection <BufferedRequestState <PutRecordsRequestEntry >> states ) {
153+ this (
154+ elementConverter ,
155+ context ,
156+ maxBatchSize ,
157+ maxInFlightRequests ,
158+ maxBufferedRequests ,
159+ maxBatchSizeInBytes ,
160+ maxTimeInBufferMS ,
161+ maxRecordSizeInBytes ,
162+ failOnError ,
163+ streamName ,
164+ streamArn ,
165+ states ,
166+ createDefaultClientProvider (kinesisClientProperties ));
167+ }
168+
169+ KinesisStreamsSinkWriter (
170+ ElementConverter <InputT , PutRecordsRequestEntry > elementConverter ,
171+ Sink .InitContext context ,
172+ int maxBatchSize ,
173+ int maxInFlightRequests ,
174+ int maxBufferedRequests ,
175+ long maxBatchSizeInBytes ,
176+ long maxTimeInBufferMS ,
177+ long maxRecordSizeInBytes ,
178+ boolean failOnError ,
179+ String streamName ,
180+ String streamArn ,
181+ Collection <BufferedRequestState <PutRecordsRequestEntry >> states ,
182+ KinesisClientProvider kinesisClientProvider ) {
151183 super (
152184 elementConverter ,
153185 context ,
@@ -167,11 +199,48 @@ class KinesisStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRecord
167199 this .streamArn = streamArn ;
168200 this .metrics = context .metricGroup ();
169201 this .numRecordsOutErrorsCounter = metrics .getNumRecordsOutErrorsCounter ();
170- this .httpClient = AWSGeneralUtil .createAsyncHttpClient (kinesisClientProperties );
171- this .kinesisClient = buildClient (kinesisClientProperties , this .httpClient );
202+ setKinesisClientProvider (kinesisClientProvider );
203+ }
204+
205+ /**
206+ * Create a default KinesisClientProvider to manage the Kinesis client and HTTP client.
207+ *
208+ * @param kinesisClientProperties Properties for configuring the Kinesis client
209+ * @return A KinesisClientProvider implementation
210+ */
211+ private static KinesisClientProvider createDefaultClientProvider (Properties kinesisClientProperties ) {
212+ return new KinesisClientProvider () {
213+ private final SdkAsyncHttpClient httpClient =
214+ AWSGeneralUtil .createAsyncHttpClient (kinesisClientProperties );
215+ private final KinesisAsyncClient kinesisClient =
216+ buildClient (kinesisClientProperties , httpClient );
217+
218+ @ Override
219+ public KinesisAsyncClient get () {
220+ return kinesisClient ;
221+ }
222+
223+ @ Override
224+ public void close () {
225+ AWSGeneralUtil .closeResources (httpClient , kinesisClient );
226+ }
227+ };
172228 }
173229
174- private KinesisAsyncClient buildClient (
230+ /**
231+ * Set a custom KinesisAsyncClient provider for testing purposes. This method is only intended
232+ * to be used in tests.
233+ *
234+ * @param kinesisClientProvider The provider that supplies the KinesisAsyncClient
235+ */
236+ @ VisibleForTesting
237+ void setKinesisClientProvider (KinesisClientProvider kinesisClientProvider ) {
238+ this .kinesisClientProvider =
239+ Preconditions .checkNotNull (
240+ kinesisClientProvider , "The kinesisClientProvider must not be null." );
241+ }
242+
243+ private static KinesisAsyncClient buildClient (
175244 Properties kinesisClientProperties , SdkAsyncHttpClient httpClient ) {
176245 AWSGeneralUtil .validateAwsCredentials (kinesisClientProperties );
177246
@@ -208,6 +277,7 @@ protected void submitRequestEntries(
208277 .streamARN (streamArn )
209278 .build ();
210279
280+ KinesisAsyncClient kinesisClient = kinesisClientProvider .get ();
211281 CompletableFuture <PutRecordsResponse > future = kinesisClient .putRecords (batchRequest );
212282
213283 future .whenComplete (
@@ -232,7 +302,7 @@ private void handleFullyFailedRequest(
232302 List <PutRecordsRequestEntry > requestEntries ,
233303 ResultHandler <PutRecordsRequestEntry > resultHandler ) {
234304 LOG .warn (
235- "KDS Sink failed to write and will retry {} entries to KDS" ,
305+ "Kinesis Data Stream Sink failed to write and will retry {} entries to KDS" ,
236306 requestEntries .size (),
237307 err );
238308 numRecordsOutErrorsCounter .inc (requestEntries .size ());
@@ -244,34 +314,117 @@ private void handleFullyFailedRequest(
244314
245315 @ Override
246316 public void close () {
247- AWSGeneralUtil .closeResources (httpClient , kinesisClient );
317+ try {
318+ kinesisClientProvider .close ();
319+ } catch (IOException e ) {
320+ throw new KinesisStreamsException ("Failed to close the kinesisClientProvider" , e );
321+ }
248322 }
249323
250324 private void handlePartiallyFailedRequest (
251325 PutRecordsResponse response ,
252326 List <PutRecordsRequestEntry > requestEntries ,
253327 ResultHandler <PutRecordsRequestEntry > resultHandler ) {
254- LOG .warn (
255- "KDS Sink failed to write and will retry {} entries to KDS" ,
256- response .failedRecordCount ());
257- numRecordsOutErrorsCounter .inc (response .failedRecordCount ());
328+ int failedRecordCount = response .failedRecordCount ();
329+ LOG .warn ("Kinesis Data Stream Sink failed to write and will retry {} entries to KDS" , failedRecordCount );
330+ numRecordsOutErrorsCounter .inc (failedRecordCount );
258331
259332 if (failOnError ) {
260333 resultHandler .completeExceptionally (
261334 new KinesisStreamsException .KinesisStreamsFailFastException ());
262335 return ;
263336 }
264- List < PutRecordsRequestEntry > failedRequestEntries =
265- new ArrayList <>(response . failedRecordCount () );
337+
338+ List < PutRecordsRequestEntry > failedRequestEntries = new ArrayList <>(failedRecordCount );
266339 List <PutRecordsResultEntry > records = response .records ();
267340
341+ // Collect error information and build the list of failed entries
342+ Map <String , ErrorSummary > errorSummaries =
343+ collectErrorSummaries (records , requestEntries , failedRequestEntries );
344+
345+ // Log aggregated error information
346+ logErrorSummaries (errorSummaries );
347+
348+ // Return the failed entries for retry
349+ resultHandler .retryForEntries (failedRequestEntries );
350+ }
351+
352+ /**
353+ * Collect error summaries from failed records and build a list of failed request entries.
354+ *
355+ * @param records The result entries from the Kinesis response
356+ * @param requestEntries The original request entries
357+ * @param failedRequestEntries List to populate with failed entries (modified as a side effect)
358+ * @return A map of error codes to their summaries
359+ */
360+ private Map <String , ErrorSummary > collectErrorSummaries (
361+ List <PutRecordsResultEntry > records ,
362+ List <PutRecordsRequestEntry > requestEntries ,
363+ List <PutRecordsRequestEntry > failedRequestEntries ) {
364+
365+ // We capture error info while minimizing logging overhead in the data path,
366+ // which is critical for maintaining throughput performance
367+ Map <String , ErrorSummary > errorSummaries = new HashMap <>();
368+
268369 for (int i = 0 ; i < records .size (); i ++) {
269- if (records .get (i ).errorCode () != null ) {
370+ PutRecordsResultEntry resultEntry = records .get (i );
371+ String errorCode = resultEntry .errorCode ();
372+
373+ if (errorCode != null ) {
374+ // Track the frequency of each error code to identify patterns
375+ ErrorSummary summary =
376+ errorSummaries .computeIfAbsent (
377+ errorCode , code -> new ErrorSummary (resultEntry .errorMessage ()));
378+ summary .incrementCount ();
379+
270380 failedRequestEntries .add (requestEntries .get (i ));
271381 }
272382 }
273383
274- resultHandler .retryForEntries (failedRequestEntries );
384+ return errorSummaries ;
385+ }
386+
387+ /**
388+ * Log aggregated error information at WARN level.
389+ *
390+ * @param errorSummaries Map of error codes to their summaries
391+ */
392+ private void logErrorSummaries (Map <String , ErrorSummary > errorSummaries ) {
393+ // We log aggregated error information at WARN level to ensure visibility in production
394+ // while avoiding the performance impact of logging each individual failure
395+ if (!errorSummaries .isEmpty ()) {
396+ // Using a single WARN log with aggregated information provides operational
397+ // visibility into errors without flooding logs in high-throughput scenarios
398+ LOG .warn ("Kinesis Data Stream Sink failed to write, Errors summary: " + errorSummaries );
399+ }
400+ }
401+
402+ /** Helper class to store error summary information. */
403+ static class ErrorSummary {
404+ private final String exampleMessage ;
405+ private int count ;
406+
407+ ErrorSummary (String exampleMessage ) {
408+ this .exampleMessage = exampleMessage ;
409+ this .count = 0 ;
410+ }
411+
412+ void incrementCount () {
413+ count ++;
414+ }
415+
416+ int getCount () {
417+ return count ;
418+ }
419+
420+ String getExampleMessage () {
421+ return exampleMessage ;
422+ }
423+
424+ @ Override
425+ public String toString () {
426+ return String .format ("[%d records, example: %s]" , count , exampleMessage );
427+ }
275428 }
276429
277430 private boolean isRetryable (
0 commit comments