Streaming API for existing on-disk storage #124

Merged: 22 commits, Apr 30, 2024
12 changes: 11 additions & 1 deletion TODO.md
@@ -4,7 +4,7 @@
- [x] use some concurrent hashmap e.g. flurry or dashmap
- [x] tracing instead of logging. Debugging peers: RUST_LOG=[{peer=.*}]=debug
test-log for tests
- [x] reopen read only is bugged
- [x] (reopen) read only is bugged
- [x] initializing/checking
- [x] blocks the whole process. Need to break it up. On slower devices (rpi) just hangs for a good while
- [x] checking torrents should be visible right away
@@ -89,3 +89,13 @@ refactor:
- [x] don't account for stolen pieces in measuring speed
- [ ] file priority
- [ ] start/end priority pieces per selected file, not per torrent

Streaming:

- [x] I want to stream files even if they are not checkboxed.

Other:

- [ ] keepalive is useless, the timeout is 120s, and read timeout is 10s. Need to send keepalive only if nothing was sent recently (see the sketch after this list).
- [x] url should have the filename
- [ ] reopening files: get rid of it!!! Even on Windows it should be alright - no need to reopen them.
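
Related to the keepalive item above, a minimal sketch of the intended logic, assuming the peer writer tracks when it last sent anything; the names and the 60-second interval are illustrative, not part of this PR:

```rust
use std::time::{Duration, Instant};

// BitTorrent peers commonly drop a connection after ~120s of silence, and the
// read timeout here is 10s, so a keepalive only matters when we have been idle.
const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(60);

struct PeerWriteState {
    // Updated every time any message is written to the peer (illustrative field).
    last_write: Instant,
}

impl PeerWriteState {
    /// Send a keepalive only if nothing else was sent recently.
    fn should_send_keepalive(&self) -> bool {
        self.last_write.elapsed() >= KEEPALIVE_INTERVAL
    }
}
```
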
7 changes: 6 additions & 1 deletion crates/librqbit/src/api.rs
@@ -18,7 +18,7 @@ use crate::{
},
torrent_state::{
peer::stats::snapshot::{PeerStatsFilter, PeerStatsSnapshot},
ManagedTorrentHandle,
FileStream, ManagedTorrentHandle,
},
tracing_subscriber_config_utils::LineBroadcast,
};
@@ -239,6 +239,11 @@ impl Api {
let mgr = self.mgr_handle(idx)?;
Ok(mgr.with_chunk_tracker(|chunks| format!("{:?}", chunks.get_have_pieces()))?)
}

pub fn api_stream(&self, idx: TorrentId, file_id: usize) -> Result<FileStream> {
let mgr = self.mgr_handle(idx)?;
Ok(mgr.stream(file_id)?)
}
}

#[derive(Serialize)]
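
A hedged usage sketch of the new `Api::api_stream` method. The HTTP handler below drives the returned `FileStream` through `tokio_util::io::ReaderStream` and `AsyncSeekExt::seek`, so it implements `tokio::io::AsyncRead + AsyncSeek`; the surrounding setup (the `api` handle, the literal torrent and file ids) is illustrative rather than part of this diff:

```rust
use anyhow::Result;
use std::io::SeekFrom;
use tokio::io::{AsyncReadExt, AsyncSeekExt};

// `Api` is the type extended in crates/librqbit/src/api.rs; constructing it is out of scope here.
async fn peek_at_file(api: &Api) -> Result<Vec<u8>> {
    // Stream file 1 of torrent 0 straight from on-disk storage,
    // even while the torrent is still downloading.
    let mut stream = api.api_stream(0, 1)?;
    // Skip ahead, then read a small chunk from that position.
    stream.seek(SeekFrom::Start(1024 * 1024)).await?;
    let mut buf = vec![0u8; 4096];
    let n = stream.read(&mut buf).await?;
    buf.truncate(n);
    Ok(buf)
}
```
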
11 changes: 8 additions & 3 deletions crates/librqbit/src/chunk_tracker.rs
@@ -156,9 +156,6 @@ impl ChunkTracker {
&self.have
}

pub fn get_selected_pieces(&self) -> &BF {
&self.selected
}
pub fn reserve_needed_piece(&mut self, index: ValidPieceIndex) {
self.queue_pieces.set(index.get() as usize, false)
}
@@ -197,6 +194,10 @@
.filter_map(|id| self.lengths.validate_piece_index(id))
}

pub(crate) fn is_piece_have(&self, id: ValidPieceIndex) -> bool {
self.have[id.get() as usize]
}

// None if wrong chunk
// true if did something
// false if didn't do anything
@@ -367,6 +368,10 @@
self.hns = res;
Ok(res)
}

pub(crate) fn get_selected_pieces(&self) -> &BF {
&self.selected
}
}

#[cfg(test)]
66 changes: 64 additions & 2 deletions crates/librqbit/src/http_api.rs
@@ -5,13 +5,16 @@ use axum::response::IntoResponse;
use axum::routing::{get, post};
use futures::future::BoxFuture;
use futures::{FutureExt, TryStreamExt};
use http::{HeaderMap, HeaderValue, StatusCode};
use itertools::Itertools;

use serde::{Deserialize, Serialize};
use std::io::SeekFrom;
use std::net::SocketAddr;
use std::str::FromStr;
use std::time::Duration;
use tracing::{debug, info};
use tokio::io::AsyncSeekExt;
use tracing::{debug, info, trace};

use axum::Router;

@@ -154,6 +157,60 @@ impl HttpApi {
state.api_peer_stats(idx, filter).map(axum::Json)
}

async fn torrent_stream_file(
State(state): State<ApiState>,
Path((idx, file_id)): Path<(usize, usize)>,
headers: http::HeaderMap,
) -> Result<impl IntoResponse> {
let mut stream = state.api_stream(idx, file_id)?;
let mut status = StatusCode::OK;
let mut output_headers = HeaderMap::new();
output_headers.insert("Accept-Ranges", HeaderValue::from_static("bytes"));

let range_header = headers.get(http::header::RANGE);
trace!(torrent_id=idx, file_id=file_id, range=?range_header, "request for HTTP stream");

if let Some(range) = headers.get(http::header::RANGE) {
let offset: Option<u64> = range
.to_str()
.ok()
.and_then(|s| s.strip_prefix("bytes="))
.and_then(|s| s.strip_suffix('-'))
.and_then(|s| s.parse().ok());
if let Some(offset) = offset {
status = StatusCode::PARTIAL_CONTENT;
stream
.seek(SeekFrom::Start(offset))
.await
.context("error seeking")?;

output_headers.insert(
http::header::CONTENT_LENGTH,
HeaderValue::from_str(&format!("{}", stream.len() - stream.position()))
.context("bug")?,
);
output_headers.insert(
http::header::CONTENT_RANGE,
HeaderValue::from_str(&format!(
"bytes {}-{}/{}",
stream.position(),
stream.len().saturating_sub(1),
stream.len()
))
.context("bug")?,
);
} else {
output_headers.insert(
http::header::CONTENT_LENGTH,
HeaderValue::from_str(&format!("{}", stream.len())).context("bug")?,
);
}
}

let s = tokio_util::io::ReaderStream::new(stream);
Ok((status, (output_headers, axum::body::Body::from_stream(s))))
}

async fn torrent_action_pause(
State(state): State<ApiState>,
Path(idx): Path<usize>,
@@ -223,7 +280,12 @@ impl HttpApi {
.route("/torrents/:id/haves", get(torrent_haves))
.route("/torrents/:id/stats", get(torrent_stats_v0))
.route("/torrents/:id/stats/v1", get(torrent_stats_v1))
.route("/torrents/:id/peer_stats", get(peer_stats));
.route("/torrents/:id/peer_stats", get(peer_stats))
.route("/torrents/:id/stream/:file_id", get(torrent_stream_file))
.route(
"/torrents/:id/stream/:file_id/*filename",
get(torrent_stream_file),
);

if !self.opts.read_only {
app = app
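
For reference, a hedged sketch of a client hitting the new streaming route. Per the handler above, only open-ended `Range: bytes=N-` values are parsed: for a 5000-byte file and `bytes=1000-`, the response is `206 Partial Content` with `Content-Length: 4000` and `Content-Range: bytes 1000-4999/5000`. The URL (host, port, ids, filename) and the use of `reqwest` are assumptions, not part of this PR:

```rust
use anyhow::Result;
use futures::StreamExt;

// Placeholder endpoint: adjust host/port and the torrent id, file id and filename.
const URL: &str = "http://127.0.0.1:3030/torrents/0/stream/0/video.mkv";

async fn resume_stream_from(offset: u64) -> Result<()> {
    let resp = reqwest::Client::new()
        .get(URL)
        // The handler only understands open-ended ranges of the form "bytes=N-".
        .header(reqwest::header::RANGE, format!("bytes={offset}-"))
        .send()
        .await?;
    println!("status: {}", resp.status()); // expect 206 when a valid range was sent

    // Consume the body incrementally, e.g. to feed a media player or write to a file.
    let mut body = resp.bytes_stream();
    while let Some(chunk) = body.next().await {
        let chunk = chunk?;
        let _ = chunk.len();
    }
    Ok(())
}
```
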
17 changes: 0 additions & 17 deletions crates/librqbit/src/opened_file.rs
@@ -7,7 +7,6 @@ use std::{
use anyhow::Context;
use librqbit_core::lengths::Lengths;
use parking_lot::Mutex;
use tracing::debug;

#[derive(Debug)]
pub(crate) struct OpenedFile {
@@ -63,22 +62,6 @@ impl OpenedFile {
piece_range,
}
}
pub fn reopen(&self, read_only: bool) -> anyhow::Result<()> {
let log_suffix = if read_only { " read only" } else { "" };

let mut open_opts = std::fs::OpenOptions::new();
open_opts.read(true);
if !read_only {
open_opts.write(true).create(false);
}

let mut g = self.file.lock();
*g = open_opts
.open(&self.filename)
.with_context(|| format!("error re-opening {:?}{log_suffix}", self.filename))?;
debug!("reopened {:?}{log_suffix}", self.filename);
Ok(())
}

pub fn take(&self) -> anyhow::Result<File> {
let mut f = self.file.lock();
11 changes: 3 additions & 8 deletions crates/librqbit/src/torrent_state/initializing.rs
@@ -1,5 +1,5 @@
use std::{
fs::{File, OpenOptions},
fs::OpenOptions,
sync::{atomic::AtomicU64, Arc},
time::Instant,
};
@@ -16,10 +16,6 @@ use crate::{

use super::{paused::TorrentStatePaused, ManagedTorrentInfo};

fn ensure_file_length(file: &File, length: u64) -> anyhow::Result<()> {
Ok(file.set_len(length)?)
}

pub struct TorrentStateInitializing {
pub(crate) meta: Arc<ManagedTorrentInfo>,
pub(crate) only_files: Option<Vec<usize>>,
@@ -107,7 +103,7 @@ impl TorrentStateInitializing {
.unwrap_or(true)
{
let now = Instant::now();
if let Err(err) = ensure_file_length(&file.file.lock(), file.len) {
if let Err(err) = file.file.lock().set_len(file.len) {
warn!(
"Error setting length for file {:?} to {}: {:#?}",
file.filename, file.len, err
@@ -121,8 +117,6 @@
);
}
}

file.reopen(true)?;
}
Ok::<_, anyhow::Error>(())
})?;
@@ -138,6 +132,7 @@
info: self.meta.clone(),
files,
chunk_tracker,
streams: Arc::new(Default::default()),
};
Ok(paused)
}