diff --git a/ckanext/harvest/logic/action/get.py b/ckanext/harvest/logic/action/get.py
index d26d436..72791eb 100644
--- a/ckanext/harvest/logic/action/get.py
+++ b/ckanext/harvest/logic/action/get.py
@@ -7,7 +7,6 @@ from ckan import logic
 from ckan.plugins import PluginImplementations
 from ckanext.harvest.interfaces import IHarvester
 
-
 from ckan.logic import NotFound, check_access
 
 from ckanext.harvest import model as harvest_model
@@ -282,28 +281,27 @@ def _get_sources_for_user(context,data_dict):
                                  HarvestSource.next_run==None)
                             )
 
+    user_obj = User.get(user)
     # Sysadmins will get all sources
-    if user:
-        user_obj = User.get(user)
-        if not user_obj.sysadmin:
-            # This only applies to a non sysadmin user when using the
-            # publisher auth profile. When using the default profile,
-            # normal users will never arrive at this point, but even if they
-            # do, they will get an empty list.
+    if user_obj and not user_obj.sysadmin:
+        # This only applies to a non sysadmin user when using the
+        # publisher auth profile. When using the default profile,
+        # normal users will never arrive at this point, but even if they
+        # do, they will get an empty list.
 
-            publisher_filters = []
-            publishers_for_the_user = user_obj.get_groups(u'publisher')
-            for publisher_id in [g.id for g in publishers_for_the_user]:
-                publisher_filters.append(HarvestSource.publisher_id==publisher_id)
+        publisher_filters = []
+        publishers_for_the_user = user_obj.get_groups(u'publisher')
+        for publisher_id in [g.id for g in publishers_for_the_user]:
+            publisher_filters.append(HarvestSource.publisher_id==publisher_id)
 
-            if len(publisher_filters):
-                query = query.filter(or_(*publisher_filters))
-            else:
-                # This user does not belong to a publisher yet, no sources for him/her
-                return []
+        if len(publisher_filters):
+            query = query.filter(or_(*publisher_filters))
+        else:
+            # This user does not belong to a publisher yet, no sources for him/her
+            return []
 
-            log.debug('User %s with publishers %r has Harvest Sources: %r',
-                      user, publishers_for_the_user, [(hs.id, hs.url) for hs in query])
+        log.debug('User %s with publishers %r has Harvest Sources: %r',
+                  user, publishers_for_the_user, [(hs.id, hs.url) for hs in query])
 
     sources = query.all()
 
diff --git a/ckanext/harvest/logic/dictization.py b/ckanext/harvest/logic/dictization.py
index ae5bb39..c38559b 100644
--- a/ckanext/harvest/logic/dictization.py
+++ b/ckanext/harvest/logic/dictization.py
@@ -1,4 +1,5 @@
-from sqlalchemy import distinct
+from sqlalchemy import distinct, func
+import ckan.logic as logic
 
 from ckan.model import Package,Group
 from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject, \
@@ -31,6 +32,17 @@ def harvest_job_dictize(job, context):
     for obj in job.objects:
         out['objects'].append(obj.as_dict())
 
+    if context.get('return_stats', True):
+        # Count this job's harvest objects, grouped by report_status
+        stats = context['model'].Session.query(
+            HarvestObject.report_status,
+            func.count(HarvestObject.id).label('total_objects'))\
+                .filter_by(harvest_job_id=job.id)\
+                .group_by(HarvestObject.report_status).all()
+        out['stats'] = {}
+        for status, count in stats:
+            out['stats'][status] = count
+
     for error in job.gather_errors:
         out['gather_errors'].append(error.as_dict())
 
@@ -67,7 +79,7 @@ def _get_source_status(source, context):
             'job_count': 0,
             'next_harvest':'',
             'last_harvest_request':'',
-            'last_harvest_statistics':{'added':0,'updated':0,'errors':0},
+            'last_harvest_statistics':{'added':0,'updated':0,'errors':0,'deleted':0},
             'last_harvest_errors':{'gather':[],'object':[]},
             'overall_statistics':{'added':0, 'errors':0},
             'packages':[]}
