mirror of
https://github.com/internetarchive/brozzler.git
synced 2025-02-24 16:49:56 -05:00
Fix bug when claiming a site; it looks like there could be a race condition with another worker claiming the same site.
This commit is contained in:
parent
3c23aa8fd4
commit
5fe2805285
@ -41,6 +41,7 @@ class ReachedLimit(Exception):
|
|||||||
|
|
||||||
class Rethinker:
|
class Rethinker:
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
logger = logging.getLogger(__module__ + "." + __qualname__)
|
logger = logging.getLogger(__module__ + "." + __qualname__)
|
||||||
|
|
||||||
def __init__(self, servers=["localhost"], db=None):
|
def __init__(self, servers=["localhost"], db=None):
|
||||||
@ -50,8 +51,8 @@ class Rethinker:
|
|||||||
# https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py
|
# https://github.com/rethinkdb/rethinkdb-example-webpy-blog/blob/master/model.py
|
||||||
# "Best practices: Managing connections: a connection per request"
|
# "Best practices: Managing connections: a connection per request"
|
||||||
def _random_server_connection(self):
|
def _random_server_connection(self):
|
||||||
import rethinkdb as r
|
|
||||||
import random
|
import random
|
||||||
|
import rethinkdb as r
|
||||||
while True:
|
while True:
|
||||||
server = random.choice(self.servers)
|
server = random.choice(self.servers)
|
||||||
try:
|
try:
|
||||||
@ -66,6 +67,7 @@ class Rethinker:
|
|||||||
time.sleep(0.5)
|
time.sleep(0.5)
|
||||||
|
|
||||||
def run(self, query):
|
def run(self, query):
|
||||||
|
import rethinkdb as r
|
||||||
while True:
|
while True:
|
||||||
with self._random_server_connection() as conn:
|
with self._random_server_connection() as conn:
|
||||||
try:
|
try:
|
||||||
|
@ -85,7 +85,7 @@ class RethinkDbFrontier:
|
|||||||
result = self.r.run(r.table("sites")
|
result = self.r.run(r.table("sites")
|
||||||
.between(["ACTIVE",False,0], ["ACTIVE",False,250000000000], index="sites_last_disclaimed")
|
.between(["ACTIVE",False,0], ["ACTIVE",False,250000000000], index="sites_last_disclaimed")
|
||||||
.order_by(index="sites_last_disclaimed").limit(1).update({"claimed":True},return_changes=True))
|
.order_by(index="sites_last_disclaimed").limit(1).update({"claimed":True},return_changes=True))
|
||||||
self._vet_result(result, replaced=[0,1])
|
self._vet_result(result, replaced=[0,1], unchanged=[0,1])
|
||||||
if result["replaced"] == 1:
|
if result["replaced"] == 1:
|
||||||
site = brozzler.Site(**result["changes"][0]["new_val"])
|
site = brozzler.Site(**result["changes"][0]["new_val"])
|
||||||
else:
|
else:
|
||||||
@ -157,7 +157,7 @@ class RethinkDbFrontier:
|
|||||||
for url in outlinks:
|
for url in outlinks:
|
||||||
if site.is_in_scope(url, parent_page):
|
if site.is_in_scope(url, parent_page):
|
||||||
if brozzler.is_permitted_by_robots(site, url):
|
if brozzler.is_permitted_by_robots(site, url):
|
||||||
new_child_page = brozzler.Page(url, site_id=site.id, hops_from_seed=parent_page.hops_from_seed+1)
|
new_child_page = brozzler.Page(url, site_id=site.id, hops_from_seed=parent_page.hops_from_seed+1, via_page_id=parent_page.id)
|
||||||
existing_child_page = self.get_page(new_child_page)
|
existing_child_page = self.get_page(new_child_page)
|
||||||
if existing_child_page:
|
if existing_child_page:
|
||||||
existing_child_page.priority += new_child_page.priority
|
existing_child_page.priority += new_child_page.priority
|
||||||
|
@ -81,13 +81,14 @@ class Site(brozzler.BaseDictable):
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
class Page(brozzler.BaseDictable):
|
class Page(brozzler.BaseDictable):
|
||||||
def __init__(self, url, id=None, site_id=None, hops_from_seed=0, redirect_url=None, priority=None, claimed=False, brozzle_count=0):
|
def __init__(self, url, id=None, site_id=None, hops_from_seed=0, redirect_url=None, priority=None, claimed=False, brozzle_count=0, via_page_id=None):
|
||||||
self.site_id = site_id
|
self.site_id = site_id
|
||||||
self.url = url
|
self.url = url
|
||||||
self.hops_from_seed = hops_from_seed
|
self.hops_from_seed = hops_from_seed
|
||||||
self.redirect_url = redirect_url
|
self.redirect_url = redirect_url
|
||||||
self.claimed = bool(claimed)
|
self.claimed = bool(claimed)
|
||||||
self.brozzle_count = brozzle_count
|
self.brozzle_count = brozzle_count
|
||||||
|
self.via_page_id = via_page_id
|
||||||
self._canon_hurl = None
|
self._canon_hurl = None
|
||||||
|
|
||||||
if priority is not None:
|
if priority is not None:
|
||||||
|
@ -91,7 +91,7 @@ _reset() {
|
|||||||
|
|
||||||
tstamp=$(date +"%Y%m%d%H%M%S")
|
tstamp=$(date +"%Y%m%d%H%M%S")
|
||||||
echo "renaming rethinkdb database archiveit_brozzler to archiveit_brozzler_$tstamp"
|
echo "renaming rethinkdb database archiveit_brozzler to archiveit_brozzler_$tstamp"
|
||||||
python3.4 <<EOF
|
PYTHONPATH=/home/nlevitt/tmp/brozzler-venv/lib/python3.4/site-packages python3.4 <<EOF
|
||||||
import rethinkdb as r
|
import rethinkdb as r
|
||||||
with r.connect("wbgrp-svc035") as conn:
|
with r.connect("wbgrp-svc035") as conn:
|
||||||
r.db("archiveit_brozzler").config().update({"name":"archiveit_brozzler_$tstamp"}).run(conn)
|
r.db("archiveit_brozzler").config().update({"name":"archiveit_brozzler_$tstamp"}).run(conn)
|
||||||
@ -111,11 +111,11 @@ _start() {
|
|||||||
set -e
|
set -e
|
||||||
set -x
|
set -x
|
||||||
|
|
||||||
/home/nlevitt/tmp/brozzler-venv/bin/warcprox --dir=/1/brzl/warcs --rethinkdb-servers=wbgrp-svc020,wbgrp-svc035,wbgrp-svc036 --rethinkdb-db=archiveit_brozzler --rethinkdb-big-table --cacert=/1/brzl/warcprox-ca.pem --certs-dir=/1/brzl/certs --address=0.0.0.0 --base32 --gzip --rollover-idle-time=180 &>/1/brzl/logs/warcprox.out &
|
PYTHONPATH=/home/nlevitt/tmp/brozzler-venv/lib/python3.4/site-packages:/home/nlevitt/workspace/brozzler:/home/nlevitt/workspace/warcprox:/home/nlevitt/workspace/ait5 /home/nlevitt/tmp/brozzler-venv/bin/warcprox --dir=/1/brzl/warcs --rethinkdb-servers=wbgrp-svc020,wbgrp-svc035,wbgrp-svc036 --rethinkdb-db=archiveit_brozzler --rethinkdb-big-table --cacert=/1/brzl/warcprox-ca.pem --certs-dir=/1/brzl/certs --address=0.0.0.0 --base32 --gzip --rollover-idle-time=180 --kafka-broker-list=qa-archive-it.org:6092 --kafka-capture-feed-topic=ait-brozzler-captures &>/1/brzl/logs/warcprox.out &
|
||||||
|
|
||||||
sleep 5
|
sleep 5
|
||||||
|
|
||||||
/home/nlevitt/workspace/ait5/scripts/brozzler-job-starter.py &> /1/brzl/logs/ait-job-starter.out &
|
PYTHONPATH=/home/nlevitt/tmp/brozzler-venv/lib/python3.4/site-packages:/home/nlevitt/workspace/brozzler:/home/nlevitt/workspace/warcprox:/home/nlevitt/workspace/ait5 /home/nlevitt/workspace/ait5/scripts/brozzler-job-starter.py &> /1/brzl/logs/ait-job-starter.out &
|
||||||
|
|
||||||
sleep 5
|
sleep 5
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user