harvester-d4science/ckanext/harvest/tests/test_queue.py

124 lines
4.1 KiB
Python
Raw Normal View History

import ckanext.harvest.model as harvest_model
2012-10-25 20:01:54 +02:00
from ckanext.harvest.model import HarvestObject, HarvestObjectExtra
from ckanext.harvest.interfaces import IHarvester
import ckanext.harvest.queue as queue
from ckan.plugins.core import SingletonPlugin, implements
import json
import ckan.logic as logic
from ckan import model
class TestHarvester(SingletonPlugin):
    """Stub ``IHarvester`` plugin, registered under the name ``test``.

    Each stage does the minimum needed by the queue test: gather creates two
    harvest objects, fetch stores a small JSON document on the object, and
    import creates a package from that document.
    """
    implements(IHarvester)

    def info(self):
        """Return the descriptor dict (name, title, description) of the plugin."""
        return {'name': 'test', 'title': 'test', 'description': 'test'}

    def gather_stage(self, harvest_job):
        """Create the harvest objects for ``harvest_job``.

        Returns the ids of two new harvest objects when the source URL starts
        with ``basic_test``; returns an empty list for any other source.
        """
        if not harvest_job.source.url.startswith('basic_test'):
            return []

        obj = HarvestObject(guid='test1', job=harvest_job)
        obj.extras.append(HarvestObjectExtra(key='key', value='value'))
        obj2 = HarvestObject(guid='test2', job=harvest_job)
        obj.add()
        obj2.save()  # this will commit both
        return [obj.id, obj2.id]

    def fetch_stage(self, harvest_object):
        """Store a JSON document ``{"name": <guid>}`` as the object's content.

        Asserts that the object is in the ``FETCH`` state with a fetch start
        time set. Returns ``True`` after saving the object.
        """
        assert harvest_object.state == "FETCH"
        assert harvest_object.fetch_started is not None
        harvest_object.content = json.dumps({'name': harvest_object.guid})
        harvest_object.save()
        return True

    def import_stage(self, harvest_object):
        """Create a package from the JSON content stored by ``fetch_stage``.

        Asserts that the object is in the ``IMPORT`` state and that the fetch
        finish and import start times are set. Returns ``True`` once the
        ``package_create`` action has been called.
        """
        assert harvest_object.state == "IMPORT"
        assert harvest_object.fetch_finished is not None
        assert harvest_object.import_started is not None

        # The package is created on behalf of the site user.
        user = logic.get_action('get_site_user')(
            {'model': model, 'ignore_auth': True}, {}
        )['name']

        logic.get_action('package_create')(
            {'model': model, 'session': model.Session,
             'user': user, 'api_version': 3},
            json.loads(harvest_object.content)
        )
        return True
class TestHarvestQueue(object):
    """Drives one harvest job through the gather and fetch queue callbacks."""

    @classmethod
    def setup_class(cls):
        """Run the harvest model setup once before the tests in this class."""
        harvest_model.setup()

    @classmethod
    def teardown_class(cls):
        """Rebuild the database once the tests in this class have run."""
        model.repo.rebuild_db()

    def test_01_basic_harvester(self):
        """Run a ``basic_test`` source through the gather and fetch callbacks.

        Checks that gathering leaves two ``WAITING`` harvest objects and one
        extra, and that after both fetch callbacks there are two packages and
        both harvest objects are ``COMPLETE``.
        """
        # Make sure queues/exchanges are created first and are empty. Each
        # queue is purged and read through the consumer obtained for it.
        consumer = queue.get_consumer('ckan.harvest.gather', 'harvest_job_id')
        consumer_fetch = queue.get_consumer('ckan.harvest.fetch',
                                            'harvest_object_id')
        consumer.queue_purge(queue='ckan.harvest.gather')
        consumer_fetch.queue_purge(queue='ckan.harvest.fetch')

        user = logic.get_action('get_site_user')(
            {'model': model, 'ignore_auth': True}, {}
        )['name']

        context = {'model': model, 'session': model.Session,
                   'user': user, 'api_version': 3}

        harvest_source = logic.get_action('harvest_source_create')(
            context,
            {'type': 'test', 'url': 'basic_test'}
        )
        assert harvest_source['type'] == 'test', harvest_source
        assert harvest_source['url'] == 'basic_test', harvest_source

        harvest_job = logic.get_action('harvest_job_create')(
            context,
            {'source_id': harvest_source['id']}
        )
        assert harvest_job['source_id'] == harvest_source['id'], harvest_job

        # Called for its effect only; the return value is not inspected.
        logic.get_action('harvest_jobs_run')(
            context,
            {'source_id': harvest_source['id']}
        )

        # Pop one item off the gather queue and run the callback.
        reply = consumer.basic_get(queue='ckan.harvest.gather')
        queue.gather_callback(consumer, *reply)

        all_objects = model.Session.query(HarvestObject).all()
        assert len(all_objects) == 2
        for harvest_object in all_objects:
            assert harvest_object.state == 'WAITING', harvest_object.state

        assert len(model.Session.query(HarvestObjectExtra).all()) == 1

        # Do it twice, as there are two harvest objects on the fetch queue.
        for _ in range(2):
            reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
            queue.fetch_callback(consumer_fetch, *reply)

        assert len(model.Session.query(model.Package).all()) == 2

        all_objects = model.Session.query(HarvestObject).all()
        assert len(all_objects) == 2
        for harvest_object in all_objects:
            assert harvest_object.state == 'COMPLETE', harvest_object.state