diff --git a/brozzler/frontier.py b/brozzler/frontier.py index 3608d06..c9e0e9b 100644 --- a/brozzler/frontier.py +++ b/brozzler/frontier.py @@ -27,7 +27,7 @@ class RethinkDbFrontier: if not "sites" in tables: self.logger.info("creating rethinkdb table 'sites' in database %s", repr(self.r.dbname)) self.r.table_create("sites", shards=self.shards, replicas=self.replicas).run() - self.r.table("sites").index_create("sites_last_disclaimed", [self.r.row["status"], self.r.row["claimed"], self.r.row["last_disclaimed"]]).run() + self.r.table("sites").index_create("sites_last_disclaimed", [self.r.row["status"], self.r.row["last_disclaimed"]]).run() self.r.table("sites").index_create("job_id").run() if not "pages" in tables: self.logger.info("creating rethinkdb table 'pages' in database %s", repr(self.r.dbname)) @@ -93,14 +93,24 @@ class RethinkDbFrontier: while True: result = (self.r.table("sites") .between( - ["ACTIVE",False,rethinkdb.minval], - ["ACTIVE",False,rethinkdb.maxval], + ["ACTIVE",rethinkdb.minval], + ["ACTIVE",rethinkdb.maxval], index="sites_last_disclaimed") .order_by(index="sites_last_disclaimed").limit(1) + .filter( + (rethinkdb.row["claimed"] != True) | + (rethinkdb.row["last_disclaimed"] + < rethinkdb.now() - 2*60*60)) .update({"claimed":True,"last_claimed_by":worker_id}, return_changes=True)).run() self._vet_result(result, replaced=[0,1], unchanged=[0,1]) if result["replaced"] == 1: + if result["changes"][0]["old_val"]["claimed"]: + self.logger.warn( + "re-claimed site that was still marked 'claimed' " + "because it was last disclaimed a long time ago " + "at %s", + result["changes"][0]["old_val"]["last_disclaimed"]) site = brozzler.Site(**result["changes"][0]["new_val"]) else: raise brozzler.NothingToClaim