From b5ee8a9ea7af07b3a86f9cd73a0531db354da704 Mon Sep 17 00:00:00 2001 From: Adam Miller Date: Wed, 26 Mar 2025 18:06:55 -0700 Subject: [PATCH] feat: Create new claim_sites() query, and fix frontier tests --- brozzler/__init__.py | 3 +- brozzler/frontier.py | 143 +++++++++++++++++++++++------------------ tests/test_frontier.py | 103 ++++++++++++++++++++--------- 3 files changed, 155 insertions(+), 94 deletions(-) diff --git a/brozzler/__init__.py b/brozzler/__init__.py index 2150190..6c0f638 100644 --- a/brozzler/__init__.py +++ b/brozzler/__init__.py @@ -22,6 +22,7 @@ import logging import threading from importlib.metadata import version as _version +import doublethink import structlog import urlcanon @@ -398,7 +399,7 @@ def suggest_default_chrome_exe(): return "chromium-browser" -EPOCH_UTC = datetime.datetime.fromtimestamp(0.0, tz=datetime.timezone.utc) +EPOCH_UTC = datetime.datetime.fromtimestamp(0.0, tz=doublethink.UTC) from brozzler.browser import Browser, BrowserPool, BrowsingException # noqa: E402 from brozzler.robots import is_permitted_by_robots # noqa: E402 diff --git a/brozzler/frontier.py b/brozzler/frontier.py index 27a3d7c..97bb4d2 100644 --- a/brozzler/frontier.py +++ b/brozzler/frontier.py @@ -16,6 +16,9 @@ See the License for the specific language governing permissions and limitations under the License. """ +import datetime +from typing import List, Dict + import doublethink import rethinkdb as rdb import structlog @@ -30,6 +33,57 @@ class UnexpectedDbResult(Exception): pass +def filter_claimable_site_ids( + active_sites: List[Dict], max_sites_to_claim=1 +) -> List[str]: + job_counts = {} + claimable_sites = [] + now = datetime.datetime.now(datetime.timezone.utc) + + for site in active_sites: + is_claimable = False + + # If site not claimed and not disclaimed within last 20 seconds + if not site["claimed"] and site.get("last_disclaimed", 0) <= ( + now - datetime.timedelta(seconds=20) + ): + is_claimable = True + + # or site has been disclaimed more than an hour ago + if "last_claimed" in site and site["last_claimed"] <= ( + now - datetime.timedelta(hours=1) + ): + is_claimable = True + + # Count number of claimed sites per job_id (optional field) + if site["claimed"] and "max_claimed_sites" in site and "job_id" in site: + job_id = site["job_id"] + job_counts[job_id] = job_counts.get(job_id, 0) + 1 + + if is_claimable: + claimable_sites.append(site) + + site_ids_to_claim = [] + # gather sites that are under the max without going over + for site in claimable_sites: + if ( + "max_claimed_sites" in site + and "job_id" in site + and job_counts.get(site["job_id"], 0) < site["max_claimed_sites"] + ): + site_ids_to_claim.append(site["id"]) + job_counts[site["job_id"]] = job_counts.get(site["job_id"], 0) + 1 + + if "max_claimed_sites" not in site or "job_id" not in site: + site_ids_to_claim.append(site["id"]) + + # short circuit if we already have more than requested + if len(site_ids_to_claim) >= max_sites_to_claim: + break + + return site_ids_to_claim + + class RethinkDbFrontier: logger = structlog.get_logger(logger_name=__module__ + "." + __qualname__) @@ -101,68 +155,35 @@ class RethinkDbFrontier: "expected %r to be %r in %r" % (k, expected, result) ) - def claim_sites(self, n=1): - self.logger.debug("claiming up to %s sites to brozzle", n) - result = ( - self.rr.table("sites") - .get_all( - r.args( - r.db(self.rr.dbname) - .table("sites", read_mode="majority") - .between( - ["ACTIVE", r.minval], - ["ACTIVE", r.maxval], - index="sites_last_disclaimed", - ) - .order_by(r.desc("claimed"), "last_disclaimed") - .fold( # apply functions to sequence - {}, - lambda acc, - site: acc.merge( # add the following to the accumulator - r.branch( # if has job_id - site.has_fields("job_id"), - r.object( # then add this: key is stringified job_id, - # value starts at 0, but is incremented each time a site with - # the same job_id shows up in the result set. Used to get a - # value of how many sites for any given job are active - site["job_id"].coerce_to("string"), - acc[site["job_id"].coerce_to("string")] - .default(0) - .add(1), - ), - {}, # else add nothing - ) - ), - emit=lambda acc, site, new_acc: r.branch( # big if conditional - r.and_( - r.or_( - # Avoid tight loop when unclaimed site was recently disclaimed - # Not claimed and not disclaimed within last 20 seconds - r.and_( - site["claimed"].not_(), - r.or_( - site.has_fields("last_disclaimed").not_(), - site["last_disclaimed"].lt(r.now().sub(20)), - ), - ), - # or last claimed over 1 hour ago - site["last_claimed"].lt(r.now().sub(60 * 60)), - ), - # and either max_claimed_sites isn't set, or not exceeded - r.or_( - site.has_fields("max_claimed_sites").not_(), - new_acc[site["job_id"].coerce_to("string")].le( - site["max_claimed_sites"] - ), - ), - ), - [site["id"]], # then return this - [], # else nothing - ), - ) - .limit(n) # trim results to max we want - ) + def get_active_sites(self) -> List[Dict]: + active_sites = ( + self.rr.table("sites", read_mode="majority") + .between( + ["ACTIVE", r.minval], + ["ACTIVE", r.maxval], + index="sites_last_disclaimed", ) + .pluck( + "id", + "last_disclaimed", + "claimed", + "last_claimed", + "job_id", + "max_claimed_sites", + ) + .order_by(r.desc("claimed"), "last_disclaimed") + .run() + ) + return active_sites + + def claim_sites(self, n=1) -> List[Dict]: + self.logger.debug("claiming up to %s sites to brozzle", n) + + active_sites = self.get_active_sites() + site_ids_to_claim = filter_claimable_site_ids(active_sites, n) + result = ( + self.rr.table("sites", read_mode="majority") + .get_all(r.args(site_ids_to_claim)) .update( # mark the sites we're claiming, and return changed sites (our final claim # results) # diff --git a/tests/test_frontier.py b/tests/test_frontier.py index 4d48e73..b0735f3 100644 --- a/tests/test_frontier.py +++ b/tests/test_frontier.py @@ -21,6 +21,7 @@ limitations under the License. import argparse import datetime import logging +import os import time import doublethink @@ -35,15 +36,23 @@ args.log_level = logging.INFO brozzler.cli.configure_logging(args) -def test_rethinkdb_up(): +@pytest.fixture(scope="module") +def rethinker(request): + db = request.param if hasattr(request, "param") else "ignoreme" + servers = os.environ.get("BROZZLER_RETHINKDB_SERVERS", "localhost") + return doublethink.Rethinker(db=db, servers=servers.split(",")) # built-in db + + +@pytest.mark.parametrize("rethinker", ["rethinkdb"], indirect=True) +def test_rethinkdb_up(rethinker): """Checks that rethinkdb is listening and looks sane.""" - rr = doublethink.Rethinker(db="rethinkdb") # built-in db + rr = rethinker tbls = rr.table_list().run() assert len(tbls) > 10 -def test_basics(): - rr = doublethink.Rethinker(db="ignoreme") +def test_basics(rethinker): + rr = rethinker frontier = brozzler.RethinkDbFrontier(rr) job_conf = { "seeds": [{"url": "http://example.com"}, {"url": "https://example.org/"}] @@ -73,6 +82,7 @@ def test_basics(): "last_disclaimed": brozzler.EPOCH_UTC, "scope": {"accepts": [{"ssurt": "com,example,//http:/"}]}, "seed": "http://example.com", + "skip_ytdlp": None, "starts_and_stops": [ {"start": sites[0].starts_and_stops[0]["start"], "stop": None} ], @@ -86,6 +96,7 @@ def test_basics(): "last_disclaimed": brozzler.EPOCH_UTC, "scope": {"accepts": [{"ssurt": "org,example,//https:/"}]}, "seed": "https://example.org/", + "skip_ytdlp": None, "starts_and_stops": [ { "start": sites[1].starts_and_stops[0]["start"], @@ -100,28 +111,36 @@ def test_basics(): assert pages[0] == { "brozzle_count": 0, "claimed": False, + "failed_attempts": 0, + "hop_path": None, "hops_from_seed": 0, "hops_off": 0, "id": brozzler.Page.compute_id(sites[0].id, "http://example.com"), "job_id": job.id, "needs_robots_check": True, "priority": 1000, + "retry_after": None, "site_id": sites[0].id, "url": "http://example.com", + "via_page_url": None, } pages = list(frontier.site_pages(sites[1].id)) assert len(pages) == 1 assert pages[0] == { "brozzle_count": 0, "claimed": False, + "failed_attempts": 0, + "hop_path": None, "hops_from_seed": 0, "hops_off": 0, "id": brozzler.Page.compute_id(sites[1].id, "https://example.org/"), "job_id": job.id, "needs_robots_check": True, "priority": 1000, + "retry_after": None, "site_id": sites[1].id, "url": "https://example.org/", + "via_page_url": None, } # test "brozzled" parameter of frontier.site_pages @@ -140,13 +159,13 @@ def test_basics(): assert len(list(frontier.site_pages(sites[1].id, brozzled=False))) == 0 -def test_resume_job(): +def test_resume_job(rethinker): """ Tests that the right stuff gets twiddled in rethinkdb when we "start" and "finish" crawling a job. Doesn't actually crawl anything. """ # vagrant brozzler-worker isn't configured to look at the "ignoreme" db - rr = doublethink.Rethinker(db="ignoreme") + rr = rethinker frontier = brozzler.RethinkDbFrontier(rr) job_conf = {"seeds": [{"url": "http://example.com/"}]} job = brozzler.new_job(frontier, job_conf) @@ -343,12 +362,12 @@ def test_resume_job(): assert site2.starts_and_stops[1]["stop"] is None -def test_time_limit(): +def test_time_limit(rethinker): # XXX test not thoroughly adapted to change in time accounting, since # starts_and_stops is no longer used to enforce time limits # vagrant brozzler-worker isn't configured to look at the "ignoreme" db - rr = doublethink.Rethinker("localhost", db="ignoreme") + rr = rethinker frontier = brozzler.RethinkDbFrontier(rr) site = brozzler.Site(rr, {"seed": "http://example.com/", "time_limit": 99999}) brozzler.new_site(frontier, site) @@ -395,8 +414,8 @@ def test_time_limit(): frontier.enforce_time_limit(site) -def test_field_defaults(): - rr = doublethink.Rethinker("localhost", db="ignoreme") +def test_field_defaults(rethinker): + rr = rethinker # page brozzler.Page.table_ensure(rr) @@ -466,8 +485,8 @@ def test_field_defaults(): assert kob.starts_and_stops -def test_scope_and_schedule_outlinks(): - rr = doublethink.Rethinker("localhost", db="ignoreme") +def test_scope_and_schedule_outlinks(rethinker): + rr = rethinker frontier = brozzler.RethinkDbFrontier(rr) site = brozzler.Site(rr, {"seed": "http://example.com/"}) parent_page = brozzler.Page( @@ -510,8 +529,8 @@ def test_scope_and_schedule_outlinks(): assert brozzler.Page.load(rr, id) -def test_parent_url_scoping(): - rr = doublethink.Rethinker("localhost", db="ignoreme") +def test_parent_url_scoping(rethinker): + rr = rethinker frontier = brozzler.RethinkDbFrontier(rr) # scope rules that look at parent page url should consider both the @@ -624,8 +643,8 @@ def test_parent_url_scoping(): assert parent_page.outlinks["accepted"] == [] -def test_completed_page(): - rr = doublethink.Rethinker("localhost", db="ignoreme") +def test_completed_page(rethinker): + rr = rethinker frontier = brozzler.RethinkDbFrontier(rr) # redirect that changes scope surt @@ -718,8 +737,8 @@ def test_completed_page(): assert page.claimed is False -def test_seed_page(): - rr = doublethink.Rethinker("localhost", db="ignoreme") +def test_seed_page(rethinker): + rr = rethinker frontier = brozzler.RethinkDbFrontier(rr) site = brozzler.Site(rr, {"seed": "http://example.com/a/"}) @@ -742,8 +761,8 @@ def test_seed_page(): assert frontier.seed_page(site.id) == page0 -def test_hashtag_seed(): - rr = doublethink.Rethinker("localhost", db="ignoreme") +def test_hashtag_seed(rethinker): + rr = rethinker frontier = brozzler.RethinkDbFrontier(rr) # no hash tag @@ -771,8 +790,8 @@ def test_hashtag_seed(): ] -def test_hashtag_links(): - rr = doublethink.Rethinker("localhost", db="test_hashtag_links") +def test_hashtag_links(rethinker): + rr = rethinker frontier = brozzler.RethinkDbFrontier(rr) site = brozzler.Site(rr, {"seed": "http://example.org/"}) @@ -813,8 +832,8 @@ def test_hashtag_links(): assert pages[2].priority == 12 -def test_honor_stop_request(): - rr = doublethink.Rethinker("localhost", db="ignoreme") +def test_honor_stop_request(rethinker): + rr = rethinker frontier = brozzler.RethinkDbFrontier(rr) # 1. test stop request on job @@ -854,8 +873,8 @@ def test_honor_stop_request(): frontier.honor_stop_request(site) -def test_claim_site(): - rr = doublethink.Rethinker("localhost", db="ignoreme") +def test_claim_site(rethinker): + rr = rethinker frontier = brozzler.RethinkDbFrontier(rr) rr.table("sites").delete().run() # clean slate @@ -897,10 +916,10 @@ def test_claim_site(): rr.table("sites").get(claimed_site.id).delete().run() -def test_max_claimed_sites(): +def test_max_claimed_sites(rethinker): # max_claimed_sites is a brozzler job setting that puts a cap on the number # of the job's sites that can be brozzled simultaneously across the cluster - rr = doublethink.Rethinker("localhost", db="ignoreme") + rr = rethinker frontier = brozzler.RethinkDbFrontier(rr) # clean slate @@ -938,8 +957,8 @@ def test_max_claimed_sites(): rr.table("sites").delete().run() -def test_choose_warcprox(): - rr = doublethink.Rethinker("localhost", db="ignoreme") +def test_choose_warcprox(rethinker): + rr = rethinker svcreg = doublethink.ServiceRegistry(rr) frontier = brozzler.RethinkDbFrontier(rr) @@ -1060,8 +1079,8 @@ def test_choose_warcprox(): rr.table("services").delete().run() -def test_max_hops_off(): - rr = doublethink.Rethinker("localhost", db="ignoreme") +def test_max_hops_off(rethinker): + rr = rethinker frontier = brozzler.RethinkDbFrontier(rr) site = brozzler.Site( rr, @@ -1120,44 +1139,56 @@ def test_max_hops_off(): assert { "brozzle_count": 0, "claimed": False, + "failed_attempts": 0, "hashtags": [], + "hop_path": "L", "hops_from_seed": 1, "hops_off": 0, "id": brozzler.Page.compute_id(site.id, "http://example.com/toot"), "job_id": None, "needs_robots_check": False, "priority": 12, + "retry_after": None, "site_id": site.id, "url": "http://example.com/toot", "via_page_id": seed_page.id, + "via_page_url": "http://example.com/", } in pages assert { "brozzle_count": 0, "claimed": False, + "failed_attempts": 0, "hashtags": [], + "hop_path": "L", "hops_from_seed": 1, "hops_off": 1, "id": brozzler.Page.compute_id(site.id, "http://foo.org/"), "job_id": None, "needs_robots_check": False, "priority": 12, + "retry_after": None, "site_id": site.id, "url": "http://foo.org/", "via_page_id": seed_page.id, + "via_page_url": "http://example.com/", } in pages assert { "brozzle_count": 0, "claimed": False, + "failed_attempts": 0, "hashtags": [], + "hop_path": "L", "hops_from_seed": 1, "hops_off": 1, "id": brozzler.Page.compute_id(site.id, "https://example.com/toot"), "job_id": None, "needs_robots_check": False, "priority": 12, + "retry_after": None, "site_id": site.id, "url": "https://example.com/toot", "via_page_id": seed_page.id, + "via_page_url": "http://example.com/", } in pages # next hop is past max_hops_off, but normal in scope url is in scope @@ -1173,16 +1204,20 @@ def test_max_hops_off(): assert foo_page == { "brozzle_count": 0, "claimed": False, + "failed_attempts": 0, "hashtags": [], + "hop_path": "L", "hops_from_seed": 1, "hops_off": 1, "id": brozzler.Page.compute_id(site.id, "http://foo.org/"), "job_id": None, "needs_robots_check": False, "priority": 12, + "retry_after": None, "site_id": site.id, "url": "http://foo.org/", "via_page_id": seed_page.id, + "via_page_url": "http://example.com/", "outlinks": { "accepted": ["http://example.com/blah"], "blocked": [], @@ -1194,14 +1229,18 @@ def test_max_hops_off(): assert { "brozzle_count": 0, "claimed": False, + "failed_attempts": 0, "hashtags": [], + "hop_path": "LL", "hops_from_seed": 2, "hops_off": 0, "id": brozzler.Page.compute_id(site.id, "http://example.com/blah"), "job_id": None, "needs_robots_check": False, "priority": 11, + "retry_after": None, "site_id": site.id, "url": "http://example.com/blah", "via_page_id": foo_page.id, + "via_page_url": "http://foo.org/", } in pages