Fix last harvest time to avoid timeouts

This commit is contained in:
Andres Vazquez 2021-02-03 10:50:12 -03:00
parent d00553831e
commit eb246439c6
2 changed files with 37 additions and 4 deletions

View File

@ -176,12 +176,31 @@ class HarvestJob(HarvestDomainObject):
return query
def get_last_gathered_object(self):
''' Determine the last gathered object in this job
Helpful to know if a job is running or not and
to avoid timeouts when the source is running
'''
query = Session.query(HarvestObject)\
.filter(HarvestObject.harvest_job_id == self.id)\
.order_by(HarvestObject.gathered.desc())\
.first()
return query
def get_last_action_time(self):
last_object = self.get_last_finished_object()
if last_object is not None:
return last_object.import_finished
if self.gather_finished is not None:
return self.gather_finished
last_gathered_object = self.get_last_gathered_object()
if last_gathered_object is not None:
return last_gathered_object.gathered
return self.created
def get_gather_errors(self):

View File

@ -13,9 +13,10 @@ from ckanext.harvest.logic import HarvestJobExists
@pytest.mark.usefixtures('with_plugins', 'clean_db', 'harvest_setup', 'clean_queues')
@pytest.mark.ckan_config('ckan.plugins', 'harvest test_action_harvester')
class TestModelFunctions:
dataset_counter = 0
def test_timeout_jobs(self):
""" Create harvest spurce, job and objects
""" Create harvest source, job and objects
Validate we read the last object fished time
Validate we raise timeout in harvest_jobs_run_action
"""
@ -69,6 +70,17 @@ class TestModelFunctions:
assert_equal(job.get_last_finished_object(), None)
assert_equal(job.get_last_action_time(), job.created)
def test_gather_get_last_action_time(self):
""" Test get_last_action_time at gather stage """
source, job = self.get_source()
ob1 = self.add_object(job=job, source=source, state='WAITING')
ob2 = self.add_object(job=job, source=source, state='WAITING')
ob3 = self.add_object(job=job, source=source, state='WAITING')
assert_equal(job.get_last_gathered_object(), ob3)
assert_equal(job.get_last_action_time(), ob3.gathered)
def run(self, timeout, source, job):
""" Run the havester_job_run and return the errors """
@ -118,9 +130,10 @@ class TestModelFunctions:
return source, job
def add_object(self, job, source, state, minutes_ago):
def add_object(self, job, source, state, minutes_ago=0):
now = datetime.utcnow()
name = 'dataset-{}-{}'.format(state.lower(), minutes_ago)
self.dataset_counter += 1
name = 'dataset-{}-{}'.format(state.lower(), self.dataset_counter)
dataset = ckan_factories.Dataset(name=name)
obj = harvest_factories.HarvestObjectObj(
job=job,
@ -132,6 +145,7 @@ class TestModelFunctions:
)
obj.state = state
if minutes_ago > 0:
obj.import_finished = now - timedelta(minutes=minutes_ago)
obj.save()
return obj