Merge branch 'logging' into qa

Misty De Méo 2025-02-21 09:27:44 -08:00
commit 5f1e92c23f
15 changed files with 446 additions and 397 deletions
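The diff below converts brozzler from stdlib logging (printf-style messages plus monkey-patched TRACE/NOTICE levels) to structlog loggers that carry structured key/value context. A minimal sketch of the call-site pattern the diff applies throughout; the module name and values here are hypothetical, not taken from the commit:

import structlog

# module- or class-level logger, named the way the diff names them
logger = structlog.get_logger(logger_name="brozzler.example")

# old: logging.info("navigating to page %s", page_url)
# new: constant event string, data moved into keyword arguments
logger.info("navigating to page", page_url="https://example.com/")

# context shared by several messages is bound once and reused
site_logger = logger.bind(site_id="abc123")
site_logger.warning("skip invalid outlink", outlink="mailto:nobody@example.com")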


@@ -18,6 +18,7 @@ limitations under the License.
 """
 import logging
+import structlog
 from pkg_resources import get_distribution as _get_distribution
 __version__ = _get_distribution("brozzler").version
@@ -79,32 +80,6 @@ class ReachedLimit(Exception):
         return self.__repr__()
-# monkey-patch log levels TRACE and NOTICE
-logging.TRACE = (logging.NOTSET + logging.DEBUG) // 2
-def _logger_trace(self, msg, *args, **kwargs):
-    if self.isEnabledFor(logging.TRACE):
-        self._log(logging.TRACE, msg, args, **kwargs)
-logging.Logger.trace = _logger_trace
-logging.trace = logging.root.trace
-logging.addLevelName(logging.TRACE, "TRACE")
-logging.NOTICE = (logging.INFO + logging.WARN) // 2
-def _logger_notice(self, msg, *args, **kwargs):
-    if self.isEnabledFor(logging.NOTICE):
-        self._log(logging.NOTICE, msg, args, **kwargs)
-logging.Logger.notice = _logger_notice
-logging.notice = logging.root.notice
-logging.addLevelName(logging.NOTICE, "NOTICE")
 # see https://github.com/internetarchive/brozzler/issues/91
 def _logging_handler_handle(self, record):
     rv = self.filter(record)
@@ -146,7 +121,9 @@ def behavior_script(url, template_parameters=None, behaviors_dir=None):
     """
     Returns the javascript behavior string populated with template_parameters.
     """
-    import re, logging, json
+    import re, json
+    logger = structlog.get_logger()
     for behavior in behaviors(behaviors_dir=behaviors_dir):
         if re.match(behavior["url_regex"], url):
@@ -159,18 +136,18 @@ def behavior_script(url, template_parameters=None, behaviors_dir=None):
                 behavior["behavior_js_template"]
             )
             script = template.render(parameters)
-            logging.info(
-                "using template=%r populated with parameters=%r for %r",
-                behavior["behavior_js_template"],
-                json.dumps(parameters),
-                url,
+            logger.info(
+                "rendering template",
+                template=behavior["behavior_js_template"],
+                parameters=json.dumps(parameters),
+                url=url,
             )
             return script
     return None
 class ThreadExceptionGate:
-    logger = logging.getLogger(__module__ + "." + __qualname__)
+    logger = structlog.get_logger(logger_name=__module__ + "." + __qualname__)
     def __init__(self, thread):
         self.thread = thread
@@ -181,7 +158,9 @@ class ThreadExceptionGate:
     def __enter__(self):
         assert self.thread == threading.current_thread()
         if self.pending_exception:
-            self.logger.info("raising pending exception %s", self.pending_exception)
+            self.logger.info(
+                "raising pending exception", pending_exception=self.pending_exception
+            )
             tmp = self.pending_exception
             self.pending_exception = None
             raise tmp
@@ -198,10 +177,10 @@ class ThreadExceptionGate:
         with self.lock:
             if self.pending_exception:
                 self.logger.warning(
-                    "%r already pending for thread %r, discarding %r",
-                    self.pending_exception,
-                    self.thread,
-                    e,
+                    "exception already pending for thread, discarding",
+                    pending_exception=self.pending_exception,
+                    thread=self.thread,
+                    exception=e,
                 )
             else:
                 self.pending_exception = e
@@ -266,7 +245,9 @@ def thread_raise(thread, exctype):
        TypeError if `exctype` is not a class
        ValueError, SystemError in case of unexpected problems
    """
-    import ctypes, inspect, threading, logging
+    import ctypes, inspect, threading, structlog
+    logger = structlog.get_logger(exctype=exctype, thread=thread)
     if not inspect.isclass(exctype):
         raise TypeError(
@@ -278,7 +259,7 @@ def thread_raise(thread, exctype):
     with gate.lock:
         if gate.ok_to_raise.is_set() and thread.is_alive():
             gate.ok_to_raise.clear()
-            logging.info("raising %s in thread %s", exctype, thread)
+            logger.info("raising exception in thread")
             res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
                 ctypes.c_long(thread.ident), ctypes.py_object(exctype)
             )
@@ -290,7 +271,7 @@ def thread_raise(thread, exctype):
             ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, 0)
             raise SystemError("PyThreadState_SetAsyncExc failed")
         else:
-            logging.info("queueing %s for thread %s", exctype, thread)
+            logger.info("queueing exception for thread")
             gate.queue_exception(exctype)


@ -31,6 +31,7 @@ import base64
from ipaddress import AddressValueError from ipaddress import AddressValueError
from brozzler.chrome import Chrome from brozzler.chrome import Chrome
import socket import socket
import structlog
import urlcanon import urlcanon
MAX_UNMATCHED_INVALID_CHECKS = 5 MAX_UNMATCHED_INVALID_CHECKS = 5
@ -53,7 +54,7 @@ class BrowserPool:
debugging protocol. debugging protocol.
""" """
logger = logging.getLogger(__module__ + "." + __qualname__) logger = structlog.get_logger(logger_name=__module__ + "." + __qualname__)
def __init__(self, size=3, **kwargs): def __init__(self, size=3, **kwargs):
""" """
@ -144,7 +145,7 @@ class BrowserPool:
class WebsockReceiverThread(threading.Thread): class WebsockReceiverThread(threading.Thread):
logger = logging.getLogger(__module__ + "." + __qualname__) logger = structlog.get_logger(logger_name=__module__ + "." + __qualname__)
def __init__(self, websock, name=None, daemon=True): def __init__(self, websock, name=None, daemon=True):
super().__init__(name=name, daemon=daemon) super().__init__(name=name, daemon=daemon)
@ -194,7 +195,7 @@ class WebsockReceiverThread(threading.Thread):
): ):
self.logger.error("websocket closed, did chrome die?") self.logger.error("websocket closed, did chrome die?")
else: else:
self.logger.error("exception from websocket receiver thread", exc_info=1) self.logger.exception("exception from websocket receiver thread")
brozzler.thread_raise(self.calling_thread, BrowsingException) brozzler.thread_raise(self.calling_thread, BrowsingException)
def run(self): def run(self):
@ -214,10 +215,9 @@ class WebsockReceiverThread(threading.Thread):
try: try:
self._handle_message(websock, message) self._handle_message(websock, message)
except: except:
self.logger.error( self.logger.exception(
"uncaught exception in _handle_message message=%s", "uncaught exception in _handle_message",
message, message=message,
exc_info=True,
) )
def _network_response_received(self, message): def _network_response_received(self, message):
@ -232,7 +232,7 @@ class WebsockReceiverThread(threading.Thread):
] ]
) )
self.reached_limit = brozzler.ReachedLimit(warcprox_meta=warcprox_meta) self.reached_limit = brozzler.ReachedLimit(warcprox_meta=warcprox_meta)
self.logger.info("reached limit %s", self.reached_limit) self.logger.info("reached limit", limit=self.reached_limit)
brozzler.thread_raise(self.calling_thread, brozzler.ReachedLimit) brozzler.thread_raise(self.calling_thread, brozzler.ReachedLimit)
else: else:
self.logger.info( self.logger.info(
@ -246,7 +246,7 @@ class WebsockReceiverThread(threading.Thread):
self.page_status = status self.page_status = status
def _javascript_dialog_opening(self, message): def _javascript_dialog_opening(self, message):
self.logger.info("javascript dialog opened: %s", message) self.logger.info("javascript dialog opened", message=message)
if message["params"]["type"] == "alert": if message["params"]["type"] == "alert":
accept = True accept = True
else: else:
@ -293,7 +293,7 @@ class WebsockReceiverThread(threading.Thread):
message["params"]["message"]["text"], message["params"]["message"]["text"],
) )
elif message["method"] == "Runtime.exceptionThrown": elif message["method"] == "Runtime.exceptionThrown":
self.logger.debug("uncaught exception: %s", message) self.logger.debug("uncaught exception", exception=message)
elif message["method"] == "Page.javascriptDialogOpening": elif message["method"] == "Page.javascriptDialogOpening":
self._javascript_dialog_opening(message) self._javascript_dialog_opening(message)
elif ( elif (
@ -323,7 +323,7 @@ class Browser:
Manages an instance of Chrome for browsing pages. Manages an instance of Chrome for browsing pages.
""" """
logger = logging.getLogger(__module__ + "." + __qualname__) logger = structlog.get_logger(logger_name=__module__ + "." + __qualname__)
def __init__(self, **kwargs): def __init__(self, **kwargs):
""" """
@ -366,11 +366,10 @@ class Browser:
msg_id = next(self._command_id) msg_id = next(self._command_id)
kwargs["id"] = msg_id kwargs["id"] = msg_id
msg = json.dumps(kwargs, separators=",:") msg = json.dumps(kwargs, separators=",:")
logging.log( self.logger.debug(
logging.TRACE if suppress_logging else logging.DEBUG, "sending message",
"sending message to %s: %s", websock=self.websock,
self.websock, message=msg,
msg,
) )
self.websock.send(msg) self.websock.send(msg)
return msg_id return msg_id
@ -398,7 +397,7 @@ class Browser:
# Enable Console & Runtime output only when debugging. # Enable Console & Runtime output only when debugging.
# After all, we just print these events with debug(), we don't use # After all, we just print these events with debug(), we don't use
# them in Brozzler logic. # them in Brozzler logic.
if self.logger.isEnabledFor(logging.DEBUG): if self.logger.is_enabled_for(logging.DEBUG):
self.send_to_chrome(method="Console.enable") self.send_to_chrome(method="Console.enable")
self.send_to_chrome(method="Runtime.enable") self.send_to_chrome(method="Runtime.enable")
self.send_to_chrome(method="ServiceWorker.enable") self.send_to_chrome(method="ServiceWorker.enable")
@ -433,8 +432,8 @@ class Browser:
try: try:
self.websock.close() self.websock.close()
except BaseException as e: except BaseException as e:
self.logger.error( self.logger.exception(
"exception closing websocket %s - %s", self.websock, e "exception closing websocket", websocket=self.websock
) )
self.chrome.stop() self.chrome.stop()
@ -461,7 +460,7 @@ class Browser:
self.websock_url = None self.websock_url = None
except: except:
self.logger.error("problem stopping", exc_info=True) self.logger.exception("problem stopping")
def is_running(self): def is_running(self):
return self.websock_url is not None return self.websock_url is not None
@ -567,7 +566,7 @@ class Browser:
# if login redirected us, return to page_url # if login redirected us, return to page_url
if page_url != self.url().split("#")[0]: if page_url != self.url().split("#")[0]:
self.logger.debug( self.logger.debug(
"login navigated away from %s; returning!", page_url "login navigated away; returning!", page_url=page_url
) )
self.navigate_to_page(page_url, timeout=page_timeout) self.navigate_to_page(page_url, timeout=page_timeout)
# If the target page HTTP status is 4xx/5xx, there is no point # If the target page HTTP status is 4xx/5xx, there is no point
@ -611,7 +610,7 @@ class Browser:
# more information, raise that one # more information, raise that one
raise self.websock_thread.reached_limit raise self.websock_thread.reached_limit
except websocket.WebSocketConnectionClosedException as e: except websocket.WebSocketConnectionClosedException as e:
self.logger.error("websocket closed, did chrome die?") self.logger.exception("websocket closed, did chrome die?")
raise BrowsingException(e) raise BrowsingException(e)
finally: finally:
self.is_browsing = False self.is_browsing = False
@ -633,7 +632,7 @@ class Browser:
on_screenshot(jpeg_bytes) on_screenshot(jpeg_bytes)
return return
except BrowsingTimeout as e: except BrowsingTimeout as e:
logging.error("attempt %s/3: %s", i + 1, e) self.logger.exception("attempt %s/3", i + 1)
def visit_hashtags(self, page_url, hashtags, outlinks): def visit_hashtags(self, page_url, hashtags, outlinks):
_hashtags = set(hashtags or []) _hashtags = set(hashtags or [])
@ -647,7 +646,7 @@ class Browser:
# out which hashtags were visited already and skip those # out which hashtags were visited already and skip those
for hashtag in _hashtags: for hashtag in _hashtags:
# navigate_to_hashtag (nothing to wait for so no timeout?) # navigate_to_hashtag (nothing to wait for so no timeout?)
self.logger.debug("navigating to hashtag %s", hashtag) self.logger.debug("navigating to hashtag", hashtag=hashtag)
url = urlcanon.whatwg(page_url) url = urlcanon.whatwg(page_url)
url.hash_sign = b"#" url.hash_sign = b"#"
url.fragment = hashtag[1:].encode("utf-8") url.fragment = hashtag[1:].encode("utf-8")
@ -687,7 +686,7 @@ class Browser:
) )
def navigate_to_page(self, page_url, timeout=300): def navigate_to_page(self, page_url, timeout=300):
self.logger.info("navigating to page %s", page_url) self.logger.info("navigating to page", page_url=page_url)
self.websock_thread.got_page_load_event = None self.websock_thread.got_page_load_event = None
self.websock_thread.page_status = None self.websock_thread.page_status = None
self.send_to_chrome(method="Page.navigate", params={"url": page_url}) self.send_to_chrome(method="Page.navigate", params={"url": page_url})
@ -715,15 +714,13 @@ class Browser:
try: try:
out.append(str(urlcanon.whatwg(link))) out.append(str(urlcanon.whatwg(link)))
except AddressValueError: except AddressValueError:
self.logger.warning("skip invalid outlink: %s", link) self.logger.warning("skip invalid outlink", outlink=link)
return frozenset(out) return frozenset(out)
else: else:
# no links found # no links found
return frozenset() return frozenset()
else: else:
self.logger.error( self.logger.error("problem extracting outlinks", message=message)
"problem extracting outlinks, result message: %s", message
)
return frozenset() return frozenset()
def screenshot(self, full_page=False, timeout=45): def screenshot(self, full_page=False, timeout=45):
@ -797,11 +794,11 @@ class Browser:
elapsed = time.time() - start elapsed = time.time() - start
if elapsed > timeout: if elapsed > timeout:
logging.info( logging.info(
"behavior reached hard timeout after %.1fs and %s valid checks, and %s invalid checks, for url %s", "behavior reached hard timeout after %.1fs and %s valid checks, and %s invalid checks",
elapsed, elapsed,
valid_behavior_checks, valid_behavior_checks,
invalid_behavior_checks, invalid_behavior_checks,
page_url, page_url=page_url,
) )
return return
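Throughout browser.py the error(..., exc_info=1) calls become logger.exception(...), which logs at error level and attaches the active traceback. A minimal sketch of that pattern under structlog, with hypothetical names rather than code from the commit:

import structlog

logger = structlog.get_logger(logger_name="brozzler.browser.WebsockReceiverThread")

def handle_message(websock, message):
    try:
        raise ValueError("boom")  # stand-in for real message handling
    except Exception:
        # .exception() implies error level plus exc_info, so the traceback
        # is rendered along with the bound key/value pairs
        logger.exception("uncaught exception in _handle_message", message=message)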


@ -16,7 +16,6 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
""" """
import logging
import urllib.request import urllib.request
import time import time
import threading import threading
@ -27,6 +26,7 @@ import select
import re import re
import signal import signal
import sqlite3 import sqlite3
import structlog
import json import json
import tempfile import tempfile
import sys import sys
@ -65,7 +65,7 @@ def check_version(chrome_exe):
class Chrome: class Chrome:
logger = logging.getLogger(__module__ + "." + __qualname__) logger = structlog.get_logger(logger_name=__module__ + "." + __qualname__)
def __init__(self, chrome_exe, port=9222, ignore_cert_errors=False): def __init__(self, chrome_exe, port=9222, ignore_cert_errors=False):
""" """
@ -97,22 +97,22 @@ class Chrome:
def _init_cookie_db(self, cookie_db): def _init_cookie_db(self, cookie_db):
cookie_dir = os.path.join(self._chrome_user_data_dir, "Default") cookie_dir = os.path.join(self._chrome_user_data_dir, "Default")
cookie_location = os.path.join(cookie_dir, "Cookies") cookie_location = os.path.join(cookie_dir, "Cookies")
self.logger.debug("cookie DB provided, writing to %s", cookie_location) cookie_logger = self.logger.bind(cookie_location=cookie_location)
cookie_logger.debug("cookie DB provided, writing to")
os.makedirs(cookie_dir, exist_ok=True) os.makedirs(cookie_dir, exist_ok=True)
try: try:
with open(cookie_location, "wb") as cookie_file: with open(cookie_location, "wb") as cookie_file:
cookie_file.write(cookie_db) cookie_file.write(cookie_db)
except OSError: except OSError:
self.logger.error( cookie_logger.exception("exception writing cookie file")
"exception writing cookie file at %s", cookie_location, exc_info=True
)
def persist_and_read_cookie_db(self): def persist_and_read_cookie_db(self):
cookie_location = os.path.join(self._chrome_user_data_dir, "Default", "Cookies") cookie_location = os.path.join(self._chrome_user_data_dir, "Default", "Cookies")
self.logger.debug( cookie_logger = self.logger.bind(cookie_location=cookie_location)
"marking cookies persistent then reading file into memory: %s", cookie_logger.debug(
cookie_location, "marking cookies persistent then reading file into memory",
) )
try: try:
with sqlite3.connect(cookie_location) as conn: with sqlite3.connect(cookie_location) as conn:
@ -125,20 +125,14 @@ class Chrome:
cur = conn.cursor() cur = conn.cursor()
cur.execute("UPDATE cookies SET persistent = 1") cur.execute("UPDATE cookies SET persistent = 1")
except sqlite3.Error: except sqlite3.Error:
self.logger.error( cookie_logger.exception("exception updating cookie DB")
"exception updating cookie DB %s", cookie_location, exc_info=True
)
cookie_db = None cookie_db = None
try: try:
with open(cookie_location, "rb") as cookie_file: with open(cookie_location, "rb") as cookie_file:
cookie_db = cookie_file.read() cookie_db = cookie_file.read()
except OSError: except OSError:
self.logger.error( cookie_logger.exception("exception reading from cookie DB file")
"exception reading from cookie DB file %s",
cookie_location,
exc_info=True,
)
return cookie_db return cookie_db
def start( def start(
@ -228,7 +222,7 @@ class Chrome:
if proxy: if proxy:
chrome_args.append("--proxy-server=%s" % proxy) chrome_args.append("--proxy-server=%s" % proxy)
chrome_args.append("about:blank") chrome_args.append("about:blank")
self.logger.info("running: %r", subprocess.list2cmdline(chrome_args)) self.logger.info("running", chrome_args=subprocess.list2cmdline(chrome_args))
# start_new_session - new process group so we can kill the whole group # start_new_session - new process group so we can kill the whole group
self.chrome_process = subprocess.Popen( self.chrome_process = subprocess.Popen(
chrome_args, chrome_args,
@ -244,12 +238,13 @@ class Chrome:
daemon=True, daemon=True,
) )
self._out_reader_thread.start() self._out_reader_thread.start()
self.logger.info("chrome running, pid %s" % self.chrome_process.pid) self.logger.info("chrome running", pid=self.chrome_process.pid)
return self._websocket_url(timeout_sec=websocket_timeout) return self._websocket_url(timeout_sec=websocket_timeout)
def _websocket_url(self, timeout_sec=60): def _websocket_url(self, timeout_sec=60):
json_url = "http://localhost:%s/json" % self.port json_url = "http://localhost:%s/json" % self.port
url_logger = self.logger.bind(json_url=json_url)
# make this a member variable so that kill -QUIT reports it # make this a member variable so that kill -QUIT reports it
self._start = time.time() self._start = time.time()
self._last_warning = self._start self._last_warning = self._start
@ -260,24 +255,21 @@ class Chrome:
debug_info = [x for x in all_debug_info if x["url"] == "about:blank"] debug_info = [x for x in all_debug_info if x["url"] == "about:blank"]
if debug_info and "webSocketDebuggerUrl" in debug_info[0]: if debug_info and "webSocketDebuggerUrl" in debug_info[0]:
self.logger.debug("%s returned %s", json_url, raw_json) url_logger.debug("webSocketDebuggerUrl returned", raw_json=raw_json)
url = debug_info[0]["webSocketDebuggerUrl"] url = debug_info[0]["webSocketDebuggerUrl"]
self.logger.info( url_logger.info(
"got chrome window websocket debug url %s from %s", "got chrome window websocket debug url",
url, debug_url=url,
json_url,
) )
return url return url
except brozzler.ShutdownRequested: except brozzler.ShutdownRequested:
raise raise
except Exception as e: except Exception as e:
if time.time() - self._last_warning > 30: if time.time() - self._last_warning > 30:
self.logger.warning( url_logger.warning(
"problem with %s (will keep trying until timeout " "problem accessing url (will keep trying until timeout)",
"of %d seconds): %s", timeout=timeout_sec,
json_url, exc_info=True,
timeout_sec,
e,
) )
self._last_warning = time.time() self._last_warning = time.time()
finally: finally:
@ -323,26 +315,28 @@ class Chrome:
while not self._shutdown.is_set(): while not self._shutdown.is_set():
buf = readline_nonblock(self.chrome_process.stdout) buf = readline_nonblock(self.chrome_process.stdout)
if buf: if buf:
self.logger.trace( self.logger.debug(
"chrome pid %s STDOUT %s", self.chrome_process.pid, buf "chrome pid %s STDOUT %s", self.chrome_process.pid, buf
) )
buf = readline_nonblock(self.chrome_process.stderr) buf = readline_nonblock(self.chrome_process.stderr)
if buf: if buf:
self.logger.trace( self.logger.debug(
"chrome pid %s STDERR %s", self.chrome_process.pid, buf "chrome pid %s STDERR %s", self.chrome_process.pid, buf
) )
except: except:
self.logger.error("unexpected exception", exc_info=True) self.logger.exception("unexpected exception")
def stop(self): def stop(self):
if not self.chrome_process or self._shutdown.is_set(): if not self.chrome_process or self._shutdown.is_set():
return return
self._shutdown.set() self._shutdown.set()
pid_logger = self.logger.bind(pid=self.chrome_process.pid)
timeout_sec = 300 timeout_sec = 300
if self.chrome_process.poll() is None: if self.chrome_process.poll() is None:
self.logger.info("terminating chrome pgid %s", self.chrome_process.pid) pid_logger.info("terminating chrome")
os.killpg(self.chrome_process.pid, signal.SIGTERM) os.killpg(self.chrome_process.pid, signal.SIGTERM)
t0 = time.time() t0 = time.time()
@ -352,14 +346,11 @@ class Chrome:
status = self.chrome_process.poll() status = self.chrome_process.poll()
if status is not None: if status is not None:
if status == 0: if status == 0:
self.logger.info( pid_logger.info("chrome exited normally")
"chrome pid %s exited normally", self.chrome_process.pid
)
else: else:
self.logger.warning( pid_logger.warning(
"chrome pid %s exited with nonzero status %s", "chrome exited with nonzero status",
self.chrome_process.pid, status=status,
status,
) )
# XXX I would like to forcefully kill the process group # XXX I would like to forcefully kill the process group
@ -369,18 +360,16 @@ class Chrome:
return return
time.sleep(0.5) time.sleep(0.5)
self.logger.warning( pid_logger.warning(
"chrome pid %s still alive %.1f seconds after sending " "chrome still alive %.1f seconds after sending "
"SIGTERM, sending SIGKILL", "SIGTERM, sending SIGKILL",
self.chrome_process.pid,
time.time() - t0, time.time() - t0,
) )
os.killpg(self.chrome_process.pid, signal.SIGKILL) os.killpg(self.chrome_process.pid, signal.SIGKILL)
status = self.chrome_process.wait() status = self.chrome_process.wait()
self.logger.warning( pid_logger.warning(
"chrome pid %s reaped (status=%s) after killing with " "SIGKILL", "chrome reaped after killing with " "SIGKILL",
self.chrome_process.pid, status=status,
status,
) )
finally: finally:
@ -389,8 +378,8 @@ class Chrome:
try: try:
self._home_tmpdir.cleanup() self._home_tmpdir.cleanup()
except: except:
self.logger.error( self.logger.exception(
"exception deleting %s", self._home_tmpdir, exc_info=True "exception deleting self._home_tmpdir", tmpdir=self._home_tmpdir
) )
self._out_reader_thread.join() self._out_reader_thread.join()
self.chrome_process = None self.chrome_process = None


@ -29,6 +29,8 @@ import requests
import doublethink import doublethink
import signal import signal
import string import string
import structlog
import subprocess
import sys import sys
import threading import threading
import time import time
@ -41,6 +43,8 @@ import rethinkdb as rdb
r = rdb.RethinkDB() r = rdb.RethinkDB()
logger = structlog.get_logger()
def add_common_options(arg_parser, argv=None): def add_common_options(arg_parser, argv=None):
argv = argv or sys.argv argv = argv or sys.argv
@ -50,7 +54,7 @@ def add_common_options(arg_parser, argv=None):
dest="log_level", dest="log_level",
action="store_const", action="store_const",
default=logging.INFO, default=logging.INFO,
const=logging.NOTICE, const=logging.WARN,
help="quiet logging", help="quiet logging",
) )
arg_parser.add_argument( arg_parser.add_argument(
@ -67,7 +71,7 @@ def add_common_options(arg_parser, argv=None):
dest="log_level", dest="log_level",
action="store_const", action="store_const",
default=logging.INFO, default=logging.INFO,
const=logging.TRACE, const=logging.DEBUG,
help=("very verbose logging"), help=("very verbose logging"),
) )
# arg_parser.add_argument( # arg_parser.add_argument(
@ -108,7 +112,50 @@ def rethinker(args):
return doublethink.Rethinker(servers.split(","), db) return doublethink.Rethinker(servers.split(","), db)
# Decorates the logger name with call location, if provided
def decorate_logger_name(a, b, event_dict):
old_name = event_dict.get("logger_name")
if old_name is None:
return event_dict
try:
filename = event_dict.pop("filename")
func_name = event_dict.pop("func_name")
lineno = event_dict.pop("lineno")
except KeyError:
return event_dict
new_name = f"{old_name}.{func_name}({filename}:{lineno})"
event_dict["logger_name"] = new_name
return event_dict
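For illustration, a sketch of what decorate_logger_name does to an event dict that CallsiteParameterAdder has already populated; the values here are made up:

event_dict = {
    "logger_name": "brozzler.browser.Browser",
    "filename": "browser.py",
    "func_name": "navigate_to_page",
    "lineno": 686,
    "event": "navigating to page",
}
decorate_logger_name(None, None, event_dict)
# event_dict["logger_name"] is now
# "brozzler.browser.Browser.navigate_to_page(browser.py:686)"
# and the filename/func_name/lineno keys have been popped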
def configure_logging(args): def configure_logging(args):
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.processors.add_log_level,
structlog.processors.StackInfoRenderer(),
structlog.dev.set_exc_info,
structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S", utc=False),
structlog.processors.CallsiteParameterAdder(
[
structlog.processors.CallsiteParameter.FILENAME,
structlog.processors.CallsiteParameter.FUNC_NAME,
structlog.processors.CallsiteParameter.LINENO,
],
),
decorate_logger_name,
structlog.dev.ConsoleRenderer(),
],
wrapper_class=structlog.make_filtering_bound_logger(args.log_level),
context_class=dict,
logger_factory=structlog.PrintLoggerFactory(),
cache_logger_on_first_use=False,
)
# We still configure logging for now because its handlers
# are used for the gunicorn spawned by the brozzler dashboard.
logging.basicConfig( logging.basicConfig(
stream=sys.stderr, stream=sys.stderr,
level=args.log_level, level=args.log_level,
@ -126,8 +173,36 @@ def configure_logging(args):
) )
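A rough sketch of what logging looks like once configure_logging() has run with the structlog configuration above, assuming log_level=INFO; the rendered output line is approximate and the identifiers are made up:

import structlog

logger = structlog.get_logger(logger_name="brozzler.worker")
site_logger = logger.bind(site_id="3j8xPlb")
site_logger.info("brozzling page", page_url="https://example.com/")
# ConsoleRenderer prints something like:
# 2025-02-21 09:27:44 [info     ] brozzling page  logger_name=brozzler.worker.<module>(sketch.py:6) page_url=https://example.com/ site_id=3j8xPlb
site_logger.debug("claiming sites to brozzle", n=3)
# dropped: make_filtering_bound_logger(INFO) turns debug() into a no-op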
def suggest_default_chrome_exe(): def mdfind(identifier):
# mac os x application executable paths try:
result = subprocess.check_output(
["mdfind", f"kMDItemCFBundleIdentifier == {identifier}"], text=True
)
# Just treat any errors as "couldn't find app"
except subprocess.CalledProcessError:
return None
if result:
return result.rstrip("\n")
def suggest_default_chrome_exe_mac():
path = None
# Try Chromium first, then Chrome
result = mdfind("org.chromium.Chromium")
if result is not None:
path = f"{result}/Contents/MacOS/Chromium"
result = mdfind("com.google.Chrome")
if result is not None:
path = f"{result}/Contents/MacOS/Google Chrome"
if path is not None and os.path.exists(path):
return path
# Fall back to default paths if mdfind couldn't find it
# (mdfind might fail to find them even in their default paths
# if the system has Spotlight disabled.)
for path in [ for path in [
"/Applications/Thorium.app/Contents/MacOS/Thorium", "/Applications/Thorium.app/Contents/MacOS/Thorium",
"/Applications/Chromium.app/Contents/MacOS/Chromium", "/Applications/Chromium.app/Contents/MacOS/Chromium",
@ -136,6 +211,14 @@ def suggest_default_chrome_exe():
if os.path.exists(path): if os.path.exists(path):
return path return path
def suggest_default_chrome_exe():
# First ask mdfind, which lets us find it in non-default paths
if sys.platform == "darwin":
path = suggest_default_chrome_exe_mac()
if path is not None:
return path
# "chromium-browser" is the executable on ubuntu trusty # "chromium-browser" is the executable on ubuntu trusty
# https://github.com/internetarchive/brozzler/pull/6/files uses "chromium" # https://github.com/internetarchive/brozzler/pull/6/files uses "chromium"
# google chrome executable names taken from these packages: # google chrome executable names taken from these packages:
@ -319,7 +402,7 @@ def brozzle_page(argv=None):
) )
with open(filename, "wb") as f: with open(filename, "wb") as f:
f.write(screenshot_jpeg) f.write(screenshot_jpeg)
logging.info("wrote screenshot to %s", filename) logger.info("wrote screenshot", filename=filename)
browser = brozzler.Browser(chrome_exe=args.chrome_exe) browser = brozzler.Browser(chrome_exe=args.chrome_exe)
try: try:
@ -335,11 +418,11 @@ def brozzle_page(argv=None):
on_screenshot=on_screenshot, on_screenshot=on_screenshot,
enable_youtube_dl=not args.skip_youtube_dl, enable_youtube_dl=not args.skip_youtube_dl,
) )
logging.info("outlinks: \n\t%s", "\n\t".join(sorted(outlinks))) logger.info("outlinks: \n\t%s", "\n\t".join(sorted(outlinks)))
except brozzler.ReachedLimit as e: except brozzler.ReachedLimit as e:
logging.error("reached limit %s", e) logger.exception("reached limit")
except brozzler.PageInterstitialShown as e: except brozzler.PageInterstitialShown as e:
logging.error("page interstitial shown %s", e) logger.exception("page interstitial shown")
finally: finally:
browser.stop() browser.stop()
@ -597,11 +680,11 @@ def brozzler_worker(argv=None):
state_strs.append("<???:thread:ident=%s>" % ident) state_strs.append("<???:thread:ident=%s>" % ident)
stack = traceback.format_stack(frames[ident]) stack = traceback.format_stack(frames[ident])
state_strs.append("".join(stack)) state_strs.append("".join(stack))
logging.info( logger.info(
"dumping state (caught signal %s)\n%s" % (signum, "\n".join(state_strs)) "dumping state (caught signal)\n%s", signal=signum, state=state_strs
) )
except BaseException as e: except BaseException as e:
logging.error("exception dumping state: %s" % e) logger.exception("exception dumping state")
finally: finally:
signal.signal(signal.SIGQUIT, dump_state) signal.signal(signal.SIGQUIT, dump_state)
@ -612,13 +695,13 @@ def brozzler_worker(argv=None):
with open(YTDLP_PROXY_ENDPOINTS_FILE) as endpoints: with open(YTDLP_PROXY_ENDPOINTS_FILE) as endpoints:
ytdlp_proxy_endpoints = [l for l in endpoints.readlines()] ytdlp_proxy_endpoints = [l for l in endpoints.readlines()]
if ytdlp_proxy_endpoints: if ytdlp_proxy_endpoints:
logging.info( logger.info(
"running with ytdlp proxy endpoints file %s" "running with ytdlp proxy endpoints file",
% YTDLP_PROXY_ENDPOINTS_FILE ytdlp_proxy_endpoints=YTDLP_PROXY_ENDPOINTS_FILE,
) )
except Exception as e: except Exception as e:
ytdlp_proxy_endpoints = [] ytdlp_proxy_endpoints = []
logging.info("running with empty proxy endpoints file") logger.info("running with empty proxy endpoints file")
return ytdlp_proxy_endpoints return ytdlp_proxy_endpoints
rr = rethinker(args) rr = rethinker(args)
@ -650,7 +733,7 @@ def brozzler_worker(argv=None):
th = threading.Thread(target=worker.run, name="BrozzlerWorkerThread") th = threading.Thread(target=worker.run, name="BrozzlerWorkerThread")
th.start() th.start()
th.join() th.join()
logging.info("brozzler-worker is all done, exiting") logger.info("brozzler-worker is all done, exiting")
def brozzler_ensure_tables(argv=None): def brozzler_ensure_tables(argv=None):
@ -724,18 +807,18 @@ def brozzler_list_jobs(argv=None):
except ValueError: except ValueError:
job_id = args.job job_id = args.job
reql = rr.table("jobs").get(job_id) reql = rr.table("jobs").get(job_id)
logging.debug("querying rethinkdb: %s", reql) logger.debug("querying rethinkdb", query=reql)
result = reql.run() result = reql.run()
if result: if result:
results = [reql.run()] results = [reql.run()]
else: else:
logging.error("no such job with id %r", job_id) logger.error("no such job with id", job_id=job_id)
sys.exit(1) sys.exit(1)
else: else:
reql = rr.table("jobs").order_by("id") reql = rr.table("jobs").order_by("id")
if args.active: if args.active:
reql = reql.filter({"status": "ACTIVE"}) reql = reql.filter({"status": "ACTIVE"})
logging.debug("querying rethinkdb: %s", reql) logger.debug("querying rethinkdb", query=reql)
results = reql.run() results = reql.run()
if args.yaml: if args.yaml:
yaml.dump_all( yaml.dump_all(
@ -800,7 +883,7 @@ def brozzler_list_sites(argv=None):
) )
elif args.site: elif args.site:
reql = reql.get_all(args.site) reql = reql.get_all(args.site)
logging.debug("querying rethinkdb: %s", reql) logger.debug("querying rethinkdb", query=reql)
results = reql.run() results = reql.run()
if args.yaml: if args.yaml:
yaml.dump_all( yaml.dump_all(
@ -868,7 +951,7 @@ def brozzler_list_pages(argv=None):
except ValueError: except ValueError:
job_id = args.job job_id = args.job
reql = rr.table("sites").get_all(job_id, index="job_id")["id"] reql = rr.table("sites").get_all(job_id, index="job_id")["id"]
logging.debug("querying rethinkb: %s", reql) logger.debug("querying rethinkb", query=reql)
site_ids = reql.run() site_ids = reql.run()
elif args.site: elif args.site:
try: try:
@ -897,7 +980,7 @@ def brozzler_list_pages(argv=None):
reql = reql.order_by(index="least_hops") reql = reql.order_by(index="least_hops")
if args.claimed: if args.claimed:
reql = reql.filter({"claimed": True}) reql = reql.filter({"claimed": True})
logging.debug("querying rethinkb: %s", reql) logger.debug("querying rethinkb", query=reql)
results = reql.run() results = reql.run()
if args.yaml: if args.yaml:
yaml.dump_all( yaml.dump_all(
@ -963,20 +1046,20 @@ def brozzler_purge(argv=None):
job_id = args.job job_id = args.job
job = brozzler.Job.load(rr, job_id) job = brozzler.Job.load(rr, job_id)
if not job: if not job:
logging.fatal("no such job %r", job_id) logger.fatal("no such job", job_id=job_id)
sys.exit(1) sys.exit(1)
if job.status == "ACTIVE": if job.status == "ACTIVE":
if args.force: if args.force:
logging.warning( logger.warning(
"job %s has status ACTIVE, purging anyway because " "job has status ACTIVE, purging anyway because "
"--force was supplied", "--force was supplied",
job_id, job_id=job_id,
) )
else: else:
logging.fatal( logger.fatal(
"refusing to purge job %s because status is ACTIVE " "refusing to purge job because status is ACTIVE "
"(override with --force)", "(override with --force)",
job_id, job_id=job_id,
) )
sys.exit(1) sys.exit(1)
_purge_job(rr, job_id) _purge_job(rr, job_id)
@ -984,20 +1067,20 @@ def brozzler_purge(argv=None):
site_id = args.site site_id = args.site
site = brozzler.Site.load(rr, site_id) site = brozzler.Site.load(rr, site_id)
if not site: if not site:
logging.fatal("no such job %r", job_id) logger.fatal("no such job", job_id=job_id)
sys.exit(1) sys.exit(1)
if site.status == "ACTIVE": if site.status == "ACTIVE":
if args.force: if args.force:
logging.warning( logger.warning(
"site %s has status ACTIVE, purging anyway because " "site has status ACTIVE, purging anyway because "
"--force was supplied", "--force was supplied",
site_id, site_id=site_id,
) )
else: else:
logging.fatal( logger.fatal(
"refusing to purge site %s because status is ACTIVE " "refusing to purge site because status is ACTIVE "
"(override with --force)", "(override with --force)",
site_id, site_id=site_id,
) )
sys.exit(1) sys.exit(1)
_purge_site(rr, site_id) _purge_site(rr, site_id)
@ -1016,7 +1099,7 @@ def brozzler_purge(argv=None):
.lt(finished_before) .lt(finished_before)
) )
) )
logging.debug("retrieving jobs older than %s: %s", finished_before, reql) logger.debug("retrieving jobs older than %s", finished_before, query=reql)
for job in reql.run(): for job in reql.run():
# logging.info('job %s finished=%s starts_and_stops[-1]["stop"]=%s', # logging.info('job %s finished=%s starts_and_stops[-1]["stop"]=%s',
# job['id'], job.get('finished'), # job['id'], job.get('finished'),
@ -1034,27 +1117,31 @@ def _purge_site(rr, site_id):
) )
.delete() .delete()
) )
logging.debug("purging pages for site %s: %s", site_id, reql) site_logger = logger.bind(site_id=site_id)
site_logger.debug("purging pages for site", query=reql)
result = reql.run() result = reql.run()
logging.info("purged pages for site %s: %s", site_id, result) site_logger.info("purged pages for site", result=result)
reql = rr.table("sites").get(site_id).delete() reql = rr.table("sites").get(site_id).delete()
logging.debug("purging site %s: %s", site_id, reql) site_logger.debug("purging site", query=reql)
result = reql.run() result = reql.run()
logging.info("purged site %s: %s", site_id, result) site_logger.info("purged site", result=result)
def _purge_job(rr, job_id): def _purge_job(rr, job_id):
job_logger = logger.bind(job_id=job_id)
reql = rr.table("sites").get_all(job_id, index="job_id").get_field("id") reql = rr.table("sites").get_all(job_id, index="job_id").get_field("id")
logging.debug("querying rethinkdb: %s", reql) job_logger.debug("querying rethinkdb", query=reql)
site_ids = list(reql.run()) site_ids = list(reql.run())
for site_id in site_ids: for site_id in site_ids:
_purge_site(rr, site_id) _purge_site(rr, site_id)
reql = rr.table("jobs").get(job_id).delete() reql = rr.table("jobs").get(job_id).delete()
logging.debug("purging job %s: %s", job_id, reql) job_logger.debug("purging job", query=reql)
result = reql.run() result = reql.run()
logging.info("purged job %s: %s", job_id, result) job_logger.info("purged job", result=result)
def brozzler_list_captures(argv=None): def brozzler_list_captures(argv=None):
@ -1101,7 +1188,7 @@ def brozzler_list_captures(argv=None):
if args.url_or_sha1[:5] == "sha1:": if args.url_or_sha1[:5] == "sha1:":
if args.prefix: if args.prefix:
logging.warning( logger.warning(
"ignoring supplied --prefix option which does not apply " "ignoring supplied --prefix option which does not apply "
"to lookup by sha1" "to lookup by sha1"
) )
@ -1112,7 +1199,7 @@ def brozzler_list_captures(argv=None):
[sha1base32, r.maxval, r.maxval], [sha1base32, r.maxval, r.maxval],
index="sha1_warc_type", index="sha1_warc_type",
) )
logging.debug("querying rethinkdb: %s", reql) logger.debug("querying rethinkdb", query=reql)
results = reql.run() results = reql.run()
else: else:
key = urlcanon.semantic(args.url_or_sha1).surt().decode("ascii") key = urlcanon.semantic(args.url_or_sha1).surt().decode("ascii")
@ -1135,7 +1222,7 @@ def brozzler_list_captures(argv=None):
lambda capture: (capture["canon_surt"] >= key) lambda capture: (capture["canon_surt"] >= key)
& (capture["canon_surt"] <= end_key) & (capture["canon_surt"] <= end_key)
) )
logging.debug("querying rethinkdb: %s", reql) logger.debug("querying rethinkdb", query=reql)
results = reql.run() results = reql.run()
if args.yaml: if args.yaml:
@ -1180,7 +1267,7 @@ def brozzler_stop_crawl(argv=None):
job_id = args.job_id job_id = args.job_id
job = brozzler.Job.load(rr, job_id) job = brozzler.Job.load(rr, job_id)
if not job: if not job:
logging.fatal("job not found with id=%r", job_id) logger.fatal("job not found with", id=job_id)
sys.exit(1) sys.exit(1)
job.stop_requested = doublethink.utcnow() job.stop_requested = doublethink.utcnow()
job.save() job.save()
@ -1191,7 +1278,7 @@ def brozzler_stop_crawl(argv=None):
site_id = args.site_id site_id = args.site_id
site = brozzler.Site.load(rr, site_id) site = brozzler.Site.load(rr, site_id)
if not site: if not site:
logging.fatal("site not found with id=%r", site_id) logger.fatal("site not found with", id=site_id)
sys.exit(1) sys.exit(1)
site.stop_requested = doublethink.utcnow() site.stop_requested = doublethink.utcnow()
site.save() site.save()


@ -17,13 +17,15 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
""" """
import logging import structlog
import sys import sys
logger = structlog.get_logger()
try: try:
import flask import flask
except ImportError as e: except ImportError as e:
logging.critical( logger.critical(
'%s: %s\n\nYou might need to run "pip install ' '%s: %s\n\nYou might need to run "pip install '
'brozzler[dashboard]".\nSee README.rst for more information.', 'brozzler[dashboard]".\nSee README.rst for more information.',
type(e).__name__, type(e).__name__,
@ -77,7 +79,7 @@ def queued_count(site_id):
) )
.count() .count()
) )
logging.debug("querying rethinkdb: %s", reql) logger.debug("querying rethinkdb", query=reql)
count = reql.run() count = reql.run()
return flask.jsonify(count=count) return flask.jsonify(count=count)
@ -85,7 +87,7 @@ def queued_count(site_id):
@app.route("/api/sites/<site_id>/queue") @app.route("/api/sites/<site_id>/queue")
@app.route("/api/site/<site_id>/queue") @app.route("/api/site/<site_id>/queue")
def queue(site_id): def queue(site_id):
logging.debug("flask.request.args=%s", flask.request.args) logger.debug("flask.request.args", args=flask.request.args)
start = flask.request.args.get("start", 0) start = flask.request.args.get("start", 0)
end = flask.request.args.get("end", start + 90) end = flask.request.args.get("end", start + 90)
reql = rr.table("pages").between( reql = rr.table("pages").between(
@ -93,7 +95,7 @@ def queue(site_id):
[site_id, 0, False, r.maxval], [site_id, 0, False, r.maxval],
index="priority_by_site", index="priority_by_site",
)[start:end] )[start:end]
logging.debug("querying rethinkdb: %s", reql) logger.debug("querying rethinkdb", query=reql)
queue_ = reql.run() queue_ = reql.run()
return flask.jsonify(queue_=list(queue_)) return flask.jsonify(queue_=list(queue_))
@ -112,7 +114,7 @@ def page_count(site_id):
) )
.count() .count()
) )
logging.debug("querying rethinkdb: %s", reql) logger.debug("querying rethinkdb", query=reql)
count = reql.run() count = reql.run()
return flask.jsonify(count=count) return flask.jsonify(count=count)
@ -130,7 +132,7 @@ def pages(site_id):
) )
.order_by(index="least_hops")[start:end] .order_by(index="least_hops")[start:end]
) )
logging.debug("querying rethinkdb: %s", reql) logger.debug("querying rethinkdb", query=reql)
pages_ = reql.run() pages_ = reql.run()
return flask.jsonify(pages=list(pages_)) return flask.jsonify(pages=list(pages_))
@ -139,7 +141,7 @@ def pages(site_id):
@app.route("/api/page/<page_id>") @app.route("/api/page/<page_id>")
def page(page_id): def page(page_id):
reql = rr.table("pages").get(page_id) reql = rr.table("pages").get(page_id)
logging.debug("querying rethinkdb: %s", reql) logger.debug("querying rethinkdb", query=reql)
page_ = reql.run() page_ = reql.run()
return flask.jsonify(page_) return flask.jsonify(page_)
@ -148,7 +150,7 @@ def page(page_id):
@app.route("/api/page/<page_id>/yaml") @app.route("/api/page/<page_id>/yaml")
def page_yaml(page_id): def page_yaml(page_id):
reql = rr.table("pages").get(page_id) reql = rr.table("pages").get(page_id)
logging.debug("querying rethinkdb: %s", reql) logger.debug("querying rethinkdb", query=reql)
page_ = reql.run() page_ = reql.run()
return app.response_class( return app.response_class(
yaml.dump(page_, default_flow_style=False), mimetype="application/yaml" yaml.dump(page_, default_flow_style=False), mimetype="application/yaml"
@ -159,7 +161,7 @@ def page_yaml(page_id):
@app.route("/api/site/<site_id>") @app.route("/api/site/<site_id>")
def site(site_id): def site(site_id):
reql = rr.table("sites").get(site_id) reql = rr.table("sites").get(site_id)
logging.debug("querying rethinkdb: %s", reql) logger.debug("querying rethinkdb", query=reql)
s = reql.run() s = reql.run()
if "cookie_db" in s: if "cookie_db" in s:
s["cookie_db"] = base64.b64encode(s["cookie_db"]).decode("ascii") s["cookie_db"] = base64.b64encode(s["cookie_db"]).decode("ascii")
@ -170,7 +172,7 @@ def site(site_id):
@app.route("/api/site/<site_id>/yaml") @app.route("/api/site/<site_id>/yaml")
def site_yaml(site_id): def site_yaml(site_id):
reql = rr.table("sites").get(site_id) reql = rr.table("sites").get(site_id)
logging.debug("querying rethinkdb: %s", reql) logger.debug("querying rethinkdb", query=reql)
site_ = reql.run() site_ = reql.run()
return app.response_class( return app.response_class(
yaml.dump(site_, default_flow_style=False), mimetype="application/yaml" yaml.dump(site_, default_flow_style=False), mimetype="application/yaml"
@ -180,7 +182,7 @@ def site_yaml(site_id):
@app.route("/api/stats/<bucket>") @app.route("/api/stats/<bucket>")
def stats(bucket): def stats(bucket):
reql = rr.table("stats").get(bucket) reql = rr.table("stats").get(bucket)
logging.debug("querying rethinkdb: %s", reql) logger.debug("querying rethinkdb", query=reql)
stats_ = reql.run() stats_ = reql.run()
return flask.jsonify(stats_) return flask.jsonify(stats_)
@ -193,7 +195,7 @@ def sites(job_id):
except ValueError: except ValueError:
jid = job_id jid = job_id
reql = rr.table("sites").get_all(jid, index="job_id") reql = rr.table("sites").get_all(jid, index="job_id")
logging.debug("querying rethinkdb: %s", reql) logger.debug("querying rethinkdb", query=reql)
sites_ = list(reql.run()) sites_ = list(reql.run())
# TypeError: <binary, 7168 bytes, '53 51 4c 69 74 65...'> is not JSON serializable # TypeError: <binary, 7168 bytes, '53 51 4c 69 74 65...'> is not JSON serializable
for s in sites_: for s in sites_:
@ -206,7 +208,7 @@ def sites(job_id):
def jobless_sites(): def jobless_sites():
# XXX inefficient (unindexed) query # XXX inefficient (unindexed) query
reql = rr.table("sites").filter(~r.row.has_fields("job_id")) reql = rr.table("sites").filter(~r.row.has_fields("job_id"))
logging.debug("querying rethinkdb: %s", reql) logger.debug("querying rethinkdb", query=reql)
sites_ = list(reql.run()) sites_ = list(reql.run())
# TypeError: <binary, 7168 bytes, '53 51 4c 69 74 65...'> is not JSON serializable # TypeError: <binary, 7168 bytes, '53 51 4c 69 74 65...'> is not JSON serializable
for s in sites_: for s in sites_:
@ -223,7 +225,7 @@ def job(job_id):
except ValueError: except ValueError:
jid = job_id jid = job_id
reql = rr.table("jobs").get(jid) reql = rr.table("jobs").get(jid)
logging.debug("querying rethinkdb: %s", reql) logger.debug("querying rethinkdb", query=reql)
job_ = reql.run() job_ = reql.run()
return flask.jsonify(job_) return flask.jsonify(job_)
@ -236,7 +238,7 @@ def job_yaml(job_id):
except ValueError: except ValueError:
jid = job_id jid = job_id
reql = rr.table("jobs").get(jid) reql = rr.table("jobs").get(jid)
logging.debug("querying rethinkdb: %s", reql) logger.debug("querying rethinkdb", query=reql)
job_ = reql.run() job_ = reql.run()
return app.response_class( return app.response_class(
yaml.dump(job_, default_flow_style=False), mimetype="application/yaml" yaml.dump(job_, default_flow_style=False), mimetype="application/yaml"
@ -258,7 +260,7 @@ def services():
@app.route("/api/jobs") @app.route("/api/jobs")
def jobs(): def jobs():
reql = rr.table("jobs").order_by(r.desc("id")) reql = rr.table("jobs").order_by(r.desc("id"))
logging.debug("querying rethinkdb: %s", reql) logger.debug("querying rethinkdb", query=reql)
jobs_ = list(reql.run()) jobs_ = list(reql.run())
return flask.jsonify(jobs=jobs_) return flask.jsonify(jobs=jobs_)
@ -313,13 +315,13 @@ try:
return self.application return self.application
def run(**options): def run(**options):
logging.info("running brozzler-dashboard using gunicorn") logger.info("running brozzler-dashboard using gunicorn")
GunicornBrozzlerDashboard(app, options).run() GunicornBrozzlerDashboard(app, options).run()
except ImportError: except ImportError:
def run(): def run():
logging.info("running brozzler-dashboard using simple flask app.run") logger.info("running brozzler-dashboard using simple flask app.run")
app.run(host=SETTINGS["DASHBOARD_INTERFACE"], port=SETTINGS["DASHBOARD_PORT"]) app.run(host=SETTINGS["DASHBOARD_INTERFACE"], port=SETTINGS["DASHBOARD_PORT"])


@ -18,8 +18,8 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
""" """
import structlog
import sys import sys
import logging
try: try:
import warcprox import warcprox
@ -30,11 +30,11 @@ try:
import wsgiref.handlers import wsgiref.handlers
import brozzler.dashboard import brozzler.dashboard
except ImportError as e: except ImportError as e:
logging.critical( structlog.get_logger().critical(
'%s: %s\n\nYou might need to run "pip install ' '%s: %s\n\nYou might need to run "pip install '
'brozzler[easy]".\nSee README.rst for more information.', 'brozzler[easy]".\nSee README.rst for more information.',
type(e).__name__, type(e).__name__,
e, exc_info=True,
) )
sys.exit(1) sys.exit(1)
import argparse import argparse
@ -156,7 +156,7 @@ class ThreadingWSGIServer(
class BrozzlerEasyController: class BrozzlerEasyController:
logger = logging.getLogger(__module__ + "." + __qualname__) logger = structlog.get_logger(logger_name=__module__ + "." + __qualname__)
def __init__(self, args): def __init__(self, args):
self.stop = threading.Event() self.stop = threading.Event()
@ -238,11 +238,14 @@ class BrozzlerEasyController:
self.logger.info("starting brozzler-worker") self.logger.info("starting brozzler-worker")
self.brozzler_worker.start() self.brozzler_worker.start()
self.logger.info("starting pywb at %s:%s", *self.pywb_httpd.server_address) self.logger.info(
"starting pywb", address="%s:%s" % self.pywb_httpd.server_address
)
threading.Thread(target=self.pywb_httpd.serve_forever).start() threading.Thread(target=self.pywb_httpd.serve_forever).start()
self.logger.info( self.logger.info(
"starting brozzler-dashboard at %s:%s", *self.dashboard_httpd.server_address "starting brozzler-dashboard",
address="%s:%s" % self.dashboard_httpd.server_address,
) )
threading.Thread(target=self.dashboard_httpd.serve_forever).start() threading.Thread(target=self.dashboard_httpd.serve_forever).start()
@ -307,8 +310,8 @@ class BrozzlerEasyController:
state_strs.append(str(th)) state_strs.append(str(th))
stack = traceback.format_stack(sys._current_frames()[th.ident]) stack = traceback.format_stack(sys._current_frames()[th.ident])
state_strs.append("".join(stack)) state_strs.append("".join(stack))
logging.warning( structlog.get_logger().warning(
"dumping state (caught signal {})\n{}".format(signum, "\n".join(state_strs)) "dumping state (caught signal)", signal=signum, state="\n".join(state_strs)
) )


@ -16,12 +16,12 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
""" """
import logging
import brozzler import brozzler
import random import random
import time import time
import datetime import datetime
import rethinkdb as rdb import rethinkdb as rdb
import structlog
import doublethink import doublethink
import urlcanon import urlcanon
@ -33,7 +33,7 @@ class UnexpectedDbResult(Exception):
class RethinkDbFrontier: class RethinkDbFrontier:
logger = logging.getLogger(__module__ + "." + __qualname__) logger = structlog.get_logger(logger_name=__module__ + "." + __qualname__)
def __init__(self, rr, shards=None, replicas=None): def __init__(self, rr, shards=None, replicas=None):
self.rr = rr self.rr = rr
@ -42,15 +42,15 @@ class RethinkDbFrontier:
self._ensure_db() self._ensure_db()
def _ensure_db(self): def _ensure_db(self):
db_logger = self.logger.bind(dbname=self.rr.dbname)
dbs = self.rr.db_list().run() dbs = self.rr.db_list().run()
if not self.rr.dbname in dbs: if not self.rr.dbname in dbs:
self.logger.info("creating rethinkdb database %r", self.rr.dbname) db_logger.info("creating rethinkdb database")
self.rr.db_create(self.rr.dbname).run() self.rr.db_create(self.rr.dbname).run()
tables = self.rr.table_list().run() tables = self.rr.table_list().run()
if not "sites" in tables: if not "sites" in tables:
self.logger.info( db_logger.info("creating rethinkdb table 'sites' in database")
"creating rethinkdb table 'sites' in database %r", self.rr.dbname
)
self.rr.table_create( self.rr.table_create(
"sites", shards=self.shards, replicas=self.replicas "sites", shards=self.shards, replicas=self.replicas
).run() ).run()
@ -59,9 +59,7 @@ class RethinkDbFrontier:
).run() ).run()
self.rr.table("sites").index_create("job_id").run() self.rr.table("sites").index_create("job_id").run()
if not "pages" in tables: if not "pages" in tables:
self.logger.info( db_logger.info("creating rethinkdb table 'pages' in database")
"creating rethinkdb table 'pages' in database %r", self.rr.dbname
)
self.rr.table_create( self.rr.table_create(
"pages", shards=self.shards, replicas=self.replicas "pages", shards=self.shards, replicas=self.replicas
).run() ).run()
@ -81,9 +79,7 @@ class RethinkDbFrontier:
[r.row["site_id"], r.row["brozzle_count"], r.row["hops_from_seed"]], [r.row["site_id"], r.row["brozzle_count"], r.row["hops_from_seed"]],
).run() ).run()
if not "jobs" in tables: if not "jobs" in tables:
self.logger.info( db_logger.info("creating rethinkdb table 'jobs' in database")
"creating rethinkdb table 'jobs' in database %r", self.rr.dbname
)
self.rr.table_create( self.rr.table_create(
"jobs", shards=self.shards, replicas=self.replicas "jobs", shards=self.shards, replicas=self.replicas
).run() ).run()
@ -108,7 +104,7 @@ class RethinkDbFrontier:
) )
def claim_sites(self, n=1): def claim_sites(self, n=1):
self.logger.trace("claiming up to %s sites to brozzle", n) self.logger.debug("claiming up to %s sites to brozzle", n)
result = ( result = (
self.rr.table("sites") self.rr.table("sites")
.get_all( .get_all(
@ -186,10 +182,10 @@ class RethinkDbFrontier:
if result["changes"][i]["old_val"]["claimed"]: if result["changes"][i]["old_val"]["claimed"]:
self.logger.warning( self.logger.warning(
"re-claimed site that was still marked 'claimed' " "re-claimed site that was still marked 'claimed' "
"because it was last claimed a long time ago " "because it was last claimed a long time ago, "
"at %s, and presumably some error stopped it from " "and presumably some error stopped it from "
"being disclaimed", "being disclaimed",
result["changes"][i]["old_val"]["last_claimed"], last_claimed=result["changes"][i]["old_val"]["last_claimed"],
) )
site = brozzler.Site(self.rr, result["changes"][i]["new_val"]) site = brozzler.Site(self.rr, result["changes"][i]["new_val"])
sites.append(site) sites.append(site)
@ -205,10 +201,10 @@ class RethinkDbFrontier:
""" """
if site.time_limit and site.time_limit > 0 and site.elapsed() > site.time_limit: if site.time_limit and site.time_limit > 0 and site.elapsed() > site.time_limit:
self.logger.debug( self.logger.debug(
"site FINISHED_TIME_LIMIT! time_limit=%s " "elapsed=%s %s", "site FINISHED_TIME_LIMIT!",
site.time_limit, time_limit=site.time_limit,
site.elapsed(), elapsed=site.elapsed(),
site, site=site,
) )
raise brozzler.ReachedTimeLimit raise brozzler.ReachedTimeLimit
@ -273,7 +269,7 @@ class RethinkDbFrontier:
"""Raises brozzler.CrawlStopped if stop has been requested.""" """Raises brozzler.CrawlStopped if stop has been requested."""
site.refresh() site.refresh()
if site.stop_requested and site.stop_requested <= doublethink.utcnow(): if site.stop_requested and site.stop_requested <= doublethink.utcnow():
self.logger.info("stop requested for site %s", site.id) self.logger.info("stop requested for site", site_id=site.id)
raise brozzler.CrawlStopped raise brozzler.CrawlStopped
if site.job_id: if site.job_id:
@ -283,7 +279,7 @@ class RethinkDbFrontier:
and job.stop_requested and job.stop_requested
and job.stop_requested <= doublethink.utcnow() and job.stop_requested <= doublethink.utcnow()
): ):
self.logger.info("stop requested for job %s", site.job_id) self.logger.info("stop requested for job", job_id=site.job_id)
raise brozzler.CrawlStopped raise brozzler.CrawlStopped
def _maybe_finish_job(self, job_id): def _maybe_finish_job(self, job_id):
@ -304,7 +300,7 @@ class RethinkDbFrontier:
return False return False
n += 1 n += 1
self.logger.info("all %s sites finished, job %s is FINISHED!", n, job.id) self.logger.info("all %s sites finished, job is FINISHED!", n, job_id=job.id)
job.finish() job.finish()
job.save() job.save()
return True return True
@ -320,7 +316,7 @@ class RethinkDbFrontier:
self._maybe_finish_job(site.job_id) self._maybe_finish_job(site.job_id)
def disclaim_site(self, site, page=None): def disclaim_site(self, site, page=None):
self.logger.info("disclaiming %s", site) self.logger.info("disclaiming", site=site)
site.claimed = False site.claimed = False
site.last_disclaimed = doublethink.utcnow() site.last_disclaimed = doublethink.utcnow()
if not page and not self.has_outstanding_pages(site): if not page and not self.has_outstanding_pages(site):
@ -468,17 +464,16 @@ class RethinkDbFrontier:
try: try:
self.logger.debug("inserting/replacing batch of %s pages", len(batch)) self.logger.debug("inserting/replacing batch of %s pages", len(batch))
reql = self.rr.table("pages").insert(batch, conflict="replace") reql = self.rr.table("pages").insert(batch, conflict="replace")
self.logger.trace( self.logger.debug(
'running query self.rr.table("pages").insert(%r, ' 'running query self.rr.table("pages").insert(%r, '
'conflict="replace")', 'conflict="replace")',
batch, batch,
) )
result = reql.run() result = reql.run()
except Exception as e: except Exception as e:
self.logger.error( self.logger.exception(
"problem inserting/replacing batch of %s pages", "problem inserting/replacing batch of %s pages",
len(batch), len(batch),
exc_info=True,
) )
parent_page.outlinks = {} parent_page.outlinks = {}
@ -497,7 +492,7 @@ class RethinkDbFrontier:
) )
def reached_limit(self, site, e): def reached_limit(self, site, e):
self.logger.info("reached_limit site=%s e=%s", site, e) self.logger.info("reached_limit", site=site, e=e)
assert isinstance(e, brozzler.ReachedLimit) assert isinstance(e, brozzler.ReachedLimit)
if ( if (
site.reached_limit site.reached_limit
@ -530,7 +525,7 @@ class RethinkDbFrontier:
) )
pages = list(results) pages = list(results)
if len(pages) > 1: if len(pages) > 1:
self.logger.warning("more than one seed page for site_id %s ?", site_id) self.logger.warning("more than one seed page?", site_id=site_id)
if len(pages) < 1: if len(pages) < 1:
return None return None
return brozzler.Page(self.rr, pages[0]) return brozzler.Page(self.rr, pages[0])
@ -550,8 +545,8 @@ class RethinkDbFrontier:
[site_id, 0 if brozzled is False else r.maxval, r.maxval, r.maxval], [site_id, 0 if brozzled is False else r.maxval, r.maxval, r.maxval],
index="priority_by_site", index="priority_by_site",
) )
self.logger.trace("running query: %r", query) self.logger.debug("running query", query=query)
results = query.run() results = query.run()
for result in results: for result in results:
self.logger.trace("yielding result: %r", result) self.logger.debug("yielding result", result=result)
yield brozzler.Page(self.rr, result) yield brozzler.Page(self.rr, result)
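
For reference, the key-value calls in the hunks above follow structlog's event-plus-fields style: the first argument is the event name, keyword arguments become structured fields, and logger.exception() records the active traceback at error level. A minimal sketch assuming structlog's default configuration (the names claim_count and batch_size are illustrative, not from this file):

    import structlog

    logger = structlog.get_logger(logger_name="brozzler.frontier.demo")

    def claim_demo(n):
        # keyword arguments are emitted as structured fields alongside the event
        logger.info("claiming up to n sites to brozzle", claim_count=n)
        try:
            raise RuntimeError("rethinkdb unavailable")
        except RuntimeError:
            # exception() logs at error level and appends the current traceback,
            # replacing the old error(..., exc_info=True) pattern
            logger.exception("problem inserting/replacing batch of pages", batch_size=n)

    claim_demo(3)
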

View File

@ -25,9 +25,9 @@ import datetime
import doublethink import doublethink
import hashlib import hashlib
import json import json
import logging
import os import os
import re import re
import structlog
import time import time
import urlcanon import urlcanon
import urllib import urllib
@ -37,6 +37,8 @@ import zlib
from enum import Enum from enum import Enum
from typing import Optional from typing import Optional
logger = structlog.get_logger()
def load_schema(): def load_schema():
schema_file = os.path.join(os.path.dirname(__file__), "job_schema.yaml") schema_file = os.path.join(os.path.dirname(__file__), "job_schema.yaml")
@ -84,7 +86,7 @@ def merge(a, b):
def new_job_file(frontier, job_conf_file): def new_job_file(frontier, job_conf_file):
"""Returns new Job.""" """Returns new Job."""
logging.info("loading %s", job_conf_file) logger.info("loading", job_conf_file=job_conf_file)
with open(job_conf_file) as f: with open(job_conf_file) as f:
job_conf = yaml.safe_load(f) job_conf = yaml.safe_load(f)
return new_job(frontier, job_conf) return new_job(frontier, job_conf)
@ -120,12 +122,12 @@ def new_job(frontier, job_conf):
# insert in batches to avoid this error # insert in batches to avoid this error
# rethinkdb.errors.ReqlDriverError: Query size (167883036) greater than maximum (134217727) in: # rethinkdb.errors.ReqlDriverError: Query size (167883036) greater than maximum (134217727) in:
for batch in (pages[i : i + 500] for i in range(0, len(pages), 500)): for batch in (pages[i : i + 500] for i in range(0, len(pages), 500)):
logging.info("inserting batch of %s pages", len(batch)) logger.info("inserting batch of %s pages", len(batch))
result = frontier.rr.table("pages").insert(batch).run() result = frontier.rr.table("pages").insert(batch).run()
for batch in (sites[i : i + 100] for i in range(0, len(sites), 100)): for batch in (sites[i : i + 100] for i in range(0, len(sites), 100)):
logging.info("inserting batch of %s sites", len(batch)) logger.info("inserting batch of %s sites", len(batch))
result = frontier.rr.table("sites").insert(batch).run() result = frontier.rr.table("sites").insert(batch).run()
logging.info("job %s fully started", job.id) logger.info("job fully started", job_id=job.id)
return job return job
@ -154,7 +156,7 @@ def new_seed_page(frontier, site):
def new_site(frontier, site): def new_site(frontier, site):
logging.info("new site %s", site) logger.info("new site", site=site)
site.id = site.id or str(uuid.uuid4()) site.id = site.id or str(uuid.uuid4())
# insert the Page into the database before the Site, to avoid situation # insert the Page into the database before the Site, to avoid situation
# where a brozzler worker immediately claims the site, finds no pages # where a brozzler worker immediately claims the site, finds no pages
@ -162,7 +164,7 @@ def new_site(frontier, site):
try: try:
page = new_seed_page(frontier, site) page = new_seed_page(frontier, site)
page.save() page.save()
logging.info("queued page %s", page) logger.info("queued page", page=page)
finally: finally:
# finally block because we want to insert the Site no matter what # finally block because we want to insert the Site no matter what
site.save() site.save()
@ -195,7 +197,7 @@ class ElapsedMixIn(object):
class Job(doublethink.Document, ElapsedMixIn): class Job(doublethink.Document, ElapsedMixIn):
logger = logging.getLogger(__module__ + "." + __qualname__) logger = structlog.get_logger(logger_name=__module__ + "." + __qualname__)
table = "jobs" table = "jobs"
def populate_defaults(self): def populate_defaults(self):
@ -217,9 +219,9 @@ class Job(doublethink.Document, ElapsedMixIn):
def finish(self): def finish(self):
if self.status == "FINISHED" or self.starts_and_stops[-1]["stop"]: if self.status == "FINISHED" or self.starts_and_stops[-1]["stop"]:
self.logger.error( self.logger.error(
"job is already finished status=%s " "starts_and_stops[-1]['stop']=%s", "job is already finished",
self.status, status=self.status,
self.starts_and_stops[-1]["stop"], stop=self.starts_and_stops[-1]["stop"],
) )
self.status = "FINISHED" self.status = "FINISHED"
self.starts_and_stops[-1]["stop"] = doublethink.utcnow() self.starts_and_stops[-1]["stop"] = doublethink.utcnow()
@ -246,7 +248,7 @@ class VideoCaptureOptions(Enum):
class Site(doublethink.Document, ElapsedMixIn): class Site(doublethink.Document, ElapsedMixIn):
logger = logging.getLogger(__module__ + "." + __qualname__) logger = structlog.get_logger(logger_name=__module__ + "." + __qualname__)
table = "sites" table = "sites"
def populate_defaults(self): def populate_defaults(self):
@ -304,7 +306,7 @@ class Site(doublethink.Document, ElapsedMixIn):
if set(rule.keys()) == {"ssurt"} if set(rule.keys()) == {"ssurt"}
) )
if not any(ssurt.startswith(ss) for ss in simple_rule_ssurts): if not any(ssurt.startswith(ss) for ss in simple_rule_ssurts):
self.logger.info("adding ssurt %s to scope accept rules", ssurt) self.logger.info("adding ssurt to scope accept rules", ssurt=ssurt)
self.scope["accepts"].append({"ssurt": ssurt}) self.scope["accepts"].append({"ssurt": ssurt})
def note_seed_redirect(self, url): def note_seed_redirect(self, url):
@ -402,7 +404,7 @@ class Site(doublethink.Document, ElapsedMixIn):
class Page(doublethink.Document): class Page(doublethink.Document):
logger = logging.getLogger(__module__ + "." + __qualname__) logger = structlog.get_logger(logger_name=__module__ + "." + __qualname__)
table = "pages" table = "pages"
@staticmethod @staticmethod

View File

@ -19,7 +19,9 @@ limitations under the License.
""" """
import sys import sys
import logging import structlog
logger = structlog.get_logger()
try: try:
import pywb.apps.cli import pywb.apps.cli
@ -30,7 +32,7 @@ try:
import pywb.framework.basehandlers import pywb.framework.basehandlers
import pywb.rewrite.wburl import pywb.rewrite.wburl
except ImportError as e: except ImportError as e:
logging.critical( logger.critical(
'%s: %s\n\nYou might need to run "pip install ' '%s: %s\n\nYou might need to run "pip install '
'brozzler[easy]".\nSee README.rst for more information.', 'brozzler[easy]".\nSee README.rst for more information.',
type(e).__name__, type(e).__name__,
@ -111,7 +113,7 @@ class RethinkCDXSource(pywb.cdx.cdxsource.CDXSource):
) )
if cdx_query.limit: if cdx_query.limit:
reql = reql.limit(cdx_query.limit) reql = reql.limit(cdx_query.limit)
logging.debug("rethinkdb query: %s", reql) logger.debug("rethinkdb query", query=reql)
results = reql.run() results = reql.run()
return results return results

View File

@ -23,12 +23,12 @@ limitations under the License.
""" """
import json import json
import logging
import brozzler import brozzler
import reppy import reppy
import reppy.cache import reppy.cache
import reppy.parser import reppy.parser
import requests import requests
import structlog
__all__ = ["is_permitted_by_robots"] __all__ = ["is_permitted_by_robots"]
@ -119,10 +119,9 @@ def is_permitted_by_robots(site, url, proxy=None):
# reppy has wrapped an exception that we want to bubble up # reppy has wrapped an exception that we want to bubble up
raise brozzler.ProxyError(e) raise brozzler.ProxyError(e)
else: else:
logging.warning( structlog.get_logger().warning(
"returning true (permitted) after problem fetching " "returning true (permitted) after problem fetching " "robots.txt",
"robots.txt for %r: %r", url=url,
url, exception=e,
e,
) )
return True return True

View File

@ -18,7 +18,6 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
""" """
import logging
import brozzler import brozzler
import brozzler.browser import brozzler.browser
from brozzler.model import VideoCaptureOptions from brozzler.model import VideoCaptureOptions
@ -32,6 +31,7 @@ import io
import socket import socket
import random import random
import requests import requests
import structlog
import urllib3 import urllib3
from urllib3.exceptions import TimeoutError, ProxyError from urllib3.exceptions import TimeoutError, ProxyError
import doublethink import doublethink
@ -46,7 +46,7 @@ r = rdb.RethinkDB()
class BrozzlerWorker: class BrozzlerWorker:
logger = logging.getLogger(__module__ + "." + __qualname__) logger = structlog.get_logger(logger_name=__module__ + "." + __qualname__)
# 3⅓ min heartbeat interval => 10 min ttl # 3⅓ min heartbeat interval => 10 min ttl
# This is kind of a long time, because `frontier.claim_sites()`, which runs # This is kind of a long time, because `frontier.claim_sites()`, which runs
@ -125,7 +125,7 @@ class BrozzlerWorker:
self._metrics_port, self._registry_url, self._env self._metrics_port, self._registry_url, self._env
) )
else: else:
logging.warning( self.logger.warning(
"not starting prometheus scrape endpoint: metrics_port is undefined" "not starting prometheus scrape endpoint: metrics_port is undefined"
) )
@ -173,9 +173,9 @@ class BrozzlerWorker:
site.proxy = "%s:%s" % (svc["host"], svc["port"]) site.proxy = "%s:%s" % (svc["host"], svc["port"])
site.save() site.save()
self.logger.info( self.logger.info(
"chose warcprox instance %r from service registry for %r", "chose warcprox instance from service registry",
site.proxy, instance=site.proxy,
site, site=site,
) )
return site.proxy return site.proxy
return None return None
@ -189,7 +189,7 @@ class BrozzlerWorker:
self._proxy_is_warcprox = status["role"] == "warcprox" self._proxy_is_warcprox = status["role"] == "warcprox"
except Exception as e: except Exception as e:
self._proxy_is_warcprox = False self._proxy_is_warcprox = False
logging.info( self.logger.info(
"%s %s warcprox", "%s %s warcprox",
self._proxy, self._proxy,
"IS" if self._proxy_is_warcprox else "IS NOT", "IS" if self._proxy_is_warcprox else "IS NOT",
@ -227,18 +227,18 @@ class BrozzlerWorker:
with urllib.request.urlopen(request, timeout=900) as response: with urllib.request.urlopen(request, timeout=900) as response:
if response.getcode() != 204: if response.getcode() != 204:
self.logger.warning( self.logger.warning(
'got "%s %s" response on warcprox ' "got unexpected response on warcprox "
"WARCPROX_WRITE_RECORD request (expected 204)", "WARCPROX_WRITE_RECORD request (expected 204)",
response.getcode(), code=response.getcode(),
response.reason, reason=response.reason,
) )
return request, response return request, response
except urllib.error.HTTPError as e: except urllib.error.HTTPError as e:
self.logger.warning( self.logger.warning(
'got "%s %s" response on warcprox ' "got unexpected response on warcprox "
"WARCPROX_WRITE_RECORD request (expected 204)", "WARCPROX_WRITE_RECORD request (expected 204)",
e.getcode(), code=e.getcode(),
e.info(), reason=e.info(),
) )
return request, None return request, None
except urllib.error.URLError as e: except urllib.error.URLError as e:
@ -271,26 +271,27 @@ class BrozzlerWorker:
on_request=None, on_request=None,
enable_youtube_dl=True, enable_youtube_dl=True,
): ):
self.logger.info("brozzling {}".format(page)) page_logger = self.logger.bind(page=page)
page_logger.info("brozzling")
outlinks = set() outlinks = set()
page_headers = self._get_page_headers(site, page) page_headers = self._get_page_headers(site, page)
if not self._needs_browsing(page_headers): if not self._needs_browsing(page_headers):
self.logger.info("needs fetch: %s", page) page_logger.info("needs fetch")
if site.pdfs_only and not self._is_pdf(page_headers): if site.pdfs_only and not self._is_pdf(page_headers):
self.logger.info("skipping non-PDF content: PDFs only option enabled") self.logger.info("skipping non-PDF content: PDFs only option enabled")
elif site.video_capture in [ elif site.video_capture in [
VideoCaptureOptions.DISABLE_VIDEO_CAPTURE.value, VideoCaptureOptions.DISABLE_VIDEO_CAPTURE.value,
VideoCaptureOptions.BLOCK_VIDEO_MIME_TYPES.value, VideoCaptureOptions.BLOCK_VIDEO_MIME_TYPES.value,
] and self._is_video_type(page_headers): ] and self._is_video_type(page_headers):
self.logger.info( page_logger.info(
"skipping video content: video MIME type capture disabled for site" "skipping video content: video MIME type capture disabled for site"
) )
else: else:
self._fetch_url(site, page=page) self._fetch_url(site, page=page)
else: else:
self.logger.info("needs browsing: %s", page) page_logger.info("needs browsing")
try: try:
browser_outlinks = self._browse_page( browser_outlinks = self._browse_page(
browser, site, page, on_screenshot, on_request browser, site, page, on_screenshot, on_request
@ -300,7 +301,7 @@ class BrozzlerWorker:
if status_code in [502, 504]: if status_code in [502, 504]:
raise brozzler.PageConnectionError() raise brozzler.PageConnectionError()
except brozzler.PageInterstitialShown: except brozzler.PageInterstitialShown:
self.logger.info("page interstitial shown (http auth): %s", page) page_logger.info("page interstitial shown (http auth)")
if enable_youtube_dl and ydl.should_ytdlp(site, page, status_code): if enable_youtube_dl and ydl.should_ytdlp(site, page, status_code):
try: try:
@ -316,10 +317,7 @@ class BrozzlerWorker:
except brozzler.ProxyError: except brozzler.ProxyError:
raise raise
except brozzler.VideoExtractorError as e: except brozzler.VideoExtractorError as e:
logging.error( self.logger.exception("error extracting video info")
"error extracting video info: %s",
e,
)
except Exception as e: except Exception as e:
if ( if (
hasattr(e, "exc_info") hasattr(e, "exc_info")
@ -328,26 +326,25 @@ class BrozzlerWorker:
and e.exc_info[1].code == 430 and e.exc_info[1].code == 430
): ):
self.logger.info( self.logger.info(
"youtube-dl got %s %s processing %s", "youtube-dl encountered an error",
e.exc_info[1].code, code=e.exc_info[1].code,
e.exc_info[1].msg, message=e.exc_info[1].msg,
page.url, url=page.url,
) )
else: else:
self.logger.error( self.logger.exception("youtube_dl raised exception", page=page)
"youtube_dl raised exception on %s", page, exc_info=True
)
return outlinks return outlinks
@metrics.brozzler_header_processing_duration_seconds.time() @metrics.brozzler_header_processing_duration_seconds.time()
@metrics.brozzler_in_progress_headers.track_inprogress() @metrics.brozzler_in_progress_headers.track_inprogress()
def _get_page_headers(self, site, page): def _get_page_headers(self, site, page):
url_logger = self.logger.bind(url=page.url)
# bypassing warcprox, requests' stream=True defers downloading the body of the response # bypassing warcprox, requests' stream=True defers downloading the body of the response
# see https://docs.python-requests.org/en/latest/user/advanced/#body-content-workflow # see https://docs.python-requests.org/en/latest/user/advanced/#body-content-workflow
try: try:
user_agent = site.get("user_agent") user_agent = site.get("user_agent")
headers = {"User-Agent": user_agent} if user_agent else {} headers = {"User-Agent": user_agent} if user_agent else {}
self.logger.info("getting page headers for %s", page.url) url_logger.info("getting page headers")
with requests.get( with requests.get(
page.url, page.url,
stream=True, stream=True,
@ -357,11 +354,9 @@ class BrozzlerWorker:
) as r: ) as r:
return r.headers return r.headers
except requests.exceptions.Timeout as e: except requests.exceptions.Timeout as e:
self.logger.warning( url_logger.warning("Timed out trying to get headers", exc_info=True)
"Timed out trying to get headers for %s: %s", page.url, e
)
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
self.logger.warning("Failed to get headers for %s: %s", page.url, e) url_logger.warning("Failed to get headers", exc_info=True)
return {} return {}
def _needs_browsing(self, page_headers) -> bool: def _needs_browsing(self, page_headers) -> bool:
@ -402,10 +397,9 @@ class BrozzlerWorker:
on_screenshot(screenshot_jpeg) on_screenshot(screenshot_jpeg)
if self._using_warcprox(site): if self._using_warcprox(site):
self.logger.info( self.logger.info(
"sending WARCPROX_WRITE_RECORD request to %s with " "sending WARCPROX_WRITE_RECORD request",
"screenshot for %s", proxy=self._proxy_for(site),
self._proxy_for(site), screenshot_for_page=page,
page,
) )
thumbnail_jpeg = self.thumb_jpeg(screenshot_jpeg) thumbnail_jpeg = self.thumb_jpeg(screenshot_jpeg)
self._warcprox_write_record( self._warcprox_write_record(
@ -452,7 +446,7 @@ class BrozzlerWorker:
video["content-length"] = int(response_headers["content-length"]) video["content-length"] = int(response_headers["content-length"])
if "content-range" in response_headers: if "content-range" in response_headers:
video["content-range"] = response_headers["content-range"] video["content-range"] = response_headers["content-range"]
logging.debug("embedded video %s", video) self.logger.debug("embedded video", video=video)
if not "videos" in page: if not "videos" in page:
page.videos = [] page.videos = []
page.videos.append(video) page.videos.append(video)
@ -461,11 +455,11 @@ class BrozzlerWorker:
def _on_service_worker_version_updated(chrome_msg): def _on_service_worker_version_updated(chrome_msg):
# https://github.com/internetarchive/brozzler/issues/140 # https://github.com/internetarchive/brozzler/issues/140
self.logger.trace("%r", chrome_msg) self.logger.debug("service worker updated", chrome_msg=chrome_msg)
if chrome_msg.get("params", {}).get("versions"): if chrome_msg.get("params", {}).get("versions"):
url = chrome_msg.get("params", {}).get("versions")[0].get("scriptURL") url = chrome_msg.get("params", {}).get("versions")[0].get("scriptURL")
if url and url.startswith("http") and url not in sw_fetched: if url and url.startswith("http") and url not in sw_fetched:
self.logger.info("fetching service worker script %s", url) self.logger.info("fetching service worker script", url=url)
self._fetch_url(site, url=url) self._fetch_url(site, url=url)
sw_fetched.add(url) sw_fetched.add(url)
@ -520,7 +514,7 @@ class BrozzlerWorker:
headers = {"User-Agent": user_agent} if user_agent else {} headers = {"User-Agent": user_agent} if user_agent else {}
headers.update(site.extra_headers(page)) headers.update(site.extra_headers(page))
self.logger.info("fetching url %s", url) self.logger.info("fetching url", url=url)
try: try:
# response is ignored # response is ignored
http.request( http.request(
@ -530,17 +524,18 @@ class BrozzlerWorker:
timeout=self.FETCH_URL_TIMEOUT, timeout=self.FETCH_URL_TIMEOUT,
retries=False, retries=False,
) )
self.logger.info("Completed fetching url %s", url) self.logger.info("Completed fetching url", url=url)
except TimeoutError as e: except TimeoutError as e:
self.logger.warning("Timed out fetching %s", url) self.logger.warning("Timed out fetching url", url=url)
raise brozzler.PageConnectionError() from e raise brozzler.PageConnectionError() from e
except ProxyError as e: except ProxyError as e:
raise brozzler.ProxyError("proxy error fetching %s" % url) from e raise brozzler.ProxyError("proxy error fetching %s" % url) from e
except urllib3.exceptions.RequestError as e: except urllib3.exceptions.RequestError as e:
self.logger.warning("Failed to fetch url %s: %s", url, e) self.logger.warning("Failed to fetch url", url=url, exc_info=True)
raise brozzler.PageConnectionError() from e raise brozzler.PageConnectionError() from e
def brozzle_site(self, browser, site): def brozzle_site(self, browser, site):
site_logger = self.logger.bind(site=site)
try: try:
site.last_claimed_by = "%s:%s" % (socket.gethostname(), browser.chrome.port) site.last_claimed_by = "%s:%s" % (socket.gethostname(), browser.chrome.port)
site.save() site.save()
@ -550,9 +545,7 @@ class BrozzlerWorker:
self._frontier.honor_stop_request(site) self._frontier.honor_stop_request(site)
# _proxy_for() call in log statement can raise brozzler.ProxyError # _proxy_for() call in log statement can raise brozzler.ProxyError
# which is why we honor time limit and stop request first☝🏻 # which is why we honor time limit and stop request first☝🏻
self.logger.info( site_logger.info("brozzling site", proxy=self._proxy_for(site))
"brozzling site (proxy=%r) %s", self._proxy_for(site), site
)
while time.time() - start < self.SITE_SESSION_MINUTES * 60: while time.time() - start < self.SITE_SESSION_MINUTES * 60:
site.refresh() site.refresh()
self._frontier.enforce_time_limit(site) self._frontier.enforce_time_limit(site)
@ -564,7 +557,7 @@ class BrozzlerWorker:
if page.needs_robots_check and not brozzler.is_permitted_by_robots( if page.needs_robots_check and not brozzler.is_permitted_by_robots(
site, page.url, self._proxy_for(site) site, page.url, self._proxy_for(site)
): ):
logging.warning("page %s is blocked by robots.txt", page.url) self.logger.warning("page is blocked by robots.txt", url=page.url)
page.blocked_by_robots = True page.blocked_by_robots = True
self._frontier.completed_page(site, page) self._frontier.completed_page(site, page)
else: else:
@ -580,7 +573,7 @@ class BrozzlerWorker:
except brozzler.ShutdownRequested: except brozzler.ShutdownRequested:
self.logger.info("shutdown requested") self.logger.info("shutdown requested")
except brozzler.NothingToClaim: except brozzler.NothingToClaim:
self.logger.info("no pages left for site %s", site) site_logger.info("no pages left for site")
except brozzler.ReachedLimit as e: except brozzler.ReachedLimit as e:
self._frontier.reached_limit(site, e) self._frontier.reached_limit(site, e)
except brozzler.ReachedTimeLimit as e: except brozzler.ReachedTimeLimit as e:
@ -591,29 +584,24 @@ class BrozzlerWorker:
# self.logger.info("{} shut down".format(browser)) # self.logger.info("{} shut down".format(browser))
except brozzler.ProxyError as e: except brozzler.ProxyError as e:
if self._warcprox_auto: if self._warcprox_auto:
logging.error( self.logger.exception(
"proxy error (site.proxy=%s), will try to choose a " "proxy error, will try to choose a "
"healthy instance next time site is brozzled: %s", "healthy instance next time site is brozzled",
site.proxy, site_proxy=site.proxy,
e,
) )
site.proxy = None site.proxy = None
else: else:
# using brozzler-worker --proxy, nothing to do but try the # using brozzler-worker --proxy, nothing to do but try the
# same proxy again next time # same proxy again next time
logging.error("proxy error (self._proxy=%r)", self._proxy, exc_info=1) self.logger.exception("proxy error", self_proxy=self._proxy)
except (brozzler.PageConnectionError, Exception) as e: except (brozzler.PageConnectionError, Exception) as e:
if isinstance(e, brozzler.PageConnectionError): if isinstance(e, brozzler.PageConnectionError):
self.logger.error( site_logger.exception(
"Page status code possibly indicates connection failure between host and warcprox: site=%r page=%r", "Page status code possibly indicates connection failure between host and warcprox",
site, page=page,
page,
exc_info=True,
) )
else: else:
self.logger.error( site_logger.exception("unexpected exception", page=page)
"unexpected exception site=%r page=%r", site, page, exc_info=True
)
if page: if page:
# Calculate backoff in seconds based on number of failed attempts. # Calculate backoff in seconds based on number of failed attempts.
# Minimum of 60, max of 135 giving delays of 60, 90, 135, 135... # Minimum of 60, max of 135 giving delays of 60, 90, 135, 135...
@ -624,10 +612,10 @@ class BrozzlerWorker:
page.failed_attempts = (page.failed_attempts or 0) + 1 page.failed_attempts = (page.failed_attempts or 0) + 1
if page.failed_attempts >= brozzler.MAX_PAGE_FAILURES: if page.failed_attempts >= brozzler.MAX_PAGE_FAILURES:
self.logger.info( self.logger.info(
'marking page "completed" after %s unexpected ' 'marking page "completed" after several unexpected '
"exceptions attempting to brozzle %s", "exceptions attempting to brozzle",
page.failed_attempts, failed_attempts=page.failed_attempts,
page, page=page,
) )
self._frontier.completed_page(site, page) self._frontier.completed_page(site, page)
page = None page = None
@ -665,13 +653,11 @@ class BrozzlerWorker:
try: try:
self.status_info = self._service_registry.heartbeat(status_info) self.status_info = self._service_registry.heartbeat(status_info)
self.logger.trace("status in service registry: %s", self.status_info) self.logger.debug("status in service registry", status=self.status_info)
except r.ReqlError as e: except r.ReqlError as e:
self.logger.error( self.logger.exception(
"failed to send heartbeat and update service registry " "failed to send heartbeat and update service registry",
"with info %s: %s", info=status_info,
status_info,
e,
) )
def _service_heartbeat_if_due(self): def _service_heartbeat_if_due(self):
@ -719,9 +705,7 @@ class BrozzlerWorker:
self._browser_pool.release(browsers[i]) self._browser_pool.release(browsers[i])
def run(self): def run(self):
self.logger.notice( self.logger.warning("brozzler %s - brozzler-worker starting", brozzler.__version__)
"brozzler %s - brozzler-worker starting", brozzler.__version__
)
last_nothing_to_claim = 0 last_nothing_to_claim = 0
try: try:
while not self._shutdown.is_set(): while not self._shutdown.is_set():
@ -730,20 +714,20 @@ class BrozzlerWorker:
try: try:
self._start_browsing_some_sites() self._start_browsing_some_sites()
except brozzler.browser.NoBrowsersAvailable: except brozzler.browser.NoBrowsersAvailable:
logging.trace("all %s browsers are in use", self._max_browsers) self.logger.debug(
"all browsers are in use", max_browsers=self._max_browsers
)
except brozzler.NothingToClaim: except brozzler.NothingToClaim:
last_nothing_to_claim = time.time() last_nothing_to_claim = time.time()
logging.trace( self.logger.debug(
"nothing to claim, all available active sites " "nothing to claim, all available active sites "
"are already claimed by a brozzler worker" "are already claimed by a brozzler worker"
) )
time.sleep(0.5) time.sleep(0.5)
self.logger.notice("shutdown requested") self.logger.warn("shutdown requested")
except r.ReqlError as e: except r.ReqlError as e:
self.logger.error( self.logger.exception("caught rethinkdb exception, will try to proceed")
"caught rethinkdb exception, will try to proceed", exc_info=True
)
except brozzler.ShutdownRequested: except brozzler.ShutdownRequested:
self.logger.info("shutdown requested") self.logger.info("shutdown requested")
except: except:
@ -755,12 +739,11 @@ class BrozzlerWorker:
try: try:
self._service_registry.unregister(self.status_info["id"]) self._service_registry.unregister(self.status_info["id"])
except: except:
self.logger.error( self.logger.exception("failed to unregister from service registry")
"failed to unregister from service registry", exc_info=True
)
self.logger.info( self.logger.info(
"shutting down %s brozzling threads", len(self._browsing_threads) "shutting down brozzling threads",
thread_count=len(self._browsing_threads),
) )
with self._browsing_threads_lock: with self._browsing_threads_lock:
for th in self._browsing_threads: for th in self._browsing_threads:
@ -780,6 +763,7 @@ class BrozzlerWorker:
) )
return return
self._thread = threading.Thread(target=self.run, name="BrozzlerWorker") self._thread = threading.Thread(target=self.run, name="BrozzlerWorker")
self.logger = self.logger.bind(thread=self._thread)
self._thread.start() self._thread.start()
def shutdown_now(self): def shutdown_now(self):
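
A note on the page_logger / site_logger pattern introduced above: bind() does not mutate the logger it is called on; it returns a new bound logger carrying the extra context, so per-page fields cannot leak into unrelated log lines. A small sketch assuming structlog defaults (the site and page values are placeholders):

    import structlog

    logger = structlog.get_logger(logger_name="brozzler.worker.demo")

    def brozzle_page_demo(site, page):
        # bind() returns a child logger; every call on it carries site= and page=
        page_logger = logger.bind(site=site, page=page)
        page_logger.info("brozzling")
        page_logger.info("needs browsing")
        # the original logger is untouched and has no page context
        logger.info("heartbeating service registry")

    brozzle_page_demo("example,com,", "https://example.com/")
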

View File

@ -16,7 +16,6 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
""" """
import logging
import yt_dlp import yt_dlp
from yt_dlp.utils import match_filter_func, ExtractorError from yt_dlp.utils import match_filter_func, ExtractorError
import brozzler import brozzler
@ -31,6 +30,7 @@ import datetime
from . import metrics from . import metrics
import random import random
import structlog
import threading import threading
import traceback import traceback
import doublethink import doublethink
@ -44,17 +44,20 @@ YTDLP_WAIT = 10
YTDLP_MAX_REDIRECTS = 5 YTDLP_MAX_REDIRECTS = 5
logger = structlog.get_logger()
def should_ytdlp(site, page, page_status): def should_ytdlp(site, page, page_status):
# called only after we've passed needs_browsing() check # called only after we've passed needs_browsing() check
if page_status != 200: if page_status != 200:
logging.info("skipping ytdlp: non-200 page status %s", page_status) logger.info("skipping ytdlp: non-200 page status %s", page_status)
return False return False
if site.video_capture in [ if site.video_capture in [
VideoCaptureOptions.DISABLE_VIDEO_CAPTURE.value, VideoCaptureOptions.DISABLE_VIDEO_CAPTURE.value,
VideoCaptureOptions.DISABLE_YTDLP_CAPTURE.value, VideoCaptureOptions.DISABLE_YTDLP_CAPTURE.value,
]: ]:
logging.info("skipping ytdlp: site has video capture disabled") logger.info("skipping ytdlp: site has video capture disabled")
return False return False
ytdlp_url = page.redirect_url if page.redirect_url else page.url ytdlp_url = page.redirect_url if page.redirect_url else page.url
@ -104,7 +107,13 @@ def _build_youtube_dl(worker, destdir, site, page, ytdlp_proxy_endpoints):
""" """
class _YoutubeDL(yt_dlp.YoutubeDL): class _YoutubeDL(yt_dlp.YoutubeDL):
logger = logging.getLogger(__module__ + "." + __qualname__) logger = structlog.get_logger(logger_name=__module__ + "." + __qualname__)
def __init__(self, url, params=None, auto_init=True):
super().__init__(params, auto_init)
self.url = url
self.logger = self.logger.bind(url=url)
def process_ie_result(self, ie_result, download=True, extra_info=None): def process_ie_result(self, ie_result, download=True, extra_info=None):
if extra_info is None: if extra_info is None:
@ -114,7 +123,9 @@ def _build_youtube_dl(worker, destdir, site, page, ytdlp_proxy_endpoints):
if result_type in ("url", "url_transparent"): if result_type in ("url", "url_transparent"):
if "extraction_depth" in extra_info: if "extraction_depth" in extra_info:
self.logger.info( self.logger.info(
f"Following redirect URL: {ie_result['url']} extraction_depth: {extra_info['extraction_depth']}" f"Following redirect",
redirect_url=ie_result["url"],
extraction_depth=extra_info["extraction_depth"],
) )
extra_info["extraction_depth"] = 1 + extra_info.get( extra_info["extraction_depth"] = 1 + extra_info.get(
"extraction_depth", 0 "extraction_depth", 0
@ -131,8 +142,9 @@ def _build_youtube_dl(worker, destdir, site, page, ytdlp_proxy_endpoints):
def add_default_extra_info(self, ie_result, ie, url): def add_default_extra_info(self, ie_result, ie, url):
# hook in some logging # hook in some logging
super().add_default_extra_info(ie_result, ie, url) super().add_default_extra_info(ie_result, ie, url)
extract_context = self.logger.bind(extractor=ie.IE_NAME)
if ie_result.get("_type") == "playlist": if ie_result.get("_type") == "playlist":
self.logger.info("extractor %r found playlist in %s", ie.IE_NAME, url) extract_context.info("found playlist")
if ie.IE_NAME in { if ie.IE_NAME in {
"youtube:playlist", "youtube:playlist",
"youtube:tab", "youtube:tab",
@ -147,22 +159,20 @@ def _build_youtube_dl(worker, destdir, site, page, ytdlp_proxy_endpoints):
try: try:
ie_result["entries_no_dl"] = list(ie_result["entries"]) ie_result["entries_no_dl"] = list(ie_result["entries"])
except Exception as e: except Exception as e:
self.logger.warning( extract_context.warning(
"failed to unroll ie_result['entries']? for %s, %s; exception %s", "failed to unroll entries ie_result['entries']?",
ie.IE_NAME, exc_info=True,
url,
e,
) )
ie_result["entries_no_dl"] = [] ie_result["entries_no_dl"] = []
ie_result["entries"] = [] ie_result["entries"] = []
self.logger.info( self.logger.info(
"not downloading %s media files from this " "not downloading media files from this "
"playlist because we expect to capture them from " "playlist because we expect to capture them from "
"individual watch/track/detail pages", "individual watch/track/detail pages",
len(ie_result["entries_no_dl"]), media_file_count=len(ie_result["entries_no_dl"]),
) )
else: else:
self.logger.info("extractor %r found a download in %s", ie.IE_NAME, url) extract_context.info("found a download")
def _push_video_to_warcprox(self, site, info_dict, postprocessor): def _push_video_to_warcprox(self, site, info_dict, postprocessor):
# 220211 update: does yt-dlp supply content-type? no, not as such # 220211 update: does yt-dlp supply content-type? no, not as such
@ -180,7 +190,11 @@ def _build_youtube_dl(worker, destdir, site, page, ytdlp_proxy_endpoints):
mimetype = magic.from_file(info_dict["filepath"], mime=True) mimetype = magic.from_file(info_dict["filepath"], mime=True)
except ImportError as e: except ImportError as e:
mimetype = "video/%s" % info_dict["ext"] mimetype = "video/%s" % info_dict["ext"]
self.logger.warning("guessing mimetype %s because %r", mimetype, e) self.logger.warning(
"guessing mimetype due to error",
mimetype=mimetype,
exc_info=True,
)
# youtube watch page postprocessor is MoveFiles # youtube watch page postprocessor is MoveFiles
@ -198,12 +212,11 @@ def _build_youtube_dl(worker, destdir, site, page, ytdlp_proxy_endpoints):
size = os.path.getsize(info_dict["filepath"]) size = os.path.getsize(info_dict["filepath"])
self.logger.info( self.logger.info(
"pushing %r video as %s (%s bytes) to " "warcprox at %s with url %s", "pushing video to warcprox",
info_dict["format"], format=info_dict["format"],
mimetype, mimetype=mimetype,
size, size=size,
worker._proxy_for(site), warcprox=worker._proxy_for(site),
url,
) )
with open(info_dict["filepath"], "rb") as f: with open(info_dict["filepath"], "rb") as f:
# include content-length header to avoid chunked # include content-length header to avoid chunked
@ -240,23 +253,23 @@ def _build_youtube_dl(worker, destdir, site, page, ytdlp_proxy_endpoints):
): ):
worker.logger.debug( worker.logger.debug(
"heartbeating site.last_claimed to prevent another " "heartbeating site.last_claimed to prevent another "
"brozzler-worker claiming this site id=%r", "brozzler-worker claiming this site",
site.id, site_id=site.id,
) )
site.last_claimed = doublethink.utcnow() site.last_claimed = doublethink.utcnow()
site.save() site.save()
except: except:
worker.logger.debug( worker.logger.debug(
"problem heartbeating site.last_claimed site id=%r", "problem heartbeating site.last_claimed site",
site.id, site_id=site.id,
exc_info=True, exc_info=True,
) )
def ydl_postprocess_hook(d): def ydl_postprocess_hook(d):
if d["status"] == "finished": if d["status"] == "finished":
worker.logger.info("[ydl_postprocess_hook] Finished postprocessing")
worker.logger.info( worker.logger.info(
"[ydl_postprocess_hook] postprocessor: {}".format(d["postprocessor"]) "[ydl_postprocess_hook] Finished postprocessing",
postprocessor=d["postprocessor"],
) )
is_youtube_host = isyoutubehost(d["info_dict"]["webpage_url"]) is_youtube_host = isyoutubehost(d["info_dict"]["webpage_url"])
@ -290,7 +303,7 @@ def _build_youtube_dl(worker, destdir, site, page, ytdlp_proxy_endpoints):
# --cache-dir local or.. # --cache-dir local or..
# this looked like a problem with nsf-mounted homedir, maybe not a problem for brozzler on focal? # this looked like a problem with nsf-mounted homedir, maybe not a problem for brozzler on focal?
"cache_dir": "/home/archiveit", "cache_dir": "/home/archiveit",
"logger": logging.getLogger("yt_dlp"), "logger": logger,
"verbose": False, "verbose": False,
"quiet": False, "quiet": False,
# recommended to avoid bot detection # recommended to avoid bot detection
@ -306,17 +319,16 @@ def _build_youtube_dl(worker, destdir, site, page, ytdlp_proxy_endpoints):
ytdlp_proxy_for_logs = ( ytdlp_proxy_for_logs = (
ydl_opts["proxy"].split("@")[1] if "@" in ydl_opts["proxy"] else "@@@" ydl_opts["proxy"].split("@")[1] if "@" in ydl_opts["proxy"] else "@@@"
) )
logging.info("using yt-dlp proxy ... %s", ytdlp_proxy_for_logs) logger.info("using yt-dlp proxy ...", proxy=ytdlp_proxy_for_logs)
# skip warcprox proxying yt-dlp v.2023.07.06: youtube extractor using ranges # skip warcprox proxying yt-dlp v.2023.07.06: youtube extractor using ranges
# if worker._proxy_for(site): # if worker._proxy_for(site):
# ydl_opts["proxy"] = "http://{}".format(worker._proxy_for(site)) # ydl_opts["proxy"] = "http://{}".format(worker._proxy_for(site))
ydl = _YoutubeDL(ydl_opts) ydl = _YoutubeDL(ytdlp_url, params=ydl_opts)
if site.extra_headers(): if site.extra_headers():
ydl._opener.add_handler(ExtraHeaderAdder(site.extra_headers(page))) ydl._opener.add_handler(ExtraHeaderAdder(site.extra_headers(page)))
ydl.pushed_videos = [] ydl.pushed_videos = []
ydl.url = ytdlp_url
ydl.is_youtube_host = is_youtube_host ydl.is_youtube_host = is_youtube_host
return ydl return ydl
@ -336,7 +348,7 @@ def _remember_videos(page, pushed_videos=None):
"content-type": pushed_video["content-type"], "content-type": pushed_video["content-type"],
"content-length": pushed_video["content-length"], "content-length": pushed_video["content-length"],
} }
logging.debug("pushed video %s", video) logger.debug("pushed video", video=video)
page.videos.append(video) page.videos.append(video)
@ -345,7 +357,7 @@ def _try_youtube_dl(worker, ydl, site, page):
attempt = 0 attempt = 0
while attempt < max_attempts: while attempt < max_attempts:
try: try:
logging.info("trying yt-dlp on %s", ydl.url) logger.info("trying yt-dlp", url=ydl.url)
# should_download_vid = not ydl.is_youtube_host # should_download_vid = not ydl.is_youtube_host
# then # then
# ydl.extract_info(str(urlcanon.whatwg(ydl.url)), download=should_download_vid) # ydl.extract_info(str(urlcanon.whatwg(ydl.url)), download=should_download_vid)
@ -386,15 +398,18 @@ def _try_youtube_dl(worker, ydl, site, page):
# and others... # and others...
attempt += 1 attempt += 1
if attempt == max_attempts: if attempt == max_attempts:
logging.warning( logger.warning(
"Failed after %s attempt(s). Error: %s", max_attempts, e "Failed after %s attempt(s)",
max_attempts,
attempts=max_attempts,
exc_info=True,
) )
raise brozzler.VideoExtractorError( raise brozzler.VideoExtractorError(
"yt-dlp hit error extracting info for %s" % ydl.url "yt-dlp hit error extracting info for %s" % ydl.url
) )
else: else:
retry_wait = min(60, YTDLP_WAIT * (1.5 ** (attempt - 1))) retry_wait = min(60, YTDLP_WAIT * (1.5 ** (attempt - 1)))
logging.info( logger.info(
"Attempt %s failed. Retrying in %s seconds...", "Attempt %s failed. Retrying in %s seconds...",
attempt, attempt,
retry_wait, retry_wait,
@ -405,15 +420,14 @@ def _try_youtube_dl(worker, ydl, site, page):
"yt-dlp hit unknown error extracting info for %s" % ydl.url "yt-dlp hit unknown error extracting info for %s" % ydl.url
) )
logging.info("ytdlp completed successfully") logger.info("ytdlp completed successfully")
_remember_videos(page, ydl.pushed_videos) _remember_videos(page, ydl.pushed_videos)
if worker._using_warcprox(site): if worker._using_warcprox(site):
info_json = json.dumps(ie_result, sort_keys=True, indent=4) info_json = json.dumps(ie_result, sort_keys=True, indent=4)
logging.info( logger.info(
"sending WARCPROX_WRITE_RECORD request to warcprox " "sending WARCPROX_WRITE_RECORD request to warcprox " "with yt-dlp json",
"with yt-dlp json for %s", url=ydl.url,
ydl.url,
) )
worker._warcprox_write_record( worker._warcprox_write_record(
warcprox_address=worker._proxy_for(site), warcprox_address=worker._proxy_for(site),
@ -444,7 +458,7 @@ def do_youtube_dl(worker, site, page, ytdlp_proxy_endpoints):
with tempfile.TemporaryDirectory( with tempfile.TemporaryDirectory(
prefix="brzl-ydl-", dir=worker._ytdlp_tmpdir prefix="brzl-ydl-", dir=worker._ytdlp_tmpdir
) as tempdir: ) as tempdir:
logging.info("tempdir for yt-dlp: %s", tempdir) logger.info("tempdir for yt-dlp", tempdir=tempdir)
ydl = _build_youtube_dl(worker, tempdir, site, page, ytdlp_proxy_endpoints) ydl = _build_youtube_dl(worker, tempdir, site, page, ytdlp_proxy_endpoints)
ie_result = _try_youtube_dl(worker, ydl, site, page) ie_result = _try_youtube_dl(worker, ydl, site, page)
outlinks = set() outlinks = set()

View File

@ -76,6 +76,7 @@ setuptools.setup(
"cryptography>=2.3", "cryptography>=2.3",
"python-magic>=0.4.15", "python-magic>=0.4.15",
"prometheus-client>=0.20.0", "prometheus-client>=0.20.0",
"structlog>=25.1.0",
], ],
extras_require={ extras_require={
"yt-dlp": ["yt-dlp>=2024.7.25"], "yt-dlp": ["yt-dlp>=2024.7.25"],

View File

@ -31,11 +31,14 @@ import datetime
import requests import requests
import subprocess import subprocess
import http.server import http.server
import logging import structlog
import sys import sys
import warcprox import warcprox
logger = structlog.get_logger()
# https://stackoverflow.com/questions/166506/finding-local-ip-addresses-using-pythons-stdlib # https://stackoverflow.com/questions/166506/finding-local-ip-addresses-using-pythons-stdlib
def _local_address(): def _local_address():
import socket import socket
@ -70,11 +73,11 @@ def stop_service(service):
def httpd(request): def httpd(request):
class RequestHandler(http.server.SimpleHTTPRequestHandler): class RequestHandler(http.server.SimpleHTTPRequestHandler):
def do_POST(self): def do_POST(self):
logging.info("\n%s\n%s", self.requestline, self.headers) logger.info("\n%s\n%s", self.requestline, self.headers)
self.do_GET() self.do_GET()
def do_GET(self): def do_GET(self):
logging.info("\n%s\n%s", self.requestline, self.headers) logger.info("\n%s\n%s", self.requestline, self.headers)
if self.path == "/site5/redirect/": if self.path == "/site5/redirect/":
self.send_response(303, "See other") self.send_response(303, "See other")
self.send_header("Connection", "close") self.send_header("Connection", "close")
@ -270,7 +273,7 @@ def test_proxy_non_warcprox(httpd):
def do_HEAD(self): def do_HEAD(self):
if not hasattr(self.server, "requests"): if not hasattr(self.server, "requests"):
self.server.requests = [] self.server.requests = []
logging.info("%s %s", self.command, self.path) logger.info("%s %s", self.command, self.path)
self.server.requests.append("%s %s" % (self.command, self.path)) self.server.requests.append("%s %s" % (self.command, self.path))
response = urllib.request.urlopen(self.path) response = urllib.request.urlopen(self.path)
self.wfile.write( self.wfile.write(
@ -292,7 +295,7 @@ def test_proxy_non_warcprox(httpd):
def do_WARCPROX_WRITE_RECORD(self): def do_WARCPROX_WRITE_RECORD(self):
if not hasattr(self.server, "requests"): if not hasattr(self.server, "requests"):
self.server.requests = [] self.server.requests = []
logging.info("%s %s", self.command, self.path) logger.info("%s %s", self.command, self.path)
self.send_error(400) self.send_error(400)
proxy = http.server.HTTPServer(("localhost", 0), DumbProxyRequestHandler) proxy = http.server.HTTPServer(("localhost", 0), DumbProxyRequestHandler)
@ -826,7 +829,7 @@ def test_warcprox_outage_resiliency(httpd):
try: try:
stop_service("warcprox") stop_service("warcprox")
except Exception as e: except Exception as e:
logging.warning("problem stopping warcprox service: %s", e) logger.warning("problem stopping warcprox service: %s", exc_info=True)
# queue the site for brozzling # queue the site for brozzling
brozzler.new_site(frontier, site) brozzler.new_site(frontier, site)

View File

@ -24,7 +24,6 @@ import os
import brozzler import brozzler
import brozzler.chrome import brozzler.chrome
import brozzler.ydl import brozzler.ydl
import logging
import yaml import yaml
import datetime import datetime
import requests import requests
@ -36,15 +35,6 @@ import sys
import threading import threading
from unittest import mock from unittest import mock
logging.basicConfig(
stream=sys.stderr,
level=logging.INFO,
format=(
"%(asctime)s %(process)d %(levelname)s %(threadName)s "
"%(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s"
),
)
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def httpd(request): def httpd(request):
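
With structlog, console output comparable to the removed logging.basicConfig(...) call above is typically configured once at process startup. A minimal, hypothetical sketch of such a setup (not necessarily the configuration this commit installs elsewhere):

    import logging
    import sys
    import structlog

    # roughly analogous to logging.basicConfig(stream=sys.stderr, level=logging.INFO)
    structlog.configure(
        processors=[
            structlog.processors.add_log_level,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.StackInfoRenderer(),
            structlog.dev.ConsoleRenderer(),
        ],
        wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
        logger_factory=structlog.PrintLoggerFactory(sys.stderr),
    )

    structlog.get_logger(logger_name="test_cluster").info("logging configured")
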