From 23cee477dc730f6a714f26008cfbfc9c0c18cc82 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Misty=20De=20M=C3=A9o?= Date: Mon, 24 Feb 2025 16:31:09 -0800 Subject: [PATCH 1/4] feat: set up structlog logging (#325) This ports the logging from `logging` to `structlog`. This updates all of the logger instantiations along with all of the places `logging` was called. Data that was being inlined into log statements has been broken out so that it's now structured arguments to the log statements instead. --- brozzler/__init__.py | 63 +++++-------- brozzler/browser.py | 57 ++++++------ brozzler/chrome.py | 89 +++++++++---------- brozzler/cli.py | 151 ++++++++++++++++++++----------- brozzler/dashboard/__init__.py | 40 +++++---- brozzler/easy.py | 21 +++-- brozzler/frontier.py | 57 ++++++------ brozzler/model.py | 30 ++++--- brozzler/pywb.py | 8 +- brozzler/robots.py | 11 ++- brozzler/worker.py | 156 +++++++++++++++------------------ brozzler/ydl.py | 100 ++++++++++++--------- setup.py | 1 + tests/test_cluster.py | 23 +++-- tests/test_units.py | 10 --- 15 files changed, 421 insertions(+), 396 deletions(-) diff --git a/brozzler/__init__.py b/brozzler/__init__.py index 9309116..7798e5f 100644 --- a/brozzler/__init__.py +++ b/brozzler/__init__.py @@ -18,6 +18,7 @@ limitations under the License. """ import logging +import structlog from pkg_resources import get_distribution as _get_distribution __version__ = _get_distribution("brozzler").version @@ -79,32 +80,6 @@ class ReachedLimit(Exception): return self.__repr__() -# monkey-patch log levels TRACE and NOTICE -logging.TRACE = (logging.NOTSET + logging.DEBUG) // 2 - - -def _logger_trace(self, msg, *args, **kwargs): - if self.isEnabledFor(logging.TRACE): - self._log(logging.TRACE, msg, args, **kwargs) - - -logging.Logger.trace = _logger_trace -logging.trace = logging.root.trace -logging.addLevelName(logging.TRACE, "TRACE") - -logging.NOTICE = (logging.INFO + logging.WARN) // 2 - - -def _logger_notice(self, msg, *args, **kwargs): - if self.isEnabledFor(logging.NOTICE): - self._log(logging.NOTICE, msg, args, **kwargs) - - -logging.Logger.notice = _logger_notice -logging.notice = logging.root.notice -logging.addLevelName(logging.NOTICE, "NOTICE") - - # see https://github.com/internetarchive/brozzler/issues/91 def _logging_handler_handle(self, record): rv = self.filter(record) @@ -146,7 +121,9 @@ def behavior_script(url, template_parameters=None, behaviors_dir=None): """ Returns the javascript behavior string populated with template_parameters. """ - import re, logging, json + import re, json + + logger = structlog.get_logger(logger_name=__name__) for behavior in behaviors(behaviors_dir=behaviors_dir): if re.match(behavior["url_regex"], url): @@ -159,18 +136,18 @@ def behavior_script(url, template_parameters=None, behaviors_dir=None): behavior["behavior_js_template"] ) script = template.render(parameters) - logging.info( - "using template=%r populated with parameters=%r for %r", - behavior["behavior_js_template"], - json.dumps(parameters), - url, + logger.info( + "rendering template", + template=behavior["behavior_js_template"], + parameters=parameters, + url=url, ) return script return None class ThreadExceptionGate: - logger = logging.getLogger(__module__ + "." + __qualname__) + logger = structlog.get_logger(logger_name=__module__ + "." 
+ __qualname__) def __init__(self, thread): self.thread = thread @@ -181,7 +158,9 @@ class ThreadExceptionGate: def __enter__(self): assert self.thread == threading.current_thread() if self.pending_exception: - self.logger.info("raising pending exception %s", self.pending_exception) + self.logger.info( + "raising pending exception", pending_exception=self.pending_exception + ) tmp = self.pending_exception self.pending_exception = None raise tmp @@ -198,10 +177,10 @@ class ThreadExceptionGate: with self.lock: if self.pending_exception: self.logger.warning( - "%r already pending for thread %r, discarding %r", - self.pending_exception, - self.thread, - e, + "exception already pending for thread, discarding", + pending_exception=self.pending_exception, + thread=self.thread, + discarded_exception=e, ) else: self.pending_exception = e @@ -266,7 +245,9 @@ def thread_raise(thread, exctype): TypeError if `exctype` is not a class ValueError, SystemError in case of unexpected problems """ - import ctypes, inspect, threading, logging + import ctypes, inspect, threading, structlog + + logger = structlog.get_logger(exctype=exctype, thread=thread) if not inspect.isclass(exctype): raise TypeError( @@ -278,7 +259,7 @@ def thread_raise(thread, exctype): with gate.lock: if gate.ok_to_raise.is_set() and thread.is_alive(): gate.ok_to_raise.clear() - logging.info("raising %s in thread %s", exctype, thread) + logger.info("raising exception in thread") res = ctypes.pythonapi.PyThreadState_SetAsyncExc( ctypes.c_long(thread.ident), ctypes.py_object(exctype) ) @@ -290,7 +271,7 @@ def thread_raise(thread, exctype): ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, 0) raise SystemError("PyThreadState_SetAsyncExc failed") else: - logging.info("queueing %s for thread %s", exctype, thread) + logger.info("queueing exception for thread") gate.queue_exception(exctype) diff --git a/brozzler/browser.py b/brozzler/browser.py index c11fa6c..a5fff4e 100644 --- a/brozzler/browser.py +++ b/brozzler/browser.py @@ -31,6 +31,7 @@ import base64 from ipaddress import AddressValueError from brozzler.chrome import Chrome import socket +import structlog import urlcanon @@ -52,7 +53,7 @@ class BrowserPool: debugging protocol. """ - logger = logging.getLogger(__module__ + "." + __qualname__) + logger = structlog.get_logger(logger_name=__module__ + "." + __qualname__) def __init__(self, size=3, **kwargs): """ @@ -143,7 +144,7 @@ class BrowserPool: class WebsockReceiverThread(threading.Thread): - logger = logging.getLogger(__module__ + "." + __qualname__) + logger = structlog.get_logger(logger_name=__module__ + "." 
+ __qualname__) def __init__(self, websock, name=None, daemon=True): super().__init__(name=name, daemon=daemon) @@ -193,7 +194,7 @@ class WebsockReceiverThread(threading.Thread): ): self.logger.error("websocket closed, did chrome die?") else: - self.logger.error("exception from websocket receiver thread", exc_info=1) + self.logger.exception("exception from websocket receiver thread") brozzler.thread_raise(self.calling_thread, BrowsingException) def run(self): @@ -213,10 +214,9 @@ class WebsockReceiverThread(threading.Thread): try: self._handle_message(websock, message) except: - self.logger.error( - "uncaught exception in _handle_message message=%s", - message, - exc_info=True, + self.logger.exception( + "uncaught exception in _handle_message", + message=message, ) def _network_response_received(self, message): @@ -231,7 +231,7 @@ class WebsockReceiverThread(threading.Thread): ] ) self.reached_limit = brozzler.ReachedLimit(warcprox_meta=warcprox_meta) - self.logger.info("reached limit %s", self.reached_limit) + self.logger.info("reached limit", limit=self.reached_limit) brozzler.thread_raise(self.calling_thread, brozzler.ReachedLimit) else: self.logger.info( @@ -245,7 +245,7 @@ class WebsockReceiverThread(threading.Thread): self.page_status = status def _javascript_dialog_opening(self, message): - self.logger.info("javascript dialog opened: %s", message) + self.logger.info("javascript dialog opened", message=message) if message["params"]["type"] == "alert": accept = True else: @@ -292,7 +292,7 @@ class WebsockReceiverThread(threading.Thread): message["params"]["message"]["text"], ) elif message["method"] == "Runtime.exceptionThrown": - self.logger.debug("uncaught exception: %s", message) + self.logger.debug("uncaught exception", message=message) elif message["method"] == "Page.javascriptDialogOpening": self._javascript_dialog_opening(message) elif ( @@ -322,7 +322,7 @@ class Browser: Manages an instance of Chrome for browsing pages. """ - logger = logging.getLogger(__module__ + "." + __qualname__) + logger = structlog.get_logger(logger_name=__module__ + "." + __qualname__) def __init__(self, **kwargs): """ @@ -365,11 +365,10 @@ class Browser: msg_id = next(self._command_id) kwargs["id"] = msg_id msg = json.dumps(kwargs, separators=",:") - logging.log( - logging.TRACE if suppress_logging else logging.DEBUG, - "sending message to %s: %s", - self.websock, - msg, + self.logger.debug( + "sending message", + websock=self.websock, + message=msg, ) self.websock.send(msg) return msg_id @@ -397,7 +396,7 @@ class Browser: # Enable Console & Runtime output only when debugging. # After all, we just print these events with debug(), we don't use # them in Brozzler logic. 
- if self.logger.isEnabledFor(logging.DEBUG): + if self.logger.is_enabled_for(logging.DEBUG): self.send_to_chrome(method="Console.enable") self.send_to_chrome(method="Runtime.enable") self.send_to_chrome(method="ServiceWorker.enable") @@ -432,8 +431,8 @@ class Browser: try: self.websock.close() except BaseException as e: - self.logger.error( - "exception closing websocket %s - %s", self.websock, e + self.logger.exception( + "exception closing websocket", websocket=self.websock ) self.chrome.stop() @@ -460,7 +459,7 @@ class Browser: self.websock_url = None except: - self.logger.error("problem stopping", exc_info=True) + self.logger.exception("problem stopping") def is_running(self): return self.websock_url is not None @@ -566,7 +565,7 @@ class Browser: # if login redirected us, return to page_url if page_url != self.url().split("#")[0]: self.logger.debug( - "login navigated away from %s; returning!", page_url + "login navigated away; returning!", page_url=page_url ) self.navigate_to_page(page_url, timeout=page_timeout) # If the target page HTTP status is 4xx/5xx, there is no point @@ -608,7 +607,7 @@ class Browser: # more information, raise that one raise self.websock_thread.reached_limit except websocket.WebSocketConnectionClosedException as e: - self.logger.error("websocket closed, did chrome die?") + self.logger.exception("websocket closed, did chrome die?") raise BrowsingException(e) finally: self.is_browsing = False @@ -630,7 +629,7 @@ class Browser: on_screenshot(jpeg_bytes) return except BrowsingTimeout as e: - logging.error("attempt %s/3: %s", i + 1, e) + self.logger.exception("attempt %s/3", i + 1) def visit_hashtags(self, page_url, hashtags, outlinks): _hashtags = set(hashtags or []) @@ -644,7 +643,7 @@ class Browser: # out which hashtags were visited already and skip those for hashtag in _hashtags: # navigate_to_hashtag (nothing to wait for so no timeout?) - self.logger.debug("navigating to hashtag %s", hashtag) + self.logger.debug("navigating to hashtag", hashtag=hashtag) url = urlcanon.whatwg(page_url) url.hash_sign = b"#" url.fragment = hashtag[1:].encode("utf-8") @@ -684,7 +683,7 @@ class Browser: ) def navigate_to_page(self, page_url, timeout=300): - self.logger.info("navigating to page %s", page_url) + self.logger.info("navigating to page", page_url=page_url) self.websock_thread.got_page_load_event = None self.websock_thread.page_status = None self.send_to_chrome(method="Page.navigate", params={"url": page_url}) @@ -712,15 +711,13 @@ class Browser: try: out.append(str(urlcanon.whatwg(link))) except AddressValueError: - self.logger.warning("skip invalid outlink: %s", link) + self.logger.warning("skip invalid outlink", outlink=link) return frozenset(out) else: # no links found return frozenset() else: - self.logger.error( - "problem extracting outlinks, result message: %s", message - ) + self.logger.error("problem extracting outlinks", message=message) return frozenset() def screenshot(self, full_page=False, timeout=45): @@ -791,7 +788,7 @@ class Browser: while True: elapsed = time.time() - start if elapsed > timeout: - logging.info("behavior reached hard timeout after %.1fs", elapsed) + self.logger.info("behavior reached hard timeout", elapsed=elapsed) return brozzler.sleep(check_interval) diff --git a/brozzler/chrome.py b/brozzler/chrome.py index 6194a1a..db4f14f 100644 --- a/brozzler/chrome.py +++ b/brozzler/chrome.py @@ -16,7 +16,6 @@ See the License for the specific language governing permissions and limitations under the License. 
""" -import logging import urllib.request import time import threading @@ -27,6 +26,7 @@ import select import re import signal import sqlite3 +import structlog import json import tempfile import sys @@ -65,7 +65,7 @@ def check_version(chrome_exe): class Chrome: - logger = logging.getLogger(__module__ + "." + __qualname__) + logger = structlog.get_logger(logger_name=__module__ + "." + __qualname__) def __init__(self, chrome_exe, port=9222, ignore_cert_errors=False): """ @@ -97,22 +97,22 @@ class Chrome: def _init_cookie_db(self, cookie_db): cookie_dir = os.path.join(self._chrome_user_data_dir, "Default") cookie_location = os.path.join(cookie_dir, "Cookies") - self.logger.debug("cookie DB provided, writing to %s", cookie_location) + cookie_logger = self.logger.bind(cookie_location=cookie_location) + + cookie_logger.debug("cookie DB provided, writing to") os.makedirs(cookie_dir, exist_ok=True) try: with open(cookie_location, "wb") as cookie_file: cookie_file.write(cookie_db) except OSError: - self.logger.error( - "exception writing cookie file at %s", cookie_location, exc_info=True - ) + cookie_logger.exception("exception writing cookie file") def persist_and_read_cookie_db(self): cookie_location = os.path.join(self._chrome_user_data_dir, "Default", "Cookies") - self.logger.debug( - "marking cookies persistent then reading file into memory: %s", - cookie_location, + cookie_logger = self.logger.bind(cookie_location=cookie_location) + cookie_logger.debug( + "marking cookies persistent then reading file into memory", ) try: with sqlite3.connect(cookie_location) as conn: @@ -125,20 +125,14 @@ class Chrome: cur = conn.cursor() cur.execute("UPDATE cookies SET persistent = 1") except sqlite3.Error: - self.logger.error( - "exception updating cookie DB %s", cookie_location, exc_info=True - ) + cookie_logger.exception("exception updating cookie DB") cookie_db = None try: with open(cookie_location, "rb") as cookie_file: cookie_db = cookie_file.read() except OSError: - self.logger.error( - "exception reading from cookie DB file %s", - cookie_location, - exc_info=True, - ) + cookie_logger.exception("exception reading from cookie DB file") return cookie_db def start( @@ -227,7 +221,7 @@ class Chrome: if proxy: chrome_args.append("--proxy-server=%s" % proxy) chrome_args.append("about:blank") - self.logger.info("running: %r", subprocess.list2cmdline(chrome_args)) + self.logger.info("running", chrome_args=subprocess.list2cmdline(chrome_args)) # start_new_session - new process group so we can kill the whole group self.chrome_process = subprocess.Popen( chrome_args, @@ -243,12 +237,13 @@ class Chrome: daemon=True, ) self._out_reader_thread.start() - self.logger.info("chrome running, pid %s" % self.chrome_process.pid) + self.logger.info("chrome running", pid=self.chrome_process.pid) return self._websocket_url(timeout_sec=websocket_timeout) def _websocket_url(self, timeout_sec=60): json_url = "http://localhost:%s/json" % self.port + url_logger = self.logger.bind(json_url=json_url) # make this a member variable so that kill -QUIT reports it self._start = time.time() self._last_warning = self._start @@ -259,24 +254,21 @@ class Chrome: debug_info = [x for x in all_debug_info if x["url"] == "about:blank"] if debug_info and "webSocketDebuggerUrl" in debug_info[0]: - self.logger.debug("%s returned %s", json_url, raw_json) + url_logger.debug("webSocketDebuggerUrl returned", raw_json=raw_json) url = debug_info[0]["webSocketDebuggerUrl"] - self.logger.info( - "got chrome window websocket debug url %s from %s", - url, - 
json_url, + url_logger.info( + "got chrome window websocket debug url", + debug_url=url, ) return url except brozzler.ShutdownRequested: raise except Exception as e: if time.time() - self._last_warning > 30: - self.logger.warning( - "problem with %s (will keep trying until timeout " - "of %d seconds): %s", - json_url, - timeout_sec, - e, + url_logger.warning( + "problem accessing url (will keep trying until timeout)", + timeout_sec=timeout_sec, + exc_info=True, ) self._last_warning = time.time() finally: @@ -322,26 +314,28 @@ class Chrome: while not self._shutdown.is_set(): buf = readline_nonblock(self.chrome_process.stdout) if buf: - self.logger.trace( + self.logger.debug( "chrome pid %s STDOUT %s", self.chrome_process.pid, buf ) buf = readline_nonblock(self.chrome_process.stderr) if buf: - self.logger.trace( + self.logger.debug( "chrome pid %s STDERR %s", self.chrome_process.pid, buf ) except: - self.logger.error("unexpected exception", exc_info=True) + self.logger.exception("unexpected exception") def stop(self): if not self.chrome_process or self._shutdown.is_set(): return self._shutdown.set() + pid_logger = self.logger.bind(pid=self.chrome_process.pid) + timeout_sec = 300 if self.chrome_process.poll() is None: - self.logger.info("terminating chrome pgid %s", self.chrome_process.pid) + pid_logger.info("terminating chrome") os.killpg(self.chrome_process.pid, signal.SIGTERM) t0 = time.time() @@ -351,14 +345,11 @@ class Chrome: status = self.chrome_process.poll() if status is not None: if status == 0: - self.logger.info( - "chrome pid %s exited normally", self.chrome_process.pid - ) + pid_logger.info("chrome exited normally") else: - self.logger.warning( - "chrome pid %s exited with nonzero status %s", - self.chrome_process.pid, - status, + pid_logger.warning( + "chrome exited with nonzero status", + status=status, ) # XXX I would like to forcefully kill the process group @@ -368,18 +359,16 @@ class Chrome: return time.sleep(0.5) - self.logger.warning( - "chrome pid %s still alive %.1f seconds after sending " + pid_logger.warning( + "chrome still alive %.1f seconds after sending " "SIGTERM, sending SIGKILL", - self.chrome_process.pid, time.time() - t0, ) os.killpg(self.chrome_process.pid, signal.SIGKILL) status = self.chrome_process.wait() - self.logger.warning( - "chrome pid %s reaped (status=%s) after killing with " "SIGKILL", - self.chrome_process.pid, - status, + pid_logger.warning( + "chrome reaped after killing with " "SIGKILL", + status=status, ) finally: @@ -388,8 +377,8 @@ class Chrome: try: self._home_tmpdir.cleanup() except: - self.logger.error( - "exception deleting %s", self._home_tmpdir, exc_info=True + self.logger.exception( + "exception deleting self._home_tmpdir", tmpdir=self._home_tmpdir ) self._out_reader_thread.join() self.chrome_process = None diff --git a/brozzler/cli.py b/brozzler/cli.py index ffdbfe5..e49c5aa 100755 --- a/brozzler/cli.py +++ b/brozzler/cli.py @@ -29,6 +29,7 @@ import requests import doublethink import signal import string +import structlog import subprocess import sys import threading @@ -42,6 +43,8 @@ import rethinkdb as rdb r = rdb.RethinkDB() +logger = structlog.get_logger(logger_name=__name__) + def add_common_options(arg_parser, argv=None): argv = argv or sys.argv @@ -51,7 +54,7 @@ def add_common_options(arg_parser, argv=None): dest="log_level", action="store_const", default=logging.INFO, - const=logging.NOTICE, + const=logging.WARN, help="quiet logging", ) arg_parser.add_argument( @@ -68,7 +71,7 @@ def add_common_options(arg_parser, 
argv=None): dest="log_level", action="store_const", default=logging.INFO, - const=logging.TRACE, + const=logging.DEBUG, help=("very verbose logging"), ) # arg_parser.add_argument( @@ -109,7 +112,51 @@ def rethinker(args): return doublethink.Rethinker(servers.split(","), db) +def decorate_logger_name(a, b, event_dict): + """Decorates the logger name with call location, if provided""" + + old_name = event_dict.get("logger_name") + if old_name is None: + return event_dict + + try: + filename = event_dict.pop("filename") + func_name = event_dict.pop("func_name") + lineno = event_dict.pop("lineno") + except KeyError: + return event_dict + new_name = f"{old_name}.{func_name}({filename}:{lineno})" + event_dict["logger_name"] = new_name + + return event_dict + + def configure_logging(args): + structlog.configure( + processors=[ + structlog.contextvars.merge_contextvars, + structlog.processors.add_log_level, + structlog.processors.StackInfoRenderer(), + structlog.dev.set_exc_info, + structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S", utc=True), + structlog.processors.CallsiteParameterAdder( + [ + structlog.processors.CallsiteParameter.FILENAME, + structlog.processors.CallsiteParameter.FUNC_NAME, + structlog.processors.CallsiteParameter.LINENO, + ], + ), + decorate_logger_name, + structlog.dev.ConsoleRenderer(), + ], + wrapper_class=structlog.make_filtering_bound_logger(args.log_level), + context_class=dict, + logger_factory=structlog.PrintLoggerFactory(), + cache_logger_on_first_use=False, + ) + + # We still configure logging for now because its handlers + # are used for the gunicorn spawned by the brozzler dashboard. logging.basicConfig( stream=sys.stderr, level=args.log_level, @@ -355,7 +402,7 @@ def brozzle_page(argv=None): ) with open(filename, "wb") as f: f.write(screenshot_jpeg) - logging.info("wrote screenshot to %s", filename) + logger.info("wrote screenshot", filename=filename) browser = brozzler.Browser(chrome_exe=args.chrome_exe) try: @@ -371,11 +418,11 @@ def brozzle_page(argv=None): on_screenshot=on_screenshot, enable_youtube_dl=not args.skip_youtube_dl, ) - logging.info("outlinks: \n\t%s", "\n\t".join(sorted(outlinks))) + logger.info("outlinks", outlinks=sorted(outlinks)) except brozzler.ReachedLimit as e: - logging.error("reached limit %s", e) + logger.exception("reached limit") except brozzler.PageInterstitialShown as e: - logging.error("page interstitial shown %s", e) + logger.exception("page interstitial shown") finally: browser.stop() @@ -633,11 +680,11 @@ def brozzler_worker(argv=None): state_strs.append("" % ident) stack = traceback.format_stack(frames[ident]) state_strs.append("".join(stack)) - logging.info( - "dumping state (caught signal %s)\n%s" % (signum, "\n".join(state_strs)) + logger.info( + "dumping state (caught signal)\n%s", signal=signum, state=state_strs ) except BaseException as e: - logging.error("exception dumping state: %s" % e) + logger.exception("exception dumping state") finally: signal.signal(signal.SIGQUIT, dump_state) @@ -648,10 +695,12 @@ def brozzler_worker(argv=None): # make set from seed IDs in SKIP_AV_SEEDS_FILE with open(SKIP_AV_SEEDS_FILE) as skips: skip_av_seeds = {int(l) for l in skips.readlines()} - logging.info("running with skip_av_seeds file %s" % SKIP_AV_SEEDS_FILE) + logger.info( + "running with skip_av_seeds file", skip_av_seeds=SKIP_AV_SEEDS_FILE + ) except Exception as e: skip_av_seeds = set() - logging.info("running with empty skip_av_seeds") + logger.info("running with empty skip_av_seeds") return skip_av_seeds def 
get_ytdlp_proxy_endpoints(): @@ -661,13 +710,13 @@ def brozzler_worker(argv=None): with open(YTDLP_PROXY_ENDPOINTS_FILE) as endpoints: ytdlp_proxy_endpoints = [l for l in endpoints.readlines()] if ytdlp_proxy_endpoints: - logging.info( - "running with ytdlp proxy endpoints file %s" - % YTDLP_PROXY_ENDPOINTS_FILE + logger.info( + "running with ytdlp proxy endpoints file", + ytdlp_proxy_endpoints=YTDLP_PROXY_ENDPOINTS_FILE, ) except Exception as e: ytdlp_proxy_endpoints = [] - logging.info("running with empty proxy endpoints file") + logger.info("running with empty proxy endpoints file") return ytdlp_proxy_endpoints rr = rethinker(args) @@ -701,7 +750,7 @@ def brozzler_worker(argv=None): th = threading.Thread(target=worker.run, name="BrozzlerWorkerThread") th.start() th.join() - logging.info("brozzler-worker is all done, exiting") + logger.info("brozzler-worker is all done, exiting") def brozzler_ensure_tables(argv=None): @@ -775,18 +824,18 @@ def brozzler_list_jobs(argv=None): except ValueError: job_id = args.job reql = rr.table("jobs").get(job_id) - logging.debug("querying rethinkdb: %s", reql) + logger.debug("querying rethinkdb", query=reql) result = reql.run() if result: results = [reql.run()] else: - logging.error("no such job with id %r", job_id) + logger.error("no such job with id", job_id=job_id) sys.exit(1) else: reql = rr.table("jobs").order_by("id") if args.active: reql = reql.filter({"status": "ACTIVE"}) - logging.debug("querying rethinkdb: %s", reql) + logger.debug("querying rethinkdb", query=reql) results = reql.run() if args.yaml: yaml.dump_all( @@ -851,7 +900,7 @@ def brozzler_list_sites(argv=None): ) elif args.site: reql = reql.get_all(args.site) - logging.debug("querying rethinkdb: %s", reql) + logger.debug("querying rethinkdb", query=reql) results = reql.run() if args.yaml: yaml.dump_all( @@ -919,7 +968,7 @@ def brozzler_list_pages(argv=None): except ValueError: job_id = args.job reql = rr.table("sites").get_all(job_id, index="job_id")["id"] - logging.debug("querying rethinkb: %s", reql) + logger.debug("querying rethinkb", query=reql) site_ids = reql.run() elif args.site: try: @@ -948,7 +997,7 @@ def brozzler_list_pages(argv=None): reql = reql.order_by(index="least_hops") if args.claimed: reql = reql.filter({"claimed": True}) - logging.debug("querying rethinkb: %s", reql) + logger.debug("querying rethinkb", query=reql) results = reql.run() if args.yaml: yaml.dump_all( @@ -1014,20 +1063,20 @@ def brozzler_purge(argv=None): job_id = args.job job = brozzler.Job.load(rr, job_id) if not job: - logging.fatal("no such job %r", job_id) + logger.fatal("no such job", job_id=job_id) sys.exit(1) if job.status == "ACTIVE": if args.force: - logging.warning( - "job %s has status ACTIVE, purging anyway because " + logger.warning( + "job has status ACTIVE, purging anyway because " "--force was supplied", - job_id, + job_id=job_id, ) else: - logging.fatal( - "refusing to purge job %s because status is ACTIVE " + logger.fatal( + "refusing to purge job because status is ACTIVE " "(override with --force)", - job_id, + job_id=job_id, ) sys.exit(1) _purge_job(rr, job_id) @@ -1035,20 +1084,20 @@ def brozzler_purge(argv=None): site_id = args.site site = brozzler.Site.load(rr, site_id) if not site: - logging.fatal("no such job %r", job_id) + logger.fatal("no such job", job_id=job_id) sys.exit(1) if site.status == "ACTIVE": if args.force: - logging.warning( - "site %s has status ACTIVE, purging anyway because " + logger.warning( + "site has status ACTIVE, purging anyway because " "--force was 
supplied", - site_id, + site_id=site_id, ) else: - logging.fatal( - "refusing to purge site %s because status is ACTIVE " + logger.fatal( + "refusing to purge site because status is ACTIVE " "(override with --force)", - site_id, + site_id=site_id, ) sys.exit(1) _purge_site(rr, site_id) @@ -1067,7 +1116,7 @@ def brozzler_purge(argv=None): .lt(finished_before) ) ) - logging.debug("retrieving jobs older than %s: %s", finished_before, reql) + logger.debug("retrieving jobs older than %s", finished_before, query=reql) for job in reql.run(): # logging.info('job %s finished=%s starts_and_stops[-1]["stop"]=%s', # job['id'], job.get('finished'), @@ -1085,27 +1134,31 @@ def _purge_site(rr, site_id): ) .delete() ) - logging.debug("purging pages for site %s: %s", site_id, reql) + site_logger = logger.bind(site_id=site_id) + + site_logger.debug("purging pages for site", query=reql) result = reql.run() - logging.info("purged pages for site %s: %s", site_id, result) + site_logger.info("purged pages for site", result=result) reql = rr.table("sites").get(site_id).delete() - logging.debug("purging site %s: %s", site_id, reql) + site_logger.debug("purging site", query=reql) result = reql.run() - logging.info("purged site %s: %s", site_id, result) + site_logger.info("purged site", result=result) def _purge_job(rr, job_id): + job_logger = logger.bind(job_id=job_id) + reql = rr.table("sites").get_all(job_id, index="job_id").get_field("id") - logging.debug("querying rethinkdb: %s", reql) + job_logger.debug("querying rethinkdb", query=reql) site_ids = list(reql.run()) for site_id in site_ids: _purge_site(rr, site_id) reql = rr.table("jobs").get(job_id).delete() - logging.debug("purging job %s: %s", job_id, reql) + job_logger.debug("purging job", query=reql) result = reql.run() - logging.info("purged job %s: %s", job_id, result) + job_logger.info("purged job", result=result) def brozzler_list_captures(argv=None): @@ -1152,7 +1205,7 @@ def brozzler_list_captures(argv=None): if args.url_or_sha1[:5] == "sha1:": if args.prefix: - logging.warning( + logger.warning( "ignoring supplied --prefix option which does not apply " "to lookup by sha1" ) @@ -1163,7 +1216,7 @@ def brozzler_list_captures(argv=None): [sha1base32, r.maxval, r.maxval], index="sha1_warc_type", ) - logging.debug("querying rethinkdb: %s", reql) + logger.debug("querying rethinkdb", query=reql) results = reql.run() else: key = urlcanon.semantic(args.url_or_sha1).surt().decode("ascii") @@ -1186,7 +1239,7 @@ def brozzler_list_captures(argv=None): lambda capture: (capture["canon_surt"] >= key) & (capture["canon_surt"] <= end_key) ) - logging.debug("querying rethinkdb: %s", reql) + logger.debug("querying rethinkdb", query=reql) results = reql.run() if args.yaml: @@ -1231,7 +1284,7 @@ def brozzler_stop_crawl(argv=None): job_id = args.job_id job = brozzler.Job.load(rr, job_id) if not job: - logging.fatal("job not found with id=%r", job_id) + logger.fatal("job not found with", id=job_id) sys.exit(1) job.stop_requested = doublethink.utcnow() job.save() @@ -1242,7 +1295,7 @@ def brozzler_stop_crawl(argv=None): site_id = args.site_id site = brozzler.Site.load(rr, site_id) if not site: - logging.fatal("site not found with id=%r", site_id) + logger.fatal("site not found with", id=site_id) sys.exit(1) site.stop_requested = doublethink.utcnow() site.save() diff --git a/brozzler/dashboard/__init__.py b/brozzler/dashboard/__init__.py index 16e34c1..fcac6c5 100644 --- a/brozzler/dashboard/__init__.py +++ b/brozzler/dashboard/__init__.py @@ -17,13 +17,15 @@ See the License 
for the specific language governing permissions and limitations under the License. """ -import logging +import structlog import sys +logger = structlog.get_logger(logger_name=__name__) + try: import flask except ImportError as e: - logging.critical( + logger.critical( '%s: %s\n\nYou might need to run "pip install ' 'brozzler[dashboard]".\nSee README.rst for more information.', type(e).__name__, @@ -77,7 +79,7 @@ def queued_count(site_id): ) .count() ) - logging.debug("querying rethinkdb: %s", reql) + logger.debug("querying rethinkdb", query=reql) count = reql.run() return flask.jsonify(count=count) @@ -85,7 +87,7 @@ def queued_count(site_id): @app.route("/api/sites//queue") @app.route("/api/site//queue") def queue(site_id): - logging.debug("flask.request.args=%s", flask.request.args) + logger.debug("flask.request.args", args=flask.request.args) start = flask.request.args.get("start", 0) end = flask.request.args.get("end", start + 90) reql = rr.table("pages").between( @@ -93,7 +95,7 @@ def queue(site_id): [site_id, 0, False, r.maxval], index="priority_by_site", )[start:end] - logging.debug("querying rethinkdb: %s", reql) + logger.debug("querying rethinkdb", query=reql) queue_ = reql.run() return flask.jsonify(queue_=list(queue_)) @@ -112,7 +114,7 @@ def page_count(site_id): ) .count() ) - logging.debug("querying rethinkdb: %s", reql) + logger.debug("querying rethinkdb", query=reql) count = reql.run() return flask.jsonify(count=count) @@ -130,7 +132,7 @@ def pages(site_id): ) .order_by(index="least_hops")[start:end] ) - logging.debug("querying rethinkdb: %s", reql) + logger.debug("querying rethinkdb", query=reql) pages_ = reql.run() return flask.jsonify(pages=list(pages_)) @@ -139,7 +141,7 @@ def pages(site_id): @app.route("/api/page/") def page(page_id): reql = rr.table("pages").get(page_id) - logging.debug("querying rethinkdb: %s", reql) + logger.debug("querying rethinkdb", query=reql) page_ = reql.run() return flask.jsonify(page_) @@ -148,7 +150,7 @@ def page(page_id): @app.route("/api/page//yaml") def page_yaml(page_id): reql = rr.table("pages").get(page_id) - logging.debug("querying rethinkdb: %s", reql) + logger.debug("querying rethinkdb", query=reql) page_ = reql.run() return app.response_class( yaml.dump(page_, default_flow_style=False), mimetype="application/yaml" @@ -159,7 +161,7 @@ def page_yaml(page_id): @app.route("/api/site/") def site(site_id): reql = rr.table("sites").get(site_id) - logging.debug("querying rethinkdb: %s", reql) + logger.debug("querying rethinkdb", query=reql) s = reql.run() if "cookie_db" in s: s["cookie_db"] = base64.b64encode(s["cookie_db"]).decode("ascii") @@ -170,7 +172,7 @@ def site(site_id): @app.route("/api/site//yaml") def site_yaml(site_id): reql = rr.table("sites").get(site_id) - logging.debug("querying rethinkdb: %s", reql) + logger.debug("querying rethinkdb", query=reql) site_ = reql.run() return app.response_class( yaml.dump(site_, default_flow_style=False), mimetype="application/yaml" @@ -180,7 +182,7 @@ def site_yaml(site_id): @app.route("/api/stats/") def stats(bucket): reql = rr.table("stats").get(bucket) - logging.debug("querying rethinkdb: %s", reql) + logger.debug("querying rethinkdb", query=reql) stats_ = reql.run() return flask.jsonify(stats_) @@ -193,7 +195,7 @@ def sites(job_id): except ValueError: jid = job_id reql = rr.table("sites").get_all(jid, index="job_id") - logging.debug("querying rethinkdb: %s", reql) + logger.debug("querying rethinkdb", query=reql) sites_ = list(reql.run()) # TypeError: is not JSON serializable for s in 
sites_: @@ -206,7 +208,7 @@ def sites(job_id): def jobless_sites(): # XXX inefficient (unindexed) query reql = rr.table("sites").filter(~r.row.has_fields("job_id")) - logging.debug("querying rethinkdb: %s", reql) + logger.debug("querying rethinkdb", query=reql) sites_ = list(reql.run()) # TypeError: is not JSON serializable for s in sites_: @@ -223,7 +225,7 @@ def job(job_id): except ValueError: jid = job_id reql = rr.table("jobs").get(jid) - logging.debug("querying rethinkdb: %s", reql) + logger.debug("querying rethinkdb", query=reql) job_ = reql.run() return flask.jsonify(job_) @@ -236,7 +238,7 @@ def job_yaml(job_id): except ValueError: jid = job_id reql = rr.table("jobs").get(jid) - logging.debug("querying rethinkdb: %s", reql) + logger.debug("querying rethinkdb", query=reql) job_ = reql.run() return app.response_class( yaml.dump(job_, default_flow_style=False), mimetype="application/yaml" @@ -258,7 +260,7 @@ def services(): @app.route("/api/jobs") def jobs(): reql = rr.table("jobs").order_by(r.desc("id")) - logging.debug("querying rethinkdb: %s", reql) + logger.debug("querying rethinkdb", query=reql) jobs_ = list(reql.run()) return flask.jsonify(jobs=jobs_) @@ -313,13 +315,13 @@ try: return self.application def run(**options): - logging.info("running brozzler-dashboard using gunicorn") + logger.info("running brozzler-dashboard using gunicorn") GunicornBrozzlerDashboard(app, options).run() except ImportError: def run(): - logging.info("running brozzler-dashboard using simple flask app.run") + logger.info("running brozzler-dashboard using simple flask app.run") app.run(host=SETTINGS["DASHBOARD_INTERFACE"], port=SETTINGS["DASHBOARD_PORT"]) diff --git a/brozzler/easy.py b/brozzler/easy.py index f8392f6..28a6c14 100644 --- a/brozzler/easy.py +++ b/brozzler/easy.py @@ -18,8 +18,10 @@ See the License for the specific language governing permissions and limitations under the License. """ +import structlog import sys -import logging + +logger = structlog.get_logger(logger_name=__name__) try: import warcprox @@ -30,11 +32,11 @@ try: import wsgiref.handlers import brozzler.dashboard except ImportError as e: - logging.critical( + logger.critical( '%s: %s\n\nYou might need to run "pip install ' 'brozzler[easy]".\nSee README.rst for more information.', type(e).__name__, - e, + exc_info=True, ) sys.exit(1) import argparse @@ -156,7 +158,7 @@ class ThreadingWSGIServer( class BrozzlerEasyController: - logger = logging.getLogger(__module__ + "." + __qualname__) + logger = structlog.get_logger(logger_name=__module__ + "." 
+ __qualname__) def __init__(self, args): self.stop = threading.Event() @@ -238,11 +240,14 @@ class BrozzlerEasyController: self.logger.info("starting brozzler-worker") self.brozzler_worker.start() - self.logger.info("starting pywb at %s:%s", *self.pywb_httpd.server_address) + self.logger.info( + "starting pywb", address="%s:%s" % self.pywb_httpd.server_address + ) threading.Thread(target=self.pywb_httpd.serve_forever).start() self.logger.info( - "starting brozzler-dashboard at %s:%s", *self.dashboard_httpd.server_address + "starting brozzler-dashboard", + address="%s:%s" % self.dashboard_httpd.server_address, ) threading.Thread(target=self.dashboard_httpd.serve_forever).start() @@ -307,8 +312,8 @@ class BrozzlerEasyController: state_strs.append(str(th)) stack = traceback.format_stack(sys._current_frames()[th.ident]) state_strs.append("".join(stack)) - logging.warning( - "dumping state (caught signal {})\n{}".format(signum, "\n".join(state_strs)) + logger.warning( + "dumping state (caught signal)", signal=signum, state="\n".join(state_strs) ) diff --git a/brozzler/frontier.py b/brozzler/frontier.py index c6ac971..0ef3924 100644 --- a/brozzler/frontier.py +++ b/brozzler/frontier.py @@ -16,12 +16,12 @@ See the License for the specific language governing permissions and limitations under the License. """ -import logging import brozzler import random import time import datetime import rethinkdb as rdb +import structlog import doublethink import urlcanon @@ -33,7 +33,7 @@ class UnexpectedDbResult(Exception): class RethinkDbFrontier: - logger = logging.getLogger(__module__ + "." + __qualname__) + logger = structlog.get_logger(logger_name=__module__ + "." + __qualname__) def __init__(self, rr, shards=None, replicas=None): self.rr = rr @@ -42,15 +42,15 @@ class RethinkDbFrontier: self._ensure_db() def _ensure_db(self): + db_logger = self.logger.bind(dbname=self.rr.dbname) + dbs = self.rr.db_list().run() if not self.rr.dbname in dbs: - self.logger.info("creating rethinkdb database %r", self.rr.dbname) + db_logger.info("creating rethinkdb database") self.rr.db_create(self.rr.dbname).run() tables = self.rr.table_list().run() if not "sites" in tables: - self.logger.info( - "creating rethinkdb table 'sites' in database %r", self.rr.dbname - ) + db_logger.info("creating rethinkdb table 'sites' in database") self.rr.table_create( "sites", shards=self.shards, replicas=self.replicas ).run() @@ -59,9 +59,7 @@ class RethinkDbFrontier: ).run() self.rr.table("sites").index_create("job_id").run() if not "pages" in tables: - self.logger.info( - "creating rethinkdb table 'pages' in database %r", self.rr.dbname - ) + db_logger.info("creating rethinkdb table 'pages' in database") self.rr.table_create( "pages", shards=self.shards, replicas=self.replicas ).run() @@ -81,9 +79,7 @@ class RethinkDbFrontier: [r.row["site_id"], r.row["brozzle_count"], r.row["hops_from_seed"]], ).run() if not "jobs" in tables: - self.logger.info( - "creating rethinkdb table 'jobs' in database %r", self.rr.dbname - ) + db_logger.info("creating rethinkdb table 'jobs' in database") self.rr.table_create( "jobs", shards=self.shards, replicas=self.replicas ).run() @@ -108,7 +104,7 @@ class RethinkDbFrontier: ) def claim_sites(self, n=1): - self.logger.trace("claiming up to %s sites to brozzle", n) + self.logger.debug("claiming up to %s sites to brozzle", n) result = ( self.rr.table("sites") .get_all( @@ -186,10 +182,10 @@ class RethinkDbFrontier: if result["changes"][i]["old_val"]["claimed"]: self.logger.warning( "re-claimed site that was still 
marked 'claimed' " - "because it was last claimed a long time ago " - "at %s, and presumably some error stopped it from " + "because it was last claimed a long time ago, " + "and presumably some error stopped it from " "being disclaimed", - result["changes"][i]["old_val"]["last_claimed"], + last_claimed=result["changes"][i]["old_val"]["last_claimed"], ) site = brozzler.Site(self.rr, result["changes"][i]["new_val"]) sites.append(site) @@ -205,10 +201,10 @@ class RethinkDbFrontier: """ if site.time_limit and site.time_limit > 0 and site.elapsed() > site.time_limit: self.logger.debug( - "site FINISHED_TIME_LIMIT! time_limit=%s " "elapsed=%s %s", - site.time_limit, - site.elapsed(), - site, + "site FINISHED_TIME_LIMIT!", + time_limit=site.time_limit, + elapsed=site.elapsed(), + site=site, ) raise brozzler.ReachedTimeLimit @@ -273,7 +269,7 @@ class RethinkDbFrontier: """Raises brozzler.CrawlStopped if stop has been requested.""" site.refresh() if site.stop_requested and site.stop_requested <= doublethink.utcnow(): - self.logger.info("stop requested for site %s", site.id) + self.logger.info("stop requested for site", site_id=site.id) raise brozzler.CrawlStopped if site.job_id: @@ -283,7 +279,7 @@ class RethinkDbFrontier: and job.stop_requested and job.stop_requested <= doublethink.utcnow() ): - self.logger.info("stop requested for job %s", site.job_id) + self.logger.info("stop requested for job", job_id=site.job_id) raise brozzler.CrawlStopped def _maybe_finish_job(self, job_id): @@ -304,7 +300,7 @@ class RethinkDbFrontier: return False n += 1 - self.logger.info("all %s sites finished, job %s is FINISHED!", n, job.id) + self.logger.info("all %s sites finished, job is FINISHED!", n, job_id=job.id) job.finish() job.save() return True @@ -320,7 +316,7 @@ class RethinkDbFrontier: self._maybe_finish_job(site.job_id) def disclaim_site(self, site, page=None): - self.logger.info("disclaiming %s", site) + self.logger.info("disclaiming", site=site) site.claimed = False site.last_disclaimed = doublethink.utcnow() if not page and not self.has_outstanding_pages(site): @@ -468,17 +464,16 @@ class RethinkDbFrontier: try: self.logger.debug("inserting/replacing batch of %s pages", len(batch)) reql = self.rr.table("pages").insert(batch, conflict="replace") - self.logger.trace( + self.logger.debug( 'running query self.rr.table("pages").insert(%r, ' 'conflict="replace")', batch, ) result = reql.run() except Exception as e: - self.logger.error( + self.logger.exception( "problem inserting/replacing batch of %s pages", len(batch), - exc_info=True, ) parent_page.outlinks = {} @@ -497,7 +492,7 @@ class RethinkDbFrontier: ) def reached_limit(self, site, e): - self.logger.info("reached_limit site=%s e=%s", site, e) + self.logger.info("reached_limit", site=site, e=e) assert isinstance(e, brozzler.ReachedLimit) if ( site.reached_limit @@ -530,7 +525,7 @@ class RethinkDbFrontier: ) pages = list(results) if len(pages) > 1: - self.logger.warning("more than one seed page for site_id %s ?", site_id) + self.logger.warning("more than one seed page?", site_id=site_id) if len(pages) < 1: return None return brozzler.Page(self.rr, pages[0]) @@ -550,8 +545,8 @@ class RethinkDbFrontier: [site_id, 0 if brozzled is False else r.maxval, r.maxval, r.maxval], index="priority_by_site", ) - self.logger.trace("running query: %r", query) + self.logger.debug("running query", query=query) results = query.run() for result in results: - self.logger.trace("yielding result: %r", result) + self.logger.debug("yielding result", result=result) yield 
brozzler.Page(self.rr, result) diff --git a/brozzler/model.py b/brozzler/model.py index 585d69d..1099c3b 100644 --- a/brozzler/model.py +++ b/brozzler/model.py @@ -25,9 +25,9 @@ import datetime import doublethink import hashlib import json -import logging import os import re +import structlog import time import urlcanon import urllib @@ -36,6 +36,8 @@ import yaml import zlib from typing import Optional +logger = structlog.get_logger(logger_name=__name__) + def load_schema(): schema_file = os.path.join(os.path.dirname(__file__), "job_schema.yaml") @@ -83,7 +85,7 @@ def merge(a, b): def new_job_file(frontier, job_conf_file): """Returns new Job.""" - logging.info("loading %s", job_conf_file) + logger.info("loading", job_conf_file=job_conf_file) with open(job_conf_file) as f: job_conf = yaml.safe_load(f) return new_job(frontier, job_conf) @@ -117,12 +119,12 @@ def new_job(frontier, job_conf): # insert in batches to avoid this error # rethinkdb.errors.ReqlDriverError: Query size (167883036) greater than maximum (134217727) in: for batch in (pages[i : i + 500] for i in range(0, len(pages), 500)): - logging.info("inserting batch of %s pages", len(batch)) + logger.info("inserting batch of %s pages", len(batch)) result = frontier.rr.table("pages").insert(batch).run() for batch in (sites[i : i + 100] for i in range(0, len(sites), 100)): - logging.info("inserting batch of %s sites", len(batch)) + logger.info("inserting batch of %s sites", len(batch)) result = frontier.rr.table("sites").insert(batch).run() - logging.info("job %s fully started", job.id) + logger.info("job fully started", job_id=job.id) return job @@ -151,7 +153,7 @@ def new_seed_page(frontier, site): def new_site(frontier, site): - logging.info("new site %s", site) + logger.info("new site", site=site) site.id = site.id or str(uuid.uuid4()) # insert the Page into the database before the Site, to avoid situation # where a brozzler worker immediately claims the site, finds no pages @@ -159,7 +161,7 @@ def new_site(frontier, site): try: page = new_seed_page(frontier, site) page.save() - logging.info("queued page %s", page) + logger.info("queued page", page=page) finally: # finally block because we want to insert the Site no matter what site.save() @@ -192,7 +194,7 @@ class ElapsedMixIn(object): class Job(doublethink.Document, ElapsedMixIn): - logger = logging.getLogger(__module__ + "." + __qualname__) + logger = structlog.get_logger(logger_name=__module__ + "." + __qualname__) table = "jobs" def populate_defaults(self): @@ -212,16 +214,16 @@ class Job(doublethink.Document, ElapsedMixIn): def finish(self): if self.status == "FINISHED" or self.starts_and_stops[-1]["stop"]: self.logger.error( - "job is already finished status=%s " "starts_and_stops[-1]['stop']=%s", - self.status, - self.starts_and_stops[-1]["stop"], + "job is already finished", + status=self.status, + stop=self.starts_and_stops[-1]["stop"], ) self.status = "FINISHED" self.starts_and_stops[-1]["stop"] = doublethink.utcnow() class Site(doublethink.Document, ElapsedMixIn): - logger = logging.getLogger(__module__ + "." + __qualname__) + logger = structlog.get_logger(logger_name=__module__ + "." 
+ __qualname__) table = "sites" def populate_defaults(self): @@ -279,7 +281,7 @@ class Site(doublethink.Document, ElapsedMixIn): if set(rule.keys()) == {"ssurt"} ) if not any(ssurt.startswith(ss) for ss in simple_rule_ssurts): - self.logger.info("adding ssurt %s to scope accept rules", ssurt) + self.logger.info("adding ssurt to scope accept rules", ssurt=ssurt) self.scope["accepts"].append({"ssurt": ssurt}) def note_seed_redirect(self, url): @@ -377,7 +379,7 @@ class Site(doublethink.Document, ElapsedMixIn): class Page(doublethink.Document): - logger = logging.getLogger(__module__ + "." + __qualname__) + logger = structlog.get_logger(logger_name=__module__ + "." + __qualname__) table = "pages" @staticmethod diff --git a/brozzler/pywb.py b/brozzler/pywb.py index 8b61841..de0fa44 100644 --- a/brozzler/pywb.py +++ b/brozzler/pywb.py @@ -19,7 +19,9 @@ limitations under the License. """ import sys -import logging +import structlog + +logger = structlog.get_logger(logger_name=__name__) try: import pywb.apps.cli @@ -30,7 +32,7 @@ try: import pywb.framework.basehandlers import pywb.rewrite.wburl except ImportError as e: - logging.critical( + logger.critical( '%s: %s\n\nYou might need to run "pip install ' 'brozzler[easy]".\nSee README.rst for more information.', type(e).__name__, @@ -111,7 +113,7 @@ class RethinkCDXSource(pywb.cdx.cdxsource.CDXSource): ) if cdx_query.limit: reql = reql.limit(cdx_query.limit) - logging.debug("rethinkdb query: %s", reql) + logger.debug("rethinkdb query", query=reql) results = reql.run() return results diff --git a/brozzler/robots.py b/brozzler/robots.py index 744c996..0c9d699 100644 --- a/brozzler/robots.py +++ b/brozzler/robots.py @@ -23,12 +23,12 @@ limitations under the License. """ import json -import logging import brozzler import reppy import reppy.cache import reppy.parser import requests +import structlog __all__ = ["is_permitted_by_robots"] @@ -119,10 +119,9 @@ def is_permitted_by_robots(site, url, proxy=None): # reppy has wrapped an exception that we want to bubble up raise brozzler.ProxyError(e) else: - logging.warning( - "returning true (permitted) after problem fetching " - "robots.txt for %r: %r", - url, - e, + structlog.get_logger(logger_name=__name__).warning( + "returning true (permitted) after problem fetching " "robots.txt", + url=url, + raised_exception=e, ) return True diff --git a/brozzler/worker.py b/brozzler/worker.py index ad1a993..98a9daa 100644 --- a/brozzler/worker.py +++ b/brozzler/worker.py @@ -18,7 +18,6 @@ See the License for the specific language governing permissions and limitations under the License. """ -import logging import brozzler import brozzler.browser import datetime @@ -31,6 +30,7 @@ import io import socket import random import requests +import structlog import urllib3 from urllib3.exceptions import TimeoutError, ProxyError import doublethink @@ -45,7 +45,7 @@ r = rdb.RethinkDB() class BrozzlerWorker: - logger = logging.getLogger(__module__ + "." + __qualname__) + logger = structlog.get_logger(logger_name=__module__ + "." 
+ __qualname__) # 3⅓ min heartbeat interval => 10 min ttl # This is kind of a long time, because `frontier.claim_sites()`, which runs @@ -126,7 +126,7 @@ class BrozzlerWorker: self._metrics_port, self._registry_url, self._env ) else: - logging.warning( + self.logger.warning( "not starting prometheus scrape endpoint: metrics_port is undefined" ) @@ -174,9 +174,9 @@ class BrozzlerWorker: site.proxy = "%s:%s" % (svc["host"], svc["port"]) site.save() self.logger.info( - "chose warcprox instance %r from service registry for %r", - site.proxy, - site, + "chose warcprox instance from service registry", + instance=site.proxy, + registry=site, ) return site.proxy return None @@ -190,7 +190,7 @@ class BrozzlerWorker: self._proxy_is_warcprox = status["role"] == "warcprox" except Exception as e: self._proxy_is_warcprox = False - logging.info( + self.logger.info( "%s %s warcprox", self._proxy, "IS" if self._proxy_is_warcprox else "IS NOT", @@ -227,18 +227,18 @@ class BrozzlerWorker: with urllib.request.urlopen(request, timeout=600) as response: if response.getcode() != 204: self.logger.warning( - 'got "%s %s" response on warcprox ' + "got unexpected response on warcprox " "WARCPROX_WRITE_RECORD request (expected 204)", - response.getcode(), - response.reason, + code=response.getcode(), + reason=response.reason, ) return request, response except urllib.error.HTTPError as e: self.logger.warning( - 'got "%s %s" response on warcprox ' + "got unexpected response on warcprox " "WARCPROX_WRITE_RECORD request (expected 204)", - e.getcode(), - e.info(), + code=e.getcode(), + reason=e.info(), ) return request, None except urllib.error.URLError as e: @@ -271,16 +271,17 @@ class BrozzlerWorker: on_request=None, enable_youtube_dl=True, ): - self.logger.info("brozzling {}".format(page)) + page_logger = self.logger.bind(page=page) + page_logger.info("brozzling") outlinks = set() page_headers = self._get_page_headers(site, page) if not self._needs_browsing(page_headers): - self.logger.info("needs fetch: %s", page) + page_logger.info("needs fetch") self._fetch_url(site, page=page) else: - self.logger.info("needs browsing: %s", page) + page_logger.info("needs browsing") try: browser_outlinks = self._browse_page( browser, site, page, on_screenshot, on_request @@ -290,7 +291,7 @@ class BrozzlerWorker: if status_code in [502, 504]: raise brozzler.PageConnectionError() except brozzler.PageInterstitialShown: - self.logger.info("page interstitial shown (http auth): %s", page) + page_logger.info("page interstitial shown (http auth)") if enable_youtube_dl and ydl.should_ytdlp( site, page, status_code, self._skip_av_seeds @@ -308,10 +309,7 @@ class BrozzlerWorker: except brozzler.ProxyError: raise except brozzler.VideoExtractorError as e: - logging.error( - "error extracting video info: %s", - e, - ) + self.logger.exception("error extracting video info") except Exception as e: if ( hasattr(e, "exc_info") @@ -320,26 +318,25 @@ class BrozzlerWorker: and e.exc_info[1].code == 430 ): self.logger.info( - "youtube-dl got %s %s processing %s", - e.exc_info[1].code, - e.exc_info[1].msg, - page.url, + "youtube-dl encountered an error", + code=e.exc_info[1].code, + message=e.exc_info[1].msg, + url=page.url, ) else: - self.logger.error( - "youtube_dl raised exception on %s", page, exc_info=True - ) + self.logger.exception("youtube_dl raised exception", page=page) return outlinks @metrics.brozzler_header_processing_duration_seconds.time() @metrics.brozzler_in_progress_headers.track_inprogress() def _get_page_headers(self, site, page): + 
url_logger = self.logger.bind(url=page.url) # bypassing warcprox, requests' stream=True defers downloading the body of the response # see https://docs.python-requests.org/en/latest/user/advanced/#body-content-workflow try: user_agent = site.get("user_agent") headers = {"User-Agent": user_agent} if user_agent else {} - self.logger.info("getting page headers for %s", page.url) + url_logger.info("getting page headers") with requests.get( page.url, stream=True, @@ -349,11 +346,9 @@ class BrozzlerWorker: ) as r: return r.headers except requests.exceptions.Timeout as e: - self.logger.warning( - "Timed out trying to get headers for %s: %s", page.url, e - ) + url_logger.warning("Timed out trying to get headers", exc_info=True) except requests.exceptions.RequestException as e: - self.logger.warning("Failed to get headers for %s: %s", page.url, e) + url_logger.warning("Failed to get headers", exc_info=True) return {} def _needs_browsing(self, page_headers): @@ -378,10 +373,9 @@ class BrozzlerWorker: on_screenshot(screenshot_jpeg) if self._using_warcprox(site): self.logger.info( - "sending WARCPROX_WRITE_RECORD request to %s with " - "screenshot for %s", - self._proxy_for(site), - page, + "sending WARCPROX_WRITE_RECORD request", + proxy=self._proxy_for(site), + screenshot_for_page=page, ) thumbnail_jpeg = self.thumb_jpeg(screenshot_jpeg) self._warcprox_write_record( @@ -428,7 +422,7 @@ class BrozzlerWorker: video["content-length"] = int(response_headers["content-length"]) if "content-range" in response_headers: video["content-range"] = response_headers["content-range"] - logging.debug("embedded video %s", video) + self.logger.debug("embedded video", video=video) if not "videos" in page: page.videos = [] page.videos.append(video) @@ -437,11 +431,11 @@ class BrozzlerWorker: def _on_service_worker_version_updated(chrome_msg): # https://github.com/internetarchive/brozzler/issues/140 - self.logger.trace("%r", chrome_msg) + self.logger.debug("service worker updated", chrome_msg=chrome_msg) if chrome_msg.get("params", {}).get("versions"): url = chrome_msg.get("params", {}).get("versions")[0].get("scriptURL") if url and url.startswith("http") and url not in sw_fetched: - self.logger.info("fetching service worker script %s", url) + self.logger.info("fetching service worker script", url=url) self._fetch_url(site, url=url) sw_fetched.add(url) @@ -496,7 +490,7 @@ class BrozzlerWorker: headers = {"User-Agent": user_agent} if user_agent else {} headers.update(site.extra_headers(page)) - self.logger.info("fetching url %s", url) + self.logger.info("fetching url", url=url) try: # response is ignored http.request( @@ -506,17 +500,18 @@ class BrozzlerWorker: timeout=self.FETCH_URL_TIMEOUT, retries=False, ) - self.logger.info("Completed fetching url %s", url) + self.logger.info("Completed fetching url", url=url) except TimeoutError as e: - self.logger.warning("Timed out fetching %s", url) + self.logger.warning("Timed out fetching url", url=url) raise brozzler.PageConnectionError() from e except ProxyError as e: raise brozzler.ProxyError("proxy error fetching %s" % url) from e except urllib3.exceptions.RequestError as e: - self.logger.warning("Failed to fetch url %s: %s", url, e) + self.logger.warning("Failed to fetch url", url=url, exc_info=True) raise brozzler.PageConnectionError() from e def brozzle_site(self, browser, site): + site_logger = self.logger.bind(site=site) try: site.last_claimed_by = "%s:%s" % (socket.gethostname(), browser.chrome.port) site.save() @@ -526,9 +521,7 @@ class BrozzlerWorker: 
self._frontier.honor_stop_request(site) # _proxy_for() call in log statement can raise brozzler.ProxyError # which is why we honor time limit and stop request first☝🏻 - self.logger.info( - "brozzling site (proxy=%r) %s", self._proxy_for(site), site - ) + site_logger.info("brozzling site", proxy=self._proxy_for(site)) while time.time() - start < self.SITE_SESSION_MINUTES * 60: site.refresh() self._frontier.enforce_time_limit(site) @@ -540,7 +533,7 @@ class BrozzlerWorker: if page.needs_robots_check and not brozzler.is_permitted_by_robots( site, page.url, self._proxy_for(site) ): - logging.warning("page %s is blocked by robots.txt", page.url) + self.logger.warning("page is blocked by robots.txt", url=page.url) page.blocked_by_robots = True self._frontier.completed_page(site, page) else: @@ -556,7 +549,7 @@ class BrozzlerWorker: except brozzler.ShutdownRequested: self.logger.info("shutdown requested") except brozzler.NothingToClaim: - self.logger.info("no pages left for site %s", site) + site_logger.info("no pages left for site") except brozzler.ReachedLimit as e: self._frontier.reached_limit(site, e) except brozzler.ReachedTimeLimit as e: @@ -567,29 +560,24 @@ class BrozzlerWorker: # self.logger.info("{} shut down".format(browser)) except brozzler.ProxyError as e: if self._warcprox_auto: - logging.error( - "proxy error (site.proxy=%s), will try to choose a " - "healthy instance next time site is brozzled: %s", - site.proxy, - e, + self.logger.exception( + "proxy error, will try to choose a " + "healthy instance next time site is brozzled", + site_proxy=site.proxy, ) site.proxy = None else: # using brozzler-worker --proxy, nothing to do but try the # same proxy again next time - logging.error("proxy error (self._proxy=%r)", self._proxy, exc_info=1) + self.logger.exception("proxy error", self_proxy=self._proxy) except (brozzler.PageConnectionError, Exception) as e: if isinstance(e, brozzler.PageConnectionError): - self.logger.error( - "Page status code possibly indicates connection failure between host and warcprox: site=%r page=%r", - site, - page, - exc_info=True, + site_logger.exception( + "Page status code possibly indicates connection failure between host and warcprox", + page=page, ) else: - self.logger.error( - "unexpected exception site=%r page=%r", site, page, exc_info=True - ) + site_logger.exception("unexpected exception", page=page) if page: # Calculate backoff in seconds based on number of failed attempts. # Minimum of 60, max of 135 giving delays of 60, 90, 135, 135... 
@@ -600,10 +588,10 @@ class BrozzlerWorker: page.failed_attempts = (page.failed_attempts or 0) + 1 if page.failed_attempts >= brozzler.MAX_PAGE_FAILURES: self.logger.info( - 'marking page "completed" after %s unexpected ' - "exceptions attempting to brozzle %s", - page.failed_attempts, - page, + 'marking page "completed" after several unexpected ' + "exceptions attempting to brozzle", + failed_attempts=page.failed_attempts, + page=page, ) self._frontier.completed_page(site, page) page = None @@ -641,13 +629,11 @@ class BrozzlerWorker: try: self.status_info = self._service_registry.heartbeat(status_info) - self.logger.trace("status in service registry: %s", self.status_info) + self.logger.debug("status in service registry", status=self.status_info) except r.ReqlError as e: - self.logger.error( - "failed to send heartbeat and update service registry " - "with info %s: %s", - status_info, - e, + self.logger.exception( + "failed to send heartbeat and update service registry", + info=status_info, ) def _service_heartbeat_if_due(self): @@ -695,9 +681,7 @@ class BrozzlerWorker: self._browser_pool.release(browsers[i]) def run(self): - self.logger.notice( - "brozzler %s - brozzler-worker starting", brozzler.__version__ - ) + self.logger.warning("brozzler-worker starting", version=brozzler.__version__) last_nothing_to_claim = 0 try: while not self._shutdown.is_set(): @@ -706,20 +690,20 @@ class BrozzlerWorker: try: self._start_browsing_some_sites() except brozzler.browser.NoBrowsersAvailable: - logging.trace("all %s browsers are in use", self._max_browsers) + self.logger.debug( + "all browsers are in use", max_browsers=self._max_browsers + ) except brozzler.NothingToClaim: last_nothing_to_claim = time.time() - logging.trace( + self.logger.debug( "nothing to claim, all available active sites " "are already claimed by a brozzler worker" ) time.sleep(0.5) - self.logger.notice("shutdown requested") + self.logger.warning("shutdown requested") except r.ReqlError as e: - self.logger.error( - "caught rethinkdb exception, will try to proceed", exc_info=True - ) + self.logger.exception("caught rethinkdb exception, will try to proceed") except brozzler.ShutdownRequested: self.logger.info("shutdown requested") except: @@ -731,12 +715,11 @@ class BrozzlerWorker: try: self._service_registry.unregister(self.status_info["id"]) except: - self.logger.error( - "failed to unregister from service registry", exc_info=True - ) + self.logger.exception("failed to unregister from service registry") self.logger.info( - "shutting down %s brozzling threads", len(self._browsing_threads) + "shutting down brozzling threads", + thread_count=len(self._browsing_threads), ) with self._browsing_threads_lock: for th in self._browsing_threads: @@ -756,6 +739,7 @@ class BrozzlerWorker: ) return self._thread = threading.Thread(target=self.run, name="BrozzlerWorker") + self.logger = self.logger.bind(thread=self._thread) self._thread.start() def shutdown_now(self): diff --git a/brozzler/ydl.py b/brozzler/ydl.py index a1e7fea..de88dda 100644 --- a/brozzler/ydl.py +++ b/brozzler/ydl.py @@ -16,7 +16,6 @@ See the License for the specific language governing permissions and limitations under the License. """ -import logging import yt_dlp from yt_dlp.utils import match_filter_func, ExtractorError import brozzler @@ -29,6 +28,7 @@ import doublethink import datetime from .
import metrics import random +import structlog import threading import time @@ -40,14 +40,17 @@ YTDLP_WAIT = 10 YTDLP_MAX_REDIRECTS = 5 +logger = structlog.get_logger(logger_name=__name__) + + def should_ytdlp(site, page, page_status, skip_av_seeds): # called only after we've passed needs_browsing() check if page_status != 200: - logging.info("skipping ytdlp: non-200 page status %s", page_status) + logger.info("skipping ytdlp: non-200 page status", page_status=page_status) return False if site.skip_ytdlp: - logging.info("skipping ytdlp: site marked skip_ytdlp") + logger.info("skipping ytdlp: site marked skip_ytdlp") return False ytdlp_url = page.redirect_url if page.redirect_url else page.url @@ -64,7 +67,7 @@ def should_ytdlp(site, page, page_status, skip_av_seeds): # TODO: develop UI and refactor if ytdlp_seed: if site.skip_ytdlp is None and ytdlp_seed in skip_av_seeds: - logging.info("skipping ytdlp: site in skip_av_seeds") + logger.info("skipping ytdlp: site in skip_av_seeds") site.skip_ytdlp = True return False else: @@ -112,7 +115,13 @@ def _build_youtube_dl(worker, destdir, site, page, ytdlp_proxy_endpoints): """ class _YoutubeDL(yt_dlp.YoutubeDL): - logger = logging.getLogger(__module__ + "." + __qualname__) + logger = structlog.get_logger(logger_name=__module__ + "." + __qualname__) + + def __init__(self, url, params=None, auto_init=True): + super().__init__(params, auto_init) + + self.url = url + self.logger = self.logger.bind(url=url) def process_ie_result(self, ie_result, download=True, extra_info=None): if extra_info is None: @@ -122,7 +131,9 @@ def _build_youtube_dl(worker, destdir, site, page, ytdlp_proxy_endpoints): if result_type in ("url", "url_transparent"): if "extraction_depth" in extra_info: self.logger.info( - f"Following redirect URL: {ie_result['url']} extraction_depth: {extra_info['extraction_depth']}" + "Following redirect", + redirect_url=ie_result["url"], + extraction_depth=extra_info["extraction_depth"], ) extra_info["extraction_depth"] = 1 + extra_info.get( "extraction_depth", 0 ) @@ -139,8 +150,9 @@ def _build_youtube_dl(worker, destdir, site, page, ytdlp_proxy_endpoints): def add_default_extra_info(self, ie_result, ie, url): # hook in some logging super().add_default_extra_info(ie_result, ie, url) + extract_context = self.logger.bind(extractor=ie.IE_NAME) if ie_result.get("_type") == "playlist": - self.logger.info("extractor %r found playlist in %s", ie.IE_NAME, url) + extract_context.info("found playlist") if ie.IE_NAME in { "youtube:playlist", "youtube:tab", @@ -155,22 +167,20 @@ def _build_youtube_dl(worker, destdir, site, page, ytdlp_proxy_endpoints): try: ie_result["entries_no_dl"] = list(ie_result["entries"]) except Exception as e: - self.logger.warning( - "failed to unroll ie_result['entries']? for %s, %s; exception %s", - ie.IE_NAME, - url, - e, + extract_context.warning( + "failed to unroll ie_result['entries']?", + exc_info=True, ) ie_result["entries_no_dl"] = [] ie_result["entries"] = [] self.logger.info( - "not downloading %s media files from this " + "not downloading media files from this " "playlist because we expect to capture them from " "individual watch/track/detail pages", - len(ie_result["entries_no_dl"]), + media_file_count=len(ie_result["entries_no_dl"]), ) else: - self.logger.info("extractor %r found a download in %s", ie.IE_NAME, url) + extract_context.info("found a download") def _push_video_to_warcprox(self, site, info_dict, postprocessor): # 220211 update: does yt-dlp supply content-type?
no, not as such @@ -188,7 +198,11 @@ def _build_youtube_dl(worker, destdir, site, page, ytdlp_proxy_endpoints): mimetype = magic.from_file(info_dict["filepath"], mime=True) except ImportError as e: mimetype = "video/%s" % info_dict["ext"] - self.logger.warning("guessing mimetype %s because %r", mimetype, e) + self.logger.warning( + "guessing mimetype due to error", + mimetype=mimetype, + exc_info=True, + ) # youtube watch page postprocessor is MoveFiles @@ -206,12 +220,11 @@ def _build_youtube_dl(worker, destdir, site, page, ytdlp_proxy_endpoints): size = os.path.getsize(info_dict["filepath"]) self.logger.info( - "pushing %r video as %s (%s bytes) to " "warcprox at %s with url %s", - info_dict["format"], - mimetype, - size, - worker._proxy_for(site), - url, + "pushing video to warcprox", + format=info_dict["format"], + mimetype=mimetype, + size=size, + warcprox=worker._proxy_for(site), ) with open(info_dict["filepath"], "rb") as f: # include content-length header to avoid chunked @@ -248,23 +261,23 @@ def _build_youtube_dl(worker, destdir, site, page, ytdlp_proxy_endpoints): ): worker.logger.debug( "heartbeating site.last_claimed to prevent another " - "brozzler-worker claiming this site id=%r", - site.id, + "brozzler-worker claiming this site", + id=site.id, ) site.last_claimed = doublethink.utcnow() site.save() except: worker.logger.debug( - "problem heartbeating site.last_claimed site id=%r", - site.id, + "problem heartbeating site.last_claimed site", + id=site.id, exc_info=True, ) def ydl_postprocess_hook(d): if d["status"] == "finished": - worker.logger.info("[ydl_postprocess_hook] Finished postprocessing") worker.logger.info( - "[ydl_postprocess_hook] postprocessor: {}".format(d["postprocessor"]) + "[ydl_postprocess_hook] Finished postprocessing", + postprocessor=d["postprocessor"], ) is_youtube_host = isyoutubehost(d["info_dict"]["webpage_url"]) @@ -298,7 +311,7 @@ def _build_youtube_dl(worker, destdir, site, page, ytdlp_proxy_endpoints): # --cache-dir local or.. # this looked like a problem with nsf-mounted homedir, maybe not a problem for brozzler on focal? "cache_dir": "/home/archiveit", - "logger": logging.getLogger("yt_dlp"), + "logger": logger, "verbose": False, "quiet": False, # recommended to avoid bot detection @@ -314,17 +327,16 @@ def _build_youtube_dl(worker, destdir, site, page, ytdlp_proxy_endpoints): ytdlp_proxy_for_logs = ( ydl_opts["proxy"].split("@")[1] if "@" in ydl_opts["proxy"] else "@@@" ) - logging.info("using yt-dlp proxy ... 
%s", ytdlp_proxy_for_logs) + logger.info("using yt-dlp proxy ...", proxy=ytdlp_proxy_for_logs) # skip warcprox proxying yt-dlp v.2023.07.06: youtube extractor using ranges # if worker._proxy_for(site): # ydl_opts["proxy"] = "http://{}".format(worker._proxy_for(site)) - ydl = _YoutubeDL(ydl_opts) + ydl = _YoutubeDL(ytdlp_url, params=ydl_opts) if site.extra_headers(): ydl._opener.add_handler(ExtraHeaderAdder(site.extra_headers(page))) ydl.pushed_videos = [] - ydl.url = ytdlp_url ydl.is_youtube_host = is_youtube_host return ydl @@ -344,7 +356,7 @@ def _remember_videos(page, pushed_videos=None): "content-type": pushed_video["content-type"], "content-length": pushed_video["content-length"], } - logging.debug("embedded video %s", video) + logger.debug("embedded video", video=video) page.videos.append(video) @@ -353,7 +365,7 @@ def _try_youtube_dl(worker, ydl, site, page): attempt = 0 while attempt < max_attempts: try: - logging.info("trying yt-dlp on %s", ydl.url) + logger.info("trying yt-dlp", url=ydl.url) # should_download_vid = not ydl.is_youtube_host # then # ydl.extract_info(str(urlcanon.whatwg(ydl.url)), download=should_download_vid) @@ -394,15 +406,18 @@ def _try_youtube_dl(worker, ydl, site, page): # and others... attempt += 1 if attempt == max_attempts: - logging.warning( - "Failed after %s attempt(s). Error: %s", max_attempts, e + logger.warning( + "Failed after %s attempt(s)", + max_attempts, + attempts=max_attempts, + exc_info=True, ) raise brozzler.VideoExtractorError( "yt-dlp hit error extracting info for %s" % ydl.url ) else: retry_wait = min(60, YTDLP_WAIT * (1.5 ** (attempt - 1))) - logging.info( + logger.info( "Attempt %s failed. Retrying in %s seconds...", attempt, retry_wait, @@ -413,15 +428,14 @@ def _try_youtube_dl(worker, ydl, site, page): "yt-dlp hit unknown error extracting info for %s" % ydl.url ) - logging.info("ytdlp completed successfully") + logger.info("ytdlp completed successfully") _remember_videos(page, ydl.pushed_videos) if worker._using_warcprox(site): info_json = json.dumps(ie_result, sort_keys=True, indent=4) - logging.info( - "sending WARCPROX_WRITE_RECORD request to warcprox " - "with yt-dlp json for %s", - ydl.url, + logger.info( + "sending WARCPROX_WRITE_RECORD request to warcprox " "with yt-dlp json", + url=ydl.url, ) worker._warcprox_write_record( warcprox_address=worker._proxy_for(site), @@ -452,7 +466,7 @@ def do_youtube_dl(worker, site, page, ytdlp_proxy_endpoints): with tempfile.TemporaryDirectory( prefix="brzl-ydl-", dir=worker._ytdlp_tmpdir ) as tempdir: - logging.info("tempdir for yt-dlp: %s", tempdir) + logger.info("tempdir for yt-dlp", tempdir=tempdir) ydl = _build_youtube_dl(worker, tempdir, site, page, ytdlp_proxy_endpoints) ie_result = _try_youtube_dl(worker, ydl, site, page) outlinks = set() diff --git a/setup.py b/setup.py index 808ba87..b4db44e 100644 --- a/setup.py +++ b/setup.py @@ -76,6 +76,7 @@ setuptools.setup( "cryptography>=2.3", "python-magic>=0.4.15", "prometheus-client>=0.20.0", + "structlog>=25.1.0", ], extras_require={ "yt-dlp": ["yt-dlp>=2024.7.25"], diff --git a/tests/test_cluster.py b/tests/test_cluster.py index 51d78e3..d3ece4b 100644 --- a/tests/test_cluster.py +++ b/tests/test_cluster.py @@ -31,11 +31,14 @@ import datetime import requests import subprocess import http.server -import logging +import structlog import sys import warcprox +logger = structlog.get_logger(logger_name=__name__) + + # https://stackoverflow.com/questions/166506/finding-local-ip-addresses-using-pythons-stdlib def _local_address(): import socket @@ 
-70,11 +73,19 @@ def stop_service(service): def httpd(request): class RequestHandler(http.server.SimpleHTTPRequestHandler): def do_POST(self): - logging.info("\n%s\n%s", self.requestline, self.headers) + logger.info( + "RequestHandler.do_POST", + requestline=self.requestline, + headers=self.headers, + ) self.do_GET() def do_GET(self): - logging.info("\n%s\n%s", self.requestline, self.headers) + logger.info( + "RequestHandler.do_GET", + requestline=self.requestline, + headers=self.headers, + ) if self.path == "/site5/redirect/": self.send_response(303, "See other") self.send_header("Connection", "close") @@ -270,7 +281,7 @@ def test_proxy_non_warcprox(httpd): def do_HEAD(self): if not hasattr(self.server, "requests"): self.server.requests = [] - logging.info("%s %s", self.command, self.path) + logger.info("%s %s", self.command, self.path) self.server.requests.append("%s %s" % (self.command, self.path)) response = urllib.request.urlopen(self.path) self.wfile.write( @@ -292,7 +303,7 @@ def test_proxy_non_warcprox(httpd): def do_WARCPROX_WRITE_RECORD(self): if not hasattr(self.server, "requests"): self.server.requests = [] - logging.info("%s %s", self.command, self.path) + logger.info("%s %s", self.command, self.path) self.send_error(400) proxy = http.server.HTTPServer(("localhost", 0), DumbProxyRequestHandler) @@ -826,7 +837,7 @@ def test_warcprox_outage_resiliency(httpd): try: stop_service("warcprox") except Exception as e: - logging.warning("problem stopping warcprox service: %s", e) + logger.warning("problem stopping warcprox service", exc_info=True) # queue the site for brozzling brozzler.new_site(frontier, site) diff --git a/tests/test_units.py b/tests/test_units.py index b7a785f..27a89a7 100644 --- a/tests/test_units.py +++ b/tests/test_units.py @@ -24,7 +24,6 @@ import os import brozzler import brozzler.chrome import brozzler.ydl -import logging import yaml import datetime import requests @@ -36,15 +35,6 @@ import sys import threading from unittest import mock -logging.basicConfig( - stream=sys.stderr, - level=logging.INFO, - format=( - "%(asctime)s %(process)d %(levelname)s %(threadName)s " - "%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s" - ), -) - @pytest.fixture(scope="module") def httpd(request): From 6ca1a6248969bccda994701453e34e4e7cc430ed Mon Sep 17 00:00:00 2001 From: Gretchen Leigh Miller Date: Tue, 25 Feb 2025 12:08:35 -0800 Subject: [PATCH 2/4] update minimum Python version in README (#327) --- README.rst | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/README.rst b/README.rst index 13ab3d4..740b59d 100644 --- a/README.rst +++ b/README.rst @@ -19,7 +19,7 @@ Brozzler is designed to work in conjunction with warcprox for web archiving. Requirements ------------ -- Python 3.5 or later +- Python 3.8 or later - RethinkDB deployment - Chromium or Google Chrome >= version 64 @@ -205,4 +205,3 @@ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. - From e37c0ad78cdba01123d432b9159bbcf9b351bbae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Misty=20De=20M=C3=A9o?= Date: Mon, 3 Mar 2025 09:24:52 -0800 Subject: [PATCH 3/4] deps: remove `easy` group (#331) This group isn't actually installable right now because of the jinja dependency conflict.
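As a rough illustration of the resolver dead end (the exact Jinja2 bounds below are assumptions for the sketch, not the real pywb/flask metadata), the old `easy` pins reduce to two Jinja2 requirements with an empty intersection:

    # illustrative only: assumes pywb<2 wants a pre-3.x Jinja2 while a modern
    # Flask (pulled in by flask>=1.0) wants Jinja2>=3.0
    from packaging.specifiers import SpecifierSet

    pywb_jinja2 = SpecifierSet("<2.11")   # assumed pin for pywb>=0.33.2,<2
    flask_jinja2 = SpecifierSet(">=3.0")  # assumed pin for current Flask

    combined = pywb_jinja2 & flask_jinja2
    print([v for v in ("2.10.3", "2.11.3", "3.0.3", "3.1.5") if v in combined])
    # prints [] -- no Jinja2 release satisfies both, so pip cannot resolve the extra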
--- setup.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/setup.py b/setup.py index b4db44e..21c8b8f 100644 --- a/setup.py +++ b/setup.py @@ -81,11 +81,7 @@ setuptools.setup( extras_require={ "yt-dlp": ["yt-dlp>=2024.7.25"], "dashboard": ["flask>=1.0", "gunicorn>=19.8.1"], - "easy": [ - "warcprox>=2.4.31", - "pywb>=0.33.2,<2", - "flask>=1.0", - "gunicorn>=19.8.1", + "rethinkdb": [ "rethinkdb==2.4.9", "doublethink==0.4.9", ], From e6332c7f9411200cad3472c7405be8deac129d29 Mon Sep 17 00:00:00 2001 From: Barbara Miller Date: Mon, 3 Mar 2025 14:41:30 -0800 Subject: [PATCH 4/4] unused --- .github/workflows/publish-artifacts.yml | 36 ------------------------- 1 file changed, 36 deletions(-) delete mode 100644 .github/workflows/publish-artifacts.yml diff --git a/.github/workflows/publish-artifacts.yml b/.github/workflows/publish-artifacts.yml deleted file mode 100644 index 47c96a3..0000000 --- a/.github/workflows/publish-artifacts.yml +++ /dev/null @@ -1,36 +0,0 @@ -name: Publish Artifacts - -on: - push: - branches: - - main - - master - pull_request: - branches: - - main - - master - -jobs: - build: - name: Build distribution 📦 - runs-on: ubuntu-latest - - steps: - - uses: actions/checkout@v4 - - name: Set up Python 3.8 - uses: actions/setup-python@v5 - with: - python-version: "3.8" - - name: Install pypa/build - run: >- - python3 -m - pip install - build - --user - - name: Build a binary wheel and a source tarball - run: python3 -m build - - name: Store the distribution packages - uses: actions/upload-artifact@v4 - with: - name: python-package-distributions - path: dist/