diff --git a/arrow-flight/Cargo.toml b/arrow-flight/Cargo.toml index be0b1dcccaf6..f97b18e42ec8 100644 --- a/arrow-flight/Cargo.toml +++ b/arrow-flight/Cargo.toml @@ -44,11 +44,11 @@ bytes = { version = "1", default-features = false } futures = { version = "0.3", default-features = false, features = ["alloc"] } once_cell = { version = "1", optional = true } paste = { version = "1.0" } -prost = { version = "0.13.1", default-features = false, features = ["prost-derive"] } +prost = { version = "0.12.3", default-features = false, features = ["prost-derive"] } # For Timestamp type -prost-types = { version = "0.13.1", default-features = false } +prost-types = { version = "0.12.3", default-features = false } tokio = { version = "1.0", default-features = false, features = ["macros", "rt", "rt-multi-thread"] } -tonic = { version = "0.12.1", default-features = false, features = ["transport", "codegen", "prost"] } +tonic = { version = "0.11.0", default-features = false, features = ["transport", "codegen", "prost"] } # CLI-related dependencies anyhow = { version = "1.0", optional = true } @@ -70,9 +70,8 @@ cli = ["anyhow", "arrow-cast/prettyprint", "clap", "tracing-log", "tracing-subsc [dev-dependencies] arrow-cast = { workspace = true, features = ["prettyprint"] } assert_cmd = "2.0.8" -http = "1.1.0" -http-body = "1.0.0" -hyper-util = "0.1" +http = "0.2.9" +http-body = "0.4.5" pin-project-lite = "0.2" tempfile = "3.3" tokio-stream = { version = "0.1", features = ["net"] } diff --git a/arrow-flight/examples/flight_sql_server.rs b/arrow-flight/examples/flight_sql_server.rs index 81afecf85625..dcb8cf043791 100644 --- a/arrow-flight/examples/flight_sql_server.rs +++ b/arrow-flight/examples/flight_sql_server.rs @@ -791,8 +791,7 @@ impl ProstMessageExt for FetchResults { #[cfg(test)] mod tests { use super::*; - use futures::{TryFutureExt, TryStreamExt}; - use hyper_util::rt::TokioIo; + use futures::TryStreamExt; use std::fs; use std::future::Future; use std::net::SocketAddr; @@ -852,8 +851,7 @@ mod tests { .serve_with_incoming(stream); let request_future = async { - let connector = - service_fn(move |_| UnixStream::connect(path.clone()).map_ok(TokioIo::new)); + let connector = service_fn(move |_| UnixStream::connect(path.clone())); let channel = Endpoint::try_from("http://example.com") .unwrap() .connect_with_connector(connector) diff --git a/arrow-flight/gen/Cargo.toml b/arrow-flight/gen/Cargo.toml index ae6000ae7a0b..7264a527ca8d 100644 --- a/arrow-flight/gen/Cargo.toml +++ b/arrow-flight/gen/Cargo.toml @@ -33,5 +33,5 @@ publish = false # Pin specific version of the tonic-build dependencies to avoid auto-generated # (and checked in) arrow.flight.protocol.rs from changing proc-macro2 = { version = "=1.0.86", default-features = false } -prost-build = { version = "=0.13.1", default-features = false } -tonic-build = { version = "=0.12.2", default-features = false, features = ["transport", "prost"] } +prost-build = { version = "=0.12.6", default-features = false } +tonic-build = { version = "=0.11.0", default-features = false, features = ["transport", "prost"] } diff --git a/arrow-flight/src/arrow.flight.protocol.rs b/arrow-flight/src/arrow.flight.protocol.rs index 670d4a649879..bc314de9d19f 100644 --- a/arrow-flight/src/arrow.flight.protocol.rs +++ b/arrow-flight/src/arrow.flight.protocol.rs @@ -38,7 +38,7 @@ pub struct BasicAuth { pub password: ::prost::alloc::string::String, } #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, Copy, PartialEq, ::prost::Message)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct Empty {} /// /// Describes an available action, including both the name used for execution @@ -103,7 +103,7 @@ pub struct Result { /// /// The result should be stored in Result.body. #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, Copy, PartialEq, ::prost::Message)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct CancelFlightInfoResult { #[prost(enumeration = "CancelStatus", tag = "1")] pub status: i32, @@ -464,8 +464,8 @@ pub mod flight_service_client { where T: tonic::client::GrpcService, T::Error: Into, - T::ResponseBody: Body + std::marker::Send + 'static, - ::Error: Into + std::marker::Send, + T::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, { pub fn new(inner: T) -> Self { let inner = tonic::client::Grpc::new(inner); @@ -490,7 +490,7 @@ pub mod flight_service_client { >, , - >>::Error: Into + std::marker::Send + std::marker::Sync, + >>::Error: Into + Send + Sync, { FlightServiceClient::new(InterceptedService::new(inner, interceptor)) } @@ -876,12 +876,12 @@ pub mod flight_service_server { use tonic::codegen::*; /// Generated trait containing gRPC methods that should be implemented for use with FlightServiceServer. #[async_trait] - pub trait FlightService: std::marker::Send + std::marker::Sync + 'static { + pub trait FlightService: Send + Sync + 'static { /// Server streaming response type for the Handshake method. type HandshakeStream: tonic::codegen::tokio_stream::Stream< Item = std::result::Result, > - + std::marker::Send + + Send + 'static; /// /// Handshake between client and server. Depending on the server, the @@ -896,7 +896,7 @@ pub mod flight_service_server { type ListFlightsStream: tonic::codegen::tokio_stream::Stream< Item = std::result::Result, > - + std::marker::Send + + Send + 'static; /// /// Get a list of available streams given a particular criteria. Most flight @@ -967,7 +967,7 @@ pub mod flight_service_server { type DoGetStream: tonic::codegen::tokio_stream::Stream< Item = std::result::Result, > - + std::marker::Send + + Send + 'static; /// /// Retrieve a single stream associated with a particular descriptor @@ -982,7 +982,7 @@ pub mod flight_service_server { type DoPutStream: tonic::codegen::tokio_stream::Stream< Item = std::result::Result, > - + std::marker::Send + + Send + 'static; /// /// Push a stream to the flight service associated with a particular @@ -999,7 +999,7 @@ pub mod flight_service_server { type DoExchangeStream: tonic::codegen::tokio_stream::Stream< Item = std::result::Result, > - + std::marker::Send + + Send + 'static; /// /// Open a bidirectional data channel for a given descriptor. This @@ -1015,7 +1015,7 @@ pub mod flight_service_server { type DoActionStream: tonic::codegen::tokio_stream::Stream< Item = std::result::Result, > - + std::marker::Send + + Send + 'static; /// /// Flight services can support an arbitrary number of simple actions in @@ -1032,7 +1032,7 @@ pub mod flight_service_server { type ListActionsStream: tonic::codegen::tokio_stream::Stream< Item = std::result::Result, > - + std::marker::Send + + Send + 'static; /// /// A flight service exposes all of the available action types that it has @@ -1052,18 +1052,20 @@ pub mod flight_service_server { /// accessed using the Arrow Flight Protocol. Additionally, a flight service /// can expose a set of actions that are available. #[derive(Debug)] - pub struct FlightServiceServer { - inner: Arc, + pub struct FlightServiceServer { + inner: _Inner, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, max_decoding_message_size: Option, max_encoding_message_size: Option, } - impl FlightServiceServer { + struct _Inner(Arc); + impl FlightServiceServer { pub fn new(inner: T) -> Self { Self::from_arc(Arc::new(inner)) } pub fn from_arc(inner: Arc) -> Self { + let inner = _Inner(inner); Self { inner, accept_compression_encodings: Default::default(), @@ -1113,8 +1115,8 @@ pub mod flight_service_server { impl tonic::codegen::Service> for FlightServiceServer where T: FlightService, - B: Body + std::marker::Send + 'static, - B::Error: Into + std::marker::Send + 'static, + B: Body + Send + 'static, + B::Error: Into + Send + 'static, { type Response = http::Response; type Error = std::convert::Infallible; @@ -1126,6 +1128,7 @@ pub mod flight_service_server { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { + let inner = self.inner.clone(); match req.uri().path() { "/arrow.flight.protocol.FlightService/Handshake" => { #[allow(non_camel_case_types)] @@ -1159,6 +1162,7 @@ pub mod flight_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { + let inner = inner.0; let method = HandshakeSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -1205,6 +1209,7 @@ pub mod flight_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { + let inner = inner.0; let method = ListFlightsSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -1250,6 +1255,7 @@ pub mod flight_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { + let inner = inner.0; let method = GetFlightInfoSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -1296,6 +1302,7 @@ pub mod flight_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { + let inner = inner.0; let method = PollFlightInfoSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -1341,6 +1348,7 @@ pub mod flight_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { + let inner = inner.0; let method = GetSchemaSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -1387,6 +1395,7 @@ pub mod flight_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { + let inner = inner.0; let method = DoGetSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -1433,6 +1442,7 @@ pub mod flight_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { + let inner = inner.0; let method = DoPutSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -1479,6 +1489,7 @@ pub mod flight_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { + let inner = inner.0; let method = DoExchangeSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -1525,6 +1536,7 @@ pub mod flight_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { + let inner = inner.0; let method = DoActionSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -1571,6 +1583,7 @@ pub mod flight_service_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { + let inner = inner.0; let method = ListActionsSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -1592,11 +1605,8 @@ pub mod flight_service_server { Ok( http::Response::builder() .status(200) - .header("grpc-status", tonic::Code::Unimplemented as i32) - .header( - http::header::CONTENT_TYPE, - tonic::metadata::GRPC_CONTENT_TYPE, - ) + .header("grpc-status", "12") + .header("content-type", "application/grpc") .body(empty_body()) .unwrap(), ) @@ -1605,7 +1615,7 @@ pub mod flight_service_server { } } } - impl Clone for FlightServiceServer { + impl Clone for FlightServiceServer { fn clone(&self) -> Self { let inner = self.inner.clone(); Self { @@ -1617,9 +1627,17 @@ pub mod flight_service_server { } } } - /// Generated gRPC service name - pub const SERVICE_NAME: &str = "arrow.flight.protocol.FlightService"; - impl tonic::server::NamedService for FlightServiceServer { - const NAME: &'static str = SERVICE_NAME; + impl Clone for _Inner { + fn clone(&self) -> Self { + Self(Arc::clone(&self.0)) + } + } + impl std::fmt::Debug for _Inner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self.0) + } + } + impl tonic::server::NamedService for FlightServiceServer { + const NAME: &'static str = "arrow.flight.protocol.FlightService"; } } diff --git a/arrow-flight/src/sql/arrow.flight.protocol.sql.rs b/arrow-flight/src/sql/arrow.flight.protocol.sql.rs index 3eeed6ff4b12..822f095ed088 100644 --- a/arrow-flight/src/sql/arrow.flight.protocol.sql.rs +++ b/arrow-flight/src/sql/arrow.flight.protocol.sql.rs @@ -101,7 +101,7 @@ pub struct CommandGetSqlInfo { /// > /// The returned data should be ordered by data_type and then by type_name. #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, Copy, PartialEq, ::prost::Message)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct CommandGetXdbcTypeInfo { /// /// Specifies the data type to search for the info. @@ -121,7 +121,7 @@ pub struct CommandGetXdbcTypeInfo { /// > /// The returned data should be ordered by catalog_name. #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, Copy, PartialEq, ::prost::Message)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct CommandGetCatalogs {} /// /// Represents a request to retrieve the list of database schemas on a Flight SQL enabled backend. @@ -232,7 +232,7 @@ pub struct CommandGetTables { /// > /// The returned data should be ordered by table_type. #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, Copy, PartialEq, ::prost::Message)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct CommandGetTableTypes {} /// /// Represents a request to retrieve the primary keys of a table on a Flight SQL enabled backend. @@ -511,7 +511,7 @@ pub struct ActionClosePreparedStatementRequest { /// Request message for the "BeginTransaction" action. /// Begins a transaction. #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, Copy, PartialEq, ::prost::Message)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct ActionBeginTransactionRequest {} /// /// Request message for the "BeginSavepoint" action. @@ -839,7 +839,7 @@ pub struct CommandStatementIngest { pub mod command_statement_ingest { /// Options for table definition behavior #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, Copy, PartialEq, ::prost::Message)] + #[derive(Clone, PartialEq, ::prost::Message)] pub struct TableDefinitionOptions { #[prost( enumeration = "table_definition_options::TableNotExistOption", @@ -950,7 +950,7 @@ pub mod command_statement_ingest { /// CommandPreparedStatementUpdate, or CommandStatementIngest was /// in the request, containing results from the update. #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, Copy, PartialEq, ::prost::Message)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct DoPutUpdateResult { /// The number of records updated. A return value of -1 represents /// an unknown updated record count. @@ -1010,7 +1010,7 @@ pub struct ActionCancelQueryRequest { /// This command is deprecated since 13.0.0. Use the "CancelFlightInfo" /// action with DoAction instead. #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, Copy, PartialEq, ::prost::Message)] +#[derive(Clone, PartialEq, ::prost::Message)] pub struct ActionCancelQueryResult { #[prost(enumeration = "action_cancel_query_result::CancelResult", tag = "1")] pub result: i32, diff --git a/arrow-flight/tests/common/trailers_layer.rs b/arrow-flight/tests/common/trailers_layer.rs index 0ccb7df86c74..b2ab74f7d925 100644 --- a/arrow-flight/tests/common/trailers_layer.rs +++ b/arrow-flight/tests/common/trailers_layer.rs @@ -21,7 +21,7 @@ use std::task::{Context, Poll}; use futures::ready; use http::{HeaderValue, Request, Response}; -use http_body::{Frame, SizeHint}; +use http_body::SizeHint; use pin_project_lite::pin_project; use tower::{Layer, Service}; @@ -99,19 +99,31 @@ impl http_body::Body for WrappedBody { type Data = B::Data; type Error = B::Error; - fn poll_frame( - self: Pin<&mut Self>, + fn poll_data( + mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll, Self::Error>>> { - let mut result = ready!(self.project().inner.poll_frame(cx)); + ) -> Poll>> { + self.as_mut().project().inner.poll_data(cx) + } + + fn poll_trailers( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>> { + let result: Result, Self::Error> = + ready!(self.as_mut().project().inner.poll_trailers(cx)); + + let mut trailers = http::header::HeaderMap::new(); + trailers.insert("test-trailer", HeaderValue::from_static("trailer_val")); - if let Some(Ok(frame)) = &mut result { - if let Some(trailers) = frame.trailers_mut() { - trailers.insert("test-trailer", HeaderValue::from_static("trailer_val")); + match result { + Ok(Some(mut existing)) => { + existing.extend(trailers.iter().map(|(k, v)| (k.clone(), v.clone()))); + Poll::Ready(Ok(Some(existing))) } + Ok(None) => Poll::Ready(Ok(Some(trailers))), + Err(e) => Poll::Ready(Err(e)), } - - Poll::Ready(result) } fn is_end_stream(&self) -> bool { diff --git a/arrow-integration-testing/Cargo.toml b/arrow-integration-testing/Cargo.toml index 7be56d919852..032b99f4fbbb 100644 --- a/arrow-integration-testing/Cargo.toml +++ b/arrow-integration-testing/Cargo.toml @@ -42,11 +42,11 @@ async-trait = { version = "0.1.41", default-features = false } clap = { version = "4", default-features = false, features = ["std", "derive", "help", "error-context", "usage"] } futures = { version = "0.3", default-features = false } hex = { version = "0.4", default-features = false, features = ["std"] } -prost = { version = "0.13", default-features = false } +prost = { version = "0.12", default-features = false } serde = { version = "1.0", default-features = false, features = ["rc", "derive"] } serde_json = { version = "1.0", default-features = false, features = ["std"] } tokio = { version = "1.0", default-features = false } -tonic = { version = "0.12", default-features = false } +tonic = { version = "0.11", default-features = false } tracing-subscriber = { version = "0.3.1", default-features = false, features = ["fmt"], optional = true } num = { version = "0.4", default-features = false, features = ["std"] } flate2 = { version = "1", default-features = false, features = ["rust_backend"] }