mirror of https://github.com/internetarchive/brozzler.git
synced 2025-08-21 04:18:11 -04:00
feed seed url to the crawl url queue
This commit is contained in:
parent 1d068f4f86
commit 8aa1e6715a

1 changed file with 75 additions and 13 deletions
@@ -29,17 +29,37 @@ args = arg_parser.parse_args(args=sys.argv[1:])
 logging.basicConfig(stream=sys.stdout, level=args.log_level,
         format="%(asctime)s %(process)d %(levelname)s %(threadName)s %(name)s.%(funcName)s(%(filename)s:%(lineno)d) %(message)s")
 
+class CrawlUrl:
+    def __init__(self, url, id=None, site_id=None, hops_from_seed=0):
+        self.id = id
+        self.site_id = site_id
+        self.url = url
+        self.hops_from_seed = hops_from_seed
+        self._canon_hurl = None
+
+    def __repr__(self):
+        return """CrawlUrl(url="{}",site_id={},hops_from_seed={})""".format(
+                self.url, self.site_id, self.hops_from_seed)
+
+    def canonical(self):
+        if self._canon_hurl is None:
+            self._canon_hurl = surt.handyurl.parse(self.url)
+            surt.GoogleURLCanonicalizer.canonicalize(self._canon_hurl)
+        return self._canon_hurl.geturl()
+
+    def to_json(self):
+        d = dict(id=self.id, site_id=self.site_id, url=self.url, hops_from_seed=self.hops_from_seed)
+        return json.dumps(d, separators=(',', ':'))
+
 class BrozzlerHQDb:
     logger = logging.getLogger(__module__ + "." + __qualname__)
 
     def __init__(self, db_file="./brozzler-hq-0.db"):
-        self.db_file = db_file
-        self.db_conn = sqlite3.connect(db_file)
+        self._conn = sqlite3.connect(db_file)
         self._create_tables()
 
     def _create_tables(self):
-        cursor = self.db_conn.cursor()
+        cursor = self._conn.cursor()
         cursor.executescript("""
                 create table if not exists brozzler_sites (
                     id integer primary key,
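
Note: canonical() defers to the surt library using exactly the calls shown in
the hunk above (surt.handyurl.parse(), then
surt.GoogleURLCanonicalizer.canonicalize()), caching the parsed handyurl so
repeated calls are cheap. A standalone sketch of that sequence, minus the
caching, with a made-up example URL:

    import surt

    # parse once, canonicalize the handyurl in place, then serialize --
    # the same sequence CrawlUrl.canonical() runs
    hurl = surt.handyurl.parse("http://EXAMPLE.com/foo/../bar")
    surt.GoogleURLCanonicalizer.canonicalize(hurl)
    print(hurl.geturl())
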
@@ -54,18 +74,51 @@ class BrozzlerHQDb:
                     canon_url varchar(4000),
                     crawl_url_json text
                 );
-                create index if not exists brozzler_urls_priority on brozzler_urls (priority);
+                create index if not exists brozzler_urls_priority on brozzler_urls (priority desc);
                 create index if not exists brozzler_urls_site_id on brozzler_urls (site_id);
                 """)
-        self.db_conn.commit()
+        self._conn.commit()
+
+    def pop_url(self, site_id):
+        cursor = self._conn.cursor()
+        cursor.execute("select id, priority, crawl_url_json from brozzler_urls where site_id = ? and not in_progress order by priority desc limit 1", (site_id,))
+        row = cursor.fetchone()
+        if row:
+            (id, priority, crawl_url_json) = row
+            new_priority = priority - 2000
+            cursor.execute("update brozzler_urls set priority=?, in_progress=1 where id=?", (new_priority, id))
+            self._conn.commit()
+
+            d = json.loads(crawl_url_json)
+            d["id"] = id
+            return d
+        else:
+            return None
 
     def new_site(self, site_dict):
         site_json = json.dumps(site_dict, separators=(',', ':'))
-        cursor = self.db_conn.cursor()
+        cursor = self._conn.cursor()
         cursor.execute("insert into brozzler_sites (site_json) values (?)", (site_json,))
-        self.db_conn.commit()
+        self._conn.commit()
         return cursor.lastrowid
+
+    def schedule_url(self, crawl_url, priority=0):
+        cursor = self._conn.cursor()
+        cursor.execute("insert into brozzler_urls (site_id, priority, canon_url, crawl_url_json, in_progress) values (?, ?, ?, ?, 0)",
+                (crawl_url.site_id, priority, crawl_url.canonical(), crawl_url.to_json()))
+        self._conn.commit()
+
+    def sites(self):
+        cursor = self._conn.cursor()
+        cursor.execute("select id, site_json from brozzler_sites")
+        while True:
+            row = cursor.fetchone()
+            if row is None:
+                break
+            site_dict = json.loads(row[1])
+            site_dict["id"] = row[0]
+            yield site_dict
 
 class BrozzlerHQ:
     logger = logging.getLogger(__module__ + "." + __qualname__)
@@ -84,7 +137,7 @@ class BrozzlerHQ:
             while True:
                 self._new_site()
                 # self._consume_completed_url()
-                # self._feed_crawl_urls()
+                self._feed_crawl_urls()
                 time.sleep(0.5)
         finally:
             self._conn.close()
@@ -95,18 +148,27 @@ class BrozzlerHQ:
             new_site_dict = msg.payload
             site_id = self._db.new_site(new_site_dict)
             new_site_dict["id"] = site_id
-            self._schedule_seed(new_site_dict["seed"])
+            self._schedule_seed(site_id, new_site_dict["seed"])
             self._unclaimed_sites_q.put(new_site_dict)
             msg.ack()
             self.logger.info("new site {}".format(new_site_dict))
         except kombu.simple.Empty:
             pass
 
-    def _schedule_seed(self, seed_url):
-        pass
-        # self._db.schedule_url(
+    def _schedule_seed(self, site_id, seed_url):
+        crawl_url = CrawlUrl(seed_url, site_id=site_id, hops_from_seed=0)
+        self._db.schedule_url(crawl_url, priority=1000)
+
+    def _feed_crawl_urls(self):
+        for site in self._db.sites():
+            q = self._conn.SimpleQueue("brozzler.sites.{}.crawl_urls".format(site["id"]))
+            if len(q) == 0:
+                url = self._db.pop_url(site["id"])
+                if url:
+                    q.put(url)
 
 logging.info("brozzler-hq starting")
 
 db = BrozzlerHQDb(db_file=args.db_file)
 hq = BrozzlerHQ(amqp_url=args.amqp_url, db=db)
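
Note: _feed_crawl_urls() is what the commit message refers to: each pass of the
run loop tops up a per-site kombu SimpleQueue named
"brozzler.sites.<id>.crawl_urls", moving at most one url from sqlite to amqp
per site per tick, and only when that queue is empty. A standalone sketch of
the queue pattern, assuming a local rabbitmq reachable at a made-up amqp url:

    import kombu

    with kombu.Connection("amqp://guest:guest@localhost:5672//") as conn:
        q = conn.SimpleQueue("brozzler.sites.1.crawl_urls")
        if len(q) == 0:  # only top up an empty queue, as _feed_crawl_urls() does
            q.put({"url": "http://example.com/", "hops_from_seed": 0})
        msg = q.get(block=True, timeout=1)
        print(msg.payload)
        msg.ack()
        q.close()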