diff --git a/aws/plugin.go b/aws/plugin.go index e80aec9..8326e61 100755 --- a/aws/plugin.go +++ b/aws/plugin.go @@ -13,6 +13,7 @@ import ( "github.com/turbot/tailpipe-plugin-aws/tables/cost_and_usage_report" "github.com/turbot/tailpipe-plugin-aws/tables/cost_optimization_recommendation" "github.com/turbot/tailpipe-plugin-aws/tables/guardduty_finding" + "github.com/turbot/tailpipe-plugin-aws/tables/lambda_log" "github.com/turbot/tailpipe-plugin-aws/tables/nlb_access_log" "github.com/turbot/tailpipe-plugin-aws/tables/s3_server_access_log" "github.com/turbot/tailpipe-plugin-aws/tables/vpc_flow_log" @@ -43,6 +44,7 @@ func init() { table.RegisterTable[*alb_connection_log.AlbConnectionLog, *alb_connection_log.AlbConnectionLogTable]() table.RegisterTable[*securityhub_finding.SecurityHubFinding, *securityhub_finding.SecurityHubFindingTable]() table.RegisterTable[*waf_traffic_log.WafTrafficLog, *waf_traffic_log.WafTrafficLogTable]() + table.RegisterTable[*lambda_log.LambdaLog, *lambda_log.LambdaLogTable]() // register custom table table.RegisterCustomTable[*vpc_flow_log.VpcFlowLogTable]() diff --git a/docs/tables/aws_lambda_log/index.md b/docs/tables/aws_lambda_log/index.md new file mode 100644 index 0000000..f8a5f04 --- /dev/null +++ b/docs/tables/aws_lambda_log/index.md @@ -0,0 +1,200 @@ +--- +title: "Tailpipe Table: aws_lambda_log - Query AWS Lambda Logs" +description: "AWS Lambda logs capture invocation details and function output within your AWS account." +--- + +# Table: aws_lambda_log - Query AWS Lambda Logs + +The `aws_lambda_log` table allows you to query data from [AWS Lambda logs](https://docs.aws.amazon.com/lambda/latest/dg/monitoring-cloudwatchlogs.html). This table provides detailed information about Lambda function invocations, including request ID, log level, message content, timestamps, and more. 
+ +## Message Format and Parsing + +The `aws_lambda_log` table provides multiple message fields to handle different log formats across Lambda runtimes: + +### Message Fields + +- **`message`** (string) – Extracted and parsed message content when possible. Always populated if a message can be extracted, including from JSON-formatted logs. +- **`message_json`** (json) – Extracted message parsed as JSON. Populated only if the extracted message is valid JSON and can be converted. +- **`raw_message`** (string) – Complete original message string as received from the log source. Always populated regardless of format. +- **`raw_message_json`** (json) – Full original message parsed as JSON. Populated only if the original message is native JSON or convertible to JSON. + +### Runtime Behavior + +The table handles logs consistently across most AWS Lambda runtimes (Node.js, Python, .NET, Go, Ruby), with automatic parsing of both plain text and JSON-formatted logs. + +**Note:** PowerShell runtime emits logs in a different format compared to other runtimes, which may affect message extraction and parsing. Refer to the [AWS Lambda runtime documentation](https://docs.aws.amazon.com/lambda/latest/dg/lambda-runtimes.html) for runtime-specific log format details. 
+ +### JSON Log Handling + +- JSON-formatted logs are automatically parsed and stored in `raw_message_json` +- If the extracted message portion is also valid JSON, it's additionally parsed into `message_json` +- This dual approach ensures you can query both structured JSON data and extract specific message content + +## Configure + +Create a [partition](https://tailpipe.io/docs/manage/partition) for `aws_lambda_log` ([examples](https://hub.tailpipe.io/plugins/turbot/aws/tables/aws_lambda_log#example-configurations)): + +```sh +vi ~/.tailpipe/config/aws.tpc +``` + +```hcl +connection "aws" "logging_account" { + profile = "my-logging-account" +} + +partition "aws_lambda_log" "my_logs" { + source "aws_cloudwatch_log_group" { + connection = connection.aws.logging_account + log_group_name = "/aws/lambda/my-function" + region = "us-east-1" + } +} +``` + +## Collect + +[Collect](https://tailpipe.io/docs/manage/collection) logs for all `aws_lambda_log` partitions: + +```sh +tailpipe collect aws_lambda_log +``` + +Or for a single partition: + +```sh +tailpipe collect aws_lambda_log.my_logs +``` + +## Query + +**[Explore 16+ example queries for this table →](https://hub.tailpipe.io/plugins/turbot/aws/queries/aws_lambda_log)** + +### Recent Error Messages + +Find recent error messages from Lambda functions. + +```sql +select + timestamp, + tp_source_name as function_name, + log_level, + message +from + aws_lambda_log +where + log_level = 'ERROR' +order by + timestamp desc +limit 100; +``` + +### Slow Function Executions + +Identify Lambda invocations that ran for more than five seconds (`duration` is reported in milliseconds). + +```sql +select + tp_source_name as function_name, + request_id, + duration as duration_ms, + timestamp +from + aws_lambda_log +where + duration > 5000 +order by + duration desc +limit 20; +``` + +### Memory Utilization + +Find Lambda functions approaching their memory limits. 
+ +```sql +select + tp_source_name as function_name, + request_id, + max_memory_used as memory_used_mb, + memory_size as memory_limit_mb, + round((max_memory_used::float / memory_size::float) * 100, 2) as memory_utilization_percent, + timestamp +from + aws_lambda_log +where + max_memory_used is not null + and memory_size is not null + and (max_memory_used::float / memory_size::float) > 0.8 +order by + memory_utilization_percent desc; +``` + +## Example Configurations + +### Collect all log streams explicitly + +Collect logs from all streams in a log group by explicitly setting the wildcard pattern. + +```hcl +partition "aws_lambda_log" "all_streams" { + source "aws_cloudwatch_log_group" { + connection = connection.aws.logging_account + log_group_name = "/aws/lambda/my-function" + log_stream_names = ["*"] + region = "us-east-1" + } +} +``` + +### Filter logs by log stream prefix + +Collect Lambda logs from streams with a specific prefix. + +```hcl +partition "aws_lambda_log" "prefix_filtered_logs" { + source "aws_cloudwatch_log_group" { + connection = connection.aws.logging_account + log_group_name = "/aws/lambda/my-function" + log_stream_names = ["PROD_*", "2023/07/*"] + region = "us-east-1" + } +} +``` + +### Collect logs from an S3 bucket + +Collect Lambda logs archived to an S3 bucket. + +```hcl +partition "aws_lambda_log" "s3_logs" { + source "aws_s3_bucket" { + connection = connection.aws.logging_account + bucket = "lambda-logs-bucket" + prefix = "lambda-logs" + } +} +``` + +### Collect logs from local files + +You can also collect Lambda logs from local files. 
+ +```hcl +partition "aws_lambda_log" "local_logs" { + source "file" { + paths = ["/Users/myuser/lambda_logs"] + file_layout = `%{DATA}.log` + } +} +``` + +## Source Defaults + +### aws_s3_bucket + +This table sets the following defaults for the [aws_s3_bucket source](https://hub.tailpipe.io/plugins/turbot/aws/sources/aws_s3_bucket#arguments): + +| Argument | Default | +| ----------- | --------------------------------------------------------------- | +| file_layout | `AWSLogs/(%{DATA:org_id}/)?%{NUMBER:account_id}/%{DATA:region}/%{DATA:function_name}/%{YEAR:year}/%{MONTHNUM:month}/%{MONTHDAY:day}/%{HOUR:hour}/%{DATA}.log.zst` | diff --git a/docs/tables/aws_lambda_log/queries.md b/docs/tables/aws_lambda_log/queries.md new file mode 100644 index 0000000..0b2c061 --- /dev/null +++ b/docs/tables/aws_lambda_log/queries.md @@ -0,0 +1,582 @@ +## Activity Examples + +### Recent Lambda Log Activity + +This query shows the most recent Lambda log entries across all functions. Real-time monitoring of Lambda activity helps with troubleshooting issues and understanding the current state of your serverless applications. + +```sql +select + tp_timestamp, + request_id, + log_type, + log_level, + raw_message, + regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, + log_group_name +from + aws_lambda_log +order by + tp_timestamp desc +limit + 100; +``` + +```yaml +folder: Lambda +``` + +### Lambda Execution Trends by Hour + +This query shows Lambda execution trends by hour for each function. Understanding these patterns helps with capacity planning, identifying unusual activity spikes, and optimizing resources based on time-of-day usage patterns. 
+ +```sql +select + date_trunc('hour', tp_timestamp) as hour, + regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, + log_group_name, + count(*) as execution_count +from + aws_lambda_log +where + log_type = 'START' +group by + hour, + lambda_function_name, + log_group_name +order by + hour desc, + execution_count desc; +``` + +```yaml +folder: Lambda +``` + +### Application Log Level Distribution + +This query analyzes the distribution of log levels in Lambda application logs. Reviewing log level patterns helps identify functions generating excessive logs or experiencing frequent errors that may impact application reliability. + +```sql +select + regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, + log_group_name, + log_level, + count(*) as log_count +from + aws_lambda_log +where + log_level is not null +group by + lambda_function_name, + log_group_name, + log_level +order by + lambda_function_name, + log_count desc; +``` + +```yaml +folder: Lambda +``` + +### Execution Flow for a Specific Request + +This query shows the complete execution flow for a specific Lambda request. Tracing the sequence of logs for a single invocation helps debug issues and understand function behavior from start to finish. + +```sql +select + tp_timestamp, + log_type, + log_level, + substring(raw_message, 1, 200) as message_preview, + case + when log_type in ('START', 'END', 'REPORT', 'INIT_START') then 'System Log' + when log_level is not null then 'Application Log' + else 'Other' + end as log_category, + regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, + log_group_name +from + aws_lambda_log +where + request_id = '9286dcef-4fac-4706-99f6-0f0087763dbc' +order by + tp_timestamp asc; +``` + +```yaml +folder: Lambda +``` + +## Detection Examples + +### Lambda Error and Timeout Analysis + +This query finds the most recent Lambda function errors, timeouts, and other critical issues. 
Monitoring these errors helps identify reliability issues and functions that need error handling improvements for better application stability. + +```sql +select + tp_timestamp, + regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, + log_group_name, + request_id, + raw_message, + case + when raw_message ilike '%timed out%' then 'Timeout' + when raw_message ilike '%memory size exceeded%' then 'Memory Exceeded' + when raw_message ilike '%process exited before completing request%' then 'Process Exited' + when log_level = 'ERROR' then 'Application Error' + else 'Other Error' + end as error_type +from + aws_lambda_log +where + log_level = 'ERROR' + or raw_message ilike '%timed out%' + or raw_message ilike '%memory size exceeded%' + or raw_message ilike '%process exited before completing request%' +order by + tp_timestamp desc +limit + 100; +``` + +```yaml +folder: Lambda +``` + +### Functions with High Billing-to-Execution Time Ratio + +This query identifies functions with high billing-to-execution time ratios. Optimizing these functions can reduce costs by addressing the gap between actual runtime and billed duration, especially for functions with significant billing overhead. 
+ +```sql +select + regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, + log_group_name, + round(avg(duration), 2) as avg_duration_ms, + round(avg(billed_duration), 2) as avg_billed_duration_ms, + round(avg(billed_duration - duration), 2) as avg_billing_overhead_ms, + round(avg(billed_duration * 100.0 / nullif(duration, 0)) - 100, 2) as billing_overhead_percent, + count(*) as execution_count +from + aws_lambda_log +where + log_type = 'REPORT' + and duration > 0 + and billed_duration is not null +group by + lambda_function_name, + log_group_name +having + avg(billed_duration - duration) > 10 +order by + billing_overhead_percent desc; +``` + +```yaml +folder: Lambda +``` + +### Lambda Cold Start Analysis + +This query analyzes Lambda cold starts by counting initialization events for each function. Identifying functions with frequent cold starts helps prioritize optimization efforts to reduce latency and improve user experience. + +```sql +select + regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, + log_group_name, + count(distinct case when log_type = 'START' then request_id end) as total_executions, + count(distinct case when raw_message ilike '%init duration%' then request_id end) as cold_start_count, + round(count(distinct case when raw_message ilike '%init duration%' then request_id end) * 100.0 / + nullif(count(distinct case when log_type = 'START' then request_id end), 0), 2) as cold_start_percentage, + avg(case when raw_message ilike '%init duration%' + then cast(regexp_extract(raw_message, 'Init Duration: ([0-9.]+) ms', 1) as double) end) as avg_init_duration_ms +from + aws_lambda_log +where + tp_timestamp >= current_timestamp - interval '7 day' +group by + lambda_function_name, + log_group_name +having + count(distinct case when log_type = 'START' then request_id end) > 0 +order by + cold_start_count desc; +``` + +```yaml +folder: Lambda +``` + +### Lambda Throttling Analysis + +This query analyzes 
throttling patterns by hour to identify capacity constraints. Understanding when throttling occurs helps optimize concurrency limits and adjust scaling policies to prevent service disruptions during peak usage times. + +```sql +select + date_trunc('hour', tp_timestamp) as hour, + regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, + log_group_name, + count(*) as total_logs, + count(case when raw_message ilike '%function was throttled%' then 1 end) as throttle_count, + round(count(case when raw_message ilike '%function was throttled%' then 1 end) * 100.0 / nullif(count(*), 0), 2) as throttle_percentage +from + aws_lambda_log +where + tp_timestamp >= current_timestamp - interval '7 day' +group by + hour, + lambda_function_name, + log_group_name +having + count(case when raw_message ilike '%function was throttled%' then 1 end) > 0 +order by + hour desc, + throttle_count desc; +``` + +```yaml +folder: Lambda +``` + +## Operational Examples + +### Top Slowest Lambda Function Executions + +This query identifies the slowest Lambda function executions by examining REPORT logs. Finding these slow executions helps pinpoint specific instances that require optimization to improve overall performance. + +```sql +select + tp_timestamp, + regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, + log_group_name, + request_id, + duration as duration_ms, + billed_duration as billed_duration_ms, + memory_size as allocated_memory_mb, + max_memory_used as used_memory_mb +from + aws_lambda_log +where + log_type = 'REPORT' + and duration is not null + and duration > 1000 +order by + duration desc +limit + 20; +``` + +```yaml +folder: Lambda +``` + +### Memory Utilization Efficiency + +This query calculates memory utilization efficiency for each Lambda function. Finding the right memory allocation helps optimize costs while maintaining performance by identifying over-provisioned or under-provisioned functions. 
+ +```sql +select + regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, + log_group_name, + memory_size as allocated_memory_mb, + round(avg(max_memory_used), 2) as avg_used_memory_mb, + round(avg(max_memory_used * 100.0 / nullif(memory_size, 0)), 2) as memory_utilization_percent, + round(avg(duration), 2) as avg_duration_ms, + count(*) as execution_count, + case + when round(avg(max_memory_used * 100.0 / nullif(memory_size, 0)), 2) < 50 then 'Over-provisioned' + when round(avg(max_memory_used * 100.0 / nullif(memory_size, 0)), 2) > 85 then 'Under-provisioned' + else 'Well-balanced' + end as memory_allocation_assessment +from + aws_lambda_log +where + log_type = 'REPORT' + and memory_size is not null + and max_memory_used is not null + and duration is not null +group by + lambda_function_name, + log_group_name, + memory_size +having + count(*) >= 10 +order by + memory_utilization_percent desc; +``` + +```yaml +folder: Lambda +``` + +### Detailed Request Execution Analysis + +This query analyzes each request's complete execution lifecycle including message patterns and timing between execution phases. This helps identify bottlenecks in function execution flow and understand the context of each request through message analysis. 
+ +```sql +with request_phases as ( + select + request_id, + tp_source_name, + log_group_name, + regexp_replace(tp_source_name, '^/aws/lambda/', '') as function_name, + min(case when log_type = 'START' then tp_timestamp end) as start_time, + min(case when log_type = 'END' then tp_timestamp end) as end_time, + min(case when log_type = 'REPORT' then tp_timestamp end) as report_time, + max(case when log_type = 'REPORT' then duration end) as duration_ms, + max(case when log_type = 'REPORT' then billed_duration end) as billed_duration_ms, + max(case when log_type = 'REPORT' then memory_size end) as allocated_memory_mb, + max(case when log_type = 'REPORT' then max_memory_used end) as max_memory_used_mb, + count(case when log_level = 'INFO' then 1 end) as info_log_count, + count(case when log_level = 'ERROR' then 1 end) as error_log_count, + count(case when log_level = 'WARN' then 1 end) as warn_log_count, + count(case when log_level = 'DEBUG' then 1 end) as debug_log_count, + bool_or(raw_message ilike '%timed out%') as has_timeout, + bool_or(raw_message ilike '%init duration%') as has_cold_start, + bool_or(raw_message ilike '%memory size%' and raw_message ilike '%max memory used%') as has_memory_metrics, + bool_or(raw_message ilike '%error%' or raw_message ilike '%exception%' or raw_message ilike '%fail%') as has_error_keywords + from + aws_lambda_log + where + request_id is not null + group by + request_id, + tp_source_name, + log_group_name, + function_name +), +message_samples as ( + select + request_id, + array_agg(raw_message) as message_samples + from + (select + request_id, + raw_message, + row_number() over (partition by request_id order by tp_timestamp) as rn + from + aws_lambda_log + where + request_id is not null + and log_level is not null + order by + tp_timestamp) t + where + rn <= 3 + group by + request_id +) +select + rp.request_id, + rp.start_time, + rp.end_time, + extract(epoch from (rp.end_time - rp.start_time)) * 1000 as total_execution_time_ms, + 
rp.duration_ms as reported_duration_ms, + rp.billed_duration_ms, + rp.allocated_memory_mb, + rp.max_memory_used_mb, + rp.info_log_count, + rp.error_log_count, + rp.warn_log_count, + rp.debug_log_count, + rp.has_timeout, + rp.has_cold_start, + rp.has_memory_metrics, + rp.has_error_keywords, + ms.message_samples[1] as first_message, + rp.function_name as lambda_function_name, + rp.log_group_name +from + request_phases rp +left join + message_samples ms on rp.request_id = ms.request_id +where + rp.start_time is not null + and rp.end_time is not null +order by + rp.start_time desc +limit + 50; +``` + +```yaml +folder: Lambda +``` + +## Volume Examples + +### Lambda Function Execution Summary + +This query summarizes execution metrics for each Lambda function. It helps identify frequently invoked functions and their performance characteristics for cost optimization and performance tuning. + +```sql +select + regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, + log_group_name, + count(*) as execution_count, + avg(duration) as avg_duration_ms, + min(duration) as min_duration_ms, + max(duration) as max_duration_ms, + avg(billed_duration) as avg_billed_duration_ms, + avg(max_memory_used) as avg_memory_used_mb, + max(memory_size) as allocated_memory_mb +from + aws_lambda_log +where + log_type = 'REPORT' +group by + lambda_function_name, + log_group_name +order by + execution_count desc; +``` + +```yaml +folder: Lambda +``` + +### Lambda Invocation and Error Trends + +This query tracks daily Lambda invocation and error patterns. Monitoring these trends helps detect abnormal behavior, understand the impact of code changes, and identify functions with increasing error rates. 
+ +```sql +select + date_trunc('day', tp_timestamp) as day, + regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, + log_group_name, + count(case when log_type = 'START' then 1 end) as invocation_count, + count(case when log_level = 'ERROR' then 1 end) as error_count, + count(case when raw_message ilike '%timed out%' then 1 end) as timeout_count, + count(case when raw_message ilike '%function was throttled%' then 1 end) as throttle_count, + round(count(case when log_level = 'ERROR' + or raw_message ilike '%timed out%' + or raw_message ilike '%function was throttled%' then 1 end) * 100.0 / + nullif(count(case when log_type = 'START' then 1 end), 0), 2) as error_percentage +from + aws_lambda_log +where + tp_timestamp >= current_timestamp - interval '30 day' +group by + day, + lambda_function_name, + log_group_name +having + count(case when log_type = 'START' then 1 end) > 0 +order by + day desc, + invocation_count desc; +``` + +```yaml +folder: Lambda +``` + +## Baseline Examples + +### Lambda Duration Distribution + +This query categorizes Lambda function executions into duration ranges. Understanding execution time distribution helps identify inconsistent performance patterns and optimize functions that show high variability in runtime. 
+ +```sql +select + regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, + log_group_name, + case + when duration < 100 then '< 100ms' + when duration < 500 then '100-500ms' + when duration < 1000 then '500ms-1s' + when duration < 3000 then '1s-3s' + when duration < 10000 then '3s-10s' + else '> 10s' + end as duration_range, + count(*) as execution_count +from + aws_lambda_log +where + log_type = 'REPORT' + and duration is not null +group by + lambda_function_name, + log_group_name, + duration_range +order by + lambda_function_name, + case + when duration_range = '< 100ms' then 1 + when duration_range = '100-500ms' then 2 + when duration_range = '500ms-1s' then 3 + when duration_range = '1s-3s' then 4 + when duration_range = '3s-10s' then 5 + else 6 + end; +``` + +```yaml +folder: Lambda +``` + +### Average Billed Duration by Memory Configuration + +This query analyzes average billed duration across different memory configurations. This analysis helps optimize cost and performance by finding the best memory setting for each function's runtime needs. + +```sql +select + regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, + log_group_name, + memory_size as allocated_memory_mb, + round(avg(billed_duration), 2) as avg_billed_duration_ms, + min(billed_duration) as min_billed_duration_ms, + max(billed_duration) as max_billed_duration_ms, + count(*) as execution_count +from + aws_lambda_log +where + log_type = 'REPORT' + and billed_duration is not null + and memory_size is not null +group by + lambda_function_name, + log_group_name, + memory_size +order by + avg_billed_duration_ms desc; +``` + +```yaml +folder: Lambda +``` + +### Most Common Error Messages + +This query identifies the most common error patterns across Lambda functions. Finding recurring error patterns helps prioritize which issues to fix first and understand the reliability challenges affecting specific functions. 
+ +```sql +select + regexp_replace(tp_source_name, '^/aws/lambda/', '') as lambda_function_name, + log_group_name, + regexp_replace(raw_message, '([a-f0-9]{8}(-[a-f0-9]{4}){3}-[a-f0-9]{12}|[\d\.]+|"[^"]*"|''[^'']*'')', 'X') as normalized_error_pattern, + count(*) as occurrence_count, + min(tp_timestamp) as first_occurrence, + max(tp_timestamp) as last_occurrence +from + aws_lambda_log +where + log_level = 'ERROR' + or raw_message ilike '%error:%' + or raw_message ilike '%exception:%' + or raw_message ilike '%timed out%' +group by + lambda_function_name, + log_group_name, + normalized_error_pattern +having + count(*) > 1 +order by + occurrence_count desc, + lambda_function_name; +``` + +```yaml +folder: Lambda +``` diff --git a/tables/lambda_log/lambda_log.go b/tables/lambda_log/lambda_log.go new file mode 100644 index 0000000..7d055ae --- /dev/null +++ b/tables/lambda_log/lambda_log.go @@ -0,0 +1,49 @@ +package lambda_log + +import ( + "time" + + "github.com/turbot/tailpipe-plugin-sdk/schema" +) + +type LambdaLog struct { + schema.CommonFields + + Timestamp *time.Time `json:"timestamp,omitempty" parquet:"name=timestamp"` + RequestID *string `json:"request_id,omitempty" parquet:"name=request_id"` + LogType *string `json:"log_type,omitempty" parquet:"name=log_type"` + LogLevel *string `json:"log_level,omitempty" parquet:"name=log_level"` + Message *string `json:"message,omitempty" parquet:"name=message"` + MessageJson map[string]interface{} `json:"message_json,omitempty" parquet:"name=message_json, type=JSON"` + RawMessage *string `json:"raw_message,omitempty" parquet:"name=raw_message"` + RawMessageJson map[string]interface{} `json:"raw_message_json,omitempty" parquet:"name=raw_message_json, type=JSON"` + LogGroupName *string `json:"log_group_name,omitempty" parquet:"name=log_group_name"` + + // Report Specific Fields + Duration *float64 `json:"duration,omitempty" parquet:"name=duration"` + BilledDuration *float64 `json:"billed_duration,omitempty" 
parquet:"name=billed_duration"` + MemorySize *int `json:"memory_size,omitempty" parquet:"name=memory_size"` + MaxMemoryUsed *int `json:"max_memory_used,omitempty" parquet:"name=max_memory_used"` +} + +func (c *LambdaLogTable) GetColumnDescriptions() map[string]string { + return map[string]string{ + // Lambda-specific log fields + "billed_duration": "The billed execution time in milliseconds, always rounded up to the nearest millisecond for billing purposes.", + "duration": "The actual execution time of the Lambda function in milliseconds, as reported in REPORT log entries.", + "log_group_name": "The CloudWatch log group name where the log entry was recorded, typically matches the Lambda function name.", + "log_level": "The application log level for user-generated logs (INFO, ERROR, WARN, DEBUG).", + "log_type": "The type of log entry, indicating system events like START, END, REPORT, or INIT_START.", + "max_memory_used": "The maximum amount of memory actually used during the function execution in megabytes.", + "memory_size": "The amount of memory allocated to the Lambda function in megabytes, as configured in the function settings.", + "message_json": "Extracted message parsed as JSON, populated only if the message content is valid JSON.", + "message": "Extracted and parsed message content when possible, populated for both plain text and JSON-formatted logs.", + "raw_message_json": "Full original message parsed as JSON, populated only if the original message is native JSON or convertible to JSON.", + "raw_message": "Complete original message string as received from the log source, always populated regardless of format.", + "request_id": "The unique identifier for the Lambda function invocation request.", + "timestamp": "The timestamp when the log entry was generated by the Lambda function or runtime.", + + // Tailpipe-specific metadata fields + "tp_akas": "A list of associated Amazon Resource Names (ARNs) found in the log messages.", + } +} diff --git 
a/tables/lambda_log/lambda_log_mapper.go b/tables/lambda_log/lambda_log_mapper.go new file mode 100644 index 0000000..1bbda44 --- /dev/null +++ b/tables/lambda_log/lambda_log_mapper.go @@ -0,0 +1,497 @@ +package lambda_log + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "regexp" + "strconv" + "strings" + "time" + + cwTypes "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types" + "github.com/turbot/tailpipe-plugin-sdk/mappers" +) + +type LambdaLogMapper struct { +} + +func (m *LambdaLogMapper) Identifier() string { + return "lambda_log_mapper" +} + +// JSON format for system logs as described in AWS docs +// Contains "time", "type", and "record" fields +// System logs are sometimes known as platform event logs +// https://docs.aws.amazon.com/lambda/latest/dg/monitoring-cloudwatchlogs-advanced.html#monitoring-cloudwatchlogs-logformat +type jsonFormatSystemLog struct { + Time string `json:"time"` + Type string `json:"type"` + Record map[string]interface{} `json:"record"` +} + +// JSON format for application logs as described in AWS docs +// Contains "timestamp", "level", "message", and "requestId" fields +// Generated by Lambda functions using supported logging methods +// https://docs.aws.amazon.com/lambda/latest/dg/monitoring-cloudwatchlogs-advanced.html#monitoring-cloudwatchlogs-logformat +type jsonFormatApplicationLog struct { + Timestamp string `json:"timestamp"` + Level string `json:"level"` + Message string `json:"message"` + RequestID string `json:"requestId"` +} + +// While storing the logs in the S3 bucket, the logs are stored in the following format: +type s3FormatLog struct { + AccountId *string `json:"accountId"` + LogGroup *string `json:"logGroup"` + LogStream *string `json:"logStream"` + Id *string `json:"id"` + Timestamp *int64 `json:"timestamp"` + Message *string `json:"message"` +} + +func (m *LambdaLogMapper) Map(_ context.Context, a any, _ ...mappers.MapOption[*LambdaLog]) (*LambdaLog, error) { + row := &LambdaLog{} + isCwLog 
:= false + + var raw string + // Declare s3Format and isTextFormat at the start for use in lambda log stored in S3 bucket handling + s3Format := &s3FormatLog{} + isTextFormat := true + + switch v := a.(type) { + case []byte: + raw = string(v) + case string: + raw = v + case *string: + raw = *v + case cwTypes.FilteredLogEvent: + raw = *v.Message + t := time.UnixMilli(*v.Timestamp) + row.Timestamp = &t + isCwLog = true + default: + return nil, fmt.Errorf("expected string or []byte, got %T", a) + } + + // --- Lambda log stored in S3 bucket handling --- + // If the log is in the format stored in an S3 bucket, unmarshal and extract timestamp, log group, and message + if err := s3Format.UnmarshalJSON([]byte(raw)); err == nil && !isCwLog { + t := time.UnixMilli(*s3Format.Timestamp) + if err == nil { + row.Timestamp = &t + } + + if s3Format.LogGroup != nil { + row.LogGroupName = s3Format.LogGroup + } + + // Try to unquote the message (if it's a string) + msgStr, err := strconv.Unquote(*s3Format.Message) + if err == nil { + isTextFormat = true + raw = strings.TrimSpace(msgStr) + } + if err != nil { + // If message is a JSON object, unquoting will fail; treat as JSON + isTextFormat = false + raw = strings.TrimSpace(*s3Format.Message) + parsedData := isParsableJson(raw) + if parsedData != nil { + data := *parsedData + if v, ok := data["message"].(string); ok { + onlyMessageParsable := isParsableJson(v) + if onlyMessageParsable != nil { + row.MessageJson = *onlyMessageParsable + } else { + row.Message = &v + } + } + } + slog.Error("Error unquoting message", "error", err) + } + } + + // --- Main log parsing logic --- + raw = strings.TrimSpace(raw) + parsableJsonData := isParsableJson(raw) + // Handle plain text system logs (START, END, REPORT, etc.) 
that are not JSON + if (strings.HasPrefix(raw, "REPORT") || strings.HasPrefix(raw, "END") || strings.HasPrefix(raw, "START") || strings.HasPrefix(raw, "INIT_START") || strings.HasPrefix(raw, "EXTENSION") || strings.HasPrefix(raw, "TELEMETRY") || (isTextFormat && !isTimestamp(strings.Fields(raw)[0]) && !strings.HasPrefix(raw, "["))) && parsableJsonData == nil { + lambdaLog, err := parseLambdaPainTextLog(raw, row) + if err != nil { + return nil, fmt.Errorf("error parsing lambda plain text log: %w", err) + } + lambdaLog.Timestamp = row.Timestamp + lambdaLog.RawMessage = &raw + parsedData := isParsableJson(raw) + if parsedData != nil { + data := *parsedData + if v, ok := data["message"].(string); ok { + lambdaLog.Message = &v + } + if v, ok := data["requestId"].(string); ok { + lambdaLog.RequestID = &v + } + if v, ok := data["level"].(string); ok { + lambdaLog.LogLevel = &v + } + lambdaLog.MessageJson = *parsedData + } + return lambdaLog, nil + } + + // --- JSON log detection and parsing --- + var probe map[string]json.RawMessage + if err := json.Unmarshal([]byte(raw), &probe); err == nil { + // System log (platform event) JSON + if _, hasType := probe["type"]; hasType { + var systemLog jsonFormatSystemLog + if err := json.Unmarshal([]byte(raw), &systemLog); err != nil { + return nil, fmt.Errorf("error unmarshalling as system log: %w", err) + } + + // Parse timestamp and type + if t, err := time.Parse(time.RFC3339, systemLog.Time); err == nil { + row.Timestamp = &t + } + row.LogType = &systemLog.Type + + // Extract requestId and metrics if present + if requestId, ok := systemLog.Record["requestId"].(string); ok { + row.RequestID = &requestId + } + if metrics, ok := systemLog.Record["metrics"].(map[string]interface{}); ok { + if v, ok := metrics["durationMs"].(float64); ok { + row.Duration = &v + } + if v, ok := metrics["billedDurationMs"].(float64); ok { + row.BilledDuration = &v + } + if v, ok := metrics["memorySizeMB"].(float64); ok { + mem := int(v) + row.MemorySize 
= &mem + } + if v, ok := metrics["maxMemoryUsedMB"].(float64); ok { + mem := int(v) + row.MaxMemoryUsed = &mem + } + } + + // Attach full JSON as RawMessageJson + jsonLog := convertRawMessageMap(probe) + if jsonLog != nil { + row.RawMessageJson = *jsonLog + } + + // Extract message as JSON or string + if msg, ok := (*jsonLog)["message"]; ok { + parsedData := isParsableJson(msg) + if parsedData != nil { + row.MessageJson = *parsedData + } else { + str, ok := msg.(string) + if ok { + row.Message = &str + } + } + } + + row.RawMessage = &raw + return row, nil + } else if _, hasLevel := probe["level"]; hasLevel { + // Application log (JSON format) + var appLog jsonFormatApplicationLog + if err := json.Unmarshal([]byte(raw), &appLog); err != nil { + return nil, fmt.Errorf("error unmarshalling as application log: %w", err) + } + + if t, err := time.Parse(time.RFC3339, appLog.Timestamp); err == nil { + row.Timestamp = &t + } + row.LogLevel = &appLog.Level + row.RequestID = &appLog.RequestID + + jsonLog := convertRawMessageMap(probe) + if jsonLog != nil { + row.RawMessageJson = *jsonLog + } + + if msg, ok := (*jsonLog)["message"]; ok { + parsedData := isParsableJson(msg) + if parsedData != nil { + row.MessageJson = *parsedData + } else { + str, ok := msg.(string) + if ok { + row.Message = &str + } + } + } + + row.RawMessage = &raw + return row, nil + } + } + if len(strings.Fields(raw)) >= 4 && (isTimestamp(strings.Fields(raw)[0]) || isTimestamp(strings.Fields(raw)[1])) { + // plain text application log + // Handle plain text application logs (format: timestamp requestID logLevel message) + // Example: 2024-10-27T19:17:45.586Z 79b4f56e-95b1-4643-9700-2807f4e68189 INFO some log message + // [INFO] 2025-05-26T06:42:27.551Z 66e2e287-e14b-471e-8185-faca26d2c310 This is a function log + fields := strings.Fields(raw) + // Try to parse timestamp from first or second field + if t, err := time.Parse(time.RFC3339, fields[0]); err == nil { + row.Timestamp = &t + } + if t1, err := 
// convertRawMessageMap decodes each raw JSON value held in probe into its
// generic Go representation and returns the decoded values keyed by the same
// names. Entries whose raw bytes fail to decode are left out of the result.
// The returned pointer is never nil, even when probe is nil or empty.
func convertRawMessageMap(probe map[string]json.RawMessage) *map[string]interface{} {
	decoded := make(map[string]interface{}, len(probe))
	for name, encoded := range probe {
		var item interface{}
		// A value that cannot be decoded is dropped rather than reported.
		if json.Unmarshal(encoded, &item) == nil {
			decoded[name] = item
		}
	}
	return &decoded
}
8b133862-5331-4ded-ac5d-1ad5da5aee81 +// END RequestId: 8b133862-5331-4ded-ac5d-1ad5da5aee81 +// REPORT RequestId: 8b133862-5331-4ded-ac5d-1ad5da5aee81 Duration: 123.45 ms Billed Duration: 124 ms Memory Size: 128 MB Max Memory Used: 84 MB +func parseLambdaPainTextLog(line string, log *LambdaLog) (*LambdaLog, error) { + if id := extractAfter(line, "RequestId: "); id != "" { + log.RequestID = &id + } + switch { + case strings.HasPrefix(line, "START RequestId:"): + // Parse START log line + log.LogType = ptr("START") + if id := extractAfter(line, "START RequestId: "); id != "" { + log.RequestID = ptr(strings.Fields(id)[0]) + if len(strings.Split(line, "RequestId: "+id)[1]) > 0 { + log.Message = &strings.Split(line, "RequestId: "+id)[1] + } + } else { + log.Message = ptr(line) + } + case strings.HasPrefix(line, "INIT_START"): + // Parse START log line + log.LogType = ptr("INIT_START") + if id := extractAfter(line, "INIT_START RequestId: "); id != "" { + log.RequestID = ptr(strings.Fields(id)[0]) + if len(strings.Split(line, "RequestId: "+id)[1]) > 0 { + log.Message = &strings.Split(line, "RequestId: "+id)[1] + } + } else { + msg := strings.Split(line, "INIT_START "+id)[1] + log.Message = &msg + } + case strings.HasPrefix(line, "END RequestId:"): + // Parse END log line + log.LogType = ptr("END") + if id := extractAfter(line, "END RequestId: "); id != "" { + log.RequestID = ptr(strings.Fields(id)[0]) + if len(strings.Split(line, "RequestId: "+id)[1]) > 0 { + log.Message = &strings.Split(line, "RequestId: "+id)[1] + } + } else { + log.Message = ptr(line) + } + case strings.HasPrefix(line, "REPORT RequestId:"): + // Parse REPORT log line which contains metrics + log.LogType = ptr("REPORT") + + // Extract RequestId + if id := extractAfter(line, "REPORT RequestId: "); id != "" { + log.RequestID = ptr(strings.Fields(id)[0]) + } + + // Extract numeric metrics from REPORT line + if val := extractBetween(line, "Duration: ", " ms"); val != "" { + if f, err := 
strconv.ParseFloat(val, 64); err == nil { + log.Duration = &f + } + } + if val := extractBetween(line, "Billed Duration: ", " ms"); val != "" { + if f, err := strconv.ParseFloat(val, 64); err == nil { + log.BilledDuration = &f + } + } + if val := extractBetween(line, "Memory Size: ", " MB"); val != "" { + if i, err := strconv.Atoi(val); err == nil { + log.MemorySize = &i + } + } + if val := extractBetween(line, "Max Memory Used: ", " MB"); val != "" { + if i, err := strconv.Atoi(val); err == nil { + log.MaxMemoryUsed = &i + } + } + + case strings.HasPrefix(line, "EXTENSION"): + // Parse START log line + log.LogType = ptr("EXTENSION") + if len(strings.Split(line, "EXTENSION")[1]) > 0 { + log.Message = ptr(strings.TrimSpace(strings.Split(line, "EXTENSION")[1])) + } + + case strings.HasPrefix(line, "TELEMETRY"): + // Parse START log line + log.LogType = ptr("TELEMETRY") + if len(strings.Split(line, "TELEMETRY")[1]) > 0 { + log.Message = ptr(strings.TrimSpace(strings.Split(line, "TELEMETRY")[1])) + } + + default: + log.Message = ptr(line) + } + + return log, nil +} + +// ptr is a helper function to return a pointer to a value +func ptr[T any](v T) *T { + return &v +} + +// extractAfter extracts substring that comes after a specific prefix +func extractAfter(s, prefix string) string { + idx := strings.Index(s, prefix) + if idx == -1 { + return "" + } + return strings.TrimSpace(s[idx+len(prefix):]) +} + +// extractBetween extracts substring between start and end strings +func extractBetween(s, start, end string) string { + i := strings.Index(s, start) + if i == -1 { + return "" + } + i += len(start) + j := strings.Index(s[i:], end) + if j == -1 { + return "" + } + return strings.TrimSpace(s[i : i+j]) +} + +func isParsableJson(message interface{}) *map[string]interface{} { + var raw interface{} + + switch v := message.(type) { + case string: + // Try unmarshalling the JSON string directly + if err := json.Unmarshal([]byte(v), &raw); err != nil { + return nil + } + 
// isTimestamp reports whether s parses in full under at least one of a fixed
// set of time layouts. It is used to decide whether a token of a plain text
// log line is a timestamp.
func isTimestamp(s string) bool {
	// Layouts are tried in order; the first one that parses s wins.
	candidates := [...]string{
		time.RFC3339,
		time.RFC3339Nano,
		time.RFC1123,
		time.RFC1123Z,
		time.RFC822,
		time.RFC822Z,
		time.RFC850,
		"2006-01-02 15:04:05",        // Common log format
		"2006-01-02 15:04:05.000000", // With microseconds
		"2006-01-02T15:04:05",        // ISO-like without timezone
		"2006-01-02T15:04:05Z07:00",  // ISO with TZ offset
		"20060102T150405Z",           // AWS style compact
	}

	for i := range candidates {
		_, err := time.Parse(candidates[i], s)
		if err != nil {
			continue
		}
		return true
	}
	return false
}
+} + +func (c *LambdaLogTable) GetSourceMetadata() ([]*table.SourceMetadata[*LambdaLog], error) { + defaultS3ArtifactConfig := &artifact_source_config.ArtifactSourceConfigImpl{ + FileLayout: utils.ToStringPointer("AWSLogs/(%{DATA:org_id}/)?%{NUMBER:account_id}/%{DATA:region}/%{DATA:function_name}/%{YEAR:year}/%{MONTHNUM:month}/%{MONTHDAY:day}/%{HOUR:hour}/%{DATA}.log.zst"), + } + + return []*table.SourceMetadata[*LambdaLog]{ + { + // S3 artifact source + SourceName: s3_bucket.AwsS3BucketSourceIdentifier, + Mapper: &LambdaLogMapper{}, + Options: []row_source.RowSourceOption{ + artifact_source.WithDefaultArtifactSourceConfig(defaultS3ArtifactConfig), + artifact_source.WithRowPerLine(), + }, + }, + { + // S3 artifact source + SourceName: cloudwatch_log_group.AwsCloudwatchLogGroupSourceIdentifier, + Mapper: &LambdaLogMapper{}, + }, + { + // any artifact source + SourceName: constants.ArtifactSourceIdentifier, + Mapper: &LambdaLogMapper{}, + Options: []row_source.RowSourceOption{ + artifact_source.WithRowPerLine(), + }, + }, + }, nil +} + +func (c *LambdaLogTable) EnrichRow(row *LambdaLog, sourceEnrichmentFields schema.SourceEnrichment) (*LambdaLog, error) { + row.CommonFields = sourceEnrichmentFields.CommonFields + + // Record standardization + row.TpID = xid.New().String() + row.TpIngestTimestamp = time.Now() + if row.Timestamp != nil { + row.TpTimestamp = *row.Timestamp + row.TpDate = row.Timestamp.Truncate(24 * time.Hour) + } else if !row.TpTimestamp.IsZero() { + row.TpDate = row.TpTimestamp.Truncate(24 * time.Hour) + } + + var arnRegex = regexp.MustCompile(`arn:aws:[^,\s'"\\]+`) + + seen := map[string]struct{}{} + if row.Message != nil { + for _, match := range arnRegex.FindAllString(*row.Message, -1) { + if _, exists := seen[match]; !exists { + seen[match] = struct{}{} + row.TpAkas = append(row.TpAkas, match) + } + } + } + + if row.LogGroupName == nil { + row.LogGroupName = row.TpSourceName + } + + return row, nil +} + +func (c *LambdaLogTable) GetDescription() 
string { + return "AWS Lambda logs capture detailed information about function executions, including invocation context, console output, errors, and performance metrics. This table provides a structured and queryable view of Lambda log data, enabling easier analysis, troubleshooting, and monitoring." +}