Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement stream state management #553

Merged
merged 14 commits into from
Dec 1, 2023
Merged
37 changes: 35 additions & 2 deletions src/models/ctx/update_streams.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use enclose::enclose;
use futures::FutureExt;
use std::collections::hash_map::Entry;

use crate::constants::STREAMS_STORAGE_KEY;
use crate::models::common::{Loadable, ResourceLoadable};
use crate::models::ctx::{CtxError, CtxStatus};
use crate::runtime::msg::{Action, ActionCtx, Event, Internal, Msg};
use crate::runtime::{Effect, EffectFuture, Effects, Env, EnvFutureExt};
Expand All @@ -25,15 +27,24 @@ pub fn update_streams<E: Env + 'static>(
Msg::Internal(Internal::StreamLoaded {
stream,
stream_request: Some(stream_request),
meta_request: Some(meta_request),
}) => {
meta_item:
ResourceLoadable {
request: meta_request,
content: Some(meta_content),
},
}) if !meta_content.is_loading() => {
let meta_id = &meta_request.path.id;
let video_id = &stream_request.path.id;

let key = StreamsItemKey {
meta_id: meta_id.to_owned(),
video_id: video_id.to_owned(),
};
let last_stream_item = match meta_content {
Loadable::Ready(meta_item) => streams.last_stream_item(video_id, meta_item),
_ => streams.items.get(&key),
};
let last_stream_state = last_stream_item.and_then(|item| item.adjusted_state(stream));

let streams_item = StreamsItem {
stream: stream.to_owned(),
Expand All @@ -42,12 +53,34 @@ pub fn update_streams<E: Env + 'static>(
video_id: video_id.to_owned(),
meta_transport_url: meta_request.base.to_owned(),
stream_transport_url: stream_request.base.to_owned(),
state: last_stream_state,
mtime: E::now(),
};

streams.items.insert(key, streams_item);
Effects::msg(Msg::Internal(Internal::StreamsChanged(false)))
}
Msg::Internal(Internal::StreamStateChanged {
state,
stream_request: Some(stream_request),
meta_request: Some(meta_request),
}) => {
let meta_id = &meta_request.path.id;
let video_id = &stream_request.path.id;

let key = StreamsItemKey {
meta_id: meta_id.to_owned(),
video_id: video_id.to_owned(),
};
let entry = streams
.items
.entry(key)
.and_modify(|item| item.state = Some(state.to_owned()));
match entry {
Entry::Occupied(_) => Effects::msg(Msg::Internal(Internal::StreamsChanged(false))),
_ => Effects::none().unchanged(),
}
}
Msg::Internal(Internal::StreamsChanged(persisted)) if !persisted => {
Effects::one(push_streams_to_storage::<E>(streams)).unchanged()
}
Expand Down
109 changes: 48 additions & 61 deletions src/models/meta_details.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{
library::{LibraryBucket, LibraryItem},
profile::Profile,
resource::{MetaItem, Stream},
streams::{StreamsBucket, StreamsItemKey},
streams::StreamsBucket,
},
};

