From fff2b36f228a5ee00fb7999322d0e05c56191858 Mon Sep 17 00:00:00 2001 From: Wenqi Mou Date: Mon, 19 Apr 2021 20:12:28 -0400 Subject: [PATCH] Issue 223: Fix connection leak (#231) --- .github/workflows/python_test.yml | 7 +- Cargo.toml | 2 +- bindings/src/lib.rs | 1 + config/src/lib.rs | 7 +- controller-client/Cargo.toml | 2 +- controller-client/proto/Controller.proto | 4 +- controller-client/src/lib.rs | 7 +- controller-client/src/model_helper.rs | 48 ++++---- integration_test/src/disconnection_tests.rs | 4 +- .../src/event_stream_reader_tests.rs | 1 + integration_test/src/lib.rs | 66 ++++++---- integration_test/src/wirecommand_tests.rs | 97 ++++++++------- src/cli.rs | 5 +- src/error.rs | 3 + src/event_reader.rs | 2 +- src/raw_client.rs | 16 +++ src/reactor/reactors.rs | 6 +- src/reader_group_config.rs | 12 +- src/segment_metadata.rs | 2 + src/segment_reader.rs | 23 +++- src/stream/event_pointer.rs | 2 +- src/stream/position.rs | 2 +- src/stream/stream_cut.rs | 4 +- src/table_synchronizer.rs | 2 +- src/tablemap.rs | 10 +- src/transaction/mod.rs | 4 +- src/transaction/pinger.rs | 29 ++--- .../transactional_event_stream_writer.rs | 4 +- wire_protocol/src/commands.rs | 115 +++++++++--------- wire_protocol/src/mock_connection.rs | 2 +- 30 files changed, 261 insertions(+), 228 deletions(-) diff --git a/.github/workflows/python_test.yml b/.github/workflows/python_test.yml index 583cedb94..bd7fc149d 100644 --- a/.github/workflows/python_test.yml +++ b/.github/workflows/python_test.yml @@ -24,9 +24,9 @@ jobs: java-version: '11' # The JDK version to make available on the path. - name: Download and Run Pravega standalone run: | - wget https://github.com/pravega/pravega/releases/download/v0.9.0/pravega-0.9.0.tgz - tar -xzvf pravega-0.9.0.tgz - pravega-0.9.0/bin/pravega-standalone > pravega.log 2>&1 & + wget https://github.com/Tristan1900/pravega/releases/download/ip-fix/pravega-0.10.0-ip-fix.tgz + tar -xzvf pravega-0.10.0-ip-fix.tgz + pravega-0.10.0-2866.172245f-SNAPSHOT/bin/pravega-standalone > pravega.log 2>&1 & sleep 120 && echo "Started standalone" - name: Set up Python uses: actions/setup-python@v2 @@ -44,6 +44,7 @@ jobs: run: tox -c bindings/tox.ini - name: Upload Pravega standalone logs uses: actions/upload-artifact@v2 + if: always() with: name: pravega-standalone-log path: pravega.log diff --git a/Cargo.toml b/Cargo.toml index a2ee5a75e..9a6517254 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,7 +49,7 @@ pravega-client-config = {path = "./config", version = "0.1"} async-trait = "0.1" futures = "0.3" snafu = "0.6" -tokio = { version = "1.1", features = ["full"] } +tokio = { version = "1", features = ["full"] } tracing = "0.1" tracing-futures = "0.2" tracing-subscriber = "0.2" diff --git a/bindings/src/lib.rs b/bindings/src/lib.rs index 52de8268f..f8c1a6987 100644 --- a/bindings/src/lib.rs +++ b/bindings/src/lib.rs @@ -7,6 +7,7 @@ // // http://www.apache.org/licenses/LICENSE-2.0 // +#![allow(clippy::from_over_into)] #[macro_use] extern crate cfg_if; diff --git a/config/src/lib.rs b/config/src/lib.rs index f38de1253..a9cfec0f0 100644 --- a/config/src/lib.rs +++ b/config/src/lib.rs @@ -222,7 +222,11 @@ mod tests { assert_eq!( config.credentials.get_request_metadata(), format!("{} {}", "Basic", "ABCDE") - ) + ); + env::remove_var("pravega_client_auth_method"); + env::remove_var("pravega_client_auth_username"); + env::remove_var("pravega_client_auth_password"); + env::remove_var("pravega_client_auth_token"); } #[test] @@ -241,5 +245,6 @@ mod tests { .build() .unwrap(); 
assert_eq!(config.trustcert, "/"); + env::remove_var("pravega_client_tls_cert_path"); } } diff --git a/controller-client/Cargo.toml b/controller-client/Cargo.toml index 16160ba2a..d3c0532b1 100644 --- a/controller-client/Cargo.toml +++ b/controller-client/Cargo.toml @@ -34,7 +34,7 @@ jsonwebtoken = "7" serde = {version = "1.0", features = ["derive"] } [build-dependencies] -tonic-build = "0.3" +tonic-build = "0.4" [[bin]] name = "controller-cli" diff --git a/controller-client/proto/Controller.proto b/controller-client/proto/Controller.proto index bd3dfdb98..35424d298 100644 --- a/controller-client/proto/Controller.proto +++ b/controller-client/proto/Controller.proto @@ -117,10 +117,10 @@ message TxnStatus { message PingTxnStatus { enum Status { + reserved 3; OK = 0; LEASE_TOO_LARGE = 1; MAX_EXECUTION_TIME_EXCEEDED = 2; - SCALE_GRACE_TIME_EXCEEDED = 3 [deprecated=true]; DISCONNECTED = 4; COMMITTED = 5; ABORTED = 6; @@ -240,9 +240,9 @@ message TxnId { } message CreateTxnRequest { + reserved 3; StreamInfo streamInfo = 1; int64 lease = 2; - int64 scaleGracePeriod = 3 [deprecated=true]; } message CreateTxnResponse { diff --git a/controller-client/src/lib.rs b/controller-client/src/lib.rs index f802d43ea..9c8326048 100644 --- a/controller-client/src/lib.rs +++ b/controller-client/src/lib.rs @@ -26,6 +26,7 @@ #![allow(clippy::multiple_crate_versions)] #![allow(dead_code)] #![allow(clippy::similar_names)] +#![allow(clippy::upper_case_acronyms)] use std::result::Result as StdResult; use std::time::{Duration, Instant}; @@ -867,7 +868,6 @@ impl ControllerClientImpl { let request = CreateTxnRequest { stream_info: Some(StreamInfo::from(stream)), lease: lease.as_millis() as i64, - scale_grace_period: 0, }; let op_status: StdResult, tonic::Status> = self .get_controller_client() @@ -941,11 +941,6 @@ impl ControllerClientImpl { operation: operation_name.into(), error_msg: "Ping transaction failed, Reason:MaxExecutionTimeExceeded".into(), }), - Status::ScaleGraceTimeExceeded => Err(ControllerError::OperationError { - can_retry: false, // do not retry. - operation: operation_name.into(), - error_msg: "Ping transaction failed, Reason:ScaleGraceTimeExceeded".into(), - }), Status::Disconnected => Err(ControllerError::OperationError { can_retry: false, // do not retry. 
operation: operation_name.into(), diff --git a/controller-client/src/model_helper.rs b/controller-client/src/model_helper.rs index 009714d52..64feeeaef 100644 --- a/controller-client/src/model_helper.rs +++ b/controller-client/src/model_helper.rs @@ -30,15 +30,14 @@ impl From for TxnId { } } } - -impl Into for ScopedSegment { - fn into(self) -> SegmentId { +impl From for SegmentId { + fn from(segment: ScopedSegment) -> SegmentId { SegmentId { stream_info: Some(StreamInfo { - scope: self.scope.name, - stream: self.stream.name, + scope: segment.scope.name, + stream: segment.stream.name, }), - segment_id: self.segment.number, + segment_id: segment.segment.number, } } } @@ -54,12 +53,11 @@ impl<'a> From<&'a ScopedSegment> for SegmentId { } } } - -impl Into for ScopedStream { - fn into(self) -> StreamInfo { +impl From for StreamInfo { + fn from(stream: ScopedStream) -> StreamInfo { StreamInfo { - scope: self.scope.name, - stream: self.stream.name, + scope: stream.scope.name, + stream: stream.stream.name, } } } @@ -98,30 +96,28 @@ impl<'a> From<&'a StreamConfiguration> for StreamConfig { } } } - -impl Into for StreamConfiguration { - fn into(self) -> StreamConfig { +impl From for StreamConfig { + fn from(config: StreamConfiguration) -> StreamConfig { StreamConfig { - stream_info: Some(self.scoped_stream.into()), + stream_info: Some(config.scoped_stream.into()), scaling_policy: Some(ScalingPolicy { - scale_type: self.scaling.scale_type as i32, - target_rate: self.scaling.target_rate, - scale_factor: self.scaling.scale_factor, - min_num_segments: self.scaling.min_num_segments, + scale_type: config.scaling.scale_type as i32, + target_rate: config.scaling.target_rate, + scale_factor: config.scaling.scale_factor, + min_num_segments: config.scaling.min_num_segments, }), retention_policy: Some(RetentionPolicy { - retention_type: self.retention.retention_type as i32, - retention_param: self.retention.retention_param, + retention_type: config.retention.retention_type as i32, + retention_param: config.retention.retention_param, }), } } } - -impl Into for pravega_client_shared::StreamCut { - fn into(self) -> crate::controller::StreamCut { +impl From for crate::controller::StreamCut { + fn from(cut: pravega_client_shared::StreamCut) -> crate::controller::StreamCut { crate::controller::StreamCut { - stream_info: Some(self.scoped_stream.into()), - cut: self.segment_offset_map, + stream_info: Some(cut.scoped_stream.into()), + cut: cut.segment_offset_map, } } } diff --git a/integration_test/src/disconnection_tests.rs b/integration_test/src/disconnection_tests.rs index 9c7064268..ddac8e34b 100644 --- a/integration_test/src/disconnection_tests.rs +++ b/integration_test/src/disconnection_tests.rs @@ -106,7 +106,7 @@ async fn create_scope_stream(controller_client: &dyn ControllerClient) { }) .await .expect("create scope"); - assert!(result, true); + assert!(result); let stream_name = Stream::from("testStream".to_owned()); let request = StreamConfiguration { @@ -135,7 +135,7 @@ async fn create_scope_stream(controller_client: &dyn ControllerClient) { }) .await .expect("create stream"); - assert!(result, true); + assert!(result); } fn test_retry_with_unexpected_reply() { diff --git a/integration_test/src/event_stream_reader_tests.rs b/integration_test/src/event_stream_reader_tests.rs index daa9e2324..9f64fb42f 100644 --- a/integration_test/src/event_stream_reader_tests.rs +++ b/integration_test/src/event_stream_reader_tests.rs @@ -44,6 +44,7 @@ pub fn test_event_stream_reader(config: PravegaStandaloneServiceConfig) { 
test_multiple_readers(&client_factory); test_reader_offline(&client_factory); test_segment_rebalance(&client_factory); + info!("test event stream reader finished"); } fn test_read_large_events(client_factory: &ClientFactory, rt: &Runtime) { diff --git a/integration_test/src/lib.rs b/integration_test/src/lib.rs index 6fa2a9082..9a1ee7686 100644 --- a/integration_test/src/lib.rs +++ b/integration_test/src/lib.rs @@ -72,19 +72,12 @@ mod test { fn integration_test() { trace::init(); // metric::metric_init(PROMETHEUS_SCRAPE_PORT.parse::().unwrap()); - let span = info_span!("integration test", auth = true, tls = true); - span.in_scope(|| { - info!("Running integration test"); - let config = PravegaStandaloneServiceConfig::new(false, true, true); - run_tests(config); - }); + info!("Running integration test"); + let config = PravegaStandaloneServiceConfig::new(false, true, true); + run_tests(config); - let span = info_span!("integration test", auth = false, tls = false); - span.in_scope(|| { - info!("Running integration test"); - let config = PravegaStandaloneServiceConfig::new(false, false, false); - run_tests(config); - }); + let config = PravegaStandaloneServiceConfig::new(false, false, false); + run_tests(config); // disconnection test will start its own Pravega Standalone. disconnection_tests::disconnection_test_wrapper(); @@ -98,20 +91,41 @@ mod test { env::set_var("pravega_client_auth_username", "admin"); env::set_var("pravega_client_auth_password", "1111_aaaa"); } - controller_tests::test_controller_apis(config.clone()); - - tablemap_tests::test_tablemap(config.clone()); - - event_stream_writer_tests::test_event_stream_writer(config.clone()); - - tablesynchronizer_tests::test_tablesynchronizer(config.clone()); - - transactional_event_stream_writer_tests::test_transactional_event_stream_writer(config.clone()); - - byte_stream_tests::test_byte_stream(config.clone()); - - event_stream_reader_tests::test_event_stream_reader(config.clone()); - + let span = info_span!("controller test", auth = config.auth, tls = config.tls); + span.in_scope(|| { + info!("Running controller test"); + controller_tests::test_controller_apis(config.clone()); + }); + let span = info_span!("table map test", auth = config.auth, tls = config.tls); + span.in_scope(|| { + info!("Running table map test"); + tablemap_tests::test_tablemap(config.clone()); + }); + let span = info_span!("event stream writer test", auth = config.auth, tls = config.tls); + span.in_scope(|| { + info!("Running event stream writer test"); + event_stream_writer_tests::test_event_stream_writer(config.clone()); + }); + let span = info_span!("synchronizer test", auth = config.auth, tls = config.tls); + span.in_scope(|| { + info!("Running synchronizer test"); + tablesynchronizer_tests::test_tablesynchronizer(config.clone()); + }); + let span = info_span!("transaction test", auth = config.auth, tls = config.tls); + span.in_scope(|| { + info!("Running transaction test"); + transactional_event_stream_writer_tests::test_transactional_event_stream_writer(config.clone()); + }); + let span = info_span!("byte stream test", auth = config.auth, tls = config.tls); + span.in_scope(|| { + info!("Running byte stream test"); + byte_stream_tests::test_byte_stream(config.clone()); + }); + let span = info_span!("event reader test", auth = config.auth, tls = config.tls); + span.in_scope(|| { + info!("Running event reader test"); + event_stream_reader_tests::test_event_stream_reader(config.clone()); + }); // Shut down Pravega standalone pravega.stop().unwrap(); 
wait_for_standalone_with_timeout(false, 30); diff --git a/integration_test/src/wirecommand_tests.rs b/integration_test/src/wirecommand_tests.rs index 46beeb505..545d87333 100644 --- a/integration_test/src/wirecommand_tests.rs +++ b/integration_test/src/wirecommand_tests.rs @@ -456,7 +456,7 @@ async fn test_get_stream_segment_info(factory: &ClientFactory) { .await .expect("fail to get reply"); if let Replies::StreamSegmentInfo(info) = reply { - assert!(info.is_sealed, true); + assert!(info.is_sealed); } else { panic!("Wrong reply type"); } @@ -787,15 +787,16 @@ async fn test_update_table_entries(factory: &ClientFactory) { raw_client.send_request(&request).await.expect("create segment"); //create a table. - let mut entries = Vec::new(); - entries.push(( - TableKey::new(String::from("key1").into_bytes(), i64::min_value()), - TableValue::new(String::from("value1").into_bytes()), - )); - entries.push(( - TableKey::new(String::from("key2").into_bytes(), i64::min_value()), - TableValue::new(String::from("value2").into_bytes()), - )); + let entries = vec![ + ( + TableKey::new(String::from("key1").into_bytes(), i64::min_value()), + TableValue::new(String::from("value1").into_bytes()), + ), + ( + TableKey::new(String::from("key2").into_bytes(), i64::min_value()), + TableValue::new(String::from("value2").into_bytes()), + ), + ]; let table = TableEntries { entries }; let request = Requests::UpdateTableEntries(UpdateTableEntriesCommand { request_id: 19, @@ -804,9 +805,7 @@ async fn test_update_table_entries(factory: &ClientFactory) { table_entries: table, table_segment_offset: -1, }); - let mut versions = Vec::new(); - versions.push(0_i64); - versions.push(27_i64); // why return version is 27. + let versions = vec![0_i64, 27_i64]; let reply = Replies::TableEntriesUpdated(TableEntriesUpdatedCommand { request_id: 19, updated_versions: versions, @@ -818,11 +817,10 @@ async fn test_update_table_entries(factory: &ClientFactory) { .map_or_else(|e| panic!("failed to get reply: {}", e), |r| assert_eq!(reply, r)); //test table key not exist. - let mut entries = Vec::new(); - entries.push(( + let entries = vec![( TableKey::new(String::from("key3").into_bytes(), 1), TableValue::new(String::from("value3").into_bytes()), - )); + )]; let table = TableEntries { entries }; let request = Requests::UpdateTableEntries(UpdateTableEntriesCommand { request_id: 20, @@ -842,11 +840,10 @@ async fn test_update_table_entries(factory: &ClientFactory) { .map_or_else(|e| panic!("failed to get reply: {}", e), |r| assert_eq!(reply, r)); //test table key bad version. 
- let mut entries = Vec::new(); - entries.push(( + let entries = vec![( TableKey::new(String::from("key1").into_bytes(), 10), TableValue::new(String::from("value1").into_bytes()), - )); + )]; let table = TableEntries { entries }; let request = Requests::UpdateTableEntries(UpdateTableEntriesCommand { request_id: 21, @@ -895,10 +892,10 @@ async fn test_read_table_key(factory: &ClientFactory) { continuation_token: Vec::new(), }); - let mut keys = Vec::new(); - keys.push(TableKey::new(String::from("key1").into_bytes(), 0)); - keys.push(TableKey::new(String::from("key2").into_bytes(), 27)); - + let keys = vec![ + TableKey::new(String::from("key1").into_bytes(), 0), + TableKey::new(String::from("key2").into_bytes(), 27), + ]; let reply = raw_client.send_request(&request).await.expect("read table key"); if let Replies::TableKeysRead(t) = reply { @@ -930,10 +927,10 @@ async fn test_read_table(factory: &ClientFactory) { factory.get_config().request_timeout(), ); - let mut keys = Vec::new(); - keys.push(TableKey::new(String::from("key1").into_bytes(), i64::min_value())); - keys.push(TableKey::new(String::from("key2").into_bytes(), i64::min_value())); - + let keys = vec![ + TableKey::new(String::from("key1").into_bytes(), i64::MIN), + TableKey::new(String::from("key2").into_bytes(), i64::MIN), + ]; let request = Requests::ReadTable(ReadTableCommand { request_id: 23, segment: segment_name.to_string(), @@ -941,15 +938,16 @@ async fn test_read_table(factory: &ClientFactory) { keys, }); - let mut entries = Vec::new(); - entries.push(( - TableKey::new(String::from("key1").into_bytes(), 0), - TableValue::new(String::from("value1").into_bytes()), - )); - entries.push(( - TableKey::new(String::from("key2").into_bytes(), 27), - TableValue::new(String::from("value2").into_bytes()), - )); + let entries = vec![ + ( + TableKey::new(String::from("key1").into_bytes(), 0), + TableValue::new(String::from("value1").into_bytes()), + ), + ( + TableKey::new(String::from("key2").into_bytes(), 27), + TableValue::new(String::from("value2").into_bytes()), + ), + ]; let table = TableEntries { entries }; let reply = Replies::TableRead(TableReadCommand { @@ -958,10 +956,10 @@ async fn test_read_table(factory: &ClientFactory) { entries: table, }); - raw_client - .send_request(&request) - .await - .map_or_else(|e| panic!("failed to get reply: {}", e), |r| assert_eq!(reply, r)); + raw_client.send_request(&request).await.map_or_else( + |e| panic!("failed to get reply: {:?}", e), + |r| assert_eq!(reply, r), + ); } async fn test_read_table_entries(factory: &ClientFactory) { @@ -993,15 +991,16 @@ async fn test_read_table_entries(factory: &ClientFactory) { continuation_token: Vec::new(), }); - let mut entries = Vec::new(); - entries.push(( - TableKey::new(String::from("key1").into_bytes(), 0), - TableValue::new(String::from("value1").into_bytes()), - )); - entries.push(( - TableKey::new(String::from("key2").into_bytes(), 27), - TableValue::new(String::from("value2").into_bytes()), - )); + let entries = vec![ + ( + TableKey::new(String::from("key1").into_bytes(), 0), + TableValue::new(String::from("value1").into_bytes()), + ), + ( + TableKey::new(String::from("key2").into_bytes(), 27), + TableValue::new(String::from("value2").into_bytes()), + ), + ]; let table = TableEntries { entries }; let reply = raw_client diff --git a/src/cli.rs b/src/cli.rs index 85c62f6de..98303b397 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -653,11 +653,10 @@ async fn main() { table_segment_offset, } => { let id = ID_GENERATOR.fetch_add(1, Ordering::SeqCst) as 
i64; - let mut entries = Vec::new(); - entries.push(( + let entries = vec![( TableKey::new(key.into_bytes(), key_version), TableValue::new(value.into_bytes()), - )); + )]; let table = TableEntries { entries }; let request = Requests::UpdateTableEntries(UpdateTableEntriesCommand { request_id: id, diff --git a/src/error.rs b/src/error.rs index cea5f28cf..17fb6103d 100644 --- a/src/error.rs +++ b/src/error.rs @@ -40,6 +40,9 @@ pub enum RawClientError { #[snafu(display("Request has timed out: {:?}", source))] RequestTimeout { source: Elapsed }, + + #[snafu(display("Wrong reply id {:?} for request {:?}", reply_id, request_id))] + WrongReplyId { reply_id: i64, request_id: i64 }, } impl RawClientError { diff --git a/src/event_reader.rs b/src/event_reader.rs index 770ff4f9b..88fa9b521 100644 --- a/src/event_reader.rs +++ b/src/event_reader.rs @@ -289,7 +289,7 @@ impl EventReader { slice_meta_map.iter().for_each(|(segment, meta)| { let (tx_stop, rx_stop) = oneshot::channel(); stop_reading_map.insert(segment.clone(), tx_stop); - factory.get_runtime().enter(); + let _guard = factory.get_runtime().enter(); tokio::spawn(SegmentSlice::get_segment_data( segment.clone(), meta.start_offset, diff --git a/src/raw_client.rs b/src/raw_client.rs index 4096b861d..f660073eb 100644 --- a/src/raw_client.rs +++ b/src/raw_client.rs @@ -8,11 +8,13 @@ // http://www.apache.org/licenses/LICENSE-2.0 // +use crate::error::RawClientError::WrongReplyId; use crate::error::*; use async_trait::async_trait; use pravega_client_shared::PravegaNodeUri; use pravega_connection_pool::connection_pool::ConnectionPool; use pravega_wire_protocol::client_connection::{ClientConnection, ClientConnectionImpl}; +use pravega_wire_protocol::commands::{Reply, Request}; use pravega_wire_protocol::connection_factory::SegmentConnectionManager; use pravega_wire_protocol::wire_commands::{Replies, Requests}; use snafu::ResultExt; @@ -80,6 +82,13 @@ impl<'a> RawClient<'a> for RawClientImpl<'a> { .await .context(RequestTimeout {})?; let reply = result.context(ReadReply {})?; + if reply.get_request_id() != request.get_request_id() { + client_connection.connection.invalidate(); + return Err(WrongReplyId { + reply_id: reply.get_request_id(), + request_id: request.get_request_id(), + }); + } check_auth_token_expired(&reply)?; Ok(reply) } @@ -100,6 +109,13 @@ impl<'a> RawClient<'a> for RawClientImpl<'a> { .await .context(RequestTimeout {})?; let reply = result.context(ReadReply {})?; + if reply.get_request_id() != request.get_request_id() { + client_connection.connection.invalidate(); + return Err(WrongReplyId { + reply_id: reply.get_request_id(), + request_id: request.get_request_id(), + }); + } check_auth_token_expired(&reply)?; Ok((reply, Box::new(client_connection) as Box)) } diff --git a/src/reactor/reactors.rs b/src/reactor/reactors.rs index f086f1dd1..1ecf4070b 100644 --- a/src/reactor/reactors.rs +++ b/src/reactor/reactors.rs @@ -187,11 +187,11 @@ impl Reactor { Ok(()) } _ => { - error!( - "receive unexpected reply {:?}, closing stream reactor", + info!( + "receive unexpected reply {:?}, probably because of the stale message in a reused connection", server_reply.reply ); - Err("Unexpected reply") + Ok(()) } } } diff --git a/src/reader_group_config.rs b/src/reader_group_config.rs index afd17bc81..c94a741db 100644 --- a/src/reader_group_config.rs +++ b/src/reader_group_config.rs @@ -95,7 +95,7 @@ impl ReaderGroupConfigBuilder { /// pub fn add_stream(&mut self, stream: ScopedStream) -> &mut Self { self.starting_stream_cuts - .insert(stream, 
StreamCutVersioned::UNBOUNDED); + .insert(stream, StreamCutVersioned::Unbounded); self } @@ -133,7 +133,7 @@ impl ReaderGroupConfigVersioned { } fn from_bytes(input: &[u8]) -> Result { - let decoded: ReaderGroupConfigVersioned = from_slice(&input[..]).context(Cbor { + let decoded: ReaderGroupConfigVersioned = from_slice(input).context(Cbor { msg: "serialize ReaderGroupConfigVersioned".to_owned(), })?; Ok(decoded) @@ -174,14 +174,14 @@ impl ReaderGroupConfigV1 { self.starting_stream_cuts.insert(stream.clone(), cut); } else { self.starting_stream_cuts - .insert(stream.clone(), StreamCutVersioned::UNBOUNDED); + .insert(stream.clone(), StreamCutVersioned::Unbounded); } if let Some(cut) = ending_stream_cuts { self.ending_stream_cuts.insert(stream, cut); } else { self.ending_stream_cuts - .insert(stream, StreamCutVersioned::UNBOUNDED); + .insert(stream, StreamCutVersioned::Unbounded); } self @@ -234,7 +234,7 @@ mod tests { .starting_stream_cuts .contains_key(&ScopedStream::from("scope2/s2"))); for val in v1.starting_stream_cuts.values() { - assert_eq!(&StreamCutVersioned::UNBOUNDED, val); + assert_eq!(&StreamCutVersioned::Unbounded, val); } } @@ -251,7 +251,7 @@ mod tests { .starting_stream_cuts .contains_key(&ScopedStream::from("scope1/s1"))); for val in v1.starting_stream_cuts.values() { - assert_eq!(&StreamCutVersioned::UNBOUNDED, val); + assert_eq!(&StreamCutVersioned::Unbounded, val); } } diff --git a/src/segment_metadata.rs b/src/segment_metadata.rs index a6beafe2d..8fedd6bf7 100644 --- a/src/segment_metadata.rs +++ b/src/segment_metadata.rs @@ -199,6 +199,8 @@ impl SegmentMetadataClient { RetryResult::Retry("wrong host".to_string()) } Replies::NoSuchSegment(_cmd) => RetryResult::Fail("no such segment".to_string()), + // this might caused by retry. 
+ Replies::SegmentIsSealed(_cmd) => RetryResult::Success(()), _ => RetryResult::Fail("unexpected reply".to_string()), }, Err(e) => { diff --git a/src/segment_reader.rs b/src/segment_reader.rs index fe103c0d3..e471a2766 100644 --- a/src/segment_reader.rs +++ b/src/segment_reader.rs @@ -362,6 +362,7 @@ pub(crate) struct PrefetchingAsyncSegmentReader { end_of_segment: bool, receiver: Option>>, handle: Handle, + bg_job_handle: Option>, } // maximum number of buffered reply @@ -383,6 +384,7 @@ impl PrefetchingAsyncSegmentReader { handle, end_of_segment: false, receiver: None, + bg_job_handle: None, }; wrapper.issue_request_if_needed(); wrapper @@ -435,7 +437,7 @@ impl PrefetchingAsyncSegmentReader { self.reader } - /// Returns the size of data availble in buffer + /// Returns the size of data available in buffer pub(crate) fn available(&self) -> usize { let mut size = 0; for (i, cmd) in self.buffer.iter().enumerate() { @@ -451,13 +453,16 @@ impl PrefetchingAsyncSegmentReader { fn issue_request_if_needed(&mut self) { if !self.end_of_segment && self.receiver.is_none() { let (sender, receiver) = oneshot::channel(); + let (bg_sender, bg_receiver) = oneshot::channel(); self.handle.spawn(PrefetchingAsyncSegmentReader::read_async( self.reader.clone(), sender, + bg_receiver, self.offset + self.available() as i64, self.read_size as i32, )); self.receiver = Some(receiver); + self.bg_job_handle = Some(bg_sender); } } @@ -465,14 +470,20 @@ impl PrefetchingAsyncSegmentReader { async fn read_async( reader: Arc>, sender: oneshot::Sender>, + shutdown: oneshot::Receiver, offset: i64, length: i32, ) { - let result = reader.read(offset, length).await; - sender - .send(result) - .map_err(|e| warn!("failed to send reply back: {:?}", e)) - .expect("send reply back"); + tokio::select! 
{ + result = reader.read(offset, length) => { + let _ = sender + .send(result) + .map_err(|_e| warn!("failed to send reply back")); + } + _ = shutdown => { + debug!("shut down background async read"); + } + } } fn fill_buffer_if_available(&mut self) -> Result<(), ReaderError> { diff --git a/src/stream/event_pointer.rs b/src/stream/event_pointer.rs index 22178a5c7..3e1c850fd 100644 --- a/src/stream/event_pointer.rs +++ b/src/stream/event_pointer.rs @@ -29,7 +29,7 @@ impl EventPointerVersioned { } fn from_bytes(input: &[u8]) -> Result { - let decoded: EventPointerVersioned = from_slice(&input[..]).context(Cbor { + let decoded: EventPointerVersioned = from_slice(input).context(Cbor { msg: "deserialize EventPointerVersioned".to_owned(), })?; Ok(decoded) diff --git a/src/stream/position.rs b/src/stream/position.rs index 2a0fb7b7f..951675426 100644 --- a/src/stream/position.rs +++ b/src/stream/position.rs @@ -30,7 +30,7 @@ impl PositionVersioned { } fn from_bytes(input: &[u8]) -> Result { - let decoded: PositionVersioned = from_slice(&input[..]).context(Cbor { + let decoded: PositionVersioned = from_slice(input).context(Cbor { msg: "serialize PositionVersioned".to_owned(), })?; Ok(decoded) diff --git a/src/stream/stream_cut.rs b/src/stream/stream_cut.rs index 28ecbbfe7..9c11c94a4 100644 --- a/src/stream/stream_cut.rs +++ b/src/stream/stream_cut.rs @@ -19,7 +19,7 @@ use std::collections::HashMap; #[derive(Serialize, Deserialize, PartialEq, Debug, Clone)] pub(crate) enum StreamCutVersioned { V1(StreamCutV1), - UNBOUNDED, + Unbounded, } impl StreamCutVersioned { @@ -31,7 +31,7 @@ impl StreamCutVersioned { } fn from_bytes(input: &[u8]) -> Result { - let decoded: StreamCutVersioned = from_slice(&input[..]).context(Cbor { + let decoded: StreamCutVersioned = from_slice(input).context(Cbor { msg: "serialize StreamCutVersioned".to_owned(), })?; Ok(decoded) diff --git a/src/table_synchronizer.rs b/src/table_synchronizer.rs index f6e2f8fb1..6e316df02 100644 --- a/src/table_synchronizer.rs +++ b/src/table_synchronizer.rs @@ -1040,7 +1040,7 @@ mod test { let value_option = sync.get("outer_key", "inner_key"); assert!(value_option.is_some()); - rt.block_on(sync.remove(|table| { + rt.block_on(sync.insert(|table| { table.insert_tombstone("outer_key".to_owned(), "inner_key".to_owned())?; Ok(None) })) diff --git a/src/tablemap.rs b/src/tablemap.rs index 0c4612fc8..638306919 100644 --- a/src/tablemap.rs +++ b/src/tablemap.rs @@ -102,8 +102,9 @@ impl TableMap { Err(e) => { if e.is_token_expired() { delegation_token_provider.signal_token_expiry(); - info!("auth token needs to refresh"); + debug!("auth token needs to refresh"); } + debug!("retry on error {:?}", e); RetryResult::Retry(e) } } @@ -679,8 +680,9 @@ impl TableMap { Err(e) => { if e.is_token_expired() { self.delegation_token_provider.signal_token_expiry(); - info!("auth token needs to refresh"); + debug!("auth token needs to refresh"); } + debug!("retry on error {:?}", e); RetryResult::Retry(e) } } @@ -734,7 +736,6 @@ impl TableMap { .create_raw_client_for_endpoint(self.endpoint.clone()) .send_request(&req) .await; - debug!("Reply for read tableKeys request {:?}", result); match result { Ok(reply) => RetryResult::Success(reply), Err(e) => { @@ -946,18 +947,15 @@ mod test { .block_on(table_map.insert_conditionally(&"key".to_string(), &"value".to_string(), -1, -1)) .expect("unconditionally insert into table map"); assert_eq!(version, 0); - // conditionally insert existing key let version = rt .block_on(table_map.insert_conditionally(&"key".to_string(), 
&"value".to_string(), 0, -1)) .expect("conditionally insert into table map"); assert_eq!(version, 1); - // conditionally insert key with wrong version let result = rt.block_on(table_map.insert_conditionally(&"key".to_string(), &"value".to_string(), 0, -1)); assert!(result.is_err()); - // conditionally remove key let result = rt.block_on(table_map.remove_conditionally(&"key".to_string(), 1, -1)); assert!(result.is_ok()); diff --git a/src/transaction/mod.rs b/src/transaction/mod.rs index b1d9625f8..02fa61a2c 100644 --- a/src/transaction/mod.rs +++ b/src/transaction/mod.rs @@ -73,7 +73,7 @@ impl Transaction { let txn_id = info.txn_id; let span = info_span!("StreamReactor", txn_id = %txn_id, event_stream_writer = %writer_id); // tokio::spawn is tied to the factory runtime. - rt_handle.enter(); + let _guard = rt_handle.enter(); tokio::spawn( Reactor::run( info.stream.clone(), @@ -206,7 +206,6 @@ impl Transaction { // remove this transaction from ping list self.handle .remove(self.info.txn_id) - .await .context(TxnStreamWriterError {})?; self.factory @@ -235,7 +234,6 @@ impl Transaction { // remove this transaction from ping list self.handle .remove(self.info.txn_id) - .await .context(TxnStreamWriterError {})?; self.factory diff --git a/src/transaction/pinger.rs b/src/transaction/pinger.rs index 6d9d67d3a..c12da550f 100644 --- a/src/transaction/pinger.rs +++ b/src/transaction/pinger.rs @@ -14,7 +14,7 @@ use futures::FutureExt; use pravega_client_shared::{PingStatus, ScopedStream, TxId}; use std::collections::HashSet; use std::time::Duration; -use tokio::sync::mpsc::{channel, Receiver, Sender}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio::time::sleep; use tracing::{debug, error, info}; @@ -33,17 +33,17 @@ pub(crate) struct Pinger { txn_lease_millis: u64, ping_interval_millis: u64, factory: ClientFactory, - receiver: Receiver, + receiver: UnboundedReceiver, } /// PingerHandle is just a wrapped channel sender which is used to communicate with the Pinger. /// It can be used to add or remove a transaction from Pinger's ping list. 
#[derive(Clone)] -pub(crate) struct PingerHandle(Sender); +pub(crate) struct PingerHandle(UnboundedSender); impl PingerHandle { - pub(crate) async fn add(&mut self, txn_id: TxId) -> Result<(), TransactionalEventStreamWriterError> { - if let Err(e) = self.0.send(PingerEvent::Add(txn_id)).await { + pub(crate) fn add(&mut self, txn_id: TxId) -> Result<(), TransactionalEventStreamWriterError> { + if let Err(e) = self.0.send(PingerEvent::Add(txn_id)) { error!("pinger failed to add transaction: {:?}", e); Err(TransactionalEventStreamWriterError::PingerError { msg: format!("failed to add transaction due to: {:?}", e), @@ -53,8 +53,8 @@ impl PingerHandle { } } - pub(crate) async fn remove(&mut self, txn_id: TxId) -> Result<(), TransactionalEventStreamWriterError> { - if let Err(e) = self.0.send(PingerEvent::Remove(txn_id)).await { + pub(crate) fn remove(&mut self, txn_id: TxId) -> Result<(), TransactionalEventStreamWriterError> { + if let Err(e) = self.0.send(PingerEvent::Remove(txn_id)) { error!("pinger failed to remove transaction: {:?}", e); Err(TransactionalEventStreamWriterError::PingerError { msg: format!("failed to remove transaction due to: {:?}", e), @@ -63,16 +63,11 @@ impl PingerHandle { Ok(()) } } +} - pub(crate) async fn shutdown(&mut self) -> Result<(), TransactionalEventStreamWriterError> { - if let Err(e) = self.0.send(PingerEvent::Terminate).await { - error!("pinger failed to shutdown: {:?}", e); - Err(TransactionalEventStreamWriterError::PingerError { - msg: format!("failed to shutdown transaction due to: {:?}", e), - }) - } else { - Ok(()) - } +impl Drop for PingerHandle { + fn drop(&mut self) { + let _res = self.0.send(PingerEvent::Terminate); } } @@ -82,7 +77,7 @@ impl Pinger { txn_lease_millis: u64, factory: ClientFactory, ) -> (Self, PingerHandle) { - let (tx, rx) = channel(100); + let (tx, rx) = unbounded_channel(); let pinger = Pinger { stream, txn_lease_millis, diff --git a/src/transaction/transactional_event_stream_writer.rs b/src/transaction/transactional_event_stream_writer.rs index a3325a27d..b68d4902d 100644 --- a/src/transaction/transactional_event_stream_writer.rs +++ b/src/transaction/transactional_event_stream_writer.rs @@ -82,7 +82,7 @@ impl TransactionalEventStreamWriter { Arc::new(factory.create_delegation_token_provider(stream.clone()).await); let runtime_handle = factory.get_runtime(); let span = info_span!("Pinger", transactional_event_stream_writer = %writer_id); - runtime_handle.enter(); + let _guard = runtime_handle.enter(); tokio::spawn(async move { pinger.start_ping().instrument(span).await }); TransactionalEventStreamWriter { stream, @@ -107,7 +107,7 @@ impl TransactionalEventStreamWriter { .context(TxnStreamControllerError {})?; info!("Transaction {} created", txn_segments.tx_id); let txn_id = txn_segments.tx_id; - self.pinger_handle.add(txn_id).await?; + self.pinger_handle.add(txn_id)?; Ok(Transaction::new( TransactionInfo::new(txn_id, self.writer_id, self.stream.clone(), false), txn_segments.stream_segments, diff --git a/wire_protocol/src/commands.rs b/wire_protocol/src/commands.rs index 32e79781b..d6ab5e2ac 100644 --- a/wire_protocol/src/commands.rs +++ b/wire_protocol/src/commands.rs @@ -93,7 +93,7 @@ impl Command for HelloCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: HelloCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: HelloCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -132,7 +132,7 @@ impl Command for WrongHostCommand { Ok(encoded) } 
fn read_from(input: &[u8]) -> Result { - let decoded: WrongHostCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: WrongHostCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -169,7 +169,7 @@ impl Command for SegmentIsSealedCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: SegmentIsSealedCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: SegmentIsSealedCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -207,7 +207,7 @@ impl Command for SegmentIsTruncatedCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: SegmentIsTruncatedCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: SegmentIsTruncatedCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -243,7 +243,7 @@ impl Command for SegmentAlreadyExistsCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: SegmentAlreadyExistsCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: SegmentAlreadyExistsCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -286,7 +286,7 @@ impl Command for NoSuchSegmentCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: NoSuchSegmentCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: NoSuchSegmentCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -328,7 +328,7 @@ impl Command for TableSegmentNotEmptyCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: TableSegmentNotEmptyCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: TableSegmentNotEmptyCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -370,7 +370,7 @@ impl Command for InvalidEventNumberCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: InvalidEventNumberCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: InvalidEventNumberCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -416,7 +416,7 @@ impl Command for OperationUnsupportedCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: OperationUnsupportedCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: OperationUnsupportedCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -530,7 +530,7 @@ impl Command for SetupAppendCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: SetupAppendCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: SetupAppendCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -565,7 +565,7 @@ impl Command for AppendBlockCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: AppendBlockCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: AppendBlockCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -597,7 +597,7 @@ impl Command for AppendBlockEndCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: AppendBlockEndCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: AppendBlockEndCommand = 
CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -691,7 +691,7 @@ impl Command for AppendSetupCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: AppendSetupCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: AppendSetupCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -727,7 +727,7 @@ impl Command for DataAppendedCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: DataAppendedCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: DataAppendedCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -761,7 +761,7 @@ impl Command for ConditionalCheckFailedCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: ConditionalCheckFailedCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: ConditionalCheckFailedCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -797,7 +797,7 @@ impl Command for ReadSegmentCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: ReadSegmentCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: ReadSegmentCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -835,7 +835,7 @@ impl Command for SegmentReadCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: SegmentReadCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: SegmentReadCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -870,7 +870,7 @@ impl Command for GetSegmentAttributeCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: GetSegmentAttributeCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: GetSegmentAttributeCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -903,7 +903,7 @@ impl Command for SegmentAttributeCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: SegmentAttributeCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: SegmentAttributeCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -940,7 +940,7 @@ impl Command for UpdateSegmentAttributeCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: UpdateSegmentAttributeCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: UpdateSegmentAttributeCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -973,10 +973,9 @@ impl Command for SegmentAttributeUpdatedCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: SegmentAttributeUpdatedCommand = - CONFIG.deserialize(&input[..]).context(InvalidData { - command_type: Self::TYPE_CODE, - })?; + let decoded: SegmentAttributeUpdatedCommand = CONFIG.deserialize(input).context(InvalidData { + command_type: Self::TYPE_CODE, + })?; Ok(decoded) } } @@ -1008,7 +1007,7 @@ impl Command for GetStreamSegmentInfoCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: GetStreamSegmentInfoCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: GetStreamSegmentInfoCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -1047,7 +1046,7 @@ impl Command 
for StreamSegmentInfoCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: StreamSegmentInfoCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: StreamSegmentInfoCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -1083,7 +1082,7 @@ impl Command for CreateSegmentCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: CreateSegmentCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: CreateSegmentCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -1117,7 +1116,7 @@ impl Command for CreateTableSegmentCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: CreateTableSegmentCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: CreateTableSegmentCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -1150,7 +1149,7 @@ impl Command for SegmentCreatedCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: SegmentCreatedCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: SegmentCreatedCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -1186,7 +1185,7 @@ impl Command for UpdateSegmentPolicyCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: UpdateSegmentPolicyCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: UpdateSegmentPolicyCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -1219,7 +1218,7 @@ impl Command for SegmentPolicyUpdatedCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: SegmentPolicyUpdatedCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: SegmentPolicyUpdatedCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -1254,7 +1253,7 @@ impl Command for MergeSegmentsCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: MergeSegmentsCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: MergeSegmentsCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -1288,7 +1287,7 @@ impl Command for MergeTableSegmentsCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: MergeTableSegmentsCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: MergeTableSegmentsCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -1323,7 +1322,7 @@ impl Command for SegmentsMergedCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: SegmentsMergedCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: SegmentsMergedCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -1357,7 +1356,7 @@ impl Command for SealSegmentCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: SealSegmentCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: SealSegmentCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -1391,7 +1390,7 @@ impl Command for SealTableSegmentCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: SealTableSegmentCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: 
SealTableSegmentCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -1424,7 +1423,7 @@ impl Command for SegmentSealedCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: SegmentSealedCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: SegmentSealedCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -1459,7 +1458,7 @@ impl Command for TruncateSegmentCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: TruncateSegmentCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: TruncateSegmentCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -1492,7 +1491,7 @@ impl Command for SegmentTruncatedCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: SegmentTruncatedCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: SegmentTruncatedCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -1526,7 +1525,7 @@ impl Command for DeleteSegmentCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: DeleteSegmentCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: DeleteSegmentCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -1560,7 +1559,7 @@ impl Command for DeleteTableSegmentCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: DeleteTableSegmentCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: DeleteTableSegmentCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -1593,7 +1592,7 @@ impl Command for SegmentDeletedCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: SegmentDeletedCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: SegmentDeletedCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -1670,7 +1669,7 @@ impl Command for AuthTokenCheckFailedCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: AuthTokenCheckFailedCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: AuthTokenCheckFailedCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -1731,7 +1730,7 @@ impl Command for UpdateTableEntriesCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: UpdateTableEntriesCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: UpdateTableEntriesCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -1764,7 +1763,7 @@ impl Command for TableEntriesUpdatedCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: TableEntriesUpdatedCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: TableEntriesUpdatedCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -1800,7 +1799,7 @@ impl Command for RemoveTableKeysCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: RemoveTableKeysCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: RemoveTableKeysCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -1833,7 +1832,7 @@ impl Command for 
TableKeysRemovedCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: TableKeysRemovedCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: TableKeysRemovedCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -1868,7 +1867,7 @@ impl Command for ReadTableCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: ReadTableCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: ReadTableCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -1902,7 +1901,7 @@ impl Command for TableReadCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: TableReadCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: TableReadCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -1938,7 +1937,7 @@ impl Command for ReadTableKeysCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: ReadTableKeysCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: ReadTableKeysCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -1973,7 +1972,7 @@ impl Command for TableKeysReadCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: TableKeysReadCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: TableKeysReadCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -2009,7 +2008,7 @@ impl Command for ReadTableEntriesCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: ReadTableEntriesCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: ReadTableEntriesCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -2044,7 +2043,7 @@ impl Command for TableEntriesReadCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: TableEntriesReadCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: TableEntriesReadCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -2078,7 +2077,7 @@ impl Command for TableKeyDoesNotExistCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: TableKeyDoesNotExistCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: TableKeyDoesNotExistCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -2126,7 +2125,7 @@ impl Command for TableKeyBadVersionCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: TableKeyBadVersionCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: TableKeyBadVersionCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -2255,7 +2254,7 @@ impl Command for ReadTableEntriesDeltaCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: ReadTableEntriesDeltaCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: ReadTableEntriesDeltaCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -2292,7 +2291,7 @@ impl Command for TableEntriesDeltaReadCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: TableEntriesDeltaReadCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: 
TableEntriesDeltaReadCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) @@ -2329,7 +2328,7 @@ impl Command for ConditionalBlockEndCommand { } fn read_from(input: &[u8]) -> Result { - let decoded: ConditionalBlockEndCommand = CONFIG.deserialize(&input[..]).context(InvalidData { + let decoded: ConditionalBlockEndCommand = CONFIG.deserialize(input).context(InvalidData { command_type: Self::TYPE_CODE, })?; Ok(decoded) diff --git a/wire_protocol/src/mock_connection.rs b/wire_protocol/src/mock_connection.rs index 47efd79ae..5639c478a 100644 --- a/wire_protocol/src/mock_connection.rs +++ b/wire_protocol/src/mock_connection.rs @@ -448,7 +448,7 @@ async fn send_happy( table.remove(&k).expect("remove key"); } let reply = Replies::TableKeysRemoved(TableKeysRemovedCommand { - request_id: 0, + request_id: cmd.request_id, segment, }); sender.send(reply).expect("send reply");
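
Note: the core of the leak fix is that RawClient now invalidates the pooled connection and returns the new RawClientError::WrongReplyId variant when a reply's request id does not match the outstanding request, instead of returning a connection holding a stale reply to the pool. Below is a minimal sketch of how a caller might classify that error; the enum is a stand-in with only two variants (not the crate's full RawClientError in src/error.rs) and the helper name is hypothetical.

// Stand-in error type for illustration only; the real RawClientError carries
// more variants and a source error for RequestTimeout.
#[derive(Debug)]
enum RawClientError {
    WrongReplyId { reply_id: i64, request_id: i64 },
    RequestTimeout,
}

// A mismatched reply id means a stale message was read from a reused
// connection; the raw client has already invalidated that connection, so the
// request can safely be retried on a fresh one.
fn is_retryable(err: &RawClientError) -> bool {
    match err {
        RawClientError::WrongReplyId { reply_id, request_id } => {
            eprintln!("stale reply {} for request {}, retrying", reply_id, request_id);
            true
        }
        RawClientError::RequestTimeout => true,
    }
}

fn main() {
    let err = RawClientError::WrongReplyId { reply_id: 7, request_id: 9 };
    assert!(is_retryable(&err));
}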