From 666a4a94c97877b654176d2597ba1f92cdee69d9 Mon Sep 17 00:00:00 2001 From: ParthaI Date: Tue, 15 Apr 2025 10:02:15 +0530 Subject: [PATCH 01/16] Add table Lambda flow log --- aws/plugin.go | 2 + tables/lambda_log/lambda_log.go | 23 +++++++ tables/lambda_log/lambda_log_mapper.go | 89 ++++++++++++++++++++++++++ tables/lambda_log/lambda_log_table.go | 74 +++++++++++++++++++++ 4 files changed, 188 insertions(+) create mode 100644 tables/lambda_log/lambda_log.go create mode 100644 tables/lambda_log/lambda_log_mapper.go create mode 100644 tables/lambda_log/lambda_log_table.go diff --git a/aws/plugin.go b/aws/plugin.go index e80aec91..8326e619 100755 --- a/aws/plugin.go +++ b/aws/plugin.go @@ -13,6 +13,7 @@ import ( "github.com/turbot/tailpipe-plugin-aws/tables/cost_and_usage_report" "github.com/turbot/tailpipe-plugin-aws/tables/cost_optimization_recommendation" "github.com/turbot/tailpipe-plugin-aws/tables/guardduty_finding" + "github.com/turbot/tailpipe-plugin-aws/tables/lambda_log" "github.com/turbot/tailpipe-plugin-aws/tables/nlb_access_log" "github.com/turbot/tailpipe-plugin-aws/tables/s3_server_access_log" "github.com/turbot/tailpipe-plugin-aws/tables/vpc_flow_log" @@ -43,6 +44,7 @@ func init() { table.RegisterTable[*alb_connection_log.AlbConnectionLog, *alb_connection_log.AlbConnectionLogTable]() table.RegisterTable[*securityhub_finding.SecurityHubFinding, *securityhub_finding.SecurityHubFindingTable]() table.RegisterTable[*waf_traffic_log.WafTrafficLog, *waf_traffic_log.WafTrafficLogTable]() + table.RegisterTable[*lambda_log.LambdaLog, *lambda_log.LambdaLogTable]() // register custom table table.RegisterCustomTable[*vpc_flow_log.VpcFlowLogTable]() diff --git a/tables/lambda_log/lambda_log.go b/tables/lambda_log/lambda_log.go new file mode 100644 index 00000000..d42e4d68 --- /dev/null +++ b/tables/lambda_log/lambda_log.go @@ -0,0 +1,23 @@ +package lambda_log + +import ( + "time" + + "github.com/turbot/tailpipe-plugin-sdk/schema" +) + +type LambdaLog struct { + schema.CommonFields + + Timestamp *time.Time `json:"timestamp,omitempty"` + RequestID *string `json:"request_id,omitempty"` + LogType *string `json:"log_type,omitempty"` + LogLevel *string `json:"log_level,omitempty"` + Message *string `json:"message,omitempty"` + + // Report Specific Fields + Duration *float64 `json:"duration,omitempty"` + BilledDuration *float64 `json:"billed_duration,omitempty"` + MemorySize *int `json:"memory_size,omitempty"` + MaxMemoryUsed *int `json:"max_memory_used,omitempty"` +} \ No newline at end of file diff --git a/tables/lambda_log/lambda_log_mapper.go b/tables/lambda_log/lambda_log_mapper.go new file mode 100644 index 00000000..50c2ceee --- /dev/null +++ b/tables/lambda_log/lambda_log_mapper.go @@ -0,0 +1,89 @@ +package lambda_log + +import ( + "context" + "fmt" + "log/slog" + "strconv" + "strings" + "time" + + // "github.com/turbot/tailpipe-plugin-aws/rows" + // "github.com/turbot/tailpipe-plugin-sdk/table" + "github.com/turbot/tailpipe-plugin-sdk/mappers" +) + +type LambdaLogMapper struct { +} + +func (m *LambdaLogMapper) Identifier() string { + return "lambda_log_mapper" +} + +func (m *LambdaLogMapper) Map(_ context.Context, a any, _ ...mappers.MapOption[*LambdaLog]) (*LambdaLog, error) { + row := &LambdaLog{} + + rawRow := "" + + switch v := a.(type) { + case []byte: + rawRow = string(v) + case string: + rawRow = v + case *string: + rawRow = *v + default: + return nil, fmt.Errorf("expected string, got %T", a) + } + + slog.Error("rawRow ---->>>", rawRow) + + rawRow = strings.TrimSuffix(rawRow, "\n") + fields := strings.Fields(rawRow) + + switch fields[0] { + case "START", "END": + row.LogType = &fields[0] + row.RequestID = &fields[2] + case "REPORT": + row.LogType = &fields[0] + row.RequestID = &fields[2] + duration, err := strconv.ParseFloat(fields[4], 64) + if err != nil { + return nil, fmt.Errorf("error parsing duration: %w", err) + } + row.Duration = &duration + billed, err := strconv.ParseFloat(fields[8], 64) + if err != nil { + return nil, fmt.Errorf("error parsing billed duration: %w", err) + } + row.BilledDuration = &billed + mem, err := strconv.Atoi(fields[12]) + if err != nil { + return nil, fmt.Errorf("error parsing memory size: %w", err) + } + row.MemorySize = &mem + maxMem, err := strconv.Atoi(fields[17]) + if err != nil { + return nil, fmt.Errorf("error parsing max memory used: %w", err) + } + row.MaxMemoryUsed = &maxMem + default: + t := "LOG" + row.LogType = &t + + ts, err := time.Parse(time.RFC3339, fields[0]) + if err != nil { + return nil, fmt.Errorf("error parsing timestamp: %w", err) + } + row.Timestamp = &ts + + row.RequestID = &fields[1] + row.LogLevel = &fields[2] + strip := fmt.Sprintf("%s%s", strings.Join(fields[:3], "\t"), "\t") + stripped := strings.TrimPrefix(rawRow, strip) + row.Message = &stripped + } + + return row, nil +} diff --git a/tables/lambda_log/lambda_log_table.go b/tables/lambda_log/lambda_log_table.go new file mode 100644 index 00000000..5173ad21 --- /dev/null +++ b/tables/lambda_log/lambda_log_table.go @@ -0,0 +1,74 @@ +package lambda_log + +import ( + "time" + + "github.com/rs/xid" + "github.com/turbot/pipe-fittings/v2/utils" + + "github.com/turbot/tailpipe-plugin-aws/sources/cloudwatch_log_group" + "github.com/turbot/tailpipe-plugin-aws/sources/s3_bucket" + "github.com/turbot/tailpipe-plugin-sdk/artifact_source" + "github.com/turbot/tailpipe-plugin-sdk/artifact_source_config" + "github.com/turbot/tailpipe-plugin-sdk/constants" + "github.com/turbot/tailpipe-plugin-sdk/row_source" + "github.com/turbot/tailpipe-plugin-sdk/schema" + "github.com/turbot/tailpipe-plugin-sdk/table" +) + +const LambdaLogTableIdentifier = "aws_lambda_log" + +type LambdaLogTable struct{} + +func (c *LambdaLogTable) Identifier() string { + return LambdaLogTableIdentifier +} + +func (c *LambdaLogTable) GetSourceMetadata() ([]*table.SourceMetadata[*LambdaLog], error) { + defaultS3ArtifactConfig := &artifact_source_config.ArtifactSourceConfigImpl{ + FileLayout: utils.ToStringPointer("AWSLogs/(%{DATA:org_id}/)?%{NUMBER:account_id}/lambda/%{DATA:function_name}/%{YEAR:year}/%{MONTHNUM:month}/%{MONTHDAY:day}/%{DATA}.log"), + } + + return []*table.SourceMetadata[*LambdaLog]{ + { + // S3 artifact source + SourceName: s3_bucket.AwsS3BucketSourceIdentifier, + Mapper: &LambdaLogMapper{}, + Options: []row_source.RowSourceOption{ + artifact_source.WithDefaultArtifactSourceConfig(defaultS3ArtifactConfig), + artifact_source.WithRowPerLine(), + }, + }, + { + // S3 artifact source + SourceName: cloudwatch_log_group.AwsCloudwatchLogGroupSourceIdentifier, + Mapper: &LambdaLogMapper{}, + }, + { + // any artifact source + SourceName: constants.ArtifactSourceIdentifier, + Mapper: &LambdaLogMapper{}, + Options: []row_source.RowSourceOption{ + artifact_source.WithRowPerLine(), + }, + }, + }, nil +} + +func (c *LambdaLogTable) EnrichRow(row *LambdaLog, sourceEnrichmentFields schema.SourceEnrichment) (*LambdaLog, error) { + row.CommonFields = sourceEnrichmentFields.CommonFields + + // Record standardization + row.TpID = xid.New().String() + row.TpIngestTimestamp = time.Now() + if !row.TpTimestamp.IsZero() { + row.TpTimestamp = *row.Timestamp + row.TpDate = row.Timestamp.Truncate(24 * time.Hour) + } + + row.TpIndex = schema.DefaultIndex + + // TODO: Add enrichment fields + + return row, nil +} From 0f3e5904bc38d7715524074a76d526468ebadee4 Mon Sep 17 00:00:00 2001 From: ParthaI Date: Thu, 17 Apr 2025 09:47:22 +0530 Subject: [PATCH 02/16] Added support to collect the timestamp from cloudwatch source and enrich the row if row does not have the timestamp --- tables/lambda_log/lambda_log_mapper.go | 325 +++++++++++++++++++++---- tables/lambda_log/lambda_log_table.go | 15 +- 2 files changed, 292 insertions(+), 48 deletions(-) diff --git a/tables/lambda_log/lambda_log_mapper.go b/tables/lambda_log/lambda_log_mapper.go index 50c2ceee..46f0220f 100644 --- a/tables/lambda_log/lambda_log_mapper.go +++ b/tables/lambda_log/lambda_log_mapper.go @@ -2,14 +2,14 @@ package lambda_log import ( "context" + "encoding/json" "fmt" - "log/slog" + "regexp" + "slices" "strconv" "strings" "time" - // "github.com/turbot/tailpipe-plugin-aws/rows" - // "github.com/turbot/tailpipe-plugin-sdk/table" "github.com/turbot/tailpipe-plugin-sdk/mappers" ) @@ -20,70 +20,301 @@ func (m *LambdaLogMapper) Identifier() string { return "lambda_log_mapper" } +// JSON format for system logs as described in AWS docs +// Contains "time", "type", and "record" fields +// System logs are sometimes known as platform event logs +// https://docs.aws.amazon.com/lambda/latest/dg/monitoring-cloudwatchlogs-advanced.html#monitoring-cloudwatchlogs-logformat +type jsonFormatSystemLog struct { + Time string `json:"time"` + Type string `json:"type"` + Record map[string]interface{} `json:"record"` +} + +// JSON format for application logs as described in AWS docs +// Contains "timestamp", "level", "message", and "requestId" fields +// Generated by Lambda functions using supported logging methods +// https://docs.aws.amazon.com/lambda/latest/dg/monitoring-cloudwatchlogs-advanced.html#monitoring-cloudwatchlogs-logformat +type jsonFormatApplicationLog struct { + Timestamp string `json:"timestamp"` + Level string `json:"level"` + Message string `json:"message"` + RequestID string `json:"requestId"` +} + func (m *LambdaLogMapper) Map(_ context.Context, a any, _ ...mappers.MapOption[*LambdaLog]) (*LambdaLog, error) { row := &LambdaLog{} - rawRow := "" - + var raw string switch v := a.(type) { case []byte: - rawRow = string(v) + raw = string(v) case string: - rawRow = v + raw = v case *string: - rawRow = *v + raw = *v default: - return nil, fmt.Errorf("expected string, got %T", a) + return nil, fmt.Errorf("expected string or []byte, got %T", a) } - slog.Error("rawRow ---->>>", rawRow) + // First unmarshal into a minimal structure to detect log type + var probe map[string]json.RawMessage + if err := json.Unmarshal([]byte(raw), &probe); err == nil { + // Check for system log keys (platform events with time, type, record structure) + if _, hasType := probe["type"]; hasType { + var systemLog jsonFormatSystemLog + if err := json.Unmarshal([]byte(raw), &systemLog); err != nil { + return nil, fmt.Errorf("error unmarshalling as system log: %w", err) + } - rawRow = strings.TrimSuffix(rawRow, "\n") - fields := strings.Fields(rawRow) + // Parse system log fields based on AWS JSON format for system logs + if t, err := time.Parse(time.RFC3339, systemLog.Time); err == nil { + row.Timestamp = &t + } + row.LogType = &systemLog.Type + if msgBytes, err := json.Marshal(systemLog.Record); err == nil { + message := string(msgBytes) + row.Message = &message + } else { + // fallback in case of marshal error + message := fmt.Sprintf("%v", systemLog.Record) + row.Message = &message + } - switch fields[0] { - case "START", "END": - row.LogType = &fields[0] - row.RequestID = &fields[2] - case "REPORT": - row.LogType = &fields[0] - row.RequestID = &fields[2] - duration, err := strconv.ParseFloat(fields[4], 64) - if err != nil { - return nil, fmt.Errorf("error parsing duration: %w", err) + // Extract specific fields from platform event record + if requestId, ok := systemLog.Record["requestId"].(string); ok { + row.RequestID = &requestId + } + // Extract metrics from platform.report events + if metrics, ok := systemLog.Record["metrics"].(map[string]interface{}); ok { + if v, ok := metrics["durationMs"].(float64); ok { + row.Duration = &v + } + if v, ok := metrics["billedDurationMs"].(float64); ok { + row.BilledDuration = &v + } + if v, ok := metrics["memorySizeMB"].(float64); ok { + mem := int(v) + row.MemorySize = &mem + } + if v, ok := metrics["maxMemoryUsedMB"].(float64); ok { + mem := int(v) + row.MaxMemoryUsed = &mem + } + } + + return row, nil + } else if _, hasLevel := probe["level"]; hasLevel { + // Fallback to application log (JSON format with timestamp, level, message, requestId) + var appLog jsonFormatApplicationLog + if err := json.Unmarshal([]byte(raw), &appLog); err != nil { + return nil, fmt.Errorf("error unmarshalling as application log: %w", err) + } + + // Parse application log fields based on AWS JSON format for app logs + if t, err := time.Parse(time.RFC3339, appLog.Timestamp); err == nil { + row.Timestamp = &t + } + row.LogLevel = &appLog.Level + row.Message = &appLog.Message + row.RequestID = &appLog.RequestID } - row.Duration = &duration - billed, err := strconv.ParseFloat(fields[8], 64) - if err != nil { - return nil, fmt.Errorf("error parsing billed duration: %w", err) + } else if len(strings.Fields(raw)) >= 4 && isTimestamp(strings.Fields(raw)[0]) { // plain text application log + // Handle plain text application logs (format: timestamp requestID logLevel message) + // Example: 2024-10-27T19:17:45.586Z 79b4f56e-95b1-4643-9700-2807f4e68189 INFO some log message + fields := strings.Fields(raw) + // Timestamp + if t, err := time.Parse(time.RFC3339, fields[0]); err == nil { + row.Timestamp = &t } - row.BilledDuration = &billed - mem, err := strconv.Atoi(fields[12]) - if err != nil { - return nil, fmt.Errorf("error parsing memory size: %w", err) + // RequestID + var uuidRegex = regexp.MustCompile(`^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}$`) + if uuidRegex.MatchString(fields[1]) { + row.RequestID = &fields[1] } - row.MemorySize = &mem - maxMem, err := strconv.Atoi(fields[17]) + // LogLevel + if slices.Contains([]string{"INFO", "DEBUG", "WARN", "ERROR", "FATAL", "TRACE", ""}, fields[2]) { + row.LogLevel = &fields[2] + } + // Message + if len(fields) >= 4 { + msg := strings.Join(fields[3:], " ") + row.Message = &msg + } + } else { + // Handle legacy plain text system logs (START, END, REPORT format) + row, err := parseLambdaPainTextLog(raw) if err != nil { - return nil, fmt.Errorf("error parsing max memory used: %w", err) + return nil, fmt.Errorf("error parsing lambda pain text log: %w", err) } - row.MaxMemoryUsed = &maxMem - default: - t := "LOG" - row.LogType = &t + row.Message = &raw + } - ts, err := time.Parse(time.RFC3339, fields[0]) - if err != nil { - return nil, fmt.Errorf("error parsing timestamp: %w", err) + return row, nil +} + +// parseLambdaPainTextLog handles the legacy plain text format for Lambda system logs +// These include START, END, and REPORT messages that aren't in JSON format +// Example: +// START RequestId: 8b133862-5331-4ded-ac5d-1ad5da5aee81 +// END RequestId: 8b133862-5331-4ded-ac5d-1ad5da5aee81 +// REPORT RequestId: 8b133862-5331-4ded-ac5d-1ad5da5aee81 Duration: 123.45 ms Billed Duration: 124 ms Memory Size: 128 MB Max Memory Used: 84 MB +func parseLambdaPainTextLog(line string) (*LambdaLog, error) { + log := &LambdaLog{} + now := time.Now().UTC() + log.Timestamp = &now + + switch { + case strings.HasPrefix(line, "START RequestId:"): + // Parse START log line + log.LogType = ptr("START") + if id := extractAfter(line, "START RequestId: "); id != "" { + log.RequestID = ptr(strings.Fields(id)[0]) } - row.Timestamp = &ts + log.Message = ptr(line) - row.RequestID = &fields[1] - row.LogLevel = &fields[2] - strip := fmt.Sprintf("%s%s", strings.Join(fields[:3], "\t"), "\t") - stripped := strings.TrimPrefix(rawRow, strip) - row.Message = &stripped + case strings.HasPrefix(line, "END RequestId:"): + // Parse END log line + log.LogType = ptr("END") + if id := extractAfter(line, "END RequestId: "); id != "" { + log.RequestID = ptr(strings.Fields(id)[0]) + } + log.Message = ptr(line) + + case strings.HasPrefix(line, "REPORT RequestId:"): + // Parse REPORT log line which contains metrics + log.LogType = ptr("REPORT") + log.Message = ptr(line) + + // Extract RequestId + if id := extractAfter(line, "REPORT RequestId: "); id != "" { + log.RequestID = ptr(strings.Fields(id)[0]) + } + + // Extract numeric metrics from REPORT line + if val := extractBetween(line, "Duration: ", " ms"); val != "" { + if f, err := strconv.ParseFloat(val, 64); err == nil { + log.Duration = &f + } + } + if val := extractBetween(line, "Billed Duration: ", " ms"); val != "" { + if f, err := strconv.ParseFloat(val, 64); err == nil { + log.BilledDuration = &f + } + } + if val := extractBetween(line, "Memory Size: ", " MB"); val != "" { + if i, err := strconv.Atoi(val); err == nil { + log.MemorySize = &i + } + } + if val := extractBetween(line, "Max Memory Used: ", " MB"); val != "" { + if i, err := strconv.Atoi(val); err == nil { + log.MaxMemoryUsed = &i + } + } } - return row, nil + return log, nil +} + +// ptr is a helper function to return a pointer to a value +func ptr[T any](v T) *T { + return &v +} + +// extractAfter extracts substring that comes after a specific prefix +func extractAfter(s, prefix string) string { + idx := strings.Index(s, prefix) + if idx == -1 { + return "" + } + return strings.TrimSpace(s[idx+len(prefix):]) +} + +// extractBetween extracts substring between start and end strings +func extractBetween(s, start, end string) string { + i := strings.Index(s, start) + if i == -1 { + return "" + } + i += len(start) + j := strings.Index(s[i:], end) + if j == -1 { + return "" + } + return strings.TrimSpace(s[i : i+j]) +} + +// isTimestamp checks if the input string matches any common Go time formats. +// Used to identify if a plain text log starts with a timestamp +func isTimestamp(s string) bool { + layouts := []string{ + time.RFC3339, + time.RFC3339Nano, + time.RFC1123, + time.RFC1123Z, + time.RFC822, + time.RFC822Z, + time.RFC850, + "2006-01-02 15:04:05", // Common log format + "2006-01-02 15:04:05.000000", // With microseconds + "2006-01-02T15:04:05", // ISO-like without timezone + "2006-01-02T15:04:05Z07:00", // ISO with TZ offset + "20060102T150405Z", // AWS style compact + } + + for _, layout := range layouts { + if _, err := time.Parse(layout, s); err == nil { + return true + } + } + + return false } + +// Commented out legacy code +// rawRow = strings.TrimSuffix(rawRow, "\n") +// fields := strings.Fields(rawRow) + +// switch fields[0] { +// case "START", "END": +// row.LogType = &fields[0] +// row.RequestID = &fields[2] +// case "REPORT": +// row.LogType = &fields[0] +// row.RequestID = &fields[2] +// duration, err := strconv.ParseFloat(fields[4], 64) +// if err != nil { +// return nil, fmt.Errorf("error parsing duration: %w", err) +// } +// row.Duration = &duration +// billed, err := strconv.ParseFloat(fields[8], 64) +// if err != nil { +// return nil, fmt.Errorf("error parsing billed duration: %w", err) +// } +// row.BilledDuration = &billed +// mem, err := strconv.Atoi(fields[12]) +// if err != nil { +// return nil, fmt.Errorf("error parsing memory size: %w", err) +// } +// row.MemorySize = &mem +// maxMem, err := strconv.Atoi(fields[17]) +// if err != nil { +// return nil, fmt.Errorf("error parsing max memory used: %w", err) +// } +// row.MaxMemoryUsed = &maxMem +// default: +// t := "LOG" +// row.LogType = &t + +// ts, err := time.Parse(time.RFC3339, fields[0]) +// if err != nil { +// return nil, fmt.Errorf("error parsing timestamp: %w", err) +// } +// row.Timestamp = &ts + +// row.RequestID = &fields[1] +// row.LogLevel = &fields[2] +// strip := fmt.Sprintf("%s%s", strings.Join(fields[:3], "\t"), "\t") +// stripped := strings.TrimPrefix(rawRow, strip) +// row.Message = &stripped +// } diff --git a/tables/lambda_log/lambda_log_table.go b/tables/lambda_log/lambda_log_table.go index 5173ad21..b0d36c6f 100644 --- a/tables/lambda_log/lambda_log_table.go +++ b/tables/lambda_log/lambda_log_table.go @@ -1,6 +1,7 @@ package lambda_log import ( + "regexp" "time" "github.com/rs/xid" @@ -61,13 +62,25 @@ func (c *LambdaLogTable) EnrichRow(row *LambdaLog, sourceEnrichmentFields schema // Record standardization row.TpID = xid.New().String() row.TpIngestTimestamp = time.Now() - if !row.TpTimestamp.IsZero() { + if row.Timestamp != nil { row.TpTimestamp = *row.Timestamp row.TpDate = row.Timestamp.Truncate(24 * time.Hour) + } else if !row.TpTimestamp.IsZero() { + row.TpDate = row.TpTimestamp.Truncate(24 * time.Hour) } row.TpIndex = schema.DefaultIndex + var arnRegex = regexp.MustCompile(`arn:aws:[^,\s'"\\]+`) + + seen := map[string]struct{}{} + for _, match := range arnRegex.FindAllString(*row.Message, -1) { + if _, exists := seen[match]; !exists { + seen[match] = struct{}{} + row.TpAkas = append(row.TpAkas, match) + } + } + // TODO: Add enrichment fields return row, nil From aa34d6d5459c2e6adc36de515776c2c7f4c13473 Mon Sep 17 00:00:00 2001 From: ParthaI Date: Fri, 25 Apr 2025 13:44:45 +0530 Subject: [PATCH 03/16] update --- docs/tables/aws_lambda_log/queries.md | 476 +++++++++++++++++++++++++ tables/lambda_log/lambda_log.go | 14 +- tables/lambda_log/lambda_log_mapper.go | 60 +++- tables/lambda_log/lambda_log_table.go | 16 +- 4 files changed, 542 insertions(+), 24 deletions(-) create mode 100644 docs/tables/aws_lambda_log/queries.md diff --git a/docs/tables/aws_lambda_log/queries.md b/docs/tables/aws_lambda_log/queries.md new file mode 100644 index 00000000..44b7b0fb --- /dev/null +++ b/docs/tables/aws_lambda_log/queries.md @@ -0,0 +1,476 @@ +## Activity Examples + +### Recent Lambda Log Activity + +This query shows the 100 most recent Lambda log entries across all functions. Real-time monitoring of Lambda activity helps with troubleshooting issues and understanding the current state of your serverless applications. + +```sql +select + tp_timestamp, + request_id, + log_type, + log_level, + message, + regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name +from + aws_lambda_log +order by + tp_timestamp desc +limit + 100; +``` + +```yaml +folder: Lambda +``` + +### Lambda Execution Trends by Hour + +This query shows Lambda execution trends by hour for each function. Understanding these patterns helps with capacity planning, identifying unusual activity spikes, and optimizing resources based on time-of-day usage patterns. + +```sql +select + date_trunc('hour', tp_timestamp) as hour, + count(*) as execution_count, + regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name +from + aws_lambda_log +where + log_type = 'START' +group by + hour, + tp_source_name +order by + hour desc, + execution_count desc; +``` + +```yaml +folder: Lambda +``` + +### Application Log Level Distribution + +This query analyzes the distribution of log levels in Lambda application logs. Reviewing log level patterns helps identify functions generating excessive logs or experiencing frequent errors that may impact application reliability. + +```sql +select + regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, + log_level, + count(*) as log_count +from + aws_lambda_log +where + log_level is not null +group by + tp_source_name, + log_level +order by + lambda_function_name, + log_count desc; +``` + +```yaml +folder: Lambda +``` + +### Execution Flow for a Specific Request + +This query shows the complete execution flow for a specific Lambda request. Tracing the sequence of logs for a single invocation helps debug issues and understand function behavior from start to finish. + +```sql +select + tp_timestamp, + log_type, + log_level, + substring(message, 1, 200) as message_preview, + case + when log_type in ('START', 'END', 'REPORT', 'INIT_START') then 'System Log' + when log_level is not null then 'Application Log' + else 'Other' + end as log_category, + regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name +from + aws_lambda_log +where + request_id = '9286dcef-4fac-4706-99f6-0f0087763dbc' +order by + tp_timestamp asc; +``` + +```yaml +folder: Lambda +``` + +## Detection Examples + +### Lambda Function Error Analysis + +This query finds the 100 most recent Lambda function errors and timeouts. Monitoring these errors helps identify reliability issues and functions that need error handling improvements for better application stability. + +```sql +select + tp_timestamp, + request_id, + message, + regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name +from + aws_lambda_log +where + log_level = 'ERROR' + or message like '%Task timed out%' + or message like '%Memory Size exceeded%' + or message like '%Process exited before completing request%' +order by + tp_timestamp desc +limit + 100; +``` + +```yaml +folder: Lambda +``` + +### Functions with High Billing-to-Execution Time Ratio + +This query identifies functions with high billing-to-execution time ratios. Optimizing these functions can reduce costs by addressing the gap between actual runtime and billed duration, especially for functions with significant billing overhead. + +```sql +select + regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, + round(avg(duration), 2) as avg_duration_ms, + round(avg(billed_duration), 2) as avg_billed_duration_ms, + round(avg(billed_duration - duration), 2) as avg_billing_overhead_ms, + round(avg(billed_duration * 100.0 / duration) - 100, 2) as billing_overhead_percent, + count(*) as execution_count +from + aws_lambda_log +where + log_type = 'REPORT' + and duration > 0 + and billed_duration is not null +group by + tp_source_name +having + avg(billed_duration - duration) > 10 +order by + billing_overhead_percent desc; +``` + +```yaml +folder: Lambda +``` + +### Lambda Cold Start Analysis + +This query analyzes Lambda cold starts by counting initialization events for each function. Identifying functions with frequent cold starts helps prioritize optimization efforts to reduce latency and improve user experience. + +```sql +select + regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, + count(distinct request_id) as cold_start_count, + avg(duration) as avg_init_duration_ms +from + aws_lambda_log +where + log_type = 'INIT_START' + or message like '%Init Duration:%' +group by + tp_source_name +order by + cold_start_count desc; +``` + +```yaml +folder: Lambda +``` + +## Operational Examples + +### Top 10 Slowest Lambda Function Executions + +This query identifies the 10 slowest Lambda function executions by examining REPORT logs. Finding these slow executions helps pinpoint specific instances that require optimization to improve overall performance. + +```sql +select + tp_timestamp, + request_id, + duration as duration_ms, + billed_duration as billed_duration_ms, + memory_size as allocated_memory_mb, + max_memory_used as used_memory_mb, + regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name +from + aws_lambda_log +where + log_type = 'REPORT' + and duration is not null +order by + duration desc +limit + 10; +``` + +```yaml +folder: Lambda +``` + +### Memory Utilization Efficiency + +This query calculates memory utilization efficiency for each Lambda function. Finding the right memory allocation helps optimize costs while maintaining performance by identifying over-provisioned or under-provisioned functions. + +```sql +select + regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, + memory_size as allocated_memory_mb, + round(avg(max_memory_used), 2) as avg_used_memory_mb, + round(avg(max_memory_used * 100.0 / memory_size), 2) as memory_utilization_percent, + count(*) as execution_count +from + aws_lambda_log +where + log_type = 'REPORT' + and memory_size is not null + and max_memory_used is not null +group by + tp_source_name, + memory_size +order by + memory_utilization_percent desc; +``` + +```yaml +folder: Lambda +``` + +### Detailed Request Execution Analysis + +This query analyzes each request's complete execution lifecycle including message patterns and timing between execution phases. This helps identify bottlenecks in function execution flow and understand the context of each request through message analysis. + +```sql +with request_phases as ( + select + request_id, + tp_source_name, + regexp_replace(tp_source_name, '^/aws/lambda/', '') as function_name, + min(case when log_type = 'START' then tp_timestamp end) as start_time, + min(case when log_type = 'END' then tp_timestamp end) as end_time, + min(case when log_type = 'REPORT' then tp_timestamp end) as report_time, + max(case when log_type = 'REPORT' then duration end) as duration_ms, + max(case when log_type = 'REPORT' then billed_duration end) as billed_duration_ms, + max(case when log_type = 'REPORT' then memory_size end) as allocated_memory_mb, + max(case when log_type = 'REPORT' then max_memory_used end) as max_memory_used_mb, + count(case when log_level = 'INFO' then 1 end) as info_log_count, + count(case when log_level = 'ERROR' then 1 end) as error_log_count, + count(case when log_level = 'WARN' then 1 end) as warn_log_count, + count(case when log_level = 'DEBUG' then 1 end) as debug_log_count, + bool_or(message like '%Task timed out%') as has_timeout, + bool_or(message like '%Init Duration:%') as has_cold_start, + bool_or(message like '%Memory Size:%' and message like '%Max Memory Used:%') as has_memory_metrics, + bool_or(message like '%error%' or message like '%exception%' or message like '%fail%') as has_error_keywords + from + aws_lambda_log + where + request_id is not null + group by + request_id, + tp_source_name, + function_name +), +message_samples as ( + select + request_id, + array_agg(message) as message_samples + from + (select + request_id, + message, + row_number() over (partition by request_id order by tp_timestamp) as rn + from + aws_lambda_log + where + request_id is not null + and log_level is not null + order by + tp_timestamp) t + where + rn <= 3 + group by + request_id +) +select + rp.request_id, + rp.start_time, + rp.end_time, + extract(epoch from (rp.end_time - rp.start_time)) * 1000 as total_execution_time_ms, + rp.duration_ms as reported_duration_ms, + rp.billed_duration_ms, + rp.allocated_memory_mb, + rp.max_memory_used_mb, + rp.info_log_count, + rp.error_log_count, + rp.warn_log_count, + rp.debug_log_count, + rp.has_timeout, + rp.has_cold_start, + rp.has_memory_metrics, + rp.has_error_keywords, + ms.message_samples[1] as first_message, + rp.function_name as lambda_function_name +from + request_phases rp +left join + message_samples ms on rp.request_id = ms.request_id +where + rp.start_time is not null + and rp.end_time is not null +order by + rp.start_time desc +limit + 50; +``` + +```yaml +folder: Lambda +``` + +## Volume Examples + +### Lambda Function Execution Summary + +This query summarizes execution metrics for each Lambda function. It helps identify frequently invoked functions and their performance characteristics for cost optimization and performance tuning. + +```sql +select + regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, + count(*) as execution_count, + avg(duration) as avg_duration_ms, + min(duration) as min_duration_ms, + max(duration) as max_duration_ms, + avg(billed_duration) as avg_billed_duration_ms, + avg(max_memory_used) as avg_memory_used_mb, + max(memory_size) as allocated_memory_mb +from + aws_lambda_log +where + log_type = 'REPORT' +group by + tp_source_name +order by + execution_count desc; +``` + +```yaml +folder: Lambda +``` + +## Baseline Examples + +### Lambda Duration Distribution + +This query categorizes Lambda function executions into duration ranges. Understanding execution time distribution helps identify inconsistent performance patterns and optimize functions that show high variability in runtime. + +```sql +select + regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, + case + when duration < 100 then '< 100ms' + when duration < 500 then '100-500ms' + when duration < 1000 then '500ms-1s' + when duration < 3000 then '1s-3s' + when duration < 10000 then '3s-10s' + else '> 10s' + end as duration_range, + count(*) as execution_count +from + aws_lambda_log +where + log_type = 'REPORT' + and duration is not null +group by + tp_source_name, + duration_range +order by + lambda_function_name, + case + when duration_range = '< 100ms' then 1 + when duration_range = '100-500ms' then 2 + when duration_range = '500ms-1s' then 3 + when duration_range = '1s-3s' then 4 + when duration_range = '3s-10s' then 5 + else 6 + end; +``` + +```yaml +folder: Lambda +``` + +### Most Common Error Messages by Function + +This query identifies the most common error messages for each Lambda function. Finding recurring error patterns helps prioritize which issues to fix first and understand the reliability challenges affecting specific functions. + +```sql +select + regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, + regexp_replace(message, '^.*Error: ', '') as error_message_pattern, + count(*) as occurrence_count +from + aws_lambda_log +where + log_level = 'ERROR' + or message like '%Error:%' + or message like '%Exception:%' + or message like '%Task timed out%' +group by + tp_source_name, + error_message_pattern +order by + lambda_function_name, + occurrence_count desc; +``` + +```yaml +folder: Lambda +``` + +### Error Message Pattern Analysis + +This query analyzes error patterns across all Lambda functions to identify systematic issues. Detecting common error signatures across multiple functions helps identify widespread problems that may require architectural changes rather than function-specific fixes. + +```sql +with error_patterns as ( + select + regexp_replace(message, '([a-f0-9]{8}(-[a-f0-9]{4}){3}-[a-f0-9]{12}|[\d\.]+|"[^"]*"|''[^'']*'')', 'X') as normalized_error, + count(*) as pattern_count, + count(distinct regexp_replace(tp_source_name, '^/aws/lambda/', '')) as affected_functions_count, + array_agg(distinct regexp_replace(tp_source_name, '^/aws/lambda/', '')) as affected_functions + from + aws_lambda_log + where + log_level = 'ERROR' + or message like '%Error:%' + or message like '%Exception:%' + or message like '%Task timed out%' + group by + normalized_error + having + count(*) > 1 +) +select + normalized_error as error_pattern, + pattern_count as occurrence_count, + affected_functions_count, + affected_functions +from + error_patterns +order by + pattern_count desc, + affected_functions_count desc +limit + 20; +``` + +```yaml +folder: Lambda +``` diff --git a/tables/lambda_log/lambda_log.go b/tables/lambda_log/lambda_log.go index d42e4d68..91e584e6 100644 --- a/tables/lambda_log/lambda_log.go +++ b/tables/lambda_log/lambda_log.go @@ -9,15 +9,17 @@ import ( type LambdaLog struct { schema.CommonFields - Timestamp *time.Time `json:"timestamp,omitempty"` - RequestID *string `json:"request_id,omitempty"` - LogType *string `json:"log_type,omitempty"` - LogLevel *string `json:"log_level,omitempty"` - Message *string `json:"message,omitempty"` + Timestamp *time.Time `json:"timestamp,omitempty"` + RequestID *string `json:"request_id,omitempty"` + LogType *string `json:"log_type,omitempty"` + LogLevel *string `json:"log_level,omitempty"` + Message *string `json:"message,omitempty"` + RawMessage *string `json:"raw_message,omitempty"` + LogGroupName *string `json:"log_group_name,omitempty"` // Report Specific Fields Duration *float64 `json:"duration,omitempty"` BilledDuration *float64 `json:"billed_duration,omitempty"` MemorySize *int `json:"memory_size,omitempty"` MaxMemoryUsed *int `json:"max_memory_used,omitempty"` -} \ No newline at end of file +} diff --git a/tables/lambda_log/lambda_log_mapper.go b/tables/lambda_log/lambda_log_mapper.go index 46f0220f..1e26ceb2 100644 --- a/tables/lambda_log/lambda_log_mapper.go +++ b/tables/lambda_log/lambda_log_mapper.go @@ -55,6 +55,14 @@ func (m *LambdaLogMapper) Map(_ context.Context, a any, _ ...mappers.MapOption[* default: return nil, fmt.Errorf("expected string or []byte, got %T", a) } + raw = strings.TrimSpace(raw) + if strings.HasPrefix(raw, "REPORT") || strings.HasPrefix(raw, "END") || strings.HasPrefix(raw, "START") || strings.HasPrefix(raw, "INIT_START") { + lambdaLog, err := parseLambdaPainTextLog(raw) + if err != nil { + return nil, fmt.Errorf("error parsing lambda pain text log: %w", err) + } + return lambdaLog, nil + } // First unmarshal into a minimal structure to detect log type var probe map[string]json.RawMessage @@ -117,6 +125,8 @@ func (m *LambdaLogMapper) Map(_ context.Context, a any, _ ...mappers.MapOption[* row.LogLevel = &appLog.Level row.Message = &appLog.Message row.RequestID = &appLog.RequestID + + return row, nil } } else if len(strings.Fields(raw)) >= 4 && isTimestamp(strings.Fields(raw)[0]) { // plain text application log // Handle plain text application logs (format: timestamp requestID logLevel message) @@ -140,14 +150,17 @@ func (m *LambdaLogMapper) Map(_ context.Context, a any, _ ...mappers.MapOption[* msg := strings.Join(fields[3:], " ") row.Message = &msg } - } else { - // Handle legacy plain text system logs (START, END, REPORT format) - row, err := parseLambdaPainTextLog(raw) - if err != nil { - return nil, fmt.Errorf("error parsing lambda pain text log: %w", err) - } - row.Message = &raw } + // else { + // // Handle legacy plain text system logs (START, END, REPORT format) + // row, err := parseLambdaPainTextLog(raw) + // if err != nil { + // return nil, fmt.Errorf("error parsing lambda pain text log: %w", err) + // } + // row.Message = &raw + // } + + row.RawMessage = &raw return row, nil } @@ -160,30 +173,49 @@ func (m *LambdaLogMapper) Map(_ context.Context, a any, _ ...mappers.MapOption[* // REPORT RequestId: 8b133862-5331-4ded-ac5d-1ad5da5aee81 Duration: 123.45 ms Billed Duration: 124 ms Memory Size: 128 MB Max Memory Used: 84 MB func parseLambdaPainTextLog(line string) (*LambdaLog, error) { log := &LambdaLog{} - now := time.Now().UTC() - log.Timestamp = &now - + if id := extractAfter(line, "RequestId: "); id != "" { + log.RequestID = &id + } + log.RawMessage = &line switch { case strings.HasPrefix(line, "START RequestId:"): // Parse START log line log.LogType = ptr("START") if id := extractAfter(line, "START RequestId: "); id != "" { log.RequestID = ptr(strings.Fields(id)[0]) + if len(strings.Split(line, "RequestId: " + id)[1]) > 0 { + log.Message = &strings.Split(line, "RequestId: " + id)[1] + } + } else { + log.Message = ptr(line) + } + case strings.HasPrefix(line, "INIT_START"): + // Parse START log line + log.LogType = ptr("INIT_START") + if id := extractAfter(line, "INIT_START RequestId: "); id != "" { + log.RequestID = ptr(strings.Fields(id)[0]) + if len(strings.Split(line, "RequestId: " + id)[1]) > 0 { + log.Message = &strings.Split(line, "RequestId: " + id)[1] + } + } else { + msg := strings.Split(line, "INIT_START " + id)[1] + log.Message = &msg } - log.Message = ptr(line) - case strings.HasPrefix(line, "END RequestId:"): // Parse END log line log.LogType = ptr("END") if id := extractAfter(line, "END RequestId: "); id != "" { log.RequestID = ptr(strings.Fields(id)[0]) + if len(strings.Split(line, "RequestId: " + id)[1]) > 0 { + log.Message = &strings.Split(line, "RequestId: " + id)[1] + } + } else { + log.Message = ptr(line) } - log.Message = ptr(line) case strings.HasPrefix(line, "REPORT RequestId:"): // Parse REPORT log line which contains metrics log.LogType = ptr("REPORT") - log.Message = ptr(line) // Extract RequestId if id := extractAfter(line, "REPORT RequestId: "); id != "" { diff --git a/tables/lambda_log/lambda_log_table.go b/tables/lambda_log/lambda_log_table.go index b0d36c6f..b7a16945 100644 --- a/tables/lambda_log/lambda_log_table.go +++ b/tables/lambda_log/lambda_log_table.go @@ -74,13 +74,21 @@ func (c *LambdaLogTable) EnrichRow(row *LambdaLog, sourceEnrichmentFields schema var arnRegex = regexp.MustCompile(`arn:aws:[^,\s'"\\]+`) seen := map[string]struct{}{} - for _, match := range arnRegex.FindAllString(*row.Message, -1) { - if _, exists := seen[match]; !exists { - seen[match] = struct{}{} - row.TpAkas = append(row.TpAkas, match) + if row.Message != nil { + for _, match := range arnRegex.FindAllString(*row.Message, -1) { + if _, exists := seen[match]; !exists { + seen[match] = struct{}{} + row.TpAkas = append(row.TpAkas, match) + } } } + row.LogGroupName = row.TpSourceName + + // if row.Message == nil { + // return nil, nil + // } + // TODO: Add enrichment fields return row, nil From ca4e2e857c68afc817a61152b33fd0768ba202e7 Mon Sep 17 00:00:00 2001 From: ParthaI Date: Mon, 12 May 2025 11:29:17 +0530 Subject: [PATCH 04/16] Updated the query and tested the for different tables --- docs/tables/aws_lambda_log/queries.md | 257 ++++++++++++++++++-------- 1 file changed, 181 insertions(+), 76 deletions(-) diff --git a/docs/tables/aws_lambda_log/queries.md b/docs/tables/aws_lambda_log/queries.md index 44b7b0fb..a062b78e 100644 --- a/docs/tables/aws_lambda_log/queries.md +++ b/docs/tables/aws_lambda_log/queries.md @@ -2,7 +2,7 @@ ### Recent Lambda Log Activity -This query shows the 100 most recent Lambda log entries across all functions. Real-time monitoring of Lambda activity helps with troubleshooting issues and understanding the current state of your serverless applications. +This query shows the most recent Lambda log entries across all functions. Real-time monitoring of Lambda activity helps with troubleshooting issues and understanding the current state of your serverless applications. ```sql select @@ -11,7 +11,8 @@ select log_type, log_level, message, - regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name + regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, + log_group_name from aws_lambda_log order by @@ -31,15 +32,17 @@ This query shows Lambda execution trends by hour for each function. Understandin ```sql select date_trunc('hour', tp_timestamp) as hour, - count(*) as execution_count, - regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name + regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, + log_group_name, + count(*) as execution_count from aws_lambda_log where log_type = 'START' group by hour, - tp_source_name + lambda_function_name, + log_group_name order by hour desc, execution_count desc; @@ -56,6 +59,7 @@ This query analyzes the distribution of log levels in Lambda application logs. R ```sql select regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, + log_group_name, log_level, count(*) as log_count from @@ -63,7 +67,8 @@ from where log_level is not null group by - tp_source_name, + lambda_function_name, + log_group_name, log_level order by lambda_function_name, @@ -89,7 +94,8 @@ select when log_level is not null then 'Application Log' else 'Other' end as log_category, - regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name + regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, + log_group_name from aws_lambda_log where @@ -104,23 +110,31 @@ folder: Lambda ## Detection Examples -### Lambda Function Error Analysis +### Lambda Error and Timeout Analysis -This query finds the 100 most recent Lambda function errors and timeouts. Monitoring these errors helps identify reliability issues and functions that need error handling improvements for better application stability. +This query finds the most recent Lambda function errors, timeouts, and other critical issues. Monitoring these errors helps identify reliability issues and functions that need error handling improvements for better application stability. ```sql select tp_timestamp, + regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, + log_group_name, request_id, message, - regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name + case + when message ilike '%timed out%' then 'Timeout' + when message ilike '%memory size exceeded%' then 'Memory Exceeded' + when message ilike '%process exited before completing request%' then 'Process Exited' + when log_level = 'ERROR' then 'Application Error' + else 'Other Error' + end as error_type from aws_lambda_log where log_level = 'ERROR' - or message like '%Task timed out%' - or message like '%Memory Size exceeded%' - or message like '%Process exited before completing request%' + or message ilike '%timed out%' + or message ilike '%memory size exceeded%' + or message ilike '%process exited before completing request%' order by tp_timestamp desc limit @@ -138,10 +152,11 @@ This query identifies functions with high billing-to-execution time ratios. Opti ```sql select regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, + log_group_name, round(avg(duration), 2) as avg_duration_ms, round(avg(billed_duration), 2) as avg_billed_duration_ms, round(avg(billed_duration - duration), 2) as avg_billing_overhead_ms, - round(avg(billed_duration * 100.0 / duration) - 100, 2) as billing_overhead_percent, + round(avg(billed_duration * 100.0 / nullif(duration, 0)) - 100, 2) as billing_overhead_percent, count(*) as execution_count from aws_lambda_log @@ -150,7 +165,8 @@ where and duration > 0 and billed_duration is not null group by - tp_source_name + lambda_function_name, + log_group_name having avg(billed_duration - duration) > 10 order by @@ -168,15 +184,21 @@ This query analyzes Lambda cold starts by counting initialization events for eac ```sql select regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, - count(distinct request_id) as cold_start_count, - avg(duration) as avg_init_duration_ms + log_group_name, + count(distinct request_id) as total_executions, + count(distinct case when message ilike '%init duration%' then request_id end) as cold_start_count, + round(count(distinct case when message ilike '%init duration%' then request_id end) * 100.0 / + nullif(count(distinct request_id), 0), 2) as cold_start_percentage, + avg(case when message ilike '%init duration%' + then cast(regexp_extract(message, 'Init Duration: ([0-9.]+) ms', 1) as double) end) as avg_init_duration_ms from aws_lambda_log where - log_type = 'INIT_START' - or message like '%Init Duration:%' + tp_timestamp >= current_timestamp - interval '7 day' + and (log_type = 'INIT_START' or message ilike '%init duration%') group by - tp_source_name + lambda_function_name, + log_group_name order by cold_start_count desc; ``` @@ -185,30 +207,63 @@ order by folder: Lambda ``` +### Lambda Throttling Analysis + +This query analyzes throttling patterns by hour to identify capacity constraints. Understanding when throttling occurs helps optimize concurrency limits and adjust scaling policies to prevent service disruptions during peak usage times. + +```sql +select + date_trunc('hour', tp_timestamp) as hour, + regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, + log_group_name, + count(*) as total_logs, + count(case when message ilike '%function was throttled%' then 1 end) as throttle_count, + round(count(case when message ilike '%function was throttled%' then 1 end) * 100.0 / nullif(count(*), 0), 2) as throttle_percentage +from + aws_lambda_log +where + tp_timestamp >= current_timestamp - interval '7 day' +group by + hour, + lambda_function_name, + log_group_name +having + count(case when message ilike '%function was throttled%' then 1 end) > 0 +order by + hour desc, + throttle_count desc; +``` + +```yaml +folder: Lambda +``` + ## Operational Examples -### Top 10 Slowest Lambda Function Executions +### Top Slowest Lambda Function Executions -This query identifies the 10 slowest Lambda function executions by examining REPORT logs. Finding these slow executions helps pinpoint specific instances that require optimization to improve overall performance. +This query identifies the slowest Lambda function executions by examining REPORT logs. Finding these slow executions helps pinpoint specific instances that require optimization to improve overall performance. ```sql select tp_timestamp, + regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, + log_group_name, request_id, duration as duration_ms, billed_duration as billed_duration_ms, memory_size as allocated_memory_mb, - max_memory_used as used_memory_mb, - regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name + max_memory_used as used_memory_mb from aws_lambda_log where log_type = 'REPORT' and duration is not null + and duration > 1000 order by duration desc limit - 10; + 20; ``` ```yaml @@ -222,19 +277,30 @@ This query calculates memory utilization efficiency for each Lambda function. Fi ```sql select regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, + log_group_name, memory_size as allocated_memory_mb, round(avg(max_memory_used), 2) as avg_used_memory_mb, - round(avg(max_memory_used * 100.0 / memory_size), 2) as memory_utilization_percent, - count(*) as execution_count + round(avg(max_memory_used * 100.0 / nullif(memory_size, 0)), 2) as memory_utilization_percent, + round(avg(duration), 2) as avg_duration_ms, + count(*) as execution_count, + case + when round(avg(max_memory_used * 100.0 / nullif(memory_size, 0)), 2) < 50 then 'Over-provisioned' + when round(avg(max_memory_used * 100.0 / nullif(memory_size, 0)), 2) > 85 then 'Under-provisioned' + else 'Well-balanced' + end as memory_allocation_assessment from aws_lambda_log where log_type = 'REPORT' and memory_size is not null and max_memory_used is not null + and duration is not null group by - tp_source_name, + lambda_function_name, + log_group_name, memory_size +having + count(*) >= 10 order by memory_utilization_percent desc; ``` @@ -252,6 +318,7 @@ with request_phases as ( select request_id, tp_source_name, + log_group_name, regexp_replace(tp_source_name, '^/aws/lambda/', '') as function_name, min(case when log_type = 'START' then tp_timestamp end) as start_time, min(case when log_type = 'END' then tp_timestamp end) as end_time, @@ -264,10 +331,10 @@ with request_phases as ( count(case when log_level = 'ERROR' then 1 end) as error_log_count, count(case when log_level = 'WARN' then 1 end) as warn_log_count, count(case when log_level = 'DEBUG' then 1 end) as debug_log_count, - bool_or(message like '%Task timed out%') as has_timeout, - bool_or(message like '%Init Duration:%') as has_cold_start, - bool_or(message like '%Memory Size:%' and message like '%Max Memory Used:%') as has_memory_metrics, - bool_or(message like '%error%' or message like '%exception%' or message like '%fail%') as has_error_keywords + bool_or(message ilike '%timed out%') as has_timeout, + bool_or(message ilike '%init duration%') as has_cold_start, + bool_or(message ilike '%memory size%' and message ilike '%max memory used%') as has_memory_metrics, + bool_or(message ilike '%error%' or message ilike '%exception%' or message ilike '%fail%') as has_error_keywords from aws_lambda_log where @@ -275,6 +342,7 @@ with request_phases as ( group by request_id, tp_source_name, + log_group_name, function_name ), message_samples as ( @@ -316,7 +384,8 @@ select rp.has_memory_metrics, rp.has_error_keywords, ms.message_samples[1] as first_message, - rp.function_name as lambda_function_name + rp.function_name as lambda_function_name, + rp.log_group_name from request_phases rp left join @@ -343,6 +412,7 @@ This query summarizes execution metrics for each Lambda function. It helps ident ```sql select regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, + log_group_name, count(*) as execution_count, avg(duration) as avg_duration_ms, min(duration) as min_duration_ms, @@ -355,7 +425,8 @@ from where log_type = 'REPORT' group by - tp_source_name + lambda_function_name, + log_group_name order by execution_count desc; ``` @@ -364,6 +435,42 @@ order by folder: Lambda ``` +### Lambda Invocation and Error Trends + +This query tracks daily Lambda invocation and error patterns. Monitoring these trends helps detect abnormal behavior, understand the impact of code changes, and identify functions with increasing error rates. + +```sql +select + date_trunc('day', tp_timestamp) as day, + regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, + log_group_name, + count(case when log_type = 'START' then 1 end) as invocation_count, + count(case when log_level = 'ERROR' then 1 end) as error_count, + count(case when message ilike '%timed out%' then 1 end) as timeout_count, + count(case when message ilike '%function was throttled%' then 1 end) as throttle_count, + round(count(case when log_level = 'ERROR' + or message ilike '%timed out%' + or message ilike '%function was throttled%' then 1 end) * 100.0 / + nullif(count(case when log_type = 'START' then 1 end), 0), 2) as error_percentage +from + aws_lambda_log +where + tp_timestamp >= current_timestamp - interval '30 day' +group by + day, + lambda_function_name, + log_group_name +having + count(case when log_type = 'START' then 1 end) > 0 +order by + day desc, + invocation_count desc; +``` + +```yaml +folder: Lambda +``` + ## Baseline Examples ### Lambda Duration Distribution @@ -373,6 +480,7 @@ This query categorizes Lambda function executions into duration ranges. Understa ```sql select regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, + log_group_name, case when duration < 100 then '< 100ms' when duration < 500 then '100-500ms' @@ -388,7 +496,8 @@ where log_type = 'REPORT' and duration is not null group by - tp_source_name, + lambda_function_name, + log_group_name, duration_range order by lambda_function_name, @@ -406,69 +515,65 @@ order by folder: Lambda ``` -### Most Common Error Messages by Function +### Average Billed Duration by Memory Configuration -This query identifies the most common error messages for each Lambda function. Finding recurring error patterns helps prioritize which issues to fix first and understand the reliability challenges affecting specific functions. +This query analyzes average billed duration across different memory configurations. This analysis helps optimize cost and performance by finding the best memory setting for each function's runtime needs. ```sql select regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, - regexp_replace(message, '^.*Error: ', '') as error_message_pattern, - count(*) as occurrence_count + log_group_name, + memory_size as allocated_memory_mb, + round(avg(billed_duration), 2) as avg_billed_duration_ms, + min(billed_duration) as min_billed_duration_ms, + max(billed_duration) as max_billed_duration_ms, + count(*) as execution_count from aws_lambda_log where - log_level = 'ERROR' - or message like '%Error:%' - or message like '%Exception:%' - or message like '%Task timed out%' + log_type = 'REPORT' + and billed_duration is not null + and memory_size is not null group by - tp_source_name, - error_message_pattern -order by lambda_function_name, - occurrence_count desc; + log_group_name, + memory_size +order by + avg_billed_duration_ms desc; ``` ```yaml folder: Lambda ``` -### Error Message Pattern Analysis +### Most Common Error Messages -This query analyzes error patterns across all Lambda functions to identify systematic issues. Detecting common error signatures across multiple functions helps identify widespread problems that may require architectural changes rather than function-specific fixes. +This query identifies the most common error patterns across Lambda functions. Finding recurring error patterns helps prioritize which issues to fix first and understand the reliability challenges affecting specific functions. ```sql -with error_patterns as ( - select - regexp_replace(message, '([a-f0-9]{8}(-[a-f0-9]{4}){3}-[a-f0-9]{12}|[\d\.]+|"[^"]*"|''[^'']*'')', 'X') as normalized_error, - count(*) as pattern_count, - count(distinct regexp_replace(tp_source_name, '^/aws/lambda/', '')) as affected_functions_count, - array_agg(distinct regexp_replace(tp_source_name, '^/aws/lambda/', '')) as affected_functions - from - aws_lambda_log - where - log_level = 'ERROR' - or message like '%Error:%' - or message like '%Exception:%' - or message like '%Task timed out%' - group by - normalized_error - having - count(*) > 1 -) select - normalized_error as error_pattern, - pattern_count as occurrence_count, - affected_functions_count, - affected_functions + regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, + log_group_name, + regexp_replace(message, '([a-f0-9]{8}(-[a-f0-9]{4}){3}-[a-f0-9]{12}|[\d\.]+|"[^"]*"|''[^'']*'')', 'X') as normalized_error_pattern, + count(*) as occurrence_count, + min(tp_timestamp) as first_occurrence, + max(tp_timestamp) as last_occurrence from - error_patterns + aws_lambda_log +where + log_level = 'ERROR' + or message ilike '%error:%' + or message ilike '%exception:%' + or message ilike '%timed out%' +group by + lambda_function_name, + log_group_name, + normalized_error_pattern +having + count(*) > 1 order by - pattern_count desc, - affected_functions_count desc -limit - 20; + occurrence_count desc, + lambda_function_name; ``` ```yaml From 215782f63de41d54437c3392d1f96a150083d8fc Mon Sep 17 00:00:00 2001 From: ParthaI Date: Wed, 21 May 2025 13:51:03 +0530 Subject: [PATCH 05/16] Removed the unnecessary commented code and added index.md --- docs/tables/aws_lambda_log/index.md | 186 +++++++++++++++++++++++++ tables/lambda_log/lambda_log.go | 1 - tables/lambda_log/lambda_log_mapper.go | 14 +- tables/lambda_log/lambda_log_table.go | 17 ++- 4 files changed, 196 insertions(+), 22 deletions(-) create mode 100644 docs/tables/aws_lambda_log/index.md diff --git a/docs/tables/aws_lambda_log/index.md b/docs/tables/aws_lambda_log/index.md new file mode 100644 index 00000000..72a39a5c --- /dev/null +++ b/docs/tables/aws_lambda_log/index.md @@ -0,0 +1,186 @@ +--- +title: "Tailpipe Table: aws_lambda_log - Query AWS Lambda Logs" +description: "AWS Lambda logs capture invocation details and function output within your AWS account." +--- + +# Table: aws_lambda_log - Query AWS Lambda Logs + +The `aws_lambda_log` table allows you to query data from [AWS Lambda logs](https://docs.aws.amazon.com/lambda/latest/dg/monitoring-cloudwatchlogs.html). This table provides detailed information about Lambda function invocations, including request ID, log level, message content, timestamps, and more. + +## Configure + +Create a [partition](https://tailpipe.io/docs/manage/partition) for `aws_lambda_log` ([examples](https://hub.tailpipe.io/plugins/turbot/aws/tables/aws_lambda_log#example-configurations)): + +```sh +vi ~/.tailpipe/config/aws.tpc +``` + +```hcl +connection "aws" "logging_account" { + profile = "my-logging-account" +} + +partition "aws_lambda_log" "my_logs" { + source "aws_cloudwatch_log_group" { + connection = connection.aws.logging_account + log_group_name = "/aws/lambda/my-function" + region = "us-east-1" + } +} +``` + +## Collect + +[Collect](https://tailpipe.io/docs/manage/collection) logs for all `aws_lambda_log` partitions: + +```sh +tailpipe collect aws_lambda_log +``` + +Or for a single partition: + +```sh +tailpipe collect aws_lambda_log.my_logs +``` + +## Query + +**[Explore 16+ example queries for this table →](https://hub.tailpipe.io/plugins/turbot/aws/queries/aws_lambda_log)** + +### Recent Error Messages + +Find recent error messages from Lambda functions. + +```sql +select + timestamp, + function_name, + log_level, + message +from + aws_lambda_log +where + log_level = 'ERROR' +order by + timestamp desc +limit 100; +``` + +### Slow Function Executions + +Identify Lambda functions with long execution times. + +```sql +select + function_name, + request_id, + duration_ms, + timestamp +from + aws_lambda_log +where + duration_ms > 5000 +order by + duration_ms desc +limit 20; +``` + +### Memory Utilization + +Find Lambda functions approaching their memory limits. + +```sql +select + function_name, + request_id, + memory_used_mb, + memory_limit_mb, + round((memory_used_mb::float / memory_limit_mb::float) * 100, 2) as memory_utilization_percent, + timestamp +from + aws_lambda_log +where + memory_used_mb is not null + and memory_limit_mb is not null + and (memory_used_mb::float / memory_limit_mb::float) > 0.8 +order by + memory_utilization_percent desc; +``` + +## Example Configurations + +### Collect all log streams explicitly + +Collect logs from all streams in a log group by explicitly setting the wildcard pattern. + +```hcl +partition "aws_lambda_log" "all_streams" { + source "aws_cloudwatch_log_group" { + connection = connection.aws.logging_account + log_group_name = "/aws/lambda/my-function" + log_stream_names = ["*"] + region = "us-east-1" + } +} +``` + +### Filter logs by log stream prefix + +Collect Lambda logs from streams with a specific prefix. + +```hcl +partition "aws_lambda_log" "prefix_filtered_logs" { + source "aws_cloudwatch_log_group" { + connection = connection.aws.logging_account + log_group_name = "/aws/lambda/my-function" + log_stream_names = ["PROD_*", "2023/07/*"] + region = "us-east-1" + } +} +``` + +### Collect logs from an S3 bucket + +Collect Lambda logs archived to an S3 bucket. + +```hcl +partition "aws_lambda_log" "s3_logs" { + source "aws_s3_bucket" { + connection = connection.aws.logging_account + bucket = "lambda-logs-bucket" + prefix = "lambda-logs" + } +} +``` + +### Collect logs from local files + +You can also collect Lambda logs from local files. + +```hcl +partition "aws_lambda_log" "local_logs" { + source "file" { + paths = ["/Users/myuser/lambda_logs"] + file_layout = `%{DATA}.log` + } +} +``` + +## Source Defaults + +### aws_s3_bucket + +// TODO: Change/remove the file layout +This table sets the following defaults for the [aws_s3_bucket source](https://hub.tailpipe.io/plugins/turbot/aws/sources/aws_s3_bucket#arguments): + +| Argument | Default | +| ----------- | --------------------------------------------------------------- | +| file_layout | `%{YEAR:year}/%{MONTHNUM:month}/%{MONTHDAY:day}/%{DATA}.log.gz` | + +### aws_cloudwatch_log_group + +This table sets the following defaults for the [aws_cloudwatch_log_group source](https://hub.tailpipe.io/plugins/turbot/aws/sources/aws_cloudwatch_log_group#arguments): + +| Argument | Default | +| ---------------- | ------- | +| log_stream_names | `["*"]` | diff --git a/tables/lambda_log/lambda_log.go b/tables/lambda_log/lambda_log.go index 91e584e6..d1ab21c5 100644 --- a/tables/lambda_log/lambda_log.go +++ b/tables/lambda_log/lambda_log.go @@ -14,7 +14,6 @@ type LambdaLog struct { LogType *string `json:"log_type,omitempty"` LogLevel *string `json:"log_level,omitempty"` Message *string `json:"message,omitempty"` - RawMessage *string `json:"raw_message,omitempty"` LogGroupName *string `json:"log_group_name,omitempty"` // Report Specific Fields diff --git a/tables/lambda_log/lambda_log_mapper.go b/tables/lambda_log/lambda_log_mapper.go index 1e26ceb2..fcfbd41b 100644 --- a/tables/lambda_log/lambda_log_mapper.go +++ b/tables/lambda_log/lambda_log_mapper.go @@ -151,16 +151,6 @@ func (m *LambdaLogMapper) Map(_ context.Context, a any, _ ...mappers.MapOption[* row.Message = &msg } } - // else { - // // Handle legacy plain text system logs (START, END, REPORT format) - // row, err := parseLambdaPainTextLog(raw) - // if err != nil { - // return nil, fmt.Errorf("error parsing lambda pain text log: %w", err) - // } - // row.Message = &raw - // } - - row.RawMessage = &raw return row, nil } @@ -176,7 +166,8 @@ func parseLambdaPainTextLog(line string) (*LambdaLog, error) { if id := extractAfter(line, "RequestId: "); id != "" { log.RequestID = &id } - log.RawMessage = &line + // TODO: Handle other Platform event type of logs if any. + // https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api.html switch { case strings.HasPrefix(line, "START RequestId:"): // Parse START log line @@ -212,7 +203,6 @@ func parseLambdaPainTextLog(line string) (*LambdaLog, error) { } else { log.Message = ptr(line) } - case strings.HasPrefix(line, "REPORT RequestId:"): // Parse REPORT log line which contains metrics log.LogType = ptr("REPORT") diff --git a/tables/lambda_log/lambda_log_table.go b/tables/lambda_log/lambda_log_table.go index b7a16945..7de911e4 100644 --- a/tables/lambda_log/lambda_log_table.go +++ b/tables/lambda_log/lambda_log_table.go @@ -27,7 +27,10 @@ func (c *LambdaLogTable) Identifier() string { func (c *LambdaLogTable) GetSourceMetadata() ([]*table.SourceMetadata[*LambdaLog], error) { defaultS3ArtifactConfig := &artifact_source_config.ArtifactSourceConfigImpl{ - FileLayout: utils.ToStringPointer("AWSLogs/(%{DATA:org_id}/)?%{NUMBER:account_id}/lambda/%{DATA:function_name}/%{YEAR:year}/%{MONTHNUM:month}/%{MONTHDAY:day}/%{DATA}.log"), + // TODO: There is not specific file layout for the lambda logs. + // Also we can't directly store logs in S3 bucket. + // Does the file layout looks good? + FileLayout: utils.ToStringPointer("AWSLogs/(%{DATA:org_id}/)?%{NUMBER:account_id}/lambda/%{DATA:function_name}/%{YEAR:year}/%{MONTHNUM:month}/%{MONTHDAY:day}/%{DATA}.txt"), } return []*table.SourceMetadata[*LambdaLog]{ @@ -40,12 +43,14 @@ func (c *LambdaLogTable) GetSourceMetadata() ([]*table.SourceMetadata[*LambdaLog artifact_source.WithRowPerLine(), }, }, - { + { + // TODO: Should we keep this, as we don't have a direct way to store the logs in S3 bucket? // S3 artifact source SourceName: cloudwatch_log_group.AwsCloudwatchLogGroupSourceIdentifier, Mapper: &LambdaLogMapper{}, }, - { + { + // TODO: Should we keep this as we can't download the logs to local from log streams. // any artifact source SourceName: constants.ArtifactSourceIdentifier, Mapper: &LambdaLogMapper{}, @@ -85,11 +90,5 @@ func (c *LambdaLogTable) EnrichRow(row *LambdaLog, sourceEnrichmentFields schema row.LogGroupName = row.TpSourceName - // if row.Message == nil { - // return nil, nil - // } - - // TODO: Add enrichment fields - return row, nil } From 41a4264e755427f382e1095b24432bbfab4704c8 Mon Sep 17 00:00:00 2001 From: ParthaI Date: Tue, 10 Jun 2025 12:31:48 +0530 Subject: [PATCH 06/16] tested s3 bucket source --- tables/lambda_log/lambda_log_mapper.go | 120 ++++++++++++++++++++++--- tables/lambda_log/lambda_log_table.go | 6 +- 2 files changed, 113 insertions(+), 13 deletions(-) diff --git a/tables/lambda_log/lambda_log_mapper.go b/tables/lambda_log/lambda_log_mapper.go index fcfbd41b..3baeeb56 100644 --- a/tables/lambda_log/lambda_log_mapper.go +++ b/tables/lambda_log/lambda_log_mapper.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "log/slog" "regexp" "slices" "strconv" @@ -11,6 +12,7 @@ import ( "time" "github.com/turbot/tailpipe-plugin-sdk/mappers" + cwTypes "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types" ) type LambdaLogMapper struct { @@ -41,8 +43,19 @@ type jsonFormatApplicationLog struct { RequestID string `json:"requestId"` } +// While storing the logs in the S3 bucket, the logs are stored in the following format: +type s3FormatLog struct { + AccountId *string `json:"accountId"` + LogGroup *string `json:"logGroup"` + LogStream *string `json:"logStream"` + Id *string `json:"id"` + Timestamp *int64 `json:"timestamp"` + Message *string `json:"message"` +} + func (m *LambdaLogMapper) Map(_ context.Context, a any, _ ...mappers.MapOption[*LambdaLog]) (*LambdaLog, error) { row := &LambdaLog{} + isCwLog := false var raw string switch v := a.(type) { @@ -52,15 +65,54 @@ func (m *LambdaLogMapper) Map(_ context.Context, a any, _ ...mappers.MapOption[* raw = v case *string: raw = *v + case cwTypes.FilteredLogEvent: + raw = *v.Message + t := time.UnixMilli(*v.Timestamp) + row.Timestamp = &t + isCwLog = true default: return nil, fmt.Errorf("expected string or []byte, got %T", a) } + + // Handle S3 bucket logs + s3Format := &s3FormatLog{} + isTextFormat := true + if err := s3Format.UnmarshalJSON([]byte(raw)); err == nil && !isCwLog { + // if s3Format.LogGroup != nil { + // if row.TpTimestamp.IsZero() { + slog.Error("S3 format log", "Timestamp", *s3Format.Timestamp) + t := time.UnixMilli(*s3Format.Timestamp) + slog.Error("S3 format log", "Timestamp", *s3Format.Timestamp, "Time", t) + if err == nil { + row.Timestamp = &t + } + + if s3Format.LogGroup != nil { + row.LogGroupName = s3Format.LogGroup + } + + // raw = strings.TrimSpace(*s3Format.Message) + msgStr, err := strconv.Unquote(*s3Format.Message) + if err == nil { + isTextFormat = true + raw = strings.TrimSpace(msgStr) + } + if err != nil { + // In the case if the message is a JSON object we will not have the unescape character. + isTextFormat = false + raw = strings.TrimSpace(*s3Format.Message) + slog.Error("Error unquoting message", "error", err) + } + } + raw = strings.TrimSpace(raw) - if strings.HasPrefix(raw, "REPORT") || strings.HasPrefix(raw, "END") || strings.HasPrefix(raw, "START") || strings.HasPrefix(raw, "INIT_START") { + if strings.HasPrefix(raw, "REPORT") || strings.HasPrefix(raw, "END") || strings.HasPrefix(raw, "START") || strings.HasPrefix(raw, "INIT_START") || strings.HasPrefix(raw, "EXTENSION") || strings.HasPrefix(raw, "TELEMETRY") || (isTextFormat && !isTimestamp(strings.Fields(raw)[0])){ lambdaLog, err := parseLambdaPainTextLog(raw) if err != nil { return nil, fmt.Errorf("error parsing lambda pain text log: %w", err) } + lambdaLog.Timestamp = row.Timestamp + lambdaLog.RawMessage = &raw return lambdaLog, nil } @@ -109,7 +161,7 @@ func (m *LambdaLogMapper) Map(_ context.Context, a any, _ ...mappers.MapOption[* row.MaxMemoryUsed = &mem } } - + row.RawMessage = &raw return row, nil } else if _, hasLevel := probe["level"]; hasLevel { // Fallback to application log (JSON format with timestamp, level, message, requestId) @@ -125,12 +177,14 @@ func (m *LambdaLogMapper) Map(_ context.Context, a any, _ ...mappers.MapOption[* row.LogLevel = &appLog.Level row.Message = &appLog.Message row.RequestID = &appLog.RequestID - + row.RawMessage = &raw return row, nil } - } else if len(strings.Fields(raw)) >= 4 && isTimestamp(strings.Fields(raw)[0]) { // plain text application log + } + if len(strings.Fields(raw)) >= 4 && isTimestamp(strings.Fields(raw)[0]) { // plain text application log // Handle plain text application logs (format: timestamp requestID logLevel message) // Example: 2024-10-27T19:17:45.586Z 79b4f56e-95b1-4643-9700-2807f4e68189 INFO some log message + // [INFO] 2025-05-26T06:42:27.551Z 66e2e287-e14b-471e-8185-faca26d2c310 This is a function log fields := strings.Fields(raw) // Timestamp if t, err := time.Parse(time.RFC3339, fields[0]); err == nil { @@ -150,11 +204,36 @@ func (m *LambdaLogMapper) Map(_ context.Context, a any, _ ...mappers.MapOption[* msg := strings.Join(fields[3:], " ") row.Message = &msg } + row.RawMessage = &raw + } else { + row.Message = &raw + row.RawMessage = &raw } return row, nil } +func (s *s3FormatLog) UnmarshalJSON(data []byte) error { + type Alias s3FormatLog + aux := &struct { + Message json.RawMessage `json:"message"` + *Alias + }{ + Alias: (*Alias)(s), + } + + // slog.Error("U11111111111", "data", string(data)) + if err := json.Unmarshal(data, &aux); err != nil { + slog.Error("Error unmarshalling s3 format log", "error", err) + return err + } + + msgStr := string(aux.Message) + s.Message = &msgStr + // slog.Error("22222222222222", "data", string(msgStr)) + return nil +} + // parseLambdaPainTextLog handles the legacy plain text format for Lambda system logs // These include START, END, and REPORT messages that aren't in JSON format // Example: @@ -174,8 +253,8 @@ func parseLambdaPainTextLog(line string) (*LambdaLog, error) { log.LogType = ptr("START") if id := extractAfter(line, "START RequestId: "); id != "" { log.RequestID = ptr(strings.Fields(id)[0]) - if len(strings.Split(line, "RequestId: " + id)[1]) > 0 { - log.Message = &strings.Split(line, "RequestId: " + id)[1] + if len(strings.Split(line, "RequestId: "+id)[1]) > 0 { + log.Message = &strings.Split(line, "RequestId: "+id)[1] } } else { log.Message = ptr(line) @@ -185,11 +264,11 @@ func parseLambdaPainTextLog(line string) (*LambdaLog, error) { log.LogType = ptr("INIT_START") if id := extractAfter(line, "INIT_START RequestId: "); id != "" { log.RequestID = ptr(strings.Fields(id)[0]) - if len(strings.Split(line, "RequestId: " + id)[1]) > 0 { - log.Message = &strings.Split(line, "RequestId: " + id)[1] + if len(strings.Split(line, "RequestId: "+id)[1]) > 0 { + log.Message = &strings.Split(line, "RequestId: "+id)[1] } } else { - msg := strings.Split(line, "INIT_START " + id)[1] + msg := strings.Split(line, "INIT_START "+id)[1] log.Message = &msg } case strings.HasPrefix(line, "END RequestId:"): @@ -197,8 +276,8 @@ func parseLambdaPainTextLog(line string) (*LambdaLog, error) { log.LogType = ptr("END") if id := extractAfter(line, "END RequestId: "); id != "" { log.RequestID = ptr(strings.Fields(id)[0]) - if len(strings.Split(line, "RequestId: " + id)[1]) > 0 { - log.Message = &strings.Split(line, "RequestId: " + id)[1] + if len(strings.Split(line, "RequestId: "+id)[1]) > 0 { + log.Message = &strings.Split(line, "RequestId: "+id)[1] } } else { log.Message = ptr(line) @@ -233,6 +312,25 @@ func parseLambdaPainTextLog(line string) (*LambdaLog, error) { log.MaxMemoryUsed = &i } } + + case strings.HasPrefix(line, "EXTENSION"): + // Parse START log line + log.LogType = ptr("EXTENSION") + log.RequestID = ptr(strings.Fields("EXTENSION")[0]) + if len(strings.Split(line, "EXTENSION")[1]) > 0 { + log.Message = ptr(strings.TrimSpace(strings.Split(line, "EXTENSION")[1])) + } + + case strings.HasPrefix(line, "TELEMETRY"): + // Parse START log line + log.LogType = ptr("TELEMETRY") + log.RequestID = ptr(strings.Fields("TELEMETRY")[0]) + if len(strings.Split(line, "TELEMETRY")[1]) > 0 { + log.Message = ptr(strings.TrimSpace(strings.Split(line, "TELEMETRY")[1])) + } + + default: + log.Message = ptr(line) } return log, nil diff --git a/tables/lambda_log/lambda_log_table.go b/tables/lambda_log/lambda_log_table.go index 7de911e4..c0f1d387 100644 --- a/tables/lambda_log/lambda_log_table.go +++ b/tables/lambda_log/lambda_log_table.go @@ -30,7 +30,7 @@ func (c *LambdaLogTable) GetSourceMetadata() ([]*table.SourceMetadata[*LambdaLog // TODO: There is not specific file layout for the lambda logs. // Also we can't directly store logs in S3 bucket. // Does the file layout looks good? - FileLayout: utils.ToStringPointer("AWSLogs/(%{DATA:org_id}/)?%{NUMBER:account_id}/lambda/%{DATA:function_name}/%{YEAR:year}/%{MONTHNUM:month}/%{MONTHDAY:day}/%{DATA}.txt"), + FileLayout: utils.ToStringPointer("AWSLogs/(%{DATA:org_id}/)?%{NUMBER:account_id}/%{DATA:region}/%{DATA:function_name}/%{YEAR:year}/%{MONTHNUM:month}/%{MONTHDAY:day}/%{HOUR:hour}/%{DATA}.log.zst"), } return []*table.SourceMetadata[*LambdaLog]{ @@ -88,7 +88,9 @@ func (c *LambdaLogTable) EnrichRow(row *LambdaLog, sourceEnrichmentFields schema } } - row.LogGroupName = row.TpSourceName + if row.LogGroupName == nil { + row.LogGroupName = row.TpSourceName + } return row, nil } From 54681a362ba97db169fb53926fef717d09748c6b Mon Sep 17 00:00:00 2001 From: ParthaI Date: Tue, 10 Jun 2025 12:32:11 +0530 Subject: [PATCH 07/16] Committing raw string column --- tables/lambda_log/lambda_log.go | 1 + 1 file changed, 1 insertion(+) diff --git a/tables/lambda_log/lambda_log.go b/tables/lambda_log/lambda_log.go index d1ab21c5..c7caf732 100644 --- a/tables/lambda_log/lambda_log.go +++ b/tables/lambda_log/lambda_log.go @@ -15,6 +15,7 @@ type LambdaLog struct { LogLevel *string `json:"log_level,omitempty"` Message *string `json:"message,omitempty"` LogGroupName *string `json:"log_group_name,omitempty"` + RawMessage *string `json:"raw_message,omitempty"` // Report Specific Fields Duration *float64 `json:"duration,omitempty"` From 824d6a1c2831dd7105bb7e91f7236ec2ad479c9e Mon Sep 17 00:00:00 2001 From: ParthaI Date: Fri, 13 Jun 2025 17:01:56 +0530 Subject: [PATCH 08/16] Removed the RaWMessage column --- tables/lambda_log/lambda_log.go | 1 - tables/lambda_log/lambda_log_mapper.go | 17 +++++++---------- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/tables/lambda_log/lambda_log.go b/tables/lambda_log/lambda_log.go index c7caf732..d1ab21c5 100644 --- a/tables/lambda_log/lambda_log.go +++ b/tables/lambda_log/lambda_log.go @@ -15,7 +15,6 @@ type LambdaLog struct { LogLevel *string `json:"log_level,omitempty"` Message *string `json:"message,omitempty"` LogGroupName *string `json:"log_group_name,omitempty"` - RawMessage *string `json:"raw_message,omitempty"` // Report Specific Fields Duration *float64 `json:"duration,omitempty"` diff --git a/tables/lambda_log/lambda_log_mapper.go b/tables/lambda_log/lambda_log_mapper.go index 3baeeb56..5cfc0333 100644 --- a/tables/lambda_log/lambda_log_mapper.go +++ b/tables/lambda_log/lambda_log_mapper.go @@ -11,8 +11,8 @@ import ( "strings" "time" - "github.com/turbot/tailpipe-plugin-sdk/mappers" cwTypes "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types" + "github.com/turbot/tailpipe-plugin-sdk/mappers" ) type LambdaLogMapper struct { @@ -106,13 +106,13 @@ func (m *LambdaLogMapper) Map(_ context.Context, a any, _ ...mappers.MapOption[* } raw = strings.TrimSpace(raw) - if strings.HasPrefix(raw, "REPORT") || strings.HasPrefix(raw, "END") || strings.HasPrefix(raw, "START") || strings.HasPrefix(raw, "INIT_START") || strings.HasPrefix(raw, "EXTENSION") || strings.HasPrefix(raw, "TELEMETRY") || (isTextFormat && !isTimestamp(strings.Fields(raw)[0])){ + if strings.HasPrefix(raw, "REPORT") || strings.HasPrefix(raw, "END") || strings.HasPrefix(raw, "START") || strings.HasPrefix(raw, "INIT_START") || strings.HasPrefix(raw, "EXTENSION") || strings.HasPrefix(raw, "TELEMETRY") || (isTextFormat && !isTimestamp(strings.Fields(raw)[0])) { lambdaLog, err := parseLambdaPainTextLog(raw) if err != nil { return nil, fmt.Errorf("error parsing lambda pain text log: %w", err) } lambdaLog.Timestamp = row.Timestamp - lambdaLog.RawMessage = &raw + lambdaLog.Message = &raw return lambdaLog, nil } @@ -161,7 +161,7 @@ func (m *LambdaLogMapper) Map(_ context.Context, a any, _ ...mappers.MapOption[* row.MaxMemoryUsed = &mem } } - row.RawMessage = &raw + row.Message = &raw return row, nil } else if _, hasLevel := probe["level"]; hasLevel { // Fallback to application log (JSON format with timestamp, level, message, requestId) @@ -177,10 +177,10 @@ func (m *LambdaLogMapper) Map(_ context.Context, a any, _ ...mappers.MapOption[* row.LogLevel = &appLog.Level row.Message = &appLog.Message row.RequestID = &appLog.RequestID - row.RawMessage = &raw + row.Message = &raw return row, nil } - } + } if len(strings.Fields(raw)) >= 4 && isTimestamp(strings.Fields(raw)[0]) { // plain text application log // Handle plain text application logs (format: timestamp requestID logLevel message) // Example: 2024-10-27T19:17:45.586Z 79b4f56e-95b1-4643-9700-2807f4e68189 INFO some log message @@ -204,11 +204,8 @@ func (m *LambdaLogMapper) Map(_ context.Context, a any, _ ...mappers.MapOption[* msg := strings.Join(fields[3:], " ") row.Message = &msg } - row.RawMessage = &raw - } else { - row.Message = &raw - row.RawMessage = &raw } + row.Message = &raw return row, nil } From 444c818a79b51f7e8138aabfec40edfadcb0b598 Mon Sep 17 00:00:00 2001 From: ParthaI Date: Fri, 13 Jun 2025 17:20:56 +0530 Subject: [PATCH 09/16] Added table description and removed unnecessary log statement --- tables/lambda_log/lambda_log_mapper.go | 2 -- tables/lambda_log/lambda_log_table.go | 7 ++++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/tables/lambda_log/lambda_log_mapper.go b/tables/lambda_log/lambda_log_mapper.go index 5cfc0333..36ecfe1f 100644 --- a/tables/lambda_log/lambda_log_mapper.go +++ b/tables/lambda_log/lambda_log_mapper.go @@ -80,9 +80,7 @@ func (m *LambdaLogMapper) Map(_ context.Context, a any, _ ...mappers.MapOption[* if err := s3Format.UnmarshalJSON([]byte(raw)); err == nil && !isCwLog { // if s3Format.LogGroup != nil { // if row.TpTimestamp.IsZero() { - slog.Error("S3 format log", "Timestamp", *s3Format.Timestamp) t := time.UnixMilli(*s3Format.Timestamp) - slog.Error("S3 format log", "Timestamp", *s3Format.Timestamp, "Time", t) if err == nil { row.Timestamp = &t } diff --git a/tables/lambda_log/lambda_log_table.go b/tables/lambda_log/lambda_log_table.go index c0f1d387..18e520c6 100644 --- a/tables/lambda_log/lambda_log_table.go +++ b/tables/lambda_log/lambda_log_table.go @@ -44,7 +44,6 @@ func (c *LambdaLogTable) GetSourceMetadata() ([]*table.SourceMetadata[*LambdaLog }, }, { - // TODO: Should we keep this, as we don't have a direct way to store the logs in S3 bucket? // S3 artifact source SourceName: cloudwatch_log_group.AwsCloudwatchLogGroupSourceIdentifier, Mapper: &LambdaLogMapper{}, @@ -74,6 +73,7 @@ func (c *LambdaLogTable) EnrichRow(row *LambdaLog, sourceEnrichmentFields schema row.TpDate = row.TpTimestamp.Truncate(24 * time.Hour) } + // tp_index row.TpIndex = schema.DefaultIndex var arnRegex = regexp.MustCompile(`arn:aws:[^,\s'"\\]+`) @@ -94,3 +94,8 @@ func (c *LambdaLogTable) EnrichRow(row *LambdaLog, sourceEnrichmentFields schema return row, nil } + +func (c *LambdaLogTable) GetDescription() string { + return "AWS Lambda logs capture detailed information about function executions, including invocation context, console output, errors, and performance metrics. This table provides a structured and queryable view of Lambda log data, enabling easier analysis, troubleshooting, and monitoring." +} + From 536196be9fceafba6e4077bc60e44b55c2457d75 Mon Sep 17 00:00:00 2001 From: ParthaI Date: Fri, 13 Jun 2025 17:40:51 +0530 Subject: [PATCH 10/16] Removed the TODO comments after testing by collecting the results --- tables/lambda_log/lambda_log_table.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tables/lambda_log/lambda_log_table.go b/tables/lambda_log/lambda_log_table.go index 18e520c6..1fd2efe1 100644 --- a/tables/lambda_log/lambda_log_table.go +++ b/tables/lambda_log/lambda_log_table.go @@ -27,9 +27,6 @@ func (c *LambdaLogTable) Identifier() string { func (c *LambdaLogTable) GetSourceMetadata() ([]*table.SourceMetadata[*LambdaLog], error) { defaultS3ArtifactConfig := &artifact_source_config.ArtifactSourceConfigImpl{ - // TODO: There is not specific file layout for the lambda logs. - // Also we can't directly store logs in S3 bucket. - // Does the file layout looks good? FileLayout: utils.ToStringPointer("AWSLogs/(%{DATA:org_id}/)?%{NUMBER:account_id}/%{DATA:region}/%{DATA:function_name}/%{YEAR:year}/%{MONTHNUM:month}/%{MONTHDAY:day}/%{HOUR:hour}/%{DATA}.log.zst"), } @@ -49,7 +46,6 @@ func (c *LambdaLogTable) GetSourceMetadata() ([]*table.SourceMetadata[*LambdaLog Mapper: &LambdaLogMapper{}, }, { - // TODO: Should we keep this as we can't download the logs to local from log streams. // any artifact source SourceName: constants.ArtifactSourceIdentifier, Mapper: &LambdaLogMapper{}, From c67dc50a242f58553b663d9a922bd58524d73af3 Mon Sep 17 00:00:00 2001 From: ParthaI Date: Thu, 26 Jun 2025 21:58:58 +0530 Subject: [PATCH 11/16] Updated the table schema , doc and tailpipe SDK version --- docs/tables/aws_lambda_log/index.md | 47 +++-- docs/tables/aws_lambda_log/queries.md | 67 +++---- tables/lambda_log/lambda_log.go | 15 +- tables/lambda_log/lambda_log_mapper.go | 262 +++++++++++++++---------- tables/lambda_log/lambda_log_table.go | 8 +- 5 files changed, 246 insertions(+), 153 deletions(-) diff --git a/docs/tables/aws_lambda_log/index.md b/docs/tables/aws_lambda_log/index.md index 72a39a5c..816dfced 100644 --- a/docs/tables/aws_lambda_log/index.md +++ b/docs/tables/aws_lambda_log/index.md @@ -7,6 +7,29 @@ description: "AWS Lambda logs capture invocation details and function output wit The `aws_lambda_log` table allows you to query data from [AWS Lambda logs](https://docs.aws.amazon.com/lambda/latest/dg/monitoring-cloudwatchlogs.html). This table provides detailed information about Lambda function invocations, including request ID, log level, message content, timestamps, and more. +## Message Format and Parsing + +The `aws_lambda_log` table provides multiple message fields to handle different log formats across Lambda runtimes: + +### Message Fields + +- **`message`** (string) – Extracted and parsed message content when possible. Always populated if a message can be extracted, including from JSON-formatted logs. +- **`message_json`** (json) – Extracted message parsed as JSON. Populated only if the extracted message is valid JSON and can be converted. +- **`raw_message`** (string) – Complete original message string as received from the log source. Always populated regardless of format. +- **`raw_message_json`** (json) – Full original message parsed as JSON. Populated only if the original message is native JSON or convertible to JSON. + +### Runtime Behavior + +The table handles logs consistently across most AWS Lambda runtimes (Node.js, Python, .NET, Go, Ruby), with automatic parsing of both plain text and JSON-formatted logs. + +**Note:** PowerShell runtime emits logs in a different format compared to other runtimes, which may affect message extraction and parsing. Refer to the [AWS Lambda runtime documentation](https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html) for runtime-specific log format details. + +### JSON Log Handling + +- JSON-formatted logs are automatically parsed and stored in `raw_message_json` +- If the extracted message portion is also valid JSON, it's additionally parsed into `message_json` +- This dual approach ensures you can query both structured JSON data and extract specific message content + ## Configure Create a [partition](https://tailpipe.io/docs/manage/partition) for `aws_lambda_log` ([examples](https://hub.tailpipe.io/plugins/turbot/aws/tables/aws_lambda_log#example-configurations)): @@ -54,7 +77,7 @@ Find recent error messages from Lambda functions. ```sql select timestamp, - function_name, + tp_source_name as function_name, log_level, message from @@ -72,16 +95,16 @@ Identify Lambda functions with long execution times. ```sql select - function_name, + tp_source_name as function_name, request_id, - duration_ms, + duration * 1000 as duration_ms, timestamp from aws_lambda_log where - duration_ms > 5000 + duration > 5 order by - duration_ms desc + duration desc limit 20; ``` @@ -91,18 +114,18 @@ Find Lambda functions approaching their memory limits. ```sql select - function_name, + tp_source_name as function_name, request_id, - memory_used_mb, - memory_limit_mb, - round((memory_used_mb::float / memory_limit_mb::float) * 100, 2) as memory_utilization_percent, + max_memory_used as memory_used_mb, + memory_size as memory_limit_mb, + round((max_memory_used::float / memory_size::float) * 100, 2) as memory_utilization_percent, timestamp from aws_lambda_log where - memory_used_mb is not null - and memory_limit_mb is not null - and (memory_used_mb::float / memory_limit_mb::float) > 0.8 + max_memory_used is not null + and memory_size is not null + and (max_memory_used::float / memory_size::float) > 0.8 order by memory_utilization_percent desc; ``` diff --git a/docs/tables/aws_lambda_log/queries.md b/docs/tables/aws_lambda_log/queries.md index a062b78e..0b2c0613 100644 --- a/docs/tables/aws_lambda_log/queries.md +++ b/docs/tables/aws_lambda_log/queries.md @@ -10,7 +10,7 @@ select request_id, log_type, log_level, - message, + raw_message, regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, log_group_name from @@ -88,7 +88,7 @@ select tp_timestamp, log_type, log_level, - substring(message, 1, 200) as message_preview, + substring(raw_message, 1, 200) as message_preview, case when log_type in ('START', 'END', 'REPORT', 'INIT_START') then 'System Log' when log_level is not null then 'Application Log' @@ -120,11 +120,11 @@ select regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, log_group_name, request_id, - message, + raw_message, case - when message ilike '%timed out%' then 'Timeout' - when message ilike '%memory size exceeded%' then 'Memory Exceeded' - when message ilike '%process exited before completing request%' then 'Process Exited' + when raw_message ilike '%timed out%' then 'Timeout' + when raw_message ilike '%memory size exceeded%' then 'Memory Exceeded' + when raw_message ilike '%process exited before completing request%' then 'Process Exited' when log_level = 'ERROR' then 'Application Error' else 'Other Error' end as error_type @@ -132,9 +132,9 @@ from aws_lambda_log where log_level = 'ERROR' - or message ilike '%timed out%' - or message ilike '%memory size exceeded%' - or message ilike '%process exited before completing request%' + or raw_message ilike '%timed out%' + or raw_message ilike '%memory size exceeded%' + or raw_message ilike '%process exited before completing request%' order by tp_timestamp desc limit @@ -185,20 +185,21 @@ This query analyzes Lambda cold starts by counting initialization events for eac select regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, log_group_name, - count(distinct request_id) as total_executions, - count(distinct case when message ilike '%init duration%' then request_id end) as cold_start_count, - round(count(distinct case when message ilike '%init duration%' then request_id end) * 100.0 / - nullif(count(distinct request_id), 0), 2) as cold_start_percentage, - avg(case when message ilike '%init duration%' - then cast(regexp_extract(message, 'Init Duration: ([0-9.]+) ms', 1) as double) end) as avg_init_duration_ms + count(distinct case when log_type = 'START' then request_id end) as total_executions, + count(distinct case when raw_message ilike '%init duration%' then request_id end) as cold_start_count, + round(count(distinct case when raw_message ilike '%init duration%' then request_id end) * 100.0 / + nullif(count(distinct case when log_type = 'START' then request_id end), 0), 2) as cold_start_percentage, + avg(case when raw_message ilike '%init duration%' + then cast(regexp_extract(raw_message, 'Init Duration: ([0-9.]+) ms', 1) as double) end) as avg_init_duration_ms from aws_lambda_log where tp_timestamp >= current_timestamp - interval '7 day' - and (log_type = 'INIT_START' or message ilike '%init duration%') group by lambda_function_name, log_group_name +having + count(distinct case when log_type = 'START' then request_id end) > 0 order by cold_start_count desc; ``` @@ -217,8 +218,8 @@ select regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, log_group_name, count(*) as total_logs, - count(case when message ilike '%function was throttled%' then 1 end) as throttle_count, - round(count(case when message ilike '%function was throttled%' then 1 end) * 100.0 / nullif(count(*), 0), 2) as throttle_percentage + count(case when raw_message ilike '%function was throttled%' then 1 end) as throttle_count, + round(count(case when raw_message ilike '%function was throttled%' then 1 end) * 100.0 / nullif(count(*), 0), 2) as throttle_percentage from aws_lambda_log where @@ -228,7 +229,7 @@ group by lambda_function_name, log_group_name having - count(case when message ilike '%function was throttled%' then 1 end) > 0 + count(case when raw_message ilike '%function was throttled%' then 1 end) > 0 order by hour desc, throttle_count desc; @@ -331,10 +332,10 @@ with request_phases as ( count(case when log_level = 'ERROR' then 1 end) as error_log_count, count(case when log_level = 'WARN' then 1 end) as warn_log_count, count(case when log_level = 'DEBUG' then 1 end) as debug_log_count, - bool_or(message ilike '%timed out%') as has_timeout, - bool_or(message ilike '%init duration%') as has_cold_start, - bool_or(message ilike '%memory size%' and message ilike '%max memory used%') as has_memory_metrics, - bool_or(message ilike '%error%' or message ilike '%exception%' or message ilike '%fail%') as has_error_keywords + bool_or(raw_message ilike '%timed out%') as has_timeout, + bool_or(raw_message ilike '%init duration%') as has_cold_start, + bool_or(raw_message ilike '%memory size%' and raw_message ilike '%max memory used%') as has_memory_metrics, + bool_or(raw_message ilike '%error%' or raw_message ilike '%exception%' or raw_message ilike '%fail%') as has_error_keywords from aws_lambda_log where @@ -348,11 +349,11 @@ with request_phases as ( message_samples as ( select request_id, - array_agg(message) as message_samples + array_agg(raw_message) as message_samples from (select request_id, - message, + raw_message, row_number() over (partition by request_id order by tp_timestamp) as rn from aws_lambda_log @@ -446,11 +447,11 @@ select log_group_name, count(case when log_type = 'START' then 1 end) as invocation_count, count(case when log_level = 'ERROR' then 1 end) as error_count, - count(case when message ilike '%timed out%' then 1 end) as timeout_count, - count(case when message ilike '%function was throttled%' then 1 end) as throttle_count, + count(case when raw_message ilike '%timed out%' then 1 end) as timeout_count, + count(case when raw_message ilike '%function was throttled%' then 1 end) as throttle_count, round(count(case when log_level = 'ERROR' - or message ilike '%timed out%' - or message ilike '%function was throttled%' then 1 end) * 100.0 / + or raw_message ilike '%timed out%' + or raw_message ilike '%function was throttled%' then 1 end) * 100.0 / nullif(count(case when log_type = 'START' then 1 end), 0), 2) as error_percentage from aws_lambda_log @@ -554,7 +555,7 @@ This query identifies the most common error patterns across Lambda functions. Fi select regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, log_group_name, - regexp_replace(message, '([a-f0-9]{8}(-[a-f0-9]{4}){3}-[a-f0-9]{12}|[\d\.]+|"[^"]*"|''[^'']*'')', 'X') as normalized_error_pattern, + regexp_replace(raw_message, '([a-f0-9]{8}(-[a-f0-9]{4}){3}-[a-f0-9]{12}|[\d\.]+|"[^"]*"|''[^'']*'')', 'X') as normalized_error_pattern, count(*) as occurrence_count, min(tp_timestamp) as first_occurrence, max(tp_timestamp) as last_occurrence @@ -562,9 +563,9 @@ from aws_lambda_log where log_level = 'ERROR' - or message ilike '%error:%' - or message ilike '%exception:%' - or message ilike '%timed out%' + or raw_message ilike '%error:%' + or raw_message ilike '%exception:%' + or raw_message ilike '%timed out%' group by lambda_function_name, log_group_name, diff --git a/tables/lambda_log/lambda_log.go b/tables/lambda_log/lambda_log.go index d1ab21c5..fcecfe2e 100644 --- a/tables/lambda_log/lambda_log.go +++ b/tables/lambda_log/lambda_log.go @@ -9,12 +9,15 @@ import ( type LambdaLog struct { schema.CommonFields - Timestamp *time.Time `json:"timestamp,omitempty"` - RequestID *string `json:"request_id,omitempty"` - LogType *string `json:"log_type,omitempty"` - LogLevel *string `json:"log_level,omitempty"` - Message *string `json:"message,omitempty"` - LogGroupName *string `json:"log_group_name,omitempty"` + Timestamp *time.Time `json:"timestamp,omitempty"` + RequestID *string `json:"request_id,omitempty"` + LogType *string `json:"log_type,omitempty"` + LogLevel *string `json:"log_level,omitempty"` + Message *string `json:"message,omitempty"` + MessageJson map[string]interface{} `json:"message_json,omitempty"` + RawMessage *string `json:"raw_message,omitempty"` + RawMessageJson map[string]interface{} `json:"raw_message_json,omitempty"` + LogGroupName *string `json:"log_group_name,omitempty"` // Report Specific Fields Duration *float64 `json:"duration,omitempty"` diff --git a/tables/lambda_log/lambda_log_mapper.go b/tables/lambda_log/lambda_log_mapper.go index 36ecfe1f..f68476bd 100644 --- a/tables/lambda_log/lambda_log_mapper.go +++ b/tables/lambda_log/lambda_log_mapper.go @@ -6,7 +6,6 @@ import ( "fmt" "log/slog" "regexp" - "slices" "strconv" "strings" "time" @@ -58,6 +57,10 @@ func (m *LambdaLogMapper) Map(_ context.Context, a any, _ ...mappers.MapOption[* isCwLog := false var raw string + // Declare s3Format and isTextFormat at the start for use in lambda log stored in S3 bucket handling + s3Format := &s3FormatLog{} + isTextFormat := true + switch v := a.(type) { case []byte: raw = string(v) @@ -74,12 +77,9 @@ func (m *LambdaLogMapper) Map(_ context.Context, a any, _ ...mappers.MapOption[* return nil, fmt.Errorf("expected string or []byte, got %T", a) } - // Handle S3 bucket logs - s3Format := &s3FormatLog{} - isTextFormat := true + // --- Lambda log stored in S3 bucket handling --- + // If the log is in the format stored in an S3 bucket, unmarshal and extract timestamp, log group, and message if err := s3Format.UnmarshalJSON([]byte(raw)); err == nil && !isCwLog { - // if s3Format.LogGroup != nil { - // if row.TpTimestamp.IsZero() { t := time.UnixMilli(*s3Format.Timestamp) if err == nil { row.Timestamp = &t @@ -89,60 +89,80 @@ func (m *LambdaLogMapper) Map(_ context.Context, a any, _ ...mappers.MapOption[* row.LogGroupName = s3Format.LogGroup } - // raw = strings.TrimSpace(*s3Format.Message) + // Try to unquote the message (if it's a string) msgStr, err := strconv.Unquote(*s3Format.Message) if err == nil { isTextFormat = true raw = strings.TrimSpace(msgStr) } if err != nil { - // In the case if the message is a JSON object we will not have the unescape character. + // If message is a JSON object, unquoting will fail; treat as JSON isTextFormat = false raw = strings.TrimSpace(*s3Format.Message) + parsedData := isParsableJson(raw) + if parsedData != nil { + data := *parsedData + if v, ok := data["message"].(string); ok { + onlyMessageParsable := isParsableJson(v) + if onlyMessageParsable != nil { + row.MessageJson = *onlyMessageParsable + } else { + row.Message = &v + } + } + } slog.Error("Error unquoting message", "error", err) } } + // --- Main log parsing logic --- raw = strings.TrimSpace(raw) - if strings.HasPrefix(raw, "REPORT") || strings.HasPrefix(raw, "END") || strings.HasPrefix(raw, "START") || strings.HasPrefix(raw, "INIT_START") || strings.HasPrefix(raw, "EXTENSION") || strings.HasPrefix(raw, "TELEMETRY") || (isTextFormat && !isTimestamp(strings.Fields(raw)[0])) { - lambdaLog, err := parseLambdaPainTextLog(raw) + parsableJsonData := isParsableJson(raw) + // Handle plain text system logs (START, END, REPORT, etc.) that are not JSON + if (strings.HasPrefix(raw, "REPORT") || strings.HasPrefix(raw, "END") || strings.HasPrefix(raw, "START") || strings.HasPrefix(raw, "INIT_START") || strings.HasPrefix(raw, "EXTENSION") || strings.HasPrefix(raw, "TELEMETRY") || (isTextFormat && !isTimestamp(strings.Fields(raw)[0]) && !strings.HasPrefix(raw, "["))) && parsableJsonData == nil { + lambdaLog, err := parseLambdaPainTextLog(raw, row) if err != nil { return nil, fmt.Errorf("error parsing lambda pain text log: %w", err) } lambdaLog.Timestamp = row.Timestamp - lambdaLog.Message = &raw + lambdaLog.RawMessage = &raw + parsedData := isParsableJson(raw) + if parsedData != nil { + data := *parsedData + if v, ok := data["message"].(string); ok { + lambdaLog.Message = &v + } + if v, ok := data["requestId"].(string); ok { + lambdaLog.RequestID = &v + } + if v, ok := data["level"].(string); ok { + lambdaLog.LogLevel = &v + } + lambdaLog.MessageJson = *parsedData + } return lambdaLog, nil } - // First unmarshal into a minimal structure to detect log type + // --- JSON log detection and parsing --- var probe map[string]json.RawMessage if err := json.Unmarshal([]byte(raw), &probe); err == nil { - // Check for system log keys (platform events with time, type, record structure) + // System log (platform event) JSON if _, hasType := probe["type"]; hasType { var systemLog jsonFormatSystemLog if err := json.Unmarshal([]byte(raw), &systemLog); err != nil { return nil, fmt.Errorf("error unmarshalling as system log: %w", err) } - // Parse system log fields based on AWS JSON format for system logs + // Parse timestamp and type if t, err := time.Parse(time.RFC3339, systemLog.Time); err == nil { row.Timestamp = &t } row.LogType = &systemLog.Type - if msgBytes, err := json.Marshal(systemLog.Record); err == nil { - message := string(msgBytes) - row.Message = &message - } else { - // fallback in case of marshal error - message := fmt.Sprintf("%v", systemLog.Record) - row.Message = &message - } - // Extract specific fields from platform event record + // Extract requestId and metrics if present if requestId, ok := systemLog.Record["requestId"].(string); ok { row.RequestID = &requestId } - // Extract metrics from platform.report events if metrics, ok := systemLog.Record["metrics"].(map[string]interface{}); ok { if v, ok := metrics["durationMs"].(float64); ok { row.Duration = &v @@ -159,55 +179,117 @@ func (m *LambdaLogMapper) Map(_ context.Context, a any, _ ...mappers.MapOption[* row.MaxMemoryUsed = &mem } } - row.Message = &raw + + // Attach full JSON as RawMessageJson + jsonLog := convertRawMessageMap(probe) + if jsonLog != nil { + row.RawMessageJson = *jsonLog + } + + // Extract message as JSON or string + if msg, ok := (*jsonLog)["message"]; ok { + parsedData := isParsableJson(msg) + if parsedData != nil { + row.MessageJson = *parsedData + } else { + str, ok := msg.(string) + if ok { + row.Message = &str + } + } + } + + row.RawMessage = &raw return row, nil } else if _, hasLevel := probe["level"]; hasLevel { - // Fallback to application log (JSON format with timestamp, level, message, requestId) + // Application log (JSON format) var appLog jsonFormatApplicationLog if err := json.Unmarshal([]byte(raw), &appLog); err != nil { return nil, fmt.Errorf("error unmarshalling as application log: %w", err) } - // Parse application log fields based on AWS JSON format for app logs if t, err := time.Parse(time.RFC3339, appLog.Timestamp); err == nil { row.Timestamp = &t } row.LogLevel = &appLog.Level - row.Message = &appLog.Message row.RequestID = &appLog.RequestID - row.Message = &raw + + jsonLog := convertRawMessageMap(probe) + if jsonLog != nil { + row.RawMessageJson = *jsonLog + } + + if msg, ok := (*jsonLog)["message"]; ok { + parsedData := isParsableJson(msg) + if parsedData != nil { + row.MessageJson = *parsedData + } else { + str, ok := msg.(string) + if ok { + row.Message = &str + } + } + } + + row.RawMessage = &raw return row, nil } } - if len(strings.Fields(raw)) >= 4 && isTimestamp(strings.Fields(raw)[0]) { // plain text application log + if len(strings.Fields(raw)) >= 4 && (isTimestamp(strings.Fields(raw)[0]) || isTimestamp(strings.Fields(raw)[1])) { + // plain text application log // Handle plain text application logs (format: timestamp requestID logLevel message) // Example: 2024-10-27T19:17:45.586Z 79b4f56e-95b1-4643-9700-2807f4e68189 INFO some log message // [INFO] 2025-05-26T06:42:27.551Z 66e2e287-e14b-471e-8185-faca26d2c310 This is a function log fields := strings.Fields(raw) - // Timestamp + // Try to parse timestamp from first or second field if t, err := time.Parse(time.RFC3339, fields[0]); err == nil { row.Timestamp = &t } - // RequestID - var uuidRegex = regexp.MustCompile(`^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}$`) - if uuidRegex.MatchString(fields[1]) { - row.RequestID = &fields[1] + if t1, err := time.Parse(time.RFC3339, fields[1]); err == nil { + row.Timestamp = &t1 } - // LogLevel - if slices.Contains([]string{"INFO", "DEBUG", "WARN", "ERROR", "FATAL", "TRACE", ""}, fields[2]) { - row.LogLevel = &fields[2] + + // Extract log level and request ID + uuidRegex := regexp.MustCompile(`[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}`) + logLevelRegex := regexp.MustCompile(`\[?(INFO|DEBUG|WARN|ERROR|FATAL|TRACE)\]?`) + + if logLevelMatch := logLevelRegex.FindStringSubmatch(raw); len(logLevelMatch) > 1 { + row.LogLevel = &logLevelMatch[1] + } + if uuidMatch := uuidRegex.FindString(raw); uuidMatch != "" { + row.RequestID = &uuidMatch } - // Message if len(fields) >= 4 { msg := strings.Join(fields[3:], " ") - row.Message = &msg + parsableJsonMessage := isParsableJson(msg) + if parsableJsonMessage != nil { + row.MessageJson = *parsableJsonMessage + } else { + row.Message = &msg + } } + row.RawMessage = &raw } - row.Message = &raw return row, nil } +func convertRawMessageMap(probe map[string]json.RawMessage) *map[string]interface{} { + result := make(map[string]interface{}) + + for key, raw := range probe { + var value interface{} + if err := json.Unmarshal(raw, &value); err != nil { + // Handle or skip on error + continue + } + result[key] = value + } + + return &result +} + +// Custom unmarshaller for s3FormatLog func (s *s3FormatLog) UnmarshalJSON(data []byte) error { type Alias s3FormatLog aux := &struct { @@ -217,15 +299,20 @@ func (s *s3FormatLog) UnmarshalJSON(data []byte) error { Alias: (*Alias)(s), } - // slog.Error("U11111111111", "data", string(data)) if err := json.Unmarshal(data, &aux); err != nil { - slog.Error("Error unmarshalling s3 format log", "error", err) - return err + return fmt.Errorf("error unmarshalling base struct: %w", err) + } + + // Try unmarshalling message as string + var strMsg string + if err := json.Unmarshal(aux.Message, &strMsg); err == nil { + s.Message = &strMsg + return nil } - msgStr := string(aux.Message) - s.Message = &msgStr - // slog.Error("22222222222222", "data", string(msgStr)) + // Else fallback to raw JSON object (e.g., {"time": ..., ...}) + rawMsg := string(aux.Message) + s.Message = &rawMsg return nil } @@ -235,8 +322,7 @@ func (s *s3FormatLog) UnmarshalJSON(data []byte) error { // START RequestId: 8b133862-5331-4ded-ac5d-1ad5da5aee81 // END RequestId: 8b133862-5331-4ded-ac5d-1ad5da5aee81 // REPORT RequestId: 8b133862-5331-4ded-ac5d-1ad5da5aee81 Duration: 123.45 ms Billed Duration: 124 ms Memory Size: 128 MB Max Memory Used: 84 MB -func parseLambdaPainTextLog(line string) (*LambdaLog, error) { - log := &LambdaLog{} +func parseLambdaPainTextLog(line string, log *LambdaLog) (*LambdaLog, error) { if id := extractAfter(line, "RequestId: "); id != "" { log.RequestID = &id } @@ -311,7 +397,6 @@ func parseLambdaPainTextLog(line string) (*LambdaLog, error) { case strings.HasPrefix(line, "EXTENSION"): // Parse START log line log.LogType = ptr("EXTENSION") - log.RequestID = ptr(strings.Fields("EXTENSION")[0]) if len(strings.Split(line, "EXTENSION")[1]) > 0 { log.Message = ptr(strings.TrimSpace(strings.Split(line, "EXTENSION")[1])) } @@ -319,7 +404,6 @@ func parseLambdaPainTextLog(line string) (*LambdaLog, error) { case strings.HasPrefix(line, "TELEMETRY"): // Parse START log line log.LogType = ptr("TELEMETRY") - log.RequestID = ptr(strings.Fields("TELEMETRY")[0]) if len(strings.Split(line, "TELEMETRY")[1]) > 0 { log.Message = ptr(strings.TrimSpace(strings.Split(line, "TELEMETRY")[1])) } @@ -359,6 +443,34 @@ func extractBetween(s, start, end string) string { return strings.TrimSpace(s[i : i+j]) } +func isParsableJson(message interface{}) *map[string]interface{} { + var raw interface{} + + switch v := message.(type) { + case string: + // Try unmarshalling the JSON string directly + if err := json.Unmarshal([]byte(v), &raw); err != nil { + return nil + } + default: + // Marshal to JSON bytes + jsonData, err := json.Marshal(message) + if err != nil { + return nil + } + if err := json.Unmarshal(jsonData, &raw); err != nil { + return nil + } + } + + // Try asserting to map[string]interface{} + if parsedMap, ok := raw.(map[string]interface{}); ok { + return &parsedMap + } + + return nil +} + // isTimestamp checks if the input string matches any common Go time formats. // Used to identify if a plain text log starts with a timestamp func isTimestamp(s string) bool { @@ -385,51 +497,3 @@ func isTimestamp(s string) bool { return false } - -// Commented out legacy code -// rawRow = strings.TrimSuffix(rawRow, "\n") -// fields := strings.Fields(rawRow) - -// switch fields[0] { -// case "START", "END": -// row.LogType = &fields[0] -// row.RequestID = &fields[2] -// case "REPORT": -// row.LogType = &fields[0] -// row.RequestID = &fields[2] -// duration, err := strconv.ParseFloat(fields[4], 64) -// if err != nil { -// return nil, fmt.Errorf("error parsing duration: %w", err) -// } -// row.Duration = &duration -// billed, err := strconv.ParseFloat(fields[8], 64) -// if err != nil { -// return nil, fmt.Errorf("error parsing billed duration: %w", err) -// } -// row.BilledDuration = &billed -// mem, err := strconv.Atoi(fields[12]) -// if err != nil { -// return nil, fmt.Errorf("error parsing memory size: %w", err) -// } -// row.MemorySize = &mem -// maxMem, err := strconv.Atoi(fields[17]) -// if err != nil { -// return nil, fmt.Errorf("error parsing max memory used: %w", err) -// } -// row.MaxMemoryUsed = &maxMem -// default: -// t := "LOG" -// row.LogType = &t - -// ts, err := time.Parse(time.RFC3339, fields[0]) -// if err != nil { -// return nil, fmt.Errorf("error parsing timestamp: %w", err) -// } -// row.Timestamp = &ts - -// row.RequestID = &fields[1] -// row.LogLevel = &fields[2] -// strip := fmt.Sprintf("%s%s", strings.Join(fields[:3], "\t"), "\t") -// stripped := strings.TrimPrefix(rawRow, strip) -// row.Message = &stripped -// } diff --git a/tables/lambda_log/lambda_log_table.go b/tables/lambda_log/lambda_log_table.go index 1fd2efe1..b13f801f 100644 --- a/tables/lambda_log/lambda_log_table.go +++ b/tables/lambda_log/lambda_log_table.go @@ -1,6 +1,7 @@ package lambda_log import ( + "log/slog" "regexp" "time" @@ -40,12 +41,12 @@ func (c *LambdaLogTable) GetSourceMetadata() ([]*table.SourceMetadata[*LambdaLog artifact_source.WithRowPerLine(), }, }, - { + { // S3 artifact source SourceName: cloudwatch_log_group.AwsCloudwatchLogGroupSourceIdentifier, Mapper: &LambdaLogMapper{}, }, - { + { // any artifact source SourceName: constants.ArtifactSourceIdentifier, Mapper: &LambdaLogMapper{}, @@ -59,6 +60,8 @@ func (c *LambdaLogTable) GetSourceMetadata() ([]*table.SourceMetadata[*LambdaLog func (c *LambdaLogTable) EnrichRow(row *LambdaLog, sourceEnrichmentFields schema.SourceEnrichment) (*LambdaLog, error) { row.CommonFields = sourceEnrichmentFields.CommonFields + slog.Error("EnrichRow ===>>", "row", row) + // Record standardization row.TpID = xid.New().String() row.TpIngestTimestamp = time.Now() @@ -94,4 +97,3 @@ func (c *LambdaLogTable) EnrichRow(row *LambdaLog, sourceEnrichmentFields schema func (c *LambdaLogTable) GetDescription() string { return "AWS Lambda logs capture detailed information about function executions, including invocation context, console output, errors, and performance metrics. This table provides a structured and queryable view of Lambda log data, enabling easier analysis, troubleshooting, and monitoring." } - From 3ba365b4fde274977431387676e4138a453e364c Mon Sep 17 00:00:00 2001 From: ParthaI Date: Thu, 26 Jun 2025 22:10:40 +0530 Subject: [PATCH 12/16] Tidy up and added the column description --- tables/lambda_log/lambda_log.go | 22 ++++++++++++++++++++++ tables/lambda_log/lambda_log_mapper.go | 2 -- tables/lambda_log/lambda_log_table.go | 3 --- 3 files changed, 22 insertions(+), 5 deletions(-) diff --git a/tables/lambda_log/lambda_log.go b/tables/lambda_log/lambda_log.go index fcecfe2e..6c0d529a 100644 --- a/tables/lambda_log/lambda_log.go +++ b/tables/lambda_log/lambda_log.go @@ -25,3 +25,25 @@ type LambdaLog struct { MemorySize *int `json:"memory_size,omitempty"` MaxMemoryUsed *int `json:"max_memory_used,omitempty"` } + +func (c *LambdaLogTable) GetColumnDescriptions() map[string]string { + return map[string]string{ + // Lambda-specific log fields + "billed_duration": "The billed execution time in milliseconds, always rounded up to the nearest millisecond for billing purposes.", + "duration": "The actual execution time of the Lambda function in milliseconds, as reported in REPORT log entries.", + "log_group_name": "The CloudWatch log group name where the log entry was recorded, typically matches the Lambda function name.", + "log_level": "The application log level for user-generated logs (INFO, ERROR, WARN, DEBUG).", + "log_type": "The type of log entry, indicating system events like START, END, REPORT, or INIT_START.", + "max_memory_used": "The maximum amount of memory actually used during the function execution in megabytes.", + "memory_size": "The amount of memory allocated to the Lambda function in megabytes, as configured in the function settings.", + "message_json": "Extracted message parsed as JSON, populated only if the message content is valid JSON.", + "message": "Extracted and parsed message content when possible, populated for both plain text and JSON-formatted logs.", + "raw_message_json": "Full original message parsed as JSON, populated only if the original message is native JSON or convertible to JSON.", + "raw_message": "Complete original message string as received from the log source, always populated regardless of format.", + "request_id": "The unique identifier for the Lambda function invocation request.", + "timestamp": "The timestamp when the log entry was generated by the Lambda function or runtime.", + + // Tailpipe-specific metadata fields + "tp_akas": "A list of associated Amazon Resource Names (ARNs) found in the log messages.", + } +} diff --git a/tables/lambda_log/lambda_log_mapper.go b/tables/lambda_log/lambda_log_mapper.go index f68476bd..a00ce2c5 100644 --- a/tables/lambda_log/lambda_log_mapper.go +++ b/tables/lambda_log/lambda_log_mapper.go @@ -326,8 +326,6 @@ func parseLambdaPainTextLog(line string, log *LambdaLog) (*LambdaLog, error) { if id := extractAfter(line, "RequestId: "); id != "" { log.RequestID = &id } - // TODO: Handle other Platform event type of logs if any. - // https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api.html switch { case strings.HasPrefix(line, "START RequestId:"): // Parse START log line diff --git a/tables/lambda_log/lambda_log_table.go b/tables/lambda_log/lambda_log_table.go index b13f801f..89616221 100644 --- a/tables/lambda_log/lambda_log_table.go +++ b/tables/lambda_log/lambda_log_table.go @@ -1,7 +1,6 @@ package lambda_log import ( - "log/slog" "regexp" "time" @@ -60,8 +59,6 @@ func (c *LambdaLogTable) GetSourceMetadata() ([]*table.SourceMetadata[*LambdaLog func (c *LambdaLogTable) EnrichRow(row *LambdaLog, sourceEnrichmentFields schema.SourceEnrichment) (*LambdaLog, error) { row.CommonFields = sourceEnrichmentFields.CommonFields - slog.Error("EnrichRow ===>>", "row", row) - // Record standardization row.TpID = xid.New().String() row.TpIngestTimestamp = time.Now() From b8058fec5c6c0e90c67fa895a5954908ae95b8f8 Mon Sep 17 00:00:00 2001 From: ParthaI Date: Thu, 26 Jun 2025 22:12:28 +0530 Subject: [PATCH 13/16] Updated the index.md file with default file layout --- docs/tables/aws_lambda_log/index.md | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/docs/tables/aws_lambda_log/index.md b/docs/tables/aws_lambda_log/index.md index 816dfced..f8a5f049 100644 --- a/docs/tables/aws_lambda_log/index.md +++ b/docs/tables/aws_lambda_log/index.md @@ -193,17 +193,8 @@ partition "aws_lambda_log" "local_logs" { ### aws_s3_bucket -// TODO: Change/remove the file layout This table sets the following defaults for the [aws_s3_bucket source](https://hub.tailpipe.io/plugins/turbot/aws/sources/aws_s3_bucket#arguments): | Argument | Default | | ----------- | --------------------------------------------------------------- | -| file_layout | `%{YEAR:year}/%{MONTHNUM:month}/%{MONTHDAY:day}/%{DATA}.log.gz` | - -### aws_cloudwatch_log_group - -This table sets the following defaults for the [aws_cloudwatch_log_group source](https://hub.tailpipe.io/plugins/turbot/aws/sources/aws_cloudwatch_log_group#arguments): - -| Argument | Default | -| ---------------- | ------- | -| log_stream_names | `["*"]` | +| file_layout | `AWSLogs/(%{DATA:org_id}/)?%{NUMBER:account_id}/%{DATA:region}/%{DATA:function_name}/%{YEAR:year}/%{MONTHNUM:month}/%{MONTHDAY:day}/%{HOUR:hour}/%{DATA}.log.zst` | From 3096098cc4dd188afb3395e8a25ec045e0c24520 Mon Sep 17 00:00:00 2001 From: ParthaI Date: Thu, 26 Jun 2025 22:16:23 +0530 Subject: [PATCH 14/16] Added parquet tag to the columns --- tables/lambda_log/lambda_log.go | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/tables/lambda_log/lambda_log.go b/tables/lambda_log/lambda_log.go index 6c0d529a..7d055ae6 100644 --- a/tables/lambda_log/lambda_log.go +++ b/tables/lambda_log/lambda_log.go @@ -9,21 +9,21 @@ import ( type LambdaLog struct { schema.CommonFields - Timestamp *time.Time `json:"timestamp,omitempty"` - RequestID *string `json:"request_id,omitempty"` - LogType *string `json:"log_type,omitempty"` - LogLevel *string `json:"log_level,omitempty"` - Message *string `json:"message,omitempty"` - MessageJson map[string]interface{} `json:"message_json,omitempty"` - RawMessage *string `json:"raw_message,omitempty"` - RawMessageJson map[string]interface{} `json:"raw_message_json,omitempty"` - LogGroupName *string `json:"log_group_name,omitempty"` + Timestamp *time.Time `json:"timestamp,omitempty" parquet:"name=timestamp"` + RequestID *string `json:"request_id,omitempty" parquet:"name=request_id"` + LogType *string `json:"log_type,omitempty" parquet:"name=log_type"` + LogLevel *string `json:"log_level,omitempty" parquet:"name=log_level"` + Message *string `json:"message,omitempty" parquet:"name=message"` + MessageJson map[string]interface{} `json:"message_json,omitempty" parquet:"name=message_json, type=JSON"` + RawMessage *string `json:"raw_message,omitempty" parquet:"name=raw_message"` + RawMessageJson map[string]interface{} `json:"raw_message_json,omitempty" parquet:"name=raw_message_json, type=JSON"` + LogGroupName *string `json:"log_group_name,omitempty" parquet:"name=log_group_name"` // Report Specific Fields - Duration *float64 `json:"duration,omitempty"` - BilledDuration *float64 `json:"billed_duration,omitempty"` - MemorySize *int `json:"memory_size,omitempty"` - MaxMemoryUsed *int `json:"max_memory_used,omitempty"` + Duration *float64 `json:"duration,omitempty" parquet:"name=duration"` + BilledDuration *float64 `json:"billed_duration,omitempty" parquet:"name=billed_duration"` + MemorySize *int `json:"memory_size,omitempty" parquet:"name=memory_size"` + MaxMemoryUsed *int `json:"max_memory_used,omitempty" parquet:"name=max_memory_used"` } func (c *LambdaLogTable) GetColumnDescriptions() map[string]string { From 44eba61ec1a71db6cc9950300656394d595d7e61 Mon Sep 17 00:00:00 2001 From: ParthaI Date: Mon, 7 Jul 2025 20:13:43 +0530 Subject: [PATCH 15/16] Fix typo comment --- tables/lambda_log/lambda_log_mapper.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tables/lambda_log/lambda_log_mapper.go b/tables/lambda_log/lambda_log_mapper.go index a00ce2c5..1bbda447 100644 --- a/tables/lambda_log/lambda_log_mapper.go +++ b/tables/lambda_log/lambda_log_mapper.go @@ -122,7 +122,7 @@ func (m *LambdaLogMapper) Map(_ context.Context, a any, _ ...mappers.MapOption[* if (strings.HasPrefix(raw, "REPORT") || strings.HasPrefix(raw, "END") || strings.HasPrefix(raw, "START") || strings.HasPrefix(raw, "INIT_START") || strings.HasPrefix(raw, "EXTENSION") || strings.HasPrefix(raw, "TELEMETRY") || (isTextFormat && !isTimestamp(strings.Fields(raw)[0]) && !strings.HasPrefix(raw, "["))) && parsableJsonData == nil { lambdaLog, err := parseLambdaPainTextLog(raw, row) if err != nil { - return nil, fmt.Errorf("error parsing lambda pain text log: %w", err) + return nil, fmt.Errorf("error parsing lambda plain text log: %w", err) } lambdaLog.Timestamp = row.Timestamp lambdaLog.RawMessage = &raw From a6d92f749d18853993df8f56d7ba99ff9db4ccca Mon Sep 17 00:00:00 2001 From: ParthaI Date: Mon, 7 Jul 2025 20:36:17 +0530 Subject: [PATCH 16/16] Fixed the lint failure error --- tables/lambda_log/lambda_log_table.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/tables/lambda_log/lambda_log_table.go b/tables/lambda_log/lambda_log_table.go index 89616221..f9db3619 100644 --- a/tables/lambda_log/lambda_log_table.go +++ b/tables/lambda_log/lambda_log_table.go @@ -69,9 +69,6 @@ func (c *LambdaLogTable) EnrichRow(row *LambdaLog, sourceEnrichmentFields schema row.TpDate = row.TpTimestamp.Truncate(24 * time.Hour) } - // tp_index - row.TpIndex = schema.DefaultIndex - var arnRegex = regexp.MustCompile(`arn:aws:[^,\s'"\\]+`) seen := map[string]struct{}{}