Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion src/servers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ opentelemetry-proto.workspace = true
operator.workspace = true
otel-arrow-rust.workspace = true
parking_lot.workspace = true
pgwire = { version = "0.34", default-features = false, features = [
pg_interval = "0.4"
pgwire = { version = "0.36", default-features = false, features = [
"server-api-ring",
"pg-ext-types",
] }
Expand Down
36 changes: 30 additions & 6 deletions src/servers/src/postgres/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ impl QueryParser for DefaultQueryParser {
&self,
_client: &C,
sql: &str,
_types: &[Type],
_types: &[Option<Type>],
) -> PgWireResult<Self::Statement> {
crate::metrics::METRIC_POSTGRES_PREPARED_COUNT.inc();
let query_ctx = self.session.new_query_context();
Expand Down Expand Up @@ -341,7 +341,9 @@ impl ExtendedQueryHandler for PostgresServerHandlerInner {
C: ClientInfo + Unpin + Send + Sync,
{
let sql_plan = &stmt.statement;
let (param_types, sql_plan, format) = if let Some(plan) = &sql_plan.plan {
// client provided parameter types, can be empty if client doesn't try to parse statement
let provided_param_types = &stmt.parameter_types;
let server_inferenced_types = if let Some(plan) = &sql_plan.plan {
let param_types = plan
.get_parameter_types()
.context(DataFusionSnafu)
Expand All @@ -352,14 +354,36 @@ impl ExtendedQueryHandler for PostgresServerHandlerInner {

let types = param_types_to_pg_types(&param_types).map_err(convert_err)?;

(types, sql_plan, &Format::UnifiedBinary)
Some(types)
} else {
let param_types = stmt.parameter_types.clone();
(param_types, sql_plan, &Format::UnifiedBinary)
None
};

let param_count = if provided_param_types.is_empty() {
server_inferenced_types
.as_ref()
.map(|types| types.len())
.unwrap_or(0)
} else {
provided_param_types.len()
};

let param_types = (0..param_count)
.map(|i| {
let client_type = provided_param_types.get(i);
// use server type when client provided type is None (oid: 0 or other invalid values)
match client_type {
Some(Some(client_type)) => client_type.clone(),
_ => server_inferenced_types
.as_ref()
.and_then(|types| types.get(i).cloned())
.unwrap_or(Type::UNKNOWN),
}
})
.collect::<Vec<_>>();

if let Some(schema) = &sql_plan.schema {
schema_to_pg(schema, format)
schema_to_pg(schema, &Format::UnifiedBinary)
.map(|fields| DescribeStatementResponse::new(param_types, fields))
.map_err(convert_err)
} else {
Expand Down
22 changes: 11 additions & 11 deletions src/servers/src/postgres/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -805,7 +805,13 @@ pub(super) fn type_pg_to_gt(origin: &Type) -> Result<ConcreteDataType> {
pub(super) fn parameter_to_string(portal: &Portal<SqlPlan>, idx: usize) -> PgWireResult<String> {
// the index is managed from portal's parameters count so it's safe to
// unwrap here.
let param_type = portal.statement.parameter_types.get(idx).unwrap();
let param_type = portal
.statement
.parameter_types
.get(idx)
.unwrap()
.as_ref()
.unwrap_or(&Type::UNKNOWN);
match param_type {
&Type::VARCHAR | &Type::TEXT => Ok(format!(
"'{}'",
Expand Down Expand Up @@ -884,7 +890,7 @@ pub(super) fn parameters_to_scalar_values(
let mut results = Vec::with_capacity(param_count);

let client_param_types = &portal.statement.parameter_types;
let param_types = plan
let server_param_types = plan
.get_parameter_types()
.context(DataFusionSnafu)
.map_err(convert_err)?
Expand All @@ -893,18 +899,12 @@ pub(super) fn parameters_to_scalar_values(
.collect::<HashMap<_, _>>();

for idx in 0..param_count {
let server_type = param_types
let server_type = server_param_types
.get(&format!("${}", idx + 1))
.and_then(|t| t.as_ref());

let client_type = if let Some(client_given_type) = client_param_types.get(idx) {
match (client_given_type, server_type) {
(&Type::UNKNOWN, Some(server_type)) => {
// If client type is unknown, use the server type.
type_gt_to_pg(server_type).map_err(convert_err)?
}
_ => client_given_type.clone(),
}
let client_type = if let Some(Some(client_given_type)) = client_param_types.get(idx) {
client_given_type.clone()
} else if let Some(server_provided_type) = &server_type {
type_gt_to_pg(server_provided_type).map_err(convert_err)?
} else {
Expand Down
13 changes: 10 additions & 3 deletions src/servers/src/postgres/types/bytea.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use bytes::BufMut;
use pgwire::types::ToSqlText;
use pgwire::types::format::FormatOptions;
use postgres_types::{IsNull, ToSql, Type};

#[derive(Debug)]
Expand All @@ -23,11 +24,12 @@ impl ToSqlText for HexOutputBytea<'_> {
&self,
ty: &Type,
out: &mut bytes::BytesMut,
format_options: &FormatOptions,
) -> std::result::Result<IsNull, Box<dyn std::error::Error + Sync + Send>>
where
Self: Sized,
{
let _ = self.0.to_sql_text(ty, out);
let _ = self.0.to_sql_text(ty, out, format_options);
Ok(IsNull::No)
}
}
Expand Down Expand Up @@ -66,6 +68,7 @@ impl ToSqlText for EscapeOutputBytea<'_> {
&self,
_ty: &Type,
out: &mut bytes::BytesMut,
_format_options: &FormatOptions,
) -> std::result::Result<IsNull, Box<dyn std::error::Error + Sync + Send>>
where
Self: Sized,
Expand Down Expand Up @@ -120,7 +123,9 @@ mod tests {

let expected = b"abcklm*\\251T";
let mut out = bytes::BytesMut::new();
let is_null = input.to_sql_text(&Type::BYTEA, &mut out).unwrap();
let is_null = input
.to_sql_text(&Type::BYTEA, &mut out, &FormatOptions::default())
.unwrap();
assert!(matches!(is_null, IsNull::No));
assert_eq!(&out[..], expected);

Expand All @@ -138,7 +143,9 @@ mod tests {

let expected = b"\\x68656c6c6f2c20776f726c6421";
let mut out = bytes::BytesMut::new();
let is_null = input.to_sql_text(&Type::BYTEA, &mut out).unwrap();
let is_null = input
.to_sql_text(&Type::BYTEA, &mut out, &FormatOptions::default())
.unwrap();
assert!(matches!(is_null, IsNull::No));
assert_eq!(&out[..], expected);

Expand Down
Loading
Loading