diff --git a/brozzler/frontier.py b/brozzler/frontier.py index 9d8169c..037afbe 100644 --- a/brozzler/frontier.py +++ b/brozzler/frontier.py @@ -62,6 +62,8 @@ class RethinkDbFrontier: def new_site(self, site): self.logger.info("inserting into 'sites' table %s", site) + import pprint + self.logger.info("update_site:\n%s", pprint.pformat(vars(site))) result = self.r.table("sites").insert(site.to_dict()).run() self._vet_result(result, inserted=1) if not site.id: @@ -75,6 +77,8 @@ class RethinkDbFrontier: def update_site(self, site): self.logger.debug("updating 'sites' table entry %s", site) + import pprint + self.logger.info("update_site:\n%s", pprint.pformat(vars(site))) result = self.r.table("sites").get(site.id).replace(site.to_dict()).run() self._vet_result(result, replaced=[0,1], unchanged=[0,1]) @@ -91,7 +95,8 @@ class RethinkDbFrontier: def claim_site(self, worker_id): # XXX keep track of aggregate priority and prioritize sites accordingly? while True: - result = (self.r.table("sites", read_mode="majority") + result = ( + self.r.table("sites", read_mode="majority") .between( ["ACTIVE",rethinkdb.minval], ["ACTIVE",rethinkdb.maxval], @@ -102,11 +107,18 @@ class RethinkDbFrontier: (rethinkdb.row["last_claimed"] < rethinkdb.now() - 2*60*60)) .limit(1) - .update({ - "claimed": True, - "last_claimed_by": worker_id, - "last_claimed": rethinkstuff.utcnow(), - }, return_changes=True)).run() + .update( + # try to avoid a race condition resulting in multiple + # brozzler-workers claiming the same site + # see https://github.com/rethinkdb/rethinkdb/issues/3235#issuecomment-60283038 + rethinkdb.branch( + (rethinkdb.row["claimed"] != True) | + (rethinkdb.row["last_claimed"] + < rethinkdb.now() - 2*60*60), { + "claimed": True, + "last_claimed_by": worker_id, + "last_claimed": rethinkstuff.utcnow() + }, {}), return_changes=True)).run() self._vet_result(result, replaced=[0,1], unchanged=[0,1]) if result["replaced"] == 1: if result["changes"][0]["old_val"]["claimed"]: @@ -117,6 +129,8 @@ class RethinkDbFrontier: "being disclaimed", result["changes"][0]["old_val"]["last_claimed"]) site = brozzler.Site(**result["changes"][0]["new_val"]) + import pprint + self.logger.info("claim_site:\n%s", pprint.pformat(vars(site))) else: raise brozzler.NothingToClaim # XXX This is the only place we enforce time limit for now. Worker diff --git a/brozzler/worker.py b/brozzler/worker.py index 7355d5a..9ebf88d 100644 --- a/brozzler/worker.py +++ b/brozzler/worker.py @@ -79,7 +79,6 @@ class BrozzlerWorker: self._browser_pool = brozzler.browser.BrowserPool(max_browsers, chrome_exe=chrome_exe, ignore_cert_errors=True) self._shutdown_requested = threading.Event() - self._id = "{}@{}".format(socket.gethostname(), os.getpid()) def _youtube_dl(self, site): ydl_opts = { @@ -257,7 +256,9 @@ class BrozzlerWorker: while (not self._shutdown_requested.is_set() and time.time() - start < 7 * 60): self._frontier.honor_stop_request(site.job_id) - page = self._frontier.claim_page(site, self._id) + page = self._frontier.claim_page(site, + "{}:{}".format( + socket.gethostname(), browser.chrome_port)) outlinks = self.brozzle_page(browser, ydl, site, page) self._frontier.completed_page(site, page) self._frontier.scope_and_schedule_outlinks(site, page, outlinks) @@ -304,13 +305,15 @@ class BrozzlerWorker: try: browser = self._browser_pool.acquire() try: - site = self._frontier.claim_site(self._id) + site = self._frontier.claim_site("{}:{}".format( + socket.gethostname(), browser.chrome_port)) self.logger.info("brozzling site %s", site) ydl = self._youtube_dl(site) th = threading.Thread( target=lambda: self._brozzle_site( browser, ydl, site), - name="BrowsingThread-{}".format(site.seed)) + name="BrowsingThread:{}-{}".format( + browser.chrome_port, site.seed)) th.start() except: self._browser_pool.release(browser)