From addf73f8656307586fc4ae585bdda9b6e38a1743 Mon Sep 17 00:00:00 2001
From: Adam Miller
Date: Mon, 31 Mar 2025 16:03:44 -0700
Subject: [PATCH] chore: Additional frontier testing and reformat

---
 brozzler/frontier.py    | 14 ++++---
 tests/test_brozzling.py |  2 +-
 tests/test_cli.py       |  7 +++-
 tests/test_frontier.py  | 89 ++++++++++++++++++++++++++++++++++++++---
 tests/test_units.py     |  1 +
 5 files changed, 99 insertions(+), 14 deletions(-)

diff --git a/brozzler/frontier.py b/brozzler/frontier.py
index 97bb4d2..b199c1c 100644
--- a/brozzler/frontier.py
+++ b/brozzler/frontier.py
@@ -17,7 +17,7 @@ limitations under the License.
 """
 
 import datetime
-from typing import List, Dict
+from typing import Dict, List
 
 import doublethink
 import rethinkdb as rdb
@@ -34,7 +34,9 @@ class UnexpectedDbResult(Exception):
 
 
 def filter_claimable_site_ids(
-    active_sites: List[Dict], max_sites_to_claim=1
+    active_sites: List[Dict],
+    reclaim_cooldown: int,
+    max_sites_to_claim=1,
 ) -> List[str]:
     job_counts = {}
     claimable_sites = []
@@ -45,7 +47,7 @@
 
         # If site not claimed and not disclaimed within last 20 seconds
         if not site["claimed"] and site.get("last_disclaimed", 0) <= (
-            now - datetime.timedelta(seconds=20)
+            now - datetime.timedelta(seconds=reclaim_cooldown)
         ):
             is_claimable = True
@@ -176,11 +178,13 @@ class RethinkDbFrontier:
         )
         return active_sites
 
-    def claim_sites(self, n=1) -> List[Dict]:
+    def claim_sites(self, n=1, reclaim_cooldown=20) -> List[Dict]:
         self.logger.debug("claiming up to %s sites to brozzle", n)
 
         active_sites = self.get_active_sites()
-        site_ids_to_claim = filter_claimable_site_ids(active_sites, n)
+        site_ids_to_claim = filter_claimable_site_ids(
+            active_sites, reclaim_cooldown, max_sites_to_claim=n
+        )
         result = (
             self.rr.table("sites", read_mode="majority")
             .get_all(r.args(site_ids_to_claim))
diff --git a/tests/test_brozzling.py b/tests/test_brozzling.py
index 0616e22..6216637 100755
--- a/tests/test_brozzling.py
+++ b/tests/test_brozzling.py
@@ -271,7 +271,7 @@ def test_proxy_down():
 
     chrome_exe = brozzler.suggest_default_chrome_exe()
     with brozzler.Browser(chrome_exe=chrome_exe) as browser:
-        browser.stop() # We're manually instantiating the browser without arguments,
+        browser.stop()  # We're manually instantiating the browser without arguments,
         # so it is running without a proxy. Stop it first.
         with pytest.raises(brozzler.ProxyError):
             worker.brozzle_page(browser, site, page)
diff --git a/tests/test_cli.py b/tests/test_cli.py
index 15f04f1..2e23ac1 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -76,8 +76,11 @@ def test_run_command(capsys, cmd):
     )
     out, err = proc.communicate()
     # Remove lines from syntax warning in imported library
-    filtered_lines = [line for line in err.decode("utf-8").splitlines() if "reppy" not in line and
-                      "re.compile" not in line]
+    filtered_lines = [
+        line
+        for line in err.decode("utf-8").splitlines()
+        if "reppy" not in line and "re.compile" not in line
+    ]
     assert filtered_lines == []
     assert out == ("brozzler %s - %s\n" % (brozzler.__version__, cmd)).encode("ascii")
 
diff --git a/tests/test_frontier.py b/tests/test_frontier.py
index bb3b69c..4af473c 100644
--- a/tests/test_frontier.py
+++ b/tests/test_frontier.py
@@ -20,6 +20,7 @@ limitations under the License.
 
 import argparse
 import datetime
+import itertools
 import logging
 import os
 import time
@@ -933,6 +934,7 @@ def test_max_claimed_sites(rethinker):
     rr.table("sites").delete().run()
 
     job_conf = {
+        "id": 1,
         "seeds": [
             {"url": "http://example.com/1"},
             {"url": "http://example.com/2"},
@@ -942,7 +944,7 @@
         ],
         "max_claimed_sites": 3,
     }
-
+    seeds_seen = []
     job = brozzler.new_job(frontier, job_conf)
     assert job.id
 
@@ -962,13 +964,88 @@
     rr.table("jobs").delete().run()
     rr.table("sites").delete().run()
 
-    job = brozzler.new_job(frontier, job_conf)
-    claimed_sites = frontier.claim_sites(2)
-    assert len(claimed_sites) == 2
-    claimed_sites = frontier.claim_sites(1)
-    assert len(claimed_sites) == 1
+
+def test_max_claimed_sites_cross_job(rethinker):
+    rr = rethinker
+    frontier = brozzler.RethinkDbFrontier(rr)
+
+    # clean slate
+    rr.table("jobs").delete().run()
+    rr.table("sites").delete().run()
+
+    job_conf_1 = {
+        "id": 1,
+        "seeds": [
+            {"url": "http://example.com/1"},
+            {"url": "http://example.com/2"},
+            {"url": "http://example.com/3"},
+            {"url": "http://example.com/4"},
+            {"url": "http://example.com/5"},
+        ],
+        "max_claimed_sites": 3,
+    }
+    job_conf_2 = {
+        "id": 2,
+        "seeds": [
+            {"url": "http://example.com/6"},
+            {"url": "http://example.com/7"},
+            {"url": "http://example.com/8"},
+            {"url": "http://example.com/9"},
+            {"url": "http://example.com/10"},
+        ],
+        "max_claimed_sites": 3,
+    }
+
+    seeds_seen = []
+    job_1 = brozzler.new_job(frontier, job_conf_1)
+    job_2 = brozzler.new_job(frontier, job_conf_2)
+
+    assert len(list(frontier.job_sites(job_1.id))) == 5
+    assert len(list(frontier.job_sites(job_2.id))) == 5
+
+    claimed_sites_1 = frontier.claim_sites(4)
+    assert len(claimed_sites_1) == 4
+
+    sites_per_job = {}
+    for site in claimed_sites_1:
+        sites_per_job[site["job_id"]] = sites_per_job.get(site["job_id"], 0) + 1
+
+    # 2 jobs, max of 3 each.
+    assert len(sites_per_job.keys()) == 2
+    assert sites_per_job[1] + sites_per_job[2] == 4
+    assert sites_per_job[1] <= 3 and sites_per_job[2] <= 3
+
+    # 6 sites left in queue, but only 2 are still claimable due to max
+    claimed_sites_2 = frontier.claim_sites(6)
+    assert len(claimed_sites_2) == 2
+
+    # disclaim sites
+    for site in itertools.chain(claimed_sites_1, claimed_sites_2):
+        frontier.disclaim_site(site)
+        seeds_seen.append(site["seed"])
+
+    # Only 4 sites left in queue, that aren't recently claimed
+    claimed_sites_3 = frontier.claim_sites(6)
+    assert len(claimed_sites_3) == 4
+
     with pytest.raises(brozzler.NothingToClaim):
         claimed_sites = frontier.claim_sites(1)
+        assert len(claimed_sites) == 1
+
+    for site in claimed_sites_3:
+        seeds_seen.append(site["seed"])
+
+    # ensure all sites have been claimed at this point
+    for seed in itertools.chain(job_conf_1["seeds"], job_conf_2["seeds"]):
+        assert seed["url"] in seeds_seen
+
+    # All unclaimed sites have been recently disclaimed and are not claimable
+    with pytest.raises(brozzler.NothingToClaim):
+        frontier.claim_sites(3)
+
+    # Disable reclaim cooldown. With 4 claimed, we should have 2 available
+    claimed_sites_4 = frontier.claim_sites(4, reclaim_cooldown=0)
+    assert len(claimed_sites_4) == 2
     # clean slate for the next one
     rr.table("jobs").delete().run()
     rr.table("sites").delete().run()
diff --git a/tests/test_units.py b/tests/test_units.py
index 548aaef..a6ff9db 100644
--- a/tests/test_units.py
+++ b/tests/test_units.py
@@ -277,6 +277,7 @@ def test_ydl_proxy_down():
     with pytest.raises(brozzler.ProxyError):
         brozzler.ydl.do_youtube_dl(worker, site, page)
 
+
 def test_proxy_down():
     """
     Test all fetching scenarios raise `brozzler.ProxyError` when proxy is down.
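
Usage sketch (not part of the patch above): a minimal illustration of the
reclaim_cooldown parameter that claim_sites() gains in brozzler/frontier.py.
The RethinkDB server list and database name below are assumptions for the
example; the calls mirror what test_max_claimed_sites_cross_job exercises.

    import doublethink

    import brozzler

    # Assumption for the example: a local rethinkdb and db name, similar to
    # the tests' rethinker fixture.
    rr = doublethink.Rethinker(servers=["localhost"], db="brozzler")
    frontier = brozzler.RethinkDbFrontier(rr)

    # Default cooldown (20s): sites disclaimed within the last 20 seconds are
    # skipped. claim_sites() raises brozzler.NothingToClaim when no site is
    # currently claimable.
    claimed = frontier.claim_sites(3)

    # Passing reclaim_cooldown=0 lets recently disclaimed sites be claimed
    # again, which is how the new cross-job test picks up the leftover sites.
    leftovers = frontier.claim_sites(3, reclaim_cooldown=0)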