chore: Additional frontier testing and reformat

This commit is contained in:
Adam Miller 2025-03-31 16:03:44 -07:00
parent e7e4225bf2
commit addf73f865
5 changed files with 99 additions and 14 deletions

View file

@ -17,7 +17,7 @@ limitations under the License.
""" """
import datetime import datetime
from typing import List, Dict from typing import Dict, List
import doublethink import doublethink
import rethinkdb as rdb import rethinkdb as rdb
@ -34,7 +34,9 @@ class UnexpectedDbResult(Exception):
def filter_claimable_site_ids( def filter_claimable_site_ids(
active_sites: List[Dict], max_sites_to_claim=1 active_sites: List[Dict],
reclaim_cooldown: int,
max_sites_to_claim=1,
) -> List[str]: ) -> List[str]:
job_counts = {} job_counts = {}
claimable_sites = [] claimable_sites = []
@ -45,7 +47,7 @@ def filter_claimable_site_ids(
# If site not claimed and not disclaimed within last 20 seconds # If site not claimed and not disclaimed within last 20 seconds
if not site["claimed"] and site.get("last_disclaimed", 0) <= ( if not site["claimed"] and site.get("last_disclaimed", 0) <= (
now - datetime.timedelta(seconds=20) now - datetime.timedelta(seconds=reclaim_cooldown)
): ):
is_claimable = True is_claimable = True
@ -176,11 +178,13 @@ class RethinkDbFrontier:
) )
return active_sites return active_sites
def claim_sites(self, n=1) -> List[Dict]: def claim_sites(self, n=1, reclaim_cooldown=20) -> List[Dict]:
self.logger.debug("claiming up to %s sites to brozzle", n) self.logger.debug("claiming up to %s sites to brozzle", n)
active_sites = self.get_active_sites() active_sites = self.get_active_sites()
site_ids_to_claim = filter_claimable_site_ids(active_sites, n) site_ids_to_claim = filter_claimable_site_ids(
active_sites, reclaim_cooldown, max_sites_to_claim=n
)
result = ( result = (
self.rr.table("sites", read_mode="majority") self.rr.table("sites", read_mode="majority")
.get_all(r.args(site_ids_to_claim)) .get_all(r.args(site_ids_to_claim))

View file

@ -76,8 +76,11 @@ def test_run_command(capsys, cmd):
) )
out, err = proc.communicate() out, err = proc.communicate()
# Remove lines from syntax warning in imported library # Remove lines from syntax warning in imported library
filtered_lines = [line for line in err.decode("utf-8").splitlines() if "reppy" not in line and filtered_lines = [
"re.compile" not in line] line
for line in err.decode("utf-8").splitlines()
if "reppy" not in line and "re.compile" not in line
]
assert filtered_lines == [] assert filtered_lines == []
assert out == ("brozzler %s - %s\n" % (brozzler.__version__, cmd)).encode("ascii") assert out == ("brozzler %s - %s\n" % (brozzler.__version__, cmd)).encode("ascii")

View file

