Skip to content

Commit 5ed72b5

Browse files
authored
On shutdown wait for lambda logs API to report the final platform report metrics (#347)
1 parent bd81367 commit 5ed72b5

File tree

4 files changed

+75
-56
lines changed

4 files changed

+75
-56
lines changed

CHANGELOG.asciidoc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ https://github.com/elastic/apm-aws-lambda/compare/v1.2.0...main[View commits]
2424
2525
[float]
2626
===== Features
27-
- Create proxy transaction with error results if not reported by agent {lambda-pull}315[315]
27+
- experimental:[] Create proxy transaction with error results if not reported by agent {lambda-pull}315[315]
28+
- Wait for the final platform report metrics on shutdown {lambda-pull}347[347]
2829
2930
[float]
3031
[[lambda-1.2.0]]

apmproxy/receiver.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,8 @@ func (c *Client) handleInfoRequest() (func(w http.ResponseWriter, r *http.Reques
8888
reverseProxy.Transport = customTransport
8989

9090
reverseProxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) {
91-
c.UpdateStatus(r.Context(), Failing)
91+
// Don't update the status of the transport as it is possible that the extension
92+
// is frozen while processing the request and context is canceled due to timeout.
9293
c.logger.Errorf("Error querying version from the APM server: %v", err)
9394
}
9495

app/run.go

Lines changed: 43 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,14 @@ func (app *App) Run(ctx context.Context) error {
5555
}
5656
}()
5757

58+
// Flush all data before shutting down.
59+
defer func() {
60+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
61+
defer cancel()
62+
63+
app.apmClient.FlushAPMData(ctx)
64+
}()
65+
5866
if app.logsClient != nil {
5967
if err := app.logsClient.StartService(app.extensionClient.ExtensionID); err != nil {
6068
app.logger.Warnf("Error while subscribing to the Logs API: %v", err)
@@ -71,22 +79,13 @@ func (app *App) Run(ctx context.Context) error {
7179
}
7280
}
7381

74-
// Flush all data before shutting down.
75-
defer func() {
76-
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
77-
defer cancel()
78-
79-
app.apmClient.FlushAPMData(ctx)
80-
}()
81-
8282
// The previous event id is used to validate the received Lambda metrics
8383
var prevEvent *extension.NextEventResponse
8484

8585
for {
8686
select {
8787
case <-ctx.Done():
8888
app.logger.Info("Received a signal, exiting...")
89-
9089
return nil
9190
default:
9291
// Use a wait group to ensure the background go routine sending to the APM server
@@ -96,13 +95,12 @@ func (app *App) Run(ctx context.Context) error {
9695
if err != nil {
9796
return err
9897
}
99-
98+
app.logger.Debug("Waiting for background data send to end")
99+
backgroundDataSendWg.Wait()
100100
if event.EventType == extension.Shutdown {
101-
app.logger.Infof("Received shutdown event: %s. Exiting...", event.ShutdownReason)
101+
app.logger.Infof("Exiting due to shutdown event with reason %s", event.ShutdownReason)
102102
return nil
103103
}
104-
app.logger.Debug("Waiting for background data send to end")
105-
backgroundDataSendWg.Wait()
106104
if app.apmClient.ShouldFlush() {
107105
// Use a new cancellable context for flushing APM data to make sure
108106
// that the underlying transport is reset for next invocation without
@@ -163,15 +161,27 @@ func (app *App) processEvent(
163161
// At shutdown we can not expect platform.runtimeDone events to be reported
164162
// for the remaining invocations. If we haven't received the transaction
165163
// from agents at this point then it is safe to assume that the function
166-
// timed out. We will create proxy transaction for all invocations that
164+
// failed. We will create proxy transaction for all invocations that
167165
// haven't received a full transaction from the agent yet. If extension
168166
// doesn't have enough CPU time it is possible that the extension might
169167
// not receive the shutdown signal for timeouts or runtime crashes. In
170168
// these cases we will miss the transaction.
171-
if err := app.batch.OnShutdown("timeout"); err != nil {
172-
app.logger.Errorf("Error finalizing invocation on shutdown: %v", err)
169+
app.logger.Debugf("Received shutdown event with reason %s", event.ShutdownReason)
170+
defer func() {
171+
if err := app.batch.OnShutdown(event.ShutdownReason); err != nil {
172+
app.logger.Errorf("Error finalizing invocation on shutdown: %v", err)
173+
}
174+
}()
175+
176+
// platform.report metric (and some other metrics) might not have been
177+
// reported by the logs API even till shutdown. At shutdown we will make
178+
// a last attempt to collect and report these metrics. However, it is
179+
// also possible that lambda has init a few execution env preemptively,
180+
// for such cases the extension will see only a SHUTDOWN event and
181+
// there is no need to wait for any log event.
182+
if prevEvent == nil {
183+
return event, nil
173184
}
174-
return event, nil
175185
}
176186

177187
// APM Data Processing
@@ -185,25 +195,22 @@ func (app *App) processEvent(
185195

186196
// Lambda Service Logs Processing, also used to extract metrics from APM logs
187197
// This goroutine should not be started if subscription failed
188-
runtimeDone := make(chan struct{})
198+
logProcessingDone := make(chan struct{})
189199
if app.logsClient != nil {
190200
go func() {
191-
if err := app.logsClient.ProcessLogs(
201+
defer close(logProcessingDone)
202+
app.logsClient.ProcessLogs(
192203
invocationCtx,
193204
event.RequestID,
194205
event.InvokedFunctionArn,
195206
app.apmClient.LambdaDataChannel,
196-
runtimeDone,
197207
prevEvent,
198-
); err != nil {
199-
app.logger.Errorf("Error while processing Lambda Logs ; %v", err)
200-
} else {
201-
close(runtimeDone)
202-
}
208+
event.EventType == extension.Shutdown,
209+
)
203210
}()
204211
} else {
205212
app.logger.Warn("Logs collection not started due to earlier subscription failure")
206-
close(runtimeDone)
213+
close(logProcessingDone)
207214
}
208215

209216
// Calculate how long to wait for a runtimeDoneSignal or AgentDoneSignal signal
@@ -214,21 +221,21 @@ func (app *App) processEvent(
214221
timer := time.NewTimer(durationUntilFlushDeadline)
215222
defer timer.Stop()
216223

217-
// The extension relies on 3 independent mechanisms to minimize the time interval between the end of the execution of
218-
// the lambda function and the end of the execution of processEvent()
219-
// 1) AgentDoneSignal is triggered upon reception of a `flushed=true` query from the agent
220-
// 2) [Backup 1] RuntimeDone is triggered upon reception of a Lambda log entry certifying the end of the execution of the current function
221-
// 3) [Backup 2] If all else fails, the extension relies of the timeout of the Lambda function to interrupt itself 200 ms before the specified deadline.
222-
// This time interval is large enough to attempt a last flush attempt (if SendStrategy == syncFlush) before the environment gets shut down.
223-
224+
// The extension relies on 3 independent mechanisms to minimize the time interval
225+
// between the end of the execution of the lambda function and the end of the
226+
// execution of processEvent():
227+
// 1) AgentDoneSignal triggered upon reception of a `flushed=true` query from the agent
228+
// 2) [Backup 1] All expected log events are processed.
229+
// 3) [Backup 2] If all else fails, the extension relies of the timeout of the Lambda
230+
// function to interrupt itself 200ms before the specified deadline to give the extension
231+
// time to flush data before shutdown.
224232
select {
225233
case <-app.apmClient.WaitForFlush():
226234
app.logger.Debug("APM client has sent flush signal")
227-
case <-runtimeDone:
235+
case <-logProcessingDone:
228236
app.logger.Debug("Received runtimeDone signal")
229237
case <-timer.C:
230-
app.logger.Info("Time expired waiting for agent signal or runtimeDone event")
238+
app.logger.Info("Time expired while waiting for agent done signal or final log event")
231239
}
232-
233240
return event, nil
234241
}

logsapi/event.go

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -53,24 +53,27 @@ type LogEventRecord struct {
5353
Metrics PlatformMetrics `json:"metrics"`
5454
}
5555

56-
// ProcessLogs consumes events until a RuntimeDone event corresponding
57-
// to requestID is received, or ctx is canceled, and then returns.
56+
// ProcessLogs consumes log events until there are no more log events that
57+
// can be consumed or ctx is cancelled. For INVOKE event this state is
58+
// reached when runtimeDone event for the current requestID is processed
59+
// whereas for SHUTDOWN event this state is reached when the platformReport
60+
// event for the previous requestID is processed.
5861
func (lc *Client) ProcessLogs(
5962
ctx context.Context,
6063
requestID string,
6164
invokedFnArn string,
6265
dataChan chan []byte,
63-
runtimeDoneSignal chan struct{},
6466
prevEvent *extension.NextEventResponse,
65-
) error {
67+
isShutdown bool,
68+
) {
6669
// platformStartReqID is to identify the requestID for the function
6770
// logs under the assumption that function logs for a specific request
6871
// ID will be bounded by PlatformStart and PlatformEnd events.
6972
var platformStartReqID string
7073
for {
7174
select {
7275
case logEvent := <-lc.logsChannel:
73-
lc.logger.Debugf("Received log event %v", logEvent.Type)
76+
lc.logger.Debugf("Received log event %v for request ID %s", logEvent.Type, logEvent.Record.RequestID)
7477
switch logEvent.Type {
7578
case PlatformStart:
7679
platformStartReqID = logEvent.Record.RequestID
@@ -82,18 +85,18 @@ func (lc *Client) ProcessLogs(
8285
); err != nil {
8386
lc.logger.Warnf("Failed to finalize invocation with request ID %s: %v", logEvent.Record.RequestID, err)
8487
}
85-
// For the current invocation the platform.runtimeDone would be the last event
86-
if logEvent.Record.RequestID == requestID {
87-
lc.logger.Info("Received runtimeDone event for this function invocation")
88-
runtimeDoneSignal <- struct{}{}
89-
return nil
88+
// For invocation events the platform.runtimeDone would be the last possible event.
89+
if !isShutdown && logEvent.Record.RequestID == requestID {
90+
lc.logger.Debugf(
91+
"Processed runtime done event for reqID %s as the last log event for the invocation",
92+
logEvent.Record.RequestID,
93+
)
94+
return
9095
}
91-
lc.logger.Debug("Log API runtimeDone event request id didn't match")
92-
// Check if the logEvent contains metrics and verify that they can be linked to the previous invocation
9396
case PlatformReport:
9497
// TODO: @lahsivjar Refactor usage of prevEvent.RequestID (should now query the batch?)
9598
if prevEvent != nil && logEvent.Record.RequestID == prevEvent.RequestID {
96-
lc.logger.Debug("Received platform report for the previous function invocation")
99+
lc.logger.Debugf("Received platform report for %s", logEvent.Record.RequestID)
97100
processedMetrics, err := ProcessPlatformReport(prevEvent, logEvent)
98101
if err != nil {
99102
lc.logger.Errorf("Error processing Lambda platform metrics: %v", err)
@@ -103,21 +106,28 @@ func (lc *Client) ProcessLogs(
103106
case <-ctx.Done():
104107
}
105108
}
109+
// For shutdown event the platform report metrics for the previous log event
110+
// would be the last possible log event.
111+
if isShutdown {
112+
lc.logger.Debugf(
113+
"Processed platform report event for reqID %s as the last log event before shutdown",
114+
logEvent.Record.RequestID,
115+
)
116+
return
117+
}
106118
} else {
107-
lc.logger.Warn("report event request id didn't match the previous event id")
108-
lc.logger.Debug("Log API runtimeDone event request id didn't match")
119+
lc.logger.Warn("Report event request id didn't match the previous event id")
109120
}
110121
case PlatformLogsDropped:
111122
lc.logger.Warnf("Logs dropped due to extension falling behind: %v", logEvent.Record)
112123
case FunctionLog:
113-
lc.logger.Debug("Received function log")
114124
processedLog, err := ProcessFunctionLog(
115125
platformStartReqID,
116126
invokedFnArn,
117127
logEvent,
118128
)
119129
if err != nil {
120-
lc.logger.Errorf("Error processing function log : %v", err)
130+
lc.logger.Warnf("Error processing function log : %v", err)
121131
} else {
122132
select {
123133
case dataChan <- processedLog:
@@ -127,7 +137,7 @@ func (lc *Client) ProcessLogs(
127137
}
128138
case <-ctx.Done():
129139
lc.logger.Debug("Current invocation over. Interrupting logs processing goroutine")
130-
return nil
140+
return
131141
}
132142
}
133143
}

0 commit comments

Comments
 (0)