Merge pull request #431 from avdata99/fix_timeout
Fix last harvest time to avoid timeouts
This commit is contained in:
commit
6222e38c85
|
@ -176,12 +176,31 @@ class HarvestJob(HarvestDomainObject):
|
||||||
|
|
||||||
return query
|
return query
|
||||||
|
|
||||||
|
def get_last_gathered_object(self):
|
||||||
|
''' Determine the last gathered object in this job
|
||||||
|
Helpful to know if a job is running or not and
|
||||||
|
to avoid timeouts when the source is running
|
||||||
|
'''
|
||||||
|
|
||||||
|
query = Session.query(HarvestObject)\
|
||||||
|
.filter(HarvestObject.harvest_job_id == self.id)\
|
||||||
|
.order_by(HarvestObject.gathered.desc())\
|
||||||
|
.first()
|
||||||
|
|
||||||
|
return query
|
||||||
|
|
||||||
def get_last_action_time(self):
|
def get_last_action_time(self):
|
||||||
last_object = self.get_last_finished_object()
|
last_object = self.get_last_finished_object()
|
||||||
if last_object is not None:
|
if last_object is not None:
|
||||||
return last_object.import_finished
|
return last_object.import_finished
|
||||||
|
|
||||||
if self.gather_finished is not None:
|
if self.gather_finished is not None:
|
||||||
return self.gather_finished
|
return self.gather_finished
|
||||||
|
|
||||||
|
last_gathered_object = self.get_last_gathered_object()
|
||||||
|
if last_gathered_object is not None:
|
||||||
|
return last_gathered_object.gathered
|
||||||
|
|
||||||
return self.created
|
return self.created
|
||||||
|
|
||||||
def get_gather_errors(self):
|
def get_gather_errors(self):
|
||||||
|
|
|
@ -13,9 +13,10 @@ from ckanext.harvest.logic import HarvestJobExists
|
||||||
@pytest.mark.usefixtures('with_plugins', 'clean_db', 'harvest_setup', 'clean_queues')
|
@pytest.mark.usefixtures('with_plugins', 'clean_db', 'harvest_setup', 'clean_queues')
|
||||||
@pytest.mark.ckan_config('ckan.plugins', 'harvest test_action_harvester')
|
@pytest.mark.ckan_config('ckan.plugins', 'harvest test_action_harvester')
|
||||||
class TestModelFunctions:
|
class TestModelFunctions:
|
||||||
|
dataset_counter = 0
|
||||||
|
|
||||||
def test_timeout_jobs(self):
|
def test_timeout_jobs(self):
|
||||||
""" Create harvest spurce, job and objects
|
""" Create harvest source, job and objects
|
||||||
Validate we read the last object fished time
|
Validate we read the last object fished time
|
||||||
Validate we raise timeout in harvest_jobs_run_action
|
Validate we raise timeout in harvest_jobs_run_action
|
||||||
"""
|
"""
|
||||||
|
@ -69,6 +70,17 @@ class TestModelFunctions:
|
||||||
assert_equal(job.get_last_finished_object(), None)
|
assert_equal(job.get_last_finished_object(), None)
|
||||||
assert_equal(job.get_last_action_time(), job.created)
|
assert_equal(job.get_last_action_time(), job.created)
|
||||||
|
|
||||||
|
def test_gather_get_last_action_time(self):
|
||||||
|
""" Test get_last_action_time at gather stage """
|
||||||
|
source, job = self.get_source()
|
||||||
|
|
||||||
|
ob1 = self.add_object(job=job, source=source, state='WAITING')
|
||||||
|
ob2 = self.add_object(job=job, source=source, state='WAITING')
|
||||||
|
ob3 = self.add_object(job=job, source=source, state='WAITING')
|
||||||
|
|
||||||
|
assert_equal(job.get_last_gathered_object(), ob3)
|
||||||
|
assert_equal(job.get_last_action_time(), ob3.gathered)
|
||||||
|
|
||||||
def run(self, timeout, source, job):
|
def run(self, timeout, source, job):
|
||||||
""" Run the havester_job_run and return the errors """
|
""" Run the havester_job_run and return the errors """
|
||||||
|
|
||||||
|
@ -118,9 +130,10 @@ class TestModelFunctions:
|
||||||
|
|
||||||
return source, job
|
return source, job
|
||||||
|
|
||||||
def add_object(self, job, source, state, minutes_ago):
|
def add_object(self, job, source, state, minutes_ago=0):
|
||||||
now = datetime.utcnow()
|
now = datetime.utcnow()
|
||||||
name = 'dataset-{}-{}'.format(state.lower(), minutes_ago)
|
self.dataset_counter += 1
|
||||||
|
name = 'dataset-{}-{}'.format(state.lower(), self.dataset_counter)
|
||||||
dataset = ckan_factories.Dataset(name=name)
|
dataset = ckan_factories.Dataset(name=name)
|
||||||
obj = harvest_factories.HarvestObjectObj(
|
obj = harvest_factories.HarvestObjectObj(
|
||||||
job=job,
|
job=job,
|
||||||
|
@ -132,6 +145,7 @@ class TestModelFunctions:
|
||||||
)
|
)
|
||||||
|
|
||||||
obj.state = state
|
obj.state = state
|
||||||
|
if minutes_ago > 0:
|
||||||
obj.import_finished = now - timedelta(minutes=minutes_ago)
|
obj.import_finished = now - timedelta(minutes=minutes_ago)
|
||||||
obj.save()
|
obj.save()
|
||||||
return obj
|
return obj
|
||||||
|
|
Loading…
Reference in New Issue