start, implementation of an S3 oneshot source

* add a new OneshotSource implementation
* support the FILES and PATTERN options for COPY FROM

ParkMyCar committed Jan 24, 2025
1 parent 1d2f1d5 commit 58f62b2

Showing 19 changed files with 676 additions and 46 deletions.
11 changes: 9 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion src/adapter/src/coord/sequencer.rs
@@ -373,7 +373,7 @@ impl Coordinator {
                 session,
             );
         }
-        CopyFromSource::Url(_) => {
+        CopyFromSource::Url(_) | CopyFromSource::AwsS3 { .. } => {
             self.sequence_copy_from(ctx, plan, target_cluster).await;
         }
     },
81 changes: 68 additions & 13 deletions src/adapter/src/coord/sequencer/inner/copy_from.rs
@@ -7,12 +7,14 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.

+use std::str::FromStr;
+
 use mz_adapter_types::connection::ConnectionId;
 use mz_ore::cast::CastInto;
 use mz_persist_client::batch::ProtoBatch;
 use mz_pgcopy::CopyFormatParams;
 use mz_repr::{CatalogItemId, Datum, RowArena};
-use mz_sql::plan::{self, CopyFromSource, HirScalarExpr};
+use mz_sql::plan::{self, CopyFromFilter, CopyFromSource, HirScalarExpr};
 use mz_sql::session::metadata::SessionMetadata;
 use mz_storage_client::client::TableData;
 use mz_storage_types::oneshot_sources::OneshotIngestionRequest;
@@ -38,16 +40,10 @@ impl Coordinator {
             source,
             columns: _,
             params,
+            filter,
         } = plan;

-        let from_expr = match source {
-            CopyFromSource::Url(from_expr) => from_expr,
-            CopyFromSource::Stdin => {
-                unreachable!("COPY FROM STDIN should be handled elsewhere")
-            }
-        };
-
-        let eval_url = |from: HirScalarExpr| -> Result<Url, AdapterError> {
+        let eval_uri = |from: HirScalarExpr| -> Result<String, AdapterError> {
             let style = ExprPrepStyle::OneShot {
                 logical_time: EvalTime::NotAvailable,
                 session: ctx.session(),
@@ -66,10 +62,8 @@ impl Coordinator {
                 other => coord_bail!("programming error! COPY FROM target cannot be {other}"),
            };

-            Url::parse(eval_string)
-                .map_err(|err| AdapterError::Unstructured(anyhow::anyhow!("{err}")))
+            Ok(eval_string.to_string())
         };
-        let url = return_if_err!(eval_url(from_expr), ctx);

         // We check in planning that we're copying into a Table, but be defensive.
         let Some(dest_table) = self.catalog().get_entry(&id).table() else {
@@ -93,9 +87,70 @@ impl Coordinator {
             }
         };

+        let source = match source {
+            CopyFromSource::Url(from_expr) => {
+                let url = return_if_err!(eval_uri(from_expr), ctx);
+                // TODO(cf2): Structured errors.
+                let result = Url::parse(&url)
+                    .map_err(|err| AdapterError::Unstructured(anyhow::anyhow!("{err}")));
+                let url = return_if_err!(result, ctx);
+
+                mz_storage_types::oneshot_sources::ContentSource::Http { url }
+            }
+            CopyFromSource::AwsS3 {
+                uri,
+                connection,
+                connection_id,
+            } => {
+                let uri = return_if_err!(eval_uri(uri), ctx);
+
+                // Validate that the URI is an S3 URI with a bucket name. We validate
+                // here, and clusterd relies on it.
+                //
+                // TODO(cf2): Structured errors.
+                let result = http::Uri::from_str(&uri)
+                    .map_err(|err| {
+                        AdapterError::Unstructured(anyhow::anyhow!("expected S3 uri: {err}"))
+                    })
+                    .and_then(|uri| {
+                        if uri.scheme_str() != Some("s3") {
+                            coord_bail!("only 's3://...' urls are supported as COPY FROM target");
+                        }
+                        Ok(uri)
+                    })
+                    .and_then(|uri| {
+                        if uri.host().is_none() {
+                            coord_bail!("missing bucket name from 's3://...' url");
+                        }
+                        Ok(uri)
+                    });
+                let uri = return_if_err!(result, ctx);
+
+                mz_storage_types::oneshot_sources::ContentSource::AwsS3 {
+                    connection,
+                    connection_id,
+                    uri: uri.to_string(),
+                }
+            }
+            CopyFromSource::Stdin => {
+                unreachable!("COPY FROM STDIN should be handled elsewhere")
+            }
+        };
+
+        let filter = match filter {
+            None => mz_storage_types::oneshot_sources::ContentFilter::None,
+            Some(CopyFromFilter::Files(files)) => {
+                mz_storage_types::oneshot_sources::ContentFilter::Files(files)
+            }
+            Some(CopyFromFilter::Pattern(pattern)) => {
+                mz_storage_types::oneshot_sources::ContentFilter::Pattern(pattern)
+            }
+        };
+
         let request = OneshotIngestionRequest {
-            source: mz_storage_types::oneshot_sources::ContentSource::Http { url },
+            source,
             format,
+            filter,
         };

         let target_cluster = match self
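
The validation chain above only admits `s3://` URIs that name a bucket (the URI's host). For reference, a minimal standalone sketch of the same checks, with the error type simplified to `String` instead of the `AdapterError` wrapping used in the commit:

    use std::str::FromStr;

    // Minimal sketch of the S3 URI validation above (simplified error type).
    fn validate_s3_uri(raw: &str) -> Result<http::Uri, String> {
        let uri = http::Uri::from_str(raw).map_err(|err| format!("expected S3 uri: {err}"))?;
        // Only the s3 scheme is accepted as a COPY FROM target.
        if uri.scheme_str() != Some("s3") {
            return Err("only 's3://...' urls are supported as COPY FROM target".into());
        }
        // The URI host is the bucket name; it must be present.
        if uri.host().is_none() {
            return Err("missing bucket name from 's3://...' url".into());
        }
        Ok(uri)
    }

For example, `validate_s3_uri("s3://my-bucket/data/part-01.csv")` succeeds, while an `https://...` URL fails the scheme check.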
9 changes: 7 additions & 2 deletions src/aws-util/Cargo.toml
@@ -12,15 +12,20 @@ workspace = true
 [dependencies]
 anyhow = "1.0.66"
 aws-config = { version = "1.2.0", default-features = false }
-aws-sdk-s3 = { version = "1.23.0", default-features = false, features = ["rt-tokio"], optional = true }
+aws-sdk-s3 = { version = "1.23.0", default-features = false, features = [
+  "rt-tokio",
+], optional = true }
 aws-smithy-runtime-api = "1.1.1"
 aws-smithy-runtime = { version = "1.1.1", features = ["connector-hyper-0-14-x"] }
 aws-smithy-types = { version = "1.1.8", features = ["byte-stream-poll-next"] }
 aws-types = "1.1.1"
 bytes = "1.3.0"
 bytesize = "1.1.0"
 futures = "0.3.25"
+http = "1.1.0"
 hyper-tls = "0.5.0"
-mz-ore = { path = "../ore", default-features = false }
+mz-ore = { path = "../ore", features = ["async"], default-features = false }
 pin-project = "1.0.12"
 thiserror = "1.0.37"
 tokio = { version = "1.38.0", default-features = false, features = ["macros"] }
 uuid = { version = "1.7.0", features = ["v4"] }
31 changes: 30 additions & 1 deletion src/aws-util/src/s3.rs
@@ -8,9 +8,11 @@
 // by the Apache License, Version 2.0.

 use aws_sdk_s3::config::Builder;
-use aws_sdk_s3::Client;
 use aws_types::sdk_config::SdkConfig;

+pub use aws_sdk_s3::Client;
+use bytes::Bytes;
+
 /// Creates a new client from an [SDK config](aws_types::sdk_config::SdkConfig)
 /// with Materialize-specific customizations.
 ///
@@ -46,3 +48,30 @@ pub async fn list_bucket_path(
         })
         .transpose()
 }
+
+/// A wrapper around [`ByteStream`] that implements the [`futures::stream::Stream`] trait.
+///
+/// [`ByteStream`]: aws_smithy_types::byte_stream::ByteStream
+#[pin_project::pin_project]
+pub struct ByteStreamAdapter {
+    #[pin]
+    inner: aws_smithy_types::byte_stream::ByteStream,
+}
+
+impl ByteStreamAdapter {
+    pub fn new(bytes: aws_smithy_types::byte_stream::ByteStream) -> Self {
+        ByteStreamAdapter { inner: bytes }
+    }
+}
+
+impl futures::stream::Stream for ByteStreamAdapter {
+    type Item = Result<Bytes, aws_smithy_types::byte_stream::error::Error>;
+
+    fn poll_next(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        let this = self.project();
+        aws_smithy_types::byte_stream::ByteStream::poll_next(this.inner, cx)
+    }
+}
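
`ByteStreamAdapter` exists because the SDK's `ByteStream` does not implement the `futures` `Stream` trait itself; its `poll_next` is gated behind the `byte-stream-poll-next` feature enabled in the Cargo.toml change above. A hypothetical usage sketch, not part of this commit (the bucket and key names are made up):

    use futures::StreamExt;

    // Count the bytes of an S3 object by driving the adapter as a normal stream.
    async fn count_object_bytes(client: &aws_sdk_s3::Client) -> anyhow::Result<usize> {
        let resp = client
            .get_object()
            .bucket("example-bucket") // hypothetical bucket
            .key("data/part-01.csv") // hypothetical key
            .send()
            .await?;

        // Box::pin so `StreamExt::next` (which requires Unpin) can be used.
        let mut stream = Box::pin(ByteStreamAdapter::new(resp.body));
        let mut total = 0;
        while let Some(chunk) = stream.next().await {
            total += chunk?.len();
        }
        Ok(total)
    }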
2 changes: 2 additions & 0 deletions src/sql-lexer/src/keywords.txt
@@ -174,6 +174,7 @@ Features
 Fetch
 Fields
 File
+Files
 Filter
 First
 Fixpoint
@@ -320,6 +321,7 @@ Partition
 Partitions
 Password
 Path
+Pattern
 Physical
 Plan
 Plans
5 changes: 5 additions & 0 deletions src/sql-parser/src/ast/defs/statement.rs
@@ -374,6 +374,8 @@ pub enum CopyOptionName {
     Header,
     AwsConnection,
     MaxFileSize,
+    Files,
+    Pattern,
 }

 impl AstDisplay for CopyOptionName {
@@ -387,6 +389,8 @@ impl AstDisplay for CopyOptionName {
             CopyOptionName::Header => "HEADER",
             CopyOptionName::AwsConnection => "AWS CONNECTION",
             CopyOptionName::MaxFileSize => "MAX FILE SIZE",
+            CopyOptionName::Files => "FILES",
+            CopyOptionName::Pattern => "PATTERN",
         })
     }
 }
@@ -407,6 +411,7 @@ impl WithOptionName for CopyOptionName {
             | CopyOptionName::Header
             | CopyOptionName::AwsConnection
             | CopyOptionName::MaxFileSize => false,
+            CopyOptionName::Files | CopyOptionName::Pattern => true,
         }
     }
 }
8 changes: 5 additions & 3 deletions src/sql-parser/src/parser.rs
@@ -6403,9 +6403,9 @@ impl<'a> Parser<'a> {
     }

     fn parse_copy_option(&mut self) -> Result<CopyOption<Raw>, ParserError> {
-        let name = match self
-            .expect_one_of_keywords(&[FORMAT, DELIMITER, NULL, ESCAPE, QUOTE, HEADER, AWS, MAX])?
-        {
+        let name = match self.expect_one_of_keywords(&[
+            FORMAT, DELIMITER, NULL, ESCAPE, QUOTE, HEADER, AWS, MAX, FILES, PATTERN,
+        ])? {
             FORMAT => CopyOptionName::Format,
             DELIMITER => CopyOptionName::Delimiter,
             NULL => CopyOptionName::Null,
@@ -6423,6 +6423,8 @@ impl<'a> Parser<'a> {
                 self.expect_keywords(&[FILE, SIZE])?;
                 CopyOptionName::MaxFileSize
             }
+            FILES => CopyOptionName::Files,
+            PATTERN => CopyOptionName::Pattern,
             _ => unreachable!(),
         };
         Ok(CopyOption {
23 changes: 22 additions & 1 deletion src/sql-parser/tests/testdata/copy
@@ -79,7 +79,7 @@ Copy(CopyStatement { relation: Named { name: Name(UnresolvedItemName([Ident("t")
 parse-statement
 COPY t TO STDOUT ()
 ----
-error: Expected one of FORMAT or DELIMITER or NULL or ESCAPE or QUOTE or HEADER or AWS or MAX, found right parenthesis
+error: Expected one of FORMAT or DELIMITER or NULL or ESCAPE or QUOTE or HEADER or AWS or MAX or FILES or PATTERN, found right parenthesis
 COPY t TO STDOUT ()
                   ^
@@ -184,3 +184,24 @@ COPY INTO t(a, b) TO '/any/path'
 error: Expected identifier, found INTO
 COPY INTO t(a, b) TO '/any/path'
      ^
+
+parse-statement
+COPY INTO t1 FROM 'http://spacemonkey.info' WITH (FILES = ['foo.csv', 'bar.csv']);
+----
+COPY t1 FROM 'http://spacemonkey.info' WITH (FILES = ('foo.csv', 'bar.csv'))
+=>
+Copy(CopyStatement { relation: Named { name: Name(UnresolvedItemName([Ident("t1")])), columns: [] }, direction: From, target: Expr(Value(String("http://spacemonkey.info"))), options: [CopyOption { name: Files, value: Some(Sequence([Value(String("foo.csv")), Value(String("bar.csv"))])) }] })
+
+parse-statement
+COPY INTO t1 FROM 'http://spacemonkey.info' WITH (FILES = ['foo.csv', 'bar.csv'], FORMAT CSV);
+----
+COPY t1 FROM 'http://spacemonkey.info' WITH (FILES = ('foo.csv', 'bar.csv'), FORMAT = csv)
+=>
+Copy(CopyStatement { relation: Named { name: Name(UnresolvedItemName([Ident("t1")])), columns: [] }, direction: From, target: Expr(Value(String("http://spacemonkey.info"))), options: [CopyOption { name: Files, value: Some(Sequence([Value(String("foo.csv")), Value(String("bar.csv"))])) }, CopyOption { name: Format, value: Some(UnresolvedItemName(UnresolvedItemName([Ident("csv")]))) }] })
+
+parse-statement
+COPY INTO t1 FROM 'http://spacemonkey.info' WITH (FILES = ['foo.csv']);
+----
+COPY t1 FROM 'http://spacemonkey.info' WITH (FILES = ('foo.csv'))
+=>
+Copy(CopyStatement { relation: Named { name: Name(UnresolvedItemName([Ident("t1")])), columns: [] }, direction: From, target: Expr(Value(String("http://spacemonkey.info"))), options: [CopyOption { name: Files, value: Some(Sequence([Value(String("foo.csv"))])) }] })
18 changes: 17 additions & 1 deletion src/sql/src/plan.rs
@@ -936,16 +936,32 @@ pub struct CopyFromPlan {
     pub source: CopyFromSource,
     pub columns: Vec<usize>,
     pub params: CopyFormatParams<'static>,
+    pub filter: Option<CopyFromFilter>,
 }

 #[derive(Debug)]
 pub enum CopyFromSource {
     /// Copying from a file local to the user, transmitted via pgwire.
     Stdin,
-    /// A remote resource, e.g. S3.
+    /// A remote resource, e.g. an HTTP file.
     ///
     /// The contained [`HirScalarExpr`] evaluates to the Url for the remote resource.
     Url(HirScalarExpr),
+    /// A file in an S3 bucket.
+    AwsS3 {
+        /// Expression that evaluates to the file we want to copy.
+        uri: HirScalarExpr,
+        /// Details for how we connect to AWS S3.
+        connection: AwsConnection,
+        /// ID of the connection object.
+        connection_id: CatalogItemId,
+    },
+}
+
+#[derive(Debug)]
+pub enum CopyFromFilter {
+    Files(Vec<String>),
+    Pattern(String),
 }

 #[derive(Debug, Clone)]
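
The planner populates the new `filter` field from the FILES / PATTERN options. A hypothetical sketch of that mapping, assuming the two options are mutually exclusive (the helper below is illustrative, not code from this commit):

    // Hypothetical helper: map already-extracted COPY option values onto
    // CopyFromFilter. Assumes FILES and PATTERN cannot be combined.
    fn plan_copy_filter(
        files: Option<Vec<String>>,
        pattern: Option<String>,
    ) -> Result<Option<CopyFromFilter>, String> {
        match (files, pattern) {
            (Some(_), Some(_)) => Err("cannot specify both FILES and PATTERN".into()),
            (Some(files), None) => Ok(Some(CopyFromFilter::Files(files))),
            (None, Some(pattern)) => Ok(Some(CopyFromFilter::Pattern(pattern))),
            (None, None) => Ok(None),
        }
    }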