From 6dd40bfcf96d3e75597ed142ad3ce80a838aa98a Mon Sep 17 00:00:00 2001 From: Ross Jones Date: Thu, 10 Sep 2015 16:46:25 +0100 Subject: [PATCH] Changes the gather state to use v3 API Rather than using the revisions in v2 API this now uses the package_search API so that we can extend it with proper filters in future. --- ckanext/harvest/harvesters/base.py | 13 ++--- ckanext/harvest/harvesters/ckanharvester.py | 54 +++++++++------------ 2 files changed, 28 insertions(+), 39 deletions(-) diff --git a/ckanext/harvest/harvesters/base.py b/ckanext/harvest/harvesters/base.py index 41a5061..dfc756e 100644 --- a/ckanext/harvest/harvesters/base.py +++ b/ckanext/harvest/harvesters/base.py @@ -150,9 +150,9 @@ class HarvesterBase(SingletonPlugin): ''' Creates a new package or updates an exisiting one according to the package dictionary provided. The package dictionary should look like - the REST API response for a package: + the Action API response for a package: - http://ckan.net/api/rest/package/statistics-catalunya + http://ckan.net/api/action/package_show?id=statistics-catalunya Note that the package_dict must contain an id, which will be used to check if the package needs to be created or updated (use the remote @@ -161,10 +161,6 @@ class HarvesterBase(SingletonPlugin): If the remote server provides the modification date of the remote package, add it to package_dict['metadata_modified']. - - TODO: Not sure it is worth keeping this function. If useful it should - use the output of package_show logic function (maybe keeping support - for rest api based dicts ''' try: # Change default schema @@ -199,6 +195,7 @@ class HarvesterBase(SingletonPlugin): # Check if package exists data_dict = {} + data_dict['id'] = package_dict['id'] try: existing_package_dict = get_action('package_show')(context, data_dict) @@ -214,7 +211,7 @@ class HarvesterBase(SingletonPlugin): context.update({'id':package_dict['id']}) package_dict.setdefault('name', existing_package_dict['name']) - new_package = get_action('package_update_rest')(context, package_dict) + new_package = get_action('package_update')(context, package_dict) else: log.info('Package with GUID %s not updated, skipping...' % harvest_object.guid) @@ -258,7 +255,7 @@ class HarvesterBase(SingletonPlugin): model.Session.execute('SET CONSTRAINTS harvest_object_package_id_fkey DEFERRED') model.Session.flush() - new_package = get_action('package_create_rest')(context, package_dict) + new_package = get_action('package_create')(context, package_dict) Session.commit() diff --git a/ckanext/harvest/harvesters/ckanharvester.py b/ckanext/harvest/harvesters/ckanharvester.py index 6339969..625ee0f 100644 --- a/ckanext/harvest/harvesters/ckanharvester.py +++ b/ckanext/harvest/harvesters/ckanharvester.py @@ -6,6 +6,7 @@ from ckan.model import Session, Package from ckan.logic import ValidationError, NotFound, get_action from ckan.lib.helpers import json from ckan.lib.munge import munge_name +from urlparse import urljoin from ckanext.harvest.model import HarvestJob, HarvestObject, HarvestGatherError, \ HarvestObjectError @@ -24,14 +25,11 @@ class CKANHarvester(HarvesterBase): api_version = 2 action_api_version = 3 - def _get_rest_api_offset(self): - return '/api/%d/rest' % self.api_version - def _get_action_api_offset(self): return '/api/%d/action' % self.action_api_version def _get_search_api_offset(self): - return '/api/%d/search' % self.api_version + return "%s/package_search" % self._get_action_api_offset() def _get_content(self, url): http_request = urllib2.Request( @@ -46,13 +44,13 @@ class CKANHarvester(HarvesterBase): http_response = urllib2.urlopen(http_request) except urllib2.URLError, e: raise ContentFetchError( - 'Could not fetch url: %s, error: %s' % + 'Could not fetch url: %s, error: %s' % (url, str(e)) ) return http_response.read() def _get_group(self, base_url, group_name): - url = base_url + self._get_rest_api_offset() + '/group/' + munge_name(group_name) + url = base_url + self._get_action_api_offset() + '/group_show?id=' + munge_name(group_name) try: content = self._get_content(url) return json.loads(content) @@ -157,34 +155,28 @@ class CKANHarvester(HarvesterBase): # Get source URL base_url = harvest_job.source.url.rstrip('/') - base_rest_url = base_url + self._get_rest_api_offset() base_search_url = base_url + self._get_search_api_offset() + log.debug("%r", previous_job) + if (previous_job and not previous_job.gather_errors and not len(previous_job.objects) == 0): if not self.config.get('force_all',False): get_all_packages = False # Request only the packages modified since last harvest job last_time = previous_job.gather_finished.isoformat() - url = base_search_url + '/revision?since_time=%s' % last_time + log.info("Searching for datasets modified since: %s", last_time) + + fq = "metadata_modified:[{last_check} TO *]".format(last_check=last_time) + url = base_search_url + '?fq={fq}&rows=1000'.format(fq) try: content = self._get_content(url) - revision_ids = json.loads(content) - if len(revision_ids): - for revision_id in revision_ids: - url = base_rest_url + '/revision/%s' % revision_id - try: - content = self._get_content(url) - except ContentFetchError,e: - self._save_gather_error('Unable to get content for URL: %s: %s' % (url, str(e)),harvest_job) - continue - - revision = json.loads(content) - for package_id in revision['packages']: - if not package_id in package_ids: - package_ids.append(package_id) + package_dicts = json.loads(content).get('result', {}).get('results', []) + for package in package_dicts: + if not package['id'] in package_ids: + package_ids.append(package['id']) else: log.info('No packages have been updated on the remote CKAN instance since the last harvest job') return None @@ -197,23 +189,21 @@ class CKANHarvester(HarvesterBase): self._save_gather_error('Unable to get content for URL: %s: %s' % (url, str(e)),harvest_job) return None - - if get_all_packages: # Request all remote packages - url = base_rest_url + '/package' + url = urljoin(base_url,self._get_action_api_offset() + '/package_list') try: content = self._get_content(url) except ContentFetchError,e: self._save_gather_error('Unable to get content for URL: %s: %s' % (url, str(e)),harvest_job) return None - - package_ids = json.loads(content) + package_ids = json.loads(content).get('result',[]) try: object_ids = [] if len(package_ids): for package_id in package_ids: + log.debug("Creating harvestjob for %s", package_id) # Create a new HarvestObject for this identifier obj = HarvestObject(guid = package_id, job = harvest_job) obj.save() @@ -236,7 +226,7 @@ class CKANHarvester(HarvesterBase): # Get source URL url = harvest_object.source.url.rstrip('/') - url = url + self._get_rest_api_offset() + '/package/' + harvest_object.guid + url = url + self._get_action_api_offset() + '/package_show?id=' + harvest_object.guid # Get contents try: @@ -246,8 +236,10 @@ class CKANHarvester(HarvesterBase): (url, e),harvest_object) return None + content = json.loads(content)['result'] + # Save the fetched contents in the HarvestObject - harvest_object.content = content + harvest_object.content = json.dumps(content) harvest_object.save() return True @@ -374,7 +366,7 @@ class CKANHarvester(HarvesterBase): # Find any extras whose values are not strings and try to convert # them to strings, as non-string extras are not allowed anymore in # CKAN 2.0. - for key in package_dict['extras'].keys(): + for key in package_dict.get('extras', {}).keys(): if not isinstance(package_dict['extras'][key], basestring): try: package_dict['extras'][key] = json.dumps( @@ -390,7 +382,7 @@ class CKANHarvester(HarvesterBase): if not 'extras' in package_dict: package_dict['extras'] = {} for key,value in default_extras.iteritems(): - if not key in package_dict['extras'] or override_extras: + if not key in package_dict.get('extras', {}) or override_extras: # Look for replacement strings if isinstance(value,basestring): value = value.format(harvest_source_id=harvest_object.job.source.id,