diff --git a/README.rst b/README.rst index c3337c6..03cdfc8 100644 --- a/README.rst +++ b/README.rst @@ -430,17 +430,24 @@ following methods:: ''' The import stage will receive a HarvestObject object and will be responsible for: - - performing any necessary action with the fetched object (e.g - create a CKAN package). + - performing any necessary action with the fetched object (e.g. + create, update or delete a CKAN package). Note: if this stage creates or updates a package, a reference to the package should be added to the HarvestObject. - - creating the HarvestObject - Package relation (if necessary) + - setting the HarvestObject.package (if there is one) + - setting the HarvestObject.current for this harvest: + - True if successfully created/updated + - False if successfully deleted + - setting HarvestObject.current to False for previous harvest + objects of this harvest source if the action was successful. - creating and storing any suitable HarvestObjectErrors that may occur. - - returning True if everything went as expected, False otherwise. + - returning True if the action was done, "unchanged" if nothing + was needed doing after all or False if there were errors. :param harvest_object: HarvestObject object - :returns: True if everything went right, False if errors were found + :returns: True if the action was done, "unchanged" if nothing was + needed doing after all and False if something went wrong. ''' diff --git a/ckanext/harvest/harvesters/base.py b/ckanext/harvest/harvesters/base.py index 1fd0ebc..314ab4b 100644 --- a/ckanext/harvest/harvesters/base.py +++ b/ckanext/harvest/harvesters/base.py @@ -240,6 +240,10 @@ class HarvesterBase(SingletonPlugin): If the remote server provides the modification date of the remote package, add it to package_dict['metadata_modified']. + :returns: The same as what import_stage should return. i.e. True if the + create or update occurred ok, 'unchanged' if it didn't need + updating or False if there were errors. + TODO: Not sure it is worth keeping this function. If useful it should use the output of package_show logic function (maybe keeping support @@ -280,7 +284,10 @@ class HarvesterBase(SingletonPlugin): data_dict = {} data_dict['id'] = package_dict['id'] try: - existing_package_dict = get_action('package_show')(context, data_dict) + package_show_context = {'model': model, 'session': Session, + 'ignore_auth': True} + existing_package_dict = get_action('package_show')( + package_show_context, data_dict) # In case name has been modified when first importing. See issue #101. package_dict['name'] = existing_package_dict['name'] @@ -297,7 +304,8 @@ class HarvesterBase(SingletonPlugin): else: log.info('Package with GUID %s not updated, skipping...' % harvest_object.guid) - return + # NB harvest_object.current/package_id are not set + return 'unchanged' # Flag the other objects linking to this package as not current anymore from ckanext.harvest.model import harvest_object_table diff --git a/ckanext/harvest/harvesters/ckanharvester.py b/ckanext/harvest/harvesters/ckanharvester.py index 0dcb3b4..4fd26e7 100644 --- a/ckanext/harvest/harvesters/ckanharvester.py +++ b/ckanext/harvest/harvesters/ckanharvester.py @@ -206,7 +206,7 @@ class CKANHarvester(HarvesterBase): package_ids = revision['packages'] else: log.info('No packages have been updated on the remote CKAN instance since the last harvest job') - return None + return [] except urllib2.HTTPError,e: if e.getcode() == 400: @@ -427,14 +427,20 @@ class CKANHarvester(HarvesterBase): package_dict['extras'][key] = value - # Clear remote url_type for resources (eg datastore, upload) as we - # are only creating normal resources with links to the remote ones for resource in package_dict.get('resources', []): + # Clear remote url_type for resources (eg datastore, upload) as + # we are only creating normal resources with links to the + # remote ones resource.pop('url_type', None) + # Clear revision_id as the revision won't exist on this CKAN + # and saving it will cause an IntegrityError with the foreign + # key. + resource.pop('revision_id', None) + result = self._create_or_update_package(package_dict,harvest_object) - if result and self.config.get('read_only',False) == True: + if result is True and self.config.get('read_only', False) is True: package = model.Package.get(package_dict['id']) @@ -451,8 +457,7 @@ class CKANHarvester(HarvesterBase): user = model.User.get(user_name) pkg_role = model.PackageRole(package=package, user=user, role=model.Role.READER) - - return True + return result except ValidationError,e: self._save_object_error('Invalid package with GUID %s: %r' % (harvest_object.guid, e.error_dict), harvest_object, 'Import') diff --git a/ckanext/harvest/interfaces.py b/ckanext/harvest/interfaces.py index c0ed689..99022ca 100644 --- a/ckanext/harvest/interfaces.py +++ b/ckanext/harvest/interfaces.py @@ -106,15 +106,22 @@ class IHarvester(Interface): ''' The import stage will receive a HarvestObject object and will be responsible for: - - performing any necessary action with the fetched object (e.g - create a CKAN package). + - performing any necessary action with the fetched object (e.g. + create, update or delete a CKAN package). Note: if this stage creates or updates a package, a reference to the package should be added to the HarvestObject. - - creating the HarvestObject - Package relation (if necessary) + - setting the HarvestObject.package (if there is one) + - setting the HarvestObject.current for this harvest: + - True if successfully created/updated + - False if successfully deleted + - setting HarvestObject.current to False for previous harvest + objects of this harvest source if the action was successful. - creating and storing any suitable HarvestObjectErrors that may occur. - - returning True if everything went as expected, False otherwise. + - returning True if the action was done, "unchanged" if nothing + was needed doing after all or False if there were errors. :param harvest_object: HarvestObject object - :returns: True if everything went right, False if errors were found + :returns: True if the action was done, "unchanged" if nothing was + needed doing after all and False if something went wrong. ''' diff --git a/ckanext/harvest/logic/action/update.py b/ckanext/harvest/logic/action/update.py index 18c32af..450f2e8 100644 --- a/ckanext/harvest/logic/action/update.py +++ b/ckanext/harvest/logic/action/update.py @@ -419,6 +419,8 @@ def harvest_jobs_run(context, data_dict): .first() if last_object: job_obj.finished = last_object.import_finished + else: + job_obj.finished = job['gather_finished'] job_obj.save() # Reindex the harvest source dataset so it has the latest # status diff --git a/ckanext/harvest/logic/dictization.py b/ckanext/harvest/logic/dictization.py index ce88ee5..430d39c 100644 --- a/ckanext/harvest/logic/dictization.py +++ b/ckanext/harvest/logic/dictization.py @@ -37,7 +37,8 @@ def harvest_job_dictize(job, context): func.count(HarvestObject.id).label('total_objects'))\ .filter_by(harvest_job_id=job.id)\ .group_by(HarvestObject.report_status).all() - out['stats'] = {'added': 0, 'updated': 0, 'errors': 0, 'deleted': 0} + out['stats'] = {'added': 0, 'updated': 0, 'not modified': 0, + 'errors': 0, 'deleted': 0} for status, count in stats: out['stats'][status] = count diff --git a/ckanext/harvest/model/__init__.py b/ckanext/harvest/model/__init__.py index 4a30a54..5a813a5 100644 --- a/ckanext/harvest/model/__init__.py +++ b/ckanext/harvest/model/__init__.py @@ -218,7 +218,7 @@ def define_harvester_tables(): Column('guid', types.UnicodeText, default=u''), # When you harvest a dataset multiple times, only the latest # successfully imported harvest_object should be flagged 'current'. - # The import_stage reads and writes it. + # The import_stage usually reads and writes it. Column('current',types.Boolean,default=False), Column('gathered', types.DateTime, default=datetime.datetime.utcnow), Column('fetch_started', types.DateTime), @@ -233,6 +233,7 @@ def define_harvester_tables(): Column('harvest_job_id', types.UnicodeText, ForeignKey('harvest_job.id')), Column('harvest_source_id', types.UnicodeText, ForeignKey('harvest_source.id')), Column('package_id', types.UnicodeText, ForeignKey('package.id', deferrable=True), nullable=True), + # report_status: 'added', 'updated', 'not modified', 'deleted', 'errored' Column('report_status', types.UnicodeText, nullable=True), ) diff --git a/ckanext/harvest/queue.py b/ckanext/harvest/queue.py index 548c6a4..e320d16 100644 --- a/ckanext/harvest/queue.py +++ b/ckanext/harvest/queue.py @@ -347,14 +347,16 @@ def fetch_and_import_stages(harvester, obj): obj.import_finished = datetime.datetime.utcnow() if success_import: obj.state = "COMPLETE" + if success_import is 'unchanged': + obj.report_status = 'not modified' + obj.save() + return else: obj.state = "ERROR" obj.save() else: obj.state = "ERROR" obj.save() - if obj.report_status: - return if obj.state == 'ERROR': obj.report_status = 'errored' elif obj.current == False: diff --git a/ckanext/harvest/tests/harvesters/mock_ckan.py b/ckanext/harvest/tests/harvesters/mock_ckan.py index b384eb8..64ae101 100644 --- a/ckanext/harvest/tests/harvesters/mock_ckan.py +++ b/ckanext/harvest/tests/harvesters/mock_ckan.py @@ -440,7 +440,7 @@ REVISIONS = [ "approved_timestamp": None, "packages": [ - "dataset1" + DATASETS[1]['name'] ], "groups": [ ] }, @@ -452,7 +452,7 @@ REVISIONS = [ "approved_timestamp": None, "packages": [ - "dataset1" + DATASETS[1]['name'] ], "groups": [ ] }] diff --git a/ckanext/harvest/tests/harvesters/test_ckanharvester.py b/ckanext/harvest/tests/harvesters/test_ckanharvester.py index 89b3089..b5b87ca 100644 --- a/ckanext/harvest/tests/harvesters/test_ckanharvester.py +++ b/ckanext/harvest/tests/harvesters/test_ckanharvester.py @@ -1,6 +1,9 @@ +import copy from nose.tools import assert_equal import json +from mock import patch + try: from ckan.tests.helpers import reset_db from ckan.tests.factories import Organization @@ -92,15 +95,21 @@ class TestCkanHarvester(object): run_harvest( url='http://localhost:%s/' % mock_ckan.PORT, harvester=CKANHarvester()) - results_by_guid = run_harvest( - url='http://localhost:%s/' % mock_ckan.PORT, - harvester=CKANHarvester()) + + # change the modified date + datasets = copy.deepcopy(mock_ckan.DATASETS) + datasets[1]['metadata_modified'] = '2050-05-09T22:00:01.486366' + with patch('ckanext.harvest.tests.harvesters.mock_ckan.DATASETS', + datasets): + results_by_guid = run_harvest( + url='http://localhost:%s/' % mock_ckan.PORT, + harvester=CKANHarvester()) # updated the dataset which has revisions - result = results_by_guid['dataset1'] + result = results_by_guid[mock_ckan.DATASETS[1]['name']] assert_equal(result['state'], 'COMPLETE') assert_equal(result['report_status'], 'updated') - assert_equal(result['dataset']['name'], mock_ckan.DATASETS[0]['name']) + assert_equal(result['dataset']['name'], mock_ckan.DATASETS[1]['name']) assert_equal(result['errors'], []) # the other dataset is unchanged and not harvested @@ -134,3 +143,20 @@ class TestCkanHarvester(object): config=json.dumps(config)) assert 'dataset1-id' in results_by_guid assert mock_ckan.DATASETS[1]['id'] not in results_by_guid + + def test_harvest_not_modified(self): + run_harvest( + url='http://localhost:%s/' % mock_ckan.PORT, + harvester=CKANHarvester()) + + results_by_guid = run_harvest( + url='http://localhost:%s/' % mock_ckan.PORT, + harvester=CKANHarvester()) + + # The metadata_modified was the same for this dataset so the import + # would have returned 'unchanged' + result = results_by_guid[mock_ckan.DATASETS[1]['name']] + assert_equal(result['state'], 'COMPLETE') + assert_equal(result['report_status'], 'not modified') + assert 'dataset' not in result + assert_equal(result['errors'], []) diff --git a/ckanext/harvest/tests/lib.py b/ckanext/harvest/tests/lib.py index ff04f26..72f82c0 100644 --- a/ckanext/harvest/tests/lib.py +++ b/ckanext/harvest/tests/lib.py @@ -43,7 +43,8 @@ def run_harvest_job(job, harvester): if harvest_object.state == 'COMPLETE' and harvest_object.package_id: results_by_guid[guid]['dataset'] = \ toolkit.get_action('package_show')( - {}, dict(id=harvest_object.package_id)) + {'ignore_auth': True}, + dict(id=harvest_object.package_id)) results_by_guid[guid]['errors'] = harvest_object.errors # Do 'harvest_jobs_run' to change the job status to 'finished' diff --git a/ckanext/harvest/tests/test_queue.py b/ckanext/harvest/tests/test_queue.py index bdd0954..ae13a34 100644 --- a/ckanext/harvest/tests/test_queue.py +++ b/ckanext/harvest/tests/test_queue.py @@ -13,7 +13,7 @@ from ckan import model from nose.tools import assert_equal -class TestHarvester(SingletonPlugin): +class MockHarvester(SingletonPlugin): implements(IHarvester) def info(self): return {'name': 'test', 'title': 'test', 'description': 'test'} @@ -196,14 +196,14 @@ class TestHarvestQueue(object): ) assert_equal(harvest_job['status'], u'Finished') - assert_equal(harvest_job['stats'], {'added': 3, 'updated': 0, 'errors': 0, 'deleted': 0}) + assert_equal(harvest_job['stats'], {'added': 3, 'updated': 0, 'not modified': 0, 'errors': 0, 'deleted': 0}) harvest_source_dict = logic.get_action('harvest_source_show')( context, {'id': harvest_source['id']} ) - assert_equal(harvest_source_dict['status']['last_job']['stats'], {'added': 3, 'updated': 0, 'errors': 0, 'deleted': 0}) + assert_equal(harvest_source_dict['status']['last_job']['stats'], {'added': 3, 'updated': 0, 'not modified': 0, 'errors': 0, 'deleted': 0}) assert_equal(harvest_source_dict['status']['total_datasets'], 3) assert_equal(harvest_source_dict['status']['job_count'], 1) @@ -267,7 +267,7 @@ class TestHarvestQueue(object): context, {'id': job_id} ) - assert_equal(harvest_job['stats'], {'added': 0, 'updated': 2, 'errors': 0, 'deleted': 1}) + assert_equal(harvest_job['stats'], {'added': 0, 'updated': 2, 'not modified': 0, 'errors': 0, 'deleted': 1}) context['detailed'] = True harvest_source_dict = logic.get_action('harvest_source_show')( @@ -275,6 +275,6 @@ class TestHarvestQueue(object): {'id': harvest_source['id']} ) - assert_equal(harvest_source_dict['status']['last_job']['stats'], {'added': 0, 'updated': 2, 'errors': 0, 'deleted': 1}) + assert_equal(harvest_source_dict['status']['last_job']['stats'], {'added': 0, 'updated': 2, 'not modified': 0, 'errors': 0, 'deleted': 1}) assert_equal(harvest_source_dict['status']['total_datasets'], 2) assert_equal(harvest_source_dict['status']['job_count'], 2) diff --git a/ckanext/harvest/tests/test_queue2.py b/ckanext/harvest/tests/test_queue2.py new file mode 100644 index 0000000..33b5bd3 --- /dev/null +++ b/ckanext/harvest/tests/test_queue2.py @@ -0,0 +1,169 @@ +'''Tests elements of queue.py, but doesn't use the queue subsystem +(redis/rabbitmq) +''' +import json + +from nose.tools import assert_equal + +try: + from ckan.tests.helpers import reset_db +except ImportError: + from ckan.new_tests.helpers import reset_db +from ckan import model +from ckan import plugins as p +from ckan.plugins import toolkit + +from ckanext.harvest.tests.factories import (HarvestObjectObj) +from ckanext.harvest.interfaces import IHarvester +import ckanext.harvest.model as harvest_model +from ckanext.harvest.tests.lib import run_harvest + + +class MockHarvester(p.SingletonPlugin): + p.implements(IHarvester) + + @classmethod + def _set_test_params(cls, guid, **test_params): + cls._guid = guid + cls._test_params = test_params + + def info(self): + return {'name': 'test', 'title': 'test', 'description': 'test'} + + def gather_stage(self, harvest_job): + obj = HarvestObjectObj(guid=self._guid, job=harvest_job) + return [obj.id] + + def fetch_stage(self, harvest_object): + harvest_object.content = json.dumps({'name': harvest_object.guid}) + harvest_object.save() + return True + + def import_stage(self, harvest_object): + user = toolkit.get_action('get_site_user')( + {'model': model, 'ignore_auth': True}, {} + )['name'] + + package = json.loads(harvest_object.content) + name = package['name'] + + package_object = model.Package.get(name) + if package_object: + logic_function = 'package_update' + else: + logic_function = 'package_create' + + package_dict = toolkit.get_action(logic_function)( + {'model': model, 'session': model.Session, + 'user': user}, + json.loads(harvest_object.content) + ) + + if self._test_params.get('object_error'): + return False + + # successful, so move 'current' to this object + previous_object = model.Session.query(harvest_model.HarvestObject) \ + .filter_by(guid=harvest_object.guid) \ + .filter_by(current=True) \ + .first() + if previous_object: + previous_object.current = False + previous_object.save() + harvest_object.package_id = package_dict['id'] + harvest_object.current = True + + if self._test_params.get('delete'): + # 'current=False' is the key step in getting report_status to be + # set as 'deleted' + harvest_object.current = False + package_object.save() + + harvest_object.save() + + if self._test_params.get('object_unchanged'): + return 'unchanged' + return True + + +class TestEndStates(object): + @classmethod + def setup_class(cls): + reset_db() + harvest_model.setup() + + def test_create_dataset(self): + guid = 'obj-create' + MockHarvester._set_test_params(guid=guid) + + results_by_guid = run_harvest( + url='http://some-url.com', + harvester=MockHarvester()) + + result = results_by_guid[guid] + assert_equal(result['state'], 'COMPLETE') + assert_equal(result['report_status'], 'added') + assert_equal(result['errors'], []) + + def test_update_dataset(self): + guid = 'obj-update' + MockHarvester._set_test_params(guid=guid) + + # create the original harvest_object and dataset + run_harvest( + url='http://some-url.com', + harvester=MockHarvester()) + # update it + results_by_guid = run_harvest( + url='http://some-url.com', + harvester=MockHarvester()) + + result = results_by_guid[guid] + assert_equal(result['state'], 'COMPLETE') + assert_equal(result['report_status'], 'updated') + assert_equal(result['errors'], []) + + def test_delete_dataset(self): + guid = 'obj-delete' + MockHarvester._set_test_params(guid=guid) + # create the original harvest_object and dataset + run_harvest( + url='http://some-url.com', + harvester=MockHarvester()) + MockHarvester._set_test_params(guid=guid, delete=True) + + # delete it + results_by_guid = run_harvest( + url='http://some-url.com', + harvester=MockHarvester()) + + result = results_by_guid[guid] + assert_equal(result['state'], 'COMPLETE') + assert_equal(result['report_status'], 'deleted') + assert_equal(result['errors'], []) + + def test_obj_error(self): + guid = 'obj-error' + MockHarvester._set_test_params(guid=guid, object_error=True) + + results_by_guid = run_harvest( + url='http://some-url.com', + harvester=MockHarvester()) + + result = results_by_guid[guid] + assert_equal(result['state'], 'ERROR') + assert_equal(result['report_status'], 'errored') + assert_equal(result['errors'], []) + + def test_unchanged(self): + guid = 'obj-error' + MockHarvester._set_test_params(guid=guid, object_unchanged=True) + + results_by_guid = run_harvest( + url='http://some-url.com', + harvester=MockHarvester()) + + result = results_by_guid[guid] + assert_equal(result['state'], 'COMPLETE') + assert_equal(result['report_status'], 'not modified') + assert_equal(result['errors'], []) diff --git a/setup.py b/setup.py index 5275010..cfabe8a 100644 --- a/setup.py +++ b/setup.py @@ -35,7 +35,7 @@ setup( harvest=ckanext.harvest.plugin:Harvest ckan_harvester=ckanext.harvest.harvesters:CKANHarvester [ckan.test_plugins] - test_harvester=ckanext.harvest.tests.test_queue:TestHarvester + test_harvester=ckanext.harvest.tests.test_queue:MockHarvester test_action_harvester=ckanext.harvest.tests.test_action:MockHarvesterForActionTests [paste.paster_command] harvester = ckanext.harvest.commands.harvester:Harvester