@@ -93,38 +105,21 @@ def _get_source_status(source, context):
         #TODO: Should we encode the dates as strings?
         out['last_harvest_request'] = str(last_job.gather_finished)
 
-        #Get HarvestObjects from last job whit links to packages
         if detailed:
-            last_objects = [obj for obj in last_job.objects if obj.package is not None]
-
-            if len(last_objects) == 0:
+            harvest_job_dict = harvest_job_dictize(last_job, context)
                 # No packages added or updated
-                out['last_harvest_statistics']['added'] = 0
-                out['last_harvest_statistics']['updated'] = 0
-            else:
-                # Check wether packages were added or updated
-                for last_object in last_objects:
-                    # Check if the same package had been linked before
-                    previous_objects = model.Session.query(HarvestObject) \
-                                                .filter(HarvestObject.package==last_object.package) \
-                                                .count()
+            statistics = out['last_harvest_statistics']
+            statistics['added'] = harvest_job_dict['stats'].get('new',0)
+            statistics['updated'] = harvest_job_dict['stats'].get('updated',0)
+            statistics['deleted'] = harvest_job_dict['stats'].get('deleted',0)
+            statistics['errors'] = (harvest_job_dict['stats'].get('errored',0)
+                                    + len(last_job.gather_errors))
 
-                    if previous_objects == 1:
-                        # It didn't previously exist, it has been added
-                        out['last_harvest_statistics']['added'] += 1
-                    else:
-                        # Pacakge already existed, but it has been updated
-                        out['last_harvest_statistics']['updated'] += 1
-
-        # Last harvest errors
-        # We have the gathering errors in last_job.gather_errors, so let's also
-        # get also the object errors.
-        object_errors = model.Session.query(HarvestObjectError).join(HarvestObject) \
-                            .filter(HarvestObject.job==last_job)
-
-        out['last_harvest_statistics']['errors'] = len(last_job.gather_errors) \
-                                            + object_errors.count()
         if detailed:
+            # We have the gathering errors in last_job.gather_errors, so let's also
+            # get the object errors.
+            object_errors = model.Session.query(HarvestObjectError).join(HarvestObject) \
+                                .filter(HarvestObject.job==last_job)
             for gather_error in last_job.gather_errors:
                 out['last_harvest_errors']['gather'].append(gather_error.message)
 
diff --git a/ckanext/harvest/model/__init__.py b/ckanext/harvest/model/__init__.py
index 1ce8a12..069156a 100644
--- a/ckanext/harvest/model/__init__.py
+++ b/ckanext/harvest/model/__init__.py
@@ -213,6 +213,7 @@ def define_harvester_tables():
         Column('harvest_job_id', types.UnicodeText, ForeignKey('harvest_job.id')),
         Column('harvest_source_id', types.UnicodeText, ForeignKey('harvest_source.id')),
         Column('package_id', types.UnicodeText, ForeignKey('package.id', deferrable=True), nullable=True),
+        Column('report_status', types.UnicodeText, nullable=True),
     )
 
     # New table
@@ -373,6 +374,7 @@ ALTER TABLE harvest_object
     ADD COLUMN import_started timestamp without time zone,
     ADD COLUMN import_finished timestamp without time zone,
-    ADD COLUMN "state" text;
+    ADD COLUMN "state" text,
+    ADD COLUMN "report_status" text;
 
 ALTER TABLE harvest_source
     ADD COLUMN frequency text,
@@ -386,6 +388,7 @@ ALTER TABLE harvest_object_extra
 
 UPDATE harvest_object set state = 'COMPLETE';
 UPDATE harvest_object set retry_times = 0;
+UPDATE harvest_object set report_status = 'new';
 UPDATE harvest_source set frequency = 'MANUAL';
 ALTER TABLE harvest_object DROP CONSTRAINT harvest_object_package_id_fkey;
 
diff --git a/ckanext/harvest/queue.py b/ckanext/harvest/queue.py
index 46f86ca..8fd959e 100644
--- a/ckanext/harvest/queue.py
+++ b/ckanext/harvest/queue.py
@@ -6,6 +6,7 @@ import pika
 
 from ckan.lib.base import config
 from ckan.plugins import PluginImplementations
