diff --git a/.github/workflows/_lambda-do-release-runners.yml b/.github/workflows/_lambda-do-release-runners.yml index bea7a17451..d764b5034e 100644 --- a/.github/workflows/_lambda-do-release-runners.yml +++ b/.github/workflows/_lambda-do-release-runners.yml @@ -85,12 +85,13 @@ jobs: fail-fast: false matrix: include: [ - { dir-name: 'ci-queue-pct', zip-name: 'ci-queue-pct' }, - { dir-name: 'oss_ci_job_queue_time', zip-name: 'oss-ci-job-queue-time' }, - { dir-name: 'oss_ci_cur', zip-name: 'oss-ci-cur' }, + { dir-name: 'ci-queue-pct', zip-name: 'ci-queue-pct' }, + { dir-name: 'oss_ci_job_queue_time', zip-name: 'oss-ci-job-queue-time' }, + { dir-name: 'oss_ci_cur', zip-name: 'oss-ci-cur' }, { dir-name: 'benchmark-results-uploader', zip-name: 'benchmark-results-uploader' }, - { dir-name: 'pytorch-auto-revert', zip-name: 'pytorch-auto-revert' }, + { dir-name: 'pytorch-auto-revert', zip-name: 'pytorch-auto-revert' }, { dir-name: 'keep-going-call-log-classifier', zip-name: 'keep-going-call-log-classifier' }, + { dir-name: 'buildkite-webhook-handler', zip-name: 'buildkite-webhook-handler' }, ] name: Upload Release for ${{ matrix.dir-name }} lambda runs-on: ubuntu-latest diff --git a/aws/lambda/buildkite-webhook-handler/Makefile b/aws/lambda/buildkite-webhook-handler/Makefile new file mode 100644 index 0000000000..478548770a --- /dev/null +++ b/aws/lambda/buildkite-webhook-handler/Makefile @@ -0,0 +1,19 @@ +all: run-local + +clean: + rm -rf deployment + rm -rf venv + rm -rf deployment.zip + +venv/bin/python: + virtualenv venv + venv/bin/pip install -r requirements.txt + +deployment.zip: + mkdir -p deployment + cp lambda_function.py ./deployment/. + pip3.10 install -r requirements.txt -t ./deployment/. --platform manylinux2014_x86_64 --only-binary=:all: --implementation cp --python-version 3.10 --upgrade + cd ./deployment && zip -q -r ../deployment.zip . + +.PHONY: create-deployment-package +create-deployment-package: deployment.zip diff --git a/aws/lambda/buildkite-webhook-handler/README.md b/aws/lambda/buildkite-webhook-handler/README.md new file mode 100644 index 0000000000..306396c30c --- /dev/null +++ b/aws/lambda/buildkite-webhook-handler/README.md @@ -0,0 +1,53 @@ +# Buildkite Webhook Handler Lambda + +This Lambda function receives and processes Buildkite webhook events for +all available Buildkite webhook events, saving them to DynamoDB tables. + +* In the near-term, this allows vLLM maintainers to explore their CI data +like time to signals or queueing time. +* In the longer-term, this will provide the foundation for future UX projects +on vLLM like vLLM HUD, CI failures notifications. + +## Overview + +The lambda handles two types of Buildkite webhook events: +- **Agent events** (`agent.*`) - Saved to `vllm-buildkite-agent-events` table +- **Build events** (`build.*`) - Saved to `vllm-buildkite-build-events` table +- **Job events** (`job.*`) - Saved to `vllm-buildkite-job-events` table + +## DynamoDB Schema + +### Agent Events Table: `vllm-buildkite-agent-events` +- **Partition Key**: `dynamoKey` (format: `AGENT_ID`) +- https://buildkite.com/docs/apis/webhooks/pipelines/agent-events + +### Build Events Table: `vllm-buildkite-build-events` +- **Partition Key**: `dynamoKey` (format: `REPO_NAME/PIPELINE_NAME/BUILD_NUMBER`) +- https://buildkite.com/docs/apis/webhooks/pipelines/build-events + +### Job Events Table: `vllm-buildkite-job-events` +- **Partition Key**: `dynamoKey` (format: `REPO_NAME/JOB_ID`) +- https://buildkite.com/docs/apis/webhooks/pipelines/job-events + +## Deployment + +```bash +make create-deployment-package +``` + +This creates a `deployment.zip` file ready for AWS Lambda deployment. + +## Event Processing + +The lambda automatically: +1. Identifies event type from webhook payload +2. Extracts repository name and relevant IDs +3. Saves to appropriate DynamoDB table with structured key +4. Returns success/error response + +## Error Handling + +- Invalid JSON payloads return 400 status +- Missing required fields return 400 status +- DynamoDB errors return 500 status +- Unsupported event types return 400 status diff --git a/aws/lambda/buildkite-webhook-handler/lambda_function.py b/aws/lambda/buildkite-webhook-handler/lambda_function.py new file mode 100644 index 0000000000..49b72165be --- /dev/null +++ b/aws/lambda/buildkite-webhook-handler/lambda_function.py @@ -0,0 +1,203 @@ +import json +from typing import Any, Dict + +import boto3 +from botocore.exceptions import ClientError + + +dynamodb = boto3.resource("dynamodb") +agent_events_table = dynamodb.Table("vllm-buildkite-agent-events") +build_events_table = dynamodb.Table("vllm-buildkite-build-events") +job_events_table = dynamodb.Table("vllm-buildkite-job-events") + + +def save_agent_event(event_data: Dict[str, Any]) -> Dict[str, Any]: + """ + Save agent events to DynamoDB table. + + Args: + event_data: The agent event payload from Buildkite + + Returns: + Dict[str, Any]: Response containing status and result information + """ + try: + agent = event_data.get("agent", {}) + agent_id = agent.get("id", "") + + if not agent_id: + return { + "statusCode": 400, + "body": json.dumps({"message": "Missing agent ID"}), + } + + dynamo_key = agent_id + item = {"dynamoKey": dynamo_key, **event_data} + + agent_events_table.put_item(Item=item) + + return { + "statusCode": 200, + "body": json.dumps( + {"message": f"Agent event saved successfully with key: {dynamo_key}"} + ), + } + + except ClientError as e: + return { + "statusCode": 500, + "body": json.dumps({"message": f"DynamoDB error: {str(e)}"}), + } + except Exception as e: + return { + "statusCode": 500, + "body": json.dumps({"message": f"Error saving agent event: {str(e)}"}), + } + + +def save_build_event(event_data: Dict[str, Any]) -> Dict[str, Any]: + """ + Save build event to DynamoDB table. + + Args: + event_data: The build event payload from Buildkite + + Returns: + Dict[str, Any]: Response containing status and result information + """ + try: + build = event_data.get("build", {}) + repo_name = event_data.get("pipeline", {}).get("repository", "").split("/")[-1] + pipeline_name = event_data.get("pipeline", {}).get("name", "") + build_number = build.get("number", "") + + if not repo_name or not build_number: + return { + "statusCode": 400, + "body": json.dumps( + {"message": "Missing repository name or build number"} + ), + } + + # Buildkite build_number is only unique in a pipeline + dynamo_key = f"{repo_name}/{pipeline_name}/{build_number}" + + item = {"dynamoKey": dynamo_key, **event_data} + build_events_table.put_item(Item=item) + + return { + "statusCode": 200, + "body": json.dumps( + {"message": f"Build event saved successfully with key: {dynamo_key}"} + ), + } + + except ClientError as e: + return { + "statusCode": 500, + "body": json.dumps({"message": f"DynamoDB error: {str(e)}"}), + } + except Exception as e: + return { + "statusCode": 500, + "body": json.dumps({"message": f"Error saving build event: {str(e)}"}), + } + + +def save_job_event(event_data: Dict[str, Any]) -> Dict[str, Any]: + """ + Save job event to DynamoDB table. + + Args: + event_data: The job event payload from Buildkite + + Returns: + Dict[str, Any]: Response containing status and result information + """ + try: + job = event_data.get("job", {}) + repo_name = event_data.get("pipeline", {}).get("repository", "").split("/")[-1] + job_id = job.get("id", "") + + if not repo_name or not job_id: + return { + "statusCode": 400, + "body": json.dumps({"message": "Missing repository name or job ID"}), + } + + dynamo_key = f"{repo_name}/{job_id}" + + item = {"dynamoKey": dynamo_key, **event_data} + + job_events_table.put_item(Item=item) + + return { + "statusCode": 200, + "body": json.dumps( + {"message": f"Job event saved successfully with key: {dynamo_key}"} + ), + } + + except ClientError as e: + return { + "statusCode": 500, + "body": json.dumps({"message": f"DynamoDB error: {str(e)}"}), + } + except Exception as e: + return { + "statusCode": 500, + "body": json.dumps({"message": f"Error saving job event: {str(e)}"}), + } + + +def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]: + """ + Main Lambda handler function for Buildkite webhook events. + + Args: + event: Contains the webhook payload from Buildkite + context: Provides runtime information about the Lambda function + + Returns: + Dict[str, Any]: Response containing status and result information + """ + try: + if event.get("body"): + body = json.loads(event["body"]) + else: + body = event + + event_type = body.get("event") + + if not event_type: + return { + "statusCode": 400, + "body": json.dumps( + {"message": "Missing event type in webhook payload"} + ), + } + + if event_type.startswith("agent."): + return save_agent_event(body) + elif event_type.startswith("build."): + return save_build_event(body) + elif event_type.startswith("job."): + return save_job_event(body) + else: + return { + "statusCode": 400, + "body": json.dumps( + {"message": f"Unsupported event type: {event_type}"} + ), + } + + except json.JSONDecodeError as e: + return { + "statusCode": 400, + "body": json.dumps({"message": f"Invalid JSON payload: {str(e)}"}), + } + except Exception as e: + return { + "statusCode": 500, + "body": json.dumps({"message": f"Unexpected error: {str(e)}"}), + } diff --git a/aws/lambda/buildkite-webhook-handler/requirements.txt b/aws/lambda/buildkite-webhook-handler/requirements.txt new file mode 100644 index 0000000000..7e78fe8858 --- /dev/null +++ b/aws/lambda/buildkite-webhook-handler/requirements.txt @@ -0,0 +1 @@ +boto3==1.36.21 diff --git a/aws/lambda/buildkite-webhook-handler/test_lambda_function.py b/aws/lambda/buildkite-webhook-handler/test_lambda_function.py new file mode 100644 index 0000000000..25f3dfd55a --- /dev/null +++ b/aws/lambda/buildkite-webhook-handler/test_lambda_function.py @@ -0,0 +1,241 @@ +import json +import unittest +from unittest.mock import patch + +from lambda_function import ( + lambda_handler, + save_agent_event, + save_build_event, + save_job_event, +) + + +class TestBuildkiteWebhookHandler(unittest.TestCase): + def setUp(self): + # Sample agent event + self.agent_event = { + "event": "agent.connected", + "agent": { + "id": "test-agent-123", + "name": "test-agent", + "hostname": "test-host", + }, + } + + # Sample build event + self.build_event = { + "event": "build.finished", + "build": {"number": 123, "branch": "main", "state": "passed"}, + "pipeline": { + "repository": "https://github.com/test/repo", + "name": "test-pipeline", + }, + } + + # Sample job event + self.job_event = { + "event": "job.finished", + "job": {"id": "test-job-456", "name": "test-job", "state": "passed"}, + "pipeline": {"repository": "https://github.com/test/repo"}, + } + + # Lambda event with body + self.lambda_event_with_body = {"body": json.dumps(self.build_event)} + + # Lambda event without body (direct invocation) + self.lambda_event_direct = self.build_event + + @patch("lambda_function.agent_events_table") + def test_save_agent_event_success(self, mock_table): + mock_table.put_item.return_value = {} + + response = save_agent_event(self.agent_event) + + self.assertEqual(response["statusCode"], 200) + body = json.loads(response["body"]) + self.assertIn("Agent event saved successfully", body["message"]) + self.assertIn("test-agent-123", body["message"]) + + mock_table.put_item.assert_called_once() + call_args = mock_table.put_item.call_args[1] + self.assertEqual(call_args["Item"]["dynamoKey"], "test-agent-123") + + @patch("lambda_function.agent_events_table") + def test_save_agent_event_missing_id(self, mock_table): + event_without_id = {"event": "agent.connected", "agent": {"name": "test-agent"}} + + response = save_agent_event(event_without_id) + + self.assertEqual(response["statusCode"], 400) + body = json.loads(response["body"]) + self.assertEqual(body["message"], "Missing agent ID") + mock_table.put_item.assert_not_called() + + @patch("lambda_function.agent_events_table") + def test_save_agent_event_dynamodb_error(self, mock_table): + from botocore.exceptions import ClientError + + mock_table.put_item.side_effect = ClientError( + {"Error": {"Code": "ValidationException", "Message": "Test error"}}, + "PutItem", + ) + + response = save_agent_event(self.agent_event) + + self.assertEqual(response["statusCode"], 500) + body = json.loads(response["body"]) + self.assertIn("DynamoDB error", body["message"]) + + @patch("lambda_function.build_events_table") + def test_save_build_event_success(self, mock_table): + mock_table.put_item.return_value = {} + + response = save_build_event(self.build_event) + + self.assertEqual(response["statusCode"], 200) + body = json.loads(response["body"]) + self.assertIn("Build event saved successfully", body["message"]) + self.assertIn("repo/test-pipeline/123", body["message"]) + + mock_table.put_item.assert_called_once() + call_args = mock_table.put_item.call_args[1] + self.assertEqual(call_args["Item"]["dynamoKey"], "repo/test-pipeline/123") + + @patch("lambda_function.build_events_table") + def test_save_build_event_missing_data(self, mock_table): + event_without_build_number = { + "event": "build.finished", + "build": {"branch": "main"}, + "pipeline": { + "repository": "https://github.com/test/repo", + "name": "test-pipeline", + }, + } + + response = save_build_event(event_without_build_number) + + self.assertEqual(response["statusCode"], 400) + body = json.loads(response["body"]) + self.assertEqual(body["message"], "Missing repository name or build number") + mock_table.put_item.assert_not_called() + + @patch("lambda_function.job_events_table") + def test_save_job_event_success(self, mock_table): + mock_table.put_item.return_value = {} + + response = save_job_event(self.job_event) + + self.assertEqual(response["statusCode"], 200) + body = json.loads(response["body"]) + self.assertIn("Job event saved successfully", body["message"]) + self.assertIn("repo/test-job-456", body["message"]) + + mock_table.put_item.assert_called_once() + call_args = mock_table.put_item.call_args[1] + self.assertEqual(call_args["Item"]["dynamoKey"], "repo/test-job-456") + + @patch("lambda_function.job_events_table") + def test_save_job_event_missing_data(self, mock_table): + event_without_job_id = { + "event": "job.finished", + "job": {"name": "test-job"}, + "pipeline": {"repository": "https://github.com/test/repo"}, + } + + response = save_job_event(event_without_job_id) + + self.assertEqual(response["statusCode"], 400) + body = json.loads(response["body"]) + self.assertEqual(body["message"], "Missing repository name or job ID") + mock_table.put_item.assert_not_called() + + @patch("lambda_function.save_build_event") + def test_lambda_handler_with_body(self, mock_save_build): + mock_save_build.return_value = { + "statusCode": 200, + "body": json.dumps({"message": "Success"}), + } + + response = lambda_handler(self.lambda_event_with_body, {}) + + self.assertEqual(response["statusCode"], 200) + mock_save_build.assert_called_once_with(self.build_event) + + @patch("lambda_function.save_build_event") + def test_lambda_handler_direct_event(self, mock_save_build): + mock_save_build.return_value = { + "statusCode": 200, + "body": json.dumps({"message": "Success"}), + } + + response = lambda_handler(self.lambda_event_direct, {}) + + self.assertEqual(response["statusCode"], 200) + mock_save_build.assert_called_once_with(self.build_event) + + @patch("lambda_function.save_agent_event") + def test_lambda_handler_agent_event(self, mock_save_agent): + agent_lambda_event = {"body": json.dumps(self.agent_event)} + mock_save_agent.return_value = { + "statusCode": 200, + "body": json.dumps({"message": "Success"}), + } + + response = lambda_handler(agent_lambda_event, {}) + + self.assertEqual(response["statusCode"], 200) + mock_save_agent.assert_called_once_with(self.agent_event) + + @patch("lambda_function.save_job_event") + def test_lambda_handler_job_event(self, mock_save_job): + job_lambda_event = {"body": json.dumps(self.job_event)} + mock_save_job.return_value = { + "statusCode": 200, + "body": json.dumps({"message": "Success"}), + } + + response = lambda_handler(job_lambda_event, {}) + + self.assertEqual(response["statusCode"], 200) + mock_save_job.assert_called_once_with(self.job_event) + + def test_lambda_handler_missing_event_type(self): + event_without_type = {"body": json.dumps({"some": "data"})} + + response = lambda_handler(event_without_type, {}) + + self.assertEqual(response["statusCode"], 400) + body = json.loads(response["body"]) + self.assertEqual(body["message"], "Missing event type in webhook payload") + + def test_lambda_handler_unsupported_event_type(self): + unsupported_event = {"body": json.dumps({"event": "unsupported.event"})} + + response = lambda_handler(unsupported_event, {}) + + self.assertEqual(response["statusCode"], 400) + body = json.loads(response["body"]) + self.assertEqual(body["message"], "Unsupported event type: unsupported.event") + + def test_lambda_handler_invalid_json(self): + invalid_json_event = {"body": "invalid json"} + + response = lambda_handler(invalid_json_event, {}) + + self.assertEqual(response["statusCode"], 400) + body = json.loads(response["body"]) + self.assertIn("Invalid JSON payload", body["message"]) + + @patch("lambda_function.save_build_event") + def test_lambda_handler_unexpected_error(self, mock_save_build): + mock_save_build.side_effect = Exception("Unexpected error") + + response = lambda_handler({"body": json.dumps(self.build_event)}, {}) + + self.assertEqual(response["statusCode"], 500) + body = json.loads(response["body"]) + self.assertIn("Unexpected error", body["message"]) + + +if __name__ == "__main__": + unittest.main()