Skip to content

Commit

Permalink
Implement 'Seen' node, in conjunction with SSE allows you to receive …
Browse files Browse the repository at this point in the history
…only new entries
  • Loading branch information
m00nwtchr committed Jul 14, 2024
1 parent 18c9245 commit e427437
Show file tree
Hide file tree
Showing 9 changed files with 181 additions and 27 deletions.
8 changes: 4 additions & 4 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,22 @@ use tokio::sync::{broadcast, Mutex};

use crate::{
config::AppConfig,
flow::{Flow, FlowBuilder},
flow::{node::Data, Flow, FlowBuilder},
route,
};

#[derive(Clone)]
pub struct FlowHandle(Arc<Flow>, broadcast::Sender<Feed>);
pub struct FlowHandle(Arc<Flow>, broadcast::Sender<Data>);
impl FlowHandle {
pub fn new(arc: Arc<Flow>) -> Self {
FlowHandle(arc, broadcast::channel(100).0)
}

pub fn tx(&self) -> &broadcast::Sender<Feed> {
pub fn tx(&self) -> &broadcast::Sender<Data> {
&self.1
}

pub fn subscribe(&self) -> broadcast::Receiver<Feed> {
pub fn subscribe(&self) -> broadcast::Receiver<Data> {
self.1.subscribe()
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/flow/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ impl NodeTrait for Filter {
&[DataKind::Feed]
}

#[tracing::instrument(name = "filter_node")]
async fn run(&self) -> anyhow::Result<()> {
let Some(Data::Feed(mut atom)) = self.input.get() else {
return Err(anyhow!(""));
};

let _span = tracing::info_span!("filter_node").entered();
atom.entries.retain(|item| {
let cmp = match self.field {
Field::Author => item.authors().first().map(|p| &p.name),
Expand Down
1 change: 1 addition & 0 deletions src/flow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub mod node;
pub mod retrieve;
#[cfg(feature = "sanitise")]
pub mod sanitise;
pub mod seen;
#[cfg(feature = "wasm")]
pub mod wasm;

Expand Down
15 changes: 12 additions & 3 deletions src/flow/node.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#![allow(clippy::module_name_repetitions)]
use std::sync::Arc;

use super::feed::Feed;
use crate::{flow::seen::Seen, websub::WebSub};
use anyhow::anyhow;
use async_trait::async_trait;
use bytes::Bytes;
Expand All @@ -9,9 +11,6 @@ use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use strum::{Display, EnumDiscriminants};

use super::feed::Feed;
use crate::websub::WebSub;

#[cfg(feature = "filter")]
use super::filter::Filter;
#[cfg(feature = "html")]
Expand Down Expand Up @@ -67,6 +66,7 @@ impl NodeTrait for Node {
Self::Retrieve(n) => n.inputs(),
#[cfg(feature = "sanitise")]
Self::Sanitise(n) => n.inputs(),
Self::Seen(n) => n.inputs(),
#[cfg(feature = "wasm")]
Self::Wasm(n) => n.inputs(),
Self::Other(n) => n.inputs(),
Expand All @@ -83,6 +83,7 @@ impl NodeTrait for Node {
Self::Retrieve(n) => n.outputs(),
#[cfg(feature = "sanitise")]
Self::Sanitise(n) => n.outputs(),
Self::Seen(n) => n.outputs(),
#[cfg(feature = "wasm")]
Self::Wasm(n) => n.outputs(),
Self::Other(n) => n.outputs(),
Expand All @@ -99,6 +100,7 @@ impl NodeTrait for Node {
Self::Retrieve(n) => n.input_types(),
#[cfg(feature = "sanitise")]
Self::Sanitise(n) => n.input_types(),
Self::Seen(n) => n.input_types(),
#[cfg(feature = "wasm")]
Self::Wasm(n) => n.input_types(),
Self::Other(n) => n.input_types(),
Expand All @@ -115,6 +117,7 @@ impl NodeTrait for Node {
Self::Retrieve(n) => n.output_types(),
#[cfg(feature = "sanitise")]
Self::Sanitise(n) => n.output_types(),
Self::Seen(n) => n.output_types(),
#[cfg(feature = "wasm")]
Self::Wasm(n) => n.output_types(),
Self::Other(n) => n.output_types(),
Expand All @@ -131,6 +134,7 @@ impl NodeTrait for Node {
Self::Retrieve(n) => n.is_dirty(),
#[cfg(feature = "sanitise")]
Self::Sanitise(n) => n.is_dirty(),
Self::Seen(n) => n.is_dirty(),
#[cfg(feature = "wasm")]
Self::Wasm(n) => n.is_dirty(),
Self::Other(n) => n.is_dirty(),
Expand All @@ -147,6 +151,7 @@ impl NodeTrait for Node {
Self::Retrieve(n) => n.run().await,
#[cfg(feature = "sanitise")]
Self::Sanitise(n) => n.run().await,
Self::Seen(n) => n.run().await,
#[cfg(feature = "wasm")]
Self::Wasm(n) => n.run().await,
Self::Other(n) => n.run().await,
Expand All @@ -163,6 +168,7 @@ impl NodeTrait for Node {
Self::Retrieve(n) => n.set_input(index, input),
#[cfg(feature = "sanitise")]
Self::Sanitise(n) => n.set_input(index, input),
Self::Seen(n) => n.set_input(index, input),
#[cfg(feature = "wasm")]
Self::Wasm(n) => n.set_input(index, input),
Self::Other(n) => n.set_input(index, input),
Expand All @@ -179,6 +185,7 @@ impl NodeTrait for Node {
Self::Retrieve(n) => n.set_output(index, output),
#[cfg(feature = "sanitise")]
Self::Sanitise(n) => n.set_output(index, output),
Self::Seen(n) => n.set_output(index, output),
#[cfg(feature = "wasm")]
Self::Wasm(n) => n.set_output(index, output),
Self::Other(n) => n.set_output(index, output),
Expand All @@ -195,6 +202,7 @@ impl NodeTrait for Node {
Self::Retrieve(n) => n.web_sub(),
#[cfg(feature = "sanitise")]
Self::Sanitise(n) => n.web_sub(),
Self::Seen(n) => n.web_sub(),
#[cfg(feature = "wasm")]
Self::Wasm(n) => n.web_sub(),
Self::Other(n) => n.web_sub(),
Expand All @@ -214,6 +222,7 @@ pub enum Node {
Retrieve(Retrieve),
#[cfg(feature = "sanitise")]
Sanitise(Sanitise),
Seen(Seen),
#[cfg(feature = "wasm")]
#[serde(skip)]
Wasm(Wasm),
Expand Down
87 changes: 87 additions & 0 deletions src/flow/seen.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use std::{collections::HashSet, slice, sync::Arc};

use anyhow::anyhow;
use async_trait::async_trait;
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};

use super::node::{Data, DataKind, NodeTrait, IO};

#[derive(Serialize, Deserialize, Debug)]
pub struct Seen {
#[serde(default)]
store: Store,

#[serde(skip)]
input: Arc<IO>,
#[serde(skip)]
output: Arc<IO>,
}

impl Seen {
pub fn new() -> Self {
Self {
store: Store::default(),

output: Arc::default(),
input: Arc::default(),
}
}
}

#[async_trait]
impl NodeTrait for Seen {
fn inputs(&self) -> &[Arc<IO>] {
slice::from_ref(&self.input)
}

fn outputs(&self) -> &[Arc<IO>] {
slice::from_ref(&self.input)
}

fn input_types(&self) -> &[DataKind] {
&[DataKind::Feed]
}

fn output_types(&self) -> &[DataKind] {
&[DataKind::Feed]
}

#[tracing::instrument(name = "seen_node")]
async fn run(&self) -> anyhow::Result<()> {
let Some(Data::Feed(mut atom)) = self.input.get() else {
return Err(anyhow!("Input data not available"));
};

if let Store::Internal(seen) = &self.store {

Check failure on line 56 in src/flow/seen.rs

View workflow job for this annotation

GitHub Actions / all

irrefutable `if let` pattern

Check failure on line 56 in src/flow/seen.rs

View workflow job for this annotation

GitHub Actions / all (ubuntu-latest, stable)

irrefutable `if let` pattern

Check failure on line 56 in src/flow/seen.rs

View workflow job for this annotation

GitHub Actions / all (ubuntu-latest, beta)

irrefutable `if let` pattern
let mut seen = seen.lock();

// seen.retain(|id| atom.entries.iter().any(|i| i.id.eq(id)));
atom.entries.retain(|item| seen.insert(item.id.clone()));
}

if !atom.entries.is_empty() {
self.output.accept(atom)?;
}
Ok(())
}

fn set_input(&mut self, _index: usize, input: Arc<IO>) {
self.input = input;
}
fn set_output(&mut self, _index: usize, output: Arc<IO>) {
self.output = output;
}
}

#[derive(Debug, Serialize, Deserialize)]
enum Store {
Internal(#[serde(skip)] Mutex<HashSet<String>>),
// External,
}

impl Default for Store {
fn default() -> Self {
Self::Internal(Mutex::default())
}
}
8 changes: 4 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
#![allow(clippy::module_name_repetitions)]
use std::net::SocketAddr;

use tokio::net::TcpListener;

mod app;
mod config;
mod feed;
Expand All @@ -20,10 +22,8 @@ async fn main() -> anyhow::Result<()> {

let config = AppConfig::load()?;

let listener = tokio::net::TcpListener::bind(SocketAddr::new(config.address, config.port))
.await
.unwrap();
axum::serve(listener, app(config).await?).await.unwrap();
let listener = TcpListener::bind(SocketAddr::new(config.address, config.port)).await?;
axum::serve(listener, app(config).await?).await?;

Ok(())
}
29 changes: 16 additions & 13 deletions src/route/flow.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
use crate::{
app::AppState,
flow::node::{Data, NodeTrait},
route::Atom,
};
use anyhow::anyhow;
use atom_syndication::Feed;
use axum::{
extract::{Path, State},
http::StatusCode,
Expand All @@ -15,10 +8,15 @@ use axum::{
routing::get,
Router,
};
use futures::{stream, Stream};
use std::convert::Infallible;
use futures::Stream;
use tokio_stream::{wrappers::BroadcastStream, StreamExt};

use crate::{
app::AppState,
flow::node::{Data, NodeTrait},
route::Atom,
};

#[tracing::instrument(name = "run_flow_handler", skip(state))]
async fn run(
Path(name): Path<String>,
Expand All @@ -43,7 +41,7 @@ async fn subscribe(
Path(name): Path<String>,
State(state): State<AppState>,
) -> Result<Sse<impl Stream<Item = anyhow::Result<Event>>>, StatusCode> {
let Some((flow, rx)) = state
let Some((_flow, rx)) = state
.flows
.lock()
.await
Expand All @@ -54,11 +52,16 @@ async fn subscribe(
};

let stream = BroadcastStream::new(rx).map(|res| {
let feed = res?;
// let entries = res.map(|d| {
// if let Data::Feed(feed) = d {
// Data::Vec(feed.entries.into_iter().map(Data::Entry).collect())
// } else {
// d
// }
// });

Ok(Event::default().json_data(&feed)?)
Ok(Event::default().json_data(res?)?)
});

Ok(Sse::new(stream).keep_alive(KeepAlive::default()))
}

Expand Down
13 changes: 11 additions & 2 deletions src/route/websub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,17 @@ pub async fn receive(

tokio::spawn(async move {
if let Ok(_) = flow.run().await {
if let Some(Data::Feed(feed)) = flow.result() {
let _ = flow.tx().send(feed);
if let Some(data) = flow.result() {
match data {
Data::Feed(feed) => {
for entry in feed.entries.into_iter().rev() {
let _ = flow.tx().send(Data::Entry(entry));
}
}
_ => {
let _ = flow.tx().send(data);
}
}
}
}
});
Expand Down
45 changes: 45 additions & 0 deletions src/websub.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use anyhow::anyhow;
use rand::{distributions::Uniform, Rng};
use serde::{Deserialize, Serialize};
use sqlx::SqliteConnection;
use std::str::FromStr;
use uuid::{NoContext, Timestamp, Uuid};

#[derive(Serialize, Deserialize, Debug, Clone)]
Expand Down Expand Up @@ -94,3 +96,46 @@ impl WebSub {
Ok(())
}
}

impl FromStr for WebSub {
type Err = anyhow::Error;

fn from_str(header: &str) -> Result<Self, Self::Err> {
let mut hub = None;
let mut topic = None;

// Split the header into individual link parts
for part in header.split(',') {
let segments: Vec<&str> = part.trim().split(';').collect();
if segments.len() < 2 {
continue;
}

let url_part = segments[0].trim();
let rel_part = segments[1].trim();

if !url_part.starts_with('<') || !url_part.ends_with('>') {
continue;
}

// Extract the URL and rel values
let url = &url_part[1..url_part.len() - 1];
let rel = rel_part
.split('=')
.nth(1)
.map(|s| s.trim_matches('"'))
.unwrap_or("");

match rel {
"hub" => hub = Some(url.to_string()),
"self" => topic = Some(url.to_string()),
_ => (),
}
}

Ok(WebSub {
topic: topic.ok_or_else(|| anyhow!(""))?,
hub: hub.ok_or_else(|| anyhow!(""))?,
})
}
}

0 comments on commit e427437

Please sign in to comment.