+from ckan import model
 
 from ckanext.harvest.model import HarvestJob, HarvestObject,HarvestGatherError
 from ckanext.harvest.interfaces import IHarvester
@@ -191,6 +192,24 @@ def fetch_callback(channel, method, header, body):
             else:
                 obj.state = "ERROR"
                 obj.save()
+            # Classify the outcome for the job report, unless a status
+            # has already been recorded on this object.
+            if obj.report_status:
+                continue
+            if obj.state == 'ERROR':
+                obj.report_status = 'errored'
+            elif obj.current == False:
+                obj.report_status = 'deleted'
+            # A second harvest object pointing at the same package means
+            # the package already existed, so this run updated it.
+            elif len(model.Session.query(HarvestObject)
+                     .filter_by(package_id = obj.package_id)
+                     .limit(2)
+                     .all()) == 2:
+                obj.report_status = 'updated'
+            else:
+                obj.report_status = 'new'
+            obj.save()
 
     channel.basic_ack(method.delivery_tag)
 
diff --git a/ckanext/harvest/tests/test_queue.py b/ckanext/harvest/tests/test_queue.py
index b31b2d9..46ec874 100644
--- a/ckanext/harvest/tests/test_queue.py
+++ b/ckanext/harvest/tests/test_queue.py
@@ -19,9 +19,11 @@ class TestHarvester(SingletonPlugin):
             obj = HarvestObject(guid = 'test1', job = harvest_job)
             obj.extras.append(HarvestObjectExtra(key='key', value='value'))
             obj2 = HarvestObject(guid = 'test2', job = harvest_job)
+            obj3 = HarvestObject(guid = 'test_to_delete', job = harvest_job)
             obj.add()
-            obj2.save() # this will commit both
-            return [obj.id, obj2.id]
+            obj2.add()
+            obj3.save() # this will commit all three
+            return [obj.id, obj2.id, obj3.id]
 
         return []
 
@@ -40,11 +42,31 @@ class TestHarvester(SingletonPlugin):
         user = logic.get_action('get_site_user')(
             {'model': model, 'ignore_auth': True}, {}
         )['name']
