11// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22// SPDX-License-Identifier: Apache-2.0
33
4- use lambda_http:: {
5- handler,
6- lambda_runtime:: { self , Context } ,
7- Body , IntoResponse , Request , RequestExt , Response ,
8- } ;
4+ use lambda_extension:: { service_fn as extension_handler, Extension } ;
5+ use lambda_http:: { service_fn as http_handler, Body , IntoResponse , Request , Response } ;
96use log:: * ;
107use once_cell:: sync:: OnceCell ;
118use reqwest:: { redirect, Client } ;
12- use serde_json:: json;
13- use std:: { env, mem, thread} ;
14- use tokio_retry:: strategy:: FixedInterval ;
15- use tokio_retry:: Retry ;
16- use url:: form_urlencoded:: Serializer ;
9+ use std:: { env, mem} ;
10+ use tokio:: runtime:: Handle ;
11+ use tokio_retry:: { strategy:: FixedInterval , Retry } ;
1712
1813type Error = Box < dyn std:: error:: Error + Send + Sync + ' static > ;
1914static HTTP_CLIENT : OnceCell < Client > = OnceCell :: new ( ) ;
@@ -23,25 +18,16 @@ async fn main() -> Result<(), Error> {
2318 env_logger:: init ( ) ;
2419
2520 // register as an external extension
26- let aws_lambda_runtime_api = env:: var ( "AWS_LAMBDA_RUNTIME_API" ) . unwrap ( ) ;
27- let extension_next_url = format ! ( "http://{}/2020-01-01/extension/event/next" , aws_lambda_runtime_api) ;
28- let extension_register_url = format ! ( "http://{}/2020-01-01/extension/register" , aws_lambda_runtime_api) ;
29- let executable_name = env:: current_exe ( ) . unwrap ( ) . file_name ( ) . unwrap ( ) . to_string_lossy ( ) . to_string ( ) ;
30- let client = reqwest:: blocking:: ClientBuilder :: new ( ) . timeout ( None ) . build ( ) . unwrap ( ) ;
31- let resp = client
32- . post ( extension_register_url)
33- . header ( "Lambda-Extension-Name" , executable_name)
34- . json ( & json ! ( { "events" : [ ] } ) )
35- . send ( ) ?;
36- let extension_id = resp. headers ( ) . get ( "Lambda-Extension-Identifier" ) . unwrap ( ) . clone ( ) ;
37- thread:: spawn ( move || {
38- let extension_id_str = extension_id. to_str ( ) . unwrap ( ) ;
39- debug ! ( "[extension] enter event loop for extension id: '{}'" , extension_id_str) ;
40- client
41- . get ( extension_next_url)
42- . header ( "Lambda-Extension-Identifier" , extension_id_str)
43- . send ( )
44- . unwrap ( ) ;
21+ let handle = Handle :: current ( ) ;
22+ tokio:: task:: spawn_blocking ( move || {
23+ handle. spawn ( async {
24+ Extension :: new ( )
25+ . with_events ( & [ ] )
26+ . with_events_processor ( extension_handler ( |_| async { Ok :: < ( ) , Error > ( ( ) ) } ) )
27+ . run ( )
28+ . await
29+ . expect ( "extension thread error" ) ;
30+ } )
4531 } ) ;
4632
4733 // check if the application is ready every 10 milliseconds
@@ -57,30 +43,15 @@ async fn main() -> Result<(), Error> {
5743
5844 // start lambda runtime
5945 HTTP_CLIENT . set ( Client :: builder ( ) . redirect ( redirect:: Policy :: none ( ) ) . build ( ) . unwrap ( ) ) . unwrap ( ) ;
60- lambda_runtime :: run ( handler ( http_proxy_handler) ) . await ?;
46+ lambda_http :: run ( http_handler ( http_proxy_handler) ) . await ?;
6147 Ok ( ( ) )
6248}
6349
64- async fn http_proxy_handler ( event : Request , _ : Context ) -> Result < impl IntoResponse , Error > {
50+ async fn http_proxy_handler ( event : Request ) -> Result < impl IntoResponse , Error > {
6551 let port = env:: var ( "PORT" ) . unwrap_or_else ( |_| "8080" . to_string ( ) ) ;
6652 let app_host = format ! ( "http://127.0.0.1:{}" , port) ;
67- let query_params = event. query_string_parameters ( ) ;
68- debug ! ( "query_params are {:#?}" , query_params) ;
69-
7053 let ( parts, body) = event. into_parts ( ) ;
71- let mut app_url = app_host + parts. uri . path ( ) ;
72-
73- // append query parameters to app_url
74- if !query_params. is_empty ( ) {
75- app_url. push ( '?' ) ;
76- let mut serializer = Serializer :: new ( & mut app_url) ;
77- for ( key, _) in query_params. iter ( ) {
78- for value in query_params. get_all ( key) . unwrap ( ) . iter ( ) {
79- serializer. append_pair ( key, value) ;
80- }
81- }
82- serializer. finish ( ) ;
83- }
54+ let app_url = app_host + parts. uri . path_and_query ( ) . unwrap ( ) . as_str ( ) ;
8455 debug ! ( "app_url is {:#?}" , app_url) ;
8556 debug ! ( "request headers are {:#?}" , parts. headers) ;
8657
@@ -102,7 +73,6 @@ async fn http_proxy_handler(event: Request, _: Context) -> Result<impl IntoRespo
10273}
10374
10475async fn convert_body ( app_response : reqwest:: Response ) -> Result < Body , Error > {
105- let content_type;
10676 debug ! ( "app response headers are {:#?}" , app_response. headers( ) ) ;
10777
10878 if app_response. headers ( ) . get ( http:: header:: CONTENT_ENCODING ) . is_some ( ) {
@@ -111,11 +81,11 @@ async fn convert_body(app_response: reqwest::Response) -> Result<Body, Error> {
11181 return Ok ( Body :: Binary ( content. to_vec ( ) ) ) ;
11282 }
11383
114- if let Some ( value) = app_response. headers ( ) . get ( http:: header:: CONTENT_TYPE ) {
115- content_type = value. to_str ( ) . unwrap_or_default ( ) ;
84+ let content_type = if let Some ( value) = app_response. headers ( ) . get ( http:: header:: CONTENT_TYPE ) {
85+ value. to_str ( ) . unwrap_or_default ( )
11686 } else {
117- content_type = "" ;
118- }
87+ ""
88+ } ;
11989 debug ! ( "content_type is {:?}" , content_type) ;
12090
12191 if content_type. starts_with ( "text" )
0 commit comments