Merge branch 'release-v2.0' into 2.0-dataset-sources

Conflicts:
	ckanext/harvest/logic/action/get.py
	ckanext/harvest/tests/test_queue.py
This commit is contained in:
amercader 2012-12-20 16:16:30 +00:00
commit fdac761fba
5 changed files with 164 additions and 46 deletions

View File

@ -8,7 +8,6 @@ from ckan import logic
from ckan.plugins import PluginImplementations from ckan.plugins import PluginImplementations
from ckanext.harvest.interfaces import IHarvester from ckanext.harvest.interfaces import IHarvester
from ckan.logic import NotFound, check_access from ckan.logic import NotFound, check_access
from ckanext.harvest import model as harvest_model from ckanext.harvest import model as harvest_model
@ -283,13 +282,13 @@ def _get_sources_for_user(context,data_dict):
HarvestSource.next_run==None) HarvestSource.next_run==None)
) )
user_obj = User.get(user)
# Sysadmins will get all sources # Sysadmins will get all sources
if not Authorizer().is_sysadmin(user): if user_obj and user_obj.sysadmin:
# This only applies to a non sysadmin user when using the # This only applies to a non sysadmin user when using the
# publisher auth profile. When using the default profile, # publisher auth profile. When using the default profile,
# normal users will never arrive at this point, but even if they # normal users will never arrive at this point, but even if they
# do, they will get an empty list. # do, they will get an empty list.
user_obj = User.get(user)
publisher_filters = [] publisher_filters = []
publishers_for_the_user = user_obj.get_groups(u'publisher') publishers_for_the_user = user_obj.get_groups(u'publisher')

View File

