mirror of
https://github.com/internetarchive/brozzler.git
synced 2025-04-19 15:25:59 -04:00
claim sites to brozzle in batches to reduce contention over sites table
This commit is contained in:
parent
a125434563
commit
7962444f09
@ -61,6 +61,33 @@ class BrowserPool:
|
||||
self._in_use = set()
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def _fresh_browser(self):
    '''
    Builds a `Browser` on a port the operating system reports as free.

    Returns:
        a `Browser` constructed with `port=<chosen port>` and `self.kwargs`
    '''
    # Binding to port 0 makes the OS pick an unused port for us. The `with`
    # block guarantees the probe socket is closed even when bind() or
    # getsockname() raises, so a failure here cannot leak a descriptor.
    with socket.socket() as sock:
        sock.bind(('0.0.0.0', 0))
        port = sock.getsockname()[1]

    # NOTE: the probe socket is already closed at this point, so nothing
    # reserves the port between choosing it and constructing the browser.
    return Browser(port=port, **self.kwargs)
|
||||
|
||||
def acquire_multi(self, n=1):
    '''
    Hands out as many fresh browsers as the pool can spare, capped at `n`.

    Each browser handed out is recorded in `self._in_use`; the result may
    hold fewer than `n` browsers when the pool fills up first.

    Returns:
        list of up to `n` browsers

    Raises:
        NoBrowsersAvailable if the pool is already at capacity
    '''
    with self._lock:
        if len(self._in_use) >= self.size:
            raise NoBrowsersAvailable

        acquired = []
        # stop as soon as the caller has enough or the pool is full
        while len(acquired) < n and len(self._in_use) < self.size:
            fresh = self._fresh_browser()
            self._in_use.add(fresh)
            acquired.append(fresh)
        return acquired
