From 36e323c9426eccea5d39d7b4e6c01dfc75e2011f Mon Sep 17 00:00:00 2001 From: Daniel Bicho Date: Thu, 12 Oct 2017 19:21:13 +0100 Subject: [PATCH 1/5] fix resume_job function, the job was not able to resume because the job stop_requested value was not reset. --- brozzler/frontier.py | 1 + 1 file changed, 1 insertion(+) diff --git a/brozzler/frontier.py b/brozzler/frontier.py index 7fce61f..ac158e1 100644 --- a/brozzler/frontier.py +++ b/brozzler/frontier.py @@ -249,6 +249,7 @@ class RethinkDbFrontier: def resume_job(self, job): job.status = "ACTIVE" + job.stop_requested = None job.starts_and_stops.append( {"start":doublethink.utcnow(), "stop":None}) job.save() From 378c097c2907ccabe06fce021bc6bba2959a9378 Mon Sep 17 00:00:00 2001 From: Daniel Bicho Date: Fri, 13 Oct 2017 12:13:51 +0100 Subject: [PATCH 2/5] add verification change to test_resume_job --- tests/test_frontier.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/tests/test_frontier.py b/tests/test_frontier.py index 8b1b521..8f2e2d3 100644 --- a/tests/test_frontier.py +++ b/tests/test_frontier.py @@ -18,15 +18,15 @@ See the License for the specific language governing permissions and limitations under the License. ''' -import brozzler -import logging import argparse -import doublethink -import time import datetime -import uuid +import logging + +import doublethink import pytest +import brozzler + args = argparse.Namespace() args.log_level = logging.INFO brozzler.cli.configure_logging(args) @@ -215,6 +215,7 @@ def test_resume_job(): site = list(frontier.job_sites(job.id))[0] assert job.status == 'ACTIVE' + assert job.stop_requested is None assert len(job.starts_and_stops) == 3 assert job.starts_and_stops[2]['start'] assert job.starts_and_stops[2]['stop'] is None From 8aa10962bc5fac15f7e771f534b4d81ff9a3ca59 Mon Sep 17 00:00:00 2001 From: Daniel Bicho Date: Sun, 15 Oct 2017 19:11:46 +0100 Subject: [PATCH 3/5] test resume_job adding a simulation of a crawl job stopped and then resumed. --- tests/test_frontier.py | 58 ++++++++++++++++++++++++++++++++++++++---- 1 file changed, 53 insertions(+), 5 deletions(-) diff --git a/tests/test_frontier.py b/tests/test_frontier.py index 8f2e2d3..168dcf8 100644 --- a/tests/test_frontier.py +++ b/tests/test_frontier.py @@ -203,19 +203,18 @@ def test_resume_job(): assert len(job.starts_and_stops) == 2 assert job.starts_and_stops[1]['start'] assert job.starts_and_stops[1]['stop'] - assert job.starts_and_stops[1]['stop'] > job.starts_and_stops[0]['start'] + assert job.starts_and_stops[1]['stop'] > job.starts_and_stops[1]['start'] assert site.status == 'FINISHED' assert len(site.starts_and_stops) == 2 assert site.starts_and_stops[1]['start'] assert site.starts_and_stops[1]['stop'] - assert site.starts_and_stops[1]['stop'] > site.starts_and_stops[0]['start'] + assert site.starts_and_stops[1]['stop'] > site.starts_and_stops[1]['start'] # resuming a job == resuming all of its sites frontier.resume_job(job) site = list(frontier.job_sites(job.id))[0] assert job.status == 'ACTIVE' - assert job.stop_requested is None assert len(job.starts_and_stops) == 3 assert job.starts_and_stops[2]['start'] assert job.starts_and_stops[2]['stop'] is None @@ -231,12 +230,61 @@ def test_resume_job(): assert len(job.starts_and_stops) == 3 assert job.starts_and_stops[2]['start'] assert job.starts_and_stops[2]['stop'] - assert job.starts_and_stops[2]['stop'] > job.starts_and_stops[0]['start'] + assert job.starts_and_stops[2]['stop'] > job.starts_and_stops[2]['start'] assert site.status == 'FINISHED' assert len(site.starts_and_stops) == 3 assert site.starts_and_stops[2]['start'] assert site.starts_and_stops[2]['stop'] - assert site.starts_and_stops[2]['stop'] > site.starts_and_stops[0]['start'] + assert site.starts_and_stops[2]['stop'] > site.starts_and_stops[2]['start'] + + frontier.resume_job(job) + site = list(frontier.job_sites(job.id))[0] + + assert job.status == 'ACTIVE' + assert len(job.starts_and_stops) == 4 + assert job.starts_and_stops[3]['start'] + assert job.starts_and_stops[3]['stop'] is None + assert site.status == 'ACTIVE' + assert len(site.starts_and_stops) == 4 + assert site.starts_and_stops[3]['start'] + assert site.starts_and_stops[3]['stop'] is None + + # simulate a crawl stopped by a stop request + job.stop_requested = datetime.datetime.utcnow().replace(tzinfo=doublethink.UTC) + job.save() + + with pytest.raises(brozzler.CrawlStopped): + frontier.honor_stop_request(site) + + frontier.finished(site, 'FINISHED_STOP_REQUESTED') + job.refresh() + + assert job.status == 'FINISHED' + assert job.stop_requested + assert len(job.starts_and_stops) == 4 + assert job.starts_and_stops[3]['start'] + assert job.starts_and_stops[3]['stop'] + assert job.starts_and_stops[3]['stop'] > job.starts_and_stops[3]['start'] + assert site.status == 'FINISHED_STOP_REQUESTED' + assert len(site.starts_and_stops) == 4 + assert site.starts_and_stops[3]['start'] + assert site.starts_and_stops[3]['stop'] + assert site.starts_and_stops[3]['stop'] > site.starts_and_stops[3]['start'] + + # test resume job after a stop request + frontier.resume_job(job) + site = list(frontier.job_sites(job.id))[0] + + assert job.status == 'ACTIVE' + assert job.stop_requested is None + assert len(job.starts_and_stops) == 5 + assert job.starts_and_stops[4]['start'] + assert job.starts_and_stops[4]['stop'] is None + assert site.status == 'ACTIVE' + assert len(site.starts_and_stops) == 5 + assert site.starts_and_stops[4]['start'] + assert site.starts_and_stops[4]['stop'] is None + def test_time_limit(): # XXX test not thoroughly adapted to change in time accounting, since From bb98a43c8cb49f8fa6b11b345f424b06b301ea4c Mon Sep 17 00:00:00 2001 From: Daniel Bicho Date: Mon, 16 Oct 2017 11:46:35 +0100 Subject: [PATCH 4/5] fix and test both job stop request and site stop requests --- brozzler/frontier.py | 1 + tests/test_frontier.py | 104 ++++++++++++++++++++++++++++++++--------- 2 files changed, 84 insertions(+), 21 deletions(-) diff --git a/brozzler/frontier.py b/brozzler/frontier.py index ac158e1..4e42bd7 100644 --- a/brozzler/frontier.py +++ b/brozzler/frontier.py @@ -264,6 +264,7 @@ class RethinkDbFrontier: # can't call resume_job since that would resume jobs's other sites job = brozzler.Job.load(self.rr, site.job_id) job.status = "ACTIVE" + site.stop_requested = None job.starts_and_stops.append( {"start":doublethink.utcnow(), "stop":None}) job.save() diff --git a/tests/test_frontier.py b/tests/test_frontier.py index 168dcf8..0806cf5 100644 --- a/tests/test_frontier.py +++ b/tests/test_frontier.py @@ -249,42 +249,104 @@ def test_resume_job(): assert site.starts_and_stops[3]['start'] assert site.starts_and_stops[3]['stop'] is None - # simulate a crawl stopped by a stop request + # simulate a job stop request + job_conf = {'seeds': [{'url': 'http://example.com/'}, {'url': 'http://example_2.com/'}]} + job = brozzler.new_job(frontier, job_conf) + assert len(list(frontier.job_sites(job.id))) == 2 + site1 = list(frontier.job_sites(job.id))[0] + site2 = list(frontier.job_sites(job.id))[1] + job.stop_requested = datetime.datetime.utcnow().replace(tzinfo=doublethink.UTC) job.save() + # should raise a CrawlStopped with pytest.raises(brozzler.CrawlStopped): - frontier.honor_stop_request(site) + frontier.honor_stop_request(site1) - frontier.finished(site, 'FINISHED_STOP_REQUESTED') + frontier.finished(site1, 'FINISHED_STOP_REQUESTED') + frontier.finished(site2, 'FINISHED_STOP_REQUESTED') job.refresh() assert job.status == 'FINISHED' assert job.stop_requested - assert len(job.starts_and_stops) == 4 - assert job.starts_and_stops[3]['start'] - assert job.starts_and_stops[3]['stop'] - assert job.starts_and_stops[3]['stop'] > job.starts_and_stops[3]['start'] - assert site.status == 'FINISHED_STOP_REQUESTED' - assert len(site.starts_and_stops) == 4 - assert site.starts_and_stops[3]['start'] - assert site.starts_and_stops[3]['stop'] - assert site.starts_and_stops[3]['stop'] > site.starts_and_stops[3]['start'] + assert len(job.starts_and_stops) == 1 + assert job.starts_and_stops[0]['start'] + assert job.starts_and_stops[0]['stop'] + assert job.starts_and_stops[0]['stop'] > job.starts_and_stops[0]['start'] + assert site1.status == 'FINISHED_STOP_REQUESTED' + assert site2.status == 'FINISHED_STOP_REQUESTED' + assert len(site1.starts_and_stops) == 1 + assert len(site2.starts_and_stops) == 1 + assert site1.starts_and_stops[0]['start'] + assert site1.starts_and_stops[0]['stop'] + assert site1.starts_and_stops[0]['stop'] > site.starts_and_stops[0]['start'] + assert site2.starts_and_stops[0]['start'] + assert site2.starts_and_stops[0]['stop'] + assert site2.starts_and_stops[0]['stop'] > site.starts_and_stops[0]['start'] - # test resume job after a stop request + # simulate job resume after a stop request frontier.resume_job(job) - site = list(frontier.job_sites(job.id))[0] + site1 = list(frontier.job_sites(job.id))[0] + site2 = list(frontier.job_sites(job.id))[0] assert job.status == 'ACTIVE' assert job.stop_requested is None - assert len(job.starts_and_stops) == 5 - assert job.starts_and_stops[4]['start'] - assert job.starts_and_stops[4]['stop'] is None - assert site.status == 'ACTIVE' - assert len(site.starts_and_stops) == 5 - assert site.starts_and_stops[4]['start'] - assert site.starts_and_stops[4]['stop'] is None + assert len(job.starts_and_stops) == 2 + assert job.starts_and_stops[1]['start'] + assert job.starts_and_stops[1]['stop'] is None + assert site1.status == 'ACTIVE' + assert len(site1.starts_and_stops) == 2 + assert site1.starts_and_stops[1]['start'] + assert site1.starts_and_stops[1]['stop'] is None + assert site2.status == 'ACTIVE' + assert len(site2.starts_and_stops) == 2 + assert site2.starts_and_stops[1]['start'] + assert site2.starts_and_stops[1]['stop'] is None + # simulate a site stop request + site1.stop_requested = datetime.datetime.utcnow().replace(tzinfo=doublethink.UTC) + site1.save() + + # should not raise a CrawlStopped + frontier.honor_stop_request(site) + + frontier.finished(site1, 'FINISHED_STOP_REQUESTED') + job.refresh() + + assert job.status == 'ACTIVE' + assert job.stop_requested is None + assert len(job.starts_and_stops) == 2 + assert job.starts_and_stops[1]['start'] + assert job.starts_and_stops[1]['stop'] is None + assert site1.status == 'FINISHED_STOP_REQUESTED' + assert len(site1.starts_and_stops) == 2 + assert site1.starts_and_stops[1]['start'] + assert site1.starts_and_stops[1]['stop'] + assert site1.starts_and_stops[1]['stop'] > site.starts_and_stops[1]['start'] + assert site2.status == 'ACTIVE' + assert len(site2.starts_and_stops) == 2 + assert site2.starts_and_stops[1]['start'] + assert site2.starts_and_stops[1]['stop'] is None + + # simulate site resume after a stop request + frontier.resume_site(site1) + site1 = list(frontier.job_sites(job.id))[0] + site2 = list(frontier.job_sites(job.id))[1] + + assert job.status == 'ACTIVE' + assert job.stop_requested is None + assert len(job.starts_and_stops) == 2 + assert job.starts_and_stops[1]['start'] + assert job.starts_and_stops[1]['stop'] is None + assert site1.status == 'ACTIVE' + assert site1.stop_requested is None + assert len(site1.starts_and_stops) == 3 + assert site1.starts_and_stops[2]['start'] + assert site1.starts_and_stops[2]['stop'] is None + assert site2.status == 'ACTIVE' + assert len(site2.starts_and_stops) == 2 + assert site2.starts_and_stops[1]['start'] + assert site2.starts_and_stops[1]['stop'] is None def test_time_limit(): # XXX test not thoroughly adapted to change in time accounting, since From c4fa612547af69bc4adaa48c6c6a7594c69b70e6 Mon Sep 17 00:00:00 2001 From: Daniel Bicho Date: Tue, 17 Oct 2017 10:33:26 +0100 Subject: [PATCH 5/5] fix some errors in test_resume_job --- tests/test_frontier.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_frontier.py b/tests/test_frontier.py index 0806cf5..b77c1e6 100644 --- a/tests/test_frontier.py +++ b/tests/test_frontier.py @@ -287,7 +287,7 @@ def test_resume_job(): # simulate job resume after a stop request frontier.resume_job(job) site1 = list(frontier.job_sites(job.id))[0] - site2 = list(frontier.job_sites(job.id))[0] + site2 = list(frontier.job_sites(job.id))[1] assert job.status == 'ACTIVE' assert job.stop_requested is None @@ -308,7 +308,7 @@ def test_resume_job(): site1.save() # should not raise a CrawlStopped - frontier.honor_stop_request(site) + frontier.honor_stop_request(site2) frontier.finished(site1, 'FINISHED_STOP_REQUESTED') job.refresh()