import urllib2 from ckan.model import Session, Package from ckan.logic import ValidationError, NotFound from ckan.lib.helpers import json from ckanext.harvest.model import HarvestJob, HarvestObject, HarvestGatherError, \ HarvestObjectError from ckanclient import CkanClient import logging log = logging.getLogger(__name__) from base import HarvesterBase class CKANHarvester(HarvesterBase): ''' A Harvester for CKAN instances ''' #TODO: check different API versions api_version = '2' def _get_rest_api_offset(self): return '/api/%s/rest' % self.api_version def _get_search_api_offset(self): return '/api/%s/search' % self.api_version def _get_content(self, url): http_request = urllib2.Request( url = url, ) try: http_response = urllib2.urlopen(http_request) return http_response.read() except Exception, e: raise e def info(self): return { 'name': 'ckan', 'title': 'CKAN', 'description': 'Harvests remote CKAN instances', 'form_config_interface':'Text' } def validate_config(self,config): try: config_obj = json.loads(config) except ValueError,e: raise e return config def gather_stage(self,harvest_job): log.debug('In CKANHarvester gather_stage (%s)' % harvest_job.source.url) get_all_packages = True package_ids = [] # Check if this source has been harvested before previous_job = Session.query(HarvestJob) \ .filter(HarvestJob.source==harvest_job.source) \ .filter(HarvestJob.gather_finished!=None) \ .filter(HarvestJob.id!=harvest_job.id) \ .order_by(HarvestJob.gather_finished.desc()) \ .limit(1).first() # Get source URL base_url = harvest_job.source.url.rstrip('/') base_rest_url = base_url + self._get_rest_api_offset() base_search_url = base_url + self._get_search_api_offset() if previous_job and not previous_job.gather_errors: get_all_packages = False # Request only the packages modified since last harvest job last_time = harvest_job.gather_started.isoformat() url = base_search_url + '/revision?since_time=%s' % last_time try: content = self._get_content(url) revision_ids = json.loads(content) if len(revision_ids): for revision_id in revision_ids: url = base_rest_url + '/revision/%s' % revision_id try: content = self._get_content(url) except Exception,e: self._save_gather_error('Unable to get content for URL: %s: %s' % (url, str(e)),harvest_job) continue revision = json.loads(content) for package_id in revision.packages: if not package_id in package_ids: package_ids.append(package_id) else: log.info('No packages have been updated on the remote CKAN instance since the last harvest job') return None except urllib2.HTTPError,e: if e.getcode() == 400: log.info('CKAN instance %s does not suport revision filtering' % base_url) get_all_packages = True else: self._save_gather_error('Unable to get content for URL: %s: %s' % (url, str(e)),harvest_job) return None if get_all_packages: # Request all remote packages url = base_rest_url + '/package' try: content = self._get_content(url) except Exception,e: self._save_gather_error('Unable to get content for URL: %s: %s' % (url, str(e)),harvest_job) return None package_ids = json.loads(content) try: object_ids = [] if len(package_ids): for package_id in package_ids: # Create a new HarvestObject for this identifier obj = HarvestObject(guid = package_id, job = harvest_job) obj.save() object_ids.append(obj.id) return object_ids else: self._save_gather_error('No packages received for URL: %s' % url, harvest_job) return None except Exception, e: self._save_gather_error('%r'%e.message,harvest_job) def fetch_stage(self,harvest_object): log.debug('In CKANHarvester fetch_stage') # Get source URL url = harvest_object.source.url.rstrip('/') url = url + self._get_rest_api_offset() + '/package/' + harvest_object.guid # Get contents try: content = self._get_content(url) except Exception,e: self._save_object_error('Unable to get content for package: %s: %r' % \ (url, e),harvest_object) return None # Save the fetched contents in the HarvestObject harvest_object.content = content harvest_object.save() return True def import_stage(self,harvest_object): log.debug('In CKANHarvester import_stage') if not harvest_object: log.error('No harvest object received') return False if harvest_object.content is None: self._save_object_error('Empty content for object %s' % harvest_object.id, harvest_object, 'Import') return False try: package_dict = json.loads(harvest_object.content) return self._create_or_update_package(package_dict,harvest_object) except ValidationError,e: self._save_object_error('Invalid package with GUID %s: %r' % (harvest_object.guid, e.error_dict), harvest_object, 'Import') except Exception, e: self._save_object_error('%r'%e,harvest_object,'Import')