feat: Create new claim_sites() query, and fix frontier tests

This commit is contained in:
Adam Miller 2025-03-26 18:06:55 -07:00
parent 42b4a88c96
commit b5ee8a9ea7
3 changed files with 155 additions and 94 deletions

View file

@ -22,6 +22,7 @@ import logging
import threading import threading
from importlib.metadata import version as _version from importlib.metadata import version as _version
import doublethink
import structlog import structlog
import urlcanon import urlcanon
@ -398,7 +399,7 @@ def suggest_default_chrome_exe():
return "chromium-browser" return "chromium-browser"
EPOCH_UTC = datetime.datetime.fromtimestamp(0.0, tz=datetime.timezone.utc) EPOCH_UTC = datetime.datetime.fromtimestamp(0.0, tz=doublethink.UTC)
from brozzler.browser import Browser, BrowserPool, BrowsingException # noqa: E402 from brozzler.browser import Browser, BrowserPool, BrowsingException # noqa: E402
from brozzler.robots import is_permitted_by_robots # noqa: E402 from brozzler.robots import is_permitted_by_robots # noqa: E402

View file

@ -16,6 +16,9 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
""" """
import datetime
from typing import List, Dict
import doublethink import doublethink
import rethinkdb as rdb import rethinkdb as rdb
import structlog import structlog
@ -30,6 +33,57 @@ class UnexpectedDbResult(Exception):
pass pass
def filter_claimable_site_ids(
    active_sites: List[Dict], max_sites_to_claim: int = 1
) -> List[str]:
    """Pick up to *max_sites_to_claim* site ids that are eligible to claim.

    A site is claimable if it is unclaimed and was not disclaimed within the
    last 20 seconds (avoids a tight claim/disclaim loop), or if its existing
    claim looks stale (last claimed over an hour ago). Sites belonging to a
    job with ``max_claimed_sites`` are only claimed while the job's total
    claimed count stays under that cap.

    Args:
        active_sites: site dicts (must have "id" and "claimed"; may have
            "last_disclaimed", "last_claimed", "job_id", "max_claimed_sites")
        max_sites_to_claim: maximum number of site ids to return

    Returns:
        list of site ids to claim, in input order
    """
    job_counts: Dict = {}
    claimable_sites: List[Dict] = []
    now = datetime.datetime.now(datetime.timezone.utc)
    # a site that has never been disclaimed sorts as "disclaimed long ago",
    # i.e. it is immediately claimable (the old default of 0 raised TypeError
    # when compared against an aware datetime)
    never = datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)
    for site in active_sites:
        is_claimable = False
        # not claimed, and not disclaimed within the last 20 seconds
        if not site["claimed"] and site.get("last_disclaimed", never) <= (
            now - datetime.timedelta(seconds=20)
        ):
            is_claimable = True
        # or the existing claim is stale: last claimed over an hour ago
        if "last_claimed" in site and site["last_claimed"] <= (
            now - datetime.timedelta(hours=1)
        ):
            is_claimable = True
        # count currently-claimed sites per job_id so we can enforce each
        # job's max_claimed_sites cap below (both fields are optional)
        if site["claimed"] and "max_claimed_sites" in site and "job_id" in site:
            job_id = site["job_id"]
            job_counts[job_id] = job_counts.get(job_id, 0) + 1

        if is_claimable:
            claimable_sites.append(site)

    site_ids_to_claim: List[str] = []
    # gather sites that are under their job's cap without going over
    for site in claimable_sites:
        if (
            "max_claimed_sites" in site
            and "job_id" in site
            and job_counts.get(site["job_id"], 0) < site["max_claimed_sites"]
        ):
            site_ids_to_claim.append(site["id"])
            # count this claim against the job's cap for subsequent sites
            job_counts[site["job_id"]] = job_counts.get(site["job_id"], 0) + 1
        if "max_claimed_sites" not in site or "job_id" not in site:
            # uncapped site: always claimable
            site_ids_to_claim.append(site["id"])
        # short circuit once we have as many as requested
        if len(site_ids_to_claim) >= max_sites_to_claim:
            break
    return site_ids_to_claim
