Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
Fix repair dos
Browse files Browse the repository at this point in the history
  • Loading branch information
sakridge committed Mar 31, 2020
1 parent 62e12e3 commit cded4d5
Showing 1 changed file with 36 additions and 3 deletions.
39 changes: 36 additions & 3 deletions core/src/serve_repair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
};
use bincode::serialize;
use solana_ledger::blockstore::Blockstore;
use solana_measure::measure::Measure;
use solana_measure::thread_mem_usage;
use solana_metrics::{datapoint_debug, inc_new_counter_debug};
use solana_perf::packet::{limited_deserialize, Packet, Packets, PacketsRecycler};
Expand Down Expand Up @@ -50,6 +51,7 @@ impl RepairType {
#[derive(Default)]
pub struct ServeRepairStats {
pub total_packets: usize,
pub dropped_packets: usize,
pub processed: usize,
pub self_repair: usize,
pub window_index: usize,
Expand Down Expand Up @@ -196,13 +198,39 @@ impl ServeRepair {
requests_receiver: &PacketReceiver,
response_sender: &PacketSender,
stats: &mut ServeRepairStats,
max_packets: &mut usize,
) -> Result<()> {
//TODO cache connections
let timeout = Duration::new(1, 0);
let reqs = requests_receiver.recv_timeout(timeout)?;
stats.total_packets += reqs.packets.len();
let mut reqs_v = vec![requests_receiver.recv_timeout(timeout)?];
let mut total_packets = reqs_v[0].packets.len();

let mut dropped_packets = 0;
while let Ok(more) = requests_receiver.try_recv() {
total_packets += more.packets.len();
if total_packets < *max_packets {
// Drop the rest in the channel in case of dos
reqs_v.push(more);
} else {
dropped_packets += more.packets.len();
}
}

Self::handle_packets(obj, &recycler, blockstore, reqs, response_sender, stats);
stats.dropped_packets += dropped_packets;
stats.total_packets += total_packets;

let mut time = Measure::start("repair::handle_packets");
for reqs in reqs_v {
Self::handle_packets(obj, &recycler, blockstore, reqs, response_sender, stats);
}
time.stop();
if total_packets >= *max_packets {
if time.as_ms() > 1000 {
*max_packets = (*max_packets * 9) / 10;
} else {
*max_packets = (*max_packets * 10) / 9;
}
}
Ok(())
}

Expand All @@ -216,6 +244,9 @@ impl ServeRepair {
inc_new_counter_debug!("serve_repair-handle-repair--eq", stats.self_repair);
}

inc_new_counter_info!("serve_repair-total_packets", stats.total_packets);
inc_new_counter_info!("serve_repair-dropped_packets", stats.dropped_packets);

debug!(
"repair_listener: total_packets: {} passed: {}",
stats.total_packets, stats.processed
Expand Down Expand Up @@ -245,6 +276,7 @@ impl ServeRepair {
.spawn(move || {
let mut last_print = Instant::now();
let mut stats = ServeRepairStats::default();
let mut max_packets = 1024;
loop {
let result = Self::run_listen(
&me,
Expand All @@ -253,6 +285,7 @@ impl ServeRepair {
&requests_receiver,
&response_sender,
&mut stats,
&mut max_packets,
);
match result {
Err(Error::RecvTimeoutError(_)) | Ok(_) => {}
Expand Down

0 comments on commit cded4d5

Please sign in to comment.