Skip to content

Commit 556d5dd

Browse files
authored
Add support for collecting and sending function logs to APM Server (#303)
1 parent 00d1568 commit 556d5dd

16 files changed

+338
-78
lines changed

CHANGELOG.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ https://github.com/elastic/apm-aws-lambda/compare/v1.1.0...main[View commits]
2929
[float]
3030
===== Features
3131
- Disable CGO to prevent libc/ABI compatibility issues {lambda-pull}292[292]
32+
- Add support for collecting and shipping function logs to APM Server {lambda-pull}303[303]
3233
3334
[float]
3435
===== Bug fixes

app/app.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ type App struct {
4545

4646
// New returns an App or an error if the
4747
// creation failed.
48-
func New(ctx context.Context, opts ...configOption) (*App, error) {
48+
func New(ctx context.Context, opts ...ConfigOption) (*App, error) {
4949
c := appConfig{}
5050

5151
for _, opt := range opts {
@@ -62,7 +62,7 @@ func New(ctx context.Context, opts ...configOption) (*App, error) {
6262
return nil, err
6363
}
6464

65-
apmServerApiKey, apmServerSecretToken, err := loadAWSOptions(ctx, c.awsConfig, app.logger)
65+
apmServerAPIKey, apmServerSecretToken, err := loadAWSOptions(ctx, c.awsConfig, app.logger)
6666
if err != nil {
6767
return nil, err
6868
}
@@ -75,11 +75,17 @@ func New(ctx context.Context, opts ...configOption) (*App, error) {
7575
addr = c.logsapiAddr
7676
}
7777

78+
subscriptionLogStreams := []logsapi.SubscriptionType{logsapi.Platform}
79+
if c.enableFunctionLogSubscription {
80+
subscriptionLogStreams = append(subscriptionLogStreams, logsapi.Function)
81+
}
82+
7883
lc, err := logsapi.NewClient(
7984
logsapi.WithLogsAPIBaseURL(fmt.Sprintf("http://%s", c.awsLambdaRuntimeAPI)),
8085
logsapi.WithListenerAddress(addr),
8186
logsapi.WithLogBuffer(100),
8287
logsapi.WithLogger(app.logger),
88+
logsapi.WithSubscriptionTypes(subscriptionLogStreams...),
8389
)
8490
if err != nil {
8591
return nil, err
@@ -124,7 +130,7 @@ func New(ctx context.Context, opts ...configOption) (*App, error) {
124130
apmOpts = append(apmOpts,
125131
apmproxy.WithURL(os.Getenv("ELASTIC_APM_LAMBDA_APM_SERVER")),
126132
apmproxy.WithLogger(app.logger),
127-
apmproxy.WithAPIKey(apmServerApiKey),
133+
apmproxy.WithAPIKey(apmServerAPIKey),
128134
apmproxy.WithSecretToken(apmServerSecretToken),
129135
)
130136

app/config.go

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,56 +20,67 @@ package app
2020
import "github.com/aws/aws-sdk-go-v2/aws"
2121

2222
type appConfig struct {
23-
awsLambdaRuntimeAPI string
24-
awsConfig aws.Config
25-
extensionName string
26-
disableLogsAPI bool
27-
logLevel string
28-
logsapiAddr string
23+
awsLambdaRuntimeAPI string
24+
awsConfig aws.Config
25+
extensionName string
26+
disableLogsAPI bool
27+
enableFunctionLogSubscription bool
28+
logLevel string
29+
logsapiAddr string
2930
}
3031

31-
type configOption func(*appConfig)
32+
// ConfigOption is used to configure the lambda extension
33+
type ConfigOption func(*appConfig)
3234

3335
// WithLambdaRuntimeAPI sets the AWS Lambda Runtime API
3436
// endpoint (normally taken from $AWS_LAMBDA_RUNTIME_API),
3537
// used by the AWS client.
36-
func WithLambdaRuntimeAPI(api string) configOption {
38+
func WithLambdaRuntimeAPI(api string) ConfigOption {
3739
return func(c *appConfig) {
3840
c.awsLambdaRuntimeAPI = api
3941
}
4042
}
4143

4244
// WithExtensionName sets the extension name.
43-
func WithExtensionName(name string) configOption {
45+
func WithExtensionName(name string) ConfigOption {
4446
return func(c *appConfig) {
4547
c.extensionName = name
4648
}
4749
}
4850

4951
// WithoutLogsAPI disables the logs api.
50-
func WithoutLogsAPI() configOption {
52+
func WithoutLogsAPI() ConfigOption {
5153
return func(c *appConfig) {
5254
c.disableLogsAPI = true
5355
}
5456
}
5557

58+
// WithFunctionLogSubscription enables the logs api subscription
59+
// to function log stream. This option will only work if LogsAPI
60+
// is not disabled by the WithoutLogsAPI config option.
61+
func WithFunctionLogSubscription() ConfigOption {
62+
return func(c *appConfig) {
63+
c.enableFunctionLogSubscription = true
64+
}
65+
}
66+
5667
// WithLogLevel sets the log level.
57-
func WithLogLevel(level string) configOption {
68+
func WithLogLevel(level string) ConfigOption {
5869
return func(c *appConfig) {
5970
c.logLevel = level
6071
}
6172
}
6273

6374
// WithLogsapiAddress sets the listener address of the
6475
// server listening for logs event.
65-
func WithLogsapiAddress(s string) configOption {
76+
func WithLogsapiAddress(s string) ConfigOption {
6677
return func(c *appConfig) {
6778
c.logsapiAddr = s
6879
}
6980
}
7081

7182
// WithAWSConfig sets the AWS config.
72-
func WithAWSConfig(awsConfig aws.Config) configOption {
83+
func WithAWSConfig(awsConfig aws.Config) ConfigOption {
7384
return func(c *appConfig) {
7485
c.awsConfig = awsConfig
7586
}

app/run.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,12 @@ package app
1919

2020
import (
2121
"context"
22-
"github.com/elastic/apm-aws-lambda/apmproxy"
23-
"github.com/elastic/apm-aws-lambda/extension"
24-
"github.com/elastic/apm-aws-lambda/logsapi"
2522
"fmt"
2623
"sync"
2724
"time"
25+
26+
"github.com/elastic/apm-aws-lambda/apmproxy"
27+
"github.com/elastic/apm-aws-lambda/extension"
2828
)
2929

3030
// Run runs the app.
@@ -57,7 +57,7 @@ func (app *App) Run(ctx context.Context) error {
5757
}()
5858

5959
if app.logsClient != nil {
60-
if err := app.logsClient.StartService([]logsapi.EventType{logsapi.Platform}, app.extensionClient.ExtensionID); err != nil {
60+
if err := app.logsClient.StartService(app.extensionClient.ExtensionID); err != nil {
6161
app.logger.Warnf("Error while subscribing to the Logs API: %v", err)
6262

6363
// disable logs API if the service failed to start
@@ -169,7 +169,15 @@ func (app *App) processEvent(
169169
runtimeDone := make(chan struct{})
170170
if app.logsClient != nil {
171171
go func() {
172-
if err := app.logsClient.ProcessLogs(invocationCtx, event.RequestID, app.apmClient, metadataContainer, runtimeDone, prevEvent); err != nil {
172+
if err := app.logsClient.ProcessLogs(
173+
invocationCtx,
174+
event.RequestID,
175+
event.InvokedFunctionArn,
176+
app.apmClient,
177+
metadataContainer,
178+
runtimeDone,
179+
prevEvent,
180+
); err != nil {
173181
app.logger.Errorf("Error while processing Lambda Logs ; %v", err)
174182
} else {
175183
close(runtimeDone)

logsapi/client.go

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,33 @@ import (
2828
"go.uber.org/zap"
2929
)
3030

31+
// SubscriptionType represents the log streams that the Lambda Logs API
32+
// provides for subscription
33+
type SubscriptionType string
34+
35+
const (
36+
// Platform logstream records events and errors related to
37+
// invocations and extensions
38+
Platform SubscriptionType = "platform"
39+
// Function logstream records logs written by lambda function
40+
// to stderr or stdout
41+
Function SubscriptionType = "function"
42+
// Extension logstream records logs generated by extension
43+
Extension SubscriptionType = "extension"
44+
)
45+
3146
// ClientOption is a config option for a Client.
3247
type ClientOption func(*Client)
3348

3449
// Client is the client used to subscribe to the Logs API.
3550
type Client struct {
36-
httpClient *http.Client
37-
logsAPIBaseURL string
38-
logsChannel chan LogEvent
39-
listenerAddr string
40-
server *http.Server
41-
logger *zap.SugaredLogger
51+
httpClient *http.Client
52+
logsAPIBaseURL string
53+
logsAPISubscriptionTypes []SubscriptionType
54+
logsChannel chan LogEvent
55+
listenerAddr string
56+
server *http.Server
57+
logger *zap.SugaredLogger
4258
}
4359

4460
// NewClient returns a new Client with the given URL.
@@ -69,7 +85,7 @@ func NewClient(opts ...ClientOption) (*Client, error) {
6985
}
7086

7187
// StartService starts the HTTP server listening for log events and subscribes to the Logs API.
72-
func (lc *Client) StartService(eventTypes []EventType, extensionID string) error {
88+
func (lc *Client) StartService(extensionID string) error {
7389
addr, err := lc.startHTTPServer()
7490
if err != nil {
7591
return err
@@ -93,7 +109,7 @@ func (lc *Client) StartService(eventTypes []EventType, extensionID string) error
93109

94110
uri := fmt.Sprintf("http://%s", net.JoinHostPort(host, port))
95111

96-
if err := lc.subscribe(eventTypes, extensionID, uri); err != nil {
112+
if err := lc.subscribe(lc.logsAPISubscriptionTypes, extensionID, uri); err != nil {
97113
if err := lc.Shutdown(); err != nil {
98114
lc.logger.Warnf("failed to shutdown the server: %v", err)
99115
}

logsapi/client_test.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,14 @@ package logsapi_test
1919

2020
import (
2121
"bytes"
22-
"github.com/elastic/apm-aws-lambda/logsapi"
2322
"encoding/json"
2423
"net/http"
2524
"net/http/httptest"
2625
"net/url"
2726
"testing"
2827

28+
"github.com/elastic/apm-aws-lambda/logsapi"
29+
2930
"github.com/stretchr/testify/require"
3031
"go.uber.org/zap/zaptest"
3132
)
@@ -103,13 +104,14 @@ func TestSubscribe(t *testing.T) {
103104
}))
104105
defer s.Close()
105106

106-
c, err := logsapi.NewClient(append(tc.opts, logsapi.WithLogsAPIBaseURL(s.URL))...)
107+
cOpts := append(tc.opts, logsapi.WithLogsAPIBaseURL(s.URL), logsapi.WithSubscriptionTypes(logsapi.Platform))
108+
c, err := logsapi.NewClient(cOpts...)
107109
require.NoError(t, err)
108110

109111
if tc.expectedErr {
110-
require.Error(t, c.StartService([]logsapi.EventType{logsapi.Platform}, "foo"))
112+
require.Error(t, c.StartService("foo"))
111113
} else {
112-
require.NoError(t, c.StartService([]logsapi.EventType{logsapi.Platform}, "foo"))
114+
require.NoError(t, c.StartService("foo"))
113115
}
114116

115117
require.NoError(t, c.Shutdown())
@@ -141,9 +143,15 @@ func TestSubscribeAWSRequest(t *testing.T) {
141143
}))
142144
defer s.Close()
143145

144-
c, err := logsapi.NewClient(append(tc.opts, logsapi.WithLogsAPIBaseURL(s.URL), logsapi.WithLogBuffer(1))...)
146+
cOpts := append(
147+
tc.opts,
148+
logsapi.WithLogsAPIBaseURL(s.URL),
149+
logsapi.WithLogBuffer(1),
150+
logsapi.WithSubscriptionTypes(logsapi.Platform, logsapi.Function),
151+
)
152+
c, err := logsapi.NewClient(cOpts...)
145153
require.NoError(t, err)
146-
require.NoError(t, c.StartService([]logsapi.EventType{logsapi.Platform}, "testID"))
154+
require.NoError(t, c.StartService("testID"))
147155

148156
// Create a request to send to the logs listener
149157
platformDoneEvent := `{

logsapi/event.go

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -25,33 +25,23 @@ import (
2525
"github.com/elastic/apm-aws-lambda/extension"
2626
)
2727

28-
// EventType represents the type of logs in Lambda
29-
type EventType string
28+
// LogEventType represents the log type that is received in the log messages
29+
type LogEventType string
3030

3131
const (
32-
// Platform is to receive logs emitted by the platform
33-
Platform EventType = "platform"
34-
// Function is to receive logs emitted by the function
35-
Function EventType = "function"
36-
// Extension is to receive logs emitted by the extension
37-
Extension EventType = "extension"
38-
)
39-
40-
// SubEventType is a Logs API sub event type
41-
type SubEventType string
42-
43-
const (
44-
// RuntimeDone event is sent when lambda function is finished it's execution
45-
RuntimeDone SubEventType = "platform.runtimeDone"
46-
Fault SubEventType = "platform.fault"
47-
Report SubEventType = "platform.report"
48-
Start SubEventType = "platform.start"
32+
// PlatformRuntimeDone event is sent when lambda function is finished it's execution
33+
PlatformRuntimeDone LogEventType = "platform.runtimeDone"
34+
PlatformFault LogEventType = "platform.fault"
35+
PlatformReport LogEventType = "platform.report"
36+
PlatformStart LogEventType = "platform.start"
37+
PlatformEnd LogEventType = "platform.end"
38+
FunctionLog LogEventType = "function"
4939
)
5040

5141
// LogEvent represents an event received from the Logs API
5242
type LogEvent struct {
5343
Time time.Time `json:"time"`
54-
Type SubEventType `json:"type"`
44+
Type LogEventType `json:"type"`
5545
StringRecord string
5646
Record LogEventRecord
5747
}
@@ -68,19 +58,26 @@ type LogEventRecord struct {
6858
func (lc *Client) ProcessLogs(
6959
ctx context.Context,
7060
requestID string,
61+
invokedFnArn string,
7162
apmClient *apmproxy.Client,
7263
metadataContainer *apmproxy.MetadataContainer,
7364
runtimeDoneSignal chan struct{},
7465
prevEvent *extension.NextEventResponse,
7566
) error {
67+
// platformStartReqID is to identify the requestID for the function
68+
// logs under the assumption that function logs for a specific request
69+
// ID will be bounded by PlatformStart and PlatformEnd events.
70+
var platformStartReqID string
7671
for {
7772
select {
7873
case logEvent := <-lc.logsChannel:
7974
lc.logger.Debugf("Received log event %v", logEvent.Type)
8075
switch logEvent.Type {
76+
case PlatformStart:
77+
platformStartReqID = logEvent.Record.RequestID
8178
// Check the logEvent for runtimeDone and compare the RequestID
8279
// to the id that came in via the Next API
83-
case RuntimeDone:
80+
case PlatformRuntimeDone:
8481
if logEvent.Record.RequestID == requestID {
8582
lc.logger.Info("Received runtimeDone event for this function invocation")
8683
runtimeDoneSignal <- struct{}{}
@@ -89,7 +86,7 @@ func (lc *Client) ProcessLogs(
8986

9087
lc.logger.Debug("Log API runtimeDone event request id didn't match")
9188
// Check if the logEvent contains metrics and verify that they can be linked to the previous invocation
92-
case Report:
89+
case PlatformReport:
9390
if prevEvent != nil && logEvent.Record.RequestID == prevEvent.RequestID {
9491
lc.logger.Debug("Received platform report for the previous function invocation")
9592
processedMetrics, err := ProcessPlatformReport(metadataContainer, prevEvent, logEvent)
@@ -102,6 +99,21 @@ func (lc *Client) ProcessLogs(
10299
lc.logger.Warn("report event request id didn't match the previous event id")
103100
lc.logger.Debug("Log API runtimeDone event request id didn't match")
104101
}
102+
case FunctionLog:
103+
// TODO: @lahsivjar Buffer logs and send batches of data to APM-Server.
104+
// Buffering should account for metadata being available before sending.
105+
lc.logger.Debug("Received function log")
106+
processedLog, err := ProcessFunctionLog(
107+
metadataContainer,
108+
platformStartReqID,
109+
invokedFnArn,
110+
logEvent,
111+
)
112+
if err != nil {
113+
lc.logger.Errorf("Error processing function log : %v", err)
114+
} else {
115+
apmClient.EnqueueAPMData(processedLog)
116+
}
105117
}
106118
case <-ctx.Done():
107119
lc.logger.Debug("Current invocation over. Interrupting logs processing goroutine")

0 commit comments

Comments
 (0)