From a15fa4840c45964390e2aab1d5d8214204e76230 Mon Sep 17 00:00:00 2001 From: sakridge Date: Wed, 1 Apr 2020 06:48:35 -0700 Subject: [PATCH] Fix repair dos (#9056) --- core/src/serve_repair.rs | 60 ++++++++++++++++++++++++++++------------ 1 file changed, 43 insertions(+), 17 deletions(-) diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs index c0a55143eb6d46..721e05cc600618 100644 --- a/core/src/serve_repair.rs +++ b/core/src/serve_repair.rs @@ -9,6 +9,7 @@ use crate::{ use bincode::serialize; use rand::{thread_rng, Rng}; use solana_ledger::blockstore::Blockstore; +use solana_measure::measure::Measure; use solana_measure::thread_mem_usage; use solana_metrics::{datapoint_debug, inc_new_counter_debug}; use solana_perf::packet::{Packets, PacketsRecycler}; @@ -184,12 +185,33 @@ impl ServeRepair { blockstore: Option<&Arc>, requests_receiver: &PacketReceiver, response_sender: &PacketSender, + max_packets: &mut usize, ) -> Result<()> { //TODO cache connections let timeout = Duration::new(1, 0); - let reqs = requests_receiver.recv_timeout(timeout)?; + let mut reqs_v = vec![requests_receiver.recv_timeout(timeout)?]; + let mut total_packets = reqs_v[0].packets.len(); + + while let Ok(more) = requests_receiver.try_recv() { + total_packets += more.packets.len(); + if total_packets < *max_packets { + // Drop the rest in the channel in case of dos + reqs_v.push(more); + } + } - Self::handle_packets(obj, &recycler, blockstore, reqs, response_sender); + let mut time = Measure::start("repair::handle_packets"); + for reqs in reqs_v { + Self::handle_packets(obj, &recycler, blockstore, reqs, response_sender); + } + time.stop(); + if total_packets >= *max_packets { + if time.as_ms() > 1000 { + *max_packets = (*max_packets * 9) / 10; + } else { + *max_packets = (*max_packets * 10) / 9; + } + } Ok(()) } @@ -204,22 +226,26 @@ impl ServeRepair { let recycler = PacketsRecycler::default(); Builder::new() .name("solana-repair-listen".to_string()) - .spawn(move || loop { - let result = Self::run_listen( - &me, - &recycler, - blockstore.as_ref(), - &requests_receiver, - &response_sender, - ); - match result { - Err(Error::RecvTimeoutError(_)) | Ok(_) => {} - Err(err) => info!("repair listener error: {:?}", err), - }; - if exit.load(Ordering::Relaxed) { - return; + .spawn(move || { + let mut max_packets = 1024; + loop { + let result = Self::run_listen( + &me, + &recycler, + blockstore.as_ref(), + &requests_receiver, + &response_sender, + &mut max_packets, + ); + match result { + Err(Error::RecvTimeoutError(_)) | Ok(_) => {} + Err(err) => info!("repair listener error: {:?}", err), + }; + if exit.load(Ordering::Relaxed) { + return; + } + thread_mem_usage::datapoint("solana-repair-listen"); } - thread_mem_usage::datapoint("solana-repair-listen"); }) .unwrap() }