diff --git a/ckanext/harvest/harvesters.py b/ckanext/harvest/harvesters.py index 22116c5..c7d6b8f 100644 --- a/ckanext/harvest/harvesters.py +++ b/ckanext/harvest/harvesters.py @@ -14,15 +14,15 @@ from ckan.lib.helpers import json from ckan.plugins.core import SingletonPlugin, implements from ckanext.harvest.interfaces import IHarvester -from ckanext.harvest.model import HarvestObject, HarvestGatherError, \ +from ckanext.harvest.model import HarvestJob, HarvestObject, HarvestGatherError, \ HarvestObjectError import logging log = logging.getLogger(__name__) -class MockTranslator(object): - def ugettext(self, value): - return value +class MockTranslator(object): + def ugettext(self, value): + return value def ungettext(self, singular, plural, n): if n > 1: @@ -33,32 +33,35 @@ class CKANHarvester(SingletonPlugin): ''' A Harvester for CKAN instances ''' - + implements(IHarvester) + #TODO: check different API versions + api_version = '2' + + def __init__(self,**kw): from paste.registry import Registry import pylons - self.registry=Registry() - self.registry.prepare() + self.registry=Registry() + self.registry.prepare() self.translator_obj=MockTranslator() self.registry.register(pylons.translator, self.translator_obj) - def _get_api_offset(self): - #TODO: check different API versions? - return '/api/2/rest' + def _get_rest_api_offset(self): + return '/api/%s/rest' % self.api_version + + def _get_search_api_offset(self): + return '/api/%s/search' % self.api_version def _get_content(self, url): - #TODO: configure http_request = urllib2.Request( url = url, - headers = {'Authorization' : 'fcff821f-1f92-42ef-8c52-7c38d74a7291'} ) try: - #http_response = urllib2.urlopen(url) http_response = urllib2.urlopen(http_request) return http_response.read() @@ -79,22 +82,74 @@ class CKANHarvester(SingletonPlugin): return 'CKAN' def gather_stage(self,harvest_job): - log.debug('In CKANHarvester gather_stage') + log.debug('In CKANHarvester gather_stage (%s)' % harvest_job.source.url) + + get_all_packages = True + package_ids = [] + + # Check if this source has been harvested before + previous_job = Session.query(HarvestJob) \ + .filter(HarvestJob.source==harvest_job.source) \ + .filter(HarvestJob.gather_finished!=None) \ + .filter(HarvestJob.id!=harvest_job.id) \ + .order_by(HarvestJob.gather_finished.desc()) \ + .limit(1).first() # Get source URL - url = harvest_job.source.url.rstrip('/') - url = url + self._get_api_offset() + '/package' + base_url = harvest_job.source.url.rstrip('/') + base_rest_url = base_url + self._get_rest_api_offset() + base_search_url = base_url + self._get_search_api_offset() + + if previous_job and not previous_job.gather_errors: + get_all_packages = False - # Get contents - try: - content = self._get_content(url) - except Exception,e: - self._save_gather_error('Unable to get content for URL: %s: %r' % \ - (url, e),harvest_job) - return None + # Request only the packages modified since last harvest job + last_time = harvest_job.gather_started.isoformat() + url = base_search_url + '/revision?since_time=%s' % last_time + + try: + content = self._get_content(url) + + revision_ids = json.loads(content) + if len(revision_ids): + for revision_id in revision_ids: + url = base_rest_url + '/revision/%s' % revision_id + try: + content = self._get_content(url) + except Exception,e: + self._save_gather_error('Unable to get content for URL: %s: %s' % (url, str(e)),harvest_job) + continue + + revision = json.loads(content) + for package_id in revision.packages: + if not package_id in package_ids: + package_ids.append(package_id) + else: + log.info('No packages have been updated on the remote CKAN instance since the last harvest job') + return None + + except urllib2.HTTPError,e: + if e.getcode() == 400: + log.info('CKAN instance %s does not suport revision filtering' % base_url) + get_all_packages = True + else: + self._save_gather_error('Unable to get content for URL: %s: %s' % (url, str(e)),harvest_job) + return None + + + + if get_all_packages: + # Request all remote packages + url = base_rest_url + '/package' + try: + content = self._get_content(url) + except Exception,e: + self._save_gather_error('Unable to get content for URL: %s: %s' % (url, str(e)),harvest_job) + return None - try: package_ids = json.loads(content) + + try: object_ids = [] if len(package_ids): for package_id in package_ids: @@ -116,7 +171,7 @@ class CKANHarvester(SingletonPlugin): log.debug('In CKANHarvester fetch_stage') # Get source URL url = harvest_object.source.url.rstrip('/') - url = url + self._get_api_offset() + '/package/' + harvest_object.guid + url = url + self._get_rest_api_offset() + '/package/' + harvest_object.guid # Get contents try: @@ -144,11 +199,23 @@ class CKANHarvester(SingletonPlugin): return False try: - # harvest_object.content is the result of an API call like - # http://ec2-46-51-149-132.eu-west-1.compute.amazonaws.com:8081/api/2/rest/package/77d93608-3a3e-42e5-baab-3521afb504f1 + # harvest_object.content is the result of a package REST API call package_dict = json.loads(harvest_object.content) # Save metadata modified date in Harvest Object + if not 'metadata_modified' in package_dict: + # Get the date from the revision + url = harvest_object.job.source.url.rstrip('/') + url = url + self._get_rest_api_offset() + '/revision/%s' % package_dict['revision_id'] + + try: + content = self._get_content(url) + revision_dict = json.loads(content) + package_dict['metadata_modified'] = revision_dict['timestamp'] + except Exception,e: + self._save_gather_error('Unable to get revision %s info : %r' % \ + (package_dict['revision_id'], e),harvest_job) + harvest_object.metadata_modified_date = package_dict['metadata_modified'] harvest_object.save() @@ -163,7 +230,7 @@ class CKANHarvester(SingletonPlugin): 'api_version':'2', 'schema': schema, } - + # Ugly Hack: tags in DGU are created with Upper case and spaces, # and the validator does not like them if 'tags' in package_dict: @@ -174,10 +241,8 @@ class CKANHarvester(SingletonPlugin): # Check if package exists context.update({'id':package_dict['id']}) - try: existing_package_dict = package_show(context) - # Check modified date if package_dict['metadata_modified'] > existing_package_dict['metadata_modified']: log.info('Package with GUID %s exists and needs to be updated' % harvest_object.guid)