Add proper queuing for feed events

This commit is contained in:
Omar Roth 2019-05-27 12:23:15 -05:00
parent de77c71042
commit 3ac766530d
No known key found for this signature in database
GPG Key ID: B8254FB7EC3D37F2

View File

@ -45,13 +45,14 @@ end
def refresh_feeds(db, logger, max_threads = 1, use_feed_events = false) def refresh_feeds(db, logger, max_threads = 1, use_feed_events = false)
max_channel = Channel(Int32).new max_channel = Channel(Int32).new
# TODO: Instead of Fiber.yield, use proper queuing to prevent overloading DB
# Spawn thread to handle feed events # Spawn thread to handle feed events
if use_feed_events if use_feed_events
queue = Deque(String).new(30)
spawn do spawn do
PG.connect_listen(PG_URL, "feeds") do |event| loop do
spawn do if event = queue.shift?
feed = JSON.parse(event.payload) feed = JSON.parse(event)
email = feed["email"].as_s email = feed["email"].as_s
action = feed["action"].as_s action = feed["action"].as_s
@ -61,11 +62,20 @@ def refresh_feeds(db, logger, max_threads = 1, use_feed_events = false)
when "refresh" when "refresh"
db.exec("REFRESH MATERIALIZED VIEW #{view_name}") db.exec("REFRESH MATERIALIZED VIEW #{view_name}")
end end
# Delete any future events that we just processed
queue.delete(event)
else
sleep 1.second
end end
Fiber.yield Fiber.yield
end end
end end
PG.connect_listen(PG_URL, "feeds") do |event|
queue << event.payload
end
end end
spawn do spawn do