diff --git a/brozzler/frontier.py b/brozzler/frontier.py index 6e1ac63..27a3d7c 100644 --- a/brozzler/frontier.py +++ b/brozzler/frontier.py @@ -115,24 +115,29 @@ class RethinkDbFrontier: index="sites_last_disclaimed", ) .order_by(r.desc("claimed"), "last_disclaimed") - .fold( + .fold( # apply functions to sequence {}, - lambda acc, site: acc.merge( - r.branch( + lambda acc, + site: acc.merge( # add the following to the accumulator + r.branch( # if has job_id site.has_fields("job_id"), - r.object( + r.object( # then add this: key is stringified job_id, + # value starts at 0, but is incremented each time a site with + # the same job_id shows up in the result set. Used to get a + # value of how many sites for any given job are active site["job_id"].coerce_to("string"), acc[site["job_id"].coerce_to("string")] .default(0) .add(1), ), - {}, + {}, # else add nothing ) ), - emit=lambda acc, site, new_acc: r.branch( + emit=lambda acc, site, new_acc: r.branch( # big if conditional r.and_( r.or_( # Avoid tight loop when unclaimed site was recently disclaimed + # Not claimed and not disclaimed within last 20 seconds r.and_( site["claimed"].not_(), r.or_( @@ -140,8 +145,10 @@ class RethinkDbFrontier: site["last_disclaimed"].lt(r.now().sub(20)), ), ), + # or last claimed over 1 hour ago site["last_claimed"].lt(r.now().sub(60 * 60)), ), + # and either max_claimed_sites isn't set, or not exceeded r.or_( site.has_fields("max_claimed_sites").not_(), new_acc[site["job_id"].coerce_to("string")].le( @@ -149,14 +156,16 @@ class RethinkDbFrontier: ), ), ), - [site["id"]], - [], + [site["id"]], # then return this + [], # else nothing ), ) - .limit(n) + .limit(n) # trim results to max we want ) ) - .update( + .update( # mark the sites we're claiming, and return changed sites (our final claim + # results) + # # try to avoid a race condition resulting in multiple # brozzler-workers claiming the same site # see https://github.com/rethinkdb/rethinkdb/issues/3235#issuecomment-60283038