frontier: convert to structlog

This commit is contained in:
Misty De Méo 2025-02-18 14:22:22 -08:00
parent cf6b423019
commit d4b493f1ae

View File

@ -16,12 +16,12 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
import logging
import brozzler
import random
import time
import datetime
import rethinkdb as rdb
import structlog
import doublethink
import urlcanon
@ -33,7 +33,7 @@ class UnexpectedDbResult(Exception):
class RethinkDbFrontier:
logger = logging.getLogger(__module__ + "." + __qualname__)
logger = structlog.get_logger(__module__ + "." + __qualname__)
def __init__(self, rr, shards=None, replicas=None):
self.rr = rr
@ -42,14 +42,16 @@ class RethinkDbFrontier:
self._ensure_db()
def _ensure_db(self):
db_logger = self.logger.bind(dbname=self.rr.dbname)
dbs = self.rr.db_list().run()
if not self.rr.dbname in dbs:
self.logger.info("creating rethinkdb database %r", self.rr.dbname)
db_logger.info("creating rethinkdb database")
self.rr.db_create(self.rr.dbname).run()
tables = self.rr.table_list().run()
if not "sites" in tables:
self.logger.info(
"creating rethinkdb table 'sites' in database %r", self.rr.dbname
db_logger.info(
"creating rethinkdb table 'sites' in database"
)
self.rr.table_create(
"sites", shards=self.shards, replicas=self.replicas
@ -59,8 +61,8 @@ class RethinkDbFrontier:
).run()
self.rr.table("sites").index_create("job_id").run()
if not "pages" in tables:
self.logger.info(
"creating rethinkdb table 'pages' in database %r", self.rr.dbname
db_logger.info(
"creating rethinkdb table 'pages' in database"
)
self.rr.table_create(
"pages", shards=self.shards, replicas=self.replicas
@ -81,8 +83,8 @@ class RethinkDbFrontier:
[r.row["site_id"], r.row["brozzle_count"], r.row["hops_from_seed"]],
).run()
if not "jobs" in tables:
self.logger.info(
"creating rethinkdb table 'jobs' in database %r", self.rr.dbname
db_logger.info(
"creating rethinkdb table 'jobs' in database"
)
self.rr.table_create(
"jobs", shards=self.shards, replicas=self.replicas
@ -108,7 +110,7 @@ class RethinkDbFrontier:
)
def claim_sites(self, n=1):
self.logger.trace("claiming up to %s sites to brozzle", n)
self.logger.debug("claiming up to %s sites to brozzle", n)
result = (
self.rr.table("sites")
.get_all(
@ -186,10 +188,10 @@ class RethinkDbFrontier:
if result["changes"][i]["old_val"]["claimed"]:
self.logger.warning(
"re-claimed site that was still marked 'claimed' "
"because it was last claimed a long time ago "
"at %s, and presumably some error stopped it from "
"because it was last claimed a long time ago, "
"and presumably some error stopped it from "
"being disclaimed",
result["changes"][i]["old_val"]["last_claimed"],
last_claimed=result["changes"][i]["old_val"]["last_claimed"],
)
site = brozzler.Site(self.rr, result["changes"][i]["new_val"])
sites.append(site)
@ -205,10 +207,10 @@ class RethinkDbFrontier:
"""
if site.time_limit and site.time_limit > 0 and site.elapsed() > site.time_limit:
self.logger.debug(
"site FINISHED_TIME_LIMIT! time_limit=%s " "elapsed=%s %s",
site.time_limit,
site.elapsed(),
site,
"site FINISHED_TIME_LIMIT!",
time_limit=site.time_limit,
elapsed=site.elapsed(),
site=site,
)
raise brozzler.ReachedTimeLimit
@ -273,7 +275,7 @@ class RethinkDbFrontier:
"""Raises brozzler.CrawlStopped if stop has been requested."""
site.refresh()
if site.stop_requested and site.stop_requested <= doublethink.utcnow():
self.logger.info("stop requested for site %s", site.id)
self.logger.info("stop requested for site", site_id=site.id)
raise brozzler.CrawlStopped
if site.job_id:
@ -283,7 +285,7 @@ class RethinkDbFrontier:
and job.stop_requested
and job.stop_requested <= doublethink.utcnow()
):
self.logger.info("stop requested for job %s", site.job_id)
self.logger.info("stop requested for job", job_id=site.job_id)
raise brozzler.CrawlStopped
def _maybe_finish_job(self, job_id):
@ -304,7 +306,7 @@ class RethinkDbFrontier:
return False
n += 1
self.logger.info("all %s sites finished, job %s is FINISHED!", n, job.id)
self.logger.info("all %s sites finished, job is FINISHED!", n, job_id=job.id)
job.finish()
job.save()
return True
@ -320,7 +322,7 @@ class RethinkDbFrontier:
self._maybe_finish_job(site.job_id)
def disclaim_site(self, site, page=None):
self.logger.info("disclaiming %s", site)
self.logger.info("disclaiming", site=site)
site.claimed = False
site.last_disclaimed = doublethink.utcnow()
if not page and not self.has_outstanding_pages(site):
@ -468,17 +470,16 @@ class RethinkDbFrontier:
try:
self.logger.debug("inserting/replacing batch of %s pages", len(batch))
reql = self.rr.table("pages").insert(batch, conflict="replace")
self.logger.trace(
self.logger.debug(
'running query self.rr.table("pages").insert(%r, '
'conflict="replace")',
batch,
)
result = reql.run()
except Exception as e:
self.logger.error(
self.logger.exception(
"problem inserting/replacing batch of %s pages",
len(batch),
exc_info=True,
)
parent_page.outlinks = {}
@ -497,7 +498,7 @@ class RethinkDbFrontier:
)
def reached_limit(self, site, e):
self.logger.info("reached_limit site=%s e=%s", site, e)
self.logger.info("reached_limit", site=site, e=e)
assert isinstance(e, brozzler.ReachedLimit)
if (
site.reached_limit
@ -530,7 +531,7 @@ class RethinkDbFrontier:
)
pages = list(results)
if len(pages) > 1:
self.logger.warning("more than one seed page for site_id %s ?", site_id)
self.logger.warning("more than one seed page?", site_id=site_id)
if len(pages) < 1:
return None
return brozzler.Page(self.rr, pages[0])
@ -550,8 +551,8 @@ class RethinkDbFrontier:
[site_id, 0 if brozzled is False else r.maxval, r.maxval, r.maxval],
index="priority_by_site",
)
self.logger.trace("running query: %r", query)
self.logger.debug("running query", query=query)
results = query.run()
for result in results:
self.logger.trace("yielding result: %r", result)
self.logger.debug("yielding result", result=result)
yield brozzler.Page(self.rr, result)