Skip to content

Commit

Permalink
[PACHA-18] sql: forward request headers to connectors (#1036)
Browse files Browse the repository at this point in the history
<!-- The PR description should answer 2 important questions: -->

### What

When a command is called through the sql interface, we now respect
`argumentPresets` configured at the data connector link. We also
partially support `responseHeaders`: we extract the response from
the 'result' key, but we do not yet extract the 'headers' and forward them.

### How

The part of the engine code that deals with data connector argument
presets is exposed from the ir crate and re-used in the sql layer.

V3_GIT_ORIGIN_REV_ID: 7c3124596a9bbc2b18cb79cb899c75fd4de3f7e5
  • Loading branch information
0x777 authored and hasura-bot committed Aug 29, 2024
1 parent 846ae16 commit 42ce01d
Show file tree
Hide file tree
Showing 21 changed files with 312 additions and 13 deletions.
1 change: 1 addition & 0 deletions v3/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions v3/crates/engine/bin/engine/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,7 @@ async fn pre_execution_plugins_middleware<'a>(

/// Handle a SQL request and execute it.
async fn handle_sql_request(
headers: axum::http::header::HeaderMap,
State(state): State<Arc<EngineState>>,
Extension(session): Extension<Session>,
Json(request): Json<sql::execute::SqlRequest>,
Expand All @@ -680,6 +681,7 @@ async fn handle_sql_request(
|| {
Box::pin(async {
sql::execute::execute_sql(
Arc::new(headers),
state.sql_context.clone(),
Arc::new(session),
Arc::new(state.http_context.clone()),
Expand Down
12 changes: 12 additions & 0 deletions v3/crates/engine/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,7 @@ pub(crate) fn test_sql(test_path_string: &str) -> anyhow::Result<()> {

let request_path = test_path.join("query.sql");
let request_path_json = test_path.join("query.json");
let headers_path_json = test_path.join("headers.json");
let response_path = test_path_string.to_string() + "/expected.json";
let explain_path = test_path_string.to_string() + "/plan.json";
let metadata_path = root_test_dir.join("sql/metadata.json");
Expand Down Expand Up @@ -590,6 +591,13 @@ pub(crate) fn test_sql(test_path_string: &str) -> anyhow::Result<()> {
serde_json::from_str(&json_content)?
};

let header_map = if let Ok(content) = fs::read_to_string(headers_path_json) {
let header_map: HashMap<String, String> = serde_json::from_str(&content)?;
Arc::new(reqwest::header::HeaderMap::try_from(&header_map)?)
} else {
Arc::new(reqwest::header::HeaderMap::new())
};

let session = Arc::new({
let session_vars_path = &test_path.join("session_variables.json");
let session_variables: HashMap<SessionVariable, SessionVariableValue> =
Expand All @@ -608,6 +616,7 @@ pub(crate) fn test_sql(test_path_string: &str) -> anyhow::Result<()> {
&http_context,
&mut test_ctx.mint,
explain_path,
&header_map,
&SqlRequest::new(format!("EXPLAIN {}", request.sql)),
)
.await?;
Expand All @@ -618,6 +627,7 @@ pub(crate) fn test_sql(test_path_string: &str) -> anyhow::Result<()> {
&http_context,
&mut test_ctx.mint,
response_path,
&header_map,
&request,
)
.await?;
Expand All @@ -632,9 +642,11 @@ async fn snapshot_sql(
http_context: &Arc<execute::HttpContext>,
mint: &mut Mint,
response_path: String,
request_headers: &Arc<reqwest::header::HeaderMap>,
request: &SqlRequest,
) -> Result<(), anyhow::Error> {
let response = sql::execute::execute_sql(
request_headers.clone(),
catalog.clone(),
session.clone(),
http_context.clone(),
Expand Down
5 changes: 5 additions & 0 deletions v3/crates/engine/tests/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ fn test_commands_functions() -> anyhow::Result<()> {
test_sql("sql/commands/functions")
}

#[test]
fn test_commands_functions_forward_headers() -> anyhow::Result<()> {
test_sql("sql/commands/functions_forward_headers")
}

#[test]
fn test_commands_functions_empty_args() -> anyhow::Result<()> {
test_sql("sql/commands/functions_empty_args")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[
{
"token": "64a6c518-4a5b-4067-a99f-3abc11eeeacf",
"expiry": "2025-12-12T05:48:33+0000"
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"authorization": "foo"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[
{
"plan_type": "logical_plan",
"plan": "TableScan: tmp_table projection=[token, expiry]"
},
{
"plan_type": "physical_plan",
"plan": "NDCFunctionPushDown\n"
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
SELECT
*
FROM
get_session_info(STRUCT(1 as "userId"));

Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"x-hasura-role": "admin"
}
15 changes: 15 additions & 0 deletions v3/crates/engine/tests/sql/introspection/functions/expected.json
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,21 @@
}
]
},
{
"function_name": "get_session_info",
"return_type": "SessionInfo",
"description": null,
"arguments": [
{
"name": "userId",
"position": 0,
"argument_type": "INT32",
"argument_type_normalized": "INT32",
"is_nullable": false,
"description": null
}
]
},
{
"function_name": "uppercase_actor_name_by_id",
"return_type": "actor",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,26 @@
}
]
},
{
"name": "SessionInfo",
"description": null,
"fields": [
{
"field_name": "expiry",
"field_type": "STRING",
"field_type_normalized": "STRING",
"is_nullable": false,
"description": null
},
{
"field_name": "token",
"field_type": "STRING",
"field_type_normalized": "STRING",
"is_nullable": false,
"description": null
}
]
},
{
"name": "actor",
"description": null,
Expand Down
103 changes: 103 additions & 0 deletions v3/crates/engine/tests/sql/metadata.json
Original file line number Diff line number Diff line change
Expand Up @@ -13192,6 +13192,109 @@
"rootFieldKind": "Mutation"
}
}
},
{
"kind": "ObjectType",
"version": "v1",
"definition": {
"name": "SessionInfo",
"fields": [
{
"name": "token",
"type": "String!"
},
{
"name": "expiry",
"type": "String!"
}
],
"graphql": {
"typeName": "SessionInfo"
},
"dataConnectorTypeMapping": [
{
"dataConnectorName": "custom",
"dataConnectorObjectType": "session_info",
"fieldMapping": {
"token": {
"column": {
"name": "token"
}
},
"expiry": {
"column": {
"name": "expiry"
}
}
}
}
]
}
},
{
"kind": "TypePermissions",
"version": "v1",
"definition": {
"typeName": "SessionInfo",
"permissions": [
{
"role": "admin",
"output": {
"allowedFields": ["token", "expiry"]
}
},
{
"role": "user",
"output": {
"allowedFields": ["token", "expiry"]
}
}
]
}
},
{
"kind": "Command",
"version": "v1",
"definition": {
"name": "get_session_info",
"arguments": [
{
"name": "userId",
"type": "Int!"
}
],
"outputType": "SessionInfo",
"source": {
"dataConnectorName": "custom",
"dataConnectorCommand": {
"function": "get_session_details"
},
"argumentMapping": {
"userId": "user_id"
}
},
"graphql": {
"rootFieldName": "getSessionInfo",
"rootFieldKind": "Query"
}
}
},
{
"kind": "CommandPermissions",
"version": "v1",
"definition": {
"commandName": "get_session_info",
"permissions": [
{
"role": "admin",
"allowExecution": true
},
{
"role": "user",
"allowExecution": true
}
]
}
}
]
}
Expand Down
24 changes: 20 additions & 4 deletions v3/crates/ir/src/arguments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,25 @@ where
}
}

// preset arguments from `DataConnectorLink` argument presets
for (argument_name, value) in process_connector_link_presets(
data_connector_link_argument_presets,
session_variables,
request_headers,
)? {
arguments.insert(argument_name, Argument::Literal { value });
}

Ok(arguments)
}

/// Builds arguments for a command that come from a connector link's argument presets
pub fn process_connector_link_presets(
data_connector_link_argument_presets: &BTreeMap<DataConnectorArgumentName, ArgumentPresetValue>,
session_variables: &SessionVariables,
request_headers: &reqwest::header::HeaderMap,
) -> Result<BTreeMap<DataConnectorArgumentName, serde_json::Value>, error::Error> {
let mut arguments = BTreeMap::new();
// preset arguments from `DataConnectorLink` argument presets
for (dc_argument_preset_name, dc_argument_preset_value) in data_connector_link_argument_presets
{
Expand Down Expand Up @@ -199,12 +218,9 @@ where

arguments.insert(
dc_argument_preset_name.clone(),
Argument::Literal {
value: serde_json::to_value(SerializableHeaderMap(headers_argument))?,
},
serde_json::to_value(SerializableHeaderMap(headers_argument))?,
);
}

Ok(arguments)
}

Expand Down
2 changes: 1 addition & 1 deletion v3/crates/ir/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub use remote_joins::VariableName;
pub use aggregates::{
mk_alias_from_graphql_field_path, AggregateFieldSelection, AggregateSelectionSet,
};
pub use arguments::Argument;
pub use arguments::{process_connector_link_presets, Argument};
pub use commands::{CommandInfo, FunctionBasedCommand, ProcedureBasedCommand};
pub use filter::expression::{
ComparisonTarget, ComparisonValue, Expression, LocalFieldComparison, RelationshipColumnMapping,
Expand Down
1 change: 1 addition & 0 deletions v3/crates/sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ serde = { workspace = true, features = ["rc"] }
serde_json = { workspace = true }
thiserror = { workspace = true }
chrono = { workspace = true }
reqwest = { workspace = true }

[dev-dependencies]
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
Expand Down
2 changes: 2 additions & 0 deletions v3/crates/sql/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ impl datafusion::CatalogProvider for model::WithSession<Catalog> {
impl Catalog {
pub fn create_session_context(
self: Arc<Self>,
request_headers: &Arc<reqwest::header::HeaderMap>,
session: &Arc<Session>,
http_context: &Arc<execute::HttpContext>,
) -> datafusion::SessionContext {
Expand Down Expand Up @@ -174,6 +175,7 @@ impl Catalog {
catalog: self.clone(),
session: session.clone(),
http_context: http_context.clone(),
request_headers: request_headers.clone(),
});
let session_state = datafusion::SessionStateBuilder::new()
.with_config(session_config)
Expand Down
4 changes: 3 additions & 1 deletion v3/crates/sql/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ impl TraceableError for SqlExecutionError {

/// Executes an SQL Request using the Apache DataFusion query engine.
pub async fn execute_sql(
request_headers: Arc<reqwest::header::HeaderMap>,
catalog: Arc<crate::catalog::Catalog>,
session: Arc<Session>,
http_context: Arc<execute::HttpContext>,
Expand All @@ -85,7 +86,8 @@ pub async fn execute_sql(
"Create a datafusion SessionContext",
SpanVisibility::Internal,
|| {
let session = catalog.create_session_context(&session, &http_context);
let session =
catalog.create_session_context(&request_headers, &session, &http_context);
Successful::new(session)
},
)
Expand Down
4 changes: 4 additions & 0 deletions v3/crates/sql/src/execute/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use datafusion::{
use async_trait::async_trait;

pub(crate) struct OpenDDQueryPlanner {
pub(crate) request_headers: Arc<reqwest::header::HeaderMap>,
pub(crate) session: Arc<Session>,
pub(crate) http_context: Arc<execute::HttpContext>,
pub(crate) catalog: Arc<crate::catalog::Catalog>,
Expand All @@ -37,6 +38,7 @@ impl QueryPlanner for OpenDDQueryPlanner {
// Teach the default physical planner how to plan TopK nodes.
let physical_planner =
DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new(NDCPushDownPlanner {
request_headers: self.request_headers.clone(),
session: self.session.clone(),
http_context: self.http_context.clone(),
catalog: self.catalog.clone(),
Expand All @@ -49,6 +51,7 @@ impl QueryPlanner for OpenDDQueryPlanner {
}

pub(crate) struct NDCPushDownPlanner {
pub(crate) request_headers: Arc<reqwest::header::HeaderMap>,
pub(crate) session: Arc<Session>,
pub(crate) http_context: Arc<execute::HttpContext>,
pub(crate) catalog: Arc<crate::catalog::Catalog>,
Expand Down Expand Up @@ -78,6 +81,7 @@ impl ExtensionPlanner for NDCPushDownPlanner {
assert_eq!(logical_inputs.len(), 0, "Inconsistent number of inputs");
assert_eq!(physical_inputs.len(), 0, "Inconsistent number of inputs");
build_execution_plan(
&self.request_headers,
&self.catalog.metadata,
&self.http_context,
&self.session,
Expand Down
Loading

0 comments on commit 42ce01d

Please sign in to comment.