Merge pull request #122 from nlevitt/new-job-bulk-inserts

improve performance of brozzler-new-job
jkafader 2018-09-28 15:26:33 -07:00 committed by GitHub
commit d2b1843a6d

@@ -84,26 +84,30 @@ def new_job(frontier, job_conf):
     job.save()
 
     sites = []
+    pages = []
     for seed_conf in job_conf["seeds"]:
         merged_conf = merge(seed_conf, job_conf)
         merged_conf.pop("seeds")
         merged_conf["job_id"] = job.id
         merged_conf["seed"] = merged_conf.pop("url")
         site = brozzler.Site(frontier.rr, merged_conf)
+        site.id = str(uuid.uuid4())
         sites.append(site)
+        pages.append(new_seed_page(frontier, site))
 
-    for site in sites:
-        new_site(frontier, site)
+    # insert in batches to avoid this error
+    # rethinkdb.errors.ReqlDriverError: Query size (167883036) greater than maximum (134217727) in:
+    for batch in (pages[i:i+500] for i in range(0, len(pages), 500)):
+        logging.info('inserting batch of %s pages', len(batch))
+        result = frontier.rr.table('pages').insert(batch).run()
+    for batch in (sites[i:i+100] for i in range(0, len(sites), 100)):
+        logging.info('inserting batch of %s sites', len(batch))
+        result = frontier.rr.table('sites').insert(batch).run()
+    logging.info('job %s fully started', job.id)
 
     return job
 
-def new_site(frontier, site):
-    site.id = str(uuid.uuid4())
-    logging.info("new site %s", site)
-    # insert the Page into the database before the Site, to avoid situation
-    # where a brozzler worker immediately claims the site, finds no pages
-    # to crawl, and decides the site is finished
-    try:
-        url = urlcanon.parse_url(site.seed)
-        hashtag = (url.hash_sign + url.fragment).decode("utf-8")
-        urlcanon.canon.remove_fragment(url)
+def new_seed_page(frontier, site):
+    url = urlcanon.parse_url(site.seed)
+    hashtag = (url.hash_sign + url.fragment).decode("utf-8")
+    urlcanon.canon.remove_fragment(url)
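
The batch sizes (500 pages, 100 sites) keep each insert well under RethinkDB's maximum query size of 134217727 bytes quoted in the error above; site documents carry the full merged seed config, which presumably motivates the smaller batch. A minimal standalone sketch of the same slicing pattern, assuming a hypothetical insert_batch callable in place of frontier.rr.table(...).insert(batch).run():

    def batches(items, size):
        # yield successive slices of items, each at most size elements long
        for i in range(0, len(items), size):
            yield items[i:i + size]

    def bulk_insert(insert_batch, docs, batch_size=500):
        # one query per slice, so no single query exceeds the driver's cap
        for batch in batches(docs, batch_size):
            insert_batch(batch)

    # e.g.: bulk_insert(lambda b: frontier.rr.table('pages').insert(b).run(), pages)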
@@ -113,6 +117,16 @@ def new_site(frontier, site):
-            "priority": 1000, "needs_robots_check": True})
-        if hashtag:
-            page.hashtags = [hashtag,]
+        "priority": 1000, "needs_robots_check": True})
+    if hashtag:
+        page.hashtags = [hashtag,]
+    return page
+
+def new_site(frontier, site):
+    logging.info("new site %s", site)
+    site.id = site.id or str(uuid.uuid4())
+    # insert the Page into the database before the Site, to avoid situation
+    # where a brozzler worker immediately claims the site, finds no pages
+    # to crawl, and decides the site is finished
+    try:
+        page = new_seed_page(frontier, site)
         page.save()
         logging.info("queued page %s", page)
     finally:
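
For reference, the fragment handling that moved into new_seed_page() splits the seed URL's hashtag out before the page is queued, so the stored URL is fragment-free. A self-contained sketch using the same urlcanon calls as the diff (the helper name split_fragment is hypothetical):

    import urlcanon

    def split_fragment(seed):
        # parse the seed, capture '#fragment' ('' if none), then strip
        # the fragment from the parsed URL itself
        url = urlcanon.parse_url(seed)
        hashtag = (url.hash_sign + url.fragment).decode('utf-8')
        urlcanon.canon.remove_fragment(url)
        return str(url), hashtag

    # split_fragment('https://example.com/a#sec') -> ('https://example.com/a', '#sec')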