From 6dd40bfcf96d3e75597ed142ad3ce80a838aa98a Mon Sep 17 00:00:00 2001 From: Ross Jones Date: Thu, 10 Sep 2015 16:46:25 +0100 Subject: [PATCH 01/18] Changes the gather state to use v3 API Rather than using the revisions in v2 API this now uses the package_search API so that we can extend it with proper filters in future. --- ckanext/harvest/harvesters/base.py | 13 ++--- ckanext/harvest/harvesters/ckanharvester.py | 54 +++++++++------------ 2 files changed, 28 insertions(+), 39 deletions(-) diff --git a/ckanext/harvest/harvesters/base.py b/ckanext/harvest/harvesters/base.py index 41a5061..dfc756e 100644 --- a/ckanext/harvest/harvesters/base.py +++ b/ckanext/harvest/harvesters/base.py @@ -150,9 +150,9 @@ class HarvesterBase(SingletonPlugin): ''' Creates a new package or updates an exisiting one according to the package dictionary provided. The package dictionary should look like - the REST API response for a package: + the Action API response for a package: - http://ckan.net/api/rest/package/statistics-catalunya + http://ckan.net/api/action/package_show?id=statistics-catalunya Note that the package_dict must contain an id, which will be used to check if the package needs to be created or updated (use the remote @@ -161,10 +161,6 @@ class HarvesterBase(SingletonPlugin): If the remote server provides the modification date of the remote package, add it to package_dict['metadata_modified']. - - TODO: Not sure it is worth keeping this function. 
If useful it should - use the output of package_show logic function (maybe keeping support - for rest api based dicts ''' try: # Change default schema @@ -199,6 +195,7 @@ class HarvesterBase(SingletonPlugin): # Check if package exists data_dict = {} + data_dict['id'] = package_dict['id'] try: existing_package_dict = get_action('package_show')(context, data_dict) @@ -214,7 +211,7 @@ class HarvesterBase(SingletonPlugin): context.update({'id':package_dict['id']}) package_dict.setdefault('name', existing_package_dict['name']) - new_package = get_action('package_update_rest')(context, package_dict) + new_package = get_action('package_update')(context, package_dict) else: log.info('Package with GUID %s not updated, skipping...' % harvest_object.guid) @@ -258,7 +255,7 @@ class HarvesterBase(SingletonPlugin): model.Session.execute('SET CONSTRAINTS harvest_object_package_id_fkey DEFERRED') model.Session.flush() - new_package = get_action('package_create_rest')(context, package_dict) + new_package = get_action('package_create')(context, package_dict) Session.commit() diff --git a/ckanext/harvest/harvesters/ckanharvester.py b/ckanext/harvest/harvesters/ckanharvester.py index 6339969..625ee0f 100644 --- a/ckanext/harvest/harvesters/ckanharvester.py +++ b/ckanext/harvest/harvesters/ckanharvester.py @@ -6,6 +6,7 @@ from ckan.model import Session, Package from ckan.logic import ValidationError, NotFound, get_action from ckan.lib.helpers import json from ckan.lib.munge import munge_name +from urlparse import urljoin from ckanext.harvest.model import HarvestJob, HarvestObject, HarvestGatherError, \ HarvestObjectError @@ -24,14 +25,11 @@ class CKANHarvester(HarvesterBase): api_version = 2 action_api_version = 3 - def _get_rest_api_offset(self): - return '/api/%d/rest' % self.api_version - def _get_action_api_offset(self): return '/api/%d/action' % self.action_api_version def _get_search_api_offset(self): - return '/api/%d/search' % self.api_version + return "%s/package_search" % 
self._get_action_api_offset() def _get_content(self, url): http_request = urllib2.Request( @@ -46,13 +44,13 @@ class CKANHarvester(HarvesterBase): http_response = urllib2.urlopen(http_request) except urllib2.URLError, e: raise ContentFetchError( - 'Could not fetch url: %s, error: %s' % + 'Could not fetch url: %s, error: %s' % (url, str(e)) ) return http_response.read() def _get_group(self, base_url, group_name): - url = base_url + self._get_rest_api_offset() + '/group/' + munge_name(group_name) + url = base_url + self._get_action_api_offset() + '/group_show?id=' + munge_name(group_name) try: content = self._get_content(url) return json.loads(content) @@ -157,34 +155,28 @@ class CKANHarvester(HarvesterBase): # Get source URL base_url = harvest_job.source.url.rstrip('/') - base_rest_url = base_url + self._get_rest_api_offset() base_search_url = base_url + self._get_search_api_offset() + log.debug("%r", previous_job) + if (previous_job and not previous_job.gather_errors and not len(previous_job.objects) == 0): if not self.config.get('force_all',False): get_all_packages = False # Request only the packages modified since last harvest job last_time = previous_job.gather_finished.isoformat() - url = base_search_url + '/revision?since_time=%s' % last_time + log.info("Searching for datasets modified since: %s", last_time) + + fq = "metadata_modified:[{last_check} TO *]".format(last_check=last_time) + url = base_search_url + '?fq={fq}&rows=1000'.format(fq) try: content = self._get_content(url) - revision_ids = json.loads(content) - if len(revision_ids): - for revision_id in revision_ids: - url = base_rest_url + '/revision/%s' % revision_id - try: - content = self._get_content(url) - except ContentFetchError,e: - self._save_gather_error('Unable to get content for URL: %s: %s' % (url, str(e)),harvest_job) - continue - - revision = json.loads(content) - for package_id in revision['packages']: - if not package_id in package_ids: - package_ids.append(package_id) + package_dicts = 
json.loads(content).get('result', {}).get('results', []) + for package in package_dicts: + if not package['id'] in package_ids: + package_ids.append(package['id']) else: log.info('No packages have been updated on the remote CKAN instance since the last harvest job') return None @@ -197,23 +189,21 @@ class CKANHarvester(HarvesterBase): self._save_gather_error('Unable to get content for URL: %s: %s' % (url, str(e)),harvest_job) return None - - if get_all_packages: # Request all remote packages - url = base_rest_url + '/package' + url = urljoin(base_url,self._get_action_api_offset() + '/package_list') try: content = self._get_content(url) except ContentFetchError,e: self._save_gather_error('Unable to get content for URL: %s: %s' % (url, str(e)),harvest_job) return None - - package_ids = json.loads(content) + package_ids = json.loads(content).get('result',[]) try: object_ids = [] if len(package_ids): for package_id in package_ids: + log.debug("Creating harvestjob for %s", package_id) # Create a new HarvestObject for this identifier obj = HarvestObject(guid = package_id, job = harvest_job) obj.save() @@ -236,7 +226,7 @@ class CKANHarvester(HarvesterBase): # Get source URL url = harvest_object.source.url.rstrip('/') - url = url + self._get_rest_api_offset() + '/package/' + harvest_object.guid + url = url + self._get_action_api_offset() + '/package_show?id=' + harvest_object.guid # Get contents try: @@ -246,8 +236,10 @@ class CKANHarvester(HarvesterBase): (url, e),harvest_object) return None + content = json.loads(content)['result'] + # Save the fetched contents in the HarvestObject - harvest_object.content = content + harvest_object.content = json.dumps(content) harvest_object.save() return True @@ -374,7 +366,7 @@ class CKANHarvester(HarvesterBase): # Find any extras whose values are not strings and try to convert # them to strings, as non-string extras are not allowed anymore in # CKAN 2.0. 
- for key in package_dict['extras'].keys(): + for key in package_dict.get('extras', {}).keys(): if not isinstance(package_dict['extras'][key], basestring): try: package_dict['extras'][key] = json.dumps( @@ -390,7 +382,7 @@ class CKANHarvester(HarvesterBase): if not 'extras' in package_dict: package_dict['extras'] = {} for key,value in default_extras.iteritems(): - if not key in package_dict['extras'] or override_extras: + if not key in package_dict.get('extras', {}) or override_extras: # Look for replacement strings if isinstance(value,basestring): value = value.format(harvest_source_id=harvest_object.job.source.id, From b56fae8aed58684cc13653426d663069fc3d073f Mon Sep 17 00:00:00 2001 From: David Read Date: Fri, 23 Oct 2015 17:30:28 +0000 Subject: [PATCH 02/18] Fixes and tests * Fix extras as a list of dicts * Fix SOLR dates syntax - needed a Z * Basic tests for this updated ckan harvester * Now require CKAN 2.0 to be able to be able to save these packages in package_show form. Take advantage of this now we are such various imports from are definitely available, such as munge_tag. * Add back compatibility for other harvesters supplying restful-like package_dicts to _create_or_update_package TODO add back in the ability to harvest pre 2.0 CKANs with the RESTful calls (fallback or maybe configurable) --- README.rst | 24 ++++ ckanext/harvest/harvesters/base.py | 60 +++++---- ckanext/harvest/harvesters/ckanharvester.py | 114 +++++++++--------- ckanext/harvest/tests/harvesters/mock_ckan.py | 21 +++- .../tests/harvesters/test_ckanharvester.py | 24 ++-- 5 files changed, 149 insertions(+), 94 deletions(-) diff --git a/README.rst b/README.rst index 1d21059..65c5129 100644 --- a/README.rst +++ b/README.rst @@ -8,9 +8,13 @@ ckanext-harvest - Remote harvesting extension This extension provides a common harvesting framework for ckan extensions and adds a CLI and a WUI to CKAN to manage harvesting sources and jobs. 
+ Installation ============ +This extension requires CKAN v2.0 or later, although the CKAN harvester can +harvest from CKANs of earlier versions. + 1. The harvest extension can use two different backends. You can choose whichever you prefer depending on your needs, but Redis has been found to be more stable and reliable so it is the recommended one: @@ -572,6 +576,26 @@ following steps with the one you are using. You can of course modify this periodicity, this `Wikipedia page `_ has a good overview of the crontab syntax. +Tests +===== + +You can run the tests like this: + + cd ckanext-harvest + nosetests --reset-db --ckan --with-pylons=test-core.ini ckanext/harvest/tests + +Here are some common errors and solutions: + +* ``(OperationalError) no such table: harvest_object_error u'delete from "harvest_object_error"`` + The database has got into in a bad state. Run the tests again but with the ``--reset-db`` parameter. + +* ``(ProgrammingError) relation "harvest_object_extra" does not exist`` + The database has got into in a bad state. Run the tests again but *without* the ``--reset-db`` parameter. + +* ``(OperationalError) near "SET": syntax error`` + You are testing with SQLite as the database, but the CKAN Harvester needs PostgreSQL. Specify test-core.ini instead of test.ini. 
+ + Community ========= diff --git a/ckanext/harvest/harvesters/base.py b/ckanext/harvest/harvesters/base.py index 6187644..6c5eeab 100644 --- a/ckanext/harvest/harvesters/base.py +++ b/ckanext/harvest/harvesters/base.py @@ -2,21 +2,20 @@ import logging import re import uuid -from sqlalchemy.sql import update,and_, bindparam +from sqlalchemy.sql import update, bindparam from sqlalchemy.exc import InvalidRequestError from pylons import config from ckan import plugins as p from ckan import model from ckan.model import Session, Package, PACKAGE_NAME_MAX_LENGTH -from ckan.logic import ValidationError, NotFound, get_action from ckan.logic.schema import default_create_package_schema -from ckan.lib.navl.validators import ignore_missing,ignore -from ckan.lib.munge import munge_title_to_name,substitute_ascii_equivalents +from ckan.lib.navl.validators import ignore_missing, ignore +from ckan.lib.munge import munge_title_to_name, munge_tag -from ckanext.harvest.model import HarvestJob, HarvestObject, HarvestGatherError, \ - HarvestObjectError +from ckanext.harvest.model import (HarvestObject, HarvestGatherError, + HarvestObjectError) from ckan.plugins.core import SingletonPlugin, implements from ckanext.harvest.interfaces import IHarvester @@ -25,12 +24,6 @@ from ckanext.harvest.interfaces import IHarvester log = logging.getLogger(__name__) -def munge_tag(tag): - tag = substitute_ascii_equivalents(tag) - tag = tag.lower().strip() - return re.sub(r'[^a-zA-Z0-9 -]', '', tag).replace(' ', '-') - - class HarvesterBase(SingletonPlugin): ''' Generic class for harvesters with helper functions @@ -224,14 +217,32 @@ class HarvesterBase(SingletonPlugin): except Exception, e: self._save_gather_error('%r' % e.message, harvest_job) - - def _create_or_update_package(self, package_dict, harvest_object): + def _create_or_update_package(self, package_dict, harvest_object, + package_dict_form='rest'): ''' Creates a new package or updates an exisiting one according to the - package dictionary 
provided. The package dictionary should look like - the Action API response for a package: + package dictionary provided. - http://ckan.net/api/action/package_show?id=statistics-catalunya + The package dictionary can be in one of two forms: + + 1. 'rest' - as seen on the RESTful API: + + http://datahub.io/api/rest/dataset/1996_population_census_data_canada + + This is the legacy form. It is the default to provide backward + compatibility. + + * 'extras' is a dict e.g. {'theme': 'health', 'sub-theme': 'cancer'} + * 'tags' is a list of strings e.g. ['large-river', 'flood'] + + 2. 'package_show' form, as provided by the Action API (CKAN v2.0+): + + http://datahub.io/api/action/package_show?id=1996_population_census_data_canada + * 'extras' is a list of dicts + e.g. [{'key': 'theme', 'value': 'health'}, + {'key': 'sub-theme', 'value': 'cancer'}] + * 'tags' is a list of dicts + e.g. [{'name': 'large-river'}, {'name': 'flood'}] Note that the package_dict must contain an id, which will be used to check if the package needs to be created or updated (use the remote @@ -241,6 +252,7 @@ class HarvesterBase(SingletonPlugin): package, add it to package_dict['metadata_modified']. ''' + assert package_dict_form in ('rest', 'package_show') try: # Change default schema schema = default_create_package_schema() @@ -277,7 +289,9 @@ class HarvesterBase(SingletonPlugin): data_dict['id'] = package_dict['id'] try: - existing_package_dict = get_action('package_show')(context, data_dict) + existing_package_dict = p.toolkit.get_action( + 'package_show' if package_dict_form == 'package_show' + else 'package_show_rest')(context, data_dict) # In case name has been modified when first importing. See issue #101. 
package_dict['name'] = existing_package_dict['name'] @@ -290,7 +304,7 @@ class HarvesterBase(SingletonPlugin): context.update({'id':package_dict['id']}) package_dict.setdefault('name', existing_package_dict['name']) - new_package = get_action('package_update')(context, package_dict) + new_package = p.toolkit.get_action('package_update')(context, package_dict) else: log.info('Package with GUID %s not updated, skipping...' % harvest_object.guid) @@ -310,7 +324,7 @@ class HarvesterBase(SingletonPlugin): harvest_object.current = True harvest_object.save() - except NotFound: + except p.toolkit.ObjectNotFound: # Package needs to be created # Get rid of auth audit on the context otherwise we'll get an @@ -334,13 +348,15 @@ class HarvesterBase(SingletonPlugin): model.Session.execute('SET CONSTRAINTS harvest_object_package_id_fkey DEFERRED') model.Session.flush() - new_package = get_action('package_create')(context, package_dict) + new_package = p.toolkit.get_action( + 'package_create' if package_dict_form == 'package_show' + else 'package_create_rest')(context, package_dict) Session.commit() return True - except ValidationError,e: + except p.toolkit.ValidationError, e: log.exception(e) self._save_object_error('Invalid package with GUID %s: %r'%(harvest_object.guid,e.error_dict),harvest_object,'Import') except Exception, e: diff --git a/ckanext/harvest/harvesters/ckanharvester.py b/ckanext/harvest/harvesters/ckanharvester.py index 625ee0f..3977e77 100644 --- a/ckanext/harvest/harvesters/ckanharvester.py +++ b/ckanext/harvest/harvesters/ckanharvester.py @@ -6,16 +6,17 @@ from ckan.model import Session, Package from ckan.logic import ValidationError, NotFound, get_action from ckan.lib.helpers import json from ckan.lib.munge import munge_name +from ckan.plugins import toolkit from urlparse import urljoin -from ckanext.harvest.model import HarvestJob, HarvestObject, HarvestGatherError, \ - HarvestObjectError +from ckanext.harvest.model import HarvestJob, HarvestObject import 
logging log = logging.getLogger(__name__) from base import HarvesterBase + class CKANHarvester(HarvesterBase): ''' A Harvester for CKAN instances @@ -140,6 +141,7 @@ class CKANHarvester(HarvesterBase): def gather_stage(self,harvest_job): log.debug('In CKANHarvester gather_stage (%s)' % harvest_job.source.url) + toolkit.requires_ckan_version(min_version='2.0') get_all_packages = True package_ids = [] @@ -159,25 +161,33 @@ class CKANHarvester(HarvesterBase): log.debug("%r", previous_job) + # Ideally we can request from the remote CKAN only those datasets + # modified since last harvest job if (previous_job and not previous_job.gather_errors and not len(previous_job.objects) == 0): if not self.config.get('force_all',False): get_all_packages = False - # Request only the packages modified since last harvest job - last_time = previous_job.gather_finished.isoformat() - log.info("Searching for datasets modified since: %s", last_time) + # Request only the datasets modified since last harvest job + last_time = previous_job.gather_started.isoformat() + # Note: SOLR works in UTC, and gather_started is also UTC, so + # this should work as long as local and remote clocks are + # relatively accurate + log.info("Searching for datasets modified since: %s UTC", last_time) - fq = "metadata_modified:[{last_check} TO *]".format(last_check=last_time) - url = base_search_url + '?fq={fq}&rows=1000'.format(fq) + fq = "metadata_modified:[{last_check}Z+TO+*]".format(last_check=last_time) + url = base_search_url + '?fq={fq}&rows=1000'.format(fq=fq) try: content = self._get_content(url) - package_dicts = json.loads(content).get('result', {}).get('results', []) + try: + package_dicts = json.loads(content).get('result', {}).get('results', []) + except ValueError: + raise ValueError('Response from CKAN was not JSON: %r' % content) for package in package_dicts: if not package['id'] in package_ids: package_ids.append(package['id']) - else: + if not package_ids: log.info('No packages have been 
updated on the remote CKAN instance since the last harvest job') return None @@ -189,6 +199,7 @@ class CKANHarvester(HarvesterBase): self._save_gather_error('Unable to get content for URL: %s: %s' % (url, str(e)),harvest_job) return None + # Fall-back option - request all the datasets from the remote CKAN if get_all_packages: # Request all remote packages url = urljoin(base_url,self._get_action_api_offset() + '/package_list') @@ -361,73 +372,60 @@ class CKANHarvester(HarvesterBase): if default_groups: if not 'groups' in package_dict: package_dict['groups'] = [] - package_dict['groups'].extend([g for g in default_groups if g not in package_dict['groups']]) - - # Find any extras whose values are not strings and try to convert - # them to strings, as non-string extras are not allowed anymore in - # CKAN 2.0. - for key in package_dict.get('extras', {}).keys(): - if not isinstance(package_dict['extras'][key], basestring): - try: - package_dict['extras'][key] = json.dumps( - package_dict['extras'][key]) - except TypeError: - # If converting to a string fails, just delete it. 
- del package_dict['extras'][key] + package_dict['groups'].extend( + [g for g in default_groups + if g not in package_dict['groups']]) # Set default extras if needed - default_extras = self.config.get('default_extras',{}) + default_extras = self.config.get('default_extras', {}) + def get_extra(key, package_dict): + for extra in package_dict.get('extras', []): + if extra['key'] == key: + return extra if default_extras: - override_extras = self.config.get('override_extras',False) + override_extras = self.config.get('override_extras', False) if not 'extras' in package_dict: package_dict['extras'] = {} - for key,value in default_extras.iteritems(): - if not key in package_dict.get('extras', {}) or override_extras: - # Look for replacement strings - if isinstance(value,basestring): - value = value.format(harvest_source_id=harvest_object.job.source.id, - harvest_source_url=harvest_object.job.source.url.strip('/'), - harvest_source_title=harvest_object.job.source.title, - harvest_job_id=harvest_object.job.id, - harvest_object_id=harvest_object.id, - dataset_id=package_dict['id']) + for key, value in default_extras.iteritems(): + existing_extra = get_extra(key, package_dict) + if existing_extra and not override_extras: + continue # no need for the default + if existing_extra: + package_dict['extras'].remove(existing_extra) + # Look for replacement strings + if isinstance(value, basestring): + value = value.format( + harvest_source_id=harvest_object.job.source.id, + harvest_source_url= + harvest_object.job.source.url.strip('/'), + harvest_source_title= + harvest_object.job.source.title, + harvest_job_id=harvest_object.job.id, + harvest_object_id=harvest_object.id, + dataset_id=package_dict['id']) - package_dict['extras'][key] = value + package_dict['extras'].append({'key': key, 'value': value}) # Clear remote url_type for resources (eg datastore, upload) as we # are only creating normal resources with links to the remote ones for resource in package_dict.get('resources', 
[]): resource.pop('url_type', None) - result = self._create_or_update_package(package_dict,harvest_object) - - if result and self.config.get('read_only',False) == True: - - package = model.Package.get(package_dict['id']) - - # Clear default permissions - model.clear_user_roles(package) - - # Setup harvest user as admin - user_name = self.config.get('user',u'harvest') - user = model.User.get(user_name) - pkg_role = model.PackageRole(package=package, user=user, role=model.Role.ADMIN) - - # Other users can only read - for user_name in (u'visitor',u'logged_in'): - user = model.User.get(user_name) - pkg_role = model.PackageRole(package=package, user=user, role=model.Role.READER) - + result = self._create_or_update_package( + package_dict, harvest_object, package_dict_form='package_show') return True - except ValidationError,e: - self._save_object_error('Invalid package with GUID %s: %r' % (harvest_object.guid, e.error_dict), - harvest_object, 'Import') + except ValidationError, e: + self._save_object_error('Invalid package with GUID %s: %r' % + (harvest_object.guid, e.error_dict), + harvest_object, 'Import') except Exception, e: - self._save_object_error('%r'%e,harvest_object,'Import') + self._save_object_error('%s' % e, harvest_object, 'Import') + class ContentFetchError(Exception): pass + class RemoteResourceError(Exception): pass diff --git a/ckanext/harvest/tests/harvesters/mock_ckan.py b/ckanext/harvest/tests/harvesters/mock_ckan.py index b384eb8..0d983ea 100644 --- a/ckanext/harvest/tests/harvesters/mock_ckan.py +++ b/ckanext/harvest/tests/harvesters/mock_ckan.py @@ -36,6 +36,9 @@ class MockCkanHandler(SimpleHTTPServer.SimpleHTTPRequestHandler): else: dataset_refs = [d['name'] for d in DATASETS] return self.respond_json(dataset_refs) + if self.path == '/api/action/package_list': + dataset_names = [d['name'] for d in DATASETS] + return self.respond_action(dataset_names) if self.path.startswith('/api/rest/package/'): dataset_ref = self.path.split('/')[-1] dataset = 
self.get_dataset(dataset_ref) @@ -47,7 +50,7 @@ class MockCkanHandler(SimpleHTTPServer.SimpleHTTPRequestHandler): dataset_ref = params['id'] dataset = self.get_dataset(dataset_ref) if dataset: - return self.respond_json(dataset) + return self.respond_action(dataset) if self.path.startswith('/api/search/dataset'): params = self.get_url_params() if params.keys() == ['organization']: @@ -68,6 +71,19 @@ class MockCkanHandler(SimpleHTTPServer.SimpleHTTPRequestHandler): if rev['id'] == revision_ref: return self.respond_json(rev) self.respond('Cannot find revision', status=404) + # /api/3/action/package_search?fq=metadata_modified:[2015-10-23T14:51:13.282361Z TO *]&rows=1000 + if self.path.startswith('/api/action/package_search'): + params = self.get_url_params() + if set(params.keys()) == set(['fq', 'rows']) and \ + 'metadata_modified' in params['fq']: + datasets = ['dataset1'] + else: + return self.respond( + 'Not implemented search params %s' % params, status=400) + out = {'count': len(datasets), + 'results': [self.get_dataset(dataset_ref_) + for dataset_ref_ in datasets]} + return self.respond_action(out) # if we wanted to server a file from disk, then we'd call this: #return SimpleHTTPServer.SimpleHTTPRequestHandler.do_GET(self) @@ -93,7 +109,7 @@ class MockCkanHandler(SimpleHTTPServer.SimpleHTTPRequestHandler): return dict([param.split('=') for param in params]) def respond_action(self, result_dict, status=200): - response_dict = {'result': result_dict} + response_dict = {'result': result_dict, 'success': True} return self.respond_json(response_dict, status=status) def respond_json(self, content_dict, status=200): @@ -140,6 +156,7 @@ DATASETS = [ 'name': 'dataset1', 'title': 'Test Dataset1', 'owner_org': 'org1-id', + 'tags': [{'name': 'test-tag'}], 'extras': []}, { "id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb", diff --git a/ckanext/harvest/tests/harvesters/test_ckanharvester.py b/ckanext/harvest/tests/harvesters/test_ckanharvester.py index eb88505..d5b2bf3 100644 --- 
a/ckanext/harvest/tests/harvesters/test_ckanharvester.py +++ b/ckanext/harvest/tests/harvesters/test_ckanharvester.py @@ -34,38 +34,38 @@ class TestCkanHarvester(object): harvester = CKANHarvester() obj_ids = harvester.gather_stage(job) + assert_equal(job.gather_errors, []) assert_equal(type(obj_ids), list) assert_equal(len(obj_ids), len(mock_ckan.DATASETS)) harvest_object = harvest_model.HarvestObject.get(obj_ids[0]) - assert_equal(harvest_object.guid, mock_ckan.DATASETS[0]['id']) + assert_equal(harvest_object.guid, mock_ckan.DATASETS[0]['name']) def test_fetch_normal(self): source = HarvestSourceObj(url='http://localhost:%s/' % mock_ckan.PORT) job = HarvestJobObj(source=source) - harvest_object = HarvestObjectObj(guid=mock_ckan.DATASETS[0]['id'], + harvest_object = HarvestObjectObj(guid=mock_ckan.DATASETS[0]['name'], job=job) harvester = CKANHarvester() result = harvester.fetch_stage(harvest_object) + assert_equal(harvest_object.errors, []) assert_equal(result, True) assert_equal( - harvest_object.content, - json.dumps( - mock_ckan.convert_dataset_to_restful_form( - mock_ckan.DATASETS[0]))) + json.loads(harvest_object.content), + mock_ckan.DATASETS[0]) def test_import_normal(self): org = Organization() harvest_object = HarvestObjectObj( - guid=mock_ckan.DATASETS[0]['id'], - content=json.dumps(mock_ckan.convert_dataset_to_restful_form( - mock_ckan.DATASETS[0])), + guid=mock_ckan.DATASETS[0]['name'], + content=json.dumps(mock_ckan.DATASETS[0]), job__source__owner_org=org['id']) harvester = CKANHarvester() result = harvester.import_stage(harvest_object) + assert_equal(harvest_object.errors, []) assert_equal(result, True) assert harvest_object.package_id dataset = model.Package.get(harvest_object.package_id) @@ -76,13 +76,13 @@ class TestCkanHarvester(object): url='http://localhost:%s/' % mock_ckan.PORT, harvester=CKANHarvester()) - result = results_by_guid['dataset1-id'] + result = results_by_guid['dataset1'] assert_equal(result['state'], 'COMPLETE') 
assert_equal(result['report_status'], 'added') assert_equal(result['dataset']['name'], mock_ckan.DATASETS[0]['name']) assert_equal(result['errors'], []) - result = results_by_guid[mock_ckan.DATASETS[1]['id']] + result = results_by_guid[mock_ckan.DATASETS[1]['name']] assert_equal(result['state'], 'COMPLETE') assert_equal(result['report_status'], 'added') assert_equal(result['dataset']['name'], mock_ckan.DATASETS[1]['name']) @@ -97,7 +97,7 @@ class TestCkanHarvester(object): harvester=CKANHarvester()) # updated the dataset which has revisions - result = results_by_guid['dataset1'] + result = results_by_guid['dataset1-id'] assert_equal(result['state'], 'COMPLETE') assert_equal(result['report_status'], 'updated') assert_equal(result['dataset']['name'], mock_ckan.DATASETS[0]['name']) From 2a7987385537e35804d26ddd36465476e84ef365 Mon Sep 17 00:00:00 2001 From: David Read Date: Tue, 27 Oct 2015 17:33:22 +0000 Subject: [PATCH 03/18] [#158] Use package search to get all datasets. Add paging search results. Store pkg_dict from search in the object rather than request it again in fetch_stage. 
--- ckanext/harvest/harvesters/ckanharvester.py | 247 ++++++++++-------- ckanext/harvest/tests/harvesters/mock_ckan.py | 16 +- .../tests/harvesters/test_ckanharvester.py | 20 +- 3 files changed, 165 insertions(+), 118 deletions(-) diff --git a/ckanext/harvest/harvesters/ckanharvester.py b/ckanext/harvest/harvesters/ckanharvester.py index 3977e77..0efcde1 100644 --- a/ckanext/harvest/harvesters/ckanharvester.py +++ b/ckanext/harvest/harvesters/ckanharvester.py @@ -1,13 +1,12 @@ +import urllib import urllib2 from ckan.lib.base import c from ckan import model -from ckan.model import Session, Package from ckan.logic import ValidationError, NotFound, get_action from ckan.lib.helpers import json from ckan.lib.munge import munge_name from ckan.plugins import toolkit -from urlparse import urljoin from ckanext.harvest.model import HarvestJob, HarvestObject @@ -33,20 +32,18 @@ class CKANHarvester(HarvesterBase): return "%s/package_search" % self._get_action_api_offset() def _get_content(self, url): - http_request = urllib2.Request( - url = url, - ) + http_request = urllib2.Request(url=url) - api_key = self.config.get('api_key',None) + api_key = self.config.get('api_key', None) if api_key: - http_request.add_header('Authorization',api_key) + http_request.add_header('Authorization', api_key) try: http_response = urllib2.urlopen(http_request) except urllib2.URLError, e: raise ContentFetchError( - 'Could not fetch url: %s, error: %s' % - (url, str(e)) + 'Could not fetch url: "%s", URL error: %s' % + (url, e) ) return http_response.read() @@ -138,133 +135,166 @@ class CKANHarvester(HarvesterBase): return config - - def gather_stage(self,harvest_job): - log.debug('In CKANHarvester gather_stage (%s)' % harvest_job.source.url) + def gather_stage(self, harvest_job): + log.debug('In CKANHarvester gather_stage (%s)', + harvest_job.source.url) toolkit.requires_ckan_version(min_version='2.0') get_all_packages = True - package_ids = [] self._set_config(harvest_job.source.config) - # Check 
if this source has been harvested before - previous_job = Session.query(HarvestJob) \ - .filter(HarvestJob.source==harvest_job.source) \ - .filter(HarvestJob.gather_finished!=None) \ - .filter(HarvestJob.id!=harvest_job.id) \ - .order_by(HarvestJob.gather_finished.desc()) \ - .limit(1).first() + # Check if there is a previous harvest, ignoring whether it was + # successful (i.e. current=True) or not + previous_job = \ + model.Session.query(HarvestJob) \ + .filter(HarvestJob.source == harvest_job.source) \ + .filter(HarvestJob.gather_finished != None) \ + .filter(HarvestJob.id != harvest_job.id) \ + .order_by(HarvestJob.gather_finished.desc()) \ + .first() # Get source URL - base_url = harvest_job.source.url.rstrip('/') - base_search_url = base_url + self._get_search_api_offset() + remote_ckan_base_url = harvest_job.source.url.rstrip('/') - log.debug("%r", previous_job) + log.debug('Previous job: %r', previous_job) # Ideally we can request from the remote CKAN only those datasets # modified since last harvest job - if (previous_job and not previous_job.gather_errors and not len(previous_job.objects) == 0): - if not self.config.get('force_all',False): - get_all_packages = False + if (previous_job and + not previous_job.gather_errors and + not len(previous_job.objects) == 0 and + not self.config.get('force_all', False)): + get_all_packages = False - # Request only the datasets modified since last harvest job - last_time = previous_job.gather_started.isoformat() - # Note: SOLR works in UTC, and gather_started is also UTC, so - # this should work as long as local and remote clocks are - # relatively accurate - log.info("Searching for datasets modified since: %s UTC", last_time) + # Request only the datasets modified since last harvest job + last_time = previous_job.gather_started.isoformat() + # Note: SOLR works in UTC, and gather_started is also UTC, so + # this should work as long as local and remote clocks are + # relatively accurate + log.info('Searching for 
datasets modified since: %s UTC', + last_time) - fq = "metadata_modified:[{last_check}Z+TO+*]".format(last_check=last_time) - url = base_search_url + '?fq={fq}&rows=1000'.format(fq=fq) + fq = 'metadata_modified:[{last_check}Z+TO+*]'.format( + last_check=last_time) - try: - content = self._get_content(url) + try: + pkg_dicts = self._search_for_datasets(remote_ckan_base_url, + fq) + except SearchError, e: + log.info('Searching for datasets changed since last time ' + 'gave an error: s', e) + get_all_packages = True - try: - package_dicts = json.loads(content).get('result', {}).get('results', []) - except ValueError: - raise ValueError('Response from CKAN was not JSON: %r' % content) - for package in package_dicts: - if not package['id'] in package_ids: - package_ids.append(package['id']) - if not package_ids: - log.info('No packages have been updated on the remote CKAN instance since the last harvest job') - return None - - except urllib2.HTTPError,e: - if e.getcode() == 400: - log.info('CKAN instance %s does not suport revision filtering' % base_url) - get_all_packages = True - else: - self._save_gather_error('Unable to get content for URL: %s: %s' % (url, str(e)),harvest_job) - return None + if not pkg_dicts: + log.info('No datasets have been updated on the remote ' + 'CKAN instance since the last harvest job %s', + last_time) + return None # Fall-back option - request all the datasets from the remote CKAN if get_all_packages: # Request all remote packages - url = urljoin(base_url,self._get_action_api_offset() + '/package_list') try: - content = self._get_content(url) - except ContentFetchError,e: - self._save_gather_error('Unable to get content for URL: %s: %s' % (url, str(e)),harvest_job) - return None - package_ids = json.loads(content).get('result',[]) - - try: - object_ids = [] - if len(package_ids): - for package_id in package_ids: - log.debug("Creating harvestjob for %s", package_id) - # Create a new HarvestObject for this identifier - obj = 
HarvestObject(guid = package_id, job = harvest_job) - obj.save() - object_ids.append(obj.id) - - return object_ids - - else: - self._save_gather_error('No packages received for URL: %s' % url, - harvest_job) - return None - except Exception, e: - self._save_gather_error('%r'%e.message,harvest_job) - - - def fetch_stage(self,harvest_object): - log.debug('In CKANHarvester fetch_stage') - - self._set_config(harvest_object.job.source.config) - - # Get source URL - url = harvest_object.source.url.rstrip('/') - url = url + self._get_action_api_offset() + '/package_show?id=' + harvest_object.guid - - # Get contents - try: - content = self._get_content(url) - except ContentFetchError,e: - self._save_object_error('Unable to get content for package: %s: %r' % \ - (url, e),harvest_object) + pkg_dicts = self._search_for_datasets(remote_ckan_base_url) + except SearchError, e: + log.info('Searching for all datasets gave an error: s', e) + self._save_gather_error( + 'Unable to search remote CKAN for datasets: %s %s' % e, + harvest_job) + if not pkg_dicts: + self._save_gather_error( + 'No datasets found at CKAN: %s' % remote_ckan_base_url, + harvest_job) return None - content = json.loads(content)['result'] + # Create harvest objects for each dataset + try: + package_ids = set() + object_ids = [] + for pkg_dict in pkg_dicts: + if pkg_dict['id'] in package_ids: + log.info('Discarding duplicate dataset %s - probably due ' + 'to datasets being changed at the same time as ' + 'when the harvester was paging through', + pkg_dict['id']) + continue + package_ids.add(pkg_dict['id']) - # Save the fetched contents in the HarvestObject - harvest_object.content = json.dumps(content) - harvest_object.save() + log.info('Creating HarvestObject for %s %s', + pkg_dict['name'], pkg_dict['id']) + obj = HarvestObject(guid=pkg_dict['id'], + job=harvest_job, + content=json.dumps(pkg_dict)) + obj.save() + object_ids.append(obj.id) + + return object_ids + except Exception, e: + 
self._save_gather_error('%r' % e.message, harvest_job) + + def _search_for_datasets(self, remote_ckan_base_url, fq=None): + '''Does a dataset search on a remote CKAN and returns the results. + + Deals with paging to return all the results, not just the first page. + ''' + base_search_url = remote_ckan_base_url + self._get_search_api_offset() + params = {'rows': '100', 'start': '0'} + if fq: + params['fq'] = fq + + pkg_dicts = [] + previous_content = None + while True: + url = base_search_url + '?' + urllib.urlencode(params) + log.debug('Searching for CKAN datasets: %s', url) + try: + content = self._get_content(url) + except urllib2.HTTPError, e: + raise SearchError('Remote CKAN instance %s returned HTTP ' + 'error %s for search: %s' % + (remote_ckan_base_url, e.getcode(), url)) + + if previous_content and content == previous_content: + raise SearchError('The paging doesn\'t seem to work. URL: %s' % + url) + try: + response_dict = json.loads(content) + except ValueError: + raise SearchError('Response from remote CKAN was not JSON: %r' + % content) + try: + pkg_dicts_page = response_dict.get('result', {}).get('results', + []) + except ValueError: + raise SearchError('Response JSON did not contain ' + 'result/results: %r' % response_dict) + pkg_dicts.extend(pkg_dicts_page) + + if len(pkg_dicts_page) == 0: + break + + params['start'] = str(int(params['start']) + int(params['rows'])) + + return pkg_dicts + + def fetch_stage(self, harvest_object): + # Nothing to do here - we got the package dict in the search in the + # gather stage return True - def import_stage(self,harvest_object): + def import_stage(self, harvest_object): log.debug('In CKANHarvester import_stage') - context = {'model': model, 'session': Session, 'user': self._get_user_name()} + context = {'model': model, 'session': model.Session, + 'user': self._get_user_name()} if not harvest_object: log.error('No harvest object received') return False if harvest_object.content is None: - 
self._save_object_error('Empty content for object %s' % harvest_object.id, - harvest_object, 'Import') + self._save_object_error('Empty content for object %s' % + harvest_object.id, + harvest_object, 'Import') return False self._set_config(harvest_object.job.source.config) @@ -277,11 +307,12 @@ class CKANHarvester(HarvesterBase): return True # Set default tags if needed - default_tags = self.config.get('default_tags',[]) + default_tags = self.config.get('default_tags', []) if default_tags: if not 'tags' in package_dict: package_dict['tags'] = [] - package_dict['tags'].extend([t for t in default_tags if t not in package_dict['tags']]) + package_dict['tags'].extend( + [t for t in default_tags if t not in package_dict['tags']]) remote_groups = self.config.get('remote_groups', None) if not remote_groups in ('only_local', 'create'): @@ -429,3 +460,7 @@ class ContentFetchError(Exception): class RemoteResourceError(Exception): pass + + +class SearchError(Exception): + pass diff --git a/ckanext/harvest/tests/harvesters/mock_ckan.py b/ckanext/harvest/tests/harvesters/mock_ckan.py index 0d983ea..82acf2e 100644 --- a/ckanext/harvest/tests/harvesters/mock_ckan.py +++ b/ckanext/harvest/tests/harvesters/mock_ckan.py @@ -74,13 +74,23 @@ class MockCkanHandler(SimpleHTTPServer.SimpleHTTPRequestHandler): # /api/3/action/package_search?fq=metadata_modified:[2015-10-23T14:51:13.282361Z TO *]&rows=1000 if self.path.startswith('/api/action/package_search'): params = self.get_url_params() - if set(params.keys()) == set(['fq', 'rows']) and \ + if set(params.keys()) == set(['rows', 'start']): + if params['start'] == '0': + datasets = ['dataset1', DATASETS[1]['name']] + else: + datasets = [] + count = len(DATASETS) + elif set(params.keys()) == set(['fq', 'rows', 'start']) and \ 'metadata_modified' in params['fq']: - datasets = ['dataset1'] + if params['start'] == '0': + datasets = ['dataset1'] + else: + datasets = [] + count = 1 else: return self.respond( 'Not implemented search params %s' 
% params, status=400) - out = {'count': len(datasets), + out = {'count': count, 'results': [self.get_dataset(dataset_ref_) for dataset_ref_ in datasets]} return self.respond_action(out) diff --git a/ckanext/harvest/tests/harvesters/test_ckanharvester.py b/ckanext/harvest/tests/harvesters/test_ckanharvester.py index d5b2bf3..693151a 100644 --- a/ckanext/harvest/tests/harvesters/test_ckanharvester.py +++ b/ckanext/harvest/tests/harvesters/test_ckanharvester.py @@ -38,27 +38,29 @@ class TestCkanHarvester(object): assert_equal(type(obj_ids), list) assert_equal(len(obj_ids), len(mock_ckan.DATASETS)) harvest_object = harvest_model.HarvestObject.get(obj_ids[0]) - assert_equal(harvest_object.guid, mock_ckan.DATASETS[0]['name']) + assert_equal(harvest_object.guid, mock_ckan.DATASETS[0]['id']) + assert_equal( + json.loads(harvest_object.content), + mock_ckan.DATASETS[0]) def test_fetch_normal(self): source = HarvestSourceObj(url='http://localhost:%s/' % mock_ckan.PORT) job = HarvestJobObj(source=source) - harvest_object = HarvestObjectObj(guid=mock_ckan.DATASETS[0]['name'], - job=job) + harvest_object = HarvestObjectObj( + guid=mock_ckan.DATASETS[0]['id'], + job=job, + content=json.dumps(mock_ckan.DATASETS[0])) harvester = CKANHarvester() result = harvester.fetch_stage(harvest_object) assert_equal(harvest_object.errors, []) assert_equal(result, True) - assert_equal( - json.loads(harvest_object.content), - mock_ckan.DATASETS[0]) def test_import_normal(self): org = Organization() harvest_object = HarvestObjectObj( - guid=mock_ckan.DATASETS[0]['name'], + guid=mock_ckan.DATASETS[0]['id'], content=json.dumps(mock_ckan.DATASETS[0]), job__source__owner_org=org['id']) @@ -76,13 +78,13 @@ class TestCkanHarvester(object): url='http://localhost:%s/' % mock_ckan.PORT, harvester=CKANHarvester()) - result = results_by_guid['dataset1'] + result = results_by_guid['dataset1-id'] assert_equal(result['state'], 'COMPLETE') assert_equal(result['report_status'], 'added') 
assert_equal(result['dataset']['name'], mock_ckan.DATASETS[0]['name']) assert_equal(result['errors'], []) - result = results_by_guid[mock_ckan.DATASETS[1]['name']] + result = results_by_guid[mock_ckan.DATASETS[1]['id']] assert_equal(result['state'], 'COMPLETE') assert_equal(result['report_status'], 'added') assert_equal(result['dataset']['name'], mock_ckan.DATASETS[1]['name']) From 55245b50919762e74456dcac6867c5ff475c6c48 Mon Sep 17 00:00:00 2001 From: David Read Date: Tue, 27 Oct 2015 17:43:11 +0000 Subject: [PATCH 04/18] [#158] PEP8/formatting. --- README.rst | 20 ++++--- ckanext/harvest/harvesters/ckanharvester.py | 59 +++++++++++---------- 2 files changed, 44 insertions(+), 35 deletions(-) diff --git a/README.rst b/README.rst index 65c5129..c73d928 100644 --- a/README.rst +++ b/README.rst @@ -12,8 +12,9 @@ and adds a CLI and a WUI to CKAN to manage harvesting sources and jobs. Installation ============ -This extension requires CKAN v2.0 or later, although the CKAN harvester can -harvest from CKANs of earlier versions. +This extension requires CKAN v2.0 or later on both the CKAN it is installed +into and the CKANs it harvests. However you are unlikely to encounter a CKAN +running a version lower than 2.0. 1. The harvest extension can use two different backends. You can choose whichever you prefer depending on your needs, but Redis has been found to be more stable @@ -49,8 +50,9 @@ harvest from CKANs of earlier versions. ckan.plugins = harvest ckan_harvester -5. If you haven't done it yet on the previous step, define the backend that you are using with the ``ckan.harvest.mq.type`` - option (it defaults to ``rabbitmq``):: +5. If you haven't done it yet on the previous step, define the backend that you + are using with the ``ckan.harvest.mq.type`` option (it defaults to + ``rabbitmq``):: ckan.harvest.mq.type = redis @@ -473,7 +475,8 @@ following steps with the one you are using. describe the tasks that need to be monitored. 
This configuration files are stored in ``/etc/supervisor/conf.d``. - Create a file named ``/etc/supervisor/conf.d/ckan_harvesting.conf``, and copy the following contents:: + Create a file named ``/etc/supervisor/conf.d/ckan_harvesting.conf``, and + copy the following contents:: ; =============================== @@ -564,10 +567,11 @@ following steps with the one you are using. sudo crontab -e -u ckan - Note that we are running this command as the same user we configured the processes to be run with - (`ckan` in our example). + Note that we are running this command as the same user we configured the + processes to be run with (`ckan` in our example). - Paste this line into your crontab, again replacing the paths to paster and the ini file with yours:: + Paste this line into your crontab, again replacing the paths to paster and + the ini file with yours:: # m h dom mon dow command */15 * * * * /usr/lib/ckan/default/bin/paster --plugin=ckanext-harvest harvester run --config=/etc/ckan/std/std.ini diff --git a/ckanext/harvest/harvesters/ckanharvester.py b/ckanext/harvest/harvesters/ckanharvester.py index 0efcde1..5626ac5 100644 --- a/ckanext/harvest/harvesters/ckanharvester.py +++ b/ckanext/harvest/harvesters/ckanharvester.py @@ -29,7 +29,7 @@ class CKANHarvester(HarvesterBase): return '/api/%d/action' % self.action_api_version def _get_search_api_offset(self): - return "%s/package_search" % self._get_action_api_offset() + return '%s/package_search' % self._get_action_api_offset() def _get_content(self, url): http_request = urllib2.Request(url=url) @@ -48,25 +48,28 @@ class CKANHarvester(HarvesterBase): return http_response.read() def _get_group(self, base_url, group_name): - url = base_url + self._get_action_api_offset() + '/group_show?id=' + munge_name(group_name) + url = base_url + self._get_action_api_offset() + '/group_show?id=' + \ + munge_name(group_name) try: content = self._get_content(url) return json.loads(content) except (ContentFetchError, ValueError): - 
log.debug('Could not fetch/decode remote group'); + log.debug('Could not fetch/decode remote group') raise RemoteResourceError('Could not fetch/decode remote group') def _get_organization(self, base_url, org_name): - url = base_url + self._get_action_api_offset() + '/organization_show?id=' + org_name + url = base_url + self._get_action_api_offset() + \ + '/organization_show?id=' + org_name try: content = self._get_content(url) content_dict = json.loads(content) return content_dict['result'] except (ContentFetchError, ValueError, KeyError): - log.debug('Could not fetch/decode remote group'); - raise RemoteResourceError('Could not fetch/decode remote organization') + log.debug('Could not fetch/decode remote group') + raise RemoteResourceError( + 'Could not fetch/decode remote organization') - def _set_config(self,config_str): + def _set_config(self, config_str): if config_str: self.config = json.loads(config_str) if 'api_version' in self.config: @@ -81,10 +84,10 @@ class CKANHarvester(HarvesterBase): 'name': 'ckan', 'title': 'CKAN', 'description': 'Harvests remote CKAN instances', - 'form_config_interface':'Text' + 'form_config_interface': 'Text' } - def validate_config(self,config): + def validate_config(self, config): if not config: return config @@ -98,39 +101,41 @@ class CKANHarvester(HarvesterBase): raise ValueError('api_version must be an integer') if 'default_tags' in config_obj: - if not isinstance(config_obj['default_tags'],list): + if not isinstance(config_obj['default_tags'], list): raise ValueError('default_tags must be a list') if 'default_groups' in config_obj: - if not isinstance(config_obj['default_groups'],list): + if not isinstance(config_obj['default_groups'], list): raise ValueError('default_groups must be a list') # Check if default groups exist - context = {'model':model,'user':c.user} + context = {'model': model, 'user': c.user} for group_name in config_obj['default_groups']: try: - group = get_action('group_show')(context,{'id':group_name}) - 
except NotFound,e: + group = get_action('group_show')( + context, {'id': group_name}) + except NotFound, e: raise ValueError('Default group not found') if 'default_extras' in config_obj: - if not isinstance(config_obj['default_extras'],dict): + if not isinstance(config_obj['default_extras'], dict): raise ValueError('default_extras must be a dictionary') if 'user' in config_obj: # Check if user exists - context = {'model':model,'user':c.user} + context = {'model': model, 'user': c.user} try: - user = get_action('user_show')(context,{'id':config_obj.get('user')}) - except NotFound,e: + user = get_action('user_show')( + context, {'id': config_obj.get('user')}) + except NotFound, e: raise ValueError('User not found') - for key in ('read_only','force_all'): + for key in ('read_only', 'force_all'): if key in config_obj: - if not isinstance(config_obj[key],bool): + if not isinstance(config_obj[key], bool): raise ValueError('%s must be boolean' % key) - except ValueError,e: + except ValueError, e: raise e return config @@ -334,19 +339,19 @@ class CKANHarvester(HarvesterBase): else: validated_groups.append(group['id']) except NotFound, e: - log.info('Group %s is not available' % group_name) + log.info('Group %s is not available', group_name) if remote_groups == 'create': try: group = self._get_group(harvest_object.source.url, group_name) except RemoteResourceError: - log.error('Could not get remote group %s' % group_name) + log.error('Could not get remote group %s', group_name) continue for key in ['packages', 'created', 'users', 'groups', 'tags', 'extras', 'display_name']: group.pop(key, None) get_action('group_create')(context, group) - log.info('Group %s has been newly created' % group_name) + log.info('Group %s has been newly created', group_name) if self.api_version == 1: validated_groups.append(group['name']) else: @@ -378,7 +383,7 @@ class CKANHarvester(HarvesterBase): org = get_action('organization_show')(context, data_dict) validated_org = org['id'] except 
NotFound, e: - log.info('Organization %s is not available' % remote_org) + log.info('Organization %s is not available', remote_org) if remote_orgs == 'create': try: try: @@ -391,10 +396,10 @@ class CKANHarvester(HarvesterBase): for key in ['packages', 'created', 'users', 'groups', 'tags', 'extras', 'display_name', 'type']: org.pop(key, None) get_action('organization_create')(context, org) - log.info('Organization %s has been newly created' % remote_org) + log.info('Organization %s has been newly created', remote_org) validated_org = org['id'] except (RemoteResourceError, ValidationError): - log.error('Could not get remote org %s' % remote_org) + log.error('Could not get remote org %s', remote_org) package_dict['owner_org'] = validated_org or local_org From 1a680f3fd309f9c7187969504962541681221549 Mon Sep 17 00:00:00 2001 From: David Read Date: Thu, 29 Oct 2015 17:31:04 +0000 Subject: [PATCH 05/18] [#158] Fix spaces encoding broken in previous merge. Tested with data.gov.uk. --- ckanext/harvest/harvesters/ckanharvester.py | 4 ++-- ckanext/harvest/tests/harvesters/mock_ckan.py | 5 ++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/ckanext/harvest/harvesters/ckanharvester.py b/ckanext/harvest/harvesters/ckanharvester.py index 76a5ec0..9a449a0 100644 --- a/ckanext/harvest/harvesters/ckanharvester.py +++ b/ckanext/harvest/harvesters/ckanharvester.py @@ -190,8 +190,8 @@ class CKANHarvester(HarvesterBase): log.info('Searching for datasets modified since: %s UTC', last_time) - fq_since_last_time = 'metadata_modified:[{last_check}Z+TO+*]'.format( - last_check=last_time) + fq_since_last_time = 'metadata_modified:[{last_check}Z TO *]' \ + .format(last_check=last_time) try: pkg_dicts = self._search_for_datasets( diff --git a/ckanext/harvest/tests/harvesters/mock_ckan.py b/ckanext/harvest/tests/harvesters/mock_ckan.py index 2a9f68e..a00bbc9 100644 --- a/ckanext/harvest/tests/harvesters/mock_ckan.py +++ b/ckanext/harvest/tests/harvesters/mock_ckan.py @@ -87,6 
+87,9 @@ class MockCkanHandler(SimpleHTTPServer.SimpleHTTPRequestHandler): datasets = ['dataset1'] elif set(params.keys()) == set(['fq', 'rows', 'start']) and \ 'metadata_modified' in params['fq']: + assert '+TO+' not in params['fq'], \ + 'Spaces should not be decoded by now - seeing + means ' \ + 'they were double encoded and SOLR doesnt like that' datasets = ['dataset1'] else: return self.respond( @@ -117,7 +120,7 @@ class MockCkanHandler(SimpleHTTPServer.SimpleHTTPRequestHandler): def get_url_params(self): params_str = self.path.split('?')[-1] - params_unicode = urllib.unquote(params_str).decode('utf8') + params_unicode = urllib.unquote_plus(params_str).decode('utf8') params = params_unicode.split('&') return dict([param.split('=') for param in params]) From b7552ba7004c505ded5b2fbf939eda1ce94708ad Mon Sep 17 00:00:00 2001 From: David Read Date: Mon, 2 Nov 2015 16:59:19 +0000 Subject: [PATCH 06/18] [#158] Try harder to use the "get datasets since time X" method of harvesting. Go back to the last completely successful harvest, rather than just consider the previous one. And that had a bug, because fetch errors were ignored, meaning one fetch error could mean that dataset never got harvested again. 
--- ckanext/harvest/harvesters/ckanharvester.py | 53 +++++++++++++-------- ckanext/harvest/logic/action/update.py | 16 ++++--- 2 files changed, 44 insertions(+), 25 deletions(-) diff --git a/ckanext/harvest/harvesters/ckanharvester.py b/ckanext/harvest/harvesters/ckanharvester.py index 9a449a0..eb335c2 100644 --- a/ckanext/harvest/harvesters/ckanharvester.py +++ b/ckanext/harvest/harvesters/ckanharvester.py @@ -1,6 +1,8 @@ import urllib import urllib2 +from sqlalchemy import exists + from ckan.lib.base import c from ckan import model from ckan.logic import ValidationError, NotFound, get_action @@ -8,7 +10,7 @@ from ckan.lib.helpers import json from ckan.lib.munge import munge_name from ckan.plugins import toolkit -from ckanext.harvest.model import HarvestJob, HarvestObject +from ckanext.harvest.model import HarvestJob, HarvestObject, HarvestGatherError import logging log = logging.getLogger(__name__) @@ -148,21 +150,9 @@ class CKANHarvester(HarvesterBase): self._set_config(harvest_job.source.config) - # Check if there is a previous harvest, ignoring whether it was - # successful (i.e. 
current=True) or not - previous_job = \ - model.Session.query(HarvestJob) \ - .filter(HarvestJob.source == harvest_job.source) \ - .filter(HarvestJob.gather_finished != None) \ - .filter(HarvestJob.id != harvest_job.id) \ - .order_by(HarvestJob.gather_finished.desc()) \ - .first() - # Get source URL remote_ckan_base_url = harvest_job.source.url.rstrip('/') - log.debug('Previous job: %r', previous_job) - # Filter in/out datasets from particular organizations fq_terms = [] org_filter_include = self.config.get('organizations_filter_include', []) @@ -175,15 +165,15 @@ class CKANHarvester(HarvesterBase): '-organization:%s' % org_name for org_name in org_filter_exclude) # Ideally we can request from the remote CKAN only those datasets - # modified since last harvest job - if (previous_job and - not previous_job.gather_errors and - not len(previous_job.objects) == 0 and + # modified since the last completely successful harvest. + last_error_free_job = self._last_error_free_job(harvest_job) + log.debug('Last error-free job: %r', last_error_free_job) + if (last_error_free_job and not self.config.get('force_all', False)): get_all_packages = False - # Request only the datasets modified since last harvest job - last_time = previous_job.gather_started.isoformat() + # Request only the datasets modified since + last_time = last_error_free_job.gather_started.isoformat() # Note: SOLR works in UTC, and gather_started is also UTC, so # this should work as long as local and remote clocks are # relatively accurate @@ -295,6 +285,31 @@ class CKANHarvester(HarvesterBase): return pkg_dicts + @classmethod + def _last_error_free_job(cls, harvest_job): + # TODO weed out cancelled jobs somehow. 
+ # look for jobs with no gather errors + jobs = \ + model.Session.query(HarvestJob) \ + .filter(HarvestJob.source == harvest_job.source) \ + .filter(HarvestJob.gather_started != None) \ + .filter(HarvestJob.status == 'Finished') \ + .filter(HarvestJob.id != harvest_job.id) \ + .filter( + ~exists().where( + HarvestGatherError.harvest_job_id == HarvestJob.id)) \ + .order_by(HarvestJob.gather_started.desc()) + # now check them until we find one with no fetch/import errors + # (looping rather than doing sql, in case there are lots of objects + # and lots of jobs) + for job in jobs: + for obj in job.objects: + if obj.current is False: + # unsuccessful, so go onto the next job + break + else: + return job + def fetch_stage(self, harvest_object): # Nothing to do here - we got the package dict in the search in the # gather stage diff --git a/ckanext/harvest/logic/action/update.py b/ckanext/harvest/logic/action/update.py index 9e871d9..52b25e2 100644 --- a/ckanext/harvest/logic/action/update.py +++ b/ckanext/harvest/logic/action/update.py @@ -343,16 +343,19 @@ def harvest_jobs_run(context,data_dict): if len(jobs): for job in jobs: if job['gather_finished']: - objects = session.query(HarvestObject.id) \ - .filter(HarvestObject.harvest_job_id==job['id']) \ - .filter(and_((HarvestObject.state!=u'COMPLETE'), - (HarvestObject.state!=u'ERROR'))) \ - .order_by(HarvestObject.import_finished.desc()) + num_objects_in_progress = \ + session.query(HarvestObject.id) \ + .filter(HarvestObject.harvest_job_id==job['id']) \ + .filter(and_((HarvestObject.state!=u'COMPLETE'), + (HarvestObject.state!=u'ERROR'))) \ + .count() - if objects.count() == 0: + if num_objects_in_progress == 0: job_obj = HarvestJob.get(job['id']) job_obj.status = u'Finished' + log.info('Marking job as finished %s %s', job.source.url, job.id) + # save the time of finish, according to the last running object last_object = session.query(HarvestObject) \ .filter(HarvestObject.harvest_job_id==job['id']) \ 
.filter(HarvestObject.import_finished!=None) \ @@ -361,6 +364,7 @@ def harvest_jobs_run(context,data_dict): if last_object: job_obj.finished = last_object.import_finished job_obj.save() + # Reindex the harvest source dataset so it has the latest # status get_action('harvest_source_reindex')(context, From d495e269e72b05bb2efd3ac54f7ab7c09a3615cf Mon Sep 17 00:00:00 2001 From: David Read Date: Mon, 2 Nov 2015 17:29:45 +0000 Subject: [PATCH 07/18] [#158] Fix tests --- ckanext/harvest/logic/action/update.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ckanext/harvest/logic/action/update.py b/ckanext/harvest/logic/action/update.py index 0d75bda..15013b5 100644 --- a/ckanext/harvest/logic/action/update.py +++ b/ckanext/harvest/logic/action/update.py @@ -375,7 +375,8 @@ def harvest_jobs_run(context,data_dict): if num_objects_in_progress == 0: job_obj = HarvestJob.get(job['id']) job_obj.status = u'Finished' - log.info('Marking job as finished %s %s', job.source.url, job.id) + log.info('Marking job as finished %s %s', + job_obj.source.url, job_obj.id) # save the time of finish, according to the last running object last_object = session.query(HarvestObject) \ From 24415844e0d07b7f31c1d47c016bc42a68f38737 Mon Sep 17 00:00:00 2001 From: David Read Date: Mon, 2 Nov 2015 18:13:29 +0000 Subject: [PATCH 08/18] [#158] Fix revision_id problem in second harvest. 
--- ckanext/harvest/harvesters/base.py | 3 ++- ckanext/harvest/harvesters/ckanharvester.py | 10 ++++++-- ckanext/harvest/tests/harvesters/mock_ckan.py | 2 +- .../tests/harvesters/test_ckanharvester.py | 24 ++++++++++++------- dev-requirements.txt | 1 + 5 files changed, 28 insertions(+), 12 deletions(-) diff --git a/ckanext/harvest/harvesters/base.py b/ckanext/harvest/harvesters/base.py index 6c5eeab..7129d39 100644 --- a/ckanext/harvest/harvesters/base.py +++ b/ckanext/harvest/harvesters/base.py @@ -303,7 +303,8 @@ class HarvesterBase(SingletonPlugin): # Update package context.update({'id':package_dict['id']}) package_dict.setdefault('name', - existing_package_dict['name']) + existing_package_dict['name']) + new_package = p.toolkit.get_action('package_update')(context, package_dict) else: diff --git a/ckanext/harvest/harvesters/ckanharvester.py b/ckanext/harvest/harvesters/ckanharvester.py index eb335c2..49c50d5 100644 --- a/ckanext/harvest/harvesters/ckanharvester.py +++ b/ckanext/harvest/harvesters/ckanharvester.py @@ -470,11 +470,17 @@ class CKANHarvester(HarvesterBase): package_dict['extras'].append({'key': key, 'value': value}) - # Clear remote url_type for resources (eg datastore, upload) as we - # are only creating normal resources with links to the remote ones for resource in package_dict.get('resources', []): + # Clear remote url_type for resources (eg datastore, upload) as + # we are only creating normal resources with links to the + # remote ones resource.pop('url_type', None) + # Clear revision_id as the revision won't exist on this CKAN + # and saving it will cause an IntegrityError with the foreign + # key. 
+ resource.pop('revision_id', None) + result = self._create_or_update_package( package_dict, harvest_object, package_dict_form='package_show') diff --git a/ckanext/harvest/tests/harvesters/mock_ckan.py b/ckanext/harvest/tests/harvesters/mock_ckan.py index a00bbc9..c6e799c 100644 --- a/ckanext/harvest/tests/harvesters/mock_ckan.py +++ b/ckanext/harvest/tests/harvesters/mock_ckan.py @@ -90,7 +90,7 @@ class MockCkanHandler(SimpleHTTPServer.SimpleHTTPRequestHandler): assert '+TO+' not in params['fq'], \ 'Spaces should not be decoded by now - seeing + means ' \ 'they were double encoded and SOLR doesnt like that' - datasets = ['dataset1'] + datasets = [DATASETS[1]['name']] else: return self.respond( 'Not implemented search params %s' % params, status=400) diff --git a/ckanext/harvest/tests/harvesters/test_ckanharvester.py b/ckanext/harvest/tests/harvesters/test_ckanharvester.py index 3bd575d..83cd344 100644 --- a/ckanext/harvest/tests/harvesters/test_ckanharvester.py +++ b/ckanext/harvest/tests/harvesters/test_ckanharvester.py @@ -1,6 +1,9 @@ -from nose.tools import assert_equal +import copy import json +from nose.tools import assert_equal +import mock + try: from ckan.tests.helpers import reset_db from ckan.tests.factories import Organization @@ -11,7 +14,7 @@ from ckan import model from ckanext.harvest.tests.factories import (HarvestSourceObj, HarvestJobObj, HarvestObjectObj) -from ckanext.harvest.tests.lib import run_harvest, run_harvest_job +from ckanext.harvest.tests.lib import run_harvest import ckanext.harvest.model as harvest_model from ckanext.harvest.harvesters.ckanharvester import CKANHarvester @@ -94,19 +97,24 @@ class TestCkanHarvester(object): run_harvest( url='http://localhost:%s/' % mock_ckan.PORT, harvester=CKANHarvester()) - results_by_guid = run_harvest( - url='http://localhost:%s/' % mock_ckan.PORT, - harvester=CKANHarvester()) + patched_datasets = copy.deepcopy(mock_ckan.DATASETS) + patched_datasets[1]['title'] = 'Title CHANGED' + 
patched_datasets[1]['metadata_modified'] = '2015-05-09T22:00:01.486366' + with mock.patch("ckanext.harvest.tests.harvesters.mock_ckan.DATASETS", + patched_datasets): + results_by_guid = run_harvest( + url='http://localhost:%s/' % mock_ckan.PORT, + harvester=CKANHarvester()) # updated the dataset which has revisions - result = results_by_guid['dataset1-id'] + result = results_by_guid[mock_ckan.DATASETS[1]['id']] assert_equal(result['state'], 'COMPLETE') assert_equal(result['report_status'], 'updated') - assert_equal(result['dataset']['name'], mock_ckan.DATASETS[0]['name']) + assert_equal(result['dataset']['name'], mock_ckan.DATASETS[1]['name']) assert_equal(result['errors'], []) # the other dataset is unchanged and not harvested - assert mock_ckan.DATASETS[1]['name'] not in result + assert mock_ckan.DATASETS[0]['id'] not in result def test_harvest_invalid_tag(self): from nose.plugins.skip import SkipTest; raise SkipTest() diff --git a/dev-requirements.txt b/dev-requirements.txt index 9aa8db0..34f8c82 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1 +1,2 @@ factory-boy>=2 +mock From 735ab3e286c93a061bf427667dc9fac98a2e90f6 Mon Sep 17 00:00:00 2001 From: David Read Date: Wed, 4 Nov 2015 11:37:03 +0000 Subject: [PATCH 09/18] [#157] Try to fix test for ckan 2.2 - cf https://github.com/ckan/ckanext-harvest/commit/91afc0e9285431f4144367528a46e52f37b1637a --- ckanext/harvest/tests/harvesters/test_ckanharvester.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ckanext/harvest/tests/harvesters/test_ckanharvester.py b/ckanext/harvest/tests/harvesters/test_ckanharvester.py index 83cd344..8c9093f 100644 --- a/ckanext/harvest/tests/harvesters/test_ckanharvester.py +++ b/ckanext/harvest/tests/harvesters/test_ckanharvester.py @@ -99,7 +99,7 @@ class TestCkanHarvester(object): harvester=CKANHarvester()) patched_datasets = copy.deepcopy(mock_ckan.DATASETS) patched_datasets[1]['title'] = 'Title CHANGED' - patched_datasets[1]['metadata_modified'] = 
'2015-05-09T22:00:01.486366' + patched_datasets[1]['metadata_modified'] = '2050-05-09T22:00:01.486366' with mock.patch("ckanext.harvest.tests.harvesters.mock_ckan.DATASETS", patched_datasets): results_by_guid = run_harvest( From 331ad84272c19ba8c386f6260e7be7316af69227 Mon Sep 17 00:00:00 2001 From: David Read Date: Fri, 12 Feb 2016 18:00:00 +0000 Subject: [PATCH 10/18] Deal with worry about datasets on the remote CKAN being added/removed during harvest. --- ckanext/harvest/harvesters/ckanharvester.py | 37 +++++++++++- ckanext/harvest/tests/harvesters/mock_ckan.py | 56 +++++++++++++------ .../tests/harvesters/test_ckanharvester.py | 9 +++ 3 files changed, 81 insertions(+), 21 deletions(-) diff --git a/ckanext/harvest/harvesters/ckanharvester.py b/ckanext/harvest/harvesters/ckanharvester.py index c91786e..dbc2a4d 100644 --- a/ckanext/harvest/harvesters/ckanharvester.py +++ b/ckanext/harvest/harvesters/ckanharvester.py @@ -1,9 +1,9 @@ import urllib import urllib2 import httplib +import datetime from sqlalchemy import exists -from simplejson.scanner import JSONDecodeError from ckan.lib.base import c from ckan import model @@ -179,10 +179,12 @@ class CKANHarvester(HarvesterBase): get_all_packages = False # Request only the datasets modified since - last_time = last_error_free_job.gather_started.isoformat() + last_time = last_error_free_job.gather_started # Note: SOLR works in UTC, and gather_started is also UTC, so # this should work as long as local and remote clocks are - # relatively accurate + # relatively accurate. Going back a little earlier, just in case. + last_time -= datetime.timedelta(hours=1) + last_time = last_time.isoformat() log.info('Searching for datasets modified since: %s UTC', last_time) @@ -253,10 +255,29 @@ class CKANHarvester(HarvesterBase): ''' base_search_url = remote_ckan_base_url + self._get_search_api_offset() params = {'rows': '100', 'start': '0'} + # There is the worry that datasets will be changed whilst we are paging + # through them. 
+ # * In SOLR 4.7 there is a cursor, but not using that yet + # because few CKANs are running that version yet. + # * However we sort, new names added or removed before the current + # page would cause existing names on the next page to be missed or + # double counted. + # * Another approach might be to sort by metadata_modified and always + # ask for changes since (and including) the date of the last item of + # the day before. However if the entire page is of the exact same + # time, then you end up in an infinite loop asking for the same page. + # * We choose a balanced approach of sorting by ID, which means + # datasets are only missed if some are removed, which is far less + # likely than any being added. If some are missed then it is assumed + # they will be harvested the next time anyway. When datasets are added, + # we are at risk of seeing datasets twice in the paging, so we detect + # and remove any duplicates. + params['sort'] = 'id asc' if fq_terms: params['fq'] = ' '.join(fq_terms) pkg_dicts = [] + pkg_ids = set() previous_content = None while True: url = base_search_url + '?' 
+ urllib.urlencode(params) @@ -282,6 +303,16 @@ class CKANHarvester(HarvesterBase): except ValueError: raise SearchError('Response JSON did not contain ' 'result/results: %r' % response_dict) + + # Weed out any datasets found on previous pages (should datasets be + # changing while we page) + ids_in_page = set(p['id'] for p in pkg_dicts_page) + duplicate_ids = ids_in_page & pkg_ids + if duplicate_ids: + pkg_dicts_page = [p for p in pkg_dicts_page + if p['id'] not in duplicate_ids] + pkg_ids |= ids_in_page + pkg_dicts.extend(pkg_dicts_page) if len(pkg_dicts_page) == 0: diff --git a/ckanext/harvest/tests/harvesters/mock_ckan.py b/ckanext/harvest/tests/harvesters/mock_ckan.py index be60fb6..b690d61 100644 --- a/ckanext/harvest/tests/harvesters/mock_ckan.py +++ b/ckanext/harvest/tests/harvesters/mock_ckan.py @@ -76,25 +76,45 @@ class MockCkanHandler(SimpleHTTPServer.SimpleHTTPRequestHandler): # /api/3/action/package_search?fq=metadata_modified:[2015-10-23T14:51:13.282361Z TO *]&rows=1000 if self.path.startswith('/api/action/package_search'): params = self.get_url_params() - if params['start'] != '0': - datasets = [] - elif set(params.keys()) == set(['rows', 'start']): - datasets = ['dataset1', DATASETS[1]['name']] - elif set(params.keys()) == set(['fq', 'rows', 'start']) and \ - params['fq'] == '-organization:org1': - datasets = [DATASETS[1]['name']] - elif set(params.keys()) == set(['fq', 'rows', 'start']) and \ - params['fq'] == 'organization:org1': - datasets = ['dataset1'] - elif set(params.keys()) == set(['fq', 'rows', 'start']) and \ - 'metadata_modified' in params['fq']: - assert '+TO+' not in params['fq'], \ - 'Spaces should not be decoded by now - seeing + means ' \ - 'they were double encoded and SOLR doesnt like that' - datasets = [DATASETS[1]['name']] + + if self.test_name == 'datasets_added': + if params['start'] == '0': + # when page 1 is retrieved, the site only has 1 dataset + datasets = [DATASETS[0]['name']] + elif params['start'] == '100': + # when 
page 2 is retrieved, the site now has new datasets, + # and so the second page has the original dataset, pushed + # onto this page now, plus a new one + datasets = [DATASETS[0]['name'], + DATASETS[1]['name']] + else: + datasets = [] else: - return self.respond( - 'Not implemented search params %s' % params, status=400) + # ignore sort param for now + if 'sort' in params: + del params['sort'] + if params['start'] != '0': + datasets = [] + elif set(params.keys()) == set(['rows', 'start']): + datasets = ['dataset1', DATASETS[1]['name']] + elif set(params.keys()) == set(['fq', 'rows', 'start']) and \ + params['fq'] == '-organization:org1': + datasets = [DATASETS[1]['name']] + elif set(params.keys()) == set(['fq', 'rows', 'start']) and \ + params['fq'] == 'organization:org1': + datasets = ['dataset1'] + elif set(params.keys()) == set(['fq', 'rows', 'start']) and \ + 'metadata_modified' in params['fq']: + assert '+TO+' not in params['fq'], \ + 'Spaces should not be decoded by now - seeing + '\ + 'means they were double encoded and SOLR doesnt like '\ + 'that' + datasets = [DATASETS[1]['name']] + else: + return self.respond( + 'Not implemented search params %s' % params, + status=400) + out = {'count': len(datasets), 'results': [self.get_dataset(dataset_ref_) for dataset_ref_ in datasets]} diff --git a/ckanext/harvest/tests/harvesters/test_ckanharvester.py b/ckanext/harvest/tests/harvesters/test_ckanharvester.py index 07a7f33..c0506f6 100644 --- a/ckanext/harvest/tests/harvesters/test_ckanharvester.py +++ b/ckanext/harvest/tests/harvesters/test_ckanharvester.py @@ -165,3 +165,12 @@ class TestCkanHarvester(object): assert_equal(result['report_status'], 'not modified') assert 'dataset' not in result assert_equal(result['errors'], []) + + def test_harvest_whilst_datasets_added(self): + results_by_guid = run_harvest( + url='http://localhost:%s/datasets_added' % mock_ckan.PORT, + harvester=CKANHarvester()) + + assert_equal(sorted(results_by_guid.keys()), + 
[mock_ckan.DATASETS[1]['id'], + mock_ckan.DATASETS[0]['id']]) From 4c1f27161e0404497aa371b9bc010a5cd78df2cc Mon Sep 17 00:00:00 2001 From: David Read Date: Fri, 12 Feb 2016 18:22:36 +0000 Subject: [PATCH 11/18] Fix merge --- README.rst | 5 ----- 1 file changed, 5 deletions(-) diff --git a/README.rst b/README.rst index e5c1f2f..c033855 100644 --- a/README.rst +++ b/README.rst @@ -8,11 +8,6 @@ ckanext-harvest - Remote harvesting extension This extension provides a common harvesting framework for ckan extensions and adds a CLI and a WUI to CKAN to manage harvesting sources and jobs. -<<<<<<< HEAD -======= -Requires CKAN 2.0 or later. - ->>>>>>> 6354ad5656bd8f5d3ff9f75389f17622a6b32044 Installation ============ From 52c071dbe96d4b591564b5dd3a806af7a4fea087 Mon Sep 17 00:00:00 2001 From: David Read Date: Mon, 15 Feb 2016 12:10:44 +0000 Subject: [PATCH 12/18] Improved error handling. e.g. if the site it harvests just returns errors. --- ckanext/harvest/harvesters/ckanharvester.py | 24 ++++++++++++------- ckanext/harvest/tests/harvesters/mock_ckan.py | 2 ++ .../tests/harvesters/test_ckanharvester.py | 13 ++++++---- ckanext/harvest/tests/lib.py | 3 +++ 4 files changed, 29 insertions(+), 13 deletions(-) diff --git a/ckanext/harvest/harvesters/ckanharvester.py b/ckanext/harvest/harvesters/ckanharvester.py index dbc2a4d..405aa90 100644 --- a/ckanext/harvest/harvesters/ckanharvester.py +++ b/ckanext/harvest/harvesters/ckanharvester.py @@ -2,6 +2,7 @@ import urllib import urllib2 import httplib import datetime +import socket from sqlalchemy import exists @@ -53,6 +54,10 @@ class CKANHarvester(HarvesterBase): raise ContentFetchError('URL error: %s' % e.reason) except httplib.HTTPException, e: raise ContentFetchError('HTTP Exception: %s' % e) + except socket.error, e: + raise ContentFetchError('HTTP socket error: %s' % e) + except Exception, e: + raise ContentFetchError('HTTP general exception: %s' % e) return http_response.read() def _get_group(self, base_url, group_name): @@ 
-197,10 +202,10 @@ class CKANHarvester(HarvesterBase): fq_terms + [fq_since_last_time]) except SearchError, e: log.info('Searching for datasets changed since last time ' - 'gave an error: s', e) + 'gave an error: %s', e) get_all_packages = True - if not pkg_dicts: + if not get_all_packages and pkg_dicts: log.info('No datasets have been updated on the remote ' 'CKAN instance since the last harvest job %s', last_time) @@ -213,10 +218,12 @@ class CKANHarvester(HarvesterBase): pkg_dicts = self._search_for_datasets(remote_ckan_base_url, fq_terms) except SearchError, e: - log.info('Searching for all datasets gave an error: s', e) + log.info('Searching for all datasets gave an error: %s', e) self._save_gather_error( - 'Unable to search remote CKAN for datasets: %s %s' % e, + 'Unable to search remote CKAN for datasets:%s url:%s' + 'terms:%s' % (e, remote_ckan_base_url, fq_terms), harvest_job) + return None if not pkg_dicts: self._save_gather_error( 'No datasets found at CKAN: %s' % remote_ckan_base_url, @@ -284,10 +291,11 @@ class CKANHarvester(HarvesterBase): log.debug('Searching for CKAN datasets: %s', url) try: content = self._get_content(url) - except urllib2.HTTPError, e: - raise SearchError('Remote CKAN instance %s returned HTTP ' - 'error %s for search: %s' % - (remote_ckan_base_url, e.getcode(), url)) + except ContentFetchError, e: + raise SearchError( + 'Error sending request to search remote ' + 'CKAN instance %s using URL %r. Error: %s' % + (remote_ckan_base_url, url, e)) if previous_content and content == previous_content: raise SearchError('The paging doesn\'t seem to work. 
URL: %s' % diff --git a/ckanext/harvest/tests/harvesters/mock_ckan.py b/ckanext/harvest/tests/harvesters/mock_ckan.py index b690d61..68c55c5 100644 --- a/ckanext/harvest/tests/harvesters/mock_ckan.py +++ b/ckanext/harvest/tests/harvesters/mock_ckan.py @@ -23,6 +23,8 @@ class MockCkanHandler(SimpleHTTPServer.SimpleHTTPRequestHandler): self.test_name = None else: self.path = re.sub('^/([^/]+)/', '/', self.path) + if self.test_name == 'site_down': + return self.respond('Site is down', status=500) # The API version is recorded and then removed from the path api_version = None diff --git a/ckanext/harvest/tests/harvesters/test_ckanharvester.py b/ckanext/harvest/tests/harvesters/test_ckanharvester.py index c0506f6..00f6fe4 100644 --- a/ckanext/harvest/tests/harvesters/test_ckanharvester.py +++ b/ckanext/harvest/tests/harvesters/test_ckanharvester.py @@ -1,12 +1,9 @@ import copy -from nose.tools import assert_equal +from nose.tools import assert_equal, assert_raises import json from mock import patch -from nose.tools import assert_equal -import mock - try: from ckan.tests.helpers import reset_db from ckan.tests.factories import Organization @@ -19,7 +16,7 @@ from ckanext.harvest.tests.factories import (HarvestSourceObj, HarvestJobObj, HarvestObjectObj) from ckanext.harvest.tests.lib import run_harvest import ckanext.harvest.model as harvest_model -from ckanext.harvest.harvesters.ckanharvester import CKANHarvester +from ckanext.harvest.harvesters.ckanharvester import CKANHarvester, SearchError import mock_ckan @@ -174,3 +171,9 @@ class TestCkanHarvester(object): assert_equal(sorted(results_by_guid.keys()), [mock_ckan.DATASETS[1]['id'], mock_ckan.DATASETS[0]['id']]) + + def test_harvest_site_down(self): + results_by_guid = run_harvest( + url='http://localhost:%s/site_down' % mock_ckan.PORT, + harvester=CKANHarvester()) + assert not results_by_guid diff --git a/ckanext/harvest/tests/lib.py b/ckanext/harvest/tests/lib.py index 0e9e464..6abb2cc 100644 --- 
a/ckanext/harvest/tests/lib.py +++ b/ckanext/harvest/tests/lib.py @@ -29,6 +29,9 @@ def run_harvest_job(job, harvester): # queue.gather_callback, which determines the harvester and then calls # gather_stage. We simply call the gather_stage. obj_ids = queue.gather_stage(harvester, job) + if not isinstance(obj_ids, list): + # gather failed + return None # The object ids are put onto the fetch queue, consumed by # queue.fetch_callback which calls queue.fetch_and_import_stages From f63140354d73bc3b477cd26a2afadb5af541cfd4 Mon Sep 17 00:00:00 2001 From: David Read Date: Mon, 15 Feb 2016 12:28:46 +0000 Subject: [PATCH 13/18] Fix logic error in previous commit --- ckanext/harvest/harvesters/ckanharvester.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ckanext/harvest/harvesters/ckanharvester.py b/ckanext/harvest/harvesters/ckanharvester.py index 405aa90..a1bd06a 100644 --- a/ckanext/harvest/harvesters/ckanharvester.py +++ b/ckanext/harvest/harvesters/ckanharvester.py @@ -205,7 +205,7 @@ class CKANHarvester(HarvesterBase): 'gave an error: %s', e) get_all_packages = True - if not get_all_packages and pkg_dicts: + if not get_all_packages and not pkg_dicts: log.info('No datasets have been updated on the remote ' 'CKAN instance since the last harvest job %s', last_time) From 385b369148612c793225b56c97920a1498ab716b Mon Sep 17 00:00:00 2001 From: David Read Date: Mon, 15 Feb 2016 13:16:23 +0000 Subject: [PATCH 14/18] Error-free jobs now include ones where an object was not modified. 
--- ckanext/harvest/harvesters/base.py | 2 +- ckanext/harvest/harvesters/ckanharvester.py | 10 ++++++---- ckanext/harvest/tests/lib.py | 5 +++-- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/ckanext/harvest/harvesters/base.py b/ckanext/harvest/harvesters/base.py index 3934c3e..72946ef 100644 --- a/ckanext/harvest/harvesters/base.py +++ b/ckanext/harvest/harvesters/base.py @@ -312,7 +312,7 @@ class HarvesterBase(SingletonPlugin): new_package = p.toolkit.get_action('package_update')(context, package_dict) else: - log.info('Package with GUID %s not updated, skipping...' % harvest_object.guid) + log.info('No changes to package with GUID %s, skipping...' % harvest_object.guid) # NB harvest_object.current/package_id are not set return 'unchanged' diff --git a/ckanext/harvest/harvesters/ckanharvester.py b/ckanext/harvest/harvesters/ckanharvester.py index a1bd06a..d51cdc5 100644 --- a/ckanext/harvest/harvesters/ckanharvester.py +++ b/ckanext/harvest/harvesters/ckanharvester.py @@ -189,12 +189,13 @@ class CKANHarvester(HarvesterBase): # this should work as long as local and remote clocks are # relatively accurate. Going back a little earlier, just in case. 
last_time -= datetime.timedelta(hours=1) - last_time = last_time.isoformat() + get_changes_since = \ + (last_time - datetime.timedelta(hours=1)).isoformat() log.info('Searching for datasets modified since: %s UTC', - last_time) + get_changes_since) fq_since_last_time = 'metadata_modified:[{last_check}Z TO *]' \ - .format(last_check=last_time) + .format(last_check=get_changes_since) try: pkg_dicts = self._search_for_datasets( @@ -349,7 +350,8 @@ class CKANHarvester(HarvesterBase): # and lots of jobs) for job in jobs: for obj in job.objects: - if obj.current is False: + if obj.current is False and \ + obj.report_status != 'not modified': # unsuccessful, so go onto the next job break else: diff --git a/ckanext/harvest/tests/lib.py b/ckanext/harvest/tests/lib.py index 6abb2cc..8b21ec7 100644 --- a/ckanext/harvest/tests/lib.py +++ b/ckanext/harvest/tests/lib.py @@ -30,8 +30,9 @@ def run_harvest_job(job, harvester): # gather_stage. We simply call the gather_stage. obj_ids = queue.gather_stage(harvester, job) if not isinstance(obj_ids, list): - # gather failed - return None + # gather had nothing to do or errored. 
Carry on to ensure the job is + # closed properly + obj_ids = [] # The object ids are put onto the fetch queue, consumed by # queue.fetch_callback which calls queue.fetch_and_import_stages From 49faa0ae6c238dce7e1edaded4d60b99d99763ea Mon Sep 17 00:00:00 2001 From: David Read Date: Mon, 15 Feb 2016 13:30:28 +0000 Subject: [PATCH 15/18] Tests for CKANHarvester._last_error_free_job --- .../tests/harvesters/test_ckanharvester.py | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/ckanext/harvest/tests/harvesters/test_ckanharvester.py b/ckanext/harvest/tests/harvesters/test_ckanharvester.py index 00f6fe4..141e7af 100644 --- a/ckanext/harvest/tests/harvesters/test_ckanharvester.py +++ b/ckanext/harvest/tests/harvesters/test_ckanharvester.py @@ -1,8 +1,8 @@ import copy -from nose.tools import assert_equal, assert_raises +from nose.tools import assert_equal import json -from mock import patch +from mock import patch, MagicMock try: from ckan.tests.helpers import reset_db @@ -16,7 +16,7 @@ from ckanext.harvest.tests.factories import (HarvestSourceObj, HarvestJobObj, HarvestObjectObj) from ckanext.harvest.tests.lib import run_harvest import ckanext.harvest.model as harvest_model -from ckanext.harvest.harvesters.ckanharvester import CKANHarvester, SearchError +from ckanext.harvest.harvesters.ckanharvester import CKANHarvester import mock_ckan @@ -24,6 +24,16 @@ import mock_ckan mock_ckan.serve() +def was_last_job_considered_error_free(): + last_job = model.Session.query(harvest_model.HarvestJob) \ + .order_by(harvest_model.HarvestJob.created.desc()) \ + .first() + job = MagicMock() + job.source = last_job.source + job.id = '' + return bool(CKANHarvester._last_error_free_job(job)) + + class TestCkanHarvester(object): @classmethod def setup(cls): @@ -92,6 +102,7 @@ class TestCkanHarvester(object): assert_equal(result['report_status'], 'added') assert_equal(result['dataset']['name'], mock_ckan.DATASETS[1]['name']) 
assert_equal(result['errors'], []) + assert was_last_job_considered_error_free() def test_harvest_twice(self): run_harvest( @@ -116,6 +127,7 @@ class TestCkanHarvester(object): # the other dataset is unchanged and not harvested assert mock_ckan.DATASETS[0]['id'] not in result + assert was_last_job_considered_error_free() def test_harvest_invalid_tag(self): from nose.plugins.skip import SkipTest; raise SkipTest() @@ -162,6 +174,7 @@ class TestCkanHarvester(object): assert_equal(result['report_status'], 'not modified') assert 'dataset' not in result assert_equal(result['errors'], []) + assert was_last_job_considered_error_free() def test_harvest_whilst_datasets_added(self): results_by_guid = run_harvest( @@ -177,3 +190,4 @@ class TestCkanHarvester(object): url='http://localhost:%s/site_down' % mock_ckan.PORT, harvester=CKANHarvester()) assert not results_by_guid + assert not was_last_job_considered_error_free() From 794fc93230eac81a7a19b52eb473498fc8edef27 Mon Sep 17 00:00:00 2001 From: David Read Date: Mon, 15 Feb 2016 15:23:39 +0000 Subject: [PATCH 16/18] Maintain compatibility with rest-style updates --- ckanext/harvest/harvesters/base.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ckanext/harvest/harvesters/base.py b/ckanext/harvest/harvesters/base.py index 72946ef..6f5e08e 100644 --- a/ckanext/harvest/harvesters/base.py +++ b/ckanext/harvest/harvesters/base.py @@ -309,7 +309,9 @@ class HarvesterBase(SingletonPlugin): package_dict.setdefault('name', existing_package_dict['name']) - new_package = p.toolkit.get_action('package_update')(context, package_dict) + new_package = p.toolkit.get_action( + 'package_update' if package_dict_form == 'package_show' + else 'package_update_rest')(context, package_dict) else: log.info('No changes to package with GUID %s, skipping...' 
% harvest_object.guid) From 84b04629799dd6d0c1d501d669677815235589cb Mon Sep 17 00:00:00 2001 From: David Read Date: Mon, 15 Feb 2016 15:36:02 +0000 Subject: [PATCH 17/18] No need to go back twice --- ckanext/harvest/harvesters/ckanharvester.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/ckanext/harvest/harvesters/ckanharvester.py b/ckanext/harvest/harvesters/ckanharvester.py index e19f404..0a0eec6 100644 --- a/ckanext/harvest/harvesters/ckanharvester.py +++ b/ckanext/harvest/harvesters/ckanharvester.py @@ -188,14 +188,13 @@ class CKANHarvester(HarvesterBase): # Note: SOLR works in UTC, and gather_started is also UTC, so # this should work as long as local and remote clocks are # relatively accurate. Going back a little earlier, just in case. - last_time -= datetime.timedelta(hours=1) get_changes_since = \ (last_time - datetime.timedelta(hours=1)).isoformat() log.info('Searching for datasets modified since: %s UTC', get_changes_since) - fq_since_last_time = 'metadata_modified:[{last_check}Z TO *]' \ - .format(last_check=get_changes_since) + fq_since_last_time = 'metadata_modified:[{since}Z TO *]' \ + .format(since=get_changes_since) try: pkg_dicts = self._search_for_datasets( From 9dfeb154ebb5d360a120fe7abaaa77213da4771c Mon Sep 17 00:00:00 2001 From: amercader Date: Wed, 17 Feb 2016 10:05:57 +0000 Subject: [PATCH 18/18] [#158] Tone down log message --- ckanext/harvest/harvesters/ckanharvester.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ckanext/harvest/harvesters/ckanharvester.py b/ckanext/harvest/harvesters/ckanharvester.py index 0a0eec6..cce7e4e 100644 --- a/ckanext/harvest/harvesters/ckanharvester.py +++ b/ckanext/harvest/harvesters/ckanharvester.py @@ -243,8 +243,8 @@ class CKANHarvester(HarvesterBase): continue package_ids.add(pkg_dict['id']) - log.info('Creating HarvestObject for %s %s', - pkg_dict['name'], pkg_dict['id']) + log.debug('Creating HarvestObject for %s %s', + pkg_dict['name'], 
pkg_dict['id']) obj = HarvestObject(guid=pkg_dict['id'], job=harvest_job, content=json.dumps(pkg_dict))