Adds test for queue.resubmit_objects()

This commit is contained in:
seitenbau-govdata 2021-11-19 15:19:54 +01:00
parent 9d5679f046
commit 0a06cad506
1 changed files with 128 additions and 57 deletions

View File

@ -6,7 +6,7 @@ from ckanext.harvest.interfaces import IHarvester
import ckanext.harvest.queue as queue
from ckan.plugins.core import SingletonPlugin, implements
import json
import ckan.logic as logic
from ckan.plugins import toolkit
from ckan import model
from ckan.lib.base import config
import uuid
@ -44,7 +44,7 @@ class MockHarvester(SingletonPlugin):
assert harvest_object.fetch_finished is not None
assert harvest_object.import_started is not None
user = logic.get_action('get_site_user')(
user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {}
)['name']
@ -57,7 +57,7 @@ class MockHarvester(SingletonPlugin):
else:
logic_function = 'package_create'
package_dict = logic.get_action(logic_function)(
package_dict = toolkit.get_action(logic_function)(
{'model': model, 'session': model.Session,
'user': user, 'api_version': 3, 'ignore_auth': True},
json.loads(harvest_object.content)
@ -97,58 +97,14 @@ class TestHarvestQueue(object):
consumer.queue_purge(queue=queue.get_gather_queue_name())
consumer_fetch.queue_purge(queue=queue.get_fetch_queue_name())
user = logic.get_action('get_site_user')(
user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {}
)['name']
context = {'model': model, 'session': model.Session,
'user': user, 'api_version': 3, 'ignore_auth': True}
source_dict = {
'title': 'Test Source',
'name': 'test-source',
'url': 'basic_test',
'source_type': 'test',
}
harvest_source = logic.get_action('harvest_source_create')(
context,
source_dict
)
assert harvest_source['source_type'] == 'test', harvest_source
assert harvest_source['url'] == 'basic_test', harvest_source
harvest_job = logic.get_action('harvest_job_create')(
context,
{'source_id': harvest_source['id'], 'run': True}
)
job_id = harvest_job['id']
assert harvest_job['source_id'] == harvest_source['id'], harvest_job
assert harvest_job['status'] == u'Running'
assert logic.get_action('harvest_job_show')(
context,
{'id': job_id}
)['status'] == u'Running'
# pop on item off the queue and run the callback
reply = consumer.basic_get(queue='ckan.harvest.gather')
queue.gather_callback(consumer, *reply)
all_objects = model.Session.query(HarvestObject).all()
assert len(all_objects) == 3
assert all_objects[0].state == 'WAITING'
assert all_objects[1].state == 'WAITING'
assert all_objects[2].state == 'WAITING'
assert len(model.Session.query(HarvestObject).all()) == 3
assert len(model.Session.query(HarvestObjectExtra).all()) == 1
harvest_source, job_id = self._create_harvest_job_and_finish_gather_stage(consumer, context)
# do three times as three harvest objects
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
@ -173,12 +129,12 @@ class TestHarvestQueue(object):
assert all_objects[2].report_status == 'added'
# fire run again to check if job is set to Finished
logic.get_action('harvest_jobs_run')(
toolkit.get_action('harvest_jobs_run')(
context,
{'source_id': harvest_source['id']}
)
harvest_job = logic.get_action('harvest_job_show')(
harvest_job = toolkit.get_action('harvest_job_show')(
context,
{'id': job_id}
)
@ -186,7 +142,7 @@ class TestHarvestQueue(object):
assert harvest_job['status'] == u'Finished'
assert harvest_job['stats'] == {'added': 3, 'updated': 0, 'not modified': 0, 'errored': 0, 'deleted': 0}
harvest_source_dict = logic.get_action('harvest_source_show')(
harvest_source_dict = toolkit.get_action('harvest_source_show')(
context,
{'id': harvest_source['id']}
)
@ -197,13 +153,13 @@ class TestHarvestQueue(object):
assert harvest_source_dict['status']['job_count'] == 1
# Second run
harvest_job = logic.get_action('harvest_job_create')(
harvest_job = toolkit.get_action('harvest_job_create')(
context,
{'source_id': harvest_source['id'], 'run': True}
)
job_id = harvest_job['id']
assert logic.get_action('harvest_job_show')(
assert toolkit.get_action('harvest_job_show')(
context,
{'id': job_id}
)['status'] == u'Running'
@ -238,18 +194,18 @@ class TestHarvestQueue(object):
assert len(all_objects) == 1
# run to make sure job is marked as finshed
logic.get_action('harvest_jobs_run')(
toolkit.get_action('harvest_jobs_run')(
context,
{'source_id': harvest_source['id']}
)
harvest_job = logic.get_action('harvest_job_show')(
harvest_job = toolkit.get_action('harvest_job_show')(
context,
{'id': job_id}
)
assert harvest_job['stats'] == {'added': 0, 'updated': 2, 'not modified': 0, 'errored': 0, 'deleted': 1}
harvest_source_dict = logic.get_action('harvest_source_show')(
harvest_source_dict = toolkit.get_action('harvest_source_show')(
context,
{'id': harvest_source['id']}
)
@ -295,6 +251,121 @@ class TestHarvestQueue(object):
finally:
redis.delete('ckanext-harvest:some-random-key')
def test_resubmit_objects(self):
'''
Test that only harvest objects re-submitted which were not be present in the redis fetch queue.
'''
if config.get('ckan.harvest.mq.type') != 'redis':
pytest.skip()
redis = queue.get_connection()
fetch_routing_key = queue.get_fetch_routing_key()
redis.flushdb()
try:
# make sure queues/exchanges are created first and are empty
consumer = queue.get_gather_consumer()
consumer_fetch = queue.get_fetch_consumer()
consumer.queue_purge(queue=queue.get_gather_queue_name())
consumer_fetch.queue_purge(queue=queue.get_fetch_queue_name())
user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {}
)['name']
context = {'model': model, 'session': model.Session,
'user': user, 'api_version': 3, 'ignore_auth': True}
harvest_source, job_id = self._create_harvest_job_and_finish_gather_stage(consumer, context)
assert redis.llen(fetch_routing_key) == 3
# do only one time for the first harvest object
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
queue.fetch_callback(consumer_fetch, *reply)
count = model.Session.query(model.Package) \
.filter(model.Package.type == 'dataset') \
.count()
assert count == 1
all_objects = model.Session.query(HarvestObject).order_by(HarvestObject.state.asc()).all()
assert len(all_objects) == 3
assert all_objects[0].state == 'COMPLETE'
assert all_objects[0].report_status == 'added'
assert all_objects[0].current is True
assert all_objects[1].state == 'WAITING'
assert all_objects[1].current is False
assert all_objects[2].state == 'WAITING'
assert all_objects[2].current is False
assert len(redis.keys(fetch_routing_key + ':*')) == 0
assert redis.llen(fetch_routing_key) == 2
# Remove one object from redis that should be re-sent to the fetch queue
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
fetch_queue_items = redis.lrange(fetch_routing_key, 0, 10)
assert len(fetch_queue_items) == 1
harvest_object_id = reply[2]
assert fetch_queue_items[0] != harvest_object_id
queue.resubmit_objects()
assert redis.llen(fetch_routing_key) == 2
fetch_queue_items = redis.lrange(fetch_routing_key, 0, 10)
assert harvest_object_id in fetch_queue_items
assert redis.dbsize() == 1
finally:
redis.flushdb()
def _create_harvest_job_and_finish_gather_stage(self, consumer, context):
source_dict = {'title': 'Test Source',
'name': 'test-source',
'url': 'basic_test',
'source_type': 'test'}
try:
harvest_source = toolkit.get_action('harvest_source_create')(
context,
source_dict)
except toolkit.ValidationError:
harvest_source = toolkit.get_action('harvest_source_show')(
context,
{'id': source_dict['name']}
)
pass
assert harvest_source['source_type'] == 'test', harvest_source
assert harvest_source['url'] == 'basic_test', harvest_source
harvest_job = toolkit.get_action('harvest_job_create')(
context,
{'source_id': harvest_source['id'], 'run': True})
job_id = harvest_job['id']
assert harvest_job['source_id'] == harvest_source['id'], harvest_job
assert harvest_job['status'] == u'Running'
assert toolkit.get_action('harvest_job_show')(
context,
{'id': job_id}
)['status'] == u'Running'
# pop on item off the queue and run the callback
reply = consumer.basic_get(queue='ckan.harvest.gather')
queue.gather_callback(consumer, *reply)
all_objects = model.Session.query(HarvestObject).all()
assert len(all_objects) == 3
assert all_objects[0].state == 'WAITING'
assert all_objects[1].state == 'WAITING'
assert all_objects[2].state == 'WAITING'
assert len(model.Session.query(HarvestObject).all()) == 3
assert len(model.Session.query(HarvestObjectExtra).all()) == 1
return harvest_source, job_id
class TestHarvestCorruptRedis(object):