fix(filter): handle feed errors for merge filter (#150)
Fix #143.

For the `merge` filter, a feed that fails to fetch no longer fails the
entire endpoint. Instead, an error item describing the failure is
appended to the endpoint's output feed.
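
Concretely, the merged output gains one post per failed source. A sketch of roughly what gets appended, based on `post_from_error` in this commit (the source URL and error message are made up):

```rust
use crate::feed::PostPreview;

// Roughly the item appended when a made-up source times out.
fn example_error_post() -> PostPreview {
  PostPreview {
    title: "Error fetching source: https://example.com/feed.xml".to_owned(),
    link: "https://example.com/feed.xml".to_owned(),
    author: Some("rss-funnel".to_owned()),
    body: Some(
      "<p><b>Source:</b><br>https://example.com/feed.xml</p>\
       <p><b>Error:</b><br>connection timed out</p>"
        .to_owned(),
    ),
    date: Some(chrono::Utc::now().fixed_offset()),
  }
}
```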
shouya authored Sep 20, 2024
1 parent bf8610a commit 34c7176
Showing 4 changed files with 161 additions and 11 deletions.
12 changes: 12 additions & 0 deletions src/feed.rs
@@ -255,6 +255,18 @@ impl Feed {
}
}
}

#[allow(clippy::field_reassign_with_default)]
pub fn add_post(&mut self, post_preview: PostPreview) {
match self {
Feed::Rss(channel) => {
channel.items.push(post_preview.into_rss_item());
}
Feed::Atom(feed) => {
feed.entries.push(post_preview.into_atom_entry());
}
};
}
}

#[cfg(test)]
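A minimal usage sketch of the new `add_post`; it assumes `Feed::Rss` can be built directly from a bare `rss::Channel`, which matches the match arms above but isn't shown in this diff:

```rust
use crate::feed::{Feed, PostPreview};

fn demo() {
  // Hypothetical construction of an empty RSS feed.
  let mut feed = Feed::Rss(rss::Channel::default());
  feed.add_post(PostPreview {
    title: "Error fetching source".to_owned(),
    link: "https://example.com/feed.xml".to_owned(),
    ..Default::default() // the Default derive added in preview.rs below fills the rest
  });
}
```

The same call works on an Atom feed; the match dispatches to `into_rss_item` or `into_atom_entry` based on the feed kind.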
51 changes: 49 additions & 2 deletions src/feed/preview.rs
@@ -1,19 +1,66 @@
use chrono::{DateTime, FixedOffset};
use serde::Serialize;

-#[derive(Debug, Serialize)]
+#[derive(Debug, Serialize, Default)]
pub struct FeedPreview {
pub title: String,
pub link: String,
pub description: Option<String>,
pub posts: Vec<PostPreview>,
}

-#[derive(Debug, Serialize, PartialEq, Eq, Hash)]
+#[derive(Debug, Serialize, PartialEq, Eq, Hash, Default)]
pub struct PostPreview {
pub title: String,
pub author: Option<String>,
pub link: String,
pub body: Option<String>,
pub date: Option<DateTime<FixedOffset>>,
}

impl PostPreview {
pub fn into_rss_item(self) -> rss::Item {
let guid = rss::Guid {
value: self.link.clone(),
permalink: true,
};

rss::Item {
title: Some(self.title),
link: Some(self.link),
description: self.body,
pub_date: self.date.map(|d| d.to_rfc3339()),
author: self.author,
guid: Some(guid),
..Default::default()
}
}

pub fn into_atom_entry(self) -> atom_syndication::Entry {
atom_syndication::Entry {
title: atom_syndication::Text::plain(self.title),
id: self.link.clone(),
links: vec![atom_syndication::Link {
href: self.link,
..Default::default()
}],
authors: self
.author
.into_iter()
.map(|a| atom_syndication::Person {
name: a,
..Default::default()
})
.collect(),
updated: self
.date
.unwrap_or_else(|| chrono::Utc::now().fixed_offset()),
published: self.date,
content: self.body.map(|b| atom_syndication::Content {
value: Some(b),
..Default::default()
}),
..Default::default()
}
}
}
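One subtlety in `into_atom_entry` above: Atom requires an `updated` timestamp, so a dateless preview gets the conversion time there while `published` stays unset. A quick sketch relying on the new `Default` derive:

```rust
use crate::feed::PostPreview;

fn dateless_entry() {
  let entry = PostPreview::default().into_atom_entry();
  assert!(entry.published.is_none()); // the preview had no date
  // entry.updated holds "now" rather than a real post date.
}
```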
96 changes: 87 additions & 9 deletions src/filter/merge.rs
Expand Up @@ -5,10 +5,10 @@ use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

use crate::client::{Client, ClientConfig};
-use crate::feed::Feed;
+use crate::feed::{Feed, PostPreview};
use crate::filter_pipeline::{FilterPipeline, FilterPipelineConfig};
use crate::source::{SimpleSourceConfig, Source};
-use crate::util::{ConfigError, Result, SingleOrVec};
+use crate::util::{ConfigError, Error, Result, SingleOrVec};

use super::{FeedFilter, FeedFilterConfig, FilterContext};

@@ -107,36 +107,97 @@ pub struct Merge {
}

impl Merge {
-  async fn fetch_sources(&self, ctx: &FilterContext) -> Result<Vec<Feed>> {
-    stream::iter(self.sources.clone())
+  async fn fetch_sources(
+    &self,
+    ctx: &FilterContext,
+  ) -> Result<(Vec<Feed>, Vec<(Source, Error)>)> {
+    let iter = stream::iter(self.sources.clone())
.map(|source: Source| {
let client = &self.client;
async move {
-          let feed = source.fetch_feed(ctx, Some(client)).await?;
-          Ok(feed)
+          let fetch_res = source.fetch_feed(ctx, Some(client)).await;
+          (source, fetch_res)
}
})
.buffered(self.parallelism)
.collect::<Vec<_>>()
.await
-      .into_iter()
-      .collect::<Result<Vec<Feed>>>()
+      .into_iter();

collect_partial_oks(iter)
}
}

#[async_trait::async_trait]
impl FeedFilter for Merge {
async fn run(&self, ctx: &mut FilterContext, mut feed: Feed) -> Result<Feed> {
-    for new_feed in self.fetch_sources(ctx).await? {
+    let (new_feeds, errors) = self.fetch_sources(ctx).await?;
+
+    for new_feed in new_feeds {
let ctx = ctx.subcontext();
let filtered_new_feed = self.filters.run(ctx, new_feed).await?;
feed.merge(filtered_new_feed)?;
}

for (source, error) in errors {
let post = post_from_error(source, error, ctx);
feed.add_post(post);
}

feed.reorder();
Ok(feed)
}
}

fn post_from_error(
source: Source,
error: Error,
ctx: &FilterContext,
) -> PostPreview {
let source_url = source.full_url(ctx).map(|u| u.to_string());
let title = match source_url.as_ref() {
Some(url) => format!("Error fetching source: {}", url),
None => "Error: Failed fetching source".to_owned(),
};
let source_desc = source_url
.clone()
.unwrap_or_else(|| format!("{:?}", source));

let body = format!(
"<p><b>Source:</b><br>{source_desc}</p><p><b>Error:</b><br>{error}</p>"
);

PostPreview {
title,
link: source_url.unwrap_or_default(),
author: Some("rss-funnel".to_owned()),
body: Some(body),
date: Some(chrono::Utc::now().fixed_offset()),
}
}

// Return Err only if all results are Err. Otherwise return both
// succeeded results (Vec<T>) and failed results (Vec<(S, E)>).
#[allow(clippy::type_complexity)]
fn collect_partial_oks<S, T, E>(
iter: impl Iterator<Item = (S, Result<T, E>)>,
) -> Result<(Vec<T>, Vec<(S, E)>), E> {
let mut oks = Vec::new();
let mut errs = Vec::new();
for (source, res) in iter {
match res {
Ok(ok) => oks.push(ok),
Err(err) => errs.push((source, err)),
}
}

if oks.is_empty() && !errs.is_empty() {
Err(errs.pop().map(|(_, e)| e).unwrap())
} else {
Ok((oks, errs))
}
}

#[cfg(test)]
mod test {
use crate::test_utils::fetch_endpoint;
@@ -206,4 +267,21 @@ mod test {
assert_eq!(titles.len(), 3);
}
}

#[test]
fn test_partial_collect() {
let results = vec![
(1, Ok(1)),
(2, Err("error2")),
(3, Ok(3)),
(4, Err("error4")),
];
let (oks, errs) = super::collect_partial_oks(results.into_iter()).unwrap();
assert_eq!(oks, vec![1, 3]);
assert_eq!(errs, vec![(2, "error2"), (4, "error4")]);

let results = vec![(1, Err::<(), _>("error1")), (2, Err("error2"))];
let err = super::collect_partial_oks(results.into_iter()).unwrap_err();
assert_eq!(err, "error2");
}
}
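`fetch_sources` above fetches all sources concurrently but caps in-flight requests with `StreamExt::buffered`, and it keeps `(input, result)` pairs instead of short-circuiting on the first error. A standalone sketch of that pattern, with made-up inputs and failure condition:

```rust
use futures::{stream, StreamExt};

async fn run_all(inputs: Vec<u32>) -> Vec<(u32, Result<u32, String>)> {
  stream::iter(inputs)
    .map(|n| async move {
      // Pretend odd inputs fail, the way an unreachable feed would.
      let res = if n % 2 == 0 {
        Ok(n * 10)
      } else {
        Err(format!("odd input: {n}"))
      };
      (n, res)
    })
    .buffered(4) // at most 4 futures polled concurrently
    .collect()
    .await
}
```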
13 changes: 13 additions & 0 deletions src/source.rs
@@ -274,6 +274,19 @@ impl Source {
}
}
}

pub fn full_url(&self, ctx: &FilterContext) -> Option<Url> {
match self {
Source::Dynamic => ctx.source().cloned(),
Source::AbsoluteUrl(url) => Some(url.clone()),
Source::RelativeUrl(path) => ctx
.base_expected()
.ok()
.map(|base| base.join(path).expect("failed to join base and path")),
Source::FromScratch(_) => None,
Source::Templated(_) => None,
}
}
}

fn split_with_delimiter<'a>(
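For the `RelativeUrl` arm of `full_url` above, `Url::join` resolves the path against the endpoint's base the way a browser resolves a relative link, replacing the last path segment. A sketch with a made-up base URL:

```rust
use url::Url;

fn resolve(base: &str, path: &str) -> Url {
  Url::parse(base).unwrap().join(path).unwrap()
}

// resolve("https://funnel.example.com/endpoint", "feed.xml")
//   -> "https://funnel.example.com/feed.xml"
```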