@ -1,4 +1,5 @@
from sqlalchemy import distinct from sqlalchemy import distinct, func
import ckan.logic as logic
from ckan.model import Package,Group from ckan.model import Package,Group
from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject, \ from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject, \
@ -31,6 +32,16 @@ def harvest_job_dictize(job, context):
for obj in job.objects: for obj in job.objects:
out['objects'].append(obj.as_dict()) out['objects'].append(obj.as_dict())
if context.get('return_stats', True):
stats = context['model'].Session.query(
HarvestObject.report_status,
func.count(HarvestObject.id).label('total_objects'))\
.filter_by(harvest_job_id=job.id)\
.group_by(HarvestObject.report_status).all()
out['stats'] = {}
for status, count in stats:
out['stats'][status] = count
for error in job.gather_errors: for error in job.gather_errors:
out['gather_errors'].append(error.as_dict()) out['gather_errors'].append(error.as_dict())
@ -67,7 +78,7 @@ def _get_source_status(source, context):
'job_count': 0, 'job_count': 0,
'next_harvest':'', 'next_harvest':'',
'last_harvest_request':'', 'last_harvest_request':'',
'last_harvest_statistics':{'added':0,'updated':0,'errors':0}, 'last_harvest_statistics':{'added':0,'updated':0,'errors':0,'deleted':0},
'last_harvest_errors':{'gather':[],'object':[]}, 'last_harvest_errors':{'gather':[],'object':[]},
'overall_statistics':{'added':0, 'errors':0}, 'overall_statistics':{'added':0, 'errors':0},
'packages':[]} 'packages':[]}
@ -93,38 +104,21 @@ def _get_source_status(source, context):
#TODO: Should we encode the dates as strings? #TODO: Should we encode the dates as strings?
out['last_harvest_request'] = str(last_job.gather_finished) out['last_harvest_request'] = str(last_job.gather_finished)
#Get HarvestObjects from last job whit links to packages
if detailed: if detailed:
last_objects = [obj for obj in last_job.objects if obj.package is not None] harvest_job_dict = harvest_job_dictize(last_job, context)
if len(last_objects) == 0:
# No packages added or updated # No packages added or updated
out['last_harvest_statistics']['added'] = 0 statistics = out['last_harvest_statistics']
out['last_harvest_statistics']['updated'] = 0 statistics['added'] = harvest_job_dict['stats'].get('new',0)
else: statistics['updated'] = harvest_job_dict['stats'].get('updated',0)
# Check wether packages were added or updated statistics['deleted'] = harvest_job_dict['stats'].get('deleted',0)
for last_object in last_objects: statistics['errors'] = (harvest_job_dict['stats'].get('errored',0) +
# Check if the same package had been linked before len(last_job.gather_errors))
previous_objects = model.Session.query(HarvestObject) \
.filter(HarvestObject.package==last_object.package) \
.count()
if previous_objects == 1:
# It didn't previously exist, it has been added
out['last_harvest_statistics']['added'] += 1
else:
# Pacakge already existed, but it has been updated
out['last_harvest_statistics']['updated'] += 1
# Last harvest errors
# We have the gathering errors in last_job.gather_errors, so let's also
# get also the object errors.
object_errors = model.Session.query(HarvestObjectError).join(HarvestObject) \
.filter(HarvestObject.job==last_job)
out['last_harvest_statistics']['errors'] = len(last_job.gather_errors) \
+ object_errors.count()
if detailed: if detailed:
# We have the gathering errors in last_job.gather_errors, so let's also
# get also the object errors.
object_errors = model.Session.query(HarvestObjectError).join(HarvestObject) \
.filter(HarvestObject.job==last_job)
for gather_error in last_job.gather_errors: for gather_error in last_job.gather_errors:
out['last_harvest_errors']['gather'].append(gather_error.message) out['last_harvest_errors']['gather'].append(gather_error.message)

View File

@ -213,6 +213,7 @@ def define_harvester_tables():
Column('harvest_job_id', types.UnicodeText, ForeignKey('harvest_job.id')), Column('harvest_job_id', types.UnicodeText, ForeignKey('harvest_job.id')),
Column('harvest_source_id', types.UnicodeText, ForeignKey('harvest_source.id')), Column('harvest_source_id', types.UnicodeText, ForeignKey('harvest_source.id')),
Column('package_id', types.UnicodeText, ForeignKey('package.id', deferrable=True), nullable=True), Column('package_id', types.UnicodeText, ForeignKey('package.id', deferrable=True), nullable=True),
Column('report_status', types.UnicodeText, nullable=True),
) )
# New table # New table
@ -373,6 +374,7 @@ ALTER TABLE harvest_object
ADD COLUMN import_started timestamp without time zone, ADD COLUMN import_started timestamp without time zone,
ADD COLUMN import_finished timestamp without time zone, ADD COLUMN import_finished timestamp without time zone,
ADD COLUMN "state" text; ADD COLUMN "state" text;
ADD COLUMN "report_status" text;
ALTER TABLE harvest_source ALTER TABLE harvest_source
ADD COLUMN frequency text, ADD COLUMN frequency text,
@ -386,6 +388,7 @@ ALTER TABLE harvest_object_extra
UPDATE harvest_object set state = 'COMPLETE'; UPDATE harvest_object set state = 'COMPLETE';
UPDATE harvest_object set retry_times = 0; UPDATE harvest_object set retry_times = 0;
UPDATE harvest_object set report_status = 'new';
UPDATE harvest_source set frequency = 'MANUAL'; UPDATE harvest_source set frequency = 'MANUAL';
ALTER TABLE harvest_object DROP CONSTRAINT harvest_object_package_id_fkey; ALTER TABLE harvest_object DROP CONSTRAINT harvest_object_package_id_fkey;

View File

@ -6,6 +6,7 @@ import pika
from ckan.lib.base import config from ckan.lib.base import config
from ckan.plugins import PluginImplementations from ckan.plugins import PluginImplementations
from ckan import model
from ckanext.harvest.model import HarvestJob, HarvestObject,HarvestGatherError from ckanext.harvest.model import HarvestJob, HarvestObject,HarvestGatherError
from ckanext.harvest.interfaces import IHarvester from ckanext.harvest.interfaces import IHarvester
@ -191,6 +192,20 @@ def fetch_callback(channel, method, header, body):
else: else:
obj.state = "ERROR" obj.state = "ERROR"
obj.save() obj.save()
if obj.report_status:
continue
if obj.state == 'ERROR':
obj.report_status = 'errored'
elif obj.current == False:
obj.report_status = 'deleted'
elif len(model.Session.query(HarvestObject)
.filter_by(package_id = obj.package_id)
.limit(2)
.all()) == 2:
obj.report_status = 'updated'
else:
obj.report_status = 'new'
obj.save()
channel.basic_ack(method.delivery_tag) channel.basic_ack(method.delivery_tag)

