Skip to content

Commit

Permalink
Merge pull request #607 from Stremio/fix/addons-lastVideosIds-aggrega…
Browse files Browse the repository at this point in the history
…te-request

Fix/addons last videos ids aggregate request
  • Loading branch information
tymmesyde authored Feb 2, 2024
2 parents 19ff6d6 + 9840d02 commit fdf10ae
Show file tree
Hide file tree
Showing 6 changed files with 437 additions and 118 deletions.
269 changes: 161 additions & 108 deletions src/models/ctx/update_notifications.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::collections::{hash_map::Entry, HashMap};

use chrono::Duration;
use either::Either;
use chrono::{DateTime, Duration, Utc};
use futures::FutureExt;
use lazysort::SortedBy;
use once_cell::sync::Lazy;
use tracing::trace;

use crate::{
constants::{LAST_VIDEOS_IDS_EXTRA_PROP, NOTIFICATIONS_STORAGE_KEY, NOTIFICATION_ITEMS_COUNT},
Expand All @@ -20,7 +20,7 @@ use crate::{
Effect, EffectFuture, Effects, Env, EnvFutureExt,
},
types::{
addon::{AggrRequest, ExtraValue},
addon::{AggrRequest, ExtraType},
library::LibraryBucket,
notifications::{NotificationItem, NotificationsBucket},
profile::Profile,
Expand All @@ -40,6 +40,9 @@ pub fn update_notifications<E: Env + 'static>(
) -> Effects {
match msg {
Msg::Action(Action::Ctx(ActionCtx::PullNotifications)) => {
Effects::msg(Msg::Internal(Internal::PullNotifications)).unchanged()
}
Msg::Internal(Internal::PullNotifications) => {
let (reason, should_make_request) = match notifications.last_updated {
Some(last_updated) if last_updated + *REQUEST_LAST_VIDEOS_EVERY <= E::now() => (
format!(
Expand Down Expand Up @@ -68,40 +71,39 @@ pub fn update_notifications<E: Env + 'static>(
"Should last-videos addon resource be called? {should_make_request}"
);

if !should_make_request {
return Effects::none().unchanged();
}

let library_item_ids = library
let sorted_library_items_id_types = library
.items
.values()
.filter(|library_item| library_item.should_pull_notifications())
.sorted_by(|a, b| b.mtime.cmp(&a.mtime))
.take(NOTIFICATION_ITEMS_COUNT)
.map(|library_item| &library_item.id)
.cloned()
.map(|library_item| (library_item.id.to_owned(), library_item.r#type.to_owned()))
.collect::<Vec<_>>();

let notifications_catalog_resource_effects = if !library_item_ids.is_empty() {
resources_update_with_vector_content::<E, _>(
notification_catalogs,
// force the making of a requests every time PullNotifications is called.
ResourcesAction::force_request(
&AggrRequest::AllCatalogs {
extra: &vec![ExtraValue {
name: LAST_VIDEOS_IDS_EXTRA_PROP.name.to_owned(),
value: library_item_ids.join(","),
}],
r#type: &None,
},
&profile.addons,
),
)
} else {
Effects::none().unchanged()
};
let notifications_catalog_resource_effects =
if !sorted_library_items_id_types.is_empty() && should_make_request {
trace!(
"Sorted by `mtime` LibraryItem id and type: {:?}",
sorted_library_items_id_types
);
let catalog_resource_effects = resources_update_with_vector_content::<E, _>(
notification_catalogs,
// force the making of a requests every time PullNotifications is called.
ResourcesAction::force_request(
&AggrRequest::CatalogsFiltered(vec![ExtraType::Ids {
extra_name: LAST_VIDEOS_IDS_EXTRA_PROP.name.to_owned(),
id_types: sorted_library_items_id_types,
limit: Some(NOTIFICATION_ITEMS_COUNT),
}]),
&profile.addons,
),
);

notifications.last_updated = Some(E::now());

notifications.last_updated = Some(E::now());
catalog_resource_effects
} else {
Effects::none().unchanged()
};

// first update the notification items
let notification_items_effects = update_notification_items::<E>(
Expand Down Expand Up @@ -145,8 +147,12 @@ pub fn update_notifications<E: Env + 'static>(
} else {
Effects::none().unchanged()
};

let pull_notifications_effects =
Effects::msg(Msg::Internal(Internal::PullNotifications)).unchanged();
notification_catalogs_effects
.join(notifications_effects)
.join(pull_notifications_effects)
.unchanged()
}
_ => Effects::none().unchanged(),
Expand Down Expand Up @@ -192,100 +198,147 @@ fn update_notification_items<E: Env + 'static>(
) -> Effects {
let selected_catalogs = notification_catalogs
.iter()
// take any catalog while the catalog has successful result or resulted in error
.take_while(|catalog| {
// take all catalogs with successful result or error
.filter(|catalog| {
matches!(
&catalog.content,
Some(Loadable::Ready(_)) | Some(Loadable::Err(_))
)
})
.collect::<Vec<_>>();

// Get next notifications ids from lastVideosIds request's extra value
let next_notification_ids = notification_catalogs
.first()
.map(|resource| &resource.request.path.extra)
.map(|extra| Either::Left(extra.iter()))
.unwrap_or_else(|| Either::Right(std::iter::empty()))
.find(|extra_value| extra_value.name == LAST_VIDEOS_IDS_EXTRA_PROP.name)
.map(|extra_value| Either::Left(extra_value.value.split(',')))
.unwrap_or_else(|| Either::Right(std::iter::empty()));

let next_notification_items = next_notification_ids.fold(HashMap::new(), |mut map, meta_id| {
// Get the LibraryItem from user's library
// Exit early if library item does not exist in the Library
// or we do not need to pull notifications for it
let library_item = match library.items.get(meta_id) {
Some(library_item) if library_item.should_pull_notifications() => library_item,
_ => return map,
};

// find the first occurrence of the meta item inside the catalogs
let meta_item = match selected_catalogs.iter().find_map(|catalog| {
catalog
.content
.as_ref()
.and_then(|content| content.ready())
.and_then(|content| {
content
.iter()
.find(|meta_item| meta_item.preview.id == meta_id)
})
}) {
Some(meta_item) if !meta_item.videos.is_empty() => meta_item,
_ => return map,
};
// shared function to decide if a given video should be included in notifications
// or excluded
// returns the video_released DateTime extracted from the arguments if it should be retained
let should_retail_video_released = |last_watched: Option<&DateTime<Utc>>,
video_released: Option<&DateTime<Utc>>|
-> Option<DateTime<Utc>> {
match (last_watched, video_released) {
(Some(last_watched), Some(video_released)) => {
if last_watched < video_released &&
// exclude future videos (i.e. that will air in the future)
video_released <= &E::now()
{
Some(*video_released)
} else {
None
}
}
// if you've never watched an episode, then we want to include new videos
(None, Some(video_released)) => Some(*video_released),
_ => None,
}
};

let mut meta_notifs: &mut HashMap<_, _> = map.entry(meta_id.to_string()).or_default();
let next_notification_items =
library
.items
.iter()
.fold(HashMap::new(), |mut map, (meta_id, library_item)| {
// Exit early if we don't need to pull notifications for the library item
if !library_item.should_pull_notifications() {
return map;
}

// meta items videos
meta_item
.videos_iter()
.filter_map(|video| {
match (&library_item.state.last_watched, video.released) {
(Some(last_watched), Some(video_released)) => {
if last_watched < &video_released &&
// exclude future videos (i.e. that will air in the future)
video_released <= E::now()
{
Some((&library_item.id, &video.id, video_released))
} else {
None
// find the first occurrence of the meta item inside the catalogs
let meta_item = match selected_catalogs.iter().find_map(|catalog| {
catalog
.content
.as_ref()
.and_then(|content| content.ready())
.and_then(|content| {
content.iter().find(|meta_item| {
&meta_item.preview.id == meta_id && !meta_item.videos.is_empty()
})
})
}) {
Some(meta_item) => meta_item,
_ => {
// try to default to currently existing notifications in the bucket before returning
match notification_items.get(meta_id) {
Some(existing_notifications) if !existing_notifications.is_empty() => {
let filtered_current_notifs = existing_notifications
.iter()
.filter_map(|(video_id, notif_item)| {
// filter by the same requirements as new videos
// to remove videos that no longer match
if should_retail_video_released(
library_item.state.last_watched.as_ref(),
Some(&notif_item.video_released),
)
.is_some()
{
Some((video_id.to_owned(), notif_item.to_owned()))
} else {
None
}
})
.collect();
map.insert(meta_id.to_owned(), filtered_current_notifs);
}
_ => {
// in any other case - skip it, e.g. meta_id not found or empty notifications
}
}

return map;
}
_ => None,
}
})
// We need to manually fold, otherwise the last seen element with a given key
// will be present in the final HashMap instead of the first occurrence.
.fold(
&mut meta_notifs,
|meta_notifs, (meta_id, video_id, video_released)| {
let notif_entry = meta_notifs.entry(video_id.to_owned());
};

// for now just skip same videos that already exist
// leave the first one found in the Vec.
if let Entry::Vacant(new) = notif_entry {
let notification = NotificationItem {
meta_id: meta_id.to_owned(),
video_id: video_id.to_owned(),
video_released,
};
let mut meta_notifs: &mut HashMap<_, _> =
map.entry(meta_id.to_owned()).or_default();

new.insert(notification);
}
// meta items videos
meta_item
.videos_iter()
.filter_map(
|video| match (&library_item.state.last_watched, video.released) {
(Some(last_watched), Some(video_released)) => {
if should_retail_video_released(
Some(last_watched),
Some(&video_released),
)
.is_some()
{
Some((&library_item.id, &video.id, video_released))
} else {
None
}
}
_ => None,
},
)
// We need to manually `fold()` instead of `collect()`,
// otherwise the last seen element with a given key
// will be present in the final HashMap instead of the first occurrence.
.fold(
&mut meta_notifs,
|meta_notifs, (meta_id, video_id, video_released)| {
let notif_entry = meta_notifs.entry(video_id.to_owned());

meta_notifs
},
);
// for now just skip same videos that already exist
// leave the first one found in the Vec.
if let Entry::Vacant(new) = notif_entry {
let notification = NotificationItem {
meta_id: meta_id.to_owned(),
video_id: video_id.to_owned(),
video_released,
};

// if not videos were added and the hashmap is empty, just remove the MetaItem record all together
if meta_notifs.is_empty() {
map.remove(meta_id);
}
new.insert(notification);
}

meta_notifs
},
);

// if not videos were added and the hashmap is empty, just remove the MetaItem record all together
if meta_notifs.is_empty() {
map.remove(meta_id);
}

map
});
map
});

eq_update(notification_items, next_notification_items)
}
Expand Down
3 changes: 3 additions & 0 deletions src/runtime/msg/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ pub enum Internal {
SearchHistoryChanged,
/// User notifications have changed
NotificationsChanged,
/// Pulling of notifications triggered either by the user (with an action) or
/// internally in core.
PullNotifications,
/// Dismiss all Notifications for a given [`MetaItemId`].
///
/// [`MetaItemId`]: crate::types::resource::MetaItemId
Expand Down
Loading

0 comments on commit fdf10ae

Please sign in to comment.