|
||||
|
||||
def acquire(self):
|
||||
'''
|
||||
Returns an available instance.
|
||||
@ -74,14 +101,7 @@ class BrowserPool:
|
||||
with self._lock:
|
||||
if len(self._in_use) >= self.size:
|
||||
raise NoBrowsersAvailable
|
||||
|
||||
# choose available port
|
||||
sock = socket.socket()
|
||||
sock.bind(('0.0.0.0', 0))
|
||||
port = sock.getsockname()[1]
|
||||
sock.close()
|
||||
|
||||
browser = Browser(port=port, **self.kwargs)
|
||||
browser = self._fresh_browser()
|
||||
self._in_use.add(browser)
|
||||
return browser
|
||||
|
||||
@ -90,6 +110,13 @@ class BrowserPool:
|
||||
with self._lock:
|
||||
self._in_use.remove(browser)
|
||||
|
||||
def release_all(self, browsers):
    '''
    Stops every browser in `browsers`, then drops them from the in-use set.

    Args:
        browsers: iterable of browsers to release; it is materialized
            once, so a one-shot iterator (e.g. a generator) works too

    Raises:
        KeyError if one of the browsers is not currently marked in use
    '''
    # Two passes are made over the browsers. Snapshot them first so that a
    # generator is not exhausted by the stop pass, which would silently
    # leave every browser marked as in use.
    browsers = list(browsers)
    for browser in browsers:
        browser.stop()  # make sure
    with self._lock:
        for browser in browsers:
            self._in_use.remove(browser)
|
||||
|
||||
def shutdown_now(self):
|
||||
self.logger.info(
|
||||
'shutting down browser pool (%s browsers in use)',
|
||||
|
@ -93,7 +93,7 @@ class RethinkDbFrontier:
|
||||
raise UnexpectedDbResult("expected %r to be %r in %r" % (
|
||||
k, expected, result))
|
||||
|
||||
def claim_site(self, worker_id):
|
||||
def claim_sites(self, n=1):
|
||||
# XXX keep track of aggregate priority and prioritize sites accordingly?
|
||||
while True:
|
||||
result = (
|
||||
@ -104,34 +104,43 @@ class RethinkDbFrontier:
|
||||
.order_by(index="sites_last_disclaimed")
|
||||
.filter((r.row["claimed"] != True) | (
|
||||
r.row["last_claimed"] < r.now() - 60*60))
|
||||
.limit(1)
|
||||
.limit(n)
|
||||
.update(
|
||||
# try to avoid a race condition resulting in multiple
|
||||
# brozzler-workers claiming the same site
|
||||
# see https://github.com/rethinkdb/rethinkdb/issues/3235#issuecomment-60283038
|
||||
r.branch((r.row["claimed"] != True) | (
|
||||
r.row["last_claimed"] < r.now() - 60*60), {
|
||||
"claimed": True, "last_claimed_by": worker_id,
|
||||
"claimed": True,
|
||||
"last_claimed": doublethink.utcnow()}, {}),
|
||||
return_changes=True)).run()
|
||||
self._vet_result(result, replaced=[0,1], unchanged=[0,1])
|
||||
if result["replaced"] == 1:
|
||||
if result["changes"][0]["old_val"]["claimed"]:
|
||||
|
||||
self._vet_result(
|
||||
result, replaced=list(range(n+1)),
|
||||
unchanged=list(range(n+1)))
|
||||
sites = []
|
||||
for i in range(result["replaced"]):
|
||||
if result["changes"][i]["old_val"]["claimed"]:
|
||||
self.logger.warn(
|
||||
"re-claimed site that was still marked 'claimed' "
|
||||
"because it was last claimed a long time ago "
|
||||
"at %s, and presumably some error stopped it from "
|
||||
"being disclaimed",
|
||||
result["changes"][0]["old_val"]["last_claimed"])
|
||||
site = brozzler.Site(self.rr, result["changes"][0]["new_val"])
|
||||
else:
|
||||
result["changes"][i]["old_val"]["last_claimed"])
|
||||
site = brozzler.Site(self.rr, result["changes"][i]["new_val"])
|
||||
sites.append(site)
|
||||
if not sites:
|
||||
raise brozzler.NothingToClaim
|
||||
# XXX This is the only place we enforce time limit for now. Worker
|
||||
# loop should probably check time limit. Maybe frontier needs a
|
||||
# housekeeping thread to ensure that time limits get enforced in a
|
||||
# timely fashion.
|
||||
if not self._enforce_time_limit(site):
|
||||
return site
|
||||
for site in list(sites):
|
||||
if self._enforce_time_limit(site):
|
||||
sites.remove(site)
|
||||
if sites:
|
||||
return sites
|
||||
# else try again
|
||||
|
||||
def _enforce_time_limit(self, site):
|
||||
if (site.time_limit and site.time_limit > 0
|
||||
|
@ -501,6 +501,9 @@ class BrozzlerWorker:
|
||||
|
||||
def brozzle_site(self, browser, site):
|
||||
try:
|
||||
site.last_claimed_by = '%s:%s' % (
|
||||
socket.gethostname(), browser.chrome.port)
|
||||
site.save()
|
||||
start = time.time()
|
||||
page = None
|
||||
self._frontier.honor_stop_request(site)
|
||||
@ -602,36 +605,50 @@ class BrozzlerWorker:
|
||||
if due:
|
||||
self._service_heartbeat()
|
||||
|
||||
def _start_browsing_some_sites(self):
    '''
    Claims a batch of sites and launches one brozzling thread per site.

    Acquires browsers for half of the pool's currently available slots
    (rounded up), asks the frontier to claim that many sites, and pairs
    browsers with sites in order. Browsers left without a site are handed
    back to the pool.

    Raises:
        NoBrowsersAvailable if none available
    '''
    pool = self._browser_pool
    batch_size = (pool.num_available() + 1) // 2
    browsers = pool.acquire_multi(batch_size)

    try:
        sites = self._frontier.claim_sites(len(browsers))
    except:
        # claiming failed for whatever reason; give back every browser
        # before letting the exception propagate
        pool.release_all(browsers)
        raise

    # one daemon thread per (browser, site) pair, registered before start
    for browser, site in zip(browsers, sites):
        thread_name = "BrozzlingThread:%s" % browser.chrome.port
        th = threading.Thread(
                target=self._brozzle_site_thread_target,
                args=(browser, site), name=thread_name, daemon=True)
        with self._browsing_threads_lock:
            self._browsing_threads.add(th)
        th.start()

    # fewer sites than browsers were claimed: return the surplus browsers
    for spare in browsers[len(sites):]:
        pool.release(spare)
|
||||
|
||||
def run(self):
|
||||
self.logger.info("brozzler worker starting")
|
||||
try:
|
||||
latest_state = None
|
||||
while not self._shutdown.is_set():
|
||||
self._service_heartbeat_if_due()
|
||||
try:
|
||||
browser = self._browser_pool.acquire()
|
||||
try:
|
||||
site = self._frontier.claim_site("%s:%s" % (
|
||||
socket.gethostname(), browser.chrome.port))
|
||||
th = threading.Thread(
|
||||
target=self._brozzle_site_thread_target,
|
||||
args=(browser, site),
|
||||
name="BrozzlingThread:%s" % browser.chrome.port,
|
||||
daemon=True)
|
||||
with self._browsing_threads_lock:
|
||||
self._browsing_threads.add(th)
|
||||
th.start()
|
||||
except:
|
||||
self._browser_pool.release(browser)
|
||||
raise
|
||||
self._start_browsing_some_sites()
|
||||
except brozzler.browser.NoBrowsersAvailable:
|
||||
if latest_state != "browsers-busy":
|
||||
self.logger.info(
|
||||
"all %s browsers are busy", self._max_browsers)
|
||||
latest_state = "browsers-busy"
|
||||
logging.trace(
|
||||
"all %s browsers are in use", self._max_browsers)
|
||||
except brozzler.NothingToClaim:
|
||||
pass
|
||||
logging.trace(
|
||||
"all active sites are already claimed by a "
|
||||
"brozzler worker")
|
||||
time.sleep(0.5)
|
||||
|
||||
self.logger.info("shutdown requested")
|
||||
except r.ReqlError as e:
|
||||
self.logger.error(
|
||||
|
@ -826,30 +826,34 @@ def test_claim_site():
|
||||
rr.table('sites').delete().run() # clean slate
|
||||
|
||||
with pytest.raises(brozzler.NothingToClaim):
|
||||
claimed_site = frontier.claim_site(worker_id='test_claim_site')
|
||||
claimed_site = frontier.claim_sites()
|
||||
|
||||
site = brozzler.Site(rr, {'seed': 'http://example.org/'})
|
||||
brozzler.new_site(frontier, site)
|
||||
|
||||
claimed_site = frontier.claim_site(worker_id='test_claim_site')
|
||||
claimed_sites = frontier.claim_sites()
|
||||
assert len(claimed_sites) == 1
|
||||
claimed_site = claimed_sites[0]
|
||||
assert claimed_site.id == site.id
|
||||
assert claimed_site.claimed
|
||||
assert claimed_site.last_claimed >= doublethink.utcnow() - datetime.timedelta(minutes=1)
|
||||
with pytest.raises(brozzler.NothingToClaim):
|
||||
claimed_site = frontier.claim_site(worker_id='test_claim_site')
|
||||
claimed_site = frontier.claim_sites()
|
||||
|
||||
# site last_claimed less than 1 hour ago still not to be reclaimed
|
||||
claimed_site.last_claimed = doublethink.utcnow() - datetime.timedelta(minutes=55)
|
||||
claimed_site.save()
|
||||
with pytest.raises(brozzler.NothingToClaim):
|
||||
claimed_site = frontier.claim_site(worker_id='test_claim_site')
|
||||
claimed_site = frontier.claim_sites()
|
||||
|
||||
# site last_claimed more than 1 hour ago can be reclaimed
|
||||
site = claimed_site
|
||||
claimed_site = None
|
||||
site.last_claimed = doublethink.utcnow() - datetime.timedelta(minutes=65)
|
||||
site.save()
|
||||
claimed_site = frontier.claim_site(worker_id='test_claim_site')
|
||||
claimed_sites = frontier.claim_sites()
|
||||
assert len(claimed_sites) == 1
|
||||
claimed_site = claimed_sites[0]
|
||||
assert claimed_site.id == site.id
|
||||
|
||||
# clean up
|
||||
|
Loading…
x
Reference in New Issue
Block a user