Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for P2P blocklists #330

Merged
merged 7 commits into from
Feb 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/librqbit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ regex = "1"
reqwest = { version = "0.12", default-features = false, features = [
"json",
"socks",
"stream",
] }
urlencoding = "2"
byteorder = "1"
Expand Down Expand Up @@ -117,6 +118,8 @@ async-backtrace = { version = "0.2", optional = true }
notify = { version = "7", optional = true }
walkdir = "2.5.0"
arc-swap = "1.7.1"
intervaltree = "0.2.7"
async-compression = {version="0.4.18", features= ["tokio", "gzip"] }

[build-dependencies]
anyhow = "1"
Expand Down
273 changes: 273 additions & 0 deletions crates/librqbit/src/blocklist.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,273 @@
use anyhow::{Context, Result};
use async_compression::tokio::bufread::GzipDecoder;
use futures::TryStreamExt;
use intervaltree::IntervalTree;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use std::pin::Pin;
use std::str::FromStr;
use tokio::io::{AsyncBufRead, AsyncRead};
use tokio::{io::AsyncBufReadExt, io::BufReader};
use tokio_util::io::StreamReader;
use tracing::{debug, info, trace};
use url::Url;

pub struct Blocklist {
// ipv4 and ipv6 do not overlap
// see: https://www.rfc-editor.org/rfc/rfc4291#section-2.5.5
blocked_ranges: IntervalTree<IpAddr, ()>,
}

impl Blocklist {
pub fn empty() -> Self {
return Self::new(std::iter::empty());
}

pub fn new(ip_ranges: impl IntoIterator<Item = std::ops::Range<IpAddr>>) -> Self {
Self {
blocked_ranges: IntervalTree::from_iter(ip_ranges.into_iter().map(|r| (r, ()))),
}
}

pub async fn load_from_url(url: &str) -> Result<Self> {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about treating file:// urls specially?

let parsed_url = Url::parse(url).context("Failed to parse URL")?;

if parsed_url.scheme() == "file" {
let path = parsed_url
.to_file_path()
.map_err(|_| anyhow::anyhow!("Failed to convert file URL to path"))?;
return Self::load_from_file(path.to_str().unwrap()).await;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rather than using .unwrap() here it would be better to pass &Path (or AsRef) into load_from_file

In any case let's strive not use .unwrap() in librqbit unless it's 100% clear it will never fail

}

let response = reqwest::get(parsed_url)
.await
.context("Failed to send request for blocklist")?;
if response.status() != 200 {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think reqwests has some shortcut like .status().is_success()

anyhow::bail!("Failed to fetch blocklist: HTTP {}", response.status());
}

let content_length = response
.content_length()
.ok_or_else(|| anyhow::anyhow!("Failed to get content length"))?;

if content_length < 2 {
anyhow::bail!("Content too short: not enough data to determine compression");
}

let reader = StreamReader::new(
response
.bytes_stream()
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)),
);
Self::create_from_stream(reader).await
}

pub async fn load_from_file(path: &str) -> Result<Self> {
let file = tokio::fs::File::open(path).await?;
let reader = tokio::io::BufReader::new(file);
Self::create_from_stream(reader).await
}

async fn create_from_stream<R>(reader: R) -> Result<Self>
where
R: AsyncRead + Unpin + Send,
{
let mut peek_bytes = [0u8; 2];
let mut reader = tokio::io::BufReader::new(reader);

// Peek the first bytes by filling buffer
let buffer = reader.fill_buf().await?;
if buffer.len() >= 2 {
peek_bytes.copy_from_slice(&buffer[0..2]);
} else {
anyhow::bail!("Content too short: not enough data to determine compression");
}

// Check for Gzip magic bytes (1F 8B)
let is_gzip = peek_bytes == [0x1F, 0x8B];

let mut reader: Pin<Box<dyn AsyncBufRead + Send>> = if is_gzip {
trace!("Detected Gzip file, decompressing...");
Box::pin(BufReader::new(GzipDecoder::new(reader)))
} else {
trace!("Plain text file detected.");
Box::pin(reader)
};

let mut line: String = Default::default();
let mut ip_ranges: Vec<std::ops::Range<IpAddr>> = Vec::new();
while reader.read_line(&mut line).await? > 0 {
if let Some((start_ip, end_ip)) = parse_ip_range(&line) {
let range = start_ip..(increment_ip(end_ip));
ip_ranges.push(range);
}
line.clear();
}

info!(
ip_entry_count = ip_ranges.len(),
"Finished loading blocklist"
);

let blocklist = Self::new(ip_ranges);
Ok(blocklist)
}

pub fn is_blocked(&self, ip: IpAddr) -> bool {
self.blocked_ranges.query_point(ip).next().is_some()
}
}

/// Safely increments an `IpAddr`, returning `None` if it would overflow.
fn increment_ip(ip: IpAddr) -> IpAddr {
match ip {
IpAddr::V4(ipv4) => {
let num = u32::from_be_bytes(ipv4.octets());
std::net::IpAddr::V4(Ipv4Addr::from(num.saturating_add(1)))
}
IpAddr::V6(ipv6) => {
let num = u128::from_be_bytes(ipv6.octets());
std::net::IpAddr::V6(Ipv6Addr::from(num.saturating_add(1)))
}
}
}

