Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ func handlerWithArgs(ctx context.Context, event unmarshal.Event, nrClient util.N
log.Fatalf("error creating s3 client: %v", err)
}
err = s3.GetLogsFromS3Event(ctx, event.S3Event, awsConfiguration, channel, s3Client, s3.DefaultReaderFactory)
case unmarshal.SNS:
log.Debugf("processing sns event: %v", event.SNSEvent)
var s3Client s3.ObjectClient
s3Client, err = s3.NewS3Client(ctx)
if err != nil {
log.Fatalf("error creating s3 client: %v", err)
}
err = s3.GetLogsFromSNSEvent(ctx, event.SNSEvent, awsConfiguration, channel, s3Client, s3.DefaultReaderFactory)
default:
log.Error("unable to process unknown event type. Supported event types are cloudwatch and s3")
return nil
Expand Down
69 changes: 69 additions & 0 deletions src/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"compress/bzip2"
"compress/gzip"
"context"
jsonpkg "encoding/json"
"io"
"net/url"
"os"
"regexp"
"strings"
Expand Down Expand Up @@ -62,6 +64,73 @@ func GetLogsFromS3Event(ctx context.Context, s3Event events.S3Event, awsConfigur
return nil
}

// GetLogsFromSNSEvent batches logs from SNS into DetailedJson format and sends them to the specified channel.
// It returns an error if there is a problem retrieving or sending the logs.
func GetLogsFromSNSEvent(ctx context.Context, snsEvent events.SNSEvent, awsConfiguration util.AWSConfiguration, channel chan common.DetailedLogsBatch, s3Client ObjectClient, readerFactory ReaderFactory) error {
for _, record := range snsEvent.Records {

// Unmarshal the Message field into a json array
var messageData struct {
Records []struct {
S3 struct {
Bucket struct {
Name string `json:"name"`
}
Object struct {
Key string `json:"key"`
}
}
}
}

err := jsonpkg.Unmarshal([]byte(record.SNS.Message), &messageData)
if err != nil {
log.Errorf("failed to unmarshal SNS message: %v", err)
continue
}

if len(messageData.Records) != 0 {
for _, msg := range messageData.Records {
log.Debugf("processing sns event message: %v", msg)

// When S3 events come via SNS, the object key is URL-encoded.
// We need to decode it before we can use it to fetch the object.
decodedKey, err := url.QueryUnescape(msg.S3.Object.Key)
if err != nil {
log.Errorf("failed to URL decode S3 object key from SNS message: %v", err)
continue
}

// The Following are the common attributes for all log messages.
// New Relic uses these common attributes to generate Unique Entity ID.
attributes := common.LogAttributes{
"aws.accountId": awsConfiguration.AccountID,
"logBucketName": msg.S3.Bucket.Name,
"logObjectKey": decodedKey, // Use the decoded key
"aws.realm": awsConfiguration.Realm,
"aws.region": awsConfiguration.Region,
"instrumentation.provider": common.InstrumentationProvider,
"instrumentation.name": common.InstrumentationName,
"instrumentation.version": common.InstrumentationVersion,
}

if err := util.AddCustomMetaData(os.Getenv(common.CustomMetaData), attributes); err != nil {
log.Errorf("failed to add custom metadata %v", err)
return err
}

if err := buildMeltLogsFromS3Bucket(ctx, msg.S3.Bucket.Name, decodedKey, channel, attributes, s3Client, readerFactory); err != nil {
return err
}
}
} else {
log.Debugf("SNS event Message field contains no records")
}
}

return nil
}

// fetchS3Reader fetches an S3 object from the specified bucket and returns an io.ReadCloser for reading its contents.
// It returns the io.ReadCloser and any error encountered during the operation.
func fetchS3Reader(ctx context.Context, bucketName string, objectName string, s3Client ObjectClient) (io.ReadCloser, error) {
Expand Down
11 changes: 11 additions & 0 deletions src/unmarshal/unmarshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
const (
CLOUDWATCH = "cloudwatch" // CLOUDWATCH represents the event type for CloudWatch logs.
S3 = "s3" // S3 represents the event type for S3 events.
SNS = "sns" // SNS represents the event type for SNS events.
)

var log = logger.NewLogrusLogger(logger.WithDebugLevel())
Expand All @@ -21,6 +22,7 @@ type Event struct {
EventType string // EventType represents the type of the event.
CloudwatchLogsData events.CloudwatchLogsData // CloudwatchLogsData represents the CloudWatch logs data.
S3Event events.S3Event // S3Event represents the S3 event data.
SNSEvent events.SNSEvent // SNSEvent represents the SNS event data.
}

// UnmarshalJSON unmarshals the JSON data into the Event struct.
Expand Down Expand Up @@ -49,6 +51,15 @@ func (event *Event) UnmarshalJSON(data []byte) error {

return err
}
//Try to unmarshal the event as SNSEvent
var snsEvent events.SNSEvent
err = json.Unmarshal(data, &snsEvent)
if err == nil && len(snsEvent.Records) != 0 && snsEvent.Records[0].EventSource == "aws:sns" {
event.EventType = SNS
event.SNSEvent = snsEvent

return err
}

return nil
}
Loading