@ -20,6 +20,7 @@ limitations under the License.
import argparse import argparse
import datetime import datetime
import itertools
import logging import logging
import os import os
import time import time
@ -933,6 +934,7 @@ def test_max_claimed_sites(rethinker):
rr.table("sites").delete().run() rr.table("sites").delete().run()
job_conf = { job_conf = {
"id": 1,
"seeds": [ "seeds": [
{"url": "http://example.com/1"}, {"url": "http://example.com/1"},
{"url": "http://example.com/2"}, {"url": "http://example.com/2"},
@ -942,7 +944,7 @@ def test_max_claimed_sites(rethinker):
], ],
"max_claimed_sites": 3, "max_claimed_sites": 3,
} }
seeds_seen = []
job = brozzler.new_job(frontier, job_conf) job = brozzler.new_job(frontier, job_conf)
assert job.id assert job.id
@ -962,13 +964,88 @@ def test_max_claimed_sites(rethinker):
rr.table("jobs").delete().run() rr.table("jobs").delete().run()
rr.table("sites").delete().run() rr.table("sites").delete().run()
job = brozzler.new_job(frontier, job_conf)
claimed_sites = frontier.claim_sites(2) def test_max_claimed_sites_cross_job(rethinker):
assert len(claimed_sites) == 2 rr = rethinker
claimed_sites = frontier.claim_sites(1) frontier = brozzler.RethinkDbFrontier(rr)
assert len(claimed_sites) == 1
# clean slate
rr.table("jobs").delete().run()
rr.table("sites").delete().run()
job_conf_1 = {
"id": 1,
"seeds": [
{"url": "http://example.com/1"},
{"url": "http://example.com/2"},
{"url": "http://example.com/3"},
{"url": "http://example.com/4"},
{"url": "http://example.com/5"},
],
"max_claimed_sites": 3,
}
job_conf_2 = {
"id": 2,
"seeds": [
{"url": "http://example.com/6"},
{"url": "http://example.com/7"},
{"url": "http://example.com/8"},
{"url": "http://example.com/9"},
{"url": "http://example.com/10"},
],
"max_claimed_sites": 3,
}
seeds_seen = []
job_1 = brozzler.new_job(frontier, job_conf_1)
job_2 = brozzler.new_job(frontier, job_conf_2)
assert len(list(frontier.job_sites(job_1.id))) == 5
assert len(list(frontier.job_sites(job_2.id))) == 5
claimed_sites_1 = frontier.claim_sites(4)
assert len(claimed_sites_1) == 4
sites_per_job = {}
for site in claimed_sites_1:
sites_per_job[site["job_id"]] = sites_per_job.get(site["job_id"], 0) + 1
# 2 jobs, max of 3 each.
assert len(sites_per_job.keys()) == 2
assert sites_per_job[1] + sites_per_job[2] == 4
assert sites_per_job[1] <= 3 and sites_per_job[2] <= 3
# 6 sites left in the queue, but only 2 are still claimable due to the per-job max_claimed_sites limit of 3
claimed_sites_2 = frontier.claim_sites(6)
assert len(claimed_sites_2) == 2
# disclaim sites
for site in itertools.chain(claimed_sites_1, claimed_sites_2):
frontier.disclaim_site(site)
seeds_seen.append(site["seed"])
# Only 4 sites are left in the queue that have not been recently disclaimed
claimed_sites_3 = frontier.claim_sites(6)
assert len(claimed_sites_3) == 4
with pytest.raises(brozzler.NothingToClaim): with pytest.raises(brozzler.NothingToClaim):
claimed_sites = frontier.claim_sites(1) claimed_sites = frontier.claim_sites(1)
assert len(claimed_sites) == 1
for site in claimed_sites_3:
seeds_seen.append(site["seed"])
# ensure all sites have been claimed at this point
for seed in itertools.chain(job_conf_1["seeds"], job_conf_2["seeds"]):
assert seed["url"] in seeds_seen
# All unclaimed sites have been recently disclaimed and are not claimable
with pytest.raises(brozzler.NothingToClaim):
frontier.claim_sites(3)
# Disable reclaim cooldown. With 4 claimed, we should have 2 available
claimed_sites_4 = frontier.claim_sites(4, reclaim_cooldown=0)
assert len(claimed_sites_4) == 2
# clean slate for the next one # clean slate for the next one
rr.table("jobs").delete().run() rr.table("jobs").delete().run()

View file

@ -277,6 +277,7 @@ def test_ydl_proxy_down():
with pytest.raises(brozzler.ProxyError): with pytest.raises(brozzler.ProxyError):
brozzler.ydl.do_youtube_dl(worker, site, page) brozzler.ydl.do_youtube_dl(worker, site, page)
def test_proxy_down(): def test_proxy_down():
""" """
Test all fetching scenarios raise `brozzler.ProxyError` when proxy is down. Test all fetching scenarios raise `brozzler.ProxyError` when proxy is down.