From 33520da8f9d88c7241317bab130cc0666351cbc3 Mon Sep 17 00:00:00 2001 From: Noah Levitt Date: Tue, 14 Aug 2018 15:10:48 -0700 Subject: [PATCH] move youtube-dl code into separate file --- brozzler/worker.py | 271 ++------------------------------------ brozzler/ydl.py | 322 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 336 insertions(+), 257 deletions(-) create mode 100644 brozzler/ydl.py diff --git a/brozzler/worker.py b/brozzler/worker.py index 7af9ea5..6994be3 100644 --- a/brozzler/worker.py +++ b/brozzler/worker.py @@ -23,92 +23,18 @@ import brozzler import brozzler.browser import threading import time -import youtube_dl import urllib.request import json import PIL.Image import io import socket -import collections import requests import doublethink import tempfile import urlcanon from requests.structures import CaseInsensitiveDict import rethinkdb as r -import datetime -import urllib.parse - -_orig_webpage_read_content = youtube_dl.extractor.generic.GenericIE._webpage_read_content -def _webpage_read_content(self, *args, **kwargs): - content = _orig_webpage_read_content(self, *args, **kwargs) - if len(content) > 20000000: - logging.warn( - 'bypassing youtube-dl extraction because content is ' - 'too large (%s characters)', len(content)) - return '' - return content -youtube_dl.extractor.generic.GenericIE._webpage_read_content = _webpage_read_content - -class ExtraHeaderAdder(urllib.request.BaseHandler): - def __init__(self, extra_headers): - self.extra_headers = extra_headers - self.http_request = self._http_request - self.https_request = self._http_request - - def _http_request(self, req): - for h, v in self.extra_headers.items(): - if h.capitalize() not in req.headers: - req.add_header(h, v) - return req - -class YoutubeDLSpy(urllib.request.BaseHandler): - logger = logging.getLogger(__module__ + "." + __qualname__) - - def __init__(self): - self.reset() - - def _http_response(self, request, response): - txn = { - 'url': request.full_url, - 'method': request.get_method(), - 'status_code': response.code, - 'response_headers': response.headers, - } - self.transactions.append(txn) - return response - - http_response = https_response = _http_response - - def reset(self): - self.transactions = [] - - def final_bounces(self, url): - """ - Resolves redirect chains in self.transactions, returns a list of - Transaction representing the final redirect destinations of the given - url. There could be more than one if for example youtube-dl hit the - same url with HEAD and then GET requests. - """ - redirects = {} - for txn in self.transactions: - # XXX check http status 301,302,303,307? check for "uri" header - # as well as "location"? see urllib.request.HTTPRedirectHandler - if 'location' in txn['response_headers']: - redirects[txn['url']] = txn - - final_url = url - while final_url in redirects: - txn = redirects.pop(final_url) - final_url = urllib.parse.urljoin( - txn['url'], txn['response_headers']['location']) - - final_bounces = [] - for txn in self.transactions: - if txn['url'] == final_url: - final_bounces.append(txn) - - return final_bounces +from . import ydl class BrozzlerWorker: logger = logging.getLogger(__module__ + "." + __qualname__) @@ -204,99 +130,6 @@ class BrozzlerWorker: # is warcprox return bool(site.proxy or self._warcprox_auto) - def _youtube_dl(self, destdir, site): - class _YoutubeDL(youtube_dl.YoutubeDL): - logger = logging.getLogger(__module__ + "." + __qualname__) - - def get_info_extractor(self, ie_key): - ie = super().get_info_extractor(ie_key) - self.logger.info('youtube-dl using extractor %s', ie) - return ie - - # def process_ie_result( - # ydl_self, ie_result, download=True, extra_info={}): - # ie_result = super().process_ie_result( - # ie_result, download, extra_info) - # return ie_result - - def process_info(ydl_self, info_dict): - _orig__finish_frag_download = youtube_dl.downloader.fragment.FragmentFD._finish_frag_download - def _finish_frag_download(ffd_self, ctx): - _orig__finish_frag_download(ffd_self, ctx) - if self._using_warcprox(site): - try: - import magic - mimetype = magic.from_file( - ctx['filename'], mime=True) - except ImportError as e: - mimetype = 'video/%s' % info_dict['ext'] - ydl_self.logger.warn( - 'guessing mimetype %s because %r', - mimetype, e) - url = 'youtube-dl:%05d:%s' % ( - info_dict['playlist_index'], - info_dict['webpage_url']) - ydl_self.logger.info( - 'pushing %r video stitched-up as %s (%s ' - 'bytes) to warcprox at %s with url %s', - info_dict['format'], mimetype, - ctx['complete_frags_downloaded_bytes'], - self._proxy_for(site), url) - with open(ctx['filename'], 'rb') as f: - # include content-length header to avoid chunked - # transfer, which warcprox currently does not - # accept - # XXX is `ctx['complete_frags_downloaded_bytes']` - # always == `os.path.getsize(ctx['filename'])`? - self._warcprox_write_record( - warcprox_address=self._proxy_for(site), - url=url, warc_type='resource', - content_type=mimetype, payload=f, - extra_headers={'content-length': ctx['complete_frags_downloaded_bytes']}) - - youtube_dl.downloader.fragment.FragmentFD._finish_frag_download = _finish_frag_download - return super().process_info(info_dict) - - def ydl_progress(*args, **kwargs): - # in case youtube-dl takes a long time, heartbeat site.last_claimed - # to prevent another brozzler-worker from claiming the site - try: - if site.rr and doublethink.utcnow() - site.last_claimed > datetime.timedelta(minutes=self.SITE_SESSION_MINUTES): - self.logger.debug( - 'heartbeating site.last_claimed to prevent another ' - 'brozzler-worker claiming this site id=%r', site.id) - site.last_claimed = doublethink.utcnow() - site.save() - except: - self.logger.debug( - 'problem heartbeating site.last_claimed site id=%r', - site.id, exc_info=True) - - ydl_opts = { - "outtmpl": "{}/ydl%(autonumber)s.out".format(destdir), - "verbose": False, - "retries": 1, - "logger": logging.getLogger("youtube_dl"), - "nocheckcertificate": True, - "hls_prefer_native": True, - "noprogress": True, - "nopart": True, - "no_color": True, - "progress_hooks": [ydl_progress], - # https://github.com/rg3/youtube-dl/blob/master/README.md#format-selection - # "best: Select the best quality format represented by a single - # file with video and audio." - "format": "best/bestvideo+bestaudio", - } - if self._proxy_for(site): - ydl_opts["proxy"] = "http://{}".format(self._proxy_for(site)) - ydl = _YoutubeDL(ydl_opts) - if site.extra_headers(): - ydl._opener.add_handler(ExtraHeaderAdder(site.extra_headers())) - ydl.brozzler_spy = YoutubeDLSpy() - ydl._opener.add_handler(ydl.brozzler_spy) - return ydl - def _warcprox_write_record( self, warcprox_address, url, warc_type, content_type, payload, extra_headers=None): @@ -318,11 +151,13 @@ class BrozzlerWorker: 'got "%s %s" response on warcprox ' 'WARCPROX_WRITE_RECORD request (expected 204)', response.getcode(), response.reason) + return request, response except urllib.error.HTTPError as e: self.logger.warn( 'got "%s %s" response on warcprox ' 'WARCPROX_WRITE_RECORD request (expected 204)', e.getcode(), e.info()) + return request, None except urllib.error.URLError as e: raise brozzler.ProxyError( 'proxy error on WARCPROX_WRITE_RECORD %s' % url) from e @@ -330,80 +165,6 @@ class BrozzlerWorker: raise brozzler.ProxyError( 'proxy error on WARCPROX_WRITE_RECORD %s' % url) from e - def _remember_videos(self, page, ydl_spy): - if not 'videos' in page: - page.videos = [] - for txn in ydl_spy.transactions: - content_type = txn['response_headers'].get_content_type() - if (content_type.startswith('video/') - # skip manifests of DASH segmented video - - # see https://github.com/internetarchive/brozzler/pull/70 - and content_type != 'video/vnd.mpeg.dash.mpd' - and txn['method'] == 'GET' - and txn['status_code'] in (200, 206)): - video = { - 'blame': 'youtube-dl', - 'url': txn['url'], - 'response_code': txn['status_code'], - 'content-type': content_type, - } - if 'content-length' in txn['response_headers']: - video['content-length'] = int( - txn['response_headers']['content-length']) - if 'content-range' in txn['response_headers']: - video['content-range'] = txn[ - 'response_headers']['content-range'] - logging.debug('embedded video %s', video) - page.videos.append(video) - - def _try_youtube_dl(self, ydl, site, page): - try: - self.logger.info("trying youtube-dl on {}".format(page)) - - with brozzler.thread_accept_exceptions(): - # we do whatwg canonicalization here to avoid "" resulting in ProxyError - # needs automated test - ie_result = ydl.extract_info(str(urlcanon.whatwg(page.url))) - # ie_result = ydl.extract_info( - # str(urlcanon.whatwg(page.url)), download=False) - # if ie_result.get('_type') in ('playlist', 'multi_video'): - # ie_result = self._ydl_playlist(ie_result) - # else: - # ie_result = process_ie_result(ie_result, download=True) - self._remember_videos(page, ydl.brozzler_spy) - if self._using_warcprox(site): - info_json = json.dumps(info, sort_keys=True, indent=4) - self.logger.info( - "sending WARCPROX_WRITE_RECORD request to warcprox " - "with youtube-dl json for %s", page) - self._warcprox_write_record( - warcprox_address=self._proxy_for(site), - url="youtube-dl:%s" % str(urlcanon.semantic(page.url)), - warc_type="metadata", - content_type="application/vnd.youtube-dl_formats+json;charset=utf-8", - payload=info_json.encode("utf-8"), - extra_headers=site.extra_headers()) - except brozzler.ShutdownRequested as e: - raise - except BaseException as e: - if hasattr(e, "exc_info") and e.exc_info[0] == youtube_dl.utils.UnsupportedError: - pass - elif (hasattr(e, "exc_info") - and e.exc_info[0] == urllib.error.HTTPError - and hasattr(e.exc_info[1], "code") - and e.exc_info[1].code == 420): - raise brozzler.ReachedLimit(e.exc_info[1]) - elif (hasattr(e, 'exc_info') - and e.exc_info[0] == urllib.error.URLError - and self._proxy_for(site)): - # connection problem when using a proxy == proxy error (XXX?) - raise brozzler.ProxyError( - 'youtube-dl hit apparent proxy error from ' - '%s' % page.url) from e - else: - raise - def full_and_thumb_jpegs(self, large_png): # these screenshots never have any alpha (right?) img = PIL.Image.open(io.BytesIO(large_png)).convert('RGB') @@ -424,12 +185,10 @@ class BrozzlerWorker: def brozzle_page(self, browser, site, page, on_screenshot=None, on_request=None, enable_youtube_dl=True): self.logger.info("brozzling {}".format(page)) + ydl_fetches = None if enable_youtube_dl: try: - with tempfile.TemporaryDirectory(prefix='brzl-ydl-') as tempdir: - ydl = self._youtube_dl(tempdir, site) - ydl_spy = ydl.brozzler_spy # remember for later - self._try_youtube_dl(ydl, site, page) + ydl_fetches = ydl.do_youtube_dl(self, site, page) except brozzler.ReachedLimit as e: raise except brozzler.ShutdownRequested: @@ -447,16 +206,14 @@ class BrozzlerWorker: self.logger.error( 'youtube_dl raised exception on %s', page, exc_info=True) - else: - ydl_spy = False - if self._needs_browsing(page, ydl_spy): + if self._needs_browsing(page, ydl_fetches): self.logger.info('needs browsing: %s', page) outlinks = self._browse_page(browser, site, page, on_screenshot, on_request) return outlinks else: - if not self._already_fetched(page, ydl_spy): + if not self._already_fetched(page, ydl_fetches): self.logger.info('needs fetch: %s', page) self._fetch_url(site, page) else: @@ -550,9 +307,9 @@ class BrozzlerWorker: raise brozzler.ProxyError( 'proxy error fetching %s' % page.url) from e - def _needs_browsing(self, page, brozzler_spy): - if brozzler_spy: - final_bounces = brozzler_spy.final_bounces(page.url) + def _needs_browsing(self, page, ydl_fetches): + if ydl_fetches: + final_bounces = ydl.final_bounces(ydl_fetches, page.url) if not final_bounces: return True for txn in final_bounces: @@ -563,9 +320,9 @@ class BrozzlerWorker: else: return True - def _already_fetched(self, page, brozzler_spy): - if brozzler_spy: - for txn in brozzler_spy.final_bounces(page.url): + def _already_fetched(self, page, ydl_fetches): + if ydl_fetches: + for txn in final_bounces(ydl_fetches, page.url): if (txn['method'] == 'GET' and txn['status_code'] == 200): return True return False @@ -582,7 +339,7 @@ class BrozzlerWorker: # _proxy_for() call in log statement can raise brozzler.ProxyError # which is why we honor time limit and stop request first☝🏻 self.logger.info( - "brozzling site (proxy=%r) %r", + "brozzling site (proxy=%r) %s", self._proxy_for(site), site) while time.time() - start < self.SITE_SESSION_MINUTES * 60: site.refresh() diff --git a/brozzler/ydl.py b/brozzler/ydl.py new file mode 100644 index 0000000..9ffd368 --- /dev/null +++ b/brozzler/ydl.py @@ -0,0 +1,322 @@ +''' +brozzler/ydl.py - youtube-dl support for brozzler + +This code was extracted from worker.py and + +Copyright (C) 2018 Internet Archive + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +import logging +import youtube_dl +import brozzler +import urllib.request +import tempfile +import urlcanon +import os +import json + +_orig_webpage_read_content = youtube_dl.extractor.generic.GenericIE._webpage_read_content +def _webpage_read_content(self, *args, **kwargs): + content = _orig_webpage_read_content(self, *args, **kwargs) + if len(content) > 20000000: + logging.warn( + 'bypassing youtube-dl extraction because content is ' + 'too large (%s characters)', len(content)) + return '' + return content +youtube_dl.extractor.generic.GenericIE._webpage_read_content = _webpage_read_content + +class ExtraHeaderAdder(urllib.request.BaseHandler): + def __init__(self, extra_headers): + self.extra_headers = extra_headers + self.http_request = self._http_request + self.https_request = self._http_request + + def _http_request(self, req): + for h, v in self.extra_headers.items(): + if h.capitalize() not in req.headers: + req.add_header(h, v) + return req + +class YoutubeDLSpy(urllib.request.BaseHandler): + logger = logging.getLogger(__module__ + "." + __qualname__) + + def __init__(self): + self.reset() + + def _http_response(self, request, response): + fetch = { + 'url': request.full_url, + 'method': request.get_method(), + 'response_code': response.code, + 'response_headers': response.headers, + } + self.fetches.append(fetch) + return response + + http_response = https_response = _http_response + + def reset(self): + self.fetches = [] + +def final_bounces(fetches, url): + """ + Resolves redirect chains in `fetches` and returns a list of fetches + representing the final redirect destinations of the given url. There could + be more than one if for example youtube-dl hit the same url with HEAD and + then GET requests. + """ + redirects = {} + for fetch in fetches: + # XXX check http status 301,302,303,307? check for "uri" header + # as well as "location"? see urllib.request.HTTPRedirectHandler + if 'location' in fetch['response_headers']: + redirects[fetch['url']] = fetch + + final_url = url + while final_url in redirects: + fetch = redirects.pop(final_url) + final_url = urllib.parse.urljoin( + fetch['url'], fetch['response_headers']['location']) + + final_bounces = [] + for fetch in fetches: + if fetch['url'] == final_url: + final_bounces.append(fetch) + + return final_bounces + +def _build_youtube_dl(worker, destdir, site): + ''' + Builds a `youtube_dl.YoutubeDL` for brozzling `site` with `worker`. + + The `YoutubeDL` instance does a few special brozzler-specific things: + + - keeps track of urls fetched using a `YoutubeDLSpy` + - periodically updates `site.last_claimed` in rethinkdb + - if brozzling through warcprox and downloading fragmented (DASH) videos, + pushes the stitched together video to warcprox using a + WARCPROX_WRITE_RECORD request + + Args: + worker (brozzler.BrozzlerWorker): the calling brozzler worker + destdir (str): where to save downloaded videos + site (brozzler.Site): the site we are brozzling + + Returns: + a `youtube_dl.YoutubeDL` instance + ''' + + class _YoutubeDL(youtube_dl.YoutubeDL): + logger = logging.getLogger(__module__ + "." + __qualname__) + + def add_default_extra_info(self, ie_result, ie, url): + # hook in some logging + super().add_default_extra_info(ie_result, ie, url) + if ie_result.get('_type') == 'playlist': + self.logger.info( + 'extractor %r found playlist in %s', ie.IE_NAME, url) + else: + self.logger.info( + 'extractor %r found a video in %s', ie.IE_NAME, url) + + def _push_stitched_up_vid_to_warcprox(self, site, info_dict, ctx): + try: + import magic + mimetype = magic.from_file(ctx['filename'], mime=True) + except ImportError as e: + mimetype = 'video/%s' % info_dict['ext'] + self.logger.warn('guessing mimetype %s because %r', mimetype, e) + url = 'youtube-dl:%05d:%s' % ( + info_dict.get('playlist_index') or 1, + info_dict['webpage_url']) + size = os.path.getsize(ctx['filename']) + self.logger.info( + 'pushing %r video stitched-up as %s (%s bytes) to ' + 'warcprox at %s with url %s', info_dict['format'], + mimetype, size, worker._proxy_for(site), url) + with open(ctx['filename'], 'rb') as f: + # include content-length header to avoid chunked + # transfer, which warcprox currently rejects + request, response = worker._warcprox_write_record( + warcprox_address=worker._proxy_for(site), url=url, + warc_type='resource', content_type=mimetype, payload=f, + extra_headers={'content-length': size}) + # consulted by _remember_videos() + self.stitch_ups.append({ + 'url': url, + 'response_code': response.code, + 'content-type': mimetype, + 'content-length': size, + }) + + def process_info(self, info_dict): + _orig__finish_frag_download = youtube_dl.downloader.fragment.FragmentFD._finish_frag_download + + def _finish_frag_download(ffd_self, ctx): + _orig__finish_frag_download(ffd_self, ctx) + if worker._using_warcprox(site): + self._push_stitched_up_vid_to_warcprox(site, info_dict, ctx) + + youtube_dl.downloader.fragment.FragmentFD._finish_frag_download = _finish_frag_download + return super().process_info(info_dict) + + def maybe_heartbeat_site_last_claimed(*args, **kwargs): + # in case youtube-dl takes a long time, heartbeat site.last_claimed + # to prevent another brozzler-worker from claiming the site + try: + if site.rr and doublethink.utcnow() - site.last_claimed > datetime.timedelta(minutes=worker.SITE_SESSION_MINUTES): + worker.logger.debug( + 'heartbeating site.last_claimed to prevent another ' + 'brozzler-worker claiming this site id=%r', site.id) + site.last_claimed = doublethink.utcnow() + site.save() + except: + worker.logger.debug( + 'problem heartbeating site.last_claimed site id=%r', + site.id, exc_info=True) + + ydl_opts = { + "outtmpl": "{}/ydl%(autonumber)s.out".format(destdir), + "verbose": False, + "retries": 1, + "logger": logging.getLogger("youtube_dl"), + "nocheckcertificate": True, + "hls_prefer_native": True, + "noprogress": True, + "nopart": True, + "no_color": True, + "progress_hooks": [maybe_heartbeat_site_last_claimed], + # https://github.com/rg3/youtube-dl/blob/master/README.md#format-selection + # "best: Select the best quality format represented by a single + # file with video and audio." + "format": "best/bestvideo+bestaudio", + } + if worker._proxy_for(site): + ydl_opts["proxy"] = "http://{}".format(worker._proxy_for(site)) + ydl = _YoutubeDL(ydl_opts) + if site.extra_headers(): + ydl._opener.add_handler(ExtraHeaderAdder(site.extra_headers())) + ydl.fetch_spy = YoutubeDLSpy() + ydl.stitch_ups = [] + ydl._opener.add_handler(ydl.fetch_spy) + return ydl + +def _remember_videos(page, fetches, stitch_ups=None): + ''' + Saves info about videos captured by youtube-dl in `page.videos`. + ''' + if not 'videos' in page: + page.videos = [] + for fetch in fetches or []: + content_type = fetch['response_headers'].get_content_type() + if (content_type.startswith('video/') + # skip manifests of DASH segmented video - + # see https://github.com/internetarchive/brozzler/pull/70 + and content_type != 'video/vnd.mpeg.dash.mpd' + and fetch['method'] == 'GET' + and fetch['response_code'] in (200, 206)): + video = { + 'blame': 'youtube-dl', + 'url': fetch['url'], + 'response_code': fetch['response_code'], + 'content-type': content_type, + } + if 'content-length' in fetch['response_headers']: + video['content-length'] = int( + fetch['response_headers']['content-length']) + if 'content-range' in fetch['response_headers']: + video['content-range'] = fetch[ + 'response_headers']['content-range'] + logging.debug('embedded video %s', video) + page.videos.append(video) + for stitch_up in stitch_ups or []: + if stitch_up['content-type'].startswith('video/'): + video = { + 'blame': 'youtube-dl', + 'url': stitch_up['url'], + 'response_code': stitch_up['response_code'], + 'content-type': stitch_up['content-type'], + 'content-length': stitch_up['content-length'], + } + logging.debug('embedded video %s', video) + page.videos.append(video) + +def _try_youtube_dl(worker, ydl, site, page): + try: + logging.info("trying youtube-dl on %s", page) + + with brozzler.thread_accept_exceptions(): + # we do whatwg canonicalization here to avoid "" resulting in ProxyError + # needs automated test + ie_result = ydl.extract_info(str(urlcanon.whatwg(page.url))) + _remember_videos(page, ydl.fetch_spy.fetches, ydl.stitch_ups) + if worker._using_warcprox(site): + info_json = json.dumps(ie_result, sort_keys=True, indent=4) + logging.info( + "sending WARCPROX_WRITE_RECORD request to warcprox " + "with youtube-dl json for %s", page) + worker._warcprox_write_record( + warcprox_address=worker._proxy_for(site), + url="youtube-dl:%s" % str(urlcanon.semantic(page.url)), + warc_type="metadata", + content_type="application/vnd.youtube-dl_formats+json;charset=utf-8", + payload=info_json.encode("utf-8"), + extra_headers=site.extra_headers()) + except brozzler.ShutdownRequested as e: + raise + except BaseException as e: + if hasattr(e, "exc_info") and e.exc_info[0] == youtube_dl.utils.UnsupportedError: + pass + elif (hasattr(e, "exc_info") + and e.exc_info[0] == urllib.error.HTTPError + and hasattr(e.exc_info[1], "code") + and e.exc_info[1].code == 420): + raise brozzler.ReachedLimit(e.exc_info[1]) + elif (hasattr(e, 'exc_info') + and e.exc_info[0] == urllib.error.URLError + and worker._proxy_for(site)): + # connection problem when using a proxy == proxy error (XXX?) + raise brozzler.ProxyError( + 'youtube-dl hit apparent proxy error from ' + '%s' % page.url) from e + else: + raise + +def do_youtube_dl(worker, site, page): + ''' + Runs youtube-dl configured for `worker` and `site` to download videos from + `page`. + + Args: + worker (brozzler.BrozzlerWorker): the calling brozzler worker + site (brozzler.Site): the site we are brozzling + page (brozzler.Page): the page we are brozzling + + Returns: + `list` of `dict`: with info about urls fetched: + + [{ + 'url': ..., + 'method': ..., + 'response_code': ..., + 'response_headers': ..., + }, ...] + ''' + with tempfile.TemporaryDirectory(prefix='brzl-ydl-') as tempdir: + ydl = _build_youtube_dl(worker, tempdir, site) + _try_youtube_dl(worker, ydl, site, page) + return ydl.fetch_spy.fetches