use a thread-local callback in monkey-patched

finish_frag_download, instead of locking around monkey-patching, to
allow different threads to youtube-dl concurrently, but still not
interfere with each other
This commit is contained in:
Noah Levitt 2018-10-11 23:28:34 -07:00
parent 82cf5c6dbb
commit e5536182dc

View File

@ -30,7 +30,21 @@ import doublethink
import datetime
import threading
global_ydl_lock = threading.Lock()
thread_local = threading.local()
_orig__finish_frag_download = youtube_dl.downloader.fragment.FragmentFD._finish_frag_download
def _finish_frag_download(ffd_self, ctx):
'''
We monkey-patch this youtube-dl internal method `_finish_frag_download()`
because it gets called after downloading the last segment of a segmented
video, which is a good time to upload the stitched-up video that youtube-dl
creates for us to warcprox. We have it call a thread-local callback
since different threads may be youtube-dl'ing at the same time.
'''
result = _orig__finish_frag_download(ffd_self, ctx)
if hasattr(thread_local, 'finish_frag_download_callback'):
thread_local.finish_frag_download_callback(ffd_self, ctx)
return result
youtube_dl.downloader.fragment.FragmentFD._finish_frag_download = _finish_frag_download
_orig_webpage_read_content = youtube_dl.extractor.generic.GenericIE._webpage_read_content
def _webpage_read_content(self, *args, **kwargs):
@ -111,9 +125,10 @@ def _build_youtube_dl(worker, destdir, site):
- keeps track of urls fetched using a `YoutubeDLSpy`
- periodically updates `site.last_claimed` in rethinkdb
- if brozzling through warcprox and downloading fragmented (DASH) videos,
pushes the stitched together video to warcprox using a
WARCPROX_WRITE_RECORD request
- if brozzling through warcprox and downloading segmented videos (e.g.
HLS), pushes the stitched-up video created by youtube-dl to warcprox
using a WARCPROX_WRITE_RECORD request
- adds some logging
Args:
worker (brozzler.BrozzlerWorker): the calling brozzler worker
@ -199,21 +214,18 @@ def _build_youtube_dl(worker, destdir, site):
})
def process_info(self, info_dict):
# lock this section to prevent race condition between threads that
# want to monkey patch _finish_frag_download() at the same time
with global_ydl_lock:
_orig__finish_frag_download = youtube_dl.downloader.fragment.FragmentFD._finish_frag_download
def _finish_frag_download(ffd_self, ctx):
_orig__finish_frag_download(ffd_self, ctx)
if worker._using_warcprox(site):
self._push_stitched_up_vid_to_warcprox(site, info_dict, ctx)
try:
youtube_dl.downloader.fragment.FragmentFD._finish_frag_download = _finish_frag_download
return super().process_info(info_dict)
finally:
youtube_dl.downloader.fragment.FragmentFD._finish_frag_download = _orig__finish_frag_download
'''
See comment above on `_finish_frag_download()`
'''
def ffd_callback(ffd_self, ctx):
logging.info('%s')
if worker._using_warcprox(site):
self._push_stitched_up_vid_to_warcprox(site, info_dict, ctx)
try:
thread_local.finish_frag_download_callback = ffd_callback
return super().process_info(info_dict)
finally:
delattr(thread_local, 'finish_frag_download_callback')
def maybe_heartbeat_site_last_claimed(*args, **kwargs):
# in case youtube-dl takes a long time, heartbeat site.last_claimed