Post scheduling (fixes #234) (#5025)

* Post scheduling (fixes #234)

* clippy

* replace map_err with inspect_err

* ignore unpublished posts in read queries

* add api test

* fmt

* add some checks

* address some review comments

* allow updating schedule time

* rewrite scheduled task

* fmt

* machete

* compare date in sql, more filters

* check for community ban in sql

* remove api test (scheduled task only runs every 10 mins)

* remove mut

* add index

* remove Post::read impl

* fmt

* fix

* correctly handle changes to schedule time

* normal users can only schedule up to 10 posts
Nutomic 2024-09-24 11:39:40 +02:00 committed by GitHub
parent bab5c93062
commit 9eee61dd06
16 changed files with 254 additions and 56 deletions

Cargo.lock (generated)

@ -2538,6 +2538,7 @@ dependencies = [
"actix-web",
"anyhow",
"bcrypt",
"chrono",
"futures",
"lemmy_api_common",
"lemmy_db_schema",


@ -30,6 +30,8 @@ pub struct CreatePost {
pub language_id: Option<LanguageId>,
/// Instead of fetching a thumbnail, use a custom one.
pub custom_thumbnail: Option<String>,
/// Time when this post should be scheduled. Null means publish immediately.
pub scheduled_publish_time: Option<i64>,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
@ -124,6 +126,8 @@ pub struct EditPost {
pub language_id: Option<LanguageId>,
/// Instead of fetching a thumbnail, use a custom one.
pub custom_thumbnail: Option<String>,
/// Time when this post should be scheduled. Null means publish immediately.
pub scheduled_publish_time: Option<i64>,
}
#[derive(Debug, Serialize, Deserialize, Clone, Copy, Default, PartialEq, Eq, Hash)]
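The new `scheduled_publish_time` field is a unix timestamp in seconds (it is fed into `timestamp_opt(_, 0)` in convert_published_time further down), and leaving it null keeps the existing publish-immediately behaviour. A minimal client-side sketch of filling it in; the helper name is illustrative only:

use chrono::{Duration, Utc};

// Illustrative helper: schedule roughly one hour from now.
// Returning None (null on the wire) means publish immediately.
fn example_schedule_time() -> Option<i64> {
    Some((Utc::now() + Duration::hours(1)).timestamp())
}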


@ -27,6 +27,7 @@ futures.workspace = true
uuid = { workspace = true }
moka.workspace = true
anyhow.workspace = true
chrono.workspace = true
webmention = "0.6.0"
accept-language = "3.1.0"
serde_json = { workspace = true }


@ -1,3 +1,4 @@
use super::convert_published_time;
use activitypub_federation::config::Data;
use actix_web::web::Json;
use lemmy_api_common::{
@ -125,12 +126,15 @@ pub async fn create_post(
}
};
let scheduled_publish_time =
convert_published_time(data.scheduled_publish_time, &local_user_view, &context).await?;
let post_form = PostInsertForm {
url: url.map(Into::into),
body,
alt_text: data.alt_text.clone(),
nsfw: data.nsfw,
language_id,
scheduled_publish_time,
..PostInsertForm::new(
data.name.trim().to_string(),
local_user_view.person.id,
@ -142,10 +146,16 @@ pub async fn create_post(
.await
.with_lemmy_type(LemmyErrorType::CouldntCreatePost)?;
let federate_post = if scheduled_publish_time.is_none() {
send_webmention(inserted_post.clone(), community);
|post| Some(SendActivityData::CreatePost(post))
} else {
|_| None
};
generate_post_link_metadata(
inserted_post.clone(),
custom_thumbnail.map(Into::into),
|post| Some(SendActivityData::CreatePost(post)),
federate_post,
context.reset_request_count(),
)
.await?;
@ -165,11 +175,14 @@ pub async fn create_post(
mark_post_as_read(person_id, post_id, &mut context.pool()).await?;
if let Some(url) = inserted_post.url.clone() {
build_post_response(&context, community_id, local_user_view, post_id).await
}
pub fn send_webmention(post: Post, community: Community) {
if let Some(url) = post.url.clone() {
if community.visibility == CommunityVisibility::Public {
spawn_try_task(async move {
let mut webmention =
Webmention::new::<Url>(inserted_post.ap_id.clone().into(), url.clone().into())?;
let mut webmention = Webmention::new::<Url>(post.ap_id.clone().into(), url.clone().into())?;
webmention.set_checked(true);
match webmention
.send()
@ -183,6 +196,4 @@ pub async fn create_post(
});
}
};
build_post_response(&context, community_id, local_user_view, post_id).await
}


@ -1,5 +1,38 @@
use chrono::{DateTime, TimeZone, Utc};
use lemmy_api_common::context::LemmyContext;
use lemmy_db_schema::source::post::Post;
use lemmy_db_views::structs::LocalUserView;
use lemmy_utils::{error::LemmyResult, LemmyErrorType};
pub mod create;
pub mod delete;
pub mod read;
pub mod remove;
pub mod update;
async fn convert_published_time(
scheduled_publish_time: Option<i64>,
local_user_view: &LocalUserView,
context: &LemmyContext,
) -> LemmyResult<Option<DateTime<Utc>>> {
const MAX_SCHEDULED_POSTS: i64 = 10;
if let Some(scheduled_publish_time) = scheduled_publish_time {
let converted = Utc
.timestamp_opt(scheduled_publish_time, 0)
.single()
.ok_or(LemmyErrorType::InvalidUnixTime)?;
if converted < Utc::now() {
Err(LemmyErrorType::PostScheduleTimeMustBeInFuture)?;
}
if !local_user_view.local_user.admin {
let count =
Post::user_scheduled_post_count(local_user_view.person.id, &mut context.pool()).await?;
if count >= MAX_SCHEDULED_POSTS {
Err(LemmyErrorType::TooManyScheduledPosts)?;
}
}
Ok(Some(converted))
} else {
Ok(None)
}
}
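A small standalone illustration of the `timestamp_opt(..).single()` check above (assumed values, not from the diff): out-of-range seconds produce no result, which is what gets mapped to InvalidUnixTime.

use chrono::{TimeZone, Utc};

fn main() {
    // Out-of-range seconds yield LocalResult::None -> InvalidUnixTime above.
    assert!(Utc.timestamp_opt(i64::MAX, 0).single().is_none());
    // An ordinary timestamp (here late September 2024) converts cleanly.
    assert!(Utc.timestamp_opt(1_727_000_000, 0).single().is_some());
}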


@ -1,3 +1,4 @@
use super::{convert_published_time, create::send_webmention};
use activitypub_federation::config::Data;
use actix_web::web::Json;
use lemmy_api_common::{
@ -16,6 +17,7 @@ use lemmy_api_common::{
use lemmy_db_schema::{
source::{
actor_language::CommunityLanguage,
community::Community,
local_site::LocalSite,
post::{Post, PostUpdateForm},
},
@ -107,6 +109,21 @@ pub async fn update_post(
)
.await?;
// handle changes to scheduled_publish_time
let scheduled_publish_time = match (
orig_post.scheduled_publish_time,
data.scheduled_publish_time,
) {
// schedule time can be changed if post is still scheduled (and not published yet)
(Some(_), Some(_)) => {
Some(convert_published_time(data.scheduled_publish_time, &local_user_view, &context).await?)
}
// post was scheduled, gets changed to publish immediately
(Some(_), None) => Some(None),
// unchanged
(_, _) => None,
};
let post_form = PostUpdateForm {
name: data.name.clone(),
url,
@ -115,6 +132,7 @@ pub async fn update_post(
nsfw: data.nsfw,
language_id: data.language_id,
updated: Some(Some(naive_now())),
scheduled_publish_time,
..Default::default()
};
@ -123,13 +141,36 @@ pub async fn update_post(
.await
.with_lemmy_type(LemmyErrorType::CouldntUpdatePost)?;
// send out federation/webmention if necessary
match (
orig_post.scheduled_publish_time,
data.scheduled_publish_time,
) {
// schedule was removed, send create activity and webmention
(Some(_), None) => {
let community = Community::read(&mut context.pool(), orig_post.community_id).await?;
send_webmention(updated_post.clone(), community);
generate_post_link_metadata(
updated_post.clone(),
custom_thumbnail.flatten().map(Into::into),
|post| Some(SendActivityData::CreatePost(post)),
context.reset_request_count(),
)
.await?;
}
// post was already public, send update
(None, _) => {
generate_post_link_metadata(
updated_post.clone(),
custom_thumbnail.flatten().map(Into::into),
|post| Some(SendActivityData::UpdatePost(post)),
context.reset_request_count(),
)
.await?;
.await?
}
// schedule was changed, do nothing
(Some(_), Some(_)) => {}
};
build_post_response(
context.deref(),
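The two matches above boil down to a small decision table: whether an edit sends a Create activity (plus webmention), an Update activity, or nothing depends only on whether the post was scheduled before and after the edit. A stand-in sketch with hypothetical names, not the actual code:

// Hypothetical restatement of the federation decision in update_post.
enum FederateAction {
    Create,  // schedule removed: post becomes public now
    Update,  // post was already public: ordinary edit
    Nothing, // still scheduled: the background task will publish it later
}

fn federation_action(was_scheduled: bool, still_scheduled: bool) -> FederateAction {
    match (was_scheduled, still_scheduled) {
        (true, false) => FederateAction::Create,
        (false, _) => FederateAction::Update,
        (true, true) => FederateAction::Nothing,
    }
}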


@ -1,7 +1,7 @@
use crate::{
diesel::OptionalExtension,
diesel::{BoolExpressionMethods, OptionalExtension},
newtypes::{CommunityId, DbUrl, PersonId, PostId},
schema::{post, post_hide, post_like, post_read, post_saved},
schema::{community, person, post, post_hide, post_like, post_read, post_saved},
source::post::{
Post,
PostHide,
@ -20,6 +20,7 @@ use crate::{
functions::coalesce,
get_conn,
naive_now,
now,
DbPool,
DELETED_REPLACEMENT_TEXT,
FETCH_LIMIT_MAX,
@ -30,7 +31,7 @@ use crate::{
use ::url::Url;
use chrono::{DateTime, Utc};
use diesel::{
dsl::insert_into,
dsl::{count, insert_into, not},
result::Error,
DecoratableTarget,
ExpressionMethods,
@ -173,6 +174,7 @@ impl Post {
let object_id: DbUrl = object_id.into();
post::table
.filter(post::ap_id.eq(object_id))
.filter(post::scheduled_publish_time.is_null())
.first(conn)
.await
.optional()
@ -246,6 +248,28 @@ impl Post {
.get_results::<Self>(conn)
.await
}
pub async fn user_scheduled_post_count(
person_id: PersonId,
pool: &mut DbPool<'_>,
) -> Result<i64, Error> {
let conn = &mut get_conn(pool).await?;
post::table
.inner_join(person::table)
.inner_join(community::table)
// find all posts which have scheduled_publish_time that is in the past
.filter(post::scheduled_publish_time.is_not_null())
.filter(coalesce(post::scheduled_publish_time, now()).lt(now()))
// make sure the post and community are still around
.filter(not(post::deleted.or(post::removed)))
.filter(not(community::removed.or(community::deleted)))
// only posts by specified user
.filter(post::creator_id.eq(person_id))
.select(count(post::id))
.first::<i64>(conn)
.await
}
}
#[async_trait]
@ -459,6 +483,7 @@ mod tests {
featured_community: false,
featured_local: false,
url_content_type: None,
scheduled_publish_time: None,
};
// Post Like


@ -770,6 +770,7 @@ diesel::table! {
featured_local -> Bool,
url_content_type -> Nullable<Text>,
alt_text -> Nullable<Text>,
scheduled_publish_time -> Nullable<Timestamptz>
}
}


@ -57,6 +57,8 @@ pub struct Post {
pub url_content_type: Option<String>,
/// An optional alt_text, usable for image posts.
pub alt_text: Option<String>,
/// Time at which the post will be published. None means publish immediately.
pub scheduled_publish_time: Option<DateTime<Utc>>,
}
#[derive(Debug, Clone, derive_new::new)]
@ -104,6 +106,8 @@ pub struct PostInsertForm {
pub url_content_type: Option<String>,
#[new(default)]
pub alt_text: Option<String>,
#[new(default)]
pub scheduled_publish_time: Option<DateTime<Utc>>,
}
#[derive(Debug, Clone, Default)]
@ -130,6 +134,7 @@ pub struct PostUpdateForm {
pub featured_local: Option<bool>,
pub url_content_type: Option<Option<String>>,
pub alt_text: Option<Option<String>>,
pub scheduled_publish_time: Option<Option<DateTime<Utc>>>,
}
#[derive(PartialEq, Eq, Debug)]
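The `Option<Option<DateTime<Utc>>>` on PostUpdateForm follows the usual update-form convention: the outer Option decides whether the column is written at all, the inner one is the value to write. A sketch with a hypothetical stand-in struct:

use chrono::{DateTime, Utc};

// Stand-in for the real form, only to show the nested-Option semantics.
struct UpdateFormSketch {
    scheduled_publish_time: Option<Option<DateTime<Utc>>>,
}

fn main() {
    // Leave the column unchanged.
    let _keep = UpdateFormSketch { scheduled_publish_time: None };
    // Clear the schedule, i.e. publish the post immediately.
    let _clear = UpdateFormSketch { scheduled_publish_time: Some(None) };
    // Move the schedule to a new time.
    let _reschedule = UpdateFormSketch {
        scheduled_publish_time: Some(Some(Utc::now())),
    };
}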


@ -619,10 +619,7 @@ mod tests {
person: inserted_timmy_person.clone(),
counts: Default::default(),
};
let site_form = SiteInsertForm::builder()
.name("test site".to_string())
.instance_id(inserted_instance.id)
.build();
let site_form = SiteInsertForm::new("test site".to_string(), inserted_instance.id);
let site = Site::create(pool, &site_form).await?;
Ok(Data {
inserted_instance,
@ -1093,6 +1090,7 @@ mod tests {
featured_community: false,
featured_local: false,
url_content_type: None,
scheduled_publish_time: None,
},
community: Community {
id: data.inserted_community.id,


@ -318,11 +318,18 @@ fn queries<'a>() -> Queries<
// hide posts from deleted communities
query = query.filter(community::deleted.eq(false));
// only show deleted posts to creator
// only creator can see deleted posts and unpublished scheduled posts
if let Some(person_id) = options.local_user.person_id() {
query = query.filter(post::deleted.eq(false).or(post::creator_id.eq(person_id)));
query = query.filter(
post::scheduled_publish_time
.is_null()
.or(post::creator_id.eq(person_id)),
);
} else {
query = query.filter(post::deleted.eq(false));
query = query
.filter(post::deleted.eq(false))
.filter(post::scheduled_publish_time.is_null());
}
// only show removed posts to admin when viewing user profile
@ -1771,6 +1778,7 @@ mod tests {
featured_community: false,
featured_local: false,
url_content_type: None,
scheduled_publish_time: None,
},
my_vote: None,
unread_comments: 0,


@ -172,6 +172,8 @@ pub enum LemmyErrorType {
Unknown(String),
CantDeleteSite,
UrlLengthOverflow,
PostScheduleTimeMustBeInFuture,
TooManyScheduledPosts,
NotFound,
}


@ -0,0 +1,3 @@
ALTER TABLE post
DROP COLUMN scheduled_publish_time;


@ -0,0 +1,5 @@
ALTER TABLE post
ADD COLUMN scheduled_publish_time timestamptz;
CREATE INDEX idx_post_scheduled_publish_time ON post (scheduled_publish_time);


@ -157,11 +157,6 @@ pub async fn start_lemmy_server(args: CmdArgs) -> LemmyResult<()> {
rate_limit_cell.clone(),
);
let scheduled_tasks = (!args.disable_scheduled_tasks).then(|| {
// Schedules various cleanup tasks for the DB
tokio::task::spawn(scheduled_tasks::setup(context.clone()))
});
if let Some(prometheus) = SETTINGS.prometheus.clone() {
serve_prometheus(prometheus, context.clone())?;
}
@ -187,7 +182,14 @@ pub async fn start_lemmy_server(args: CmdArgs) -> LemmyResult<()> {
}))
.expect("set function pointer");
let request_data = federation_config.to_request_data();
let outgoing_activities_task = tokio::task::spawn(handle_outgoing_activities(request_data));
let outgoing_activities_task = tokio::task::spawn(handle_outgoing_activities(
request_data.reset_request_count(),
));
let scheduled_tasks = (!args.disable_scheduled_tasks).then(|| {
// Schedules various cleanup tasks for the DB
tokio::task::spawn(scheduled_tasks::setup(request_data.reset_request_count()))
});
let server = if !args.disable_http_server {
if let Some(startup_server_handle) = startup_server_handle {


@ -1,20 +1,27 @@
use activitypub_federation::config::Data;
use chrono::{DateTime, TimeZone, Utc};
use clokwerk::{AsyncScheduler, TimeUnits as CTimeUnits};
use diesel::{
dsl::IntervalDsl,
dsl::{exists, not, IntervalDsl},
sql_query,
sql_types::{Integer, Timestamptz},
BoolExpressionMethods,
ExpressionMethods,
NullableExpressionMethods,
QueryDsl,
QueryableByName,
};
use diesel_async::{AsyncPgConnection, RunQueryDsl};
use lemmy_api_common::context::LemmyContext;
use lemmy_api_common::{
context::LemmyContext,
send_activity::{ActivityChannel, SendActivityData},
};
use lemmy_api_crud::post::create::send_webmention;
use lemmy_db_schema::{
schema::{
captcha_answer,
comment,
community,
community_person_ban,
instance,
person,
@ -23,10 +30,13 @@ use lemmy_db_schema::{
sent_activity,
},
source::{
community::Community,
instance::{Instance, InstanceForm},
local_user::LocalUser,
post::{Post, PostUpdateForm},
},
utils::{get_conn, naive_now, now, DbPool, DELETED_REPLACEMENT_TEXT},
traits::Crud,
utils::{functions::coalesce, get_conn, naive_now, now, DbPool, DELETED_REPLACEMENT_TEXT},
};
use lemmy_routes::nodeinfo::{NodeInfo, NodeInfoWellKnown};
use lemmy_utils::error::LemmyResult;
@ -35,13 +45,13 @@ use std::time::Duration;
use tracing::{error, info, warn};
/// Schedules various cleanup tasks for lemmy in a background thread
pub async fn setup(context: LemmyContext) -> LemmyResult<()> {
pub async fn setup(context: Data<LemmyContext>) -> LemmyResult<()> {
// Setup the connections
let mut scheduler = AsyncScheduler::new();
startup_jobs(&mut context.pool()).await;
let context_1 = context.clone();
// Update active counts every hour
// Update active counts, expired bans and unpublished posts every hour
scheduler.every(CTimeUnits::hour(1)).run(move || {
let context = context_1.clone();
@ -51,23 +61,15 @@ pub async fn setup(context: LemmyContext) -> LemmyResult<()> {
}
});
let context_1 = context.clone();
// Update hot ranks every 15 minutes
let context_1 = context.reset_request_count();
// Every 10 minutes update hot ranks, delete expired captchas and publish scheduled posts
scheduler.every(CTimeUnits::minutes(10)).run(move || {
let context = context_1.clone();
let context = context_1.reset_request_count();
async move {
update_hot_ranks(&mut context.pool()).await;
}
});
let context_1 = context.clone();
// Delete any captcha answers older than ten minutes, every ten minutes
scheduler.every(CTimeUnits::minutes(10)).run(move || {
let context = context_1.clone();
async move {
delete_expired_captcha_answers(&mut context.pool()).await;
publish_scheduled_posts(&context).await;
}
});
@ -94,7 +96,7 @@ pub async fn setup(context: LemmyContext) -> LemmyResult<()> {
delete_old_denied_users(&mut context.pool()).await;
update_instance_software(&mut context.pool(), context.client())
.await
.map_err(|e| warn!("Failed to update instance software: {e}"))
.inspect_err(|e| warn!("Failed to update instance software: {e}"))
.ok();
}
});
@ -279,7 +281,7 @@ async fn delete_expired_captcha_answers(pool: &mut DbPool<'_>) {
.map(|_| {
info!("Done.");
})
.map_err(|e| error!("Failed to clear old captcha answers: {e}"))
.inspect_err(|e| error!("Failed to clear old captcha answers: {e}"))
.ok();
}
Err(e) => {
@ -300,7 +302,7 @@ async fn clear_old_activities(pool: &mut DbPool<'_>) {
)
.execute(&mut conn)
.await
.map_err(|e| error!("Failed to clear old sent activities: {e}"))
.inspect_err(|e| error!("Failed to clear old sent activities: {e}"))
.ok();
diesel::delete(
@ -310,7 +312,7 @@ async fn clear_old_activities(pool: &mut DbPool<'_>) {
.execute(&mut conn)
.await
.map(|_| info!("Done."))
.map_err(|e| error!("Failed to clear old received activities: {e}"))
.inspect_err(|e| error!("Failed to clear old received activities: {e}"))
.ok();
}
Err(e) => {
@ -325,7 +327,7 @@ async fn delete_old_denied_users(pool: &mut DbPool<'_>) {
.map(|_| {
info!("Done.");
})
.map_err(|e| error!("Failed to deleted old denied users: {e}"))
.inspect_err(|e| error!("Failed to deleted old denied users: {e}"))
.ok();
}
@ -351,7 +353,7 @@ async fn overwrite_deleted_posts_and_comments(pool: &mut DbPool<'_>) {
.map(|_| {
info!("Done.");
})
.map_err(|e| error!("Failed to overwrite deleted posts: {e}"))
.inspect_err(|e| error!("Failed to overwrite deleted posts: {e}"))
.ok();
info!("Overwriting deleted comments...");
@ -367,7 +369,7 @@ async fn overwrite_deleted_posts_and_comments(pool: &mut DbPool<'_>) {
.map(|_| {
info!("Done.");
})
.map_err(|e| error!("Failed to overwrite deleted comments: {e}"))
.inspect_err(|e| error!("Failed to overwrite deleted comments: {e}"))
.ok();
}
Err(e) => {
@ -399,14 +401,14 @@ async fn active_counts(pool: &mut DbPool<'_>) {
sql_query(update_site_stmt)
.execute(&mut conn)
.await
.map_err(|e| error!("Failed to update site stats: {e}"))
.inspect_err(|e| error!("Failed to update site stats: {e}"))
.ok();
let update_community_stmt = format!("update community_aggregates ca set users_active_{} = mv.count_ from community_aggregates_activity('{}') mv where ca.community_id = mv.community_id_", i.1, i.0);
sql_query(update_community_stmt)
.execute(&mut conn)
.await
.map_err(|e| error!("Failed to update community stats: {e}"))
.inspect_err(|e| error!("Failed to update community stats: {e}"))
.ok();
}
@ -433,7 +435,7 @@ async fn update_banned_when_expired(pool: &mut DbPool<'_>) {
.set(person::banned.eq(false))
.execute(&mut conn)
.await
.map_err(|e| error!("Failed to update person.banned when expires: {e}"))
.inspect_err(|e| error!("Failed to update person.banned when expires: {e}"))
.ok();
diesel::delete(
@ -441,7 +443,7 @@ async fn update_banned_when_expired(pool: &mut DbPool<'_>) {
)
.execute(&mut conn)
.await
.map_err(|e| error!("Failed to remove community_ban expired rows: {e}"))
.inspect_err(|e| error!("Failed to remove community_ban expired rows: {e}"))
.ok();
}
Err(e) => {
@ -450,6 +452,62 @@ async fn update_banned_when_expired(pool: &mut DbPool<'_>) {
}
}
/// Find all unpublished posts whose scheduled publish time has passed, and publish them.
async fn publish_scheduled_posts(context: &Data<LemmyContext>) {
let pool = &mut context.pool();
let conn = get_conn(pool).await;
match conn {
Ok(mut conn) => {
let scheduled_posts: Vec<_> = post::table
.inner_join(community::table)
.inner_join(person::table)
// find all posts which have scheduled_publish_time that is in the past
.filter(post::scheduled_publish_time.is_not_null())
.filter(coalesce(post::scheduled_publish_time, now()).lt(now()))
// make sure the post, person and community are still around
.filter(not(post::deleted.or(post::removed)))
.filter(not(person::banned.or(person::deleted)))
.filter(not(community::removed.or(community::deleted)))
// ensure that user isnt banned from community
.filter(not(exists(
community_person_ban::table
.filter(community_person_ban::community_id.eq(community::id))
.filter(community_person_ban::person_id.eq(person::id)),
)))
.select((post::all_columns, community::all_columns))
.get_results::<(Post, Community)>(&mut conn)
.await
.inspect_err(|e| error!("Failed to read unpublished posts: {e}"))
.ok()
.unwrap_or_default();
for (post, community) in scheduled_posts {
// mark post as published in db
let form = PostUpdateForm {
scheduled_publish_time: Some(None),
..Default::default()
};
Post::update(&mut context.pool(), post.id, &form)
.await
.inspect_err(|e| error!("Failed update scheduled post: {e}"))
.ok();
// send out post via federation and webmention
let send_activity = SendActivityData::CreatePost(post.clone());
ActivityChannel::submit_activity(send_activity, context)
.await
.inspect_err(|e| error!("Failed federate scheduled post: {e}"))
.ok();
send_webmention(post, community);
}
}
Err(e) => {
error!("Failed to get connection from pool: {e}");
}
}
}
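The `coalesce(post::scheduled_publish_time, now()).lt(now())` filter is a null-safe way of saying "the post has a scheduled time and that time has already passed". A plain-Rust restatement of that predicate, as an illustrative helper only:

use chrono::{DateTime, Utc};

// Mirrors the SQL predicate: due only if a schedule exists and is in the past.
fn is_due(scheduled_publish_time: Option<DateTime<Utc>>, now: DateTime<Utc>) -> bool {
    scheduled_publish_time.map(|t| t < now).unwrap_or(false)
}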
/// Updates the instance software and version.
///
/// Does so using the /.well-known/nodeinfo protocol described here: