2011-04-19 15:54:59 +02:00
|
|
|
import urllib2
|
|
|
|
|
2011-11-18 14:20:41 +01:00
|
|
|
from ckan.lib.base import c
|
|
|
|
from ckan import model
|
2011-06-02 12:07:07 +02:00
|
|
|
from ckan.model import Session, Package
|
2011-11-18 14:20:41 +01:00
|
|
|
from ckan.logic import ValidationError, NotFound, get_action
|
2011-04-19 15:54:59 +02:00
|
|
|
from ckan.lib.helpers import json
|
|
|
|
|
2011-05-17 18:26:42 +02:00
|
|
|
from ckanext.harvest.model import HarvestJob, HarvestObject, HarvestGatherError, \
|
2011-04-19 15:54:59 +02:00
|
|
|
HarvestObjectError
|
|
|
|
|
2011-06-02 12:07:07 +02:00
|
|
|
from ckanclient import CkanClient
|
|
|
|
|
2011-04-19 15:54:59 +02:00
|
|
|
import logging
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
2011-06-02 12:07:07 +02:00
|
|
|
from base import HarvesterBase
|
2011-04-19 15:54:59 +02:00
|
|
|
|
2011-06-02 12:07:07 +02:00
|
|
|
class CKANHarvester(HarvesterBase):
    '''
    A Harvester for CKAN instances
    '''
    # Configuration of the harvest source currently being processed, as the
    # object decoded from its JSON string. Filled in by _set_config().
    config = None

    # Version of the remote CKAN API, interpolated into the REST and search
    # API paths. _set_config() replaces it when the source configuration has
    # an 'api_version' key.
    api_version = '2'
|
|
|
|
|
|
|
|
def _get_rest_api_offset(self):
|
|
|
|
return '/api/%s/rest' % self.api_version
|
|
|
|
|
|
|
|
def _get_search_api_offset(self):
|
|
|
|
return '/api/%s/search' % self.api_version
|
2011-04-19 15:54:59 +02:00
|
|
|
|
|
|
|
def _get_content(self, url):
    '''
    Request ``url`` and return the raw body of the response.

    Any exception raised by ``urllib2`` while opening or reading the URL
    is left to propagate to the caller with its original traceback.
    '''
    http_request = urllib2.Request(url=url)
    http_response = urllib2.urlopen(http_request)
    try:
        return http_response.read()
    finally:
        # Release the connection whether or not the read succeeded
        http_response.close()
|
|
|
|
|
2011-06-07 14:35:11 +02:00
|
|
|
def _set_config(self,config_str):
|
|
|
|
if config_str:
|
|
|
|
self.config = json.loads(config_str)
|
2011-07-18 18:35:32 +02:00
|
|
|
|
|
|
|
if 'api_version' in self.config:
|
|
|
|
self.api_version = self.config['api_version']
|
|
|
|
|
2011-06-07 14:35:11 +02:00
|
|
|
log.debug('Using config: %r', self.config)
|
|
|
|
else:
|
|
|
|
self.config = {}
|
|
|
|
|
2011-05-13 19:39:36 +02:00
|
|
|
def info(self):
    '''
    Return the dict describing this harvester: its ``name``, ``title``
    and ``description``, plus ``form_config_interface`` set to ``'Text'``.
    '''
    return {
        'name': 'ckan',
        'title': 'CKAN',
        'description': 'Harvests remote CKAN instances',
        'form_config_interface': 'Text',
    }
|
2011-04-19 15:54:59 +02:00
|
|
|
|
2011-06-07 13:07:53 +02:00
|
|
|
def validate_config(self, config):
    '''
    Check that the configuration of a harvest source can be used.

    :param config: configuration as a JSON string. An empty value is
        accepted as is.
    :returns: the ``config`` value received, unmodified.
    :raises ValueError: if ``config`` is not valid JSON or not a JSON
        object, if ``default_tags`` or ``default_groups`` is not a list,
        or if one of the default groups or the configured user is not
        found.
    '''
    if not config:
        return config

    # json.loads already raises ValueError on malformed input
    config_obj = json.loads(config)

    # The other stages read the configuration with dict methods and extend
    # package lists with these values, so reject other shapes up front.
    if not isinstance(config_obj, dict):
        raise ValueError('Configuration must be a JSON object')
    for key in ('default_tags', 'default_groups'):
        if key in config_obj and not isinstance(config_obj[key], list):
            raise ValueError('%s must be a list' % key)

    if 'default_groups' in config_obj:
        # Check if default groups exist
        context = {'model': model, 'user': c.user}
        for group_name in config_obj['default_groups']:
            try:
                get_action('group_show')(context, {'id': group_name})
            except NotFound:
                raise ValueError('Default group not found')

    if 'user' in config_obj:
        # Check if user exists
        context = {'model': model, 'user': c.user}
        try:
            get_action('user_show')(context, {'id': config_obj.get('user')})
        except NotFound:
            raise ValueError('User not found')

    return config
|
|
|
|
|
|
|
|
|
2011-04-19 15:54:59 +02:00
|
|
|
def gather_stage(self, harvest_job):
    '''
    Work out which remote packages have to be harvested and create a
    HarvestObject for each of them.

    If the source has a previous job that finished gathering without
    gather errors and produced objects, only the packages touched by
    revisions created since that job started are requested. Otherwise, or
    if the remote instance answers the revision search with HTTP 400, the
    whole package list is requested.

    :param harvest_job: the job being run; its source provides the URL
        and the configuration.
    :returns: a list with the ids of the HarvestObjects created, or None
        if there is nothing to harvest or an error happened (errors are
        stored with ``_save_gather_error``).
    '''
    log.debug('In CKANHarvester gather_stage (%s)', harvest_job.source.url)
    get_all_packages = True
    package_ids = []

    self._set_config(harvest_job.source.config)

    # Check if this source has been harvested before.
    # NOTE: `!= None` is on purpose, it is a SQLAlchemy column expression.
    previous_job = (
        Session.query(HarvestJob)
        .filter(HarvestJob.source == harvest_job.source)
        .filter(HarvestJob.gather_finished != None)
        .filter(HarvestJob.id != harvest_job.id)
        .order_by(HarvestJob.gather_finished.desc())
        .limit(1).first()
    )

    # Get source URL
    base_url = harvest_job.source.url.rstrip('/')
    base_rest_url = base_url + self._get_rest_api_offset()
    base_search_url = base_url + self._get_search_api_offset()

    if (previous_job and not previous_job.gather_errors
            and len(previous_job.objects) != 0):
        get_all_packages = False

        # Request only the packages modified since the last harvest job.
        # The reference time has to come from the previous job: the current
        # one has only just started, so nothing can be newer than it.
        last_time = previous_job.gather_started.isoformat()
        url = base_search_url + '/revision?since_time=%s' % last_time

        try:
            revision_ids = json.loads(self._get_content(url))
        except urllib2.HTTPError as e:
            if e.getcode() != 400:
                self._save_gather_error(
                    'Unable to get content for URL: %s: %s' % (url, str(e)),
                    harvest_job)
                return None
            log.info('CKAN instance %s does not suport revision filtering' % base_url)
            get_all_packages = True
        except Exception as e:
            # Connection problems or a body that is not JSON
            self._save_gather_error(
                'Unable to get content for URL: %s: %s' % (url, str(e)),
                harvest_job)
            return None
        else:
            if not revision_ids:
                log.info('No packages have been updated on the remote CKAN instance since the last harvest job')
                return None

            # `seen` gives constant time duplicate checks, `package_ids`
            # keeps the order in which the packages were found.
            seen = set()
            for revision_id in revision_ids:
                url = base_rest_url + '/revision/%s' % revision_id
                try:
                    # The decoded revision is a dict, not an object
                    revision_packages = json.loads(self._get_content(url))['packages']
                except Exception as e:
                    self._save_gather_error(
                        'Unable to get content for URL: %s: %s' % (url, str(e)),
                        harvest_job)
                    continue

                for package_id in revision_packages:
                    if package_id not in seen:
                        seen.add(package_id)
                        package_ids.append(package_id)

    if get_all_packages:
        # Request all remote packages
        url = base_rest_url + '/package'
        try:
            package_ids = json.loads(self._get_content(url))
        except Exception as e:
            self._save_gather_error(
                'Unable to get content for URL: %s: %s' % (url, str(e)),
                harvest_job)
            return None

    try:
        if not package_ids:
            self._save_gather_error('No packages received for URL: %s' % url,
                                    harvest_job)
            return None

        object_ids = []
        for package_id in package_ids:
            # Create a new HarvestObject for this identifier
            obj = HarvestObject(guid=package_id, job=harvest_job)
            obj.save()
            object_ids.append(obj.id)
        return object_ids
    except Exception as e:
        self._save_gather_error('%r' % e, harvest_job)
        return None
|
|
|
|
|
|
|
|
|
|
|
|
def fetch_stage(self, harvest_object):
    '''
    Download the remote package identified by the GUID of
    ``harvest_object`` and store the raw response in its ``content``.

    :param harvest_object: object created during the gather stage.
    :returns: True if the content was fetched and saved, None if the
        request failed (the error is stored with ``_save_object_error``).
    '''
    log.debug('In CKANHarvester fetch_stage')

    # Needed before building the URL: the configuration may change the API
    # version used in the REST path.
    self._set_config(harvest_object.job.source.config)

    # Get source URL
    url = harvest_object.source.url.rstrip('/')
    url = url + self._get_rest_api_offset() + '/package/' + harvest_object.guid

    # Get contents
    try:
        content = self._get_content(url)
    except Exception as e:
        self._save_object_error(
            'Unable to get content for package: %s: %r' % (url, e),
            harvest_object)
        return None

    # Save the fetched contents in the HarvestObject
    harvest_object.content = content
    harvest_object.save()
    return True
|
|
|
|
|
|
|
|
def import_stage(self, harvest_object):
    '''
    Create or update the local package from the content fetched for
    ``harvest_object``.

    The groups of the remote package are discarded. The ``default_tags``
    and ``default_groups`` of the source configuration, if any, are
    applied to the package before handing it to
    ``_create_or_update_package``.

    :param harvest_object: object whose ``content`` holds the remote
        package as a JSON string.
    :returns: the value returned by ``_create_or_update_package``, or
        False if there was no object, no content, or the import failed
        (errors are stored with ``_save_object_error``).
    '''
    log.debug('In CKANHarvester import_stage')
    if not harvest_object:
        log.error('No harvest object received')
        return False

    if harvest_object.content is None:
        self._save_object_error('Empty content for object %s' % harvest_object.id,
                                harvest_object, 'Import')
        return False

    self._set_config(harvest_object.job.source.config)

    try:
        package_dict = json.loads(harvest_object.content)

        # Set default tags if needed
        default_tags = self.config.get('default_tags', [])
        if default_tags:
            tags = package_dict.setdefault('tags', [])
            tags.extend([t for t in default_tags if t not in tags])

        # Ignore remote groups for the time being. Not every remote
        # package has the key, so it must not be required.
        package_dict.pop('groups', None)

        # Set default groups if needed
        default_groups = self.config.get('default_groups', [])
        if default_groups:
            package_dict['groups'] = list(default_groups)

        return self._create_or_update_package(package_dict, harvest_object)
    except ValidationError as e:
        self._save_object_error(
            'Invalid package with GUID %s: %r' % (harvest_object.guid, e.error_dict),
            harvest_object, 'Import')
    except Exception as e:
        self._save_object_error('%r' % e, harvest_object, 'Import')

    return False
|
|
|
|
|