model: convert to structlog

This commit is contained in:
Misty De Méo 2025-02-18 14:26:39 -08:00
parent d4b493f1ae
commit 7634eb1b57

View File

@@ -25,9 +25,9 @@ import datetime
 import doublethink
 import hashlib
 import json
-import logging
 import os
 import re
+import structlog
 import time
 import urlcanon
 import urllib
@@ -36,6 +36,7 @@ import yaml
 import zlib
 from typing import Optional
 
+logger = structlog.get_logger()
 
 def load_schema():
     schema_file = os.path.join(os.path.dirname(__file__), "job_schema.yaml")
@@ -83,7 +84,7 @@ def merge(a, b):
 
 def new_job_file(frontier, job_conf_file):
     """Returns new Job."""
-    logging.info("loading %s", job_conf_file)
+    logger.info("loading", job_conf_file=job_conf_file)
     with open(job_conf_file) as f:
         job_conf = yaml.safe_load(f)
     return new_job(frontier, job_conf)
@@ -117,12 +118,12 @@ def new_job(frontier, job_conf):
     # insert in batches to avoid this error
     # rethinkdb.errors.ReqlDriverError: Query size (167883036) greater than maximum (134217727) in:
     for batch in (pages[i : i + 500] for i in range(0, len(pages), 500)):
-        logging.info("inserting batch of %s pages", len(batch))
+        logger.info("inserting batch of %s pages", len(batch))
         result = frontier.rr.table("pages").insert(batch).run()
     for batch in (sites[i : i + 100] for i in range(0, len(sites), 100)):
-        logging.info("inserting batch of %s sites", len(batch))
+        logger.info("inserting batch of %s sites", len(batch))
         result = frontier.rr.table("sites").insert(batch).run()
-    logging.info("job %s fully started", job.id)
+    logger.info("job fully started", job_id=job.id)
     return job
@@ -151,7 +152,7 @@ def new_seed_page(frontier, site):
 
 def new_site(frontier, site):
-    logging.info("new site %s", site)
+    logger.info("new site", site=site)
     site.id = site.id or str(uuid.uuid4())
     # insert the Page into the database before the Site, to avoid situation
     # where a brozzler worker immediately claims the site, finds no pages
@@ -159,7 +160,7 @@ def new_site(frontier, site):
     try:
         page = new_seed_page(frontier, site)
         page.save()
-        logging.info("queued page %s", page)
+        logger.info("queued page", page=page)
     finally:
         # finally block because we want to insert the Site no matter what
         site.save()
@@ -192,7 +193,7 @@ class ElapsedMixIn(object):
 
 class Job(doublethink.Document, ElapsedMixIn):
-    logger = logging.getLogger(__module__ + "." + __qualname__)
+    logger = structlog.get_logger(__module__ + "." + __qualname__)
     table = "jobs"
 
     def populate_defaults(self):
@@ -212,16 +213,16 @@ class Job(doublethink.Document, ElapsedMixIn):
     def finish(self):
         if self.status == "FINISHED" or self.starts_and_stops[-1]["stop"]:
             self.logger.error(
-                "job is already finished status=%s " "starts_and_stops[-1]['stop']=%s",
-                self.status,
-                self.starts_and_stops[-1]["stop"],
+                "job is already finished",
+                status=self.status,
+                stop=self.starts_and_stops[-1]["stop"],
             )
         self.status = "FINISHED"
         self.starts_and_stops[-1]["stop"] = doublethink.utcnow()
 
 
 class Site(doublethink.Document, ElapsedMixIn):
-    logger = logging.getLogger(__module__ + "." + __qualname__)
+    logger = structlog.get_logger(__module__ + "." + __qualname__)
     table = "sites"
 
     def populate_defaults(self):
@@ -279,7 +280,7 @@ class Site(doublethink.Document, ElapsedMixIn):
             if set(rule.keys()) == {"ssurt"}
         )
         if not any(ssurt.startswith(ss) for ss in simple_rule_ssurts):
-            self.logger.info("adding ssurt %s to scope accept rules", ssurt)
+            self.logger.info("adding ssurt to scope accept rules", ssurt=ssurt)
             self.scope["accepts"].append({"ssurt": ssurt})
 
     def note_seed_redirect(self, url):
@@ -377,7 +378,7 @@ class Site(doublethink.Document, ElapsedMixIn):
 
 class Page(doublethink.Document):
-    logger = logging.getLogger(__module__ + "." + __qualname__)
+    logger = structlog.get_logger(__module__ + "." + __qualname__)
    table = "pages"
 
     @staticmethod