Skip to content

Commit 4925adb

Browse files
committed
Fix broken streaming mode
1 parent 7226bfd commit 4925adb

File tree

2 files changed

+46
-52
lines changed

2 files changed

+46
-52
lines changed

audio/src/fetch/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,7 @@ struct AudioFileShared {
248248
cond: Condvar,
249249
download_status: Mutex<AudioFileDownloadStatus>,
250250
download_strategy: Mutex<DownloadStrategy>,
251+
number_of_open_requests: AtomicUsize,
251252
ping_time_ms: AtomicUsize,
252253
read_position: AtomicUsize,
253254
}
@@ -356,6 +357,7 @@ impl AudioFileStreaming {
356357
downloaded: RangeSet::new(),
357358
}),
358359
download_strategy: Mutex::new(DownloadStrategy::RandomAccess()), // start with random access mode until someone tells us otherwise
360+
number_of_open_requests: AtomicUsize::new(0),
359361
ping_time_ms: AtomicUsize::new(0),
360362
read_position: AtomicUsize::new(0),
361363
});

audio/src/fetch/receive.rs

+44-52
Original file line numberDiff line numberDiff line change
@@ -68,12 +68,16 @@ async fn receive_data(
6868
initial_data_offset: usize,
6969
initial_request_length: usize,
7070
request_sent_time: Instant,
71-
mut measure_ping_time: bool,
72-
finish_tx: mpsc::UnboundedSender<()>,
7371
) {
7472
let mut data_offset = initial_data_offset;
7573
let mut request_length = initial_request_length;
7674

75+
let old_number_of_request = shared
76+
.number_of_open_requests
77+
.fetch_add(1, atomic::Ordering::SeqCst);
78+
79+
let mut measure_ping_time = old_number_of_request == 0;
80+
7781
let result = loop {
7882
let data = match data_rx.next().await {
7983
Some(Ok(data)) => data,
@@ -121,7 +125,9 @@ async fn receive_data(
121125
shared.cond.notify_all();
122126
}
123127

124-
let _ = finish_tx.send(());
128+
shared
129+
.number_of_open_requests
130+
.fetch_sub(1, atomic::Ordering::SeqCst);
125131

126132
if result.is_err() {
127133
warn!(
@@ -144,9 +150,6 @@ struct AudioFileFetch {
144150
file_data_tx: mpsc::UnboundedSender<ReceivedData>,
145151
complete_tx: Option<oneshot::Sender<NamedTempFile>>,
146152
network_response_times_ms: Vec<usize>,
147-
number_of_open_requests: usize,
148-
149-
download_finish_tx: mpsc::UnboundedSender<()>,
150153
}
151154

152155
// Might be replaced by enum from std once stable
@@ -214,11 +217,7 @@ impl AudioFileFetch {
214217
range.start,
215218
range.length,
216219
Instant::now(),
217-
self.number_of_open_requests == 0,
218-
self.download_finish_tx.clone(),
219220
));
220-
221-
self.number_of_open_requests += 1;
222221
}
223222
}
224223

@@ -341,7 +340,6 @@ impl AudioFileFetch {
341340
}
342341
StreamLoaderCommand::StreamMode() => {
343342
*(self.shared.download_strategy.lock().unwrap()) = DownloadStrategy::Streaming();
344-
self.trigger_preload();
345343
}
346344
StreamLoaderCommand::Close() => return ControlFlow::Break,
347345
}
@@ -355,36 +353,6 @@ impl AudioFileFetch {
355353
output.seek(SeekFrom::Start(0)).unwrap();
356354
let _ = complete_tx.send(output);
357355
}
358-
359-
fn trigger_preload(&mut self) {
360-
if self.number_of_open_requests >= MAX_PREFETCH_REQUESTS {
361-
return;
362-
}
363-
364-
let max_requests_to_send = MAX_PREFETCH_REQUESTS - self.number_of_open_requests;
365-
366-
let bytes_pending: usize = {
367-
let download_status = self.shared.download_status.lock().unwrap();
368-
download_status
369-
.requested
370-
.minus(&download_status.downloaded)
371-
.len()
372-
};
373-
374-
let ping_time_seconds =
375-
0.001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64;
376-
let download_rate = self.session.channel().get_download_rate_estimate();
377-
378-
let desired_pending_bytes = max(
379-
(PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * self.shared.stream_data_rate as f64)
380-
as usize,
381-
(FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f64) as usize,
382-
);
383-
384-
if bytes_pending < desired_pending_bytes {
385-
self.pre_fetch_more_data(desired_pending_bytes - bytes_pending, max_requests_to_send);
386-
}
387-
}
388356
}
389357

390358
pub(super) async fn audio_file_fetch(
@@ -399,7 +367,6 @@ pub(super) async fn audio_file_fetch(
399367
complete_tx: oneshot::Sender<NamedTempFile>,
400368
) {
401369
let (file_data_tx, mut file_data_rx) = mpsc::unbounded_channel();
402-
let (download_finish_tx, mut download_finish_rx) = mpsc::unbounded_channel();
403370

404371
{
405372
let requested_range = Range::new(0, initial_data_length);
@@ -414,8 +381,6 @@ pub(super) async fn audio_file_fetch(
414381
0,
415382
initial_data_length,
416383
initial_request_sent_time,
417-
true,
418-
download_finish_tx.clone(),
419384
));
420385

421386
let mut fetch = AudioFileFetch {
@@ -426,9 +391,6 @@ pub(super) async fn audio_file_fetch(
426391
file_data_tx,
427392
complete_tx: Some(complete_tx),
428393
network_response_times_ms: Vec::new(),
429-
number_of_open_requests: 1,
430-
431-
download_finish_tx,
432394
};
433395

434396
loop {
@@ -442,12 +404,42 @@ pub(super) async fn audio_file_fetch(
442404
if data.map_or(true, |data| fetch.handle_file_data(data) == ControlFlow::Break) {
443405
break;
444406
}
445-
},
446-
_ = download_finish_rx.recv() => {
447-
fetch.number_of_open_requests -= 1;
407+
}
408+
}
409+
410+
if fetch.get_download_strategy() == DownloadStrategy::Streaming() {
411+
let number_of_open_requests = fetch
412+
.shared
413+
.number_of_open_requests
414+
.load(atomic::Ordering::SeqCst);
415+
if number_of_open_requests < MAX_PREFETCH_REQUESTS {
416+
let max_requests_to_send = MAX_PREFETCH_REQUESTS - number_of_open_requests;
417+
418+
let bytes_pending: usize = {
419+
let download_status = fetch.shared.download_status.lock().unwrap();
420+
download_status
421+
.requested
422+
.minus(&download_status.downloaded)
423+
.len()
424+
};
448425

449-
if fetch.get_download_strategy() == DownloadStrategy::Streaming() {
450-
fetch.trigger_preload();
426+
let ping_time_seconds =
427+
0.001 * fetch.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64;
428+
let download_rate = fetch.session.channel().get_download_rate_estimate();
429+
430+
let desired_pending_bytes = max(
431+
(PREFETCH_THRESHOLD_FACTOR
432+
* ping_time_seconds
433+
* fetch.shared.stream_data_rate as f64) as usize,
434+
(FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f64)
435+
as usize,
436+
);
437+
438+
if bytes_pending < desired_pending_bytes {
439+
fetch.pre_fetch_more_data(
440+
desired_pending_bytes - bytes_pending,
441+
max_requests_to_send,
442+
);
451443
}
452444
}
453445
}

0 commit comments

Comments
 (0)