From f731d0f7e6ff8d84f951d09343f8ed7bb17677b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Duchesneau?= Date: Tue, 30 Aug 2022 17:13:37 -0400 Subject: [PATCH 1/3] add gzip encoding to firehose GRPC stream requests (requires server support) --- Cargo.lock | 22 +++++++++++++++++++++- graph/Cargo.toml | 4 ++-- graph/src/firehose/endpoints.rs | 8 ++++++-- graph/src/firehose/sf.firehose.v1.rs | 16 ++++++++++++++-- graph/src/substreams/sf.substreams.v1.rs | 16 ++++++++++++++-- 5 files changed, 57 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 80d6a654cbd..95092d5e521 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -218,7 +218,7 @@ dependencies = [ "cc", "cfg-if 1.0.0", "libc", - "miniz_oxide", + "miniz_oxide 0.4.4", "object 0.26.0", "rustc-demangle", ] @@ -1238,6 +1238,16 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "398ea4fabe40b9b0d885340a2a991a44c8a645624075ad966d21f88688e2b69e" +[[package]] +name = "flate2" +version = "1.0.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f82b0f4c27ad9f8bfd1f3208d882da2b09c301bc1c828fd3a00d0216d2fbbff6" +dependencies = [ + "crc32fast", + "miniz_oxide 0.5.3", +] + [[package]] name = "fnv" version = "1.0.7" @@ -2551,6 +2561,15 @@ dependencies = [ "autocfg", ] +[[package]] +name = "miniz_oxide" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f5c75688da582b8ffc1f1799e9db273f32133c49e048f614d22ec3256773ccc" +dependencies = [ + "adler", +] + [[package]] name = "mio" version = "0.7.13" @@ -4476,6 +4495,7 @@ dependencies = [ "axum", "base64", "bytes", + "flate2", "futures-core", "futures-util", "h2", diff --git a/graph/Cargo.toml b/graph/Cargo.toml index 6338d38c197..89a82fa1ad4 100644 --- a/graph/Cargo.toml +++ b/graph/Cargo.toml @@ -49,7 +49,7 @@ tokio-retry = "0.3.0" url = "2.2.1" prometheus = "0.13.1" priority-queue = "0.7.0" -tonic = { version = "0.7.1", features = ["tls-roots"] } +tonic = { version = "0.7.1", features = ["tls-roots","compression"] } prost = "0.10.4" prost-types = "0.10.1" @@ -70,4 +70,4 @@ maplit = "1.0.2" structopt = { version = "0.3" } [build-dependencies] -tonic-build = { version = "0.7.2", features = ["prost"] } +tonic-build = { version = "0.7.2", features = ["prost","compression"] } diff --git a/graph/src/firehose/endpoints.rs b/graph/src/firehose/endpoints.rs index 5732f2375f0..f6908631bc8 100644 --- a/graph/src/firehose/endpoints.rs +++ b/graph/src/firehose/endpoints.rs @@ -119,7 +119,9 @@ impl FirehoseEndpoint { Ok(r) }, - ); + ) + .accept_gzip() + .send_gzip(); debug!( logger, @@ -208,7 +210,9 @@ impl FirehoseEndpoint { Ok(r) }, - ); + ) + .accept_gzip() + .send_gzip(); let response_stream = client.blocks(request).await?; let block_stream = response_stream.into_inner(); diff --git a/graph/src/firehose/sf.firehose.v1.rs b/graph/src/firehose/sf.firehose.v1.rs index 9866fe1ff62..d57c793a772 100644 --- a/graph/src/firehose/sf.firehose.v1.rs +++ b/graph/src/firehose/sf.firehose.v1.rs @@ -198,8 +198,8 @@ pub mod stream_server { #[derive(Debug)] pub struct StreamServer { inner: _Inner, - accept_compression_encodings: (), - send_compression_encodings: (), + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, } struct _Inner(Arc); impl StreamServer { @@ -223,6 +223,18 @@ pub mod stream_server { { InterceptedService::new(Self::new(inner), interceptor) } + /// Enable decompressing requests with `gzip`. + #[must_use] + pub fn accept_gzip(mut self) -> Self { + self.accept_compression_encodings.enable_gzip(); + self + } + /// Compress responses with `gzip`, if the client supports it. + #[must_use] + pub fn send_gzip(mut self) -> Self { + self.send_compression_encodings.enable_gzip(); + self + } } impl tonic::codegen::Service> for StreamServer where diff --git a/graph/src/substreams/sf.substreams.v1.rs b/graph/src/substreams/sf.substreams.v1.rs index be5774745c8..0f81ea7fc97 100644 --- a/graph/src/substreams/sf.substreams.v1.rs +++ b/graph/src/substreams/sf.substreams.v1.rs @@ -482,8 +482,8 @@ pub mod stream_server { #[derive(Debug)] pub struct StreamServer { inner: _Inner, - accept_compression_encodings: (), - send_compression_encodings: (), + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, } struct _Inner(Arc); impl StreamServer { @@ -507,6 +507,18 @@ pub mod stream_server { { InterceptedService::new(Self::new(inner), interceptor) } + /// Enable decompressing requests with `gzip`. + #[must_use] + pub fn accept_gzip(mut self) -> Self { + self.accept_compression_encodings.enable_gzip(); + self + } + /// Compress responses with `gzip`, if the client supports it. + #[must_use] + pub fn send_gzip(mut self) -> Self { + self.send_compression_encodings.enable_gzip(); + self + } } impl tonic::codegen::Service> for StreamServer where From 5b2eaa7563b41ff27284c46d43346d8bfe3d6d27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Duchesneau?= Date: Thu, 8 Sep 2022 14:08:21 -0400 Subject: [PATCH 2/3] gate gzip on firehose behind 'compression' config feature --- graph/src/firehose/endpoints.rs | 16 ++++++++++++---- node/src/chain.rs | 1 + node/src/config.rs | 7 ++++++- tests/src/fixture/ethereum.rs | 1 + 4 files changed, 20 insertions(+), 5 deletions(-) diff --git a/graph/src/firehose/endpoints.rs b/graph/src/firehose/endpoints.rs index f6908631bc8..344506a9a00 100644 --- a/graph/src/firehose/endpoints.rs +++ b/graph/src/firehose/endpoints.rs @@ -25,6 +25,7 @@ pub struct FirehoseEndpoint { pub provider: String, pub token: Option, pub filters_enabled: bool, + pub compression_enabled: bool, channel: Channel, } @@ -40,6 +41,7 @@ impl FirehoseEndpoint { url: S, token: Option, filters_enabled: bool, + compression_enabled: bool, conn_pool_size: u16, ) -> Self { let uri = url @@ -81,6 +83,7 @@ impl FirehoseEndpoint { channel, token, filters_enabled, + compression_enabled, } } @@ -120,8 +123,11 @@ impl FirehoseEndpoint { Ok(r) }, ) - .accept_gzip() - .send_gzip(); + .accept_gzip(); + + if self.compression_enabled { + client = client.send_gzip(); + } debug!( logger, @@ -211,8 +217,10 @@ impl FirehoseEndpoint { Ok(r) }, ) - .accept_gzip() - .send_gzip(); + .accept_gzip(); + if self.compression_enabled { + client = client.send_gzip(); + } let response_stream = client.blocks(request).await?; let block_stream = response_stream.into_inner(); diff --git a/node/src/chain.rs b/node/src/chain.rs index d183d5d0316..c46fc1668fa 100644 --- a/node/src/chain.rs +++ b/node/src/chain.rs @@ -187,6 +187,7 @@ pub fn create_firehose_networks( &firehose.url, firehose.token.clone(), firehose.filters_enabled(), + firehose.compression_enabled(), firehose.conn_pool_size, ); diff --git a/node/src/config.rs b/node/src/config.rs index bbbaa8a98ba..14d4906f249 100644 --- a/node/src/config.rs +++ b/node/src/config.rs @@ -549,7 +549,9 @@ pub enum ProviderDetails { } const FIREHOSE_FILTER_FEATURE: &str = "filters"; -const FIREHOSE_PROVIDER_FEATURES: [&str; 1] = [FIREHOSE_FILTER_FEATURE]; +const FIREHOSE_COMPRESSION_FEATURE: &str = "compression"; +const FIREHOSE_PROVIDER_FEATURES: [&str; 2] = + [FIREHOSE_FILTER_FEATURE, FIREHOSE_COMPRESSION_FEATURE]; fn ten() -> u16 { 10 @@ -569,6 +571,9 @@ impl FirehoseProvider { pub fn filters_enabled(&self) -> bool { self.features.contains(FIREHOSE_FILTER_FEATURE) } + pub fn compression_enabled(&self) -> bool { + self.features.contains(FIREHOSE_COMPRESSION_FEATURE) + } } #[derive(Clone, Debug, Deserialize, Serialize)] diff --git a/tests/src/fixture/ethereum.rs b/tests/src/fixture/ethereum.rs index 3094198b477..c44922a3de1 100644 --- a/tests/src/fixture/ethereum.rs +++ b/tests/src/fixture/ethereum.rs @@ -33,6 +33,7 @@ pub async fn chain(blocks: Vec>, stores: &Stores) -> Ch "https://example.com", None, true, + false, 0, ))] .into(); From 63ed4df5d694cf9e098226d02d6126a7178dbfd6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Duchesneau?= Date: Thu, 8 Sep 2022 15:04:18 -0400 Subject: [PATCH 3/3] fix examples run from test for gzip grpc enhancement --- chain/ethereum/examples/firehose.rs | 1 + chain/substreams/examples/substreams.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/chain/ethereum/examples/firehose.rs b/chain/ethereum/examples/firehose.rs index ead0364494a..d2088b2167a 100644 --- a/chain/ethereum/examples/firehose.rs +++ b/chain/ethereum/examples/firehose.rs @@ -24,6 +24,7 @@ async fn main() -> Result<(), Error> { "https://api.streamingfast.io:443", token, false, + false, 1, )); diff --git a/chain/substreams/examples/substreams.rs b/chain/substreams/examples/substreams.rs index 07c758ba2e1..bc7af4acb5f 100644 --- a/chain/substreams/examples/substreams.rs +++ b/chain/substreams/examples/substreams.rs @@ -50,6 +50,7 @@ async fn main() -> Result<(), Error> { &endpoint, token, false, + false, 1, ));