From 34c71765ffd74e7db892b8c6471f55c3daaf1718 Mon Sep 17 00:00:00 2001
From: shouya <526598+shouya@users.noreply.github.com>
Date: Fri, 20 Sep 2024 17:47:44 +0900
Subject: [PATCH] fix(filter): handle feed errors for merge filter (#150)

Fix https://github.com/shouya/rss-funnel/issues/143.

For the `merge` filter, feeds that fail to fetch will no longer fail
the entire endpoint. Instead, an error feed item is appended to the
endpoint's output.
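For illustration (hypothetical URL; title, author, and body shapes
follow `post_from_error` introduced below), a failing source now
surfaces in the merged feed as an item roughly like:

    title:  Error fetching source: https://example.com/feed.xml
    link:   https://example.com/feed.xml
    author: rss-funnel
    body:   Source: https://example.com/feed.xml
            Error:  <the underlying fetch error>
    date:   <time of the failed fetch attempt>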
---
 src/feed.rs         | 12 ++++++
 src/feed/preview.rs | 51 +++++++++++++++++++++++-
 src/filter/merge.rs | 96 ++++++++++++++++++++++++++++++++++++++++-----
 src/source.rs       | 13 ++++++
 4 files changed, 161 insertions(+), 11 deletions(-)

diff --git a/src/feed.rs b/src/feed.rs
index 4f149f3..3d49ea8 100644
--- a/src/feed.rs
+++ b/src/feed.rs
@@ -255,6 +255,18 @@ impl Feed {
       }
     }
   }
+
+  #[allow(clippy::field_reassign_with_default)]
+  pub fn add_post(&mut self, post_preview: PostPreview) {
+    match self {
+      Feed::Rss(channel) => {
+        channel.items.push(post_preview.into_rss_item());
+      }
+      Feed::Atom(feed) => {
+        feed.entries.push(post_preview.into_atom_entry());
+      }
+    };
+  }
 }
 
 #[cfg(test)]
diff --git a/src/feed/preview.rs b/src/feed/preview.rs
index 0fa1aa9..b23bead 100644
--- a/src/feed/preview.rs
+++ b/src/feed/preview.rs
@@ -1,7 +1,7 @@
 use chrono::{DateTime, FixedOffset};
 use serde::Serialize;
 
-#[derive(Debug, Serialize)]
+#[derive(Debug, Serialize, Default)]
 pub struct FeedPreview {
   pub title: String,
   pub link: String,
@@ -9,7 +9,7 @@
   pub posts: Vec<PostPreview>,
 }
 
-#[derive(Debug, Serialize, PartialEq, Eq, Hash)]
+#[derive(Debug, Serialize, PartialEq, Eq, Hash, Default)]
 pub struct PostPreview {
   pub title: String,
   pub author: Option<String>,
@@ -17,3 +17,50 @@
   pub body: Option<String>,
   pub date: Option<DateTime<FixedOffset>>,
 }
+
+impl PostPreview {
+  pub fn into_rss_item(self) -> rss::Item {
+    let guid = rss::Guid {
+      value: self.link.clone(),
+      permalink: true,
+    };
+
+    rss::Item {
+      title: Some(self.title),
+      link: Some(self.link),
+      description: self.body,
+      pub_date: self.date.map(|d| d.to_rfc3339()),
+      author: self.author,
+      guid: Some(guid),
+      ..Default::default()
+    }
+  }
+
+  pub fn into_atom_entry(self) -> atom_syndication::Entry {
+    atom_syndication::Entry {
+      title: atom_syndication::Text::plain(self.title),
+      id: self.link.clone(),
+      links: vec![atom_syndication::Link {
+        href: self.link,
+        ..Default::default()
+      }],
+      authors: self
+        .author
+        .into_iter()
+        .map(|a| atom_syndication::Person {
+          name: a,
+          ..Default::default()
+        })
+        .collect(),
+      updated: self
+        .date
+        .unwrap_or_else(|| chrono::Utc::now().fixed_offset()),
+      published: self.date,
+      content: self.body.map(|b| atom_syndication::Content {
+        value: Some(b),
+        ..Default::default()
+      }),
+      ..Default::default()
+    }
+  }
+}
diff --git a/src/filter/merge.rs b/src/filter/merge.rs
index bc0eefa..b61e8ad 100644
--- a/src/filter/merge.rs
+++ b/src/filter/merge.rs
@@ -5,10 +5,10 @@
 use schemars::JsonSchema;
 use serde::{Deserialize, Serialize};
 
 use crate::client::{Client, ClientConfig};
-use crate::feed::Feed;
+use crate::feed::{Feed, PostPreview};
 use crate::filter_pipeline::{FilterPipeline, FilterPipelineConfig};
 use crate::source::{SimpleSourceConfig, Source};
-use crate::util::{ConfigError, Result, SingleOrVec};
+use crate::util::{ConfigError, Error, Result, SingleOrVec};
 
 use super::{FeedFilter, FeedFilterConfig, FilterContext};
@@ -107,36 +107,96 @@ pub struct Merge {
 }
 
 impl Merge {
-  async fn fetch_sources(&self, ctx: &FilterContext) -> Result<Vec<Feed>> {
-    stream::iter(self.sources.clone())
+  async fn fetch_sources(
+    &self,
+    ctx: &FilterContext,
+  ) -> Result<(Vec<Feed>, Vec<(Source, Error)>)> {
+    let iter = stream::iter(self.sources.clone())
       .map(|source: Source| {
         let client = &self.client;
         async move {
-          let feed = source.fetch_feed(ctx, Some(client)).await?;
-          Ok(feed)
+          let fetch_res = source.fetch_feed(ctx, Some(client)).await;
+          (source, fetch_res)
         }
       })
       .buffered(self.parallelism)
       .collect::<Vec<_>>()
       .await
-      .into_iter()
-      .collect::<Result<Vec<_>>>()
+      .into_iter();
+
+    collect_partial_oks(iter)
   }
 }
 
 #[async_trait::async_trait]
 impl FeedFilter for Merge {
   async fn run(&self, ctx: &mut FilterContext, mut feed: Feed) -> Result<Feed> {
-    for new_feed in self.fetch_sources(ctx).await? {
+    let (new_feeds, errors) = self.fetch_sources(ctx).await?;
+
+    for new_feed in new_feeds {
       let ctx = ctx.subcontext();
       let filtered_new_feed = self.filters.run(ctx, new_feed).await?;
       feed.merge(filtered_new_feed)?;
     }
+
+    for (source, error) in errors {
+      let post = post_from_error(source, error, ctx);
+      feed.add_post(post);
+    }
+
     feed.reorder();
     Ok(feed)
   }
 }
+
+fn post_from_error(
+  source: Source,
+  error: Error,
+  ctx: &FilterContext,
+) -> PostPreview {
+  let source_url = source.full_url(ctx).map(|u| u.to_string());
+  let title = match source_url.as_ref() {
+    Some(url) => format!("Error fetching source: {}", url),
+    None => "Error: Failed fetching source".to_owned(),
+  };
+  let source_desc = source_url
+    .clone()
+    .unwrap_or_else(|| format!("{:?}", source));
+
+  let body = format!(
+    "
+<p>Source:</p>
+<blockquote>{source_desc}</blockquote>
+
+<p>Error:</p>
+<blockquote>{error}</blockquote>
+"
+  );
+
+  PostPreview {
+    title,
+    link: source_url.unwrap_or_default(),
+    author: Some("rss-funnel".to_owned()),
+    body: Some(body),
+    date: Some(chrono::Utc::now().fixed_offset()),
+  }
+}
+
+// Return Err only if all results are Err. Otherwise return both
+// succeeded results (Vec<T>) and failed results (Vec<(S, E)>).
+#[allow(clippy::type_complexity)]
+fn collect_partial_oks<S, T, E>(
+  iter: impl Iterator<Item = (S, Result<T, E>)>,
+) -> Result<(Vec<T>, Vec<(S, E)>), E> {
+  let mut oks = Vec::new();
+  let mut errs = Vec::new();
+  for (source, res) in iter {
+    match res {
+      Ok(ok) => oks.push(ok),
+      Err(err) => errs.push((source, err)),
+    }
+  }
+
+  if oks.is_empty() && !errs.is_empty() {
+    Err(errs.pop().map(|(_, e)| e).unwrap())
+  } else {
+    Ok((oks, errs))
+  }
+}
 
 #[cfg(test)]
 mod test {
   use crate::test_utils::fetch_endpoint;
@@ -206,4 +267,21 @@
     assert_eq!(titles.len(), 3);
   }
+
+  #[test]
+  fn test_partial_collect() {
+    let results = vec![
+      (1, Ok(1)),
+      (2, Err("error2")),
+      (3, Ok(3)),
+      (4, Err("error4")),
+    ];
+    let (oks, errs) = super::collect_partial_oks(results.into_iter()).unwrap();
+    assert_eq!(oks, vec![1, 3]);
+    assert_eq!(errs, vec![(2, "error2"), (4, "error4")]);
+
+    let results = vec![(1, Err::<(), _>("error1")), (2, Err("error2"))];
+    let err = super::collect_partial_oks(results.into_iter()).unwrap_err();
+    assert_eq!(err, "error2");
+  }
 }
diff --git a/src/source.rs b/src/source.rs
index 3170918..cd0181e 100644
--- a/src/source.rs
+++ b/src/source.rs
@@ -274,6 +274,19 @@ impl Source {
       }
     }
   }
+
+  pub fn full_url(&self, ctx: &FilterContext) -> Option<Url> {
+    match self {
+      Source::Dynamic => ctx.source().cloned(),
+      Source::AbsoluteUrl(url) => Some(url.clone()),
+      Source::RelativeUrl(path) => ctx
+        .base_expected()
+        .ok()
+        .map(|base| base.join(path).expect("failed to join base and path")),
+      Source::FromScratch(_) => None,
+      Source::Templated(_) => None,
+    }
+  }
 }
 
 fn split_with_delimiter<'a>(