fn parse_ip_range(line: &str) -> Option<(IpAddr, IpAddr)> {
// Skip comments and empty lines
let line = line.trim();
if line.starts_with('#') || line.is_empty() {
return None;
}

let is_ipv4 = line.matches('.').count() >= 6;
// Find the split point based on whether it's IPv4 or not
let split_point: usize = if is_ipv4 {
Copy link
Owner

@ikatson ikatson Feb 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this different for ipv4 and ipv6?

why not just

if let Some(rule_name, range) = split_once(':') {
    if let Some(start, end) = range.split_once('-') {
        match (start.parse::<IpAddr>, end.parse::<IpAddr>) {
            // if same kinds, handle, otherwise ignore
        }
    }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While testing against some real world block list i found out that some rules contained colon ()':') in the rule name.
This would cause the rule to fail parsing.
I think we could safely ignore those rules though since they are not well formatted.

line.rfind(':')
} else {
line.find(':')
}
.unwrap_or(0);

let (rule_name, ip_range) = line.split_at(split_point + 1);
if let Some((start, end)) = ip_range.split_once('-') {
if let (Ok(start_ip), Ok(end_ip)) =
(IpAddr::from_str(start.trim()), IpAddr::from_str(end.trim()))
{
return Some((start_ip, end_ip));
} else {
// Mismatched IP versions, skip this range
debug!(rulen_name = rule_name, "Could not be parsed");
}
}

None
}

#[cfg(test)]
mod tests {
use std::io::Cursor;

use super::*;
use async_compression::tokio::write::GzipEncoder;
use futures::stream::once;
use tokio::io::AsyncWriteExt;

#[tokio::test]
async fn test_blocklist_gzipped() -> Result<()> {
let blocklist = r#"
# test
local:192.168.1.1-192.168.1.255
localv6:2001:db8::1-2001:db8::ffff
"#;
let mut gzipped_blocklist = Vec::new();
{
let mut encoder = GzipEncoder::new(&mut gzipped_blocklist);
encoder.write_all(blocklist.as_bytes()).await.unwrap();
encoder.flush().await.unwrap();
encoder.shutdown().await.unwrap();
}

let stream = StreamReader::new(Box::pin(once(async {
Ok::<_, std::io::Error>(Cursor::new(gzipped_blocklist))
})));
let blocklist = Blocklist::create_from_stream(stream).await?;
assert!(blocklist.is_blocked("192.168.1.1".parse().unwrap()));
assert!(!blocklist.is_blocked("8.8.8.8".parse().unwrap()));

Ok(())
}

#[tokio::test]
async fn test_blocklist_plaintext() -> Result<()> {
let blocklist = r#"
# test
local:192.168.1.1-192.168.1.255
localv6:2001:db8::1-2001:db8::ffff
"#;

let stream = StreamReader::new(Box::pin(once(async {
Ok::<_, std::io::Error>(Cursor::new(blocklist.as_bytes().to_vec()))
})));
let blocklist = Blocklist::create_from_stream(stream).await?;
assert!(blocklist.is_blocked("192.168.1.1".parse().unwrap()));
assert!(!blocklist.is_blocked("8.8.8.8".parse().unwrap()));

Ok(())
}

#[tokio::test]
async fn test_blocklist_from_plaintext_file() -> Result<()> {
let blocklist_content = r#"
# test
local:192.168.1.1-192.168.1.255
localv6:2001:db8::1-2001:db8::ffff
"#;

// Create a temporary file
let mut temp_file = tokio::fs::File::create("temp_blocklist.txt").await?;
tokio::io::AsyncWriteExt::write_all(&mut temp_file, blocklist_content.as_bytes()).await?;
drop(temp_file); // Close the file

// Load the blocklist from the file
let blocklist = Blocklist::load_from_file("temp_blocklist.txt").await?;

// Verify the blocklist
assert!(blocklist.is_blocked("192.168.1.1".parse().unwrap()));
assert!(!blocklist.is_blocked("8.8.8.8".parse().unwrap()));
assert!(blocklist.is_blocked("2001:db8::1".parse().unwrap()));
assert!(!blocklist.is_blocked("2001:4860:4860::8888".parse().unwrap()));

// Clean up the temporary file
tokio::fs::remove_file("temp_blocklist.txt").await?;

Ok(())
}

#[test]
fn test_blocklist_empty() {
let blocklist = Blocklist::empty();
assert!(!blocklist.is_blocked("127.0.0.1".parse().unwrap()));
assert!(!blocklist.is_blocked("::1".parse().unwrap()));
}

#[test]
fn test_manual_ranges() {
// Add IPv4 range
let start_v4: IpAddr = "192.168.0.0".parse().unwrap();
let end_v4: IpAddr = "192.168.255.255".parse().unwrap();
let ipv4_range = start_v4..end_v4;

// Add IPv6 range
let start_v6: IpAddr = "2001:db8::".parse().unwrap();
let end_v6: IpAddr = "2001:db8::ffff".parse().unwrap();
let ipv6_range = start_v6..end_v6;

let blocklist = Blocklist::new(vec![ipv4_range, ipv6_range]);
// Test IPv4 addresses
assert!(blocklist.is_blocked("192.168.1.1".parse().unwrap()));
assert!(!blocklist.is_blocked("10.0.0.1".parse().unwrap()));

// Test IPv6 addresses
assert!(blocklist.is_blocked("2001:db8::1".parse().unwrap()));
assert!(!blocklist.is_blocked("2001:db9::1".parse().unwrap()));
}
}
1 change: 1 addition & 0 deletions crates/librqbit/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub mod api;
mod api_error;
mod bitv;
mod bitv_factory;
mod blocklist;
mod chunk_tracker;
mod create_torrent_file;
mod dht_utils;
Expand Down
Loading