Skip to content

Commit

Permalink
tonic: fold encode_client() into EncodeBody::new_client()
Browse files Browse the repository at this point in the history
  • Loading branch information
djc committed Sep 21, 2024
1 parent 168ba46 commit 66a2ab1
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 28 deletions.
5 changes: 3 additions & 2 deletions tonic/src/client/grpc.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::codec::compression::{CompressionEncoding, EnabledCompressionEncodings};
use crate::codec::EncodeBody;
use crate::metadata::GRPC_CONTENT_TYPE;
use crate::{
body::BoxBody,
client::GrpcService,
codec::{encode_client, Codec, Decoder, Streaming},
codec::{Codec, Decoder, Streaming},
request::SanitizeHeaders,
Code, Request, Response, Status,
};
Expand Down Expand Up @@ -295,7 +296,7 @@ impl<T> Grpc<T> {
{
let request = request
.map(|s| {
encode_client(
EncodeBody::new_client(
codec.encoder(),
s.map(Ok),
self.config.send_compression_encodings,
Expand Down
39 changes: 15 additions & 24 deletions tonic/src/codec/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,6 @@ use std::{
};
use tokio_stream::{adapters::Fuse, Stream, StreamExt};

/// Turns a stream of grpc messages into [EncodeBody] which is used by grpc clients for
/// turning the messages into http frames for sending over the network.
pub fn encode_client<T, U>(
encoder: T,
source: U,
compression_encoding: Option<CompressionEncoding>,
max_message_size: Option<usize>,
) -> EncodeBody<T, U>
where
T: Encoder<Error = Status>,
U: Stream,
{
let stream = EncodedBytes::new(
encoder,
source,
compression_encoding,
SingleMessageCompressionOverride::default(),
max_message_size,
);
EncodeBody::new_client(stream)
}

/// Combinator for efficient encoding of messages into reasonably sized buffers.
/// EncodedBytes encodes ready messages from its delegate stream into a BytesMut,
/// splitting off and yielding a buffer when either:
Expand Down Expand Up @@ -251,9 +229,22 @@ struct EncodeState {
}

impl<T: Encoder, U: Stream> EncodeBody<T, U> {
fn new_client(inner: EncodedBytes<T, U>) -> Self {
/// Turns a stream of grpc messages into [EncodeBody] which is used by grpc clients for
/// turning the messages into http frames for sending over the network.
pub fn new_client(
encoder: T,
source: U,
compression_encoding: Option<CompressionEncoding>,
max_message_size: Option<usize>,
) -> Self {
Self {
inner,
inner: EncodedBytes::new(
encoder,
source,
compression_encoding,
SingleMessageCompressionOverride::default(),
max_message_size,
),
state: EncodeState {
error: None,
role: Role::Client,
Expand Down
2 changes: 1 addition & 1 deletion tonic/src/codec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::io;
pub use self::buffer::{DecodeBuf, EncodeBuf};
pub use self::compression::{CompressionEncoding, EnabledCompressionEncodings};
pub use self::decode::Streaming;
pub use self::encode::{encode_client, EncodeBody};
pub use self::encode::EncodeBody;
#[cfg(feature = "prost")]
pub use self::prost::ProstCodec;

Expand Down
4 changes: 3 additions & 1 deletion tonic/src/codec/prost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,9 @@ fn from_decode_error(error: prost::DecodeError) -> crate::Status {
#[cfg(test)]
mod tests {
use crate::codec::compression::SingleMessageCompressionOverride;
use crate::codec::{DecodeBuf, Decoder, EncodeBody, EncodeBuf, Encoder, Streaming, HEADER_SIZE};
use crate::codec::{
DecodeBuf, Decoder, EncodeBody, EncodeBuf, Encoder, Streaming, HEADER_SIZE,
};
use crate::Status;
use bytes::{Buf, BufMut, BytesMut};
use http_body::Body;
Expand Down

0 comments on commit 66a2ab1

Please sign in to comment.