diff --git a/src/invidious.cr b/src/invidious.cr index c69a4198..3758713c 100644 --- a/src/invidious.cr +++ b/src/invidious.cr @@ -2845,16 +2845,25 @@ post "/feed/webhook/:token" do |env| premiere_timestamp: video.premiere_timestamp, ) - PG_DB.exec("UPDATE users SET notifications = notifications || $1 \ - WHERE updated < $2 AND $3 = ANY(subscriptions) AND $1 <> ALL(notifications)", video.id, video.published, video.ucid) + users = PG_DB.query_all("UPDATE users SET notifications = notifications || $1 \ + WHERE updated < $2 AND $3 = ANY(subscriptions) AND $1 <> ALL(notifications) RETURNING email", + video.id, video.published, video.ucid, as: String) video_array = video.to_a args = arg_array(video_array) PG_DB.exec("INSERT INTO channel_videos VALUES (#{args}) \ - ON CONFLICT (id) DO UPDATE SET title = $2, published = $3, \ - updated = $4, ucid = $5, author = $6, length_seconds = $7, \ - live_now = $8, premiere_timestamp = $9", video_array) + ON CONFLICT (id) DO UPDATE SET title = $2, published = $3, \ + updated = $4, ucid = $5, author = $6, length_seconds = $7, \ + live_now = $8, premiere_timestamp = $9", video_array) + + users.each do |user| + payload = { + "email" => user, + "action" => "refresh", + }.to_json + PG_DB.exec("NOTIFY feeds, E'#{payload}'") + end end end diff --git a/src/invidious/channels.cr b/src/invidious/channels.cr index 8e4aa0ec..c8de9543 100644 --- a/src/invidious/channels.cr +++ b/src/invidious/channels.cr @@ -178,18 +178,27 @@ def fetch_channel(ucid, db, pull_all_videos = true, locale = nil) premiere_timestamp: premiere_timestamp ) - db.exec("UPDATE users SET notifications = notifications || $1 \ - WHERE updated < $2 AND $3 = ANY(subscriptions) AND $1 <> ALL(notifications)", video.id, video.published, ucid) + users = db.query_all("UPDATE users SET notifications = notifications || $1 \ + WHERE updated < $2 AND $3 = ANY(subscriptions) AND $1 <> ALL(notifications) RETURNING email", + video.id, video.published, ucid, as: String) video_array = video.to_a args = arg_array(video_array) - # We don't include the 'premire_timestamp' here because channel pages don't include them, + # We don't include the 'premiere_timestamp' here because channel pages don't include them, # meaning the above timestamp is always null db.exec("INSERT INTO channel_videos VALUES (#{args}) \ ON CONFLICT (id) DO UPDATE SET title = $2, published = $3, \ updated = $4, ucid = $5, author = $6, length_seconds = $7, \ live_now = $8", video_array) + + users.each do |user| + payload = { + "email" => user, + "action" => "refresh", + }.to_json + PG_DB.exec("NOTIFY feeds, E'#{payload}'") + end end if pull_all_videos @@ -233,18 +242,29 @@ def fetch_channel(ucid, db, pull_all_videos = true, locale = nil) videos.each do |video| ids << video.id - # FIXME: Red videos don't provide published date, so the best we can do is ignore them + # We are notified of Red videos elsewhere (PubSub), which includes a correct published date, + # so since they don't provide a published date here we can safely ignore them. if Time.now - video.published > 1.minute - db.exec("UPDATE users SET notifications = notifications || $1 \ - WHERE updated < $2 AND $3 = ANY(subscriptions) AND $1 <> ALL(notifications)", video.id, video.published, video.ucid) + users = db.query_all("UPDATE users SET notifications = notifications || $1 \ + WHERE updated < $2 AND $3 = ANY(subscriptions) AND $1 <> ALL(notifications) RETURNING email", + video.id, video.published, video.ucid, as: String) video_array = video.to_a args = arg_array(video_array) # We don't update the 'premire_timestamp' here because channel pages don't include them db.exec("INSERT INTO channel_videos VALUES (#{args}) \ - ON CONFLICT (id) DO UPDATE SET title = $2, updated = $4, \ - ucid = $5, author = $6, length_seconds = $7, live_now = $8", video_array) + ON CONFLICT (id) DO UPDATE SET title = $2, updated = $4, \ + ucid = $5, author = $6, length_seconds = $7, live_now = $8", video_array) + + # Update all users affected by insert + users.each do |user| + payload = { + "email" => user, + "action" => "refresh", + }.to_json + PG_DB.exec("NOTIFY feeds, E'#{payload}'") + end end end diff --git a/src/invidious/helpers/jobs.cr b/src/invidious/helpers/jobs.cr index 9afd859f..05a0a5a4 100644 --- a/src/invidious/helpers/jobs.cr +++ b/src/invidious/helpers/jobs.cr @@ -45,6 +45,29 @@ end def refresh_feeds(db, logger, max_threads = 1) max_channel = Channel(Int32).new + # TODO: Make this config option, similar to use_pubsub + # Spawn thread to handle feed events + if max_threads > 0 + spawn do + PG.connect_listen(PG_URL, "feeds") do |event| + spawn do + feed = JSON.parse(event.payload) + email = feed["email"].as_s + action = feed["action"].as_s + + view_name = "subscriptions_#{sha256(email)}" + + case action + when "refresh" + db.exec("REFRESH MATERIALIZED VIEW #{view_name}") + end + end + + Fiber.yield + end + end + end + spawn do max_threads = max_channel.receive active_threads = 0