mirror of
https://github.com/internetarchive/brozzler.git
synced 2025-08-01 11:06:17 -04:00
commit
19b67196ab
2 changed files with 122 additions and 9 deletions
|
@ -249,6 +249,7 @@ class RethinkDbFrontier:
|
||||||
|
|
||||||
def resume_job(self, job):
|
def resume_job(self, job):
|
||||||
job.status = "ACTIVE"
|
job.status = "ACTIVE"
|
||||||
|
job.stop_requested = None
|
||||||
job.starts_and_stops.append(
|
job.starts_and_stops.append(
|
||||||
{"start":doublethink.utcnow(), "stop":None})
|
{"start":doublethink.utcnow(), "stop":None})
|
||||||
job.save()
|
job.save()
|
||||||
|
@ -263,6 +264,7 @@ class RethinkDbFrontier:
|
||||||
# can't call resume_job since that would resume jobs's other sites
|
# can't call resume_job since that would resume jobs's other sites
|
||||||
job = brozzler.Job.load(self.rr, site.job_id)
|
job = brozzler.Job.load(self.rr, site.job_id)
|
||||||
job.status = "ACTIVE"
|
job.status = "ACTIVE"
|
||||||
|
site.stop_requested = None
|
||||||
job.starts_and_stops.append(
|
job.starts_and_stops.append(
|
||||||
{"start":doublethink.utcnow(), "stop":None})
|
{"start":doublethink.utcnow(), "stop":None})
|
||||||
job.save()
|
job.save()
|
||||||
|
|
|
@ -18,15 +18,15 @@ See the License for the specific language governing permissions and
|
||||||
limitations under the License.
|
limitations under the License.
|
||||||
'''
|
'''
|
||||||
|
|
||||||
import brozzler
|
|
||||||
import logging
|
|
||||||
import argparse
|
import argparse
|
||||||
import doublethink
|
|
||||||
import time
|
|
||||||
import datetime
|
import datetime
|
||||||
import uuid
|
import logging
|
||||||
|
|
||||||
|
import doublethink
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
|
import brozzler
|
||||||
|
|
||||||
args = argparse.Namespace()
|
args = argparse.Namespace()
|
||||||
args.log_level = logging.INFO
|
args.log_level = logging.INFO
|
||||||
brozzler.cli.configure_logging(args)
|
brozzler.cli.configure_logging(args)
|
||||||
|
@ -203,12 +203,12 @@ def test_resume_job():
|
||||||
assert len(job.starts_and_stops) == 2
|
assert len(job.starts_and_stops) == 2
|
||||||
assert job.starts_and_stops[1]['start']
|
assert job.starts_and_stops[1]['start']
|
||||||
assert job.starts_and_stops[1]['stop']
|
assert job.starts_and_stops[1]['stop']
|
||||||
assert job.starts_and_stops[1]['stop'] > job.starts_and_stops[0]['start']
|
assert job.starts_and_stops[1]['stop'] > job.starts_and_stops[1]['start']
|
||||||
assert site.status == 'FINISHED'
|
assert site.status == 'FINISHED'
|
||||||
assert len(site.starts_and_stops) == 2
|
assert len(site.starts_and_stops) == 2
|
||||||
assert site.starts_and_stops[1]['start']
|
assert site.starts_and_stops[1]['start']
|
||||||
assert site.starts_and_stops[1]['stop']
|
assert site.starts_and_stops[1]['stop']
|
||||||
assert site.starts_and_stops[1]['stop'] > site.starts_and_stops[0]['start']
|
assert site.starts_and_stops[1]['stop'] > site.starts_and_stops[1]['start']
|
||||||
|
|
||||||
# resuming a job == resuming all of its sites
|
# resuming a job == resuming all of its sites
|
||||||
frontier.resume_job(job)
|
frontier.resume_job(job)
|
||||||
|
@ -230,12 +230,123 @@ def test_resume_job():
|
||||||
assert len(job.starts_and_stops) == 3
|
assert len(job.starts_and_stops) == 3
|
||||||
assert job.starts_and_stops[2]['start']
|
assert job.starts_and_stops[2]['start']
|
||||||
assert job.starts_and_stops[2]['stop']
|
assert job.starts_and_stops[2]['stop']
|
||||||
assert job.starts_and_stops[2]['stop'] > job.starts_and_stops[0]['start']
|
assert job.starts_and_stops[2]['stop'] > job.starts_and_stops[2]['start']
|
||||||
assert site.status == 'FINISHED'
|
assert site.status == 'FINISHED'
|
||||||
assert len(site.starts_and_stops) == 3
|
assert len(site.starts_and_stops) == 3
|
||||||
assert site.starts_and_stops[2]['start']
|
assert site.starts_and_stops[2]['start']
|
||||||
assert site.starts_and_stops[2]['stop']
|
assert site.starts_and_stops[2]['stop']
|
||||||
assert site.starts_and_stops[2]['stop'] > site.starts_and_stops[0]['start']
|
assert site.starts_and_stops[2]['stop'] > site.starts_and_stops[2]['start']
|
||||||
|
|
||||||
|
frontier.resume_job(job)
|
||||||
|
site = list(frontier.job_sites(job.id))[0]
|
||||||
|
|
||||||
|
assert job.status == 'ACTIVE'
|
||||||
|
assert len(job.starts_and_stops) == 4
|
||||||
|
assert job.starts_and_stops[3]['start']
|
||||||
|
assert job.starts_and_stops[3]['stop'] is None
|
||||||
|
assert site.status == 'ACTIVE'
|
||||||
|
assert len(site.starts_and_stops) == 4
|
||||||
|
assert site.starts_and_stops[3]['start']
|
||||||
|
assert site.starts_and_stops[3]['stop'] is None
|
||||||
|
|
||||||
|
# simulate a job stop request
|
||||||
|
job_conf = {'seeds': [{'url': 'http://example.com/'}, {'url': 'http://example_2.com/'}]}
|
||||||
|
job = brozzler.new_job(frontier, job_conf)
|
||||||
|
assert len(list(frontier.job_sites(job.id))) == 2
|
||||||
|
site1 = list(frontier.job_sites(job.id))[0]
|
||||||
|
site2 = list(frontier.job_sites(job.id))[1]
|
||||||
|
|
||||||
|
job.stop_requested = datetime.datetime.utcnow().replace(tzinfo=doublethink.UTC)
|
||||||
|
job.save()
|
||||||
|
|
||||||
|
# should raise a CrawlStopped
|
||||||
|
with pytest.raises(brozzler.CrawlStopped):
|
||||||
|
frontier.honor_stop_request(site1)
|
||||||
|
|
||||||
|
frontier.finished(site1, 'FINISHED_STOP_REQUESTED')
|
||||||
|
frontier.finished(site2, 'FINISHED_STOP_REQUESTED')
|
||||||
|
job.refresh()
|
||||||
|
|
||||||
|
assert job.status == 'FINISHED'
|
||||||
|
assert job.stop_requested
|
||||||
|
assert len(job.starts_and_stops) == 1
|
||||||
|
assert job.starts_and_stops[0]['start']
|
||||||
|
assert job.starts_and_stops[0]['stop']
|
||||||
|
assert job.starts_and_stops[0]['stop'] > job.starts_and_stops[0]['start']
|
||||||
|
assert site1.status == 'FINISHED_STOP_REQUESTED'
|
||||||
|
assert site2.status == 'FINISHED_STOP_REQUESTED'
|
||||||
|
assert len(site1.starts_and_stops) == 1
|
||||||
|
assert len(site2.starts_and_stops) == 1
|
||||||
|
assert site1.starts_and_stops[0]['start']
|
||||||
|
assert site1.starts_and_stops[0]['stop']
|
||||||
|
assert site1.starts_and_stops[0]['stop'] > site.starts_and_stops[0]['start']
|
||||||
|
assert site2.starts_and_stops[0]['start']
|
||||||
|
assert site2.starts_and_stops[0]['stop']
|
||||||
|
assert site2.starts_and_stops[0]['stop'] > site.starts_and_stops[0]['start']
|
||||||
|
|
||||||
|
# simulate job resume after a stop request
|
||||||
|
frontier.resume_job(job)
|
||||||
|
site1 = list(frontier.job_sites(job.id))[0]
|
||||||
|
site2 = list(frontier.job_sites(job.id))[1]
|
||||||
|
|
||||||
|
assert job.status == 'ACTIVE'
|
||||||
|
assert job.stop_requested is None
|
||||||
|
assert len(job.starts_and_stops) == 2
|
||||||
|
assert job.starts_and_stops[1]['start']
|
||||||
|
assert job.starts_and_stops[1]['stop'] is None
|
||||||
|
assert site1.status == 'ACTIVE'
|
||||||
|
assert len(site1.starts_and_stops) == 2
|
||||||
|
assert site1.starts_and_stops[1]['start']
|
||||||
|
assert site1.starts_and_stops[1]['stop'] is None
|
||||||
|
assert site2.status == 'ACTIVE'
|
||||||
|
assert len(site2.starts_and_stops) == 2
|
||||||
|
assert site2.starts_and_stops[1]['start']
|
||||||
|
assert site2.starts_and_stops[1]['stop'] is None
|
||||||
|
|
||||||
|
# simulate a site stop request
|
||||||
|
site1.stop_requested = datetime.datetime.utcnow().replace(tzinfo=doublethink.UTC)
|
||||||
|
site1.save()
|
||||||
|
|
||||||
|
# should not raise a CrawlStopped
|
||||||
|
frontier.honor_stop_request(site2)
|
||||||
|
|
||||||
|
frontier.finished(site1, 'FINISHED_STOP_REQUESTED')
|
||||||
|
job.refresh()
|
||||||
|
|
||||||
|
assert job.status == 'ACTIVE'
|
||||||
|
assert job.stop_requested is None
|
||||||
|
assert len(job.starts_and_stops) == 2
|
||||||
|
assert job.starts_and_stops[1]['start']
|
||||||
|
assert job.starts_and_stops[1]['stop'] is None
|
||||||
|
assert site1.status == 'FINISHED_STOP_REQUESTED'
|
||||||
|
assert len(site1.starts_and_stops) == 2
|
||||||
|
assert site1.starts_and_stops[1]['start']
|
||||||
|
assert site1.starts_and_stops[1]['stop']
|
||||||
|
assert site1.starts_and_stops[1]['stop'] > site.starts_and_stops[1]['start']
|
||||||
|
assert site2.status == 'ACTIVE'
|
||||||
|
assert len(site2.starts_and_stops) == 2
|
||||||
|
assert site2.starts_and_stops[1]['start']
|
||||||
|
assert site2.starts_and_stops[1]['stop'] is None
|
||||||
|
|
||||||
|
# simulate site resume after a stop request
|
||||||
|
frontier.resume_site(site1)
|
||||||
|
site1 = list(frontier.job_sites(job.id))[0]
|
||||||
|
site2 = list(frontier.job_sites(job.id))[1]
|
||||||
|
|
||||||
|
assert job.status == 'ACTIVE'
|
||||||
|
assert job.stop_requested is None
|
||||||
|
assert len(job.starts_and_stops) == 2
|
||||||
|
assert job.starts_and_stops[1]['start']
|
||||||
|
assert job.starts_and_stops[1]['stop'] is None
|
||||||
|
assert site1.status == 'ACTIVE'
|
||||||
|
assert site1.stop_requested is None
|
||||||
|
assert len(site1.starts_and_stops) == 3
|
||||||
|
assert site1.starts_and_stops[2]['start']
|
||||||
|
assert site1.starts_and_stops[2]['stop'] is None
|
||||||
|
assert site2.status == 'ACTIVE'
|
||||||
|
assert len(site2.starts_and_stops) == 2
|
||||||
|
assert site2.starts_and_stops[1]['start']
|
||||||
|
assert site2.starts_and_stops[1]['stop'] is None
|
||||||
|
|
||||||
def test_time_limit():
|
def test_time_limit():
|
||||||
# XXX test not thoroughly adapted to change in time accounting, since
|
# XXX test not thoroughly adapted to change in time accounting, since
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue