From 9eee61dd06220176fbb97ccbba4a594ea21bb5c6 Mon Sep 17 00:00:00 2001 From: Nutomic Date: Tue, 24 Sep 2024 11:39:40 +0200 Subject: [PATCH] Post scheduling (fixes #234) (#5025) * Post scheduling (fixes #234) * clippy * replace map_err with inspect_err * ignore unpublished posts in read queries * add api test * fmt * add some checks * address some review comments * allow updating schedule time * rewrite scheduled task * fmt * machete * compare date in sql, more filters * check for community ban in sql * remove api test (scheduled task only runs every 10 mins) * remove mut * add index * remove Post::read impl * fmt * fix * correctly handle changes to schedule time * normal users can only schedule up to 10 posts --- Cargo.lock | 1 + crates/api_common/src/post.rs | 4 + crates/api_crud/Cargo.toml | 1 + crates/api_crud/src/post/create.rs | 23 +++- crates/api_crud/src/post/mod.rs | 33 +++++ crates/api_crud/src/post/update.rs | 55 +++++++-- crates/db_schema/src/impls/post.rs | 31 ++++- crates/db_schema/src/schema.rs | 1 + crates/db_schema/src/source/post.rs | 5 + crates/db_views/src/comment_view.rs | 6 +- crates/db_views/src/post_view.rs | 12 +- crates/utils/src/error.rs | 2 + .../2024-09-16-095656_schedule-post/down.sql | 3 + .../2024-09-16-095656_schedule-post/up.sql | 5 + src/lib.rs | 14 ++- src/scheduled_tasks.rs | 114 +++++++++++++----- 16 files changed, 254 insertions(+), 56 deletions(-) create mode 100644 migrations/2024-09-16-095656_schedule-post/down.sql create mode 100644 migrations/2024-09-16-095656_schedule-post/up.sql diff --git a/Cargo.lock b/Cargo.lock index 135934193..a799d9d6f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2538,6 +2538,7 @@ dependencies = [ "actix-web", "anyhow", "bcrypt", + "chrono", "futures", "lemmy_api_common", "lemmy_db_schema", diff --git a/crates/api_common/src/post.rs b/crates/api_common/src/post.rs index 44436fa84..fa45459e2 100644 --- a/crates/api_common/src/post.rs +++ b/crates/api_common/src/post.rs @@ -30,6 +30,8 @@ pub struct CreatePost { pub language_id: Option, /// Instead of fetching a thumbnail, use a custom one. pub custom_thumbnail: Option, + /// Time when this post should be scheduled. Null means publish immediately. + pub scheduled_publish_time: Option, } #[derive(Debug, Serialize, Deserialize, Clone)] @@ -124,6 +126,8 @@ pub struct EditPost { pub language_id: Option, /// Instead of fetching a thumbnail, use a custom one. pub custom_thumbnail: Option, + /// Time when this post should be scheduled. Null means publish immediately. + pub scheduled_publish_time: Option, } #[derive(Debug, Serialize, Deserialize, Clone, Copy, Default, PartialEq, Eq, Hash)] diff --git a/crates/api_crud/Cargo.toml b/crates/api_crud/Cargo.toml index 259116a38..2793beac3 100644 --- a/crates/api_crud/Cargo.toml +++ b/crates/api_crud/Cargo.toml @@ -27,6 +27,7 @@ futures.workspace = true uuid = { workspace = true } moka.workspace = true anyhow.workspace = true +chrono.workspace = true webmention = "0.6.0" accept-language = "3.1.0" serde_json = { workspace = true } diff --git a/crates/api_crud/src/post/create.rs b/crates/api_crud/src/post/create.rs index cdeb10f44..a1357395b 100644 --- a/crates/api_crud/src/post/create.rs +++ b/crates/api_crud/src/post/create.rs @@ -1,3 +1,4 @@ +use super::convert_published_time; use activitypub_federation::config::Data; use actix_web::web::Json; use lemmy_api_common::{ @@ -125,12 +126,15 @@ pub async fn create_post( } }; + let scheduled_publish_time = + convert_published_time(data.scheduled_publish_time, &local_user_view, &context).await?; let post_form = PostInsertForm { url: url.map(Into::into), body, alt_text: data.alt_text.clone(), nsfw: data.nsfw, language_id, + scheduled_publish_time, ..PostInsertForm::new( data.name.trim().to_string(), local_user_view.person.id, @@ -142,10 +146,16 @@ pub async fn create_post( .await .with_lemmy_type(LemmyErrorType::CouldntCreatePost)?; + let federate_post = if scheduled_publish_time.is_none() { + send_webmention(inserted_post.clone(), community); + |post| Some(SendActivityData::CreatePost(post)) + } else { + |_| None + }; generate_post_link_metadata( inserted_post.clone(), custom_thumbnail.map(Into::into), - |post| Some(SendActivityData::CreatePost(post)), + federate_post, context.reset_request_count(), ) .await?; @@ -165,11 +175,14 @@ pub async fn create_post( mark_post_as_read(person_id, post_id, &mut context.pool()).await?; - if let Some(url) = inserted_post.url.clone() { + build_post_response(&context, community_id, local_user_view, post_id).await +} + +pub fn send_webmention(post: Post, community: Community) { + if let Some(url) = post.url.clone() { if community.visibility == CommunityVisibility::Public { spawn_try_task(async move { - let mut webmention = - Webmention::new::(inserted_post.ap_id.clone().into(), url.clone().into())?; + let mut webmention = Webmention::new::(post.ap_id.clone().into(), url.clone().into())?; webmention.set_checked(true); match webmention .send() @@ -183,6 +196,4 @@ pub async fn create_post( }); } }; - - build_post_response(&context, community_id, local_user_view, post_id).await } diff --git a/crates/api_crud/src/post/mod.rs b/crates/api_crud/src/post/mod.rs index 8bb842b70..95df9663c 100644 --- a/crates/api_crud/src/post/mod.rs +++ b/crates/api_crud/src/post/mod.rs @@ -1,5 +1,38 @@ +use chrono::{DateTime, TimeZone, Utc}; +use lemmy_api_common::context::LemmyContext; +use lemmy_db_schema::source::post::Post; +use lemmy_db_views::structs::LocalUserView; +use lemmy_utils::{error::LemmyResult, LemmyErrorType}; + pub mod create; pub mod delete; pub mod read; pub mod remove; pub mod update; + +async fn convert_published_time( + scheduled_publish_time: Option, + local_user_view: &LocalUserView, + context: &LemmyContext, +) -> LemmyResult>> { + const MAX_SCHEDULED_POSTS: i64 = 10; + if let Some(scheduled_publish_time) = scheduled_publish_time { + let converted = Utc + .timestamp_opt(scheduled_publish_time, 0) + .single() + .ok_or(LemmyErrorType::InvalidUnixTime)?; + if converted < Utc::now() { + Err(LemmyErrorType::PostScheduleTimeMustBeInFuture)?; + } + if !local_user_view.local_user.admin { + let count = + Post::user_scheduled_post_count(local_user_view.person.id, &mut context.pool()).await?; + if count >= MAX_SCHEDULED_POSTS { + Err(LemmyErrorType::TooManyScheduledPosts)?; + } + } + Ok(Some(converted)) + } else { + Ok(None) + } +} diff --git a/crates/api_crud/src/post/update.rs b/crates/api_crud/src/post/update.rs index 48b2ccd42..72f8309d1 100644 --- a/crates/api_crud/src/post/update.rs +++ b/crates/api_crud/src/post/update.rs @@ -1,3 +1,4 @@ +use super::{convert_published_time, create::send_webmention}; use activitypub_federation::config::Data; use actix_web::web::Json; use lemmy_api_common::{ @@ -16,6 +17,7 @@ use lemmy_api_common::{ use lemmy_db_schema::{ source::{ actor_language::CommunityLanguage, + community::Community, local_site::LocalSite, post::{Post, PostUpdateForm}, }, @@ -107,6 +109,21 @@ pub async fn update_post( ) .await?; + // handle changes to scheduled_publish_time + let scheduled_publish_time = match ( + orig_post.scheduled_publish_time, + data.scheduled_publish_time, + ) { + // schedule time can be changed if post is still scheduled (and not published yet) + (Some(_), Some(_)) => { + Some(convert_published_time(data.scheduled_publish_time, &local_user_view, &context).await?) + } + // post was scheduled, gets changed to publish immediately + (Some(_), None) => Some(None), + // unchanged + (_, _) => None, + }; + let post_form = PostUpdateForm { name: data.name.clone(), url, @@ -115,6 +132,7 @@ pub async fn update_post( nsfw: data.nsfw, language_id: data.language_id, updated: Some(Some(naive_now())), + scheduled_publish_time, ..Default::default() }; @@ -123,13 +141,36 @@ pub async fn update_post( .await .with_lemmy_type(LemmyErrorType::CouldntUpdatePost)?; - generate_post_link_metadata( - updated_post.clone(), - custom_thumbnail.flatten().map(Into::into), - |post| Some(SendActivityData::UpdatePost(post)), - context.reset_request_count(), - ) - .await?; + // send out federation/webmention if necessary + match ( + orig_post.scheduled_publish_time, + data.scheduled_publish_time, + ) { + // schedule was removed, send create activity and webmention + (Some(_), None) => { + let community = Community::read(&mut context.pool(), orig_post.community_id).await?; + send_webmention(updated_post.clone(), community); + generate_post_link_metadata( + updated_post.clone(), + custom_thumbnail.flatten().map(Into::into), + |post| Some(SendActivityData::CreatePost(post)), + context.reset_request_count(), + ) + .await?; + } + // post was already public, send update + (None, _) => { + generate_post_link_metadata( + updated_post.clone(), + custom_thumbnail.flatten().map(Into::into), + |post| Some(SendActivityData::UpdatePost(post)), + context.reset_request_count(), + ) + .await? + } + // schedule was changed, do nothing + (Some(_), Some(_)) => {} + }; build_post_response( context.deref(), diff --git a/crates/db_schema/src/impls/post.rs b/crates/db_schema/src/impls/post.rs index 43ccbc0a7..6a89dd577 100644 --- a/crates/db_schema/src/impls/post.rs +++ b/crates/db_schema/src/impls/post.rs @@ -1,7 +1,7 @@ use crate::{ - diesel::OptionalExtension, + diesel::{BoolExpressionMethods, OptionalExtension}, newtypes::{CommunityId, DbUrl, PersonId, PostId}, - schema::{post, post_hide, post_like, post_read, post_saved}, + schema::{community, person, post, post_hide, post_like, post_read, post_saved}, source::post::{ Post, PostHide, @@ -20,6 +20,7 @@ use crate::{ functions::coalesce, get_conn, naive_now, + now, DbPool, DELETED_REPLACEMENT_TEXT, FETCH_LIMIT_MAX, @@ -30,7 +31,7 @@ use crate::{ use ::url::Url; use chrono::{DateTime, Utc}; use diesel::{ - dsl::insert_into, + dsl::{count, insert_into, not}, result::Error, DecoratableTarget, ExpressionMethods, @@ -173,6 +174,7 @@ impl Post { let object_id: DbUrl = object_id.into(); post::table .filter(post::ap_id.eq(object_id)) + .filter(post::scheduled_publish_time.is_null()) .first(conn) .await .optional() @@ -246,6 +248,28 @@ impl Post { .get_results::(conn) .await } + + pub async fn user_scheduled_post_count( + person_id: PersonId, + pool: &mut DbPool<'_>, + ) -> Result { + let conn = &mut get_conn(pool).await?; + + post::table + .inner_join(person::table) + .inner_join(community::table) + // find all posts which have scheduled_publish_time that is in the past + .filter(post::scheduled_publish_time.is_not_null()) + .filter(coalesce(post::scheduled_publish_time, now()).lt(now())) + // make sure the post and community are still around + .filter(not(post::deleted.or(post::removed))) + .filter(not(community::removed.or(community::deleted))) + // only posts by specified user + .filter(post::creator_id.eq(person_id)) + .select(count(post::id)) + .first::(conn) + .await + } } #[async_trait] @@ -459,6 +483,7 @@ mod tests { featured_community: false, featured_local: false, url_content_type: None, + scheduled_publish_time: None, }; // Post Like diff --git a/crates/db_schema/src/schema.rs b/crates/db_schema/src/schema.rs index 289032e00..129b00d8b 100644 --- a/crates/db_schema/src/schema.rs +++ b/crates/db_schema/src/schema.rs @@ -770,6 +770,7 @@ diesel::table! { featured_local -> Bool, url_content_type -> Nullable, alt_text -> Nullable, + scheduled_publish_time -> Nullable } } diff --git a/crates/db_schema/src/source/post.rs b/crates/db_schema/src/source/post.rs index bf719f54f..3819bd773 100644 --- a/crates/db_schema/src/source/post.rs +++ b/crates/db_schema/src/source/post.rs @@ -57,6 +57,8 @@ pub struct Post { pub url_content_type: Option, /// An optional alt_text, usable for image posts. pub alt_text: Option, + /// Time at which the post will be published. None means publish immediately. + pub scheduled_publish_time: Option>, } #[derive(Debug, Clone, derive_new::new)] @@ -104,6 +106,8 @@ pub struct PostInsertForm { pub url_content_type: Option, #[new(default)] pub alt_text: Option, + #[new(default)] + pub scheduled_publish_time: Option>, } #[derive(Debug, Clone, Default)] @@ -130,6 +134,7 @@ pub struct PostUpdateForm { pub featured_local: Option, pub url_content_type: Option>, pub alt_text: Option>, + pub scheduled_publish_time: Option>>, } #[derive(PartialEq, Eq, Debug)] diff --git a/crates/db_views/src/comment_view.rs b/crates/db_views/src/comment_view.rs index 494536c87..7f3c853f1 100644 --- a/crates/db_views/src/comment_view.rs +++ b/crates/db_views/src/comment_view.rs @@ -619,10 +619,7 @@ mod tests { person: inserted_timmy_person.clone(), counts: Default::default(), }; - let site_form = SiteInsertForm::builder() - .name("test site".to_string()) - .instance_id(inserted_instance.id) - .build(); + let site_form = SiteInsertForm::new("test site".to_string(), inserted_instance.id); let site = Site::create(pool, &site_form).await?; Ok(Data { inserted_instance, @@ -1093,6 +1090,7 @@ mod tests { featured_community: false, featured_local: false, url_content_type: None, + scheduled_publish_time: None, }, community: Community { id: data.inserted_community.id, diff --git a/crates/db_views/src/post_view.rs b/crates/db_views/src/post_view.rs index 5dee59538..a8d908c7d 100644 --- a/crates/db_views/src/post_view.rs +++ b/crates/db_views/src/post_view.rs @@ -318,11 +318,18 @@ fn queries<'a>() -> Queries< // hide posts from deleted communities query = query.filter(community::deleted.eq(false)); - // only show deleted posts to creator + // only creator can see deleted posts and unpublished scheduled posts if let Some(person_id) = options.local_user.person_id() { query = query.filter(post::deleted.eq(false).or(post::creator_id.eq(person_id))); + query = query.filter( + post::scheduled_publish_time + .is_null() + .or(post::creator_id.eq(person_id)), + ); } else { - query = query.filter(post::deleted.eq(false)); + query = query + .filter(post::deleted.eq(false)) + .filter(post::scheduled_publish_time.is_null()); } // only show removed posts to admin when viewing user profile @@ -1771,6 +1778,7 @@ mod tests { featured_community: false, featured_local: false, url_content_type: None, + scheduled_publish_time: None, }, my_vote: None, unread_comments: 0, diff --git a/crates/utils/src/error.rs b/crates/utils/src/error.rs index 6d5c40b04..e03ff2e23 100644 --- a/crates/utils/src/error.rs +++ b/crates/utils/src/error.rs @@ -172,6 +172,8 @@ pub enum LemmyErrorType { Unknown(String), CantDeleteSite, UrlLengthOverflow, + PostScheduleTimeMustBeInFuture, + TooManyScheduledPosts, NotFound, } diff --git a/migrations/2024-09-16-095656_schedule-post/down.sql b/migrations/2024-09-16-095656_schedule-post/down.sql new file mode 100644 index 000000000..bd136ca33 --- /dev/null +++ b/migrations/2024-09-16-095656_schedule-post/down.sql @@ -0,0 +1,3 @@ +ALTER TABLE post + DROP COLUMN scheduled_publish_time; + diff --git a/migrations/2024-09-16-095656_schedule-post/up.sql b/migrations/2024-09-16-095656_schedule-post/up.sql new file mode 100644 index 000000000..7be0e9e22 --- /dev/null +++ b/migrations/2024-09-16-095656_schedule-post/up.sql @@ -0,0 +1,5 @@ +ALTER TABLE post + ADD COLUMN scheduled_publish_time timestamptz; + +CREATE INDEX idx_post_scheduled_publish_time ON post (scheduled_publish_time); + diff --git a/src/lib.rs b/src/lib.rs index 3662f6bc8..804ac7aa1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -157,11 +157,6 @@ pub async fn start_lemmy_server(args: CmdArgs) -> LemmyResult<()> { rate_limit_cell.clone(), ); - let scheduled_tasks = (!args.disable_scheduled_tasks).then(|| { - // Schedules various cleanup tasks for the DB - tokio::task::spawn(scheduled_tasks::setup(context.clone())) - }); - if let Some(prometheus) = SETTINGS.prometheus.clone() { serve_prometheus(prometheus, context.clone())?; } @@ -187,7 +182,14 @@ pub async fn start_lemmy_server(args: CmdArgs) -> LemmyResult<()> { })) .expect("set function pointer"); let request_data = federation_config.to_request_data(); - let outgoing_activities_task = tokio::task::spawn(handle_outgoing_activities(request_data)); + let outgoing_activities_task = tokio::task::spawn(handle_outgoing_activities( + request_data.reset_request_count(), + )); + + let scheduled_tasks = (!args.disable_scheduled_tasks).then(|| { + // Schedules various cleanup tasks for the DB + tokio::task::spawn(scheduled_tasks::setup(request_data.reset_request_count())) + }); let server = if !args.disable_http_server { if let Some(startup_server_handle) = startup_server_handle { diff --git a/src/scheduled_tasks.rs b/src/scheduled_tasks.rs index dc93aecb4..b7532d83c 100644 --- a/src/scheduled_tasks.rs +++ b/src/scheduled_tasks.rs @@ -1,20 +1,27 @@ +use activitypub_federation::config::Data; use chrono::{DateTime, TimeZone, Utc}; use clokwerk::{AsyncScheduler, TimeUnits as CTimeUnits}; use diesel::{ - dsl::IntervalDsl, + dsl::{exists, not, IntervalDsl}, sql_query, sql_types::{Integer, Timestamptz}, + BoolExpressionMethods, ExpressionMethods, NullableExpressionMethods, QueryDsl, QueryableByName, }; use diesel_async::{AsyncPgConnection, RunQueryDsl}; -use lemmy_api_common::context::LemmyContext; +use lemmy_api_common::{ + context::LemmyContext, + send_activity::{ActivityChannel, SendActivityData}, +}; +use lemmy_api_crud::post::create::send_webmention; use lemmy_db_schema::{ schema::{ captcha_answer, comment, + community, community_person_ban, instance, person, @@ -23,10 +30,13 @@ use lemmy_db_schema::{ sent_activity, }, source::{ + community::Community, instance::{Instance, InstanceForm}, local_user::LocalUser, + post::{Post, PostUpdateForm}, }, - utils::{get_conn, naive_now, now, DbPool, DELETED_REPLACEMENT_TEXT}, + traits::Crud, + utils::{functions::coalesce, get_conn, naive_now, now, DbPool, DELETED_REPLACEMENT_TEXT}, }; use lemmy_routes::nodeinfo::{NodeInfo, NodeInfoWellKnown}; use lemmy_utils::error::LemmyResult; @@ -35,13 +45,13 @@ use std::time::Duration; use tracing::{error, info, warn}; /// Schedules various cleanup tasks for lemmy in a background thread -pub async fn setup(context: LemmyContext) -> LemmyResult<()> { +pub async fn setup(context: Data) -> LemmyResult<()> { // Setup the connections let mut scheduler = AsyncScheduler::new(); startup_jobs(&mut context.pool()).await; let context_1 = context.clone(); - // Update active counts every hour + // Update active counts expired bans and unpublished posts every hour scheduler.every(CTimeUnits::hour(1)).run(move || { let context = context_1.clone(); @@ -51,23 +61,15 @@ pub async fn setup(context: LemmyContext) -> LemmyResult<()> { } }); - let context_1 = context.clone(); - // Update hot ranks every 15 minutes + let context_1 = context.reset_request_count(); + // Every 10 minutes update hot ranks, delete expired captchas and publish scheduled posts scheduler.every(CTimeUnits::minutes(10)).run(move || { - let context = context_1.clone(); + let context = context_1.reset_request_count(); async move { update_hot_ranks(&mut context.pool()).await; - } - }); - - let context_1 = context.clone(); - // Delete any captcha answers older than ten minutes, every ten minutes - scheduler.every(CTimeUnits::minutes(10)).run(move || { - let context = context_1.clone(); - - async move { delete_expired_captcha_answers(&mut context.pool()).await; + publish_scheduled_posts(&context).await; } }); @@ -94,7 +96,7 @@ pub async fn setup(context: LemmyContext) -> LemmyResult<()> { delete_old_denied_users(&mut context.pool()).await; update_instance_software(&mut context.pool(), context.client()) .await - .map_err(|e| warn!("Failed to update instance software: {e}")) + .inspect_err(|e| warn!("Failed to update instance software: {e}")) .ok(); } }); @@ -279,7 +281,7 @@ async fn delete_expired_captcha_answers(pool: &mut DbPool<'_>) { .map(|_| { info!("Done."); }) - .map_err(|e| error!("Failed to clear old captcha answers: {e}")) + .inspect_err(|e| error!("Failed to clear old captcha answers: {e}")) .ok(); } Err(e) => { @@ -300,7 +302,7 @@ async fn clear_old_activities(pool: &mut DbPool<'_>) { ) .execute(&mut conn) .await - .map_err(|e| error!("Failed to clear old sent activities: {e}")) + .inspect_err(|e| error!("Failed to clear old sent activities: {e}")) .ok(); diesel::delete( @@ -310,7 +312,7 @@ async fn clear_old_activities(pool: &mut DbPool<'_>) { .execute(&mut conn) .await .map(|_| info!("Done.")) - .map_err(|e| error!("Failed to clear old received activities: {e}")) + .inspect_err(|e| error!("Failed to clear old received activities: {e}")) .ok(); } Err(e) => { @@ -325,7 +327,7 @@ async fn delete_old_denied_users(pool: &mut DbPool<'_>) { .map(|_| { info!("Done."); }) - .map_err(|e| error!("Failed to deleted old denied users: {e}")) + .inspect_err(|e| error!("Failed to deleted old denied users: {e}")) .ok(); } @@ -351,7 +353,7 @@ async fn overwrite_deleted_posts_and_comments(pool: &mut DbPool<'_>) { .map(|_| { info!("Done."); }) - .map_err(|e| error!("Failed to overwrite deleted posts: {e}")) + .inspect_err(|e| error!("Failed to overwrite deleted posts: {e}")) .ok(); info!("Overwriting deleted comments..."); @@ -367,7 +369,7 @@ async fn overwrite_deleted_posts_and_comments(pool: &mut DbPool<'_>) { .map(|_| { info!("Done."); }) - .map_err(|e| error!("Failed to overwrite deleted comments: {e}")) + .inspect_err(|e| error!("Failed to overwrite deleted comments: {e}")) .ok(); } Err(e) => { @@ -399,14 +401,14 @@ async fn active_counts(pool: &mut DbPool<'_>) { sql_query(update_site_stmt) .execute(&mut conn) .await - .map_err(|e| error!("Failed to update site stats: {e}")) + .inspect_err(|e| error!("Failed to update site stats: {e}")) .ok(); let update_community_stmt = format!("update community_aggregates ca set users_active_{} = mv.count_ from community_aggregates_activity('{}') mv where ca.community_id = mv.community_id_", i.1, i.0); sql_query(update_community_stmt) .execute(&mut conn) .await - .map_err(|e| error!("Failed to update community stats: {e}")) + .inspect_err(|e| error!("Failed to update community stats: {e}")) .ok(); } @@ -433,7 +435,7 @@ async fn update_banned_when_expired(pool: &mut DbPool<'_>) { .set(person::banned.eq(false)) .execute(&mut conn) .await - .map_err(|e| error!("Failed to update person.banned when expires: {e}")) + .inspect_err(|e| error!("Failed to update person.banned when expires: {e}")) .ok(); diesel::delete( @@ -441,7 +443,7 @@ async fn update_banned_when_expired(pool: &mut DbPool<'_>) { ) .execute(&mut conn) .await - .map_err(|e| error!("Failed to remove community_ban expired rows: {e}")) + .inspect_err(|e| error!("Failed to remove community_ban expired rows: {e}")) .ok(); } Err(e) => { @@ -450,6 +452,62 @@ async fn update_banned_when_expired(pool: &mut DbPool<'_>) { } } +/// Find all unpublished posts with scheduled date in the future, and publish them. +async fn publish_scheduled_posts(context: &Data) { + let pool = &mut context.pool(); + let conn = get_conn(pool).await; + + match conn { + Ok(mut conn) => { + let scheduled_posts: Vec<_> = post::table + .inner_join(community::table) + .inner_join(person::table) + // find all posts which have scheduled_publish_time that is in the past + .filter(post::scheduled_publish_time.is_not_null()) + .filter(coalesce(post::scheduled_publish_time, now()).lt(now())) + // make sure the post, person and community are still around + .filter(not(post::deleted.or(post::removed))) + .filter(not(person::banned.or(person::deleted))) + .filter(not(community::removed.or(community::deleted))) + // ensure that user isnt banned from community + .filter(not(exists( + community_person_ban::table + .filter(community_person_ban::community_id.eq(community::id)) + .filter(community_person_ban::person_id.eq(person::id)), + ))) + .select((post::all_columns, community::all_columns)) + .get_results::<(Post, Community)>(&mut conn) + .await + .inspect_err(|e| error!("Failed to read unpublished posts: {e}")) + .ok() + .unwrap_or_default(); + + for (post, community) in scheduled_posts { + // mark post as published in db + let form = PostUpdateForm { + scheduled_publish_time: Some(None), + ..Default::default() + }; + Post::update(&mut context.pool(), post.id, &form) + .await + .inspect_err(|e| error!("Failed update scheduled post: {e}")) + .ok(); + + // send out post via federation and webmention + let send_activity = SendActivityData::CreatePost(post.clone()); + ActivityChannel::submit_activity(send_activity, context) + .await + .inspect_err(|e| error!("Failed federate scheduled post: {e}")) + .ok(); + send_webmention(post, community); + } + } + Err(e) => { + error!("Failed to get connection from pool: {e}"); + } + } +} + /// Updates the instance software and version. /// /// Does so using the /.well-known/nodeinfo protocol described here: