Skip to content

Commit

Permalink
subscriptions refactor + misc
Browse files Browse the repository at this point in the history
  • Loading branch information
m00nwtchr committed Sep 11, 2024
1 parent f24db51 commit 8ddec60
Show file tree
Hide file tree
Showing 17 changed files with 413 additions and 299 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,4 @@ scraper = { version = "0.20", optional = true }
serde_regex = { version = "1", optional = true }
wasmtime = { version = "23", optional = true }
wasmtime-wasi = { version = "23", optional = true }
inventory = "0.3.15"
32 changes: 16 additions & 16 deletions src/app.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{collections::HashMap, ops::Deref, sync::Arc};

use axum::{extract::FromRef, routing::get, Router};
use axum::{extract::FromRef, http::StatusCode, routing::get, Router};
use futures::StreamExt;
use sqlx::{
sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions},
Expand All @@ -12,9 +12,10 @@ use tower_http::trace::TraceLayer;
use url::Url;

use crate::{
config::AppConfig,
config::{config, AppConfig},

Check failure on line 15 in src/app.rs

View workflow job for this annotation

GitHub Actions / all

unused import: `AppConfig`

Check failure on line 15 in src/app.rs

View workflow job for this annotation

GitHub Actions / all (ubuntu-latest, stable)

unused import: `AppConfig`

Check failure on line 15 in src/app.rs

View workflow job for this annotation

GitHub Actions / all (ubuntu-latest, beta)

unused import: `AppConfig`
flow::{node::Data, Flow, FlowBuilder},
route,
route, subscriber,
subscriber::websub::WebSubSubscriber,
};

#[derive(Clone)]
Expand Down Expand Up @@ -45,7 +46,8 @@ impl Deref for FlowHandle {
pub struct AppStateInner {
pub flows: Mutex<HashMap<String, FlowHandle>>,
pub pool: SqlitePool,
pub config: Arc<AppConfig>,

pub web_sub_subscriber: WebSubSubscriber,
}

#[derive(Clone)]
Expand All @@ -67,12 +69,6 @@ impl FromRef<AppState> for SqlitePool {
}
}

impl FromRef<AppState> for Arc<AppConfig> {
fn from_ref(input: &AppState) -> Self {
input.config.clone()
}
}

