Skip to content

Latest commit

 

History

History

web-analytics-iceberg

Web Log Analytics with Amazon Kinesis Data Streams Proxy using Amazon API Gateway

This repository provides you cdk scripts and sample code on how to implement a simple web analytics system.
Below diagram shows what we are implementing.

web-analytics-arch

The cdk.json file tells the CDK Toolkit how to execute your app.

This project is set up like a standard Python project. The initialization process also creates a virtualenv within this project, stored under the .venv directory. To create the virtualenv it assumes that there is a python3 (or python for Windows) executable in your path with access to the venv package. If for any reason the automatic creation of the virtualenv fails, you can create the virtualenv manually.

To manually create a virtualenv on MacOS and Linux:

$ python3 -m venv .venv

After the init process completes and the virtualenv is created, you can use the following step to activate your virtualenv.

$ source .venv/bin/activate

If you are a Windows platform, you would activate the virtualenv like this:

% .venv\Scripts\activate.bat

Once the virtualenv is activated, you can install the required dependencies.

(.venv) $ pip install -r requirements.txt

To add additional dependencies, for example other CDK libraries, just add them to your setup.py file and rerun the pip install -r requirements.txt command.

Upload Lambda Layer code

Before deployment, you should uplad zipped code files to s3 like this:

(.venv) $ aws s3api create-bucket --bucket your-s3-bucket-name-for-lambda-layer-code --region region-name
(.venv) $ ./build-aws-lambda-layer-package.sh your-s3-bucket-name-for-lambda-layer-code

⚠️ To create a bucket outside of the us-east-1 region, aws s3api create-bucket command requires the appropriate LocationConstraint to be specified in order to create the bucket in the desired region. For more information, see these examples.

⚠️ Make sure you have Docker installed.

For example,

(.venv) $ aws s3api create-bucket --bucket lambda-layer-resources --region us-east-1
(.venv) $ ./build-aws-lambda-layer-package.sh lambda-layer-resources

For more information about how to create a package for Amazon Lambda Layer, see here.

Deploy

Before to synthesize the CloudFormation template for this code, you should update cdk.context.json file.
In particular, you need to fill the s3 location of the previously created lambda lay codes.

For example,

{
  "firehose_data_tranform_lambda": {
    "s3_bucket_name": "lambda-layer-resources",
    "s3_object_key": "var/fastavro-lib.zip"
  },
  "data_firehose_configuration": {
    "buffering_hints": {
      "interval_in_seconds": 60,
      "size_in_mbs": 128
    },
    "transform_records_with_aws_lambda": {
      "buffer_size": 3,
      "buffer_interval": 300,
      "number_of_retries": 3
    },
    "destination_iceberg_table_configuration": {
      "database_name": "web_log_iceberg_db",
      "table_name": "web_log_iceberg"
    },
    "output_prefix": "web_log_iceberg_db/web_log_iceberg",
    "error_output_prefix": "error/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/!{firehose:error-output-type}"
  }
}

ℹ️ database_name, and table_name of data_firehose_configuration.destination_iceberg_table_configuration is used in Set up Delivery Stream step.

ℹ️ When updating or deleting records in an Iceberg table, specify the table's primary key column name as unique_keys in the data_firehose_configuration.destination_iceberg_table_configuration settings. For example,

"destination_iceberg_table_configuration": {
  "database_name": "web_log_iceberg_db",
  "table_name": "web_log_iceberg",
  "unique_keys": [
    "user_id", "timestamp"
  ]
}

Now you are ready to synthesize the CloudFormation template for this code.

(.venv) $ export CDK_DEFAULT_ACCOUNT=$(aws sts get-caller-identity --query Account --output text)
(.venv) $ export CDK_DEFAULT_REGION=$(aws configure get region)
(.venv) $ cdk synth --all

Now let's try to deploy.

List all CDK Stacks

(.venv) $ cdk list
WebAnalyticsKdsProxyApiGw
WebAnalyticsKinesisStream
WebAnalyticsDataFirehoseToIcebergS3Path
WebAnalyticsFirehoseDataTransformLambdaStack
WebAnalyticsFirehoseToIcebergRoleStack
WebAnalyticsGrantLFPermissionsOnFirehoseRole
WebAnalyticsFirehoseToIcebergStack

Use cdk deploy command to create the stack shown above.

Create API endpoint for web data collection

(.venv) $ cdk deploy --require-approval never \
              WebAnalyticsKdsProxyApiGw \
              WebAnalyticsKinesisStream

