From e5536182dc8055c6b6352b184a15b508f32e1388 Mon Sep 17 00:00:00 2001
From: Noah Levitt
Date: Thu, 11 Oct 2018 23:28:34 -0700
Subject: [PATCH] use a thread-local callback in monkey-patched
 finish_frag_download, instead of locking around monkey-patching, to allow
 different threads to youtube-dl concurrently, but still not interfere with
 each other

---
 brozzler/ydl.py | 50 ++++++++++++++++++++++++++++++-------------------
 1 file changed, 31 insertions(+), 19 deletions(-)

diff --git a/brozzler/ydl.py b/brozzler/ydl.py
index d67856c..4c16e17 100644
--- a/brozzler/ydl.py
+++ b/brozzler/ydl.py
@@ -30,7 +30,21 @@ import doublethink
 import datetime
 import threading
 
-global_ydl_lock = threading.Lock()
+thread_local = threading.local()
+_orig__finish_frag_download = youtube_dl.downloader.fragment.FragmentFD._finish_frag_download
+def _finish_frag_download(ffd_self, ctx):
+    '''
+    We monkey-patch this youtube-dl internal method `_finish_frag_download()`
+    because it gets called after downloading the last segment of a segmented
+    video, which is a good time to upload the stitched-up video that youtube-dl
+    creates for us to warcprox. We have it call a thread-local callback
+    since different threads may be youtube-dl'ing at the same time.
+    '''
+    result = _orig__finish_frag_download(ffd_self, ctx)
+    if hasattr(thread_local, 'finish_frag_download_callback'):
+        thread_local.finish_frag_download_callback(ffd_self, ctx)
+    return result
+youtube_dl.downloader.fragment.FragmentFD._finish_frag_download = _finish_frag_download
 
 _orig_webpage_read_content = youtube_dl.extractor.generic.GenericIE._webpage_read_content
 def _webpage_read_content(self, *args, **kwargs):
@@ -111,9 +125,10 @@ def _build_youtube_dl(worker, destdir, site):
 
     - keeps track of urls fetched using a `YoutubeDLSpy`
     - periodically updates `site.last_claimed` in rethinkdb
-    - if brozzling through warcprox and downloading fragmented (DASH) videos,
-      pushes the stitched together video to warcprox using a
-      WARCPROX_WRITE_RECORD request
+    - if brozzling through warcprox and downloading segmented videos (e.g.
+      HLS), pushes the stitched-up video created by youtube-dl to warcprox
+      using a WARCPROX_WRITE_RECORD request
+    - adds some logging
 
     Args:
         worker (brozzler.BrozzlerWorker): the calling brozzler worker
@@ -199,21 +214,18 @@ def _build_youtube_dl(worker, destdir, site):
                 })
 
         def process_info(self, info_dict):
-            # lock this section to prevent race condition between threads that
-            # want to monkey patch _finish_frag_download() at the same time
-            with global_ydl_lock:
-                _orig__finish_frag_download = youtube_dl.downloader.fragment.FragmentFD._finish_frag_download
-
-                def _finish_frag_download(ffd_self, ctx):
-                    _orig__finish_frag_download(ffd_self, ctx)
-                    if worker._using_warcprox(site):
-                        self._push_stitched_up_vid_to_warcprox(site, info_dict, ctx)
-
-                try:
-                    youtube_dl.downloader.fragment.FragmentFD._finish_frag_download = _finish_frag_download
-                    return super().process_info(info_dict)
-                finally:
-                    youtube_dl.downloader.fragment.FragmentFD._finish_frag_download = _orig__finish_frag_download
+            '''
+            See comment above on `_finish_frag_download()`
+            '''
+            def ffd_callback(ffd_self, ctx):
+                logging.info('finished downloading %s', ctx.get('filename'))
+                if worker._using_warcprox(site):
+                    self._push_stitched_up_vid_to_warcprox(site, info_dict, ctx)
+            try:
+                thread_local.finish_frag_download_callback = ffd_callback
+                return super().process_info(info_dict)
+            finally:
+                delattr(thread_local, 'finish_frag_download_callback')
 
     def maybe_heartbeat_site_last_claimed(*args, **kwargs):
         # in case youtube-dl takes a long time, heartbeat site.last_claimed