View File

@ -19,9 +19,11 @@ class TestHarvester(SingletonPlugin):
obj = HarvestObject(guid = 'test1', job = harvest_job) obj = HarvestObject(guid = 'test1', job = harvest_job)
obj.extras.append(HarvestObjectExtra(key='key', value='value')) obj.extras.append(HarvestObjectExtra(key='key', value='value'))
obj2 = HarvestObject(guid = 'test2', job = harvest_job) obj2 = HarvestObject(guid = 'test2', job = harvest_job)
obj3 = HarvestObject(guid = 'test_to_delete', job = harvest_job)
obj.add() obj.add()
obj2.save() # this will commit both obj2.add()
return [obj.id, obj2.id] obj3.save() # this will commit both
return [obj.id, obj2.id, obj3.id]
return [] return []
@ -40,11 +42,31 @@ class TestHarvester(SingletonPlugin):
user = logic.get_action('get_site_user')( user = logic.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {} {'model': model, 'ignore_auth': True}, {}
)['name'] )['name']
logic.get_action('package_create')(
package = json.loads(harvest_object.content)
name = package['name']
package_object = model.Package.get(name)
if package_object:
logic_function = 'package_update'
else:
logic_function = 'package_create'
package_dict = logic.get_action(logic_function)(
{'model': model, 'session': model.Session, {'model': model, 'session': model.Session,
'user': user, 'api_version': 3}, 'user': user, 'api_version': 3},
json.loads(harvest_object.content) json.loads(harvest_object.content)
) )
# delete test_to_delete package on second run
harvest_object.package_id = package_dict['id']
harvest_object.current = True
if package_dict['name'] == 'test_to_delete' and package_object:
harvest_object.current = False
package_object.state = 'deleted'
package_object.save()
harvest_object.save()
return True return True
@ -58,7 +80,7 @@ class TestHarvestQueue(object):
model.repo.rebuild_db() model.repo.rebuild_db()
def test_01_basic_harvester(cls): def test_01_basic_harvester(self):
### make sure queues/exchanges are created first and are empty ### make sure queues/exchanges are created first and are empty
consumer = queue.get_consumer('ckan.harvest.gather','harvest_job_id') consumer = queue.get_consumer('ckan.harvest.gather','harvest_job_id')
@ -117,16 +139,19 @@ class TestHarvestQueue(object):
all_objects = model.Session.query(HarvestObject).all() all_objects = model.Session.query(HarvestObject).all()
assert len(all_objects) == 2 assert len(all_objects) == 3
assert all_objects[0].state == 'WAITING' assert all_objects[0].state == 'WAITING'
assert all_objects[1].state == 'WAITING' assert all_objects[1].state == 'WAITING'
assert all_objects[2].state == 'WAITING'
assert len(model.Session.query(HarvestObject).all()) == 2 assert len(model.Session.query(HarvestObject).all()) == 3
assert len(model.Session.query(HarvestObjectExtra).all()) == 1 assert len(model.Session.query(HarvestObjectExtra).all()) == 1
## do twice as two harvest objects ## do three times as three harvest objects
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch') reply = consumer.basic_get(queue='ckan.harvest.fetch')
queue.fetch_callback(consumer, *reply)
reply = consumer.basic_get(queue='ckan.harvest.fetch')
queue.fetch_callback(consumer, *reply) queue.fetch_callback(consumer, *reply)
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch') reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
queue.fetch_callback(consumer, *reply) queue.fetch_callback(consumer, *reply)
@ -134,12 +159,16 @@ class TestHarvestQueue(object):
count = model.Session.query(model.Package) \ count = model.Session.query(model.Package) \
.filter(model.Package.type==None) \ .filter(model.Package.type==None) \
.count() .count()
assert count == 2 assert count == 3
all_objects = model.Session.query(HarvestObject).filter_by(current=True).all()
all_objects = model.Session.query(HarvestObject).all() assert len(all_objects) == 3
assert len(all_objects) == 2
assert all_objects[0].state == 'COMPLETE' assert all_objects[0].state == 'COMPLETE'
assert all_objects[0].report_status == 'new'
assert all_objects[1].state == 'COMPLETE' assert all_objects[1].state == 'COMPLETE'
assert all_objects[1].report_status == 'new'
assert all_objects[2].state == 'COMPLETE'
assert all_objects[2].report_status == 'new'
## fire run again to check if job is set to Finished ## fire run again to check if job is set to Finished
try: try:
@ -150,7 +179,85 @@ class TestHarvestQueue(object):
except Exception, e: except Exception, e:
assert 'There are no new harvesting jobs' in str(e) assert 'There are no new harvesting jobs' in str(e)
assert logic.get_action('harvest_job_show')( harvest_job = logic.get_action('harvest_job_show')(
context, context,
{'id': job_id} {'id': job_id}
)['status'] == u'Finished' )
assert harvest_job['status'] == u'Finished'
assert harvest_job['stats'] == {'new': 3}
context['detailed'] = True
harvest_source_dict = logic.get_action('harvest_source_show')(
context,
{'id': harvest_source['id']}
)
assert harvest_source_dict['status']['last_harvest_statistics'] == {'updated': 0, 'added': 3, 'deleted': 0, 'errors': 0L}
assert harvest_source_dict['status']['overall_statistics'] == {'added': 3L, 'errors': 0L}
########### Second run ########################
harvest_job = logic.get_action('harvest_job_create')(
context,
{'source_id':harvest_source['id']}
)
logic.get_action('harvest_jobs_run')(
context,
{'source_id':harvest_source['id']}
)
job_id = harvest_job['id']
## pop on item off the queue and run the callback
reply = consumer.basic_get(queue='ckan.harvest.gather')
queue.gather_callback(consumer, *reply)
all_objects = model.Session.query(HarvestObject).all()
assert len(all_objects) == 6
reply = consumer.basic_get(queue='ckan.harvest.fetch')
queue.fetch_callback(consumer, *reply)
reply = consumer.basic_get(queue='ckan.harvest.fetch')
queue.fetch_callback(consumer, *reply)
reply = consumer.basic_get(queue='ckan.harvest.fetch')
queue.fetch_callback(consumer, *reply)
assert len(model.Session.query(model.Package).all()) == 3
all_objects = model.Session.query(HarvestObject).filter_by(report_status='new').all()
assert len(all_objects) == 3, len(all_objects)
all_objects = model.Session.query(HarvestObject).filter_by(report_status='updated').all()
assert len(all_objects) == 2, len(all_objects)
all_objects = model.Session.query(HarvestObject).filter_by(report_status='deleted').all()
assert len(all_objects) == 1, len(all_objects)
# run to make sure job is marked as finshed
try:
logic.get_action('harvest_jobs_run')(
context,
{'source_id':harvest_source['id']}
)
except Exception, e:
assert 'There are no new harvesting jobs' in str(e)
harvest_job = logic.get_action('harvest_job_show')(
context,
{'id': job_id}
)
assert harvest_job['stats'] == {'updated': 2, 'deleted': 1}
context['detailed'] = True
harvest_source_dict = logic.get_action('harvest_source_show')(
context,
{'id': harvest_source['id']}
)
assert harvest_source_dict['status']['last_harvest_statistics'] == {'updated': 2, 'added': 0, 'deleted': 1, 'errors': 0L}
assert harvest_source_dict['status']['overall_statistics'] == {'added': 2L, 'errors': 0L}