federation: parallel sending per instance (#4623)

* federation: parallel sending

* federation: some comments

* lint and set force_write true when a request fails

* inbox_urls return vec

* split inbox functions into separate file

* cleanup

* extract sending task code to separate file

* move federation concurrent config to config file

* off by one issue

* improve msg

* fix both permanent stopping of federation queues and multiple creation of the same federation queues

* fix after merge

* lint fix

* Update crates/federate/src/send.rs

Co-authored-by: dullbananas <dull.bananas0@gmail.com>

* comment about reverse ordering

* remove crashable, comment

* comment

* move comment

* run federation tests twice

* fix test run

* prettier

* fix config default

* upgrade rust to 1.78 to fix diesel cli

* fix clippy

* delay

* add debug to make localhost urls not valid in ap crate, add some debug logs

* federation tests: ensure server stop after test and random activity id

* ci fix

* add test to federate 100 events

* fix send 100 test

* different data every time so activities are distinguishable

* allow out of order receives in test

* lint

* comment about https://github.com/LemmyNet/lemmy/pull/4623#discussion_r1565437391

* move sender for clarity, add comment

* move more things to members

* update test todo comment, use same env var as worker test but default to 1

* remove else below continue

* some more cleanup

* handle todo about smooth exit

* add federate inboxes collector tests

* lint

* actor max length

* don't reset fail count if activity skipped

* fix some comments

* reuse vars

* format

* Update .woodpecker.yml

* fix recheck time

* fix inboxes tests under fast mode

* format

* make i32 and ugly casts

* clippy

---------

Co-authored-by: dullbananas <dull.bananas0@gmail.com>
This commit is contained in:
phiresky 2024-07-21 17:50:50 +02:00 committed by GitHub
parent 073ff44676
commit a08642f813
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 1686 additions and 330 deletions

View File

@ -181,6 +181,7 @@ steps:
LEMMY_DATABASE_URL: postgres://lemmy:password@database:5432/lemmy
RUST_BACKTRACE: "1"
CARGO_HOME: .cargo_home
LEMMY_TEST_FAST_FEDERATION: "1"
commands:
- export LEMMY_CONFIG_LOCATION=../../config/config.hjson
- cargo test --workspace --no-fail-fast

120
Cargo.lock generated
View File

@ -1674,6 +1674,12 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "downcast"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1"
[[package]]
name = "downcast-rs"
version = "1.2.1"
@ -1915,6 +1921,12 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "fragile"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"
[[package]]
name = "fs2"
version = "0.4.3"
@ -2980,7 +2992,9 @@ name = "lemmy_federate"
version = "0.19.5"
dependencies = [
"activitypub_federation",
"actix-web",
"anyhow",
"async-trait",
"chrono",
"diesel",
"diesel-async",
@ -2990,14 +3004,19 @@ dependencies = [
"lemmy_db_schema",
"lemmy_db_views_actor",
"lemmy_utils",
"mockall",
"moka",
"once_cell",
"reqwest 0.11.27",
"serde_json",
"serial_test",
"test-context",
"tokio",
"tokio-util",
"tracing",
"tracing-test",
"url",
"uuid",
]
[[package]]
@ -3475,6 +3494,33 @@ version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c9be0862c1b3f26a88803c4a49de6889c10e608b3ee9344e6ef5b45fb37ad3d1"
[[package]]
name = "mockall"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43766c2b5203b10de348ffe19f7e54564b64f3d6018ff7648d1e2d6d3a0f0a48"
dependencies = [
"cfg-if",
"downcast",
"fragile",
"lazy_static",
"mockall_derive",
"predicates",
"predicates-tree",
]
[[package]]
name = "mockall_derive"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af7cbce79ec385a1d4f54baa90a76401eb15d9cab93685f62e7e9f942aa00ae2"
dependencies = [
"cfg-if",
"proc-macro2",
"quote",
"syn 2.0.66",
]
[[package]]
name = "moka"
version = "0.12.8"
@ -4273,6 +4319,32 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "925383efa346730478fb4838dbe9137d2a47675ad789c546d150a6e1dd4ab31c"
[[package]]
name = "predicates"
version = "3.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68b87bfd4605926cdfefc1c3b5f8fe560e3feca9d5552cf68c466d3d8236c7e8"
dependencies = [
"anstyle",
"predicates-core",
]
[[package]]
name = "predicates-core"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b794032607612e7abeb4db69adb4e33590fa6cf1149e95fd7cb00e634b92f174"
[[package]]
name = "predicates-tree"
version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "368ba315fb8c5052ab692e68a0eefec6ec57b23a36959c14496f0b0df2c0cecf"
dependencies = [
"predicates-core",
"termtree",
]
[[package]]
name = "pretty_assertions"
version = "1.4.0"
@ -5676,6 +5748,33 @@ dependencies = [
"winapi-util",
]
[[package]]
name = "termtree"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"
[[package]]
name = "test-context"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6676ab8513edfd2601a108621103fdb45cac9098305ca25ec93f7023b06b05d9"
dependencies = [
"futures",
"test-context-macros",
]
[[package]]
name = "test-context-macros"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78ea17a2dc368aeca6f554343ced1b1e31f76d63683fa8016e5844bd7a5144a1"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.66",
]
[[package]]
name = "thiserror"
version = "1.0.62"
@ -6299,6 +6398,27 @@ dependencies = [
"tracing-serde",
]
[[package]]
name = "tracing-test"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "557b891436fe0d5e0e363427fc7f217abf9ccd510d5136549847bdcbcd011d68"
dependencies = [
"tracing-core",
"tracing-subscriber",
"tracing-test-macro",
]
[[package]]
name = "tracing-test-macro"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568"
dependencies = [
"quote",
"syn 2.0.66",
]
[[package]]
name = "triomphe"
version = "0.1.11"

View File

@ -11,6 +11,7 @@ import {
betaUrl,
registerUser,
unfollows,
delay,
} from "./shared";
beforeAll(setupLogins);
@ -21,39 +22,48 @@ test("Follow local community", async () => {
let user = await registerUser(beta, betaUrl);
let community = (await resolveBetaCommunity(user)).community!;
expect(community.counts.subscribers).toBe(1);
expect(community.counts.subscribers_local).toBe(1);
let follow = await followCommunity(user, true, community.community.id);
// Make sure the follow response went through
expect(follow.community_view.community.local).toBe(true);
expect(follow.community_view.subscribed).toBe("Subscribed");
expect(follow.community_view.counts.subscribers).toBe(2);
expect(follow.community_view.counts.subscribers_local).toBe(2);
expect(follow.community_view.counts.subscribers).toBe(
community.counts.subscribers + 1,
);
expect(follow.community_view.counts.subscribers_local).toBe(
community.counts.subscribers_local + 1,
);
// Test an unfollow
let unfollow = await followCommunity(user, false, community.community.id);
expect(unfollow.community_view.subscribed).toBe("NotSubscribed");
expect(unfollow.community_view.counts.subscribers).toBe(1);
expect(unfollow.community_view.counts.subscribers_local).toBe(1);
expect(unfollow.community_view.counts.subscribers).toBe(
community.counts.subscribers,
);
expect(unfollow.community_view.counts.subscribers_local).toBe(
community.counts.subscribers_local,
);
});
test("Follow federated community", async () => {
// It takes about 1 second for the community aggregates to federate
let betaCommunity = (
await delay(2000); // if this is the second test run, we don't have a way to wait for the correct number of subscribers
const betaCommunityInitial = (
await waitUntil(
() => resolveBetaCommunity(alpha),
c =>
c.community?.counts.subscribers === 1 &&
c.community.counts.subscribers_local === 0,
c => !!c.community && c.community?.counts.subscribers >= 1,
)
).community;
if (!betaCommunity) {
if (!betaCommunityInitial) {
throw "Missing beta community";
}
let follow = await followCommunity(alpha, true, betaCommunity.community.id);
let follow = await followCommunity(
alpha,
true,
betaCommunityInitial.community.id,
);
expect(follow.community_view.subscribed).toBe("Pending");
betaCommunity = (
const betaCommunity = (
await waitUntil(
() => resolveBetaCommunity(alpha),
c => c.community?.subscribed === "Subscribed",
@ -64,20 +74,24 @@ test("Follow federated community", async () => {
expect(betaCommunity?.community.local).toBe(false);
expect(betaCommunity?.community.name).toBe("main");
expect(betaCommunity?.subscribed).toBe("Subscribed");
expect(betaCommunity?.counts.subscribers_local).toBe(1);
expect(betaCommunity?.counts.subscribers_local).toBe(
betaCommunityInitial.counts.subscribers_local + 1,
);
// check that unfollow was federated
let communityOnBeta1 = await resolveBetaCommunity(beta);
expect(communityOnBeta1.community?.counts.subscribers).toBe(2);
expect(communityOnBeta1.community?.counts.subscribers_local).toBe(1);
expect(communityOnBeta1.community?.counts.subscribers).toBe(
betaCommunityInitial.counts.subscribers + 1,
);
// Check it from local
let site = await getSite(alpha);
let remoteCommunityId = site.my_user?.follows.find(
c => c.community.local == false,
c =>
c.community.local == false &&
c.community.id === betaCommunityInitial.community.id,
)?.community.id;
expect(remoteCommunityId).toBeDefined();
expect(site.my_user?.follows.length).toBe(2);
if (!remoteCommunityId) {
throw "Missing remote community id";
@ -89,10 +103,21 @@ test("Follow federated community", async () => {
// Make sure you are unsubbed locally
let siteUnfollowCheck = await getSite(alpha);
expect(siteUnfollowCheck.my_user?.follows.length).toBe(1);
expect(
siteUnfollowCheck.my_user?.follows.find(
c => c.community.id === betaCommunityInitial.community.id,
),
).toBe(undefined);
// check that unfollow was federated
let communityOnBeta2 = await resolveBetaCommunity(beta);
expect(communityOnBeta2.community?.counts.subscribers).toBe(1);
let communityOnBeta2 = await waitUntil(
() => resolveBetaCommunity(beta),
c =>
c.community?.counts.subscribers ===
betaCommunityInitial.counts.subscribers,
);
expect(communityOnBeta2.community?.counts.subscribers).toBe(
betaCommunityInitial.counts.subscribers,
);
expect(communityOnBeta2.community?.counts.subscribers_local).toBe(1);
});

View File

@ -52,17 +52,23 @@ beforeAll(async () => {
afterAll(unfollows);
async function assertPostFederation(postOne: PostView, postTwo: PostView) {
async function assertPostFederation(
postOne: PostView,
postTwo: PostView,
waitForMeta = true,
) {
// Link metadata is generated in background task and may not be ready yet at this time,
// so wait for it explicitly. For removed posts we cant refetch anything.
if (waitForMeta) {
postOne = await waitForPost(beta, postOne.post, res => {
return res === null || res?.post.embed_title !== null;
return res === null || !!res?.post.embed_title;
});
postTwo = await waitForPost(
beta,
postTwo.post,
res => res === null || res?.post.embed_title !== null,
res => res === null || !!res?.post.embed_title,
);
}
expect(postOne?.post.ap_id).toBe(postTwo?.post.ap_id);
expect(postOne?.post.name).toBe(postTwo?.post.name);
@ -408,7 +414,11 @@ test("Remove a post from admin and community on same instance", async () => {
p => p?.post_view.post.removed ?? false,
);
expect(alphaPost?.post_view.post.removed).toBe(true);
await assertPostFederation(alphaPost.post_view, removePostRes.post_view);
await assertPostFederation(
alphaPost.post_view,
removePostRes.post_view,
false,
);
// Undelete
let undeletedPost = await removePost(beta, false, betaPost.post);

View File

@ -131,7 +131,11 @@ test("Requests with invalid auth should be treated as unauthenticated", async ()
});
test("Create user with Arabic name", async () => {
let user = await registerUser(alpha, alphaUrl, "تجريب");
let user = await registerUser(
alpha,
alphaUrl,
"تجريب" + Math.random().toString().slice(2, 10), // less than actor_name_max_length
);
let site = await getSite(user);
expect(site.my_user).toBeDefined();

View File

@ -108,10 +108,12 @@
port: 8536
# Whether the site is available over TLS. Needs to be true for federation to work.
tls_enabled: true
# The number of activitypub federation workers that can be in-flight concurrently
worker_count: 0
# The number of activitypub federation retry workers that can be in-flight concurrently
retry_count: 0
federation: {
# Limit to the number of concurrent outgoing federation requests per target instance.
# Set this to a higher value than 1 (e.g. 6) only if you have a huge instance (>10 activities
# per second) and if a receiving instance is not keeping up.
concurrent_sends_per_instance: 1
}
prometheus: {
bind: "127.0.0.1"
port: 10002

View File

@ -55,7 +55,7 @@ impl LemmyContext {
/// Initialize a context for use in tests which blocks federation network calls.
///
/// Do not use this in production code.
pub async fn init_test_context() -> Data<LemmyContext> {
pub async fn init_test_federation_config() -> FederationConfig<LemmyContext> {
// call this to run migrations
let pool = build_db_pool_for_tests().await;
@ -70,14 +70,19 @@ impl LemmyContext {
let rate_limit_cell = RateLimitCell::with_test_config();
let context = LemmyContext::create(pool, client, secret, rate_limit_cell.clone());
let config = FederationConfig::builder()
FederationConfig::builder()
.domain(context.settings().hostname.clone())
.app_data(context)
.debug(true)
// Dont allow any network fetches
.http_fetch_limit(0)
.build()
.await
.expect("build federation config");
.expect("build federation config")
}
pub async fn init_test_context() -> Data<LemmyContext> {
let config = Self::init_test_federation_config().await;
config.to_request_data()
}
}

View File

@ -107,7 +107,7 @@ pub struct PrivateMessageReportId(i32);
#[cfg_attr(feature = "full", derive(DieselNewType, TS))]
#[cfg_attr(feature = "full", ts(export))]
/// The site id.
pub struct SiteId(i32);
pub struct SiteId(pub i32);
#[derive(Debug, Copy, Clone, Hash, Eq, PartialEq, Serialize, Deserialize, Default)]
#[cfg_attr(feature = "full", derive(DieselNewType, TS))]

View File

@ -34,6 +34,13 @@ tokio = { workspace = true, features = ["full"] }
tracing.workspace = true
moka.workspace = true
tokio-util = "0.7.11"
async-trait.workspace = true
[dev-dependencies]
serial_test = { workspace = true }
url.workspace = true
actix-web.workspace = true
tracing-test = "0.2.5"
uuid.workspace = true
test-context = "0.3.0"
mockall = "0.12.1"

View File

@ -0,0 +1,572 @@
use crate::util::LEMMY_TEST_FAST_FEDERATION;
use anyhow::Result;
use async_trait::async_trait;
use chrono::{DateTime, TimeZone, Utc};
use lemmy_db_schema::{
newtypes::{CommunityId, DbUrl, InstanceId},
source::{activity::SentActivity, site::Site},
utils::{ActualDbPool, DbPool},
};
use lemmy_db_views_actor::structs::CommunityFollowerView;
use once_cell::sync::Lazy;
use reqwest::Url;
use std::collections::{HashMap, HashSet};
/// interval with which new additions to community_followers are queried.
///
/// The first time some user on an instance follows a specific remote community (or, more precisely:
/// the first time a (followed_community_id, follower_inbox_url) tuple appears), this delay limits
/// the maximum time until the follow actually results in activities from that community id being
/// sent to that inbox url. This delay currently needs to not be too small because the DB load is
/// currently fairly high because of the current structure of storing inboxes for every person, not
/// having a separate list of shared_inboxes, and the architecture of having every instance queue be
/// fully separate. (see https://github.com/LemmyNet/lemmy/issues/3958)
static FOLLOW_ADDITIONS_RECHECK_DELAY: Lazy<chrono::TimeDelta> = Lazy::new(|| {
if *LEMMY_TEST_FAST_FEDERATION {
chrono::TimeDelta::try_seconds(1).expect("TimeDelta out of bounds")
} else {
chrono::TimeDelta::try_minutes(2).expect("TimeDelta out of bounds")
}
});
/// The same as FOLLOW_ADDITIONS_RECHECK_DELAY, but triggering when the last person on an instance
/// unfollows a specific remote community. This is expected to happen pretty rarely and updating it
/// in a timely manner is not too important.
static FOLLOW_REMOVALS_RECHECK_DELAY: Lazy<chrono::TimeDelta> =
Lazy::new(|| chrono::TimeDelta::try_hours(1).expect("TimeDelta out of bounds"));
#[async_trait]
pub trait DataSource: Send + Sync {
async fn read_site_from_instance_id(
&self,
instance_id: InstanceId,
) -> Result<Option<Site>, diesel::result::Error>;
async fn get_instance_followed_community_inboxes(
&self,
instance_id: InstanceId,
last_fetch: DateTime<Utc>,
) -> Result<Vec<(CommunityId, DbUrl)>, diesel::result::Error>;
}
pub struct DbDataSource {
pool: ActualDbPool,
}
impl DbDataSource {
pub fn new(pool: ActualDbPool) -> Self {
Self { pool }
}
}
#[async_trait]
impl DataSource for DbDataSource {
async fn read_site_from_instance_id(
&self,
instance_id: InstanceId,
) -> Result<Option<Site>, diesel::result::Error> {
Site::read_from_instance_id(&mut DbPool::Pool(&self.pool), instance_id).await
}
async fn get_instance_followed_community_inboxes(
&self,
instance_id: InstanceId,
last_fetch: DateTime<Utc>,
) -> Result<Vec<(CommunityId, DbUrl)>, diesel::result::Error> {
CommunityFollowerView::get_instance_followed_community_inboxes(
&mut DbPool::Pool(&self.pool),
instance_id,
last_fetch,
)
.await
}
}
pub(crate) struct CommunityInboxCollector<T: DataSource> {
// load site lazily because if an instance is first seen due to being on allowlist,
// the corresponding row in `site` may not exist yet since that is only added once
// `fetch_instance_actor_for_object` is called.
// (this should be unlikely to be relevant outside of the federation tests)
site_loaded: bool,
site: Option<Site>,
followed_communities: HashMap<CommunityId, HashSet<Url>>,
last_full_communities_fetch: DateTime<Utc>,
last_incremental_communities_fetch: DateTime<Utc>,
instance_id: InstanceId,
domain: String,
pub(crate) data_source: T,
}
pub type RealCommunityInboxCollector = CommunityInboxCollector<DbDataSource>;
impl<T: DataSource> CommunityInboxCollector<T> {
pub fn new_real(
pool: ActualDbPool,
instance_id: InstanceId,
domain: String,
) -> RealCommunityInboxCollector {
CommunityInboxCollector::new(DbDataSource::new(pool), instance_id, domain)
}
pub fn new(
data_source: T,
instance_id: InstanceId,
domain: String,
) -> CommunityInboxCollector<T> {
CommunityInboxCollector {
data_source,
site_loaded: false,
site: None,
followed_communities: HashMap::new(),
last_full_communities_fetch: Utc.timestamp_nanos(0),
last_incremental_communities_fetch: Utc.timestamp_nanos(0),
instance_id,
domain,
}
}
/// get inbox urls of sending the given activity to the given instance
/// most often this will return 0 values (if instance doesn't care about the activity)
/// or 1 value (the shared inbox)
/// > 1 values only happens for non-lemmy software
pub async fn get_inbox_urls(&mut self, activity: &SentActivity) -> Result<Vec<Url>> {
let mut inbox_urls: HashSet<Url> = HashSet::new();
if activity.send_all_instances {
if !self.site_loaded {
self.site = self
.data_source
.read_site_from_instance_id(self.instance_id)
.await?;
self.site_loaded = true;
}
if let Some(site) = &self.site {
// Nutomic: Most non-lemmy software wont have a site row. That means it cant handle these
// activities. So handling it like this is fine.
inbox_urls.insert(site.inbox_url.inner().clone());
}
}
if let Some(t) = &activity.send_community_followers_of {
if let Some(urls) = self.followed_communities.get(t) {
inbox_urls.extend(urls.iter().cloned());
}
}
inbox_urls.extend(
activity
.send_inboxes
.iter()
.filter_map(std::option::Option::as_ref)
// a similar filter also happens within the activitypub-federation crate. but that filter
// happens much later - by doing it here, we can ensure that in the happy case, this
// function returns 0 urls which means the system doesn't have to create a tokio
// task for sending at all (since that task has a fair amount of overhead)
.filter(|&u| (u.domain() == Some(&self.domain)))
.map(|u| u.inner().clone()),
);
tracing::trace!(
"get_inbox_urls: {:?}, send_inboxes: {:?}",
inbox_urls,
activity.send_inboxes
);
Ok(inbox_urls.into_iter().collect())
}
pub async fn update_communities(&mut self) -> Result<()> {
if (Utc::now() - self.last_full_communities_fetch) > *FOLLOW_REMOVALS_RECHECK_DELAY {
tracing::debug!("{}: fetching full list of communities", self.domain);
// process removals every hour
(self.followed_communities, self.last_full_communities_fetch) = self
.get_communities(self.instance_id, Utc.timestamp_nanos(0))
.await?;
self.last_incremental_communities_fetch = self.last_full_communities_fetch;
}
if (Utc::now() - self.last_incremental_communities_fetch) > *FOLLOW_ADDITIONS_RECHECK_DELAY {
// process additions every minute
let (news, time) = self
.get_communities(self.instance_id, self.last_incremental_communities_fetch)
.await?;
if !news.is_empty() {
tracing::debug!(
"{}: fetched {} incremental new followed communities",
self.domain,
news.len()
);
}
self.followed_communities.extend(news);
self.last_incremental_communities_fetch = time;
}
Ok(())
}
/// get a list of local communities with the remote inboxes on the given instance that cares about
/// them
async fn get_communities(
&mut self,
instance_id: InstanceId,
last_fetch: DateTime<Utc>,
) -> Result<(HashMap<CommunityId, HashSet<Url>>, DateTime<Utc>)> {
// update to time before fetch to ensure overlap. subtract some time to ensure overlap even if
// published date is not exact
let new_last_fetch = Utc::now() - *FOLLOW_ADDITIONS_RECHECK_DELAY / 2;
let inboxes = self
.data_source
.get_instance_followed_community_inboxes(instance_id, last_fetch)
.await?;
let map: HashMap<CommunityId, HashSet<Url>> =
inboxes.into_iter().fold(HashMap::new(), |mut map, (c, u)| {
map.entry(c).or_default().insert(u.into());
map
});
Ok((map, new_last_fetch))
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
#[allow(clippy::indexing_slicing)]
mod tests {
use super::*;
use lemmy_db_schema::{
newtypes::{ActivityId, CommunityId, InstanceId, SiteId},
source::activity::{ActorType, SentActivity},
};
use mockall::{mock, predicate::*};
use serde_json::json;
mock! {
DataSource {}
#[async_trait]
impl DataSource for DataSource {
async fn read_site_from_instance_id(&self, instance_id: InstanceId) -> Result<Option<Site>, diesel::result::Error>;
async fn get_instance_followed_community_inboxes(
&self,
instance_id: InstanceId,
last_fetch: DateTime<Utc>,
) -> Result<Vec<(CommunityId, DbUrl)>, diesel::result::Error>;
}
}
fn setup_collector() -> CommunityInboxCollector<MockDataSource> {
let mock_data_source = MockDataSource::new();
let instance_id = InstanceId(1);
let domain = "example.com".to_string();
CommunityInboxCollector::new(mock_data_source, instance_id, domain)
}
#[tokio::test]
async fn test_get_inbox_urls_empty() {
let mut collector = setup_collector();
let activity = SentActivity {
id: ActivityId(1),
ap_id: Url::parse("https://example.com/activities/1")
.unwrap()
.into(),
data: json!({}),
sensitive: false,
published: Utc::now(),
send_inboxes: vec![],
send_community_followers_of: None,
send_all_instances: false,
actor_type: ActorType::Person,
actor_apub_id: None,
};
let result = collector.get_inbox_urls(&activity).await.unwrap();
assert!(result.is_empty());
}
#[tokio::test]
async fn test_get_inbox_urls_send_all_instances() {
let mut collector = setup_collector();
let site_inbox = Url::parse("https://example.com/inbox").unwrap();
let site = Site {
id: SiteId(1),
name: "Test Site".to_string(),
sidebar: None,
published: Utc::now(),
updated: None,
icon: None,
banner: None,
description: None,
actor_id: Url::parse("https://example.com/site").unwrap().into(),
last_refreshed_at: Utc::now(),
inbox_url: site_inbox.clone().into(),
private_key: None,
public_key: "test_key".to_string(),
instance_id: InstanceId(1),
content_warning: None,
};
collector
.data_source
.expect_read_site_from_instance_id()
.return_once(move |_| Ok(Some(site)));
let activity = SentActivity {
id: ActivityId(1),
ap_id: Url::parse("https://example.com/activities/1")
.unwrap()
.into(),
data: json!({}),
sensitive: false,
published: Utc::now(),
send_inboxes: vec![],
send_community_followers_of: None,
send_all_instances: true,
actor_type: ActorType::Person,
actor_apub_id: None,
};
let result = collector.get_inbox_urls(&activity).await.unwrap();
assert_eq!(result.len(), 1);
assert_eq!(result[0], site_inbox);
}
#[tokio::test]
async fn test_get_inbox_urls_community_followers() {
let mut collector = setup_collector();
let community_id = CommunityId(1);
let url1 = "https://follower1.example.com/inbox";
let url2 = "https://follower2.example.com/inbox";
collector
.data_source
.expect_get_instance_followed_community_inboxes()
.return_once(move |_, _| {
Ok(vec![
(community_id, Url::parse(url1).unwrap().into()),
(community_id, Url::parse(url2).unwrap().into()),
])
});
collector.update_communities().await.unwrap();
let activity = SentActivity {
id: ActivityId(1),
ap_id: Url::parse("https://example.com/activities/1")
.unwrap()
.into(),
data: json!({}),
sensitive: false,
published: Utc::now(),
send_inboxes: vec![],
send_community_followers_of: Some(community_id),
send_all_instances: false,
actor_type: ActorType::Person,
actor_apub_id: None,
};
let result = collector.get_inbox_urls(&activity).await.unwrap();
assert_eq!(result.len(), 2);
assert!(result.contains(&Url::parse(url1).unwrap()));
assert!(result.contains(&Url::parse(url2).unwrap()));
}
#[tokio::test]
async fn test_get_inbox_urls_send_inboxes() {
let mut collector = setup_collector();
collector.domain = "example.com".to_string();
let inbox_user_1 = Url::parse("https://example.com/user1/inbox").unwrap();
let inbox_user_2 = Url::parse("https://example.com/user2/inbox").unwrap();
let other_domain_inbox = Url::parse("https://other-domain.com/user3/inbox").unwrap();
let activity = SentActivity {
id: ActivityId(1),
ap_id: Url::parse("https://example.com/activities/1")
.unwrap()
.into(),
data: json!({}),
sensitive: false,
published: Utc::now(),
send_inboxes: vec![
Some(inbox_user_1.clone().into()),
Some(inbox_user_2.clone().into()),
Some(other_domain_inbox.clone().into()),
],
send_community_followers_of: None,
send_all_instances: false,
actor_type: ActorType::Person,
actor_apub_id: None,
};
let result = collector.get_inbox_urls(&activity).await.unwrap();
assert_eq!(result.len(), 2);
assert!(result.contains(&inbox_user_1));
assert!(result.contains(&inbox_user_2));
assert!(!result.contains(&other_domain_inbox));
}
#[tokio::test]
async fn test_get_inbox_urls_combined() {
let mut collector = setup_collector();
collector.domain = "example.com".to_string();
let community_id = CommunityId(1);
let site_inbox = Url::parse("https://example.com/site_inbox").unwrap();
let site = Site {
id: SiteId(1),
name: "Test Site".to_string(),
sidebar: None,
published: Utc::now(),
updated: None,
icon: None,
banner: None,
description: None,
actor_id: Url::parse("https://example.com/site").unwrap().into(),
last_refreshed_at: Utc::now(),
inbox_url: site_inbox.clone().into(),
private_key: None,
public_key: "test_key".to_string(),
instance_id: InstanceId(1),
content_warning: None,
};
collector
.data_source
.expect_read_site_from_instance_id()
.return_once(move |_| Ok(Some(site)));
let subdomain_inbox = "https://follower.example.com/inbox";
collector
.data_source
.expect_get_instance_followed_community_inboxes()
.return_once(move |_, _| {
Ok(vec![(
community_id,
Url::parse(subdomain_inbox).unwrap().into(),
)])
});
collector.update_communities().await.unwrap();
let user1_inbox = Url::parse("https://example.com/user1/inbox").unwrap();
let user2_inbox = Url::parse("https://other-domain.com/user2/inbox").unwrap();
let activity = SentActivity {
id: ActivityId(1),
ap_id: Url::parse("https://example.com/activities/1")
.unwrap()
.into(),
data: json!({}),
sensitive: false,
published: Utc::now(),
send_inboxes: vec![
Some(user1_inbox.clone().into()),
Some(user2_inbox.clone().into()),
],
send_community_followers_of: Some(community_id),
send_all_instances: true,
actor_type: ActorType::Person,
actor_apub_id: None,
};
let result = collector.get_inbox_urls(&activity).await.unwrap();
assert_eq!(result.len(), 3);
assert!(result.contains(&site_inbox));
assert!(result.contains(&Url::parse(subdomain_inbox).unwrap()));
assert!(result.contains(&user1_inbox));
assert!(!result.contains(&user2_inbox));
}
#[tokio::test]
async fn test_update_communities() {
let mut collector = setup_collector();
let community_id1 = CommunityId(1);
let community_id2 = CommunityId(2);
let community_id3 = CommunityId(3);
let user1_inbox_str = "https://follower1.example.com/inbox";
let user1_inbox = Url::parse(user1_inbox_str).unwrap();
let user2_inbox_str = "https://follower2.example.com/inbox";
let user2_inbox = Url::parse(user2_inbox_str).unwrap();
let user3_inbox_str = "https://follower3.example.com/inbox";
let user3_inbox = Url::parse(user3_inbox_str).unwrap();
collector
.data_source
.expect_get_instance_followed_community_inboxes()
.times(2)
.returning(move |_, last_fetch| {
if last_fetch == Utc.timestamp_nanos(0) {
Ok(vec![
(community_id1, Url::parse(user1_inbox_str).unwrap().into()),
(community_id2, Url::parse(user2_inbox_str).unwrap().into()),
])
} else {
Ok(vec![(
community_id3,
Url::parse(user3_inbox_str).unwrap().into(),
)])
}
});
// First update
collector.update_communities().await.unwrap();
assert_eq!(collector.followed_communities.len(), 2);
assert!(collector.followed_communities[&community_id1].contains(&user1_inbox));
assert!(collector.followed_communities[&community_id2].contains(&user2_inbox));
// Simulate time passing
collector.last_full_communities_fetch = Utc::now() - chrono::TimeDelta::try_minutes(3).unwrap();
collector.last_incremental_communities_fetch =
Utc::now() - chrono::TimeDelta::try_minutes(3).unwrap();
// Second update (incremental)
collector.update_communities().await.unwrap();
assert_eq!(collector.followed_communities.len(), 3);
assert!(collector.followed_communities[&community_id1].contains(&user1_inbox));
assert!(collector.followed_communities[&community_id3].contains(&user3_inbox));
assert!(collector.followed_communities[&community_id2].contains(&user2_inbox));
}
#[tokio::test]
async fn test_get_inbox_urls_no_duplicates() {
let mut collector = setup_collector();
collector.domain = "example.com".to_string();
let community_id = CommunityId(1);
let site_inbox = Url::parse("https://example.com/site_inbox").unwrap();
let site_inbox_clone = site_inbox.clone();
let site = Site {
id: SiteId(1),
name: "Test Site".to_string(),
sidebar: None,
published: Utc::now(),
updated: None,
icon: None,
banner: None,
description: None,
actor_id: Url::parse("https://example.com/site").unwrap().into(),
last_refreshed_at: Utc::now(),
inbox_url: site_inbox.clone().into(),
private_key: None,
public_key: "test_key".to_string(),
instance_id: InstanceId(1),
content_warning: None,
};
collector
.data_source
.expect_read_site_from_instance_id()
.return_once(move |_| Ok(Some(site)));
collector
.data_source
.expect_get_instance_followed_community_inboxes()
.return_once(move |_, _| Ok(vec![(community_id, site_inbox_clone.into())]));
collector.update_communities().await.unwrap();
let activity = SentActivity {
id: ActivityId(1),
ap_id: Url::parse("https://example.com/activities/1")
.unwrap()
.into(),
data: json!({}),
sensitive: false,
published: Utc::now(),
send_inboxes: vec![Some(site_inbox.into())],
send_community_followers_of: Some(community_id),
send_all_instances: true,
actor_type: ActorType::Person,
actor_apub_id: None,
};
let result = collector.get_inbox_urls(&activity).await.unwrap();
assert_eq!(result.len(), 1);
assert!(result.contains(&Url::parse("https://example.com/site_inbox").unwrap()));
}
}

View File

@ -1,6 +1,9 @@
use crate::{util::CancellableTask, worker::InstanceWorker};
use activitypub_federation::config::FederationConfig;
use lemmy_api_common::context::LemmyContext;
use lemmy_api_common::{
context::LemmyContext,
lemmy_utils::settings::structs::FederationWorkerConfig,
};
use lemmy_db_schema::{newtypes::InstanceId, source::instance::Instance};
use lemmy_utils::error::LemmyResult;
use stats::receive_print_stats;
@ -14,6 +17,8 @@ use tokio_util::sync::CancellationToken;
use tracing::info;
use util::FederationQueueStateWithDomain;
mod inboxes;
mod send;
mod stats;
mod util;
mod worker;
@ -38,10 +43,15 @@ pub struct SendManager {
context: FederationConfig<LemmyContext>,
stats_sender: UnboundedSender<FederationQueueStateWithDomain>,
exit_print: JoinHandle<()>,
federation_worker_config: FederationWorkerConfig,
}
impl SendManager {
fn new(opts: Opts, context: FederationConfig<LemmyContext>) -> Self {
fn new(
opts: Opts,
context: FederationConfig<LemmyContext>,
federation_worker_config: FederationWorkerConfig,
) -> Self {
assert!(opts.process_count > 0);
assert!(opts.process_index > 0);
assert!(opts.process_index <= opts.process_count);
@ -56,14 +66,20 @@ impl SendManager {
stats_receiver,
)),
context,
federation_worker_config,
}
}
pub fn run(opts: Opts, context: FederationConfig<LemmyContext>) -> CancellableTask {
pub fn run(
opts: Opts,
context: FederationConfig<LemmyContext>,
config: FederationWorkerConfig,
) -> CancellableTask {
CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |cancel| {
let opts = opts.clone();
let config = config.clone();
let context = context.clone();
let mut manager = Self::new(opts, context);
let mut manager = Self::new(opts, context, config);
async move {
let result = manager.do_loop(cancel).await;
// the loop function will only return if there is (a) an internal error (e.g. db connection
@ -120,22 +136,21 @@ impl SendManager {
// create new worker
let context = self.context.clone();
let stats_sender = self.stats_sender.clone();
let federation_worker_config = self.federation_worker_config.clone();
self.workers.insert(
instance.id,
CancellableTask::spawn(WORKER_EXIT_TIMEOUT, move |stop| {
// if the instance worker ends unexpectedly due to internal/db errors, this lambda is rerun by cancellabletask.
// if the instance worker ends unexpectedly due to internal/db errors, this lambda is
// rerun by cancellabletask.
let instance = instance.clone();
let req_data = context.to_request_data();
let stats_sender = stats_sender.clone();
async move {
InstanceWorker::init_and_loop(
instance,
req_data,
context.clone(),
federation_worker_config.clone(),
stop,
stats_sender,
stats_sender.clone(),
)
.await
}
}),
);
} else if !should_federate {
@ -214,7 +229,14 @@ mod test {
.app_data(context.clone())
.build()
.await?;
let concurrent_sends_per_instance = std::env::var("LEMMY_TEST_FEDERATION_CONCURRENT_SENDS")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(1);
let federation_worker_config = FederationWorkerConfig {
concurrent_sends_per_instance,
};
let pool = &mut context.pool();
let instances = vec![
Instance::read_or_create(pool, "alpha.com".to_string()).await?,
@ -222,7 +244,7 @@ mod test {
Instance::read_or_create(pool, "gamma.com".to_string()).await?,
];
let send_manager = SendManager::new(opts, federation_config);
let send_manager = SendManager::new(opts, federation_config, federation_worker_config);
Ok(Self {
send_manager,
context,

148
crates/federate/src/send.rs Normal file
View File

@ -0,0 +1,148 @@
use crate::util::get_actor_cached;
use activitypub_federation::{
activity_sending::SendActivityTask,
config::Data,
protocol::context::WithContext,
};
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use lemmy_api_common::{context::LemmyContext, federate_retry_sleep_duration};
use lemmy_apub::{activity_lists::SharedInboxActivities, FEDERATION_CONTEXT};
use lemmy_db_schema::{newtypes::ActivityId, source::activity::SentActivity};
use reqwest::Url;
use std::ops::Deref;
use tokio::{sync::mpsc::UnboundedSender, time::sleep};
use tokio_util::sync::CancellationToken;
#[derive(Debug, Eq)]
pub(crate) struct SendSuccessInfo {
pub activity_id: ActivityId,
pub published: Option<DateTime<Utc>>,
// true if the activity was skipped because the target instance is not interested in this
// activity
pub was_skipped: bool,
}
impl PartialEq for SendSuccessInfo {
fn eq(&self, other: &Self) -> bool {
self.activity_id == other.activity_id
}
}
/// order backwards because the binary heap is a max heap, and we need the smallest element to be on
/// top
impl PartialOrd for SendSuccessInfo {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Ord for SendSuccessInfo {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
other.activity_id.cmp(&self.activity_id)
}
}
/// Represents the result of sending an activity.
///
/// This enum is used to communicate the outcome of a send operation from a send task
/// to the main instance worker. It's designed to maintain a clean separation between
/// the send task and the main thread, allowing the send.rs file to be self-contained
/// and easier to understand.
///
/// The use of a channel for communication (rather than shared atomic variables) was chosen
/// because:
/// 1. It keeps the send task cleanly separated with no direct interaction with the main thread.
/// 2. The failure event needs to be transferred to the main task for database updates anyway.
/// 3. The main fail_count should only be updated under certain conditions, which are best handled
/// in the main task.
/// 4. It maintains consistency in how data is communicated (all via channels rather than a mix of
/// channels and atomics).
/// 5. It simplifies concurrency management and makes the flow of data more predictable.
pub(crate) enum SendActivityResult {
Success(SendSuccessInfo),
Failure { fail_count: i32 },
}
/// Represents a task for retrying to send an activity.
///
/// This struct encapsulates all the necessary information and resources for attempting
/// to send an activity to multiple inbox URLs, with built-in retry logic.
pub(crate) struct SendRetryTask<'a> {
pub activity: &'a SentActivity,
pub object: &'a SharedInboxActivities,
/// Must not be empty at this point
pub inbox_urls: Vec<Url>,
/// Channel to report results back to the main instance worker
pub report: &'a mut UnboundedSender<SendActivityResult>,
/// The first request will be sent immediately, but subsequent requests will be delayed
/// according to the number of previous fails + 1
///
/// This is a read-only immutable variable that is passed only one way, from the main
/// thread to each send task. It allows the task to determine how long to sleep initially
/// if the request fails.
pub initial_fail_count: i32,
/// For logging purposes
pub domain: String,
pub context: Data<LemmyContext>,
pub stop: CancellationToken,
}
impl<'a> SendRetryTask<'a> {
// this function will return successfully when (a) send succeeded or (b) worker cancelled
// and will return an error if an internal error occurred (send errors cause an infinite loop)
pub async fn send_retry_loop(self) -> Result<()> {
let SendRetryTask {
activity,
object,
inbox_urls,
report,
initial_fail_count,
domain,
context,
stop,
} = self;
debug_assert!(!inbox_urls.is_empty());
let pool = &mut context.pool();
let Some(actor_apub_id) = &activity.actor_apub_id else {
return Err(anyhow::anyhow!("activity is from before lemmy 0.19"));
};
let actor = get_actor_cached(pool, activity.actor_type, actor_apub_id)
.await
.context("failed getting actor instance (was it marked deleted / removed?)")?;
let object = WithContext::new(object.clone(), FEDERATION_CONTEXT.deref().clone());
let requests = SendActivityTask::prepare(&object, actor.as_ref(), inbox_urls, &context).await?;
for task in requests {
// usually only one due to shared inbox
tracing::debug!("sending out {}", task);
let mut fail_count = initial_fail_count;
while let Err(e) = task.sign_and_send(&context).await {
fail_count += 1;
report.send(SendActivityResult::Failure {
fail_count,
// activity_id: activity.id,
})?;
let retry_delay = federate_retry_sleep_duration(fail_count);
tracing::info!(
"{}: retrying {:?} attempt {} with delay {retry_delay:.2?}. ({e})",
domain,
activity.id,
fail_count
);
tokio::select! {
() = sleep(retry_delay) => {},
() = stop.cancelled() => {
// cancel sending without reporting any result.
// the InstanceWorker needs to be careful to not hang on receive of that
// channel when cancelled (see handle_send_results)
return Ok(());
}
}
}
}
report.send(SendActivityResult::Success(SendSuccessInfo {
activity_id: activity.id,
published: Some(activity.published),
was_skipped: false,
}))?;
Ok(())
}
}

View File

@ -1,7 +1,6 @@
use anyhow::{anyhow, Context, Result};
use diesel::prelude::*;
use diesel_async::RunQueryDsl;
use lemmy_api_common::lemmy_utils::CACHE_DURATION_FEDERATION;
use lemmy_apub::{
activity_lists::SharedInboxActivities,
fetcher::{site_or_community_or_user::SiteOrCommunityOrUser, user_or_community::UserOrCommunity},
@ -28,19 +27,28 @@ use tokio_util::sync::CancellationToken;
/// Decrease the delays of the federation queue.
/// Should only be used for federation tests since it significantly increases CPU and DB load of the
/// federation queue.
/// federation queue. This is intentionally a separate flag from other flags like debug_assertions,
/// since this is a invasive change we only need rarely.
pub(crate) static LEMMY_TEST_FAST_FEDERATION: Lazy<bool> = Lazy::new(|| {
std::env::var("LEMMY_TEST_FAST_FEDERATION")
.map(|s| !s.is_empty())
.unwrap_or(false)
});
/// Recheck for new federation work every n seconds.
/// Recheck for new federation work every n seconds within each InstanceWorker.
///
/// When the queue is processed faster than new activities are added and it reaches the current time
/// with an empty batch, this is the delay the queue waits before it checks if new activities have
/// been added to the sent_activities table. This delay is only applied if no federated activity
/// happens during sending activities of the last batch.
/// happens during sending activities of the last batch, which means on high-activity instances it
/// may never be used. This means that it does not affect the maximum throughput of the queue.
///
///
/// This is thus the interval with which tokio wakes up each of the
/// InstanceWorkers to check for new work, if the queue previously was empty.
/// If the delay is too short, the workers (one per federated instance) will wake up too
/// often and consume a lot of CPU. If the delay is long, then activities on low-traffic instances
/// will on average take delay/2 seconds to federate.
pub(crate) static WORK_FINISHED_RECHECK_DELAY: Lazy<Duration> = Lazy::new(|| {
if *LEMMY_TEST_FAST_FEDERATION {
Duration::from_millis(100)
@ -49,6 +57,21 @@ pub(crate) static WORK_FINISHED_RECHECK_DELAY: Lazy<Duration> = Lazy::new(|| {
}
});
/// Cache the latest activity id for a certain duration.
///
/// This cache is common to all the instance workers and prevents there from being more than one
/// call per N seconds between each DB query to find max(activity_id).
pub(crate) static CACHE_DURATION_LATEST_ID: Lazy<Duration> = Lazy::new(|| {
if *LEMMY_TEST_FAST_FEDERATION {
// in test mode, we use the same cache duration as the recheck delay so when recheck happens
// data is fresh, accelerating the time the tests take.
*WORK_FINISHED_RECHECK_DELAY
} else {
// in normal mode, we limit the query to one per second
Duration::from_secs(1)
}
});
/// A task that will be run in an infinite loop, unless it is cancelled.
/// If the task exits without being cancelled, an error will be logged and the task will be
/// restarted.
@ -174,7 +197,7 @@ pub(crate) async fn get_activity_cached(
pub(crate) async fn get_latest_activity_id(pool: &mut DbPool<'_>) -> Result<ActivityId> {
static CACHE: Lazy<Cache<(), ActivityId>> = Lazy::new(|| {
Cache::builder()
.time_to_live(CACHE_DURATION_FEDERATION)
.time_to_live(*CACHE_DURATION_LATEST_ID)
.build()
});
CACHE

File diff suppressed because it is too large Load Diff

View File

@ -43,12 +43,8 @@ pub struct Settings {
#[default(None)]
#[doku(skip)]
pub opentelemetry_url: Option<Url>,
/// The number of activitypub federation workers that can be in-flight concurrently
#[default(0)]
pub worker_count: usize,
/// The number of activitypub federation retry workers that can be in-flight concurrently
#[default(0)]
pub retry_count: usize,
#[default(Default::default())]
pub federation: FederationWorkerConfig,
// Prometheus configuration.
#[default(None)]
#[doku(example = "Some(Default::default())")]
@ -237,3 +233,14 @@ pub struct PrometheusConfig {
#[doku(example = "10002")]
pub port: i32,
}
#[derive(Debug, Deserialize, Serialize, Clone, SmartDefault, Document)]
#[serde(default)]
// named federation"worker"config to disambiguate from the activitypub library configuration
pub struct FederationWorkerConfig {
/// Limit to the number of concurrent outgoing federation requests per target instance.
/// Set this to a higher value than 1 (e.g. 6) only if you have a huge instance (>10 activities
/// per second) and if a receiving instance is not keeping up.
#[default(1)]
pub concurrent_sends_per_instance: i64,
}

View File

@ -14,6 +14,7 @@ source scripts/start_dev_db.sh
# so to load the config we need to traverse to the repo root
export LEMMY_CONFIG_LOCATION=../../config/config.hjson
export RUST_BACKTRACE=1
export LEMMY_TEST_FAST_FEDERATION=1 # by default, the persistent federation queue has delays in the scale of 30s-5min
if [ -n "$PACKAGE" ];
then

View File

@ -233,6 +233,7 @@ pub async fn start_lemmy_server(args: CmdArgs) -> LemmyResult<()> {
process_count: args.federate_process_count,
},
cfg,
SETTINGS.federation.clone(),
)
});
let mut interrupt = tokio::signal::unix::signal(SignalKind::interrupt())?;