Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persistent, performant, reliable federation queue #3605

Merged
merged 72 commits into from
Sep 9, 2023
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
72 commits
Select commit Hold shift + click to select a range
4506309
persistent activity queue
phiresky Jul 23, 2023
5521204
fixes
phiresky Jul 23, 2023
a2e3fc8
fixes
phiresky Jul 23, 2023
c19211f
make federation workers function callable from outside
phiresky Jul 23, 2023
0d6042e
log federation instances
phiresky Jul 23, 2023
2d086e8
dead instance detection not needed here
phiresky Jul 23, 2023
b27277c
taplo fmt
phiresky Jul 23, 2023
bfa1e57
split federate bin/lib
phiresky Jul 24, 2023
3f51168
minor fix
phiresky Jul 24, 2023
080c380
better logging
phiresky Jul 24, 2023
82c2243
log
phiresky Jul 24, 2023
14479ce
create struct to hold cancellable task for readability
phiresky Jul 30, 2023
59f08d8
use boxfuture for readability
phiresky Jul 30, 2023
11e4c45
Merge remote-tracking branch 'origin/main' into persistent-queue
phiresky Jul 30, 2023
fceec6a
reset submodule
phiresky Jul 30, 2023
2d3ad1b
Merge remote-tracking branch 'origin/main' into persistent-queue
phiresky Aug 2, 2023
bddcbab
fix
phiresky Aug 2, 2023
e6e96a7
fix lint
phiresky Aug 2, 2023
0f77534
swap
phiresky Aug 2, 2023
ef60dc0
remove json column, use separate array columns instead
phiresky Aug 3, 2023
7f82bd0
some review comments
phiresky Aug 3, 2023
7e72ad8
make worker a struct for readability
phiresky Aug 3, 2023
de50f37
minor readability
phiresky Aug 3, 2023
5c686f7
add local filter to community follower view
phiresky Aug 3, 2023
8a6b3c2
remove separate lemmy_federate entry point
phiresky Aug 3, 2023
c2ec41f
fix remaining duration
phiresky Aug 3, 2023
bdbb499
address review comments mostly
phiresky Aug 10, 2023
574e23c
Merge branch 'main' into persistent-queue
dessalines Aug 22, 2023
eb761bb
fix lint
phiresky Aug 22, 2023
5b4ecfb
upgrade actitypub-fed to simpler interface
phiresky Aug 22, 2023
b99a93b
Merge branch 'main' into persistent-queue
dessalines Aug 22, 2023
9425106
fix sql format
phiresky Aug 23, 2023
7fe2724
increase delays a bit
phiresky Aug 23, 2023
10e4940
Merge remote-tracking branch 'origin/main' into persistent-queue
phiresky Aug 24, 2023
9c79347
fixes after merge
phiresky Aug 24, 2023
49fa1b2
remove selectable
phiresky Aug 26, 2023
68b69ee
fix instance selectable
phiresky Aug 26, 2023
b526727
Merge remote-tracking branch 'origin/main' into persistent-queue
phiresky Aug 29, 2023
062e398
Merge remote-tracking branch 'origin/main' into persistent-queue
phiresky Aug 29, 2023
6e445ac
Merge remote-tracking branch 'origin/main' into persistent-queue
phiresky Aug 31, 2023
c654cee
add comment
phiresky Sep 1, 2023
284ebb3
start federation based on latest id at the time
phiresky Sep 1, 2023
557576a
rename federate process args
phiresky Sep 1, 2023
e945e9f
dead instances in one query
phiresky Sep 1, 2023
884307a
filter follow+report activities by local
phiresky Sep 1, 2023
2767ab4
remove synchronous federation
phiresky Sep 1, 2023
44703e7
lint
phiresky Sep 1, 2023
77b8adb
fix federation tests by waiting for results to change
phiresky Sep 3, 2023
77219ff
Merge remote-tracking branch 'origin/main' into persistent-queue
phiresky Sep 4, 2023
769a1f5
fix fed test
phiresky Sep 5, 2023
b62d520
fix comment report
phiresky Sep 5, 2023
8cb6408
wait some more
phiresky Sep 5, 2023
3000809
Merge remote-tracking branch 'origin/main' into persistent-queue
phiresky Sep 6, 2023
8cb8880
Apply suggestions from code review
phiresky Sep 6, 2023
21e3f7e
Merge remote-tracking branch 'mine/persistent-queue' into persistent-…
phiresky Sep 6, 2023
8e0f4b9
fix most remaining tests
phiresky Sep 6, 2023
9d96070
wait until private messages
phiresky Sep 6, 2023
838bb42
fix community tests
phiresky Sep 6, 2023
a1a925d
Merge remote-tracking branch 'origin/main' into persistent-queue
phiresky Sep 7, 2023
603c53f
fix community tests
phiresky Sep 7, 2023
d10660e
move arg parse
phiresky Sep 7, 2023
35a408d
use instance_id instead of domain in federation_queue_state table
phiresky Sep 8, 2023
e9f2303
Merge branch 'main' into persistent-queue
dessalines Sep 8, 2023
5c10654
fix tests hopefully
phiresky Sep 9, 2023
f70a02d
.then
phiresky Sep 9, 2023
0746073
move create http server to separate function
phiresky Sep 9, 2023
4d7c984
move prom metrics
phiresky Sep 9, 2023
fe34df0
fix test
phiresky Sep 9, 2023
156d441
restart federation worker when it errors
phiresky Sep 9, 2023
e84bbd4
don't fail worker if a single activity cannot be sent
phiresky Sep 9, 2023
6e0bc61
clippy
phiresky Sep 9, 2023
4537944
no unwrap
phiresky Sep 9, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
249 changes: 169 additions & 80 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ members = [
"crates/db_views_actor",
"crates/db_views_actor",
"crates/routes",
"crates/federate",
]