Expand Down Expand Up @@ -445,72 +445,59 @@ fn suggested_stream_update(
streams: &[ResourceLoadable<Vec<Stream>>],
stream_bucket: &StreamsBucket,
) -> Effects {
let all_streams = [meta_streams, streams].concat();
let next_suggested_stream = match selected {
Some(Selected {
meta_path,
stream_path: Some(stream_path),
..
}) => {
meta_items
.iter()
.find_map(|meta_item| match &meta_item.content {
Some(Loadable::Ready(meta_item)) => Some(&meta_item.videos),
_ => None,
})
.and_then(|videos| {
// Check saved stream only for last 30 videos starting from the current video
videos
.iter()
.position(|video| video.id == stream_path.id)
.and_then(|max_index| {
videos[max_index.saturating_sub(30)..=max_index]
.iter()
.rev()
.find_map(|video| {
stream_bucket.items.get(&StreamsItemKey {
meta_id: meta_path.id.to_string(),
video_id: video.id.to_owned(),
})
})
})
})
.and_then(|stream_item| {
[meta_streams, streams]
.concat()
.iter()
.find(|resource| resource.request.base == stream_item.stream_transport_url)
.and_then(|resource| match &resource.content {
Some(Loadable::Ready(streams)) => Some(ResourceLoadable {
request: resource.request.clone(),
content: Some(Loadable::Ready(
streams
.iter()
.find(|stream| *stream == &stream_item.stream)
.or_else(|| {
streams.iter().find(|stream| {
stream.behavior_hints.binge_group.as_deref()
== stream_item
.stream
.behavior_hints
.binge_group
.as_deref()
}) => meta_items
.iter()
.filter(|_| !all_streams.is_empty())
.find_map(|meta_item_res| match &meta_item_res.content {
Some(Loadable::Ready(meta_item)) => stream_bucket
.last_stream_item(&stream_path.id, meta_item)
.and_then(|stream_item| {
all_streams
.iter()
.find(|resource| {
resource.request.base == stream_item.stream_transport_url
})
.and_then(|resource| match &resource.content {
Some(Loadable::Ready(streams)) => Some(ResourceLoadable {
request: resource.request.clone(),
content: Some(Loadable::Ready(
streams
.iter()
.find(|stream| {
stream.is_source_match(&stream_item.stream)
})
.or_else(|| {
streams.iter().find(|stream| {
stream.is_binge_match(&stream_item.stream)
})
})
})
.cloned(),
)),
}),
Some(Loadable::Loading) => Some(ResourceLoadable {
request: resource.request.clone(),
content: Some(Loadable::Loading),
}),
Some(Loadable::Err(error)) => Some(ResourceLoadable {
request: resource.request.clone(),
content: Some(Loadable::Err(error.clone())),
}),
_ => None,
.cloned(),
)),
}),
Some(Loadable::Loading) => Some(ResourceLoadable {
request: resource.request.clone(),
content: Some(Loadable::Loading),
}),
Some(Loadable::Err(error)) => Some(ResourceLoadable {
request: resource.request.clone(),
content: Some(Loadable::Err(error.clone())),
}),
_ => None,
})
})
.or_else(|| {
Some(ResourceLoadable {
request: meta_item_res.request.clone(),
content: Some(Loadable::Ready(None)),
})
})
}
}),
_ => None,
}),
_ => None,
};
eq_update(suggested_stream, next_suggested_stream)
Expand Down
89 changes: 64 additions & 25 deletions src/models/player.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::marker::PhantomData;

use crate::constants::{
CREDITS_THRESHOLD_COEF, VIDEO_FILENAME_EXTRA_PROP, VIDEO_HASH_EXTRA_PROP,
CREDITS_THRESHOLD_COEF, META_RESOURCE_NAME, VIDEO_FILENAME_EXTRA_PROP, VIDEO_HASH_EXTRA_PROP,
VIDEO_SIZE_EXTRA_PROP, WATCHED_THRESHOLD_COEF,
};
use crate::models::common::{
Expand All @@ -15,6 +15,7 @@ use crate::types::addon::{AggrRequest, Descriptor, ExtraExt, ResourcePath, Resou
use crate::types::library::{LibraryBucket, LibraryItem};
use crate::types::profile::Settings as ProfileSettings;
use crate::types::resource::{MetaItem, SeriesInfo, Stream, Subtitles, Video};
use crate::types::streams::{StreamItemState, StreamsBucket, StreamsItemKey};

use stremio_watched_bitfield::WatchedBitField;

Expand Down Expand Up @@ -86,6 +87,7 @@ pub struct Player {
pub next_stream: Option<Stream>,
pub series_info: Option<SeriesInfo>,
pub library_item: Option<LibraryItem>,
pub stream_state: Option<StreamItemState>,
#[serde(skip_serializing)]
pub watched: Option<WatchedBitField>,
#[serde(skip_serializing)]
Expand Down Expand Up @@ -121,26 +123,6 @@ impl<E: Env + 'static> UpdateWithCtx<E> for Player {
} else {
Effects::none().unchanged()
};
let update_streams_effects = if self.selected.as_ref().map(|selected| {
(
&selected.stream,
&selected.stream_request,
&selected.meta_request,
)
}) != Some((
&selected.stream,
&selected.stream_request,
&selected.meta_request,
)) {
Effects::msg(Msg::Internal(Internal::StreamLoaded {
stream: selected.stream.to_owned(),
stream_request: selected.stream_request.to_owned(),
meta_request: selected.meta_request.to_owned(),
}))
.unchanged()
} else {
Effects::none().unchanged()
};
let selected_effects = eq_update(&mut self.selected, Some(*selected.to_owned()));
let meta_item_effects = match &selected.meta_request {
Some(meta_request) => match &mut self.meta_item {
Expand All @@ -167,6 +149,7 @@ impl<E: Env + 'static> UpdateWithCtx<E> for Player {
},
_ => eq_update(&mut self.meta_item, None),
};
let stream_state_effects = eq_update(&mut self.stream_state, None);
let video_params_effects = eq_update(&mut self.video_params, None);
let subtitles_effects = subtitles_update::<E>(
&mut self.subtitles,
Expand Down Expand Up @@ -240,9 +223,9 @@ impl<E: Env + 'static> UpdateWithCtx<E> for Player {
self.ended = false;
self.paused = None;
switch_to_next_video_effects
.join(update_streams_effects)
.join(selected_effects)
.join(meta_item_effects)
.join(stream_state_effects)
.join(video_params_effects)
.join(subtitles_effects)
.join(next_video_effects)
Expand Down Expand Up @@ -274,6 +257,7 @@ impl<E: Env + 'static> UpdateWithCtx<E> for Player {
let selected_effects = eq_update(&mut self.selected, None);
let video_params_effects = eq_update(&mut self.video_params, None);
let meta_item_effects = eq_update(&mut self.meta_item, None);
let stream_state_effects = eq_update(&mut self.stream_state, None);
let subtitles_effects = eq_update(&mut self.subtitles, vec![]);
let next_video_effects = eq_update(&mut self.next_video, None);
let next_streams_effects = eq_update(&mut self.next_streams, None);
Expand All @@ -291,6 +275,7 @@ impl<E: Env + 'static> UpdateWithCtx<E> for Player {
.join(selected_effects)
.join(video_params_effects)
.join(meta_item_effects)
.join(stream_state_effects)
.join(subtitles_effects)
.join(next_video_effects)
.join(next_streams_effects)
Expand All @@ -311,6 +296,20 @@ impl<E: Env + 'static> UpdateWithCtx<E> for Player {
);
video_params_effects.join(subtitles_effects)
}
Msg::Action(Action::Player(ActionPlayer::StreamStateChanged { state })) => {
Effects::msg(Msg::Internal(Internal::StreamStateChanged {
state: state.to_owned(),
stream_request: self
.selected
.as_ref()
.and_then(|selected| selected.stream_request.to_owned()),
meta_request: self
.selected
.as_ref()
.and_then(|selected| selected.meta_request.to_owned()),
}))
.unchanged()
}
Msg::Action(Action::Player(ActionPlayer::TimeChanged {
time,
duration,
Expand Down Expand Up @@ -462,6 +461,9 @@ impl<E: Env + 'static> UpdateWithCtx<E> for Player {
}))
.unchanged()
}
Msg::Internal(Internal::StreamsChanged(_)) => {
stream_state_update(&mut self.stream_state, &self.selected, &ctx.streams)
}
Msg::Internal(Internal::ResourceRequestResult(request, result)) => {
let meta_item_effects = match &mut self.meta_item {
Some(meta_item) => resource_update::<E, _>(
Expand All @@ -470,6 +472,19 @@ impl<E: Env + 'static> UpdateWithCtx<E> for Player {
),
_ => Effects::none().unchanged(),
};
let update_streams_effects = match (&self.selected, &self.meta_item) {
(Some(selected), Some(meta_item))
if request.path.resource == META_RESOURCE_NAME =>
{
Effects::msg(Msg::Internal(Internal::StreamLoaded {
stream: selected.stream.to_owned(),
stream_request: selected.stream_request.to_owned(),
meta_item: meta_item.to_owned(),
}))
.unchanged()
}
_ => Effects::none().unchanged(),
};
let subtitles_effects = resources_update_with_vector_content::<E, _>(
&mut self.subtitles,
ResourcesAction::ResourceRequestResult { request, result },
Expand Down Expand Up @@ -534,6 +549,7 @@ impl<E: Env + 'static> UpdateWithCtx<E> for Player {
analytics_context.duration = duration;
};
meta_item_effects
.join(update_streams_effects)
.join(subtitles_effects)
.join(next_video_effects)
.join(next_streams_effects)
Expand Down Expand Up @@ -579,6 +595,31 @@ fn switch_to_next_video(
Effects::none().unchanged()
}

fn stream_state_update(
state: &mut Option<StreamItemState>,
selected: &Option<Selected>,
streams: &StreamsBucket,
) -> Effects {
let next_state = match selected {
Some(Selected {
stream_request: Some(stream_request),
meta_request: Some(meta_request),
..
}) => {
let key = StreamsItemKey {
meta_id: meta_request.path.id.to_owned(),
video_id: stream_request.path.id.to_owned(),
};
streams
.items
.get(&key)
.and_then(|stream_item| stream_item.state.to_owned())
}
_ => None,
};
eq_update(state, next_state)
}

fn next_video_update(
video: &mut Option<Video>,
stream: &Option<Stream>,
Expand Down Expand Up @@ -718,9 +759,7 @@ fn next_stream_update(
}),
) if settings.binge_watching => streams
.iter()
.find(|Stream { behavior_hints, .. }| {
behavior_hints.binge_group == stream.behavior_hints.binge_group
})
.find(|next_stream| next_stream.is_binge_match(stream))
.cloned(),
_ => None,
};
Expand Down
Loading