Merge pull request #347 from internetarchive/adam/annotate_claim_sites

chore: annotate claim_sites()
Author: Adam Miller
Date: 2025-03-26 10:19:17 -07:00 (committed by GitHub)
commit 42b4a88c96


@@ -115,24 +115,29 @@ class RethinkDbFrontier:
                         index="sites_last_disclaimed",
                     )
                     .order_by(r.desc("claimed"), "last_disclaimed")
-                    .fold(
+                    .fold(  # apply functions to sequence
                         {},
-                        lambda acc, site: acc.merge(
-                            r.branch(
+                        lambda acc,
+                        site: acc.merge(  # add the following to the accumulator
+                            r.branch(  # if has job_id
                                 site.has_fields("job_id"),
-                                r.object(
+                                r.object(  # then add this: key is stringified job_id,
+                                    # value starts at 0, but is incremented each time a site with
+                                    # the same job_id shows up in the result set. Used to get a
+                                    # value of how many sites for any given job are active
                                     site["job_id"].coerce_to("string"),
                                     acc[site["job_id"].coerce_to("string")]
                                     .default(0)
                                     .add(1),
                                 ),
-                                {},
+                                {},  # else add nothing
                             )
                         ),
-                        emit=lambda acc, site, new_acc: r.branch(
+                        emit=lambda acc, site, new_acc: r.branch(  # big if conditional
                             r.and_(
                                 r.or_(
                                     # Avoid tight loop when unclaimed site was recently disclaimed
+                                    # Not claimed and not disclaimed within last 20 seconds
                                     r.and_(
                                         site["claimed"].not_(),
                                         r.or_(
@@ -140,8 +145,10 @@ class RethinkDbFrontier:
                                             site["last_disclaimed"].lt(r.now().sub(20)),
                                         ),
                                     ),
+                                    # or last claimed over 1 hour ago
                                     site["last_claimed"].lt(r.now().sub(60 * 60)),
                                 ),
+                                # and either max_claimed_sites isn't set, or not exceeded
                                 r.or_(
                                     site.has_fields("max_claimed_sites").not_(),
                                     new_acc[site["job_id"].coerce_to("string")].le(
@@ -149,14 +156,16 @@ class RethinkDbFrontier:
                                     ),
                                 ),
                             ),
-                            [site["id"]],
-                            [],
+                            [site["id"]],  # then return this
+                            [],  # else nothing
                         ),
                     )
-                    .limit(n)
+                    .limit(n)  # trim results to max we want
                 )
             )
-            .update(
+            .update(  # mark the sites we're claiming, and return changed sites (our final claim
+                # results)
+                #
                 # try to avoid a race condition resulting in multiple
                 # brozzler-workers claiming the same site
                 # see https://github.com/rethinkdb/rethinkdb/issues/3235#issuecomment-60283038
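
The annotations added above describe a fold/emit counting pattern: the accumulator keeps a per-job tally of sites already selected, and the emit branch only returns a site's id while the claim conditions hold and the tally stays within max_claimed_sites. Below is a minimal plain-Python sketch of that logic, for illustration only; it is not the ReQL query itself, the helper name claimable_site_ids and the input shapes are hypothetical, and the field names simply mirror the ones used in the query.

import time


def claimable_site_ids(sites, n, now=None):
    """Plain-Python analogue of the annotated fold/emit query (illustrative sketch)."""
    now = now or time.time()
    acc = {}      # stringified job_id -> how many of that job's sites we've tallied so far
    emitted = []  # site ids the emit branch would return
    for site in sites:
        # fold step: merge an incremented tally for this site's job into the accumulator
        new_acc = dict(acc)
        if "job_id" in site:
            key = str(site["job_id"])
            new_acc[key] = new_acc.get(key, 0) + 1
        # emit step: the "big if conditional"
        recently_disclaimed = site.get("last_disclaimed", 0) >= now - 20
        unclaimed_and_quiet = not site.get("claimed", False) and not recently_disclaimed
        claim_is_stale = site.get("last_claimed", 0) < now - 60 * 60
        under_cap = (
            "max_claimed_sites" not in site
            or new_acc.get(str(site.get("job_id")), 0) <= site["max_claimed_sites"]
        )
        if (unclaimed_and_quiet or claim_is_stale) and under_cap:
            emitted.append(site["id"])  # then return this
        acc = new_acc
    return emitted[:n]  # .limit(n): trim results to max we want

For example, a site whose last_claimed timestamp is two hours old counts as stale and can be re-claimed, but only while its job's running tally is still at or below max_claimed_sites.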