Skip to content

Commit

Permalink
Check for dead federated instances (fixes #2221)
Browse files Browse the repository at this point in the history
  • Loading branch information
Nutomic committed Jun 30, 2023
1 parent 3159eed commit 914585c
Show file tree
Hide file tree
Showing 10 changed files with 62 additions and 15 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ url_serde = "0.2.0"
reqwest = { version = "0.11.18", features = ["json", "blocking"] }
reqwest-middleware = "0.2.2"
reqwest-tracing = "0.4.4"
clokwerk = "0.3.5"
clokwerk = "0.4.0"
doku = { version = "0.21.1", features = ["url-2"] }
bcrypt = "0.13.0"
chrono = { version = "0.4.26", features = ["serde"], default-features = false }
Expand Down Expand Up @@ -147,4 +147,5 @@ rustls = { workspace = true }
futures-util = { workspace = true }
tokio-postgres = { workspace = true }
tokio-postgres-rustls = { workspace = true }
chrono = { workspace = true }
chrono = { workspace = true }
once_cell = { workspace = true }
1 change: 1 addition & 0 deletions crates/apub/src/activities/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ where
ActorT: Actor,
Activity: ActivityHandler<Error = LemmyError>,
{
// TODO: find a way to retrieve Site.is_alive here
info!("Sending activity {}", activity.id().to_string());
let activity = WithContext::new(activity, CONTEXT.deref().clone());

Expand Down
1 change: 1 addition & 0 deletions crates/apub/src/objects/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ impl Object for ApubSite {
public_key: Some(apub.public_key.public_key_pem.clone()),
private_key: None,
instance_id: instance.id,
is_alive: Some(true),
};
let languages = LanguageTag::to_language_id_multiple(apub.language, data.pool()).await?;

Expand Down
1 change: 0 additions & 1 deletion crates/db_schema/src/impls/site.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ impl Site {
)
}

// TODO this needs fixed
pub async fn read_remote_sites(pool: &DbPool) -> Result<Vec<Self>, Error> {
let conn = &mut get_conn(pool).await?;
site.order_by(id).offset(1).get_results::<Self>(conn).await
Expand Down
1 change: 1 addition & 0 deletions crates/db_schema/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,7 @@ diesel::table! {
private_key -> Nullable<Text>,
public_key -> Text,
instance_id -> Int4,
is_alive -> Bool,
}
}

Expand Down
3 changes: 3 additions & 0 deletions crates/db_schema/src/source/site.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub struct Site {
pub private_key: Option<String>,
pub public_key: String,
pub instance_id: InstanceId,
pub is_alive: bool,
}

#[derive(Clone, TypedBuilder)]
Expand All @@ -56,6 +57,7 @@ pub struct SiteInsertForm {
pub public_key: Option<String>,
#[builder(!default)]
pub instance_id: InstanceId,
pub is_alive: Option<bool>,
}

#[derive(Clone, TypedBuilder)]
Expand All @@ -75,4 +77,5 @@ pub struct SiteUpdateForm {
pub inbox_url: Option<DbUrl>,
pub private_key: Option<Option<String>>,
pub public_key: Option<String>,
pub is_alive: Option<bool>,
}
1 change: 1 addition & 0 deletions migrations/2023-06-30-111745_instance_alive_check/down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
alter table site drop column is_alive;
1 change: 1 addition & 0 deletions migrations/2023-06-30-111745_instance_alive_check/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
alter table site add column is_alive bool not null default true;
58 changes: 48 additions & 10 deletions src/scheduled_tasks.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use chrono::NaiveDateTime;
use clokwerk::{Scheduler, TimeUnits as CTimeUnits};
use clokwerk::{AsyncScheduler, Scheduler, TimeUnits as CTimeUnits};
use diesel::{
dsl::{now, IntervalDsl},
sql_types::{Integer, Timestamp},
Expand All @@ -14,21 +14,26 @@ use diesel::{sql_query, PgConnection, RunQueryDsl};
use lemmy_api_common::context::LemmyContext;
use lemmy_db_schema::{
schema::{activity, captcha_answer, comment, community_person_ban, instance, person, post},
source::instance::{Instance, InstanceForm},
source::{
instance::{Instance, InstanceForm},
site::{Site, SiteUpdateForm},
},
traits::Crud,
utils::{naive_now, DELETED_REPLACEMENT_TEXT},
};
use lemmy_routes::nodeinfo::NodeInfo;
use lemmy_utils::{error::LemmyError, REQWEST_TIMEOUT};
use reqwest::blocking::Client;
use lemmy_utils::{
error::{LemmyError, LemmyResult},
REQWEST_TIMEOUT,
};
use reqwest::{blocking::Client, StatusCode};
use std::{thread, time::Duration};
use tokio::sync::OnceCell;
use tracing::{error, info};
use url::Url;

/// Schedules various cleanup tasks for lemmy in a background thread
pub fn setup(
db_url: String,
user_agent: String,
context_1: LemmyContext,
) -> Result<(), LemmyError> {
pub fn setup(db_url: String, user_agent: String, context: LemmyContext) -> Result<(), LemmyError> {
// Setup the connections
let mut scheduler = Scheduler::new();

Expand Down Expand Up @@ -64,9 +69,10 @@ pub fn setup(
});

// Remove old rate limit buckets after 1 to 2 hours of inactivity
let context_ = context.clone();
scheduler.every(CTimeUnits::hour(1)).run(move || {
let hour = Duration::from_secs(3600);
context_1.settings_updated_channel().remove_older_than(hour);
context_.settings_updated_channel().remove_older_than(hour);
});

// Overwrite deleted & removed posts and comments every day
Expand All @@ -82,8 +88,22 @@ pub fn setup(
update_instance_software(&mut conn, &user_agent);
});

let mut async_scheduler = AsyncScheduler::new();

// Check for dead federated instances
static CONTEXT: OnceCell<LemmyContext> = OnceCell::const_new();
CONTEXT.set(context).ok();
async_scheduler.every(CTimeUnits::minutes(1)).run(|| async {
// TODO: this is not getting executed for some reason. change to daily once working
check_dead_instances(CONTEXT.get().unwrap())
.await
.map_err(|e| error!("Failed to check federated instances: {e}"))
.ok();
});

// Manually run the scheduler in an event loop
loop {
async_scheduler.run_pending();
scheduler.run_pending();
thread::sleep(Duration::from_millis(1000));
}
Expand Down Expand Up @@ -381,6 +401,24 @@ fn update_instance_software(conn: &mut PgConnection, user_agent: &str) {
}
}

async fn check_dead_instances(context: &LemmyContext) -> LemmyResult<()> {
info!("Checking if federated instances are alive");
let instances = Site::read_remote_sites(context.pool()).await?;

for i in instances {
let url: Url = i.actor_id.into();
let res = context.client().get(url).send().await;
let is_alive = match res {
Ok(o) => o.status() == StatusCode::OK,
Err(_) => false,
};
let form = SiteUpdateForm::builder().is_alive(Some(is_alive)).build();
Site::update(context.pool(), i.id, &form).await?;
}
info!("Finished checking if federated instances are alive");
Ok(())
}

#[cfg(test)]
mod tests {
use lemmy_routes::nodeinfo::NodeInfo;
Expand Down

0 comments on commit 914585c

Please sign in to comment.