diff --git a/brozzler/browser.py b/brozzler/browser.py index ead3b6e..18959b3 100644 --- a/brozzler/browser.py +++ b/brozzler/browser.py @@ -61,6 +61,33 @@ class BrowserPool: self._in_use = set() self._lock = threading.Lock() + def _fresh_browser(self): + # choose available port + sock = socket.socket() + sock.bind(('0.0.0.0', 0)) + port = sock.getsockname()[1] + sock.close() + + browser = Browser(port=port, **self.kwargs) + return browser + + def acquire_multi(self, n=1): + ''' + Returns a list of up to `n` browsers. + + Raises: + NoBrowsersAvailable if none available + ''' + browsers = [] + with self._lock: + if len(self._in_use) >= self.size: + raise NoBrowsersAvailable + while len(self._in_use) < self.size and len(browsers) < n: + browser = self._fresh_browser() + browsers.append(browser) + self._in_use.add(browser) + return browsers + def acquire(self): ''' Returns an available instance. @@ -74,14 +101,7 @@ class BrowserPool: with self._lock: if len(self._in_use) >= self.size: raise NoBrowsersAvailable - - # choose available port - sock = socket.socket() - sock.bind(('0.0.0.0', 0)) - port = sock.getsockname()[1] - sock.close() - - browser = Browser(port=port, **self.kwargs) + browser = self._fresh_browser() self._in_use.add(browser) return browser @@ -90,6 +110,13 @@ class BrowserPool: with self._lock: self._in_use.remove(browser) + def release_all(self, browsers): + for browser in browsers: + browser.stop() # make sure + with self._lock: + for browser in browsers: + self._in_use.remove(browser) + def shutdown_now(self): self.logger.info( 'shutting down browser pool (%s browsers in use)', diff --git a/brozzler/frontier.py b/brozzler/frontier.py index 3bd6221..ee43966 100644 --- a/brozzler/frontier.py +++ b/brozzler/frontier.py @@ -93,7 +93,7 @@ class RethinkDbFrontier: raise UnexpectedDbResult("expected %r to be %r in %r" % ( k, expected, result)) - def claim_site(self, worker_id): + def claim_sites(self, n=1): # XXX keep track of aggregate priority and prioritize sites accordingly? while True: result = ( @@ -104,34 +104,43 @@ class RethinkDbFrontier: .order_by(index="sites_last_disclaimed") .filter((r.row["claimed"] != True) | ( r.row["last_claimed"] < r.now() - 60*60)) - .limit(1) + .limit(n) .update( # try to avoid a race condition resulting in multiple # brozzler-workers claiming the same site # see https://github.com/rethinkdb/rethinkdb/issues/3235#issuecomment-60283038 r.branch((r.row["claimed"] != True) | ( r.row["last_claimed"] < r.now() - 60*60), { - "claimed": True, "last_claimed_by": worker_id, + "claimed": True, "last_claimed": doublethink.utcnow()}, {}), return_changes=True)).run() - self._vet_result(result, replaced=[0,1], unchanged=[0,1]) - if result["replaced"] == 1: - if result["changes"][0]["old_val"]["claimed"]: + + self._vet_result( + result, replaced=list(range(n+1)), + unchanged=list(range(n+1))) + sites = [] + for i in range(result["replaced"]): + if result["changes"][i]["old_val"]["claimed"]: self.logger.warn( "re-claimed site that was still marked 'claimed' " "because it was last claimed a long time ago " "at %s, and presumably some error stopped it from " "being disclaimed", - result["changes"][0]["old_val"]["last_claimed"]) - site = brozzler.Site(self.rr, result["changes"][0]["new_val"]) - else: + result["changes"][i]["old_val"]["last_claimed"]) + site = brozzler.Site(self.rr, result["changes"][i]["new_val"]) + sites.append(site) + if not sites: raise brozzler.NothingToClaim # XXX This is the only place we enforce time limit for now. Worker # loop should probably check time limit. Maybe frontier needs a # housekeeping thread to ensure that time limits get enforced in a # timely fashion. - if not self._enforce_time_limit(site): - return site + for site in list(sites): + if self._enforce_time_limit(site): + sites.remove(site) + if sites: + return sites + # else try again def _enforce_time_limit(self, site): if (site.time_limit and site.time_limit > 0 diff --git a/brozzler/worker.py b/brozzler/worker.py index d862f09..c6d6f8f 100644 --- a/brozzler/worker.py +++ b/brozzler/worker.py @@ -501,6 +501,9 @@ class BrozzlerWorker: def brozzle_site(self, browser, site): try: + site.last_claimed_by = '%s:%s' % ( + socket.gethostname(), browser.chrome.port) + site.save() start = time.time() page = None self._frontier.honor_stop_request(site) @@ -602,36 +605,50 @@ class BrozzlerWorker: if due: self._service_heartbeat() + def _start_browsing_some_sites(self): + ''' + Starts browsing some sites. + + Raises: + NoBrowsersAvailable if none available + ''' + browsers = self._browser_pool.acquire_multi( + (self._browser_pool.num_available() + 1) // 2) + try: + sites = self._frontier.claim_sites(len(browsers)) + except: + self._browser_pool.release_all(browsers) + raise + + for i in range(len(browsers)): + if i < len(sites): + th = threading.Thread( + target=self._brozzle_site_thread_target, + args=(browsers[i], sites[i]), + name="BrozzlingThread:%s" % browsers[i].chrome.port, + daemon=True) + with self._browsing_threads_lock: + self._browsing_threads.add(th) + th.start() + else: + self._browser_pool.release(browsers[i]) + def run(self): self.logger.info("brozzler worker starting") try: - latest_state = None while not self._shutdown.is_set(): self._service_heartbeat_if_due() try: - browser = self._browser_pool.acquire() - try: - site = self._frontier.claim_site("%s:%s" % ( - socket.gethostname(), browser.chrome.port)) - th = threading.Thread( - target=self._brozzle_site_thread_target, - args=(browser, site), - name="BrozzlingThread:%s" % browser.chrome.port, - daemon=True) - with self._browsing_threads_lock: - self._browsing_threads.add(th) - th.start() - except: - self._browser_pool.release(browser) - raise + self._start_browsing_some_sites() except brozzler.browser.NoBrowsersAvailable: - if latest_state != "browsers-busy": - self.logger.info( - "all %s browsers are busy", self._max_browsers) - latest_state = "browsers-busy" + logging.trace( + "all %s browsers are in use", self._max_browsers) except brozzler.NothingToClaim: - pass + logging.trace( + "all active sites are already claimed by a " + "brozzler worker") time.sleep(0.5) + self.logger.info("shutdown requested") except r.ReqlError as e: self.logger.error( diff --git a/tests/test_frontier.py b/tests/test_frontier.py index fdae726..5170a04 100644 --- a/tests/test_frontier.py +++ b/tests/test_frontier.py @@ -826,30 +826,34 @@ def test_claim_site(): rr.table('sites').delete().run() # clean slate with pytest.raises(brozzler.NothingToClaim): - claimed_site = frontier.claim_site(worker_id='test_claim_site') + claimed_site = frontier.claim_sites() site = brozzler.Site(rr, {'seed': 'http://example.org/'}) brozzler.new_site(frontier, site) - claimed_site = frontier.claim_site(worker_id='test_claim_site') + claimed_sites = frontier.claim_sites() + assert len(claimed_sites) == 1 + claimed_site = claimed_sites[0] assert claimed_site.id == site.id assert claimed_site.claimed assert claimed_site.last_claimed >= doublethink.utcnow() - datetime.timedelta(minutes=1) with pytest.raises(brozzler.NothingToClaim): - claimed_site = frontier.claim_site(worker_id='test_claim_site') + claimed_site = frontier.claim_sites() # site last_claimed less than 1 hour ago still not to be reclaimed claimed_site.last_claimed = doublethink.utcnow() - datetime.timedelta(minutes=55) claimed_site.save() with pytest.raises(brozzler.NothingToClaim): - claimed_site = frontier.claim_site(worker_id='test_claim_site') + claimed_site = frontier.claim_sites() # site last_claimed more than 1 hour ago can be reclaimed site = claimed_site claimed_site = None site.last_claimed = doublethink.utcnow() - datetime.timedelta(minutes=65) site.save() - claimed_site = frontier.claim_site(worker_id='test_claim_site') + claimed_sites = frontier.claim_sites() + assert len(claimed_sites) == 1 + claimed_site = claimed_sites[0] assert claimed_site.id == site.id # clean up