[workspace.dependencies]
Expand All @@ -69,7 +70,7 @@ lemmy_db_views_actor = { version = "=0.18.1", path = "./crates/db_views_actor" }
lemmy_db_views_moderator = { version = "=0.18.1", path = "./crates/db_views_moderator" }
activitypub_federation = { version = "0.4.6", default-features = false, features = [
"actix-web",
] }
], git = "https://github.com/phiresky/activitypub-federation-rust/", branch = "raw-sending" }
phiresky marked this conversation as resolved.
Show resolved Hide resolved
diesel = "2.1.0"
diesel_migrations = "2.1.0"
diesel-async = "0.3.1"
Expand All @@ -88,7 +89,6 @@ tracing-error = "0.2.0"
tracing-log = "0.1.3"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
url = { version = "2.4.0", features = ["serde"] }
url_serde = "0.2.0"
reqwest = { version = "0.11.18", features = ["json", "blocking", "gzip"] }
reqwest-middleware = "0.2.2"
reqwest-tracing = "0.4.5"
Expand Down Expand Up @@ -119,7 +119,6 @@ futures = "0.3.28"
http = "0.2.9"
percent-encoding = "2.3.0"
rosetta-i18n = "0.1.3"
rand = "0.8.5"
opentelemetry = { version = "0.19.0", features = ["rt-tokio"] }
tracing-opentelemetry = { version = "0.19.0" }
ts-rs = { version = "6.2", features = ["serde-compat", "chrono-impl"] }
Expand Down Expand Up @@ -167,3 +166,5 @@ tokio-postgres-rustls = { workspace = true }
chrono = { workspace = true }
prometheus = { version = "0.13.3", features = ["process"], optional = true }
actix-web-prom = { version = "0.6.0", optional = true }
clap = { version = "4.3.19", features = ["derive"] }
lemmy_federate = { version = "0.18.1", path = "crates/federate" }
7 changes: 4 additions & 3 deletions crates/apub/src/activities/block/block_user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
},
activity_lists::AnnouncableActivities,
insert_received_activity,
objects::{instance::remote_instance_inboxes, person::ApubPerson},
objects::person::ApubPerson,
protocol::activities::block::block_user::BlockUser,
};
use activitypub_federation::{
Expand All @@ -27,6 +27,7 @@ use lemmy_api_common::{
};
use lemmy_db_schema::{
source::{
activity::ActivitySendTargets,
community::{
CommunityFollower,
CommunityFollowerForm,
Expand Down Expand Up @@ -97,12 +98,12 @@ impl BlockUser {

match target {
SiteOrCommunity::Site(_) => {
let inboxes = remote_instance_inboxes(&mut context.pool()).await?;
let inboxes = ActivitySendTargets::to_all_instances();
send_lemmy_activity(context, block, mod_, inboxes, false).await
}
SiteOrCommunity::Community(c) => {
let activity = AnnouncableActivities::BlockUser(block);
let inboxes = vec![user.shared_inbox_or_inbox()];
let inboxes = ActivitySendTargets::to_inbox(user.shared_inbox_or_inbox());
send_activity_in_community(activity, mod_, c, inboxes, true, context).await
}
}
Expand Down
7 changes: 4 additions & 3 deletions crates/apub/src/activities/block/undo_block_user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
},
activity_lists::AnnouncableActivities,
insert_received_activity,
objects::{instance::remote_instance_inboxes, person::ApubPerson},
objects::person::ApubPerson,
protocol::activities::block::{block_user::BlockUser, undo_block_user::UndoBlockUser},
};
use activitypub_federation::{
Expand All @@ -20,6 +20,7 @@ use activitypub_federation::{
use lemmy_api_common::{context::LemmyContext, utils::sanitize_html_opt};
use lemmy_db_schema::{
source::{
activity::ActivitySendTargets,
community::{CommunityPersonBan, CommunityPersonBanForm},
moderator::{ModBan, ModBanForm, ModBanFromCommunity, ModBanFromCommunityForm},
person::{Person, PersonUpdateForm},
Expand Down Expand Up @@ -59,10 +60,10 @@ impl UndoBlockUser {
audience,
};

let mut inboxes = vec![user.shared_inbox_or_inbox()];
let mut inboxes = ActivitySendTargets::to_inbox(user.shared_inbox_or_inbox());
match target {
SiteOrCommunity::Site(_) => {
inboxes.append(&mut remote_instance_inboxes(&mut context.pool()).await?);
inboxes.set_all_instances();
send_lemmy_activity(context, undo, mod_, inboxes, false).await
}
SiteOrCommunity::Community(c) => {
Expand Down
3 changes: 2 additions & 1 deletion crates/apub/src/activities/community/announce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use activitypub_federation::{
traits::{ActivityHandler, Actor},
};
use lemmy_api_common::context::LemmyContext;
use lemmy_db_schema::source::activity::ActivitySendTargets;
use lemmy_utils::error::{LemmyError, LemmyErrorType};
use serde_json::Value;
use url::Url;
Expand Down Expand Up @@ -94,7 +95,7 @@ impl AnnounceActivity {
context: &Data<LemmyContext>,
) -> Result<(), LemmyError> {
let announce = AnnounceActivity::new(object.clone(), community, context)?;
let inboxes = community.get_follower_inboxes(context).await?;
let inboxes = ActivitySendTargets::to_local_community_followers(community.id);
send_lemmy_activity(context, announce, community, inboxes.clone(), false).await?;

// Pleroma and Mastodon can't handle activities like Announce/Create/Page. So for
Expand Down
13 changes: 11 additions & 2 deletions crates/apub/src/activities/community/collection_add.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use lemmy_db_schema::{
impls::community::CollectionType,
newtypes::{CommunityId, PersonId},
source::{
activity::ActivitySendTargets,
community::{Community, CommunityModerator, CommunityModeratorForm},
moderator::{ModAddCommunity, ModAddCommunityForm},
person::Person,
Expand Down Expand Up @@ -62,7 +63,7 @@ impl CollectionAdd {
};

let activity = AnnouncableActivities::CollectionAdd(add);
let inboxes = vec![added_mod.shared_inbox_or_inbox()];
let inboxes = ActivitySendTargets::to_inbox(added_mod.shared_inbox_or_inbox());
send_activity_in_community(activity, actor, community, inboxes, true, context).await
}

Expand All @@ -87,7 +88,15 @@ impl CollectionAdd {
audience: Some(community.id().into()),
};
let activity = AnnouncableActivities::CollectionAdd(add);
send_activity_in_community(activity, actor, community, vec![], true, context).await
send_activity_in_community(
activity,
actor,
community,
ActivitySendTargets::empty(),
true,
context,
)
.await
}
}

Expand Down
13 changes: 11 additions & 2 deletions crates/apub/src/activities/community/collection_remove.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use lemmy_api_common::{
use lemmy_db_schema::{
impls::community::CollectionType,
source::{
activity::ActivitySendTargets,
community::{Community, CommunityModerator, CommunityModeratorForm},
moderator::{ModAddCommunity, ModAddCommunityForm},
post::{Post, PostUpdateForm},
Expand Down Expand Up @@ -57,7 +58,7 @@ impl CollectionRemove {
};

let activity = AnnouncableActivities::CollectionRemove(remove);
let inboxes = vec![removed_mod.shared_inbox_or_inbox()];
let inboxes = ActivitySendTargets::to_inbox(removed_mod.shared_inbox_or_inbox());
send_activity_in_community(activity, actor, community, inboxes, true, context).await
}

Expand All @@ -82,7 +83,15 @@ impl CollectionRemove {
audience: Some(community.id().into()),
};
let activity = AnnouncableActivities::CollectionRemove(remove);
send_activity_in_community(activity, actor, community, vec![], true, context).await
send_activity_in_community(
activity,
actor,
community,
ActivitySendTargets::empty(),
true,
context,
)
.await
}
}

Expand Down
11 changes: 10 additions & 1 deletion crates/apub/src/activities/community/lock_page.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use activitypub_federation::{
use lemmy_api_common::context::LemmyContext;
use lemmy_db_schema::{
source::{
activity::ActivitySendTargets,
community::Community,
person::Person,
post::{Post, PostUpdateForm},
Expand Down Expand Up @@ -141,6 +142,14 @@ pub(crate) async fn send_lock_post(
};
AnnouncableActivities::UndoLockPost(undo)
};
send_activity_in_community(activity, &actor.into(), &community, vec![], true, &context).await?;
send_activity_in_community(
activity,
&actor.into(),
&community,
ActivitySendTargets::empty(),
true,
&context,
)
.await?;
Ok(())
}
11 changes: 5 additions & 6 deletions crates/apub/src/activities/community/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@ use crate::{
};
use activitypub_federation::{config::Data, traits::Actor};
use lemmy_api_common::context::LemmyContext;
use lemmy_db_schema::source::person::PersonFollower;
use lemmy_db_schema::source::{activity::ActivitySendTargets, person::PersonFollower};
use lemmy_utils::error::LemmyError;
use url::Url;

pub mod announce;
pub mod collection_add;
Expand All @@ -34,7 +33,7 @@ pub(crate) async fn send_activity_in_community(
activity: AnnouncableActivities,
actor: &ApubPerson,
community: &ApubCommunity,
extra_inboxes: Vec<Url>,
extra_inboxes: ActivitySendTargets,
is_mod_action: bool,
context: &Data<LemmyContext>,
) -> Result<(), LemmyError> {
Expand All @@ -43,8 +42,8 @@ pub(crate) async fn send_activity_in_community(

// send to user followers
if !is_mod_action {
inboxes.extend(
&mut PersonFollower::list_followers(&mut context.pool(), actor.id)
inboxes.add_inboxes(
PersonFollower::list_followers(&mut context.pool(), actor.id)
.await?
.into_iter()
.map(|p| ApubPerson(p).shared_inbox_or_inbox()),
Expand All @@ -56,7 +55,7 @@ pub(crate) async fn send_activity_in_community(
AnnounceActivity::send(activity.clone().try_into()?, community, context).await?;
} else {
// send to the community, which will then forward to followers
inboxes.push(community.shared_inbox_or_inbox());
inboxes.add_inbox(community.shared_inbox_or_inbox());
}

send_lemmy_activity(context, activity.clone(), actor, inboxes, false).await?;
Expand Down
5 changes: 3 additions & 2 deletions crates/apub/src/activities/community/report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use activitypub_federation::{
use lemmy_api_common::{context::LemmyContext, utils::sanitize_html};
use lemmy_db_schema::{
source::{
activity::ActivitySendTargets,
comment_report::{CommentReport, CommentReportForm},
community::Community,
person::Person,
Expand Down Expand Up @@ -49,8 +50,8 @@ impl Report {
id: id.clone(),
audience: Some(community.id().into()),
};

let inbox = vec![community.shared_inbox_or_inbox()];
// todo: this should probably filter and only send if the community is remote?
let inbox = ActivitySendTargets::to_inbox(community.shared_inbox_or_inbox());
phiresky marked this conversation as resolved.
Show resolved Hide resolved
send_lemmy_activity(&context, report, &actor, inbox, false).await
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe these should all use send_activity_in_community() (same for follow.rs and undo_follow.rs). Not sure why its not the case now.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering that too and it would need a bit of modifications because send_activity_in_community has some other things that trigger that don't trigger here (send to person followers, announce locally)

}
}
Expand Down
12 changes: 10 additions & 2 deletions crates/apub/src/activities/community/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use activitypub_federation::{
};
use lemmy_api_common::context::LemmyContext;
use lemmy_db_schema::{
source::{community::Community, person::Person},
source::{activity::ActivitySendTargets, community::Community, person::Person},
traits::Crud,
};
use lemmy_utils::error::LemmyError;
Expand Down Expand Up @@ -46,7 +46,15 @@ pub(crate) async fn send_update_community(
};

let activity = AnnouncableActivities::UpdateCommunity(update);
send_activity_in_community(activity, &actor, &community, vec![], true, &context).await
send_activity_in_community(
activity,
&actor,
&community,
ActivitySendTargets::empty(),
true,
&context,
)
.await
}

#[async_trait::async_trait]
Expand Down
5 changes: 3 additions & 2 deletions crates/apub/src/activities/create_or_update/comment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use lemmy_db_schema::{
aggregates::structs::CommentAggregates,
newtypes::PersonId,
source::{
activity::ActivitySendTargets,
comment::{Comment, CommentLike, CommentLikeForm},
community::Community,
person::Person,
Expand Down Expand Up @@ -88,10 +89,10 @@ impl CreateOrUpdateNote {
.map(|t| t.href.clone())
.map(ObjectId::from)
.collect();
let mut inboxes = vec![];
let mut inboxes = ActivitySendTargets::empty();
for t in tagged_users {
let person = t.dereference(&context).await?;
inboxes.push(person.shared_inbox_or_inbox());
inboxes.add_inbox(person.shared_inbox_or_inbox());
}

let activity = AnnouncableActivities::CreateOrUpdateComment(create_or_update);
Expand Down
3 changes: 2 additions & 1 deletion crates/apub/src/activities/create_or_update/post.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use lemmy_db_schema::{
aggregates::structs::PostAggregates,
newtypes::PersonId,
source::{
activity::ActivitySendTargets,
community::Community,
person::Person,
post::{Post, PostLike, PostLikeForm},
Expand Down Expand Up @@ -80,7 +81,7 @@ impl CreateOrUpdatePage {
activity,
&person,
&community,
vec![],
ActivitySendTargets::empty(),
is_mod_action,
&context,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use activitypub_federation::{
traits::{ActivityHandler, Actor, Object},
};
use lemmy_api_common::context::LemmyContext;
use lemmy_db_schema::source::activity::ActivitySendTargets;
use lemmy_db_views::structs::PrivateMessageView;
use lemmy_utils::error::LemmyError;
use url::Url;
Expand All @@ -38,7 +39,7 @@ pub(crate) async fn send_create_or_update_pm(
.await?,
kind,
};
let inbox = vec![recipient.shared_inbox_or_inbox()];
let inbox = ActivitySendTargets::to_inbox(recipient.shared_inbox_or_inbox());
send_lemmy_activity(&context, create_or_update, &actor, inbox, true).await
}

Expand Down
7 changes: 4 additions & 3 deletions crates/apub/src/activities/deletion/delete_user.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
activities::{generate_activity_id, send_lemmy_activity, verify_is_public, verify_person},
insert_received_activity,
objects::{instance::remote_instance_inboxes, person::ApubPerson},
objects::person::ApubPerson,
protocol::activities::deletion::delete_user::DeleteUser,
};
use activitypub_federation::{
Expand All @@ -11,7 +11,7 @@ use activitypub_federation::{
traits::{ActivityHandler, Actor},
};
use lemmy_api_common::{context::LemmyContext, utils::delete_user_account};
use lemmy_db_schema::source::person::Person;
use lemmy_db_schema::source::{activity::ActivitySendTargets, person::Person};
use lemmy_utils::error::LemmyError;
use url::Url;

Expand All @@ -38,7 +38,8 @@ pub async fn delete_user(person: Person, context: Data<LemmyContext>) -> Result<
cc: vec![],
};

let inboxes = remote_instance_inboxes(&mut context.pool()).await?;
let inboxes = ActivitySendTargets::to_all_instances();

send_lemmy_activity(&context, delete, &actor, inboxes, true).await?;
Ok(())
}
Expand Down
Loading