working on avoiding race condition resulting in multiple brozzler-workers claiming the same site

This commit is contained in:
Noah Levitt 2016-04-22 01:27:50 +00:00
parent 2825ffea15
commit 8d9fc7d3e3
2 changed files with 27 additions and 10 deletions

View File

@ -62,6 +62,8 @@ class RethinkDbFrontier:
def new_site(self, site): def new_site(self, site):
self.logger.info("inserting into 'sites' table %s", site) self.logger.info("inserting into 'sites' table %s", site)
import pprint
self.logger.info("update_site:\n%s", pprint.pformat(vars(site)))
result = self.r.table("sites").insert(site.to_dict()).run() result = self.r.table("sites").insert(site.to_dict()).run()
self._vet_result(result, inserted=1) self._vet_result(result, inserted=1)
if not site.id: if not site.id:
@ -75,6 +77,8 @@ class RethinkDbFrontier:
def update_site(self, site): def update_site(self, site):
self.logger.debug("updating 'sites' table entry %s", site) self.logger.debug("updating 'sites' table entry %s", site)
import pprint
self.logger.info("update_site:\n%s", pprint.pformat(vars(site)))
result = self.r.table("sites").get(site.id).replace(site.to_dict()).run() result = self.r.table("sites").get(site.id).replace(site.to_dict()).run()
self._vet_result(result, replaced=[0,1], unchanged=[0,1]) self._vet_result(result, replaced=[0,1], unchanged=[0,1])
@ -91,7 +95,8 @@ class RethinkDbFrontier:
def claim_site(self, worker_id): def claim_site(self, worker_id):
# XXX keep track of aggregate priority and prioritize sites accordingly? # XXX keep track of aggregate priority and prioritize sites accordingly?
while True: while True:
result = (self.r.table("sites", read_mode="majority") result = (
self.r.table("sites", read_mode="majority")
.between( .between(
["ACTIVE",rethinkdb.minval], ["ACTIVE",rethinkdb.minval],
["ACTIVE",rethinkdb.maxval], ["ACTIVE",rethinkdb.maxval],
@ -102,11 +107,18 @@ class RethinkDbFrontier:
(rethinkdb.row["last_claimed"] (rethinkdb.row["last_claimed"]
< rethinkdb.now() - 2*60*60)) < rethinkdb.now() - 2*60*60))
.limit(1) .limit(1)
.update({ .update(
"claimed": True, # try to avoid a race condition resulting in multiple
"last_claimed_by": worker_id, # brozzler-workers claiming the same site
"last_claimed": rethinkstuff.utcnow(), # see https://github.com/rethinkdb/rethinkdb/issues/3235#issuecomment-60283038
}, return_changes=True)).run() rethinkdb.branch(
(rethinkdb.row["claimed"] != True) |
(rethinkdb.row["last_claimed"]
< rethinkdb.now() - 2*60*60), {
"claimed": True,
"last_claimed_by": worker_id,
"last_claimed": rethinkstuff.utcnow()
}, {}), return_changes=True)).run()
self._vet_result(result, replaced=[0,1], unchanged=[0,1]) self._vet_result(result, replaced=[0,1], unchanged=[0,1])
if result["replaced"] == 1: if result["replaced"] == 1:
if result["changes"][0]["old_val"]["claimed"]: if result["changes"][0]["old_val"]["claimed"]:
@ -117,6 +129,8 @@ class RethinkDbFrontier:
"being disclaimed", "being disclaimed",
result["changes"][0]["old_val"]["last_claimed"]) result["changes"][0]["old_val"]["last_claimed"])
site = brozzler.Site(**result["changes"][0]["new_val"]) site = brozzler.Site(**result["changes"][0]["new_val"])
import pprint
self.logger.info("claim_site:\n%s", pprint.pformat(vars(site)))
else: else:
raise brozzler.NothingToClaim raise brozzler.NothingToClaim
# XXX This is the only place we enforce time limit for now. Worker # XXX This is the only place we enforce time limit for now. Worker

View File

@ -79,7 +79,6 @@ class BrozzlerWorker:
self._browser_pool = brozzler.browser.BrowserPool(max_browsers, self._browser_pool = brozzler.browser.BrowserPool(max_browsers,
chrome_exe=chrome_exe, ignore_cert_errors=True) chrome_exe=chrome_exe, ignore_cert_errors=True)
self._shutdown_requested = threading.Event() self._shutdown_requested = threading.Event()
self._id = "{}@{}".format(socket.gethostname(), os.getpid())
def _youtube_dl(self, site): def _youtube_dl(self, site):
ydl_opts = { ydl_opts = {
@ -257,7 +256,9 @@ class BrozzlerWorker:
while (not self._shutdown_requested.is_set() while (not self._shutdown_requested.is_set()
and time.time() - start < 7 * 60): and time.time() - start < 7 * 60):
self._frontier.honor_stop_request(site.job_id) self._frontier.honor_stop_request(site.job_id)
page = self._frontier.claim_page(site, self._id) page = self._frontier.claim_page(site,
"{}:{}".format(
socket.gethostname(), browser.chrome_port))
outlinks = self.brozzle_page(browser, ydl, site, page) outlinks = self.brozzle_page(browser, ydl, site, page)
self._frontier.completed_page(site, page) self._frontier.completed_page(site, page)
self._frontier.scope_and_schedule_outlinks(site, page, outlinks) self._frontier.scope_and_schedule_outlinks(site, page, outlinks)
@ -304,13 +305,15 @@ class BrozzlerWorker:
try: try:
browser = self._browser_pool.acquire() browser = self._browser_pool.acquire()
try: try:
site = self._frontier.claim_site(self._id) site = self._frontier.claim_site("{}:{}".format(
socket.gethostname(), browser.chrome_port))
self.logger.info("brozzling site %s", site) self.logger.info("brozzling site %s", site)
ydl = self._youtube_dl(site) ydl = self._youtube_dl(site)
th = threading.Thread( th = threading.Thread(
target=lambda: self._brozzle_site( target=lambda: self._brozzle_site(
browser, ydl, site), browser, ydl, site),
name="BrowsingThread-{}".format(site.seed)) name="BrowsingThread:{}-{}".format(
browser.chrome_port, site.seed))
th.start() th.start()
except: except:
self._browser_pool.release(browser) self._browser_pool.release(browser)