fn load_flow(content: &str) -> anyhow::Result<Flow> {
let flow: FlowBuilder = serde_json::de::from_str(content)?;

Expand All @@ -86,7 +82,8 @@ pub async fn websub_check(public_url: &Url) -> anyhow::Result<()> {
Ok(())
}

pub async fn app(config: AppConfig) -> anyhow::Result<Router> {
pub async fn app() -> anyhow::Result<Router> {
let config = config().await;
let pool = SqlitePoolOptions::new()
.connect_with(
SqliteConnectOptions::new()
Expand Down Expand Up @@ -115,17 +112,20 @@ pub async fn app(config: AppConfig) -> anyhow::Result<Router> {
.collect()
.await;

let web_sub_subscriber = WebSubSubscriber::new(pool.clone());
let state = AppState(Arc::new(AppStateInner {
flows: Mutex::new(flows),
pool,
config: Arc::new(config),
web_sub_subscriber,
}));

Ok(Router::new()
let router = Router::new()
.nest("/api", route::api())
.nest("/websub", route::websub())
.nest("/flow", route::flow())
.route("/", get(|| async { "Hello, World!".to_string() }))
.route("/", get(|| async { StatusCode::OK }))
.nest("/websub", subscriber::websub::router())
.with_state(state)
.layer(ServiceBuilder::new().layer(TraceLayer::new_for_http())))
.layer(ServiceBuilder::new().layer(TraceLayer::new_for_http()));

Ok(router)
}
19 changes: 15 additions & 4 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::net::IpAddr;

use confique::Config;
use tokio::sync::OnceCell;
use url::Url;

#[derive(Config)]
Expand All @@ -18,8 +19,18 @@ pub struct AppConfig {
pub public_url: Option<Url>,
}

impl AppConfig {
pub fn load() -> anyhow::Result<AppConfig> {
Ok(Config::builder().env().file("rssflow.toml").load()?)
}
pub static CONFIG: OnceCell<AppConfig> = OnceCell::const_new();

async fn init() -> AppConfig {
// dotenv().ok();

Config::builder()
.env()
.file("rssflow.toml")
.load()
.expect("")
}

pub async fn config() -> &'static AppConfig {
CONFIG.get_or_init(init).await
}
2 changes: 1 addition & 1 deletion src/flow/feed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use serde_with::{serde_as, DurationSeconds};
use url::Url;

use super::node::{Data, DataKind, NodeTrait, IO};
use crate::websub::WebSub;
use crate::subscriber::websub::WebSub;

fn mutex_now() -> Mutex<Instant> {
Mutex::new(Instant::now())
Expand Down
2 changes: 1 addition & 1 deletion src/flow/html.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use serde_with::{serde_as, DurationSeconds};
use url::Url;

use super::node::{Data, DataKind, NodeTrait, IO};
use crate::websub::WebSub;
use crate::subscriber::websub::WebSub;

fn mutex_now() -> Mutex<Instant> {
Mutex::new(Instant::now())
Expand Down
32 changes: 26 additions & 6 deletions src/flow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub mod wasm;

use node::{Data, DataKind, Node, NodeTrait, IO};

use crate::websub::WebSub;
use crate::subscriber::websub::WebSub;

#[inline]
fn feed_io() -> Arc<IO> {
Expand All @@ -40,7 +40,7 @@ fn feed_arr<const N: usize>() -> [Arc<IO>; N] {
pub struct Flow {
nodes: Mutex<Vec<Node>>,

web_sub: parking_lot::Mutex<Option<WebSub>>,
subscriptions: parking_lot::Mutex<Vec<WebSub>>,
inputs: Box<[Arc<IO>]>,
outputs: Box<[Arc<IO>]>,
}
Expand All @@ -49,6 +49,14 @@ impl Flow {
pub fn result(&self) -> Option<Data> {
self.outputs.first()?.get()
}

pub fn subscriptions(&self) -> Vec<WebSub> {
self.subscriptions.lock().clone()
}

pub fn has_subscriptions(&self) -> bool {
!self.subscriptions.lock().is_empty()
}
}

#[async_trait]
Expand All @@ -72,6 +80,12 @@ impl NodeTrait for Flow {
async fn run(&self) -> anyhow::Result<()> {
// TODO: Run nodes in order based on input/output dependencies, run adjacent nodes concurrently.

let mut subscriptions: Option<Vec<WebSub>> = if self.subscriptions.lock().is_empty() {
Some(Vec::new())
} else {
None
};

let nodes = self.nodes.lock().await;
for node in nodes.iter() {
if node.is_dirty() {
Expand All @@ -82,11 +96,17 @@ impl NodeTrait for Flow {
for io in inputs.iter().filter(|i| i.is_dirty()) {
io.clear();
}

if let Some(subscriptions) = &mut subscriptions {
if let Some(sub) = node.web_sub() {
subscriptions.push(sub);
}
}
}
}

if let Some(web_sub) = nodes.first().and_then(NodeTrait::web_sub) {
self.web_sub.lock().replace(web_sub);
if let Some(subscriptions) = subscriptions {
*self.subscriptions.lock() = subscriptions;
}

Ok(())
Expand All @@ -100,7 +120,7 @@ impl NodeTrait for Flow {
}

fn web_sub(&self) -> Option<WebSub> {
self.web_sub.lock().clone()
self.subscriptions.lock().first().cloned()
}
}

Expand Down Expand Up @@ -198,7 +218,7 @@ impl FlowBuilder {
nodes: Mutex::new(self.nodes),
inputs,
outputs,
web_sub: parking_lot::Mutex::default(),
subscriptions: parking_lot::Mutex::default(),
}
}
}
Expand Down
6 changes: 2 additions & 4 deletions src/flow/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,8 @@ use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use strum::{Display, EnumDiscriminants};

use crate::{
flow::{ai::AI, seen::Seen},
websub::WebSub,
};
use crate::flow::{ai::AI, seen::Seen};
use crate::subscriber::websub::WebSub;

#[async_trait]
#[enum_dispatch]
Expand Down
8 changes: 4 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ mod config;
mod feed;
mod flow;
mod route;
mod websub;
mod subscriber;

use crate::{
app::{app, websub_check},
config::AppConfig,
config::config,
};

#[global_allocator]
Expand All @@ -23,7 +23,7 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();

let config = AppConfig::load()?;
let config = config().await;

let listener = TcpListener::bind(SocketAddr::new(config.address, config.port)).await?;
if let Some(public_url) = &config.public_url {
Expand All @@ -35,7 +35,7 @@ async fn main() -> anyhow::Result<()> {
}
});
}
axum::serve(listener, app(config).await?).await?;
axum::serve(listener, app().await?).await?;

Ok(())
}
Loading

0 comments on commit 8ddec60

Please sign in to comment.