-        logic.get_action('package_create')(
+
+        package = json.loads(harvest_object.content)
+        name = package['name']
+
+        package_object = model.Package.get(name)
+        if package_object:
+            logic_function = 'package_update'
+        else:
+            logic_function = 'package_create'
+
+        package_dict = logic.get_action(logic_function)(
             {'model': model, 'session': model.Session,
              'user': user, 'api_version': 3},
             json.loads(harvest_object.content)
         )
+
+        # delete test_to_delete package on second run
+        harvest_object.package_id = package_dict['id']
+        harvest_object.current = True
+        if package_dict['name'] == 'test_to_delete' and package_object:
+            harvest_object.current = False
+            package_object.state = 'deleted'
+            package_object.save()
+
+        harvest_object.save()
         return True
 
 
@@ -58,7 +80,7 @@ class TestHarvestQueue(object):
         model.repo.rebuild_db()
 
 
-    def test_01_basic_harvester(cls):
+    def test_01_basic_harvester(self):
 
         ### make sure queues/exchanges are created first and are empty
         consumer = queue.get_consumer('ckan.harvest.gather','harvest_job_id')
@@ -117,16 +139,19 @@ class TestHarvestQueue(object):
 
         all_objects = model.Session.query(HarvestObject).all()
 
-        assert len(all_objects) == 2
+        assert len(all_objects) == 3
         assert all_objects[0].state == 'WAITING'
         assert all_objects[1].state == 'WAITING'
+        assert all_objects[2].state == 'WAITING'
 
 
-        assert len(model.Session.query(HarvestObject).all()) == 2
+        assert len(model.Session.query(HarvestObject).all()) == 3
         assert len(model.Session.query(HarvestObjectExtra).all()) == 1
 
-        ## do twice as two harvest objects
-        reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
+        ## do three times as three harvest objects
+        reply = consumer.basic_get(queue='ckan.harvest.fetch')
+        queue.fetch_callback(consumer, *reply)
+        reply = consumer.basic_get(queue='ckan.harvest.fetch')
         queue.fetch_callback(consumer, *reply)
         reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
         queue.fetch_callback(consumer, *reply)
@@ -134,12 +159,16 @@ class TestHarvestQueue(object):
         count = model.Session.query(model.Package) \
                 .filter(model.Package.type==None) \
                 .count()
-        assert count == 2
+        assert count == 3
+        all_objects = model.Session.query(HarvestObject).filter_by(current=True).all()
 
-        all_objects = model.Session.query(HarvestObject).all()
-        assert len(all_objects) == 2
+        assert len(all_objects) == 3
         assert all_objects[0].state == 'COMPLETE'
+        assert all_objects[0].report_status == 'new'
         assert all_objects[1].state == 'COMPLETE'
+        assert all_objects[1].report_status == 'new'
+        assert all_objects[2].state == 'COMPLETE'
+        assert all_objects[2].report_status == 'new'
 
         ## fire run again to check if job is set to Finished
         try:
@@ -150,7 +179,85 @@ class TestHarvestQueue(object):
         except Exception, e:
             assert 'There are no new harvesting jobs' in str(e)
 
-        assert logic.get_action('harvest_job_show')(
+        harvest_job = logic.get_action('harvest_job_show')(
             context,
             {'id': job_id}
-        )['status'] == u'Finished'
+        )
+
+        assert harvest_job['status'] == u'Finished'
+        assert harvest_job['stats'] == {'new': 3}
+
+        context['detailed'] = True
+
+        harvest_source_dict = logic.get_action('harvest_source_show')(
+            context,
+            {'id': harvest_source['id']}
+        )
+
+        assert harvest_source_dict['status']['last_harvest_statistics'] == {'updated': 0, 'added': 3, 'deleted': 0, 'errors': 0L}
+        assert harvest_source_dict['status']['overall_statistics'] == {'added': 3L, 'errors': 0L}
+
+
+        ########### Second run ########################
+
+        harvest_job = logic.get_action('harvest_job_create')(
+            context,
+            {'source_id':harvest_source['id']}
+        )
+
+        logic.get_action('harvest_jobs_run')(
+            context,
+            {'source_id':harvest_source['id']}
+        )
+
+        job_id = harvest_job['id']
+
+        ## pop one item off the queue and run the callback
+        reply = consumer.basic_get(queue='ckan.harvest.gather')
+        queue.gather_callback(consumer, *reply)
+
+        all_objects = model.Session.query(HarvestObject).all()
+
+        assert len(all_objects) == 6
+
+        reply = consumer.basic_get(queue='ckan.harvest.fetch')
+        queue.fetch_callback(consumer, *reply)
+        reply = consumer.basic_get(queue='ckan.harvest.fetch')
+        queue.fetch_callback(consumer, *reply)
+        reply = consumer.basic_get(queue='ckan.harvest.fetch')
+        queue.fetch_callback(consumer, *reply)
+
+        assert len(model.Session.query(model.Package).all()) == 3
+
+        all_objects = model.Session.query(HarvestObject).filter_by(report_status='new').all()
+        assert len(all_objects) == 3, len(all_objects)
+
+        all_objects = model.Session.query(HarvestObject).filter_by(report_status='updated').all()
+        assert len(all_objects) == 2, len(all_objects)
+
+        all_objects = model.Session.query(HarvestObject).filter_by(report_status='deleted').all()
+        assert len(all_objects) == 1, len(all_objects)
+
+        # run to make sure job is marked as finished
+        try:
+            logic.get_action('harvest_jobs_run')(
+                context,
+                {'source_id':harvest_source['id']}
+            )
+        except Exception, e:
+            assert 'There are no new harvesting jobs' in str(e)
+
+        harvest_job = logic.get_action('harvest_job_show')(
+            context,
+            {'id': job_id}
+        )
+        assert harvest_job['stats'] == {'updated': 2, 'deleted': 1}
+
+        context['detailed'] = True
+        harvest_source_dict = logic.get_action('harvest_source_show')(
+            context,
+            {'id': harvest_source['id']}
+        )
+
+        assert harvest_source_dict['status']['last_harvest_statistics'] == {'updated': 2, 'added': 0, 'deleted': 1, 'errors': 0L}
+        assert harvest_source_dict['status']['overall_statistics'] == {'added': 2L, 'errors': 0L}