Skip to content

Commit

Permalink
[PACHA-21] support uniqueness constraints in SQL layer (#1037)
Browse files Browse the repository at this point in the history
<!-- The PR description should answer 2 important questions: -->

### What

Exposes SQL constraints to datafusion's planner via the
`TableProvider::constraints()` trait method.

### How

We pull these constraints from the GraphQL configuration for the model.
This is not ideal - the data should really live at the model layer now.
But we can hopefully solve that later.

<!-- How is it trying to accomplish it (what are the implementation
steps)? -->

V3_GIT_ORIGIN_REV_ID: c723d895fd707e24defae971f474b5938478a82b
  • Loading branch information
paf31 authored and hasura-bot committed Sep 3, 2024
1 parent 9dd4622 commit 7a31d2d
Show file tree
Hide file tree
Showing 8 changed files with 217 additions and 11 deletions.
2 changes: 2 additions & 0 deletions v3/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

### Added

- SQL endpoint can utilize uniqueness constraints

### Fixed

### Changed
Expand Down
5 changes: 5 additions & 0 deletions v3/crates/engine/tests/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,3 +180,8 @@ fn test_aggregate_count_all_invoice_lines_aliased() -> anyhow::Result<()> {
fn test_aggregate_sum_invoice_lines() -> anyhow::Result<()> {
test_sql("sql/aggregate/sum_invoice_lines")
}

#[test]
fn test_aggregate_group_by_unique_key() -> anyhow::Result<()> {
test_sql("sql/aggregate/group_by_unique_key")
}
122 changes: 122 additions & 0 deletions v3/crates/engine/tests/sql/aggregate/group_by_unique_key/expected.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
[
{
"billingAddress": "Theodor-Heuss-Straße 34",
"billingCity": "Stuttgart",
"billingCountry": "Germany",
"billingPostalCode": "70174",
"billingState": null,
"customerId": 2,
"invoiceDate": "2009-01-01T00:00:00",
"invoiceId": 1,
"total": 1,
"invoice_line_count": 2
},
{
"billingAddress": "Ullevålsveien 14",
"billingCity": "Oslo",
"billingCountry": "Norway",
"billingPostalCode": "0171",
"billingState": null,
"customerId": 4,
"invoiceDate": "2009-01-02T00:00:00",
"invoiceId": 2,
"total": 3,
"invoice_line_count": 4
},
{
"billingAddress": "Grétrystraat 63",
"billingCity": "Brussels",
"billingCountry": "Belgium",
"billingPostalCode": "1000",
"billingState": null,
"customerId": 8,
"invoiceDate": "2009-01-03T00:00:00",
"invoiceId": 3,
"total": 5,
"invoice_line_count": 6
},
{
"billingAddress": "8210 111 ST NW",
"billingCity": "Edmonton",
"billingCountry": "Canada",
"billingPostalCode": "T6G 2C7",
"billingState": "AB",
"customerId": 14,
"invoiceDate": "2009-01-06T00:00:00",
"invoiceId": 4,
"total": 8,
"invoice_line_count": 9
},
{
"billingAddress": "69 Salem Street",
"billingCity": "Boston",
"billingCountry": "USA",
"billingPostalCode": "2113",
"billingState": "MA",
"customerId": 23,
"invoiceDate": "2009-01-11T00:00:00",
"invoiceId": 5,
"total": 13,
"invoice_line_count": 14
},
{
"billingAddress": "Berger Straße 10",
"billingCity": "Frankfurt",
"billingCountry": "Germany",
"billingPostalCode": "60316",
"billingState": null,
"customerId": 37,
"invoiceDate": "2009-01-19T00:00:00",
"invoiceId": 6,
"total": 0,
"invoice_line_count": 1
},
{
"billingAddress": "Barbarossastraße 19",
"billingCity": "Berlin",
"billingCountry": "Germany",
"billingPostalCode": "10779",
"billingState": null,
"customerId": 38,
"invoiceDate": "2009-02-01T00:00:00",
"invoiceId": 7,
"total": 1,
"invoice_line_count": 2
},
{
"billingAddress": "8, Rue Hanovre",
"billingCity": "Paris",
"billingCountry": "France",
"billingPostalCode": "75002",
"billingState": null,
"customerId": 40,
"invoiceDate": "2009-02-01T00:00:00",
"invoiceId": 8,
"total": 1,
"invoice_line_count": 2
},
{
"billingAddress": "9, Place Louis Barthou",
"billingCity": "Bordeaux",
"billingCountry": "France",
"billingPostalCode": "33000",
"billingState": null,
"customerId": 42,
"invoiceDate": "2009-02-02T00:00:00",
"invoiceId": 9,
"total": 3,
"invoice_line_count": 4
},
{
"billingAddress": "3 Chatham Street",
"billingCity": "Dublin",
"billingCountry": "Ireland",
"billingPostalCode": null,
"billingState": "Dublin",
"customerId": 46,
"invoiceDate": "2009-02-03T00:00:00",
"invoiceId": 10,
"total": 5,
"invoice_line_count": 6
}
]
10 changes: 10 additions & 0 deletions v3/crates/engine/tests/sql/aggregate/group_by_unique_key/plan.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[
{
"plan_type": "logical_plan",
"plan": "Projection: Invoice.billingAddress, Invoice.billingCity, Invoice.billingCountry, Invoice.billingPostalCode, Invoice.billingState, Invoice.customerId, Invoice.invoiceDate, Invoice.invoiceId, Invoice.total, count(Int64(1)) AS invoice_line_count\n Limit: skip=0, fetch=10\n Aggregate: groupBy=[[Invoice.invoiceId, Invoice.billingAddress, Invoice.billingCity, Invoice.billingCountry, Invoice.billingPostalCode, Invoice.billingState, Invoice.customerId, Invoice.invoiceDate, Invoice.total]], aggr=[[count(Int64(1))]]\n Projection: Invoice.billingAddress, Invoice.billingCity, Invoice.billingCountry, Invoice.billingPostalCode, Invoice.billingState, Invoice.customerId, Invoice.invoiceDate, Invoice.invoiceId, Invoice.total\n Inner Join: Invoice.invoiceId = InvoiceLine.invoiceId\n ModelQuery: model=default:Invoice, projection=[billingAddress,billingCity,billingCountry,billingPostalCode,billingState,customerId,invoiceDate,invoiceId,total]\n ModelQuery: model=default:InvoiceLine, projection=[invoiceId]"
},
{
"plan_type": "physical_plan",
"plan": "ProjectionExec: expr=[billingAddress@1 as billingAddress, billingCity@2 as billingCity, billingCountry@3 as billingCountry, billingPostalCode@4 as billingPostalCode, billingState@5 as billingState, customerId@6 as customerId, invoiceDate@7 as invoiceDate, invoiceId@0 as invoiceId, total@8 as total, count(Int64(1))@9 as invoice_line_count]\n GlobalLimitExec: skip=0, fetch=10\n AggregateExec: mode=Single, gby=[invoiceId@7 as invoiceId, billingAddress@0 as billingAddress, billingCity@1 as billingCity, billingCountry@2 as billingCountry, billingPostalCode@3 as billingPostalCode, billingState@4 as billingState, customerId@5 as customerId, invoiceDate@6 as invoiceDate, total@8 as total], aggr=[count(Int64(1))]\n CoalesceBatchesExec: target_batch_size=8192\n HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(invoiceId@7, invoiceId@0)], projection=[billingAddress@0, billingCity@1, billingCountry@2, billingPostalCode@3, billingState@4, customerId@5, invoiceDate@6, invoiceId@7, total@8]\n NDCQueryPushDown\n NDCQueryPushDown\n"
}
]
10 changes: 10 additions & 0 deletions v3/crates/engine/tests/sql/aggregate/group_by_unique_key/query.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
SELECT
Invoice.*,
COUNT(1) AS invoice_line_count
FROM
Invoice
JOIN InvoiceLine USING (invoiceId)
GROUP BY
Invoice.invoiceId
LIMIT
10;
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"x-hasura-role": "admin"
}
74 changes: 64 additions & 10 deletions v3/crates/sql/src/catalog/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ pub(crate) mod filter;
use std::collections::BTreeMap;
use std::{any::Any, sync::Arc};

use ::datafusion::common::Constraints;
use ::datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
use ::datafusion::sql::sqlparser::ast::TableConstraint;
use async_trait::async_trait;
use hasura_authn_core::Session;
use indexmap::IndexMap;
Expand Down Expand Up @@ -115,7 +117,7 @@ impl datafusion::TableFunctionImpl for TableValuedFunction {
_exprs: &[datafusion::Expr],
) -> datafusion::Result<Arc<dyn datafusion::TableProvider>> {
let arguments = BTreeMap::new();
let table = Table::new(self.metadata.clone(), self.model.clone(), arguments);
let table = Table::new(self.metadata.clone(), self.model.clone(), arguments)?;
Ok(Arc::new(table) as Arc<dyn datafusion::TableProvider>)
}
}
Expand All @@ -129,26 +131,31 @@ pub(crate) struct Table {
pub(crate) model: Arc<Model>,
/// This will be empty if the model doesn't take any arguments
pub(crate) arguments: BTreeMap<ArgumentName, serde_json::Value>,
/// DF constraints which can help guide the planner,
/// e.g. uniqueness constraints establish functional dependencies
/// which ensure all correct columns are in scope under a GROUP BY.
pub(crate) constraints: ::datafusion::common::Constraints,
}

