@@ -30,7 +30,7 @@ import { AnalyticsService } from '../usage/analytics.service'
3030import { CacheService } from './cache.service'
3131import { SchemaService } from './schema.service'
3232import { from , isObservable , Observable , of , Subject } from 'rxjs'
33- import { concatMap } from 'rxjs/operators'
33+ import { mergeMap } from 'rxjs/operators'
3434import { NotificationService } from '../notifications/notification.service'
3535import { NotificationActionType } from 'src/app/shared/models/notification-handler'
3636
@@ -52,9 +52,11 @@ interface SendFromCacheOptions {
5252export class KafkaService {
5353 private static DEFAULT_TOPIC_CACHE_VALIDITY = 600_000 // 10 minutes
5454 private static BATCH_SIZE = 10
55- private static CONCURRENCY_LIMIT = 5
55+ private static CONCURRENCY_LIMIT = 1
5656 private static SEND_ERROR_NOTIFICATION_THRESHOLD = 10
5757 private static HTTP_TIMEOUT = 120_000 // 2 minutes
58+ private static HTTP_RETRY_MAX_ATTEMPTS = 3
59+ private static HTTP_RETRY_BASE_DELAY_MS = 1000
5860
5961 URI_topics : string = '/topics/'
6062 DEFAULT_KAFKA_AVSC = 'questionnaire'
@@ -280,7 +282,9 @@ export class KafkaService {
280282 headers ,
281283 options
282284 )
283- if ( sentRecordCount === 0 ) this . logger . log ( 'Kafka record is empty, skipping sending' )
285+ if ( sentRecordCount === 0 ) {
286+ this . logger . log ( 'Kafka record is empty, skipping sending' )
287+ }
284288 batchSuccessKeys . push ( k )
285289 } catch ( e ) {
286290 if ( e . message === 'Cache sending cancelled' ) {
@@ -384,7 +388,7 @@ export class KafkaService {
384388 return new Promise ( ( resolve , reject ) => {
385389 payloadStream
386390 . pipe (
387- concatMap ( payload => {
391+ mergeMap ( payload => {
388392 if ( this . cancelSending ) {
389393 throw new Error ( 'Cache sending cancelled' )
390394 }
@@ -396,7 +400,7 @@ export class KafkaService {
396400 ( ) => payload . record . records . length
397401 )
398402 )
399- } )
403+ } , 1 )
400404 )
401405 . subscribe ( {
402406 next : sentCount => {
@@ -423,13 +427,12 @@ export class KafkaService {
423427 return options . streamHealthkitPayloads !== false
424428 }
425429
426- sendToKafka ( topic , record , headers ) : Promise < any > {
427- const allRecords = record . records
430+ async sendToKafka ( topic , record , headers ) : Promise < any > {
428431 const jsonData = JSON . stringify ( record )
429432 // Compress with pako and convert directly to base64
430433 const gzippedBytesB64 = fromByteArray ( pako . gzip ( jsonData ) )
431434
432- return this . postData (
435+ return await this . postData (
433436 gzippedBytesB64 ,
434437 topic ,
435438 headers . set ( 'Content-Encoding' , DefaultCompressedContentEncoding ) ,
@@ -486,7 +489,7 @@ export class KafkaService {
486489 return result
487490 }
488491
489- postData ( data : any , topic : string , headers : HttpHeaders , isBase64 : boolean = false ) : Promise < any > {
492+ async postData ( data : any , topic : string , headers : HttpHeaders , isBase64 : boolean = false ) : Promise < any > {
490493 const nativeHeaders = this . convertHeaders ( headers )
491494 const request : any = {
492495 url : `${ this . KAFKA_CLIENT_URL } ${ this . URI_topics } ${ topic } ` ,
@@ -503,27 +506,66 @@ export class KafkaService {
503506 request . connectionTimeout = KafkaService . HTTP_TIMEOUT
504507 request . readTimeout = KafkaService . HTTP_TIMEOUT
505508
506- const requestPromise = Http . request ( request )
507- . then ( response => {
509+ for ( let attempt = 1 ; attempt <= KafkaService . HTTP_RETRY_MAX_ATTEMPTS ; attempt ++ ) {
510+ // const httpStart = Date.now()
511+ try {
512+ const response = await Http . request ( request )
513+ // console.log('Kafka HTTP request time', {
514+ // topic,
515+ // status: response.status,
516+ // compressed: isBase64,
517+ // bytes: typeof data === 'string' ? data.length : undefined,
518+ // attempt,
519+ // milliseconds: Date.now() - httpStart
520+ // })
508521 if ( response . status < 200 || response . status >= 300 ) {
522+ if ( this . shouldRetryHttpStatus ( response . status , attempt ) ) {
523+ await this . delayHttpRetry ( attempt , response . status , topic )
524+ continue
525+ }
509526 throw new HttpErrorResponse ( {
510527 error : response . data ,
511528 status : response . status ,
512529 } )
513530 }
514531 return response
515- } )
516- . catch ( error => {
532+ } catch ( error ) {
533+ // console.log('Kafka HTTP request failed time', {
534+ // topic,
535+ // compressed: isBase64,
536+ // bytes: typeof data === 'string' ? data.length : undefined,
537+ // attempt,
538+ // milliseconds: Date.now() - httpStart
539+ // })
517540 console . error ( 'HTTP request failed:' , error )
518-
519541 if ( this . cancelSending ) {
520542 throw new Error ( 'Request cancelled by user' )
521543 }
544+ if ( this . shouldRetryHttpStatus ( error ?. status , attempt ) ) {
545+ await this . delayHttpRetry ( attempt , error . status , topic )
546+ continue
547+ }
548+ throw error
549+ }
550+ }
522551
523- throw new Error ( `Failed to send data to Kafka: ${ error . message } ` )
524- } )
552+ throw new Error ( 'Failed to send data to Kafka after retry attempts' )
553+ }
554+
555+ private shouldRetryHttpStatus ( status : number , attempt : number ) : boolean {
556+ return attempt < KafkaService . HTTP_RETRY_MAX_ATTEMPTS && ( status === 500 || status === 503 )
557+ }
525558
526- return requestPromise
559+ private delayHttpRetry ( attempt : number , status : number , topic : string ) : Promise < void > {
560+ const delayMs = KafkaService . HTTP_RETRY_BASE_DELAY_MS * attempt
561+ // console.log('Retrying Kafka HTTP request', {
562+ // topic,
563+ // status,
564+ // attempt,
565+ // nextAttempt: attempt + 1,
566+ // delayMs
567+ // })
568+ return new Promise ( resolve => setTimeout ( resolve , delayMs ) )
527569 }
528570
529571 getAccessToken ( ) {
0 commit comments