From 2ed63611e11dd73fd0b47ee0ad36dcca7ccaa338 Mon Sep 17 00:00:00 2001 From: Stan Bondi Date: Fri, 7 Oct 2022 09:14:50 +0400 Subject: [PATCH] fix(dht): remove some invalid saf failure cases --- comms/dht/src/dht.rs | 16 ++++++++++++++++ comms/dht/src/envelope.rs | 2 +- .../store_forward/database/stored_message.rs | 10 ++++------ comms/dht/src/store_forward/error.rs | 10 +++++----- comms/dht/src/store_forward/message.rs | 7 ++----- comms/dht/src/store_forward/saf_handler/task.rs | 17 +++++++++++------ comms/dht/src/store_forward/store.rs | 8 ++++---- 7 files changed, 43 insertions(+), 27 deletions(-) diff --git a/comms/dht/src/dht.rs b/comms/dht/src/dht.rs index e3acdae98a8..3fef7d43a89 100644 --- a/comms/dht/src/dht.rs +++ b/comms/dht/src/dht.rs @@ -31,6 +31,7 @@ use tari_comms::{ pipeline::PipelineError, }; use tari_shutdown::ShutdownSignal; +use tari_utilities::epoch_time::EpochTime; use thiserror::Error; use tokio::sync::{broadcast, mpsc}; use tower::{layer::Layer, Service, ServiceBuilder}; @@ -298,6 +299,7 @@ impl Dht { .layer(MetricsLayer::new(self.metrics_collector.clone())) .layer(inbound::DeserializeLayer::new(self.peer_manager.clone())) .layer(filter::FilterLayer::new(self.unsupported_saf_messages_filter())) + .layer(filter::FilterLayer::new(discard_expired_messages)) .layer(inbound::DecryptionLayer::new( self.config.clone(), self.node_identity.clone(), @@ -432,6 +434,20 @@ fn filter_messages_to_rebroadcast(msg: &DecryptedDhtMessage) -> bool { } } +/// Check message expiry and immediately discard if expired +fn discard_expired_messages(msg: &DhtInboundMessage) -> bool { + if let Some(expires) = msg.dht_header.expires { + if expires < EpochTime::now() { + debug!( + target: LOG_TARGET, + "[discard_expired_messages] Discarding expired message {}", msg + ); + return false; + } + } + true +} + #[cfg(test)] mod test { use std::{sync::Arc, time::Duration}; diff --git a/comms/dht/src/envelope.rs b/comms/dht/src/envelope.rs index 3f4f2ef06ee..6ac881cb80d 100644 --- a/comms/dht/src/envelope.rs +++ b/comms/dht/src/envelope.rs @@ -43,7 +43,7 @@ use crate::version::DhtProtocolVersion; pub(crate) fn datetime_to_timestamp(datetime: DateTime) -> Timestamp { Timestamp { seconds: datetime.timestamp(), - nanos: datetime.timestamp_subsec_nanos().try_into().unwrap_or(std::i32::MAX), + nanos: datetime.timestamp_subsec_nanos().try_into().unwrap_or(i32::MAX), } } diff --git a/comms/dht/src/store_forward/database/stored_message.rs b/comms/dht/src/store_forward/database/stored_message.rs index b8d095d9019..1913b5be02b 100644 --- a/comms/dht/src/store_forward/database/stored_message.rs +++ b/comms/dht/src/store_forward/database/stored_message.rs @@ -20,8 +20,6 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::convert::TryInto; - use chrono::NaiveDateTime; use tari_comms::message::MessageExt; use tari_utilities::{hex, hex::Hex}; @@ -50,7 +48,7 @@ pub struct NewStoredMessage { } impl NewStoredMessage { - pub fn try_construct(message: DecryptedDhtMessage, priority: StoredMessagePriority) -> Option { + pub fn new(message: DecryptedDhtMessage, priority: StoredMessagePriority) -> Self { let DecryptedDhtMessage { authenticated_origin, decryption_result, @@ -64,8 +62,8 @@ impl NewStoredMessage { }; let body_hash = hex::to_hex(&dedup::create_message_hash(&dht_header.message_signature, &body)); - Some(Self { - version: dht_header.version.as_major().try_into().ok()?, + Self { + version: dht_header.version.as_major() as i32, origin_pubkey: authenticated_origin.as_ref().map(|pk| pk.to_hex()), message_type: dht_header.message_type as i32, destination_pubkey: dht_header.destination.public_key().map(|pk| pk.to_hex()), @@ -81,7 +79,7 @@ impl NewStoredMessage { }, body_hash, body, - }) + } } } diff --git a/comms/dht/src/store_forward/error.rs b/comms/dht/src/store_forward/error.rs index 4a71b410eb1..85fd5678c21 100644 --- a/comms/dht/src/store_forward/error.rs +++ b/comms/dht/src/store_forward/error.rs @@ -27,7 +27,7 @@ use tari_comms::{ message::MessageError, peer_manager::{NodeId, PeerManagerError}, }; -use tari_utilities::byte_array::ByteArrayError; +use tari_utilities::{byte_array::ByteArrayError, epoch_time::EpochTime}; use thiserror::Error; use crate::{ @@ -81,10 +81,10 @@ pub enum StoreAndForwardError { RequesterChannelClosed, #[error("The request was cancelled by the store and forward service")] RequestCancelled, - #[error("The message was not valid for store and forward")] - InvalidStoreMessage, - #[error("The envelope version is invalid")] - InvalidEnvelopeVersion, + #[error("The {field} field was not valid, discarding SAF response: {details}")] + InvalidSafResponseMessage { field: &'static str, details: String }, + #[error("The message has expired, not storing message in SAF db (expiry: {expired}, now: {now})")] + NotStoringExpiredMessage { expired: EpochTime, now: EpochTime }, #[error("MalformedNodeId: {0}")] MalformedNodeId(#[from] ByteArrayError), #[error("DHT message type should not have been forwarded")] diff --git a/comms/dht/src/store_forward/message.rs b/comms/dht/src/store_forward/message.rs index f753b9941ba..f74af32c613 100644 --- a/comms/dht/src/store_forward/message.rs +++ b/comms/dht/src/store_forward/message.rs @@ -20,7 +20,7 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::convert::{TryFrom, TryInto}; +use std::convert::TryFrom; use chrono::{DateTime, Utc}; use prost::Message; @@ -76,10 +76,7 @@ impl TryFrom for StoredMessage { let dht_header = DhtHeader::decode(message.header.as_slice())?; Ok(Self { stored_at: Some(datetime_to_timestamp(DateTime::from_utc(message.stored_at, Utc))), - version: message - .version - .try_into() - .map_err(|_| StoreAndForwardError::InvalidEnvelopeVersion)?, + version: message.version as u32, body: message.body, dht_header: Some(dht_header), }) diff --git a/comms/dht/src/store_forward/saf_handler/task.rs b/comms/dht/src/store_forward/saf_handler/task.rs index 7f5390d3827..10422a382f7 100644 --- a/comms/dht/src/store_forward/saf_handler/task.rs +++ b/comms/dht/src/store_forward/saf_handler/task.rs @@ -36,7 +36,7 @@ use tari_comms::{ types::CommsPublicKey, BytesMut, }; -use tari_utilities::{convert::try_convert_all, ByteArray}; +use tari_utilities::ByteArray; use tokio::sync::mpsc; use tower::{Service, ServiceExt}; @@ -216,7 +216,7 @@ where S: Service let messages = self.saf_requester.fetch_messages(query.clone()).await?; let stored_messages = StoredMessagesResponse { - messages: try_convert_all(messages)?, + messages: messages.into_iter().map(TryInto::try_into).collect::>()?, request_id: retrieve_msgs.request_id, response_type: resp_type as i32, }; @@ -430,8 +430,13 @@ where S: Service .stored_at .map(|t| { Result::<_, StoreAndForwardError>::Ok(DateTime::from_utc( - NaiveDateTime::from_timestamp_opt(t.seconds, t.nanos.try_into().unwrap_or(u32::MAX)) - .ok_or(StoreAndForwardError::InvalidStoreMessage)?, + NaiveDateTime::from_timestamp_opt(t.seconds, 0).ok_or_else(|| { + StoreAndForwardError::InvalidSafResponseMessage { + field: "stored_at", + details: "number of seconds provided represents more days that can fit in a u32" + .to_string(), + } + })?, Utc, )) }) @@ -618,7 +623,7 @@ where S: Service mod test { use std::time::Duration; - use chrono::Utc; + use chrono::{Timelike, Utc}; use tari_comms::{message::MessageExt, runtime, wrap_in_envelope_body}; use tari_test_utils::collect_recv; use tari_utilities::{hex, hex::Hex}; @@ -932,7 +937,7 @@ mod test { .unwrap() .unwrap(); - assert_eq!(last_saf_received, msg2_time); + assert_eq!(last_saf_received.second(), msg2_time.second()); } #[runtime::test] diff --git a/comms/dht/src/store_forward/store.rs b/comms/dht/src/store_forward/store.rs index c0d2b8d224b..70690bde948 100644 --- a/comms/dht/src/store_forward/store.rs +++ b/comms/dht/src/store_forward/store.rs @@ -437,13 +437,13 @@ where S: Service + Se ); if let Some(expires) = message.dht_header.expires { - if expires < EpochTime::now() { - return SafResult::Err(StoreAndForwardError::InvalidStoreMessage); + let now = EpochTime::now(); + if expires < now { + return Err(StoreAndForwardError::NotStoringExpiredMessage { expired: expires, now }); } } - let stored_message = - NewStoredMessage::try_construct(message, priority).ok_or(StoreAndForwardError::InvalidStoreMessage)?; + let stored_message = NewStoredMessage::new(message, priority); self.saf_requester.insert_message(stored_message).await } }