Merge pull request #482 from GovDataOfficial/improve-test-coverage-and-fixes-timeout-calculation

Improve test coverage and fixes timeout calculation
Stefan Oderbolz 2021-11-29 19:25:07 +01:00 committed by GitHub
commit d84d847b09
3 changed files with 149 additions and 66 deletions

View File

@@ -614,7 +614,7 @@ def harvest_jobs_run(context, data_dict):
             job_obj = HarvestJob.get(job['id'])
             if timeout:
                 last_time = job_obj.get_last_action_time()
-                now = datetime.datetime.now()
+                now = datetime.datetime.utcnow()
                 if now - last_time > datetime.timedelta(minutes=int(timeout)):
                     msg = 'Job {} timeout ({} minutes)\n'.format(job_obj.id, timeout)
                     msg += '\tJob created: {}\n'.format(job_obj.created)
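This hunk is the timeout fix from the PR title: the job's last-action time comes from the harvest tables, which (as the switch to utcnow() suggests) hold naive UTC timestamps, so taking "now" from the server's local clock skews the elapsed-time comparison by the UTC offset. A minimal sketch of the corrected check, assuming last_time is such a naive UTC value; job_timed_out is an illustrative helper, not part of the extension:

import datetime

def job_timed_out(last_time, timeout_minutes):
    # Illustrative helper mirroring the check above. last_time is assumed to be
    # a naive UTC timestamp; taking "now" in UTC as well keeps the subtraction
    # free of the server's local offset.
    now = datetime.datetime.utcnow()
    return now - last_time > datetime.timedelta(minutes=int(timeout_minutes))

# On a UTC+1 server, datetime.now() would make a job that last acted 5 minutes
# ago look roughly 65 minutes old and trip a 60-minute timeout prematurely.
recent = datetime.datetime.utcnow() - datetime.timedelta(minutes=5)
print(job_timed_out(recent, 60))  # False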

View File

@@ -132,14 +132,16 @@ def resubmit_jobs():
     # fetch queue
     harvest_object_pending = redis.keys(get_fetch_routing_key() + ':*')
     for key in harvest_object_pending:
-        redis_key = redis.get(key)
-        if redis_key is None:
-            log.info('Fetch Queue: Redis cannot get key {}'.format(key))
+        redis_value = redis.get(key)
+        if redis_value is None:
+            log.info('Fetch Queue: Redis cannot get value for key {}'.format(key))
             continue
         date_of_key = datetime.datetime.strptime(
-            redis_key, "%Y-%m-%d %H:%M:%S.%f")
+            redis_value, "%Y-%m-%d %H:%M:%S.%f")
+        log.debug('[Fetch queue]: Check key {} with value {}'.format(key, date_of_key))
         # 3 minutes for fetch and import max
         if (datetime.datetime.now() - date_of_key).seconds > 180:
+            log.debug('[Fetch queue]: Re-new harvest object with KEY {} in redis'.format(key))
             redis.rpush(get_fetch_routing_key(),
                         json.dumps({'harvest_object_id': key.split(':')[-1]})
                         )
@@ -148,14 +150,16 @@ def resubmit_jobs():
     # gather queue
     harvest_jobs_pending = redis.keys(get_gather_routing_key() + ':*')
     for key in harvest_jobs_pending:
-        redis_key = redis.get(key)
-        if redis_key is None:
-            log.info('Gather Queue: Redis cannot get key {}'.format(key))
+        redis_value = redis.get(key)
+        if redis_value is None:
+            log.info('Gather Queue: Redis cannot get value for key {}'.format(key))
             continue
         date_of_key = datetime.datetime.strptime(
-            redis_key, "%Y-%m-%d %H:%M:%S.%f")
+            redis_value, "%Y-%m-%d %H:%M:%S.%f")
+        log.debug('[Gather queue]: Check key {} with value {}'.format(key, date_of_key))
         # 3 hours for a gather
         if (datetime.datetime.now() - date_of_key).seconds > 7200:
+            log.debug('[Gather queue]: Re-new harvest job with KEY {} in redis'.format(key))
             redis.rpush(get_gather_routing_key(),
                         json.dumps({'harvest_job_id': key.split(':')[-1]})
                         )
@@ -186,6 +190,8 @@ def resubmit_objects():
+            log.debug('Re-sent object {} to the fetch queue'.format(object_id))
             publisher.send({'harvest_object_id': object_id})
     publisher.close()
 class Publisher(object):
     def __init__(self, connection, channel, exchange, routing_key):
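The resubmit logic in this file rests on a small Redis convention: pending messages sit in a list under the routing key, and every item currently being worked on leaves a marker key '<routing key>:<id>' whose value is the timestamp at which it was taken off the list. A rough, self-contained sketch of that pattern, where FETCH_KEY, resubmit_stale and the decode_responses=True client are illustrative assumptions rather than the extension's real API:

import datetime
import json

FETCH_KEY = 'harvest_object_id'  # stands in for queue.get_fetch_routing_key()

def resubmit_stale(redis_client, max_age_seconds=180):
    # Walk the marker keys left by in-flight fetches. Assumes the client was
    # created with decode_responses=True so keys and values come back as str.
    for key in redis_client.keys(FETCH_KEY + ':*'):
        value = redis_client.get(key)
        if value is None:
            continue  # marker disappeared between keys() and get()
        taken_at = datetime.datetime.strptime(value, "%Y-%m-%d %H:%M:%S.%f")
        if (datetime.datetime.now() - taken_at).seconds > max_age_seconds:
            # Stale: push the id back onto the pending list so it gets retried.
            redis_client.rpush(FETCH_KEY,
                               json.dumps({'harvest_object_id': key.split(':')[-1]}))

The .seconds attribute mirrors the original code; for windows longer than a day, timedelta.total_seconds() would be the safer comparison.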

View File

@@ -6,7 +6,7 @@ from ckanext.harvest.interfaces import IHarvester
 import ckanext.harvest.queue as queue
 from ckan.plugins.core import SingletonPlugin, implements
 import json
-import ckan.logic as logic
+from ckan.plugins import toolkit
 from ckan import model
 from ckan.lib.base import config
 import uuid
@@ -44,7 +44,7 @@ class MockHarvester(SingletonPlugin):
         assert harvest_object.fetch_finished is not None
         assert harvest_object.import_started is not None
-        user = logic.get_action('get_site_user')(
+        user = toolkit.get_action('get_site_user')(
             {'model': model, 'ignore_auth': True}, {}
         )['name']
@@ -57,7 +57,7 @@ class MockHarvester(SingletonPlugin):
         else:
             logic_function = 'package_create'
-        package_dict = logic.get_action(logic_function)(
+        package_dict = toolkit.get_action(logic_function)(
             {'model': model, 'session': model.Session,
              'user': user, 'api_version': 3, 'ignore_auth': True},
             json.loads(harvest_object.content)
@@ -91,64 +91,25 @@ class TestHarvestQueue(object):
     def test_01_basic_harvester(self):
         if config.get('ckan.harvest.mq.type') == 'redis':
             # make sure that there are no old elements in the redis db
             redis = queue.get_connection()
             redis.flushdb()
         # make sure queues/exchanges are created first and are empty
         consumer = queue.get_gather_consumer()
         consumer_fetch = queue.get_fetch_consumer()
         consumer.queue_purge(queue=queue.get_gather_queue_name())
         consumer_fetch.queue_purge(queue=queue.get_fetch_queue_name())
-        user = logic.get_action('get_site_user')(
+        user = toolkit.get_action('get_site_user')(
             {'model': model, 'ignore_auth': True}, {}
         )['name']
         context = {'model': model, 'session': model.Session,
                    'user': user, 'api_version': 3, 'ignore_auth': True}
-        source_dict = {
-            'title': 'Test Source',
-            'name': 'test-source',
-            'url': 'basic_test',
-            'source_type': 'test',
-        }
-        harvest_source = logic.get_action('harvest_source_create')(
-            context,
-            source_dict
-        )
-        assert harvest_source['source_type'] == 'test', harvest_source
-        assert harvest_source['url'] == 'basic_test', harvest_source
-        harvest_job = logic.get_action('harvest_job_create')(
-            context,
-            {'source_id': harvest_source['id'], 'run': True}
-        )
-        job_id = harvest_job['id']
-        assert harvest_job['source_id'] == harvest_source['id'], harvest_job
-        assert harvest_job['status'] == u'Running'
-        assert logic.get_action('harvest_job_show')(
-            context,
-            {'id': job_id}
-        )['status'] == u'Running'
-        # pop on item off the queue and run the callback
-        reply = consumer.basic_get(queue='ckan.harvest.gather')
-        queue.gather_callback(consumer, *reply)
-        all_objects = model.Session.query(HarvestObject).all()
-        assert len(all_objects) == 3
-        assert all_objects[0].state == 'WAITING'
-        assert all_objects[1].state == 'WAITING'
-        assert all_objects[2].state == 'WAITING'
-        assert len(model.Session.query(HarvestObject).all()) == 3
-        assert len(model.Session.query(HarvestObjectExtra).all()) == 1
+        harvest_source, job_id = self._create_harvest_job_and_finish_gather_stage(consumer, context)
         # do three times as three harvest objects
         reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
@@ -173,12 +134,12 @@ class TestHarvestQueue(object):
         assert all_objects[2].report_status == 'added'
         # fire run again to check if job is set to Finished
-        logic.get_action('harvest_jobs_run')(
+        toolkit.get_action('harvest_jobs_run')(
             context,
             {'source_id': harvest_source['id']}
         )
-        harvest_job = logic.get_action('harvest_job_show')(
+        harvest_job = toolkit.get_action('harvest_job_show')(
             context,
             {'id': job_id}
         )
@@ -186,7 +147,7 @@ class TestHarvestQueue(object):
         assert harvest_job['status'] == u'Finished'
         assert harvest_job['stats'] == {'added': 3, 'updated': 0, 'not modified': 0, 'errored': 0, 'deleted': 0}
-        harvest_source_dict = logic.get_action('harvest_source_show')(
+        harvest_source_dict = toolkit.get_action('harvest_source_show')(
             context,
             {'id': harvest_source['id']}
         )
@@ -197,13 +158,13 @@ class TestHarvestQueue(object):
         assert harvest_source_dict['status']['job_count'] == 1
         # Second run
-        harvest_job = logic.get_action('harvest_job_create')(
+        harvest_job = toolkit.get_action('harvest_job_create')(
             context,
             {'source_id': harvest_source['id'], 'run': True}
         )
         job_id = harvest_job['id']
-        assert logic.get_action('harvest_job_show')(
+        assert toolkit.get_action('harvest_job_show')(
             context,
             {'id': job_id}
         )['status'] == u'Running'
@@ -238,18 +199,18 @@ class TestHarvestQueue(object):
         assert len(all_objects) == 1
         # run to make sure job is marked as finshed
-        logic.get_action('harvest_jobs_run')(
+        toolkit.get_action('harvest_jobs_run')(
             context,
             {'source_id': harvest_source['id']}
         )
-        harvest_job = logic.get_action('harvest_job_show')(
+        harvest_job = toolkit.get_action('harvest_job_show')(
             context,
             {'id': job_id}
         )
         assert harvest_job['stats'] == {'added': 0, 'updated': 2, 'not modified': 0, 'errored': 0, 'deleted': 1}
-        harvest_source_dict = logic.get_action('harvest_source_show')(
+        harvest_source_dict = toolkit.get_action('harvest_source_show')(
             context,
             {'id': harvest_source['id']}
         )
@@ -295,6 +256,122 @@ class TestHarvestQueue(object):
         finally:
             redis.delete('ckanext-harvest:some-random-key')
+    def test_resubmit_objects(self):
+        '''
+        Test that only those harvest objects are re-submitted which were not present in the redis fetch queue.
+        '''
+        if config.get('ckan.harvest.mq.type') != 'redis':
+            pytest.skip()
+        # make sure that there are no old elements in the redis db
+        redis = queue.get_connection()
+        fetch_routing_key = queue.get_fetch_routing_key()
+        redis.flushdb()
+        try:
+            # make sure queues/exchanges are created first and are empty
+            consumer = queue.get_gather_consumer()
+            consumer_fetch = queue.get_fetch_consumer()
+            consumer.queue_purge(queue=queue.get_gather_queue_name())
+            consumer_fetch.queue_purge(queue=queue.get_fetch_queue_name())
+            user = toolkit.get_action('get_site_user')(
+                {'model': model, 'ignore_auth': True}, {}
+            )['name']
+            context = {'model': model, 'session': model.Session,
+                       'user': user, 'api_version': 3, 'ignore_auth': True}
+            harvest_source, job_id = self._create_harvest_job_and_finish_gather_stage(consumer, context)
+            assert redis.llen(fetch_routing_key) == 3
+            # do only one time for the first harvest object
+            reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
+            queue.fetch_callback(consumer_fetch, *reply)
+            count = model.Session.query(model.Package) \
+                .filter(model.Package.type == 'dataset') \
+                .count()
+            assert count == 1
+            all_objects = model.Session.query(HarvestObject).order_by(HarvestObject.state.asc()).all()
+            assert len(all_objects) == 3
+            assert all_objects[0].state == 'COMPLETE'
+            assert all_objects[0].report_status == 'added'
+            assert all_objects[0].current is True
+            assert all_objects[1].state == 'WAITING'
+            assert all_objects[1].current is False
+            assert all_objects[2].state == 'WAITING'
+            assert all_objects[2].current is False
+            assert len(redis.keys(fetch_routing_key + ':*')) == 0
+            assert redis.llen(fetch_routing_key) == 2
+            # Remove one object from redis that should be re-sent to the fetch queue
+            reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
+            fetch_queue_items = redis.lrange(fetch_routing_key, 0, 10)
+            assert len(fetch_queue_items) == 1
+            harvest_object_id = reply[2]
+            assert fetch_queue_items[0] != harvest_object_id
+            queue.resubmit_objects()
+            assert redis.llen(fetch_routing_key) == 2
+            fetch_queue_items = redis.lrange(fetch_routing_key, 0, 10)
+            assert harvest_object_id in fetch_queue_items
+            assert redis.dbsize() == 1
+        finally:
+            redis.flushdb()
+    def _create_harvest_job_and_finish_gather_stage(self, consumer, context):
+        source_dict = {'title': 'Test Source',
+                       'name': 'test-source',
+                       'url': 'basic_test',
+                       'source_type': 'test'}
+        try:
+            harvest_source = toolkit.get_action('harvest_source_create')(
+                context,
+                source_dict)
+        except toolkit.ValidationError:
+            harvest_source = toolkit.get_action('harvest_source_show')(
+                context,
+                {'id': source_dict['name']}
+            )
+            pass
+        assert harvest_source['source_type'] == 'test', harvest_source
+        assert harvest_source['url'] == 'basic_test', harvest_source
+        harvest_job = toolkit.get_action('harvest_job_create')(
+            context,
+            {'source_id': harvest_source['id'], 'run': True})
+        job_id = harvest_job['id']
+        assert harvest_job['source_id'] == harvest_source['id'], harvest_job
+        assert harvest_job['status'] == u'Running'
+        assert toolkit.get_action('harvest_job_show')(
+            context,
+            {'id': job_id}
+        )['status'] == u'Running'
+        # pop on item off the queue and run the callback
+        reply = consumer.basic_get(queue='ckan.harvest.gather')
+        queue.gather_callback(consumer, *reply)
+        all_objects = model.Session.query(HarvestObject).all()
+        assert len(all_objects) == 3
+        assert all_objects[0].state == 'WAITING'
+        assert all_objects[1].state == 'WAITING'
+        assert all_objects[2].state == 'WAITING'
+        assert len(model.Session.query(HarvestObject).all()) == 3
+        assert len(model.Session.query(HarvestObjectExtra).all()) == 1
+        return harvest_source, job_id
 class TestHarvestCorruptRedis(object):
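A note on what the new test_resubmit_objects above is checking: the fetch queue keeps its pending messages in a Redis list and, roughly speaking, each message taken off the list leaves a timestamped marker key that is deleted again once the callback acknowledges it. A plain-Python toy of that bookkeeping (no Redis required; the names are illustrative, not the extension's API) shows why the test expects llen to drop to 2 while keys(':*') stays empty after one completed fetch:

import datetime

fetch_list = ['obj-1', 'obj-2', 'obj-3']  # pending items, akin to LLEN/LRANGE on the routing key
in_flight = {}                            # marker keys "<routing key>:<id>" -> timestamp

def basic_get():
    # Taking an item off the list leaves a timestamped marker, which is what
    # resubmit_jobs() later inspects to find fetches that died half-way.
    obj_id = fetch_list.pop(0)
    in_flight['fetch:' + obj_id] = str(datetime.datetime.now())
    return obj_id

def ack(obj_id):
    # A completed fetch_callback acknowledges the message and drops the marker.
    del in_flight['fetch:' + obj_id]

first = basic_get()
ack(first)
# Mirrors the test's assertions: two items still pending, nothing left in flight.
assert len(fetch_list) == 2 and not in_flight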