1
- var AWS = require ( "aws-sdk" ) ;
2
- var processLogsHandler = require ( './cloudwatchlogs_lambda' ) . processLogs ;
3
- var getEndpointURL = require ( './cloudwatchlogs_lambda' ) . getEndpointURL ;
4
- var DLQUtils = require ( "./sumo-dlq-function-utils" ) . DLQUtils ;
5
- var Messages = DLQUtils . Messages ;
6
- var invokeLambdas = DLQUtils . invokeLambdas ;
1
+ const { processLogs : processLogsHandler , getEndpointURL } = require ( './cloudwatchlogs_lambda' ) ;
2
+ const { DLQUtils } = require ( "./sumo-dlq-function-utils" ) ;
3
+
4
+ const { Messages, invokeLambdas } = DLQUtils ;
7
5
8
6
exports . consumeMessages = async function ( env , context , callback ) {
9
- var sqs = new AWS . SQS ( { region : env . AWS_REGION } ) ;
10
- var MessagesObj = new Messages ( env ) ;
11
- env . SUMO_CLIENT_HEADER = "dlq-aws-lambda" ;
7
+ const MessagesObj = new Messages ( env ) ;
8
+ env . SUMO_CLIENT_HEADER = "dlq-aws-lambda" ;
9
+
12
10
if ( ! env . SUMO_ENDPOINT ) {
13
- let SUMO_ENDPOINT = await getEndpointURL ( ) ;
14
- if ( SUMO_ENDPOINT instanceof Error ) {
15
- console . log ( "Error in getEndpointURL: " , SUMO_ENDPOINT ) ;
16
- callback ( SUMO_ENDPOINT , null ) ;
11
+ try {
12
+ let SUMO_ENDPOINT = await getEndpointURL ( ) ;
13
+ env . SUMO_ENDPOINT = SUMO_ENDPOINT ;
14
+ } catch ( error ) {
15
+ console . log ( "Error in getEndpointURL: " , error ) ;
16
+ callback ( error , null ) ;
17
17
return ;
18
18
}
19
- env . SUMO_ENDPOINT = SUMO_ENDPOINT ;
20
19
} else {
21
20
console . log ( "consumeMessages: Getting SUMO_ENDPOINT from env" ) ;
22
21
}
23
- MessagesObj . receiveMessages ( 10 , function ( err , data ) {
24
- var messages = ( data ) ? data . Messages : null ;
25
- if ( err ) {
26
- callback ( err ) ;
27
- } else if ( messages && messages . length > 0 ) {
28
- var fail_cnt = 0 , msgCount = 0 ;
22
+
23
+ try {
24
+ const messages = await MessagesObj . receiveMessages ( 10 ) ;
25
+
26
+
27
+ if ( messages && messages . length > 0 ) {
28
+ let fail_cnt = 0 , msgCount = 0 ;
29
29
console . log ( "Messages Received" , messages . length ) ;
30
- for ( var i = 0 ; i < messages . length ; i ++ ) {
31
- ( function ( idx ) {
32
- var payload = JSON . parse ( messages [ idx ] . Body ) ;
33
- var receiptHandle = messages [ idx ] . ReceiptHandle ;
30
+
31
+ for ( let i = 0 ; i < messages . length ; i ++ ) {
32
+ ( function ( idx ) {
33
+ const payload = JSON . parse ( messages [ idx ] . Body ) ;
34
+ const receiptHandle = messages [ idx ] . ReceiptHandle ;
35
+
34
36
if ( ! ( payload . awslogs && payload . awslogs . data ) ) {
35
37
console . log ( "Message does not contain awslogs or awslogs.data attributes" , payload ) ;
36
- //deleting msg in DLQ after injesting in sumo
37
- MessagesObj . deleteMessage ( receiptHandle , function ( err , data ) {
38
- if ( err ) console . log ( err , err . stack ) ;
39
- } ) ;
38
+
39
+ MessagesObj . deleteMessage ( receiptHandle )
40
+ . catch ( ( err ) => console . log ( err , err . stack ) ) ;
41
+
40
42
return ;
41
43
}
42
- var logdata = payload . awslogs . data ;
44
+
45
+ const logdata = payload . awslogs . data ;
46
+
43
47
processLogsHandler ( env , logdata , function ( err , msg ) {
44
48
msgCount ++ ;
49
+
45
50
if ( err ) {
46
51
console . log ( err , msg ) ;
47
52
fail_cnt ++ ;
48
53
} else {
49
- //deleting msg in DLQ after injesting in sumo
50
- MessagesObj . deleteMessage ( receiptHandle , function ( err , data ) {
51
- if ( err ) console . log ( err , err . stack ) ;
52
- } ) ;
54
+ MessagesObj . deleteMessage ( receiptHandle )
55
+ . catch ( ( err ) => console . log ( err , err . stack ) ) ;
53
56
}
54
- if ( msgCount == messages . length ) {
55
- if ( fail_cnt == 0 && ( parseInt ( env . is_worker ) === 0 ) ) {
57
+
58
+ if ( msgCount === messages . length ) {
59
+ if ( fail_cnt === 0 && parseInt ( env . is_worker ) === 0 ) {
56
60
invokeLambdas ( env . AWS_REGION , parseInt ( env . NUM_OF_WORKERS ) ,
57
- context . functionName , '{"is_worker": "1"}' , context ) ;
61
+ context . functionName , '{"is_worker": "1"}' , context ) ;
58
62
}
59
- callback ( null , ( messages . length - fail_cnt ) + ' success' ) ;
63
+
64
+ callback ( null , `${ messages . length - fail_cnt } success` ) ;
60
65
}
61
66
} ) ;
62
67
} ) ( i ) ;
63
68
}
64
-
65
69
} else {
70
+
66
71
callback ( null , 'success' ) ;
67
72
}
68
- } ) ;
73
+ } catch ( error ) {
74
+ callback ( error ) ;
75
+ }
69
76
} ;
70
77
71
78
exports . handler = function ( event , context , callback ) {
72
-
73
- var env = Object . assign ( { } , process . env ) ;
74
- env [ 'is_worker' ] = event . is_worker || 0 ;
79
+ const env = Object . assign ( { } , process . env ) ;
80
+ env . is_worker = event . is_worker || 0 ;
75
81
exports . consumeMessages ( env , context , callback ) ;
76
- } ;
77
-
82
+ } ;
0 commit comments