Update queue tests
This commit is contained in:
parent
dcfd201cdd
commit
70dfee1a36
|
@ -58,6 +58,15 @@ class TestHarvester(SingletonPlugin):
|
|||
json.loads(harvest_object.content)
|
||||
)
|
||||
|
||||
# set previous objects to not current
|
||||
previous_object = model.Session.query(HarvestObject) \
|
||||
.filter(HarvestObject.guid==harvest_object.guid) \
|
||||
.filter(HarvestObject.current==True) \
|
||||
.first()
|
||||
if previous_object:
|
||||
previous_object.current = False
|
||||
previous_object.save()
|
||||
|
||||
# delete test_to_delete package on second run
|
||||
harvest_object.package_id = package_dict['id']
|
||||
harvest_object.current = True
|
||||
|
@ -135,6 +144,7 @@ class TestHarvestQueue(object):
|
|||
|
||||
## pop on item off the queue and run the callback
|
||||
reply = consumer.basic_get(queue='ckan.harvest.gather')
|
||||
|
||||
queue.gather_callback(consumer, *reply)
|
||||
|
||||
all_objects = model.Session.query(HarvestObject).all()
|
||||
|
@ -149,26 +159,26 @@ class TestHarvestQueue(object):
|
|||
assert len(model.Session.query(HarvestObjectExtra).all()) == 1
|
||||
|
||||
## do three times as three harvest objects
|
||||
reply = consumer.basic_get(queue='ckan.harvest.fetch')
|
||||
queue.fetch_callback(consumer, *reply)
|
||||
reply = consumer.basic_get(queue='ckan.harvest.fetch')
|
||||
queue.fetch_callback(consumer, *reply)
|
||||
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
|
||||
queue.fetch_callback(consumer, *reply)
|
||||
queue.fetch_callback(consumer_fetch, *reply)
|
||||
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
|
||||
queue.fetch_callback(consumer_fetch, *reply)
|
||||
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
|
||||
queue.fetch_callback(consumer_fetch, *reply)
|
||||
|
||||
count = model.Session.query(model.Package) \
|
||||
.filter(model.Package.type==None) \
|
||||
.filter(model.Package.type=='dataset') \
|
||||
.count()
|
||||
assert count == 3
|
||||
all_objects = model.Session.query(HarvestObject).filter_by(current=True).all()
|
||||
|
||||
assert len(all_objects) == 3
|
||||
assert all_objects[0].state == 'COMPLETE'
|
||||
assert all_objects[0].report_status == 'new'
|
||||
assert all_objects[0].report_status == 'added'
|
||||
assert all_objects[1].state == 'COMPLETE'
|
||||
assert all_objects[1].report_status == 'new'
|
||||
assert all_objects[1].report_status == 'added'
|
||||
assert all_objects[2].state == 'COMPLETE'
|
||||
assert all_objects[2].report_status == 'new'
|
||||
assert all_objects[2].report_status == 'added'
|
||||
|
||||
## fire run again to check if job is set to Finished
|
||||
try:
|
||||
|
@ -185,21 +195,19 @@ class TestHarvestQueue(object):
|
|||
)
|
||||
|
||||
assert harvest_job['status'] == u'Finished'
|
||||
assert harvest_job['stats'] == {'new': 3}
|
||||
|
||||
context['detailed'] = True
|
||||
assert harvest_job['stats'] == {'added': 3}
|
||||
|
||||
harvest_source_dict = logic.get_action('harvest_source_show')(
|
||||
context,
|
||||
{'id': harvest_source['id']}
|
||||
)
|
||||
|
||||
assert harvest_source_dict['status']['last_harvest_statistics'] == {'updated': 0, 'added': 3, 'deleted': 0, 'errors': 0L}
|
||||
assert harvest_source_dict['status']['overall_statistics'] == {'added': 3L, 'errors': 0L}
|
||||
assert harvest_source_dict['status']['last_job']['stats'] == {'added': 3}
|
||||
assert harvest_source_dict['status']['total_datasets'] == 3
|
||||
assert harvest_source_dict['status']['job_count'] == 1
|
||||
|
||||
|
||||
########### Second run ########################
|
||||
|
||||
harvest_job = logic.get_action('harvest_job_create')(
|
||||
context,
|
||||
{'source_id':harvest_source['id']}
|
||||
|
@ -211,6 +219,10 @@ class TestHarvestQueue(object):
|
|||
)
|
||||
|
||||
job_id = harvest_job['id']
|
||||
assert logic.get_action('harvest_job_show')(
|
||||
context,
|
||||
{'id': job_id}
|
||||
)['status'] == u'Running'
|
||||
|
||||
## pop on item off the queue and run the callback
|
||||
reply = consumer.basic_get(queue='ckan.harvest.gather')
|
||||
|
@ -220,16 +232,19 @@ class TestHarvestQueue(object):
|
|||
|
||||
assert len(all_objects) == 6
|
||||
|
||||
reply = consumer.basic_get(queue='ckan.harvest.fetch')
|
||||
queue.fetch_callback(consumer, *reply)
|
||||
reply = consumer.basic_get(queue='ckan.harvest.fetch')
|
||||
queue.fetch_callback(consumer, *reply)
|
||||
reply = consumer.basic_get(queue='ckan.harvest.fetch')
|
||||
queue.fetch_callback(consumer, *reply)
|
||||
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
|
||||
queue.fetch_callback(consumer_fetch, *reply)
|
||||
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
|
||||
queue.fetch_callback(consumer_fetch, *reply)
|
||||
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
|
||||
queue.fetch_callback(consumer_fetch, *reply)
|
||||
|
||||
assert len(model.Session.query(model.Package).all()) == 3
|
||||
count = model.Session.query(model.Package) \
|
||||
.filter(model.Package.type=='dataset') \
|
||||
.count()
|
||||
assert count == 3
|
||||
|
||||
all_objects = model.Session.query(HarvestObject).filter_by(report_status='new').all()
|
||||
all_objects = model.Session.query(HarvestObject).filter_by(report_status='added').all()
|
||||
assert len(all_objects) == 3, len(all_objects)
|
||||
|
||||
all_objects = model.Session.query(HarvestObject).filter_by(report_status='updated').all()
|
||||
|
@ -259,5 +274,6 @@ class TestHarvestQueue(object):
|
|||
{'id': harvest_source['id']}
|
||||
)
|
||||
|
||||
assert harvest_source_dict['status']['last_harvest_statistics'] == {'updated': 2, 'added': 0, 'deleted': 1, 'errors': 0L}
|
||||
assert harvest_source_dict['status']['overall_statistics'] == {'added': 2L, 'errors': 0L}
|
||||
assert harvest_source_dict['status']['last_job']['stats'] == {'updated': 2, 'deleted': 1}
|
||||
assert harvest_source_dict['status']['total_datasets'] == 2
|
||||
assert harvest_source_dict['status']['job_count'] == 2
|
||||
|
|
Loading…
Reference in New Issue