class RethinkDbFrontier: class RethinkDbFrontier:
logger = structlog.get_logger(logger_name=__module__ + "." + __qualname__) logger = structlog.get_logger(logger_name=__module__ + "." + __qualname__)
@ -101,68 +155,35 @@ class RethinkDbFrontier:
"expected %r to be %r in %r" % (k, expected, result) "expected %r to be %r in %r" % (k, expected, result)
) )
def claim_sites(self, n=1): def get_active_sites(self) -> List[Dict]:
self.logger.debug("claiming up to %s sites to brozzle", n) active_sites = (
result = ( self.rr.table("sites", read_mode="majority")
self.rr.table("sites")
.get_all(
r.args(
r.db(self.rr.dbname)
.table("sites", read_mode="majority")
.between( .between(
["ACTIVE", r.minval], ["ACTIVE", r.minval],
["ACTIVE", r.maxval], ["ACTIVE", r.maxval],
index="sites_last_disclaimed", index="sites_last_disclaimed",
) )
.pluck(
"id",
"last_disclaimed",
"claimed",
"last_claimed",
"job_id",
"max_claimed_sites",
)
.order_by(r.desc("claimed"), "last_disclaimed") .order_by(r.desc("claimed"), "last_disclaimed")
.fold( # apply functions to sequence .run()
{},
lambda acc,
site: acc.merge( # add the following to the accumulator
r.branch( # if has job_id
site.has_fields("job_id"),
r.object( # then add this: key is stringified job_id,
# value starts at 0, but is incremented each time a site with
# the same job_id shows up in the result set. Used to get a
# value of how many sites for any given job are active
site["job_id"].coerce_to("string"),
acc[site["job_id"].coerce_to("string")]
.default(0)
.add(1),
),
{}, # else add nothing
)
),
emit=lambda acc, site, new_acc: r.branch( # big if conditional
r.and_(
r.or_(
# Avoid tight loop when unclaimed site was recently disclaimed
# Not claimed and not disclaimed within last 20 seconds
r.and_(
site["claimed"].not_(),
r.or_(
site.has_fields("last_disclaimed").not_(),
site["last_disclaimed"].lt(r.now().sub(20)),
),
),
# or last claimed over 1 hour ago
site["last_claimed"].lt(r.now().sub(60 * 60)),
),
# and either max_claimed_sites isn't set, or not exceeded
r.or_(
site.has_fields("max_claimed_sites").not_(),
new_acc[site["job_id"].coerce_to("string")].le(
site["max_claimed_sites"]
),
),
),
[site["id"]], # then return this
[], # else nothing
),
)
.limit(n) # trim results to max we want
)
) )
return active_sites
def claim_sites(self, n=1) -> List[Dict]:
self.logger.debug("claiming up to %s sites to brozzle", n)
active_sites = self.get_active_sites()
site_ids_to_claim = filter_claimable_site_ids(active_sites, n)
result = (
self.rr.table("sites", read_mode="majority")
.get_all(r.args(site_ids_to_claim))
.update( # mark the sites we're claiming, and return changed sites (our final claim .update( # mark the sites we're claiming, and return changed sites (our final claim
# results) # results)
# #

View file

@ -21,6 +21,7 @@ limitations under the License.
import argparse import argparse
import datetime import datetime
import logging import logging
import os
import time import time
import doublethink import doublethink
@ -35,15 +36,23 @@ args.log_level = logging.INFO
brozzler.cli.configure_logging(args) brozzler.cli.configure_logging(args)
def test_rethinkdb_up(): @pytest.fixture(scope="module")
def rethinker(request):
    """Module-scoped fixture returning a doublethink.Rethinker handle.

    The database name can be overridden per-test via indirect
    parametrization (``request.param``); otherwise the throwaway
    "ignoreme" db is used.
    """
    db = request.param if hasattr(request, "param") else "ignoreme"
    # allow pointing tests at another rethinkdb cluster via env var;
    # value is a comma-separated server list, default localhost
    servers = os.environ.get("BROZZLER_RETHINKDB_SERVERS", "localhost")
    return doublethink.Rethinker(db=db, servers=servers.split(",")) # built-in db
@pytest.mark.parametrize("rethinker", ["rethinkdb"], indirect=True)
def test_rethinkdb_up(rethinker):
"""Checks that rethinkdb is listening and looks sane.""" """Checks that rethinkdb is listening and looks sane."""
rr = doublethink.Rethinker(db="rethinkdb") # built-in db rr = rethinker
tbls = rr.table_list().run() tbls = rr.table_list().run()
assert len(tbls) > 10 assert len(tbls) > 10
def test_basics(): def test_basics(rethinker):
rr = doublethink.Rethinker(db="ignoreme") rr = rethinker
frontier = brozzler.RethinkDbFrontier(rr) frontier = brozzler.RethinkDbFrontier(rr)
job_conf = { job_conf = {
"seeds": [{"url": "http://example.com"}, {"url": "https://example.org/"}] "seeds": [{"url": "http://example.com"}, {"url": "https://example.org/"}]
@ -73,6 +82,7 @@ def test_basics():
"last_disclaimed": brozzler.EPOCH_UTC, "last_disclaimed": brozzler.EPOCH_UTC,
"scope": {"accepts": [{"ssurt": "com,example,//http:/"}]}, "scope": {"accepts": [{"ssurt": "com,example,//http:/"}]},
"seed": "http://example.com", "seed": "http://example.com",
"skip_ytdlp": None,
"starts_and_stops": [ "starts_and_stops": [
{"start": sites[0].starts_and_stops[0]["start"], "stop": None} {"start": sites[0].starts_and_stops[0]["start"], "stop": None}
], ],
@ -86,6 +96,7 @@ def test_basics():
"last_disclaimed": brozzler.EPOCH_UTC, "last_disclaimed": brozzler.EPOCH_UTC,
"scope": {"accepts": [{"ssurt": "org,example,//https:/"}]}, "scope": {"accepts": [{"ssurt": "org,example,//https:/"}]},
"seed": "https://example.org/", "seed": "https://example.org/",
"skip_ytdlp": None,
"starts_and_stops": [ "starts_and_stops": [
{ {
"start": sites[1].starts_and_stops[0]["start"], "start": sites[1].starts_and_stops[0]["start"],
@ -100,28 +111,36 @@ def test_basics():
assert pages[0] == { assert pages[0] == {
"brozzle_count": 0, "brozzle_count": 0,
"claimed": False, "claimed": False,
"failed_attempts": 0,
"hop_path": None,
"hops_from_seed": 0, "hops_from_seed": 0,
"hops_off": 0, "hops_off": 0,
"id": brozzler.Page.compute_id(sites[0].id, "http://example.com"), "id": brozzler.Page.compute_id(sites[0].id, "http://example.com"),
"job_id": job.id, "job_id": job.id,
"needs_robots_check": True, "needs_robots_check": True,
"priority": 1000, "priority": 1000,
"retry_after": None,
"site_id": sites[0].id, "site_id": sites[0].id,
"url": "http://example.com", "url": "http://example.com",
"via_page_url": None,
} }
pages = list(frontier.site_pages(sites[1].id)) pages = list(frontier.site_pages(sites[1].id))
assert len(pages) == 1 assert len(pages) == 1
assert pages[0] == { assert pages[0] == {
"brozzle_count": 0, "brozzle_count": 0,
"claimed": False, "claimed": False,
"failed_attempts": 0,
"hop_path": None,
"hops_from_seed": 0, "hops_from_seed": 0,
"hops_off": 0, "hops_off": 0,
"id": brozzler.Page.compute_id(sites[1].id, "https://example.org/"), "id": brozzler.Page.compute_id(sites[1].id, "https://example.org/"),
"job_id": job.id, "job_id": job.id,
"needs_robots_check": True, "needs_robots_check": True,
"priority": 1000, "priority": 1000,
"retry_after": None,
"site_id": sites[1].id, "site_id": sites[1].id,
"url": "https://example.org/", "url": "https://example.org/",
"via_page_url": None,
} }
# test "brozzled" parameter of frontier.site_pages # test "brozzled" parameter of frontier.site_pages
@ -140,13 +159,13 @@ def test_basics():
assert len(list(frontier.site_pages(sites[1].id, brozzled=False))) == 0 assert len(list(frontier.site_pages(sites[1].id, brozzled=False))) == 0
def test_resume_job(): def test_resume_job(rethinker):
""" """
Tests that the right stuff gets twiddled in rethinkdb when we "start" and Tests that the right stuff gets twiddled in rethinkdb when we "start" and
"finish" crawling a job. Doesn't actually crawl anything. "finish" crawling a job. Doesn't actually crawl anything.
""" """
# vagrant brozzler-worker isn't configured to look at the "ignoreme" db # vagrant brozzler-worker isn't configured to look at the "ignoreme" db
rr = doublethink.Rethinker(db="ignoreme") rr = rethinker
frontier = brozzler.RethinkDbFrontier(rr) frontier = brozzler.RethinkDbFrontier(rr)
job_conf = {"seeds": [{"url": "http://example.com/"}]} job_conf = {"seeds": [{"url": "http://example.com/"}]}
job = brozzler.new_job(frontier, job_conf) job = brozzler.new_job(frontier, job_conf)
@ -343,12 +362,12 @@ def test_resume_job():
assert site2.starts_and_stops[1]["stop"] is None assert site2.starts_and_stops[1]["stop"] is None
def test_time_limit(): def test_time_limit(rethinker):
# XXX test not thoroughly adapted to change in time accounting, since # XXX test not thoroughly adapted to change in time accounting, since
# starts_and_stops is no longer used to enforce time limits # starts_and_stops is no longer used to enforce time limits
# vagrant brozzler-worker isn't configured to look at the "ignoreme" db # vagrant brozzler-worker isn't configured to look at the "ignoreme" db
rr = doublethink.Rethinker("localhost", db="ignoreme") rr = rethinker
frontier = brozzler.RethinkDbFrontier(rr) frontier = brozzler.RethinkDbFrontier(rr)
site = brozzler.Site(rr, {"seed": "http://example.com/", "time_limit": 99999}) site = brozzler.Site(rr, {"seed": "http://example.com/", "time_limit": 99999})
brozzler.new_site(frontier, site) brozzler.new_site(frontier, site)
@ -395,8 +414,8 @@ def test_time_limit():
frontier.enforce_time_limit(site) frontier.enforce_time_limit(site)
def test_field_defaults(): def test_field_defaults(rethinker):
rr = doublethink.Rethinker("localhost", db="ignoreme") rr = rethinker
# page # page
brozzler.Page.table_ensure(rr) brozzler.Page.table_ensure(rr)
@ -466,8 +485,8 @@ def test_field_defaults():
assert kob.starts_and_stops assert kob.starts_and_stops
def test_scope_and_schedule_outlinks(): def test_scope_and_schedule_outlinks(rethinker):
rr = doublethink.Rethinker("localhost", db="ignoreme") rr = rethinker
frontier = brozzler.RethinkDbFrontier(rr) frontier = brozzler.RethinkDbFrontier(rr)
site = brozzler.Site(rr, {"seed": "http://example.com/"}) site = brozzler.Site(rr, {"seed": "http://example.com/"})
parent_page = brozzler.Page( parent_page = brozzler.Page(
@ -510,8 +529,8 @@ def test_scope_and_schedule_outlinks():
assert brozzler.Page.load(rr, id) assert brozzler.Page.load(rr, id)
def test_parent_url_scoping(): def test_parent_url_scoping(rethinker):
rr = doublethink.Rethinker("localhost", db="ignoreme") rr = rethinker
frontier = brozzler.RethinkDbFrontier(rr) frontier = brozzler.RethinkDbFrontier(rr)
# scope rules that look at parent page url should consider both the # scope rules that look at parent page url should consider both the
@ -624,8 +643,8 @@ def test_parent_url_scoping():
assert parent_page.outlinks["accepted"] == [] assert parent_page.outlinks["accepted"] == []
def test_completed_page(): def test_completed_page(rethinker):
rr = doublethink.Rethinker("localhost", db="ignoreme") rr = rethinker
frontier = brozzler.RethinkDbFrontier(rr) frontier = brozzler.RethinkDbFrontier(rr)
# redirect that changes scope surt # redirect that changes scope surt
@ -718,8 +737,8 @@ def test_completed_page():
assert page.claimed is False assert page.claimed is False
def test_seed_page(): def test_seed_page(rethinker):
rr = doublethink.Rethinker("localhost", db="ignoreme") rr = rethinker
frontier = brozzler.RethinkDbFrontier(rr) frontier = brozzler.RethinkDbFrontier(rr)
site = brozzler.Site(rr, {"seed": "http://example.com/a/"}) site = brozzler.Site(rr, {"seed": "http://example.com/a/"})
@ -742,8 +761,8 @@ def test_seed_page():
assert frontier.seed_page(site.id) == page0 assert frontier.seed_page(site.id) == page0
def test_hashtag_seed(): def test_hashtag_seed(rethinker):
rr = doublethink.Rethinker("localhost", db="ignoreme") rr = rethinker
frontier = brozzler.RethinkDbFrontier(rr) frontier = brozzler.RethinkDbFrontier(rr)
# no hash tag # no hash tag
@ -771,8 +790,8 @@ def test_hashtag_seed():
] ]
def test_hashtag_links(): def test_hashtag_links(rethinker):
rr = doublethink.Rethinker("localhost", db="test_hashtag_links") rr = rethinker
frontier = brozzler.RethinkDbFrontier(rr) frontier = brozzler.RethinkDbFrontier(rr)
site = brozzler.Site(rr, {"seed": "http://example.org/"}) site = brozzler.Site(rr, {"seed": "http://example.org/"})
@ -813,8 +832,8 @@ def test_hashtag_links():
assert pages[2].priority == 12 assert pages[2].priority == 12
def test_honor_stop_request(): def test_honor_stop_request(rethinker):
rr = doublethink.Rethinker("localhost", db="ignoreme") rr = rethinker
frontier = brozzler.RethinkDbFrontier(rr) frontier = brozzler.RethinkDbFrontier(rr)
# 1. test stop request on job # 1. test stop request on job
@ -854,8 +873,8 @@ def test_honor_stop_request():
frontier.honor_stop_request(site) frontier.honor_stop_request(site)
def test_claim_site(): def test_claim_site(rethinker):
rr = doublethink.Rethinker("localhost", db="ignoreme") rr = rethinker
frontier = brozzler.RethinkDbFrontier(rr) frontier = brozzler.RethinkDbFrontier(rr)
rr.table("sites").delete().run() # clean slate rr.table("sites").delete().run() # clean slate
@ -897,10 +916,10 @@ def test_claim_site():
rr.table("sites").get(claimed_site.id).delete().run() rr.table("sites").get(claimed_site.id).delete().run()
def test_max_claimed_sites(): def test_max_claimed_sites(rethinker):
# max_claimed_sites is a brozzler job setting that puts a cap on the number # max_claimed_sites is a brozzler job setting that puts a cap on the number
# of the job's sites that can be brozzled simultaneously across the cluster # of the job's sites that can be brozzled simultaneously across the cluster
rr = doublethink.Rethinker("localhost", db="ignoreme") rr = rethinker
frontier = brozzler.RethinkDbFrontier(rr) frontier = brozzler.RethinkDbFrontier(rr)
# clean slate # clean slate
@ -938,8 +957,8 @@ def test_max_claimed_sites():
rr.table("sites").delete().run() rr.table("sites").delete().run()
def test_choose_warcprox(): def test_choose_warcprox(rethinker):
rr = doublethink.Rethinker("localhost", db="ignoreme") rr = rethinker
svcreg = doublethink.ServiceRegistry(rr) svcreg = doublethink.ServiceRegistry(rr)
frontier = brozzler.RethinkDbFrontier(rr) frontier = brozzler.RethinkDbFrontier(rr)
@ -1060,8 +1079,8 @@ def test_choose_warcprox():
rr.table("services").delete().run() rr.table("services").delete().run()
def test_max_hops_off(): def test_max_hops_off(rethinker):
rr = doublethink.Rethinker("localhost", db="ignoreme") rr = rethinker
frontier = brozzler.RethinkDbFrontier(rr) frontier = brozzler.RethinkDbFrontier(rr)
site = brozzler.Site( site = brozzler.Site(
rr, rr,
@ -1120,44 +1139,56 @@ def test_max_hops_off():
assert { assert {
"brozzle_count": 0, "brozzle_count": 0,
"claimed": False, "claimed": False,
"failed_attempts": 0,
"hashtags": [], "hashtags": [],
"hop_path": "L",
"hops_from_seed": 1, "hops_from_seed": 1,
"hops_off": 0, "hops_off": 0,
"id": brozzler.Page.compute_id(site.id, "http://example.com/toot"), "id": brozzler.Page.compute_id(site.id, "http://example.com/toot"),
"job_id": None, "job_id": None,
"needs_robots_check": False, "needs_robots_check": False,
"priority": 12, "priority": 12,
"retry_after": None,
"site_id": site.id, "site_id": site.id,
"url": "http://example.com/toot", "url": "http://example.com/toot",
"via_page_id": seed_page.id, "via_page_id": seed_page.id,
"via_page_url": "http://example.com/",
} in pages } in pages
assert { assert {
"brozzle_count": 0, "brozzle_count": 0,
"claimed": False, "claimed": False,
"failed_attempts": 0,
"hashtags": [], "hashtags": [],
"hop_path": "L",
"hops_from_seed": 1, "hops_from_seed": 1,
"hops_off": 1, "hops_off": 1,
"id": brozzler.Page.compute_id(site.id, "http://foo.org/"), "id": brozzler.Page.compute_id(site.id, "http://foo.org/"),
"job_id": None, "job_id": None,
"needs_robots_check": False, "needs_robots_check": False,
"priority": 12, "priority": 12,
"retry_after": None,
"site_id": site.id, "site_id": site.id,
"url": "http://foo.org/", "url": "http://foo.org/",
"via_page_id": seed_page.id, "via_page_id": seed_page.id,
"via_page_url": "http://example.com/",
} in pages } in pages
assert { assert {
"brozzle_count": 0, "brozzle_count": 0,
"claimed": False, "claimed": False,
"failed_attempts": 0,
"hashtags": [], "hashtags": [],
"hop_path": "L",
"hops_from_seed": 1, "hops_from_seed": 1,
"hops_off": 1, "hops_off": 1,
"id": brozzler.Page.compute_id(site.id, "https://example.com/toot"), "id": brozzler.Page.compute_id(site.id, "https://example.com/toot"),
"job_id": None, "job_id": None,
"needs_robots_check": False, "needs_robots_check": False,
"priority": 12, "priority": 12,
"retry_after": None,
"site_id": site.id, "site_id": site.id,
"url": "https://example.com/toot", "url": "https://example.com/toot",
"via_page_id": seed_page.id, "via_page_id": seed_page.id,
"via_page_url": "http://example.com/",
} in pages } in pages
# next hop is past max_hops_off, but normal in scope url is in scope # next hop is past max_hops_off, but normal in scope url is in scope
@ -1173,16 +1204,20 @@ def test_max_hops_off():
assert foo_page == { assert foo_page == {
"brozzle_count": 0, "brozzle_count": 0,
"claimed": False, "claimed": False,
"failed_attempts": 0,
"hashtags": [], "hashtags": [],
"hop_path": "L",
"hops_from_seed": 1, "hops_from_seed": 1,
"hops_off": 1, "hops_off": 1,
"id": brozzler.Page.compute_id(site.id, "http://foo.org/"), "id": brozzler.Page.compute_id(site.id, "http://foo.org/"),
"job_id": None, "job_id": None,
"needs_robots_check": False, "needs_robots_check": False,
"priority": 12, "priority": 12,
"retry_after": None,
"site_id": site.id, "site_id": site.id,
"url": "http://foo.org/", "url": "http://foo.org/",
"via_page_id": seed_page.id, "via_page_id": seed_page.id,
"via_page_url": "http://example.com/",
"outlinks": { "outlinks": {
"accepted": ["http://example.com/blah"], "accepted": ["http://example.com/blah"],
"blocked": [], "blocked": [],
@ -1194,14 +1229,18 @@ def test_max_hops_off():
assert { assert {
"brozzle_count": 0, "brozzle_count": 0,
"claimed": False, "claimed": False,
"failed_attempts": 0,
"hashtags": [], "hashtags": [],
"hop_path": "LL",
"hops_from_seed": 2, "hops_from_seed": 2,
"hops_off": 0, "hops_off": 0,
"id": brozzler.Page.compute_id(site.id, "http://example.com/blah"), "id": brozzler.Page.compute_id(site.id, "http://example.com/blah"),
"job_id": None, "job_id": None,
"needs_robots_check": False, "needs_robots_check": False,
"priority": 11, "priority": 11,
"retry_after": None,
"site_id": site.id, "site_id": site.id,
"url": "http://example.com/blah", "url": "http://example.com/blah",
"via_page_id": foo_page.id, "via_page_id": foo_page.id,
"via_page_url": "http://foo.org/",
} in pages } in pages