Set up Delivery Stream

  1. Create a S3 bucket for Apache Iceberg table

    (.venv) $ cdk deploy --require-approval never WebAnalyticsDataFirehoseToIcebergS3Path
    
  2. Create a table with partitioned data in Amazon Athena

    Go to Athena on the AWS Management console.

    • (step 1) Create a database

      In order to create a new database called web_log_iceberg_db, enter the following statement in the Athena query editor and click the Run button to execute the query.

       CREATE DATABASE IF NOT EXISTS web_log_iceberg_db;
       
    • (step 2) Create a table

      Copy the following query into the Athena query editor.

      Update LOCATION to your S3 bucket name and execute the query to create a new table.

       CREATE TABLE web_log_iceberg_db.web_log_iceberg (
         `user_id` string,
         `session_id` string,
         `event` string,
         `referrer` string,
         `user_agent` string,
         `ip` string,
         `hostname` string,
         `os` string,
         `timestamp` timestamp,
         `uri` string
       )
       PARTITIONED BY (event)
       LOCATION 's3://web-analytics-{region}-{account_id}/web_log_iceberg_db/web_log_iceberg'
       TBLPROPERTIES (
         'table_type'='iceberg',
         'format'='parquet',
         'write_compression'='snappy',
         'optimize_rewrite_delete_file_threshold'='10'
       );
       

      If the query is successful, a table named web_log_iceberg is created and displayed on the left panel under the Tables section.

      If you get an error, check if (a) you have updated the LOCATION to the correct S3 bucket name, (b) you have web_log_iceberg_db selected under the Database dropdown, and (c) you have AwsDataCatalog selected as the Data source.

  3. Create a lambda function to process the streaming data.

    (.venv) $ cdk deploy --require-approval never WebAnalyticsFirehoseDataTransformLambdaStack
    
  4. To allow Data Firehose to ingest data into the Apache Iceberg table, create an IAM role and grant permissions to the role.

    (.venv) $ cdk deploy --require-approval never \
                  WebAnalyticsFirehoseToIcebergRoleStack \
                  WebAnalyticsGrantLFPermissionsOnFirehoseRole
    

    ℹ️ If you fail to create the table, give Athena users access permissions on web_log_iceberg_db through AWS Lake Formation, or you can grant Amazon Data Firehose to access web_log_iceberg_db by running the following command:

    (.venv) $ aws lakeformation grant-permissions \
                  --principal DataLakePrincipalIdentifier=arn:aws:iam::{account-id}:role/role-id \
                  --permissions CREATE_TABLE DESCRIBE ALTER DROP \
                  --resource '{ "Database": { "Name": "web_log_iceberg_db" } }'
    (.venv) $ aws lakeformation grant-permissions \
                  --principal DataLakePrincipalIdentifier=arn:aws:iam::{account-id}:role/role-id \
                  --permissions SELECT DESCRIBE ALTER INSERT DELETE DROP \
                  --resource '{ "Table": {"DatabaseName": "web_log_iceberg_db", "TableWildcard": {}} }'
    
  5. Deploy Amazon Data Firehose.

    (.venv) $ cdk deploy --require-approval never WebAnalyticsFirehoseToIcebergStack
    

Run Test

  1. Run GET /streams method to invoke ListStreams in Kinesis

    $ curl -X GET https://your-api-gateway-id.execute-api.us-east-1.amazonaws.com/v1/streams
    

    The response is:

    {
      "HasMoreStreams": false,
      "StreamNames": [
        "PUT-Firehose-aEhWz"
      ],
      "StreamSummaries": [
        {
          "StreamARN": "arn:aws:kinesis:us-east-1:123456789012:stream/PUT-Firehose-aEhWz",
          "StreamCreationTimestamp": 1661612556,
          "StreamModeDetails": {
            "StreamMode": "ON_DEMAND"
          },
          "StreamName": "PUT-Firehose-aEhWz",
          "StreamStatus": "ACTIVE"
        }
      ]
    }
    
  2. Generate test data.

    (.venv) $ pip install -r requirements-dev.txt
    (.venv) $ python src/utils/gen_fake_data.py --max-count 5 --stream-name PUT-Firehose-aEhWz --api-url 'https://your-api-gateway-id.execute-api.us-east-1.amazonaws.com/v1' --api-method records
    [200 OK] {"EncryptionType":"KMS","FailedRecordCount":0,"Records":[{"SequenceNumber":"49633315260289903462649185194773668901646666226496176178","ShardId":"shardId-000000000003"}]}
    [200 OK] {"EncryptionType":"KMS","FailedRecordCount":0,"Records":[{"SequenceNumber":"49633315260289903462649185194774877827466280924390359090","ShardId":"shardId-000000000003"}]}
    [200 OK] {"EncryptionType":"KMS","FailedRecordCount":0,"Records":[{"SequenceNumber":"49633315260223001227053593325351479598467950537766600706","ShardId":"shardId-000000000000"}]}
    [200 OK] {"EncryptionType":"KMS","FailedRecordCount":0,"Records":[{"SequenceNumber":"49633315260245301972252123948494224242560213528447287314","ShardId":"shardId-000000000001"}]}
    [200 OK] {"EncryptionType":"KMS","FailedRecordCount":0,"Records":[{"SequenceNumber":"49633315260223001227053593325353897450107179933554966530","ShardId":"shardId-000000000000"}]}
    
  3. Check streaming data in S3

    After 5~10 minutes, you can see that the streaming data have been delivered from Kinesis Data Streams to S3.

    iceberg-table iceberg-table-data-level-01 iceberg-table-data-level-02 iceberg-table-data-level-03

  4. Run test query using Amazon Athena

    Go to Athena on the AWS Management console.

    • (Step 1) Specify the workgroup to use

      To run queries, switch to the appropriate workgroup like this: amazon-athena-switching-to-workgroup

    • (Step 2) Run test query

      Enter the following SQL statement and execute the query.

      SELECT COUNT(*)
      FROM web_log_iceberg_db.web_log_iceberg;
      

Clean Up

Delete the CloudFormation stack by running the below command.

(.venv) $ cdk destroy --force --all

Useful commands

  • cdk ls list all stacks in the app
  • cdk synth emits the synthesized CloudFormation template
  • cdk deploy deploy this stack to your default AWS account/region
  • cdk diff compare deployed stack with current state
  • cdk docs open CDK documentation

Enjoy!

References

Security

See CONTRIBUTING for more information.

License

This library is licensed under the MIT-0 License. See the LICENSE file.