Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement stream state management #553

Merged
merged 14 commits into from
Dec 1, 2023
Merged
30 changes: 30 additions & 0 deletions src/models/ctx/update_streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ pub fn update_streams<E: Env + 'static>(
meta_id: meta_id.to_owned(),
video_id: video_id.to_owned(),
};
let last_stream_state = streams
.items
.get(&key)
.and_then(|item| item.adjusted_state(stream));

let streams_item = StreamsItem {
stream: stream.to_owned(),
Expand All @@ -42,12 +46,38 @@ pub fn update_streams<E: Env + 'static>(
video_id: video_id.to_owned(),
meta_transport_url: meta_request.base.to_owned(),
stream_transport_url: stream_request.base.to_owned(),
state: last_stream_state,
mtime: E::now(),
};

streams.items.insert(key, streams_item);
Effects::msg(Msg::Internal(Internal::StreamsChanged(false)))
}
Msg::Internal(Internal::StreamStateChanged {
state,
stream_request: Some(stream_request),
meta_request: Some(meta_request),
}) => {
let meta_id = &meta_request.path.id;
let video_id = &stream_request.path.id;

let key = StreamsItemKey {
meta_id: meta_id.to_owned(),
video_id: video_id.to_owned(),
};
let steam_item = streams.items.get(&key).cloned();
match steam_item {
Some(item) => {
let new_stream_item = StreamsItem {
state: Some(state.to_owned()),
..item
};
streams.items.insert(key, new_stream_item);
Effects::msg(Msg::Internal(Internal::StreamsChanged(false)))
}
None => Effects::none().unchanged(),
}
}
Msg::Internal(Internal::StreamsChanged(persisted)) if !persisted => {
Effects::one(push_streams_to_storage::<E>(streams)).unchanged()
}
Expand Down
109 changes: 48 additions & 61 deletions src/models/meta_details.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{
library::{LibraryBucket, LibraryItem},
profile::Profile,
resource::{MetaItem, Stream},
streams::{StreamsBucket, StreamsItemKey},
streams::StreamsBucket,
},
};

