[copy_from] AWS source #31144

Merged 2 commits on Jan 29, 2025
11 changes: 9 additions & 2 deletions Cargo.lock

(Generated file; diff not rendered.)

2 changes: 1 addition & 1 deletion src/adapter/src/coord/sequencer.rs
@@ -373,7 +373,7 @@ impl Coordinator {
session,
);
}
CopyFromSource::Url(_) => {
CopyFromSource::Url(_) | CopyFromSource::AwsS3 { .. } => {
self.sequence_copy_from(ctx, plan, target_cluster).await;
}
},
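
Note: the one-line change above routes both remote source kinds through the same sequencing path by combining an or-pattern with a struct rest pattern. A tiny standalone sketch of that pattern, with illustrative names rather than the PR's types:

enum Source {
    Stdin,
    Url(String),
    AwsS3 { uri: String, bucket: String },
}

fn is_remote(source: &Source) -> bool {
    // `Url(_)` ignores the tuple field; `AwsS3 { .. }` ignores all struct fields.
    match source {
        Source::Url(_) | Source::AwsS3 { .. } => true,
        Source::Stdin => false,
    }
}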
81 changes: 68 additions & 13 deletions src/adapter/src/coord/sequencer/inner/copy_from.rs
@@ -7,12 +7,14 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::str::FromStr;

use mz_adapter_types::connection::ConnectionId;
use mz_ore::cast::CastInto;
use mz_persist_client::batch::ProtoBatch;
use mz_pgcopy::CopyFormatParams;
use mz_repr::{CatalogItemId, Datum, RowArena};
use mz_sql::plan::{self, CopyFromSource, HirScalarExpr};
use mz_sql::plan::{self, CopyFromFilter, CopyFromSource, HirScalarExpr};
use mz_sql::session::metadata::SessionMetadata;
use mz_storage_client::client::TableData;
use mz_storage_types::oneshot_sources::OneshotIngestionRequest;
@@ -38,16 +40,10 @@ impl Coordinator {
source,
columns: _,
params,
filter,
} = plan;

let from_expr = match source {
CopyFromSource::Url(from_expr) => from_expr,
CopyFromSource::Stdin => {
unreachable!("COPY FROM STDIN should be handled elsewhere")
}
};

let eval_url = |from: HirScalarExpr| -> Result<Url, AdapterError> {
let eval_uri = |from: HirScalarExpr| -> Result<String, AdapterError> {
let style = ExprPrepStyle::OneShot {
logical_time: EvalTime::NotAvailable,
session: ctx.session(),
@@ -66,10 +62,8 @@ impl Coordinator {
other => coord_bail!("programming error! COPY FROM target cannot be {other}"),
};

Url::parse(eval_string)
.map_err(|err| AdapterError::Unstructured(anyhow::anyhow!("{err}")))
Ok(eval_string.to_string())
};
let url = return_if_err!(eval_url(from_expr), ctx);

// We check in planning that we're copying into a Table, but be defensive.
let Some(dest_table) = self.catalog().get_entry(&id).table() else {
@@ -93,9 +87,70 @@
}
};

let source = match source {
CopyFromSource::Url(from_expr) => {
let url = return_if_err!(eval_uri(from_expr), ctx);
// TODO(cf2): Structured errors.
let result = Url::parse(&url)
.map_err(|err| AdapterError::Unstructured(anyhow::anyhow!("{err}")));
let url = return_if_err!(result, ctx);

mz_storage_types::oneshot_sources::ContentSource::Http { url }
}
CopyFromSource::AwsS3 {
uri,
connection,
connection_id,
} => {
let uri = return_if_err!(eval_uri(uri), ctx);

// Validate the URI is an S3 URI, with a bucket name. We rely on validating here
// and expect it in clusterd.
//
// TODO(cf2): Structured errors.
let result = http::Uri::from_str(&uri)
.map_err(|err| {
AdapterError::Unstructured(anyhow::anyhow!("expected S3 uri: {err}"))
})
.and_then(|uri| {
if uri.scheme_str() != Some("s3") {
coord_bail!("only 's3://...' urls are supported as COPY FROM target");
}
Ok(uri)
})
.and_then(|uri| {
if uri.host().is_none() {
coord_bail!("missing bucket name from 's3://...' url");
}
Ok(uri)
});
let uri = return_if_err!(result, ctx);

mz_storage_types::oneshot_sources::ContentSource::AwsS3 {
connection,
connection_id,
uri: uri.to_string(),
}
}
CopyFromSource::Stdin => {
unreachable!("COPY FROM STDIN should be handled elsewhere")
}
};

let filter = match filter {
None => mz_storage_types::oneshot_sources::ContentFilter::None,
Some(CopyFromFilter::Files(files)) => {
mz_storage_types::oneshot_sources::ContentFilter::Files(files)
}
Some(CopyFromFilter::Pattern(pattern)) => {
mz_storage_types::oneshot_sources::ContentFilter::Pattern(pattern)
}
};

let request = OneshotIngestionRequest {
source: mz_storage_types::oneshot_sources::ContentSource::Http { url },
source,
format,
filter,
};

let target_cluster = match self
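
Note: the S3 branch above parses the user-supplied string with http::Uri and rejects anything that is not an s3:// URI with a bucket. A minimal sketch of that validation, assuming a plain String error type in place of AdapterError:

use std::str::FromStr;

fn validate_s3_uri(raw: &str) -> Result<http::Uri, String> {
    // Parse the raw string; http::Uri accepts arbitrary schemes such as `s3`.
    let uri = http::Uri::from_str(raw).map_err(|err| format!("expected S3 uri: {err}"))?;
    // Only the `s3` scheme is accepted as a COPY FROM target.
    if uri.scheme_str() != Some("s3") {
        return Err("only 's3://...' urls are supported as COPY FROM target".to_string());
    }
    // The URI authority (host) is the bucket name and must be present.
    if uri.host().is_none() {
        return Err("missing bucket name from 's3://...' url".to_string());
    }
    Ok(uri)
}

// validate_s3_uri("s3://my-bucket/data/file.csv") succeeds, while
// validate_s3_uri("https://example.com/file.csv") is rejected.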
9 changes: 7 additions & 2 deletions src/aws-util/Cargo.toml
@@ -12,15 +12,20 @@ workspace = true
[dependencies]
anyhow = "1.0.66"
aws-config = { version = "1.2.0", default-features = false }
aws-sdk-s3 = { version = "1.23.0", default-features = false, features = ["rt-tokio"], optional = true }
aws-sdk-s3 = { version = "1.23.0", default-features = false, features = [
"rt-tokio",
], optional = true }
aws-smithy-runtime-api = "1.1.1"
aws-smithy-runtime = { version = "1.1.1", features = ["connector-hyper-0-14-x"] }
aws-smithy-types = { version = "1.1.8", features = ["byte-stream-poll-next"] }
aws-types = "1.1.1"
bytes = "1.3.0"
bytesize = "1.1.0"
futures = "0.3.25"
http = "1.1.0"
hyper-tls = "0.5.0"
mz-ore = { path = "../ore", default-features = false }
mz-ore = { path = "../ore", features = ["async"], default-features = false }
pin-project = "1.0.12"
thiserror = "1.0.37"
tokio = { version = "1.38.0", default-features = false, features = ["macros"] }
uuid = { version = "1.7.0", features = ["v4"] }
31 changes: 30 additions & 1 deletion src/aws-util/src/s3.rs
@@ -8,8 +8,10 @@
// by the Apache License, Version 2.0.

use aws_sdk_s3::config::Builder;
use aws_sdk_s3::Client;
use aws_types::sdk_config::SdkConfig;
use bytes::Bytes;

pub use aws_sdk_s3::Client;

/// Creates a new client from an [SDK config](aws_types::sdk_config::SdkConfig)
/// with Materialize-specific customizations.
@@ -46,3 +48,30 @@ pub async fn list_bucket_path(
})
.transpose()
}

/// A wrapper around [`ByteStream`] that implements the [`futures::stream::Stream`] trait.
///
/// [`ByteStream`]: aws_smithy_types::byte_stream::ByteStream
#[pin_project::pin_project]
pub struct ByteStreamAdapter {
#[pin]
inner: aws_smithy_types::byte_stream::ByteStream,
}

impl ByteStreamAdapter {
pub fn new(bytes: aws_smithy_types::byte_stream::ByteStream) -> Self {
ByteStreamAdapter { inner: bytes }
}
}

impl futures::stream::Stream for ByteStreamAdapter {
type Item = Result<Bytes, aws_smithy_types::byte_stream::error::Error>;

fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.project();
aws_smithy_types::byte_stream::ByteStream::poll_next(this.inner, cx)
}
}
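
Note: the adapter exists so an SDK ByteStream can be driven by combinators from the futures crate. A hypothetical usage sketch — the crate path mz_aws_util::s3 and the surrounding function are assumptions for illustration, not part of this diff:

use futures::StreamExt;

async fn read_object(
    client: &mz_aws_util::s3::Client,
    bucket: &str,
    key: &str,
) -> Result<Vec<u8>, String> {
    // Fetch the object; its `body` is an aws_smithy_types ByteStream.
    let object = client
        .get_object()
        .bucket(bucket)
        .key(key)
        .send()
        .await
        .map_err(|err| err.to_string())?;
    // Wrap the body so it can be consumed as a futures::stream::Stream of Bytes.
    let mut stream = mz_aws_util::s3::ByteStreamAdapter::new(object.body);
    let mut buf = Vec::new();
    while let Some(chunk) = stream.next().await {
        buf.extend_from_slice(&chunk.map_err(|err| err.to_string())?);
    }
    Ok(buf)
}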
2 changes: 2 additions & 0 deletions src/sql-lexer/src/keywords.txt
@@ -174,6 +174,7 @@ Features
Fetch
Fields
File
Files
Filter
First
Fixpoint
@@ -320,6 +321,7 @@ Partition
Partitions
Password
Path
Pattern
Physical
Plan
Plans
5 changes: 5 additions & 0 deletions src/sql-parser/src/ast/defs/statement.rs
@@ -374,6 +374,8 @@ pub enum CopyOptionName {
Header,
AwsConnection,
MaxFileSize,
Files,
Pattern,
}

impl AstDisplay for CopyOptionName {
@@ -387,6 +389,8 @@ impl AstDisplay for CopyOptionName {
CopyOptionName::Header => "HEADER",
CopyOptionName::AwsConnection => "AWS CONNECTION",
CopyOptionName::MaxFileSize => "MAX FILE SIZE",
CopyOptionName::Files => "FILES",
CopyOptionName::Pattern => "PATTERN",
})
}
}
@@ -407,6 +411,7 @@ impl WithOptionName for CopyOptionName {
| CopyOptionName::Header
| CopyOptionName::AwsConnection
| CopyOptionName::MaxFileSize => false,
CopyOptionName::Files | CopyOptionName::Pattern => true,
}
}
}
8 changes: 5 additions & 3 deletions src/sql-parser/src/parser.rs
@@ -6403,9 +6403,9 @@ impl<'a> Parser<'a> {
}

fn parse_copy_option(&mut self) -> Result<CopyOption<Raw>, ParserError> {
let name = match self
.expect_one_of_keywords(&[FORMAT, DELIMITER, NULL, ESCAPE, QUOTE, HEADER, AWS, MAX])?
{
let name = match self.expect_one_of_keywords(&[
FORMAT, DELIMITER, NULL, ESCAPE, QUOTE, HEADER, AWS, MAX, FILES, PATTERN,
])? {
FORMAT => CopyOptionName::Format,
DELIMITER => CopyOptionName::Delimiter,
NULL => CopyOptionName::Null,
@@ -6423,6 +6423,8 @@
self.expect_keywords(&[FILE, SIZE])?;
CopyOptionName::MaxFileSize
}
FILES => CopyOptionName::Files,
PATTERN => CopyOptionName::Pattern,
_ => unreachable!(),
};
Ok(CopyOption {
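
Note: the new FILES and PATTERN branches follow the existing scheme of mapping a recognized keyword to a CopyOptionName variant. A standalone sketch of the same dispatch idea, using illustrative names rather than the parser's real API:

#[derive(Debug, PartialEq)]
enum OptionName {
    Format,
    Files,
    Pattern,
}

fn parse_option_name(keyword: &str) -> Result<OptionName, String> {
    match keyword.to_ascii_uppercase().as_str() {
        "FORMAT" => Ok(OptionName::Format),
        "FILES" => Ok(OptionName::Files),
        "PATTERN" => Ok(OptionName::Pattern),
        other => Err(format!("unexpected COPY option: {other}")),
    }
}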
23 changes: 22 additions & 1 deletion src/sql-parser/tests/testdata/copy
@@ -79,7 +79,7 @@ Copy(CopyStatement { relation: Named { name: Name(UnresolvedItemName([Ident("t")
parse-statement
COPY t TO STDOUT ()
----
error: Expected one of FORMAT or DELIMITER or NULL or ESCAPE or QUOTE or HEADER or AWS or MAX, found right parenthesis
error: Expected one of FORMAT or DELIMITER or NULL or ESCAPE or QUOTE or HEADER or AWS or MAX or FILES or PATTERN, found right parenthesis
COPY t TO STDOUT ()
^

@@ -184,3 +184,24 @@
error: Expected identifier, found INTO
COPY INTO t(a, b) TO '/any/path'
^

parse-statement
COPY INTO t1 FROM 'http://spacemonkey.info' WITH (FILES = ['foo.csv', 'bar.csv']);
----
COPY t1 FROM 'http://spacemonkey.info' WITH (FILES = ('foo.csv', 'bar.csv'))
=>
Copy(CopyStatement { relation: Named { name: Name(UnresolvedItemName([Ident("t1")])), columns: [] }, direction: From, target: Expr(Value(String("http://spacemonkey.info"))), options: [CopyOption { name: Files, value: Some(Sequence([Value(String("foo.csv")), Value(String("bar.csv"))])) }] })

parse-statement
COPY INTO t1 FROM 'http://spacemonkey.info' WITH (FILES = ['foo.csv', 'bar.csv'], FORMAT CSV);
----
COPY t1 FROM 'http://spacemonkey.info' WITH (FILES = ('foo.csv', 'bar.csv'), FORMAT = csv)
=>
Copy(CopyStatement { relation: Named { name: Name(UnresolvedItemName([Ident("t1")])), columns: [] }, direction: From, target: Expr(Value(String("http://spacemonkey.info"))), options: [CopyOption { name: Files, value: Some(Sequence([Value(String("foo.csv")), Value(String("bar.csv"))])) }, CopyOption { name: Format, value: Some(UnresolvedItemName(UnresolvedItemName([Ident("csv")]))) }] })

parse-statement
COPY INTO t1 FROM 'http://spacemonkey.info' WITH (FILES = ['foo.csv']);
----
COPY t1 FROM 'http://spacemonkey.info' WITH (FILES = ('foo.csv'))
=>
Copy(CopyStatement { relation: Named { name: Name(UnresolvedItemName([Ident("t1")])), columns: [] }, direction: From, target: Expr(Value(String("http://spacemonkey.info"))), options: [CopyOption { name: Files, value: Some(Sequence([Value(String("foo.csv"))])) }] })
18 changes: 17 additions & 1 deletion src/sql/src/plan.rs
@@ -936,16 +936,32 @@ pub struct CopyFromPlan {
pub source: CopyFromSource,
pub columns: Vec<usize>,
pub params: CopyFormatParams<'static>,
pub filter: Option<CopyFromFilter>,
}

#[derive(Debug)]
pub enum CopyFromSource {
/// Copying from a file local to the user, transmitted via pgwire.
Stdin,
/// A remote resource, e.g. S3.
/// A remote resource, e.g. HTTP file.
///
/// The contained [`HirScalarExpr`] evaluates to the Url for the remote resource.
Url(HirScalarExpr),
/// A file in an S3 bucket.
AwsS3 {
/// Expression that evaluates to the file we want to copy.
uri: HirScalarExpr,
/// Details for how we connect to AWS S3.
connection: AwsConnection,
/// ID of the connection object.
connection_id: CatalogItemId,
},
}

#[derive(Debug)]
pub enum CopyFromFilter {
Files(Vec<String>),
Pattern(String),
}

#[derive(Debug, Clone)]
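
Note: the CopyFromFilter variants distinguish an explicit file list from a pattern. A hedged sketch of how a consumer might apply them when choosing which objects to ingest; the helper is illustrative rather than part of this PR, and the glob matching is stubbed out with a substring check:

fn select_objects(filter: &CopyFromFilter, object_keys: &[String]) -> Vec<String> {
    match filter {
        // FILES: keep only the explicitly named objects.
        CopyFromFilter::Files(files) => object_keys
            .iter()
            .filter(|key| files.contains(*key))
            .cloned()
            .collect(),
        // PATTERN: a real implementation would use a glob matcher; a substring
        // check stands in for it in this sketch.
        CopyFromFilter::Pattern(pattern) => object_keys
            .iter()
            .filter(|key| key.contains(pattern.as_str()))
            .cloned()
            .collect(),
    }
}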