impl Table {
pub(crate) fn new(
metadata: Arc<Metadata>,
model: Arc<Model>,
arguments: BTreeMap<ArgumentName, serde_json::Value>,
) -> Self {
Table {
) -> datafusion::Result<Self> {
let constraints = compute_table_constraints(&model, &metadata)?;
Ok(Table {
metadata,
model,
arguments,
}
constraints,
})
}
pub(crate) fn new_no_args(metadata: Arc<Metadata>, model: Arc<Model>) -> Self {
Table {
metadata,
model,
arguments: BTreeMap::new(),
}
pub(crate) fn new_no_args(
metadata: Arc<Metadata>,
model: Arc<Model>,
) -> datafusion::Result<Self> {
Self::new(metadata, model, BTreeMap::new())
}
pub(crate) fn to_logical_plan(
&self,
Expand All @@ -170,6 +177,49 @@ impl Table {
}
}

/// Computes DF constraints from the model metadata.
/// TODO: this function currently uses the select_uniques field from
/// the GraphQL configuration section, but we should have that data on the
/// model itself, since it's not just a GraphQL concern now.
fn compute_table_constraints(
model: &Arc<Model>,
metadata: &Arc<Metadata>,
) -> datafusion::Result<::datafusion::common::Constraints> {
let schema = model.schema.as_ref().clone();
let df_schema = datafusion::DFSchema::from_unqualified_fields(schema.fields, schema.metadata)?;
let model = metadata
.models
.get(&Qualified::new(model.subgraph.clone(), model.name.clone()))
.ok_or(::datafusion::error::DataFusionError::Internal(format!(
"Model {} could not be found",
model.name
)))?;

let table_constraints = model
.graphql_api
.select_uniques
.iter()
.map(|unique| TableConstraint::Unique {
name: None,
index_name: None,
index_type_display: ::datafusion::sql::sqlparser::ast::KeyOrIndexDisplay::None,
index_type: None,
columns: unique
.unique_identifier
.iter()
.map(|(field_name, _)| field_name.as_str().into())
.collect(),
index_options: vec![],
characteristics: None,
})
.collect::<Vec<_>>();

::datafusion::common::Constraints::new_from_table_constraints(
table_constraints.as_slice(),
&Arc::new(df_schema),
)
}

#[async_trait]
impl datafusion::TableProvider for Table {
fn as_any(&self) -> &dyn Any {
Expand All @@ -184,6 +234,10 @@ impl datafusion::TableProvider for Table {
datafusion::TableType::Base
}

fn constraints(&self) -> Option<&Constraints> {
Some(&self.constraints)
}

async fn scan(
&self,
state: &dyn datafusion::Session,
Expand Down
2 changes: 1 addition & 1 deletion v3/crates/sql/src/catalog/subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl datafusion::SchemaProvider for catalog::model::WithSession<Subgraph> {
name: &str,
) -> datafusion::Result<Option<Arc<dyn datafusion::TableProvider>>> {
if let Some(model) = self.value.tables.get(name) {
let table = model::Table::new_no_args(self.value.metadata.clone(), model.clone());
let table = model::Table::new_no_args(self.value.metadata.clone(), model.clone())?;
Ok(Some(Arc::new(table) as Arc<dyn datafusion::TableProvider>))
} else {
Ok(None)
Expand Down

0 comments on commit 7a31d2d

Please sign in to comment.