Expand Down Expand Up @@ -445,72 +445,59 @@ fn suggested_stream_update(
streams: &[ResourceLoadable<Vec<Stream>>],
stream_bucket: &StreamsBucket,
) -> Effects {
let all_streams = [meta_streams, streams].concat();
let next_suggested_stream = match selected {
Some(Selected {
meta_path,
stream_path: Some(stream_path),
..
}) => {
meta_items
.iter()
.find_map(|meta_item| match &meta_item.content {
Some(Loadable::Ready(meta_item)) => Some(&meta_item.videos),
_ => None,
})
.and_then(|videos| {
// Check saved stream only for last 30 videos starting from the current video
videos
.iter()
.position(|video| video.id == stream_path.id)
.and_then(|max_index| {
videos[max_index.saturating_sub(30)..=max_index]
.iter()
.rev()
.find_map(|video| {
stream_bucket.items.get(&StreamsItemKey {
meta_id: meta_path.id.to_string(),
video_id: video.id.to_owned(),
})
})
})
})
.and_then(|stream_item| {
[meta_streams, streams]
.concat()
.iter()
.find(|resource| resource.request.base == stream_item.stream_transport_url)
.and_then(|resource| match &resource.content {
Some(Loadable::Ready(streams)) => Some(ResourceLoadable {
request: resource.request.clone(),
content: Some(Loadable::Ready(
streams
.iter()
.find(|stream| *stream == &stream_item.stream)
.or_else(|| {
streams.iter().find(|stream| {
stream.behavior_hints.binge_group.as_deref()
== stream_item
.stream
.behavior_hints
.binge_group
.as_deref()
}) => meta_items
.iter()
.filter(|_| !all_streams.is_empty())
.find_map(|meta_item_res| match &meta_item_res.content {
Some(Loadable::Ready(meta_item)) => stream_bucket
.last_stream_item(&stream_path.id, meta_item)
.and_then(|stream_item| {
all_streams
.iter()
.find(|resource| {
resource.request.base == stream_item.stream_transport_url
})
.and_then(|resource| match &resource.content {
Some(Loadable::Ready(streams)) => Some(ResourceLoadable {
request: resource.request.clone(),
content: Some(Loadable::Ready(
streams
.iter()
.find(|stream| {
stream.is_source_match(&stream_item.stream)
})
.or_else(|| {
streams.iter().find(|stream| {
stream.is_binge_match(&stream_item.stream)
})
})
})
.cloned(),
)),
}),
Some(Loadable::Loading) => Some(ResourceLoadable {
request: resource.request.clone(),
content: Some(Loadable::Loading),
}),
Some(Loadable::Err(error)) => Some(ResourceLoadable {
request: resource.request.clone(),
content: Some(Loadable::Err(error.clone())),
}),
_ => None,
.cloned(),
)),
}),
Some(Loadable::Loading) => Some(ResourceLoadable {
request: resource.request.clone(),
content: Some(Loadable::Loading),
}),
Some(Loadable::Err(error)) => Some(ResourceLoadable {
request: resource.request.clone(),
content: Some(Loadable::Err(error.clone())),
}),
_ => None,
})
})
.or_else(|| {
Some(ResourceLoadable {
request: meta_item_res.request.clone(),
content: Some(Loadable::Ready(None)),
})
})
}
}),
_ => None,
}),
_ => None,
};
eq_update(suggested_stream, next_suggested_stream)
Expand Down
74 changes: 71 additions & 3 deletions src/models/player.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::types::addon::{AggrRequest, Descriptor, ExtraExt, ResourcePath, Resou
use crate::types::library::{LibraryBucket, LibraryItem};
use crate::types::profile::Settings as ProfileSettings;
use crate::types::resource::{MetaItem, SeriesInfo, Stream, Subtitles, Video};
use crate::types::streams::{StreamItemState, StreamsBucket};

use stremio_watched_bitfield::WatchedBitField;

Expand Down Expand Up @@ -84,6 +85,7 @@ pub struct Player {
pub next_stream: Option<Stream>,
pub series_info: Option<SeriesInfo>,
pub library_item: Option<LibraryItem>,
pub stream_state: Option<StreamItemState>,
#[serde(skip_serializing)]
pub watched: Option<WatchedBitField>,
#[serde(skip_serializing)]
Expand Down Expand Up @@ -165,6 +167,7 @@ impl<E: Env + 'static> UpdateWithCtx<E> for Player {
},
_ => eq_update(&mut self.meta_item, None),
};
let stream_state_effects = eq_update(&mut self.stream_state, None);
let video_params_effects = eq_update(&mut self.video_params, None);
let subtitles_effects = subtitles_update::<E>(
&mut self.subtitles,
Expand Down Expand Up @@ -241,6 +244,7 @@ impl<E: Env + 'static> UpdateWithCtx<E> for Player {
.join(update_streams_effects)
.join(selected_effects)
.join(meta_item_effects)
.join(stream_state_effects)
.join(video_params_effects)
.join(subtitles_effects)
.join(next_video_effects)
Expand Down Expand Up @@ -272,6 +276,7 @@ impl<E: Env + 'static> UpdateWithCtx<E> for Player {
let selected_effects = eq_update(&mut self.selected, None);
let video_params_effects = eq_update(&mut self.video_params, None);
let meta_item_effects = eq_update(&mut self.meta_item, None);
let stream_state_effects = eq_update(&mut self.stream_state, None);
let subtitles_effects = eq_update(&mut self.subtitles, vec![]);
let next_video_effects = eq_update(&mut self.next_video, None);
let next_streams_effects = eq_update(&mut self.next_streams, None);
Expand All @@ -289,6 +294,7 @@ impl<E: Env + 'static> UpdateWithCtx<E> for Player {
.join(selected_effects)
.join(video_params_effects)
.join(meta_item_effects)
.join(stream_state_effects)
.join(subtitles_effects)
.join(next_video_effects)
.join(next_streams_effects)
Expand All @@ -309,6 +315,24 @@ impl<E: Env + 'static> UpdateWithCtx<E> for Player {
);
video_params_effects.join(subtitles_effects)
}
Msg::Action(Action::Player(ActionPlayer::StreamStateChanged { state })) => {
let stream_state_effects =
eq_update(&mut self.stream_state, Some(state.to_owned()));
let state_changed_effects =
Effects::msg(Msg::Internal(Internal::StreamStateChanged {
state: state.to_owned(),
stream_request: self
.selected
.as_ref()
.and_then(|selected| selected.stream_request.to_owned()),
meta_request: self
.selected
.as_ref()
.and_then(|selected| selected.meta_request.to_owned()),
}))
.unchanged();
stream_state_effects.join(state_changed_effects)
}
Msg::Action(Action::Player(ActionPlayer::TimeChanged {
time,
duration,
Expand Down Expand Up @@ -468,6 +492,12 @@ impl<E: Env + 'static> UpdateWithCtx<E> for Player {
),
_ => Effects::none().unchanged(),
};
let stream_state_effects = stream_state_update(
&mut self.stream_state,
&self.selected,
&self.meta_item,
&ctx.streams,
);
let subtitles_effects = resources_update_with_vector_content::<E, _>(
&mut self.subtitles,
ResourcesAction::ResourceRequestResult { request, result },
Expand Down Expand Up @@ -532,6 +562,7 @@ impl<E: Env + 'static> UpdateWithCtx<E> for Player {
analytics_context.duration = duration;
};
meta_item_effects
.join(stream_state_effects)
.join(subtitles_effects)
.join(next_video_effects)
.join(next_streams_effects)
Expand Down Expand Up @@ -577,6 +608,45 @@ fn switch_to_next_video(
Effects::none().unchanged()
}

fn stream_state_update(
state: &mut Option<StreamItemState>,
selected: &Option<Selected>,
meta_item: &Option<ResourceLoadable<MetaItem>>,
streams: &StreamsBucket,
) -> Effects {
match (&state, selected, meta_item) {
(
None,
Some(Selected {
stream,
stream_request: Some(stream_request),
meta_request: Some(meta_request),
..
}),
Some(ResourceLoadable {
content: Some(Loadable::Ready(meta_item)),
..
}),
) => {
let video_id = &stream_request.path.id;
let next_state = streams
.last_stream_item(video_id, meta_item)
.and_then(|item| item.adjusted_state(stream));
let state_changed_effect = match &next_state {
Some(state) => Effects::msg(Msg::Internal(Internal::StreamStateChanged {
state: state.clone(),
stream_request: Some(stream_request.to_owned()),
meta_request: Some(meta_request.to_owned()),
}))
.unchanged(),
None => Effects::none().unchanged(),
};
eq_update(state, next_state).join(state_changed_effect)
}
_ => Effects::none().unchanged(),
}
}

fn next_video_update(
video: &mut Option<Video>,
stream: &Option<Stream>,
Expand Down Expand Up @@ -716,9 +786,7 @@ fn next_stream_update(
}),
) if settings.binge_watching => streams
.iter()
.find(|Stream { behavior_hints, .. }| {
behavior_hints.binge_group == stream.behavior_hints.binge_group
})
.find(|next_stream| next_stream.is_binge_match(stream))
.cloned(),
_ => None,
};
Expand Down
4 changes: 4 additions & 0 deletions src/runtime/msg/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::ops::Range;
use serde::Deserialize;
use url::Url;

use crate::types::streams::StreamItemState;
use crate::{
models::{
addon_details::Selected as AddonDetailsSelected,
Expand Down Expand Up @@ -130,6 +131,9 @@ pub enum ActionPlayer {
VideoParamsChanged {
video_params: Option<VideoParams>,
},
StreamStateChanged {
state: StreamItemState,
},
TimeChanged {
time: u64,
duration: u64,
Expand Down
7 changes: 7 additions & 0 deletions src/runtime/msg/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::types::library::{LibraryBucket, LibraryItem, LibraryItemId};
use crate::types::profile::{Auth, AuthKey, Profile, User};
use crate::types::resource::Stream;
use crate::types::streaming_server::Statistics;
use crate::types::streams::StreamItemState;

pub type CtxStorageResponse = (
Option<Profile>,
Expand Down Expand Up @@ -58,6 +59,12 @@ pub enum Internal {
stream_request: Option<ResourceRequest>,
meta_request: Option<ResourceRequest>,
},
/// Dispatched when stream item's state has changed
StreamStateChanged {
state: StreamItemState,
stream_request: Option<ResourceRequest>,
meta_request: Option<ResourceRequest>,
},
/// Dispatched when library item needs to be updated in the memory, storage and API.
UpdateLibraryItem(LibraryItem),
/// Dispatched when some of auth, addons or settings changed.
Expand Down
16 changes: 16 additions & 0 deletions src/types/resource/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,22 @@ impl Stream {
_ => None,
}
}

#[inline]
pub fn is_source_match(&self, other_stream: &Stream) -> bool {
self.source == other_stream.source
}

#[inline]
pub fn is_binge_match(&self, other_stream: &Stream) -> bool {
match (
&self.behavior_hints.binge_group,
&other_stream.behavior_hints.binge_group,
) {
(Some(a), Some(b)) => a == b,
_ => false,
}
}
}

#[serde_as]
Expand Down
Loading