2011-06-02 12:07:07 +02:00
|
|
|
import logging
|
2011-11-23 12:05:52 +01:00
|
|
|
import re
|
2013-02-12 17:08:39 +01:00
|
|
|
import uuid
|
2011-06-02 12:07:07 +02:00
|
|
|
|
2012-02-01 16:50:41 +01:00
|
|
|
from sqlalchemy.sql import update,and_, bindparam
|
2013-02-12 17:08:39 +01:00
|
|
|
from sqlalchemy.exc import InvalidRequestError
|
2015-06-11 11:19:07 +02:00
|
|
|
from pylons import config
|
2012-02-01 16:50:41 +01:00
|
|
|
|
2013-02-27 12:34:09 +01:00
|
|
|
from ckan import plugins as p
|
2011-06-02 12:07:07 +02:00
|
|
|
from ckan import model
|
|
|
|
from ckan.model import Session, Package
|
2011-10-26 18:26:18 +02:00
|
|
|
from ckan.logic import ValidationError, NotFound, get_action
|
|
|
|
|
2013-03-25 18:38:07 +01:00
|
|
|
from ckan.logic.schema import default_create_package_schema
|
2012-01-10 15:46:12 +01:00
|
|
|
from ckan.lib.navl.validators import ignore_missing,ignore
|
2011-11-23 12:05:52 +01:00
|
|
|
from ckan.lib.munge import munge_title_to_name,substitute_ascii_equivalents
|
2011-06-02 12:07:07 +02:00
|
|
|
|
|
|
|
from ckanext.harvest.model import HarvestJob, HarvestObject, HarvestGatherError, \
|
|
|
|
HarvestObjectError
|
|
|
|
|
|
|
|
from ckan.plugins.core import SingletonPlugin, implements
|
|
|
|
from ckanext.harvest.interfaces import IHarvester
|
|
|
|
|
2014-01-29 10:02:16 +01:00
|
|
|
|
2011-06-02 12:07:07 +02:00
|
|
|
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
|
|
|
|
|
2013-02-12 17:08:39 +01:00
|
|
|
|
2011-11-23 12:05:52 +01:00
|
|
|
def munge_tag(tag):
    '''
    Turns a free-text tag into a lowercase, hyphen-separated string

    The tag is first passed through ``substitute_ascii_equivalents``, then
    lowercased and stripped of surrounding whitespace. Every character that
    is not an ASCII letter, a digit, a space or a hyphen is removed, and
    finally the remaining spaces are replaced by hyphens.

    :param tag: the tag text to clean
    :returns: the munged tag (may be an empty string)
    '''
    ascii_tag = substitute_ascii_equivalents(tag)
    normalised = ascii_tag.lower().strip()
    cleaned = re.sub(r'[^a-zA-Z0-9 -]', '', normalised)
    return cleaned.replace(' ', '-')
|
2014-01-29 10:02:16 +01:00
|
|
|
|
2011-11-23 12:05:52 +01:00
|
|
|
|
2011-06-02 12:07:07 +02:00
|
|
|
class HarvesterBase(SingletonPlugin):
    '''
    Generic class for harvesters with helper functions
    '''
    implements(IHarvester)

    # Harvester configuration. It is read through ``.get()`` in
    # _create_or_update_package ('api_version' and 'clean_tags' keys) and is
    # not assigned by any of the methods below -- presumably subclasses set
    # it; confirm against the concrete harvesters.
    config = None

    # Cache for the value computed by _get_user_name()
    _user_name = None

    def _gen_new_name(self, title):
        '''
        Creates a URL friendly name from a title

        Underscores are turned into hyphens and runs of hyphens are
        collapsed into a single one. If a package with the resulting name
        already exists, the first five characters of a random UUID are
        appended to it.

        Note that the suffixed name is not checked against the database
        again.

        :param title: text to derive the name from
        :returns: the generated name
        '''
        name = munge_title_to_name(title).replace('_', '-')
        while '--' in name:
            name = name.replace('--', '-')

        name_taken = Session.query(Package) \
            .filter(Package.name == name).first()
        if name_taken:
            name += str(uuid.uuid4())[:5]
        return name

    def _save_gather_error(self, message, job):
        '''
        Stores a gather stage error for a harvest job and logs it

        If saving raises ``InvalidRequestError`` the session is rolled back
        and the save is attempted a second time. The message is logged at
        error level whether or not saving succeeded.

        :param message: error text
        :param job: the harvest job the error belongs to
        '''
        err = HarvestGatherError(message=message, job=job)
        try:
            err.save()
        except InvalidRequestError:
            Session.rollback()
            err.save()
        finally:
            log.error(message)

    def _save_object_error(self, message, obj, stage=u'Fetch', line=None):
        '''
        Stores an error for a harvest object and logs it

        If saving raises ``InvalidRequestError`` the session is rolled back
        and the save is attempted a second time. The message (followed by
        the line number, when one is given) is logged at debug level
        whether or not saving succeeded.

        :param message: error text
        :param obj: the harvest object the error belongs to
        :param stage: name of the stage where the error happened
        :param line: optional line number related to the error
        '''
        err = HarvestObjectError(message=message,
                                 object=obj,
                                 stage=stage,
                                 line=line)
        try:
            err.save()
        except InvalidRequestError:
            Session.rollback()
            err.save()
        finally:
            if line:
                log_message = '{0}, line {1}'.format(message, line)
            else:
                log_message = message
            log.debug(log_message)

    def _get_user_name(self):
        '''
        Returns the name of the user that will perform the harvesting actions
        (deleting, updating and creating datasets)

        The name is looked up in this order, and cached on the instance
        once found:

        1. The `ckanext.harvest.user_name` config option, if set:

               ckanext.harvest.user_name = harvest

        2. The old 'harvest' user, to maintain compatibility, but only if
           it exists and is a sysadmin.

        3. The internal site user (this is the recommended setting).
        '''
        if self._user_name:
            return self._user_name

        config_user_name = config.get('ckanext.harvest.user_name')
        if config_user_name:
            self._user_name = config_user_name
            return self._user_name

        context = {'model': model,
                   'ignore_auth': True,
                   }

        # Check if 'harvest' user exists and if is a sysadmin
        try:
            user_harvest = p.toolkit.get_action('user_show')(
                context, {'id': 'harvest'})
            if user_harvest['sysadmin']:
                self._user_name = 'harvest'
                return self._user_name
        except p.toolkit.ObjectNotFound:
            pass

        context['defer_commit'] = True  # See ckan/ckan#1714
        self._site_user = p.toolkit.get_action('get_site_user')(context, {})
        self._user_name = self._site_user['name']

        return self._user_name

    def _create_harvest_objects(self, remote_ids, harvest_job):
        '''
        Given a list of remote ids and a Harvest Job, create as many Harvest
        Objects and return a list of their ids to be passed to the fetch
        stage.

        :param remote_ids: identifiers of the remote datasets
        :param harvest_job: the job the new harvest objects belong to
        :returns: the list of harvest object ids, or None if there were no
            remote ids or an exception was raised (in both cases a gather
            error is saved)

        TODO: Not sure it is worth keeping this function
        '''
        try:
            if not remote_ids:
                self._save_gather_error(
                    'No remote datasets could be identified', harvest_job)
                return None

            object_ids = []
            for remote_id in remote_ids:
                # Create a new HarvestObject for this identifier
                obj = HarvestObject(guid=remote_id, job=harvest_job)
                obj.save()
                object_ids.append(obj.id)
            return object_ids
        except Exception as e:
            # `message` is deprecated and frequently empty, so fall back to
            # the exception itself to keep the stored error meaningful
            details = getattr(e, 'message', None) or e
            self._save_gather_error('%r' % (details,), harvest_job)

        return None

    def _create_or_update_package(self, package_dict, harvest_object):
        '''
        Creates a new package or updates an existing one according to the
        package dictionary provided. The package dictionary should look like
        the REST API response for a package:

        http://ckan.net/api/rest/package/statistics-catalunya

        Note that the package_dict must contain an id, which will be used to
        check if the package needs to be created or updated (use the remote
        dataset id).

        If the remote server provides the modification date of the remote
        package, add it to package_dict['metadata_modified'].

        :param package_dict: the package to create or update. It is modified
            in place ('name' is always set, 'tags' are cleaned if the
            'clean_tags' config option is on)
        :param harvest_object: the harvest object the package comes from
        :returns: True if the package was created or updated, None if the
            existing package did not need updating or if an error happened
            (in which case an object error for the 'Import' stage is saved)

        TODO: Not sure it is worth keeping this function. If useful it should
        use the output of package_show logic function (maybe keeping support
        for rest api based dicts)
        '''
        try:
            # Change default schema
            schema = default_create_package_schema()
            schema['id'] = [ignore_missing, unicode]
            schema['__junk'] = [ignore]

            # Check API version
            api_version = 2
            if self.config:
                try:
                    api_version = int(self.config.get('api_version', 2))
                except ValueError:
                    raise ValueError('api_version must be an integer')

            user_name = self._get_user_name()
            context = {
                'model': model,
                'session': Session,
                'user': user_name,
                'api_version': api_version,
                'schema': schema,
                'ignore_auth': True,
            }

            if self.config and self.config.get('clean_tags', False):
                # Munge each tag once, drop the ones that end up empty and
                # remove duplicates
                munged_tags = (munge_tag(t)
                               for t in package_dict.get('tags', []))
                package_dict['tags'] = list(
                    set(t for t in munged_tags if t != ''))

            # Check if package exists
            data_dict = {'id': package_dict['id']}
            try:
                existing_package_dict = get_action('package_show')(
                    context, data_dict)

                # In case name has been modified when first importing. See
                # issue #101.
                package_dict['name'] = existing_package_dict['name']

                # Check modified date. Without a remote modification date
                # the package is always updated.
                needs_update = (
                    'metadata_modified' not in package_dict or
                    package_dict['metadata_modified'] >
                    existing_package_dict.get('metadata_modified'))

                if not needs_update:
                    log.info('Package with GUID %s not updated, skipping...',
                             harvest_object.guid)
                    return None

                log.info('Package with GUID %s exists and needs to be updated',
                         harvest_object.guid)
                # Update package
                context.update({'id': package_dict['id']})
                new_package = get_action('package_update_rest')(
                    context, package_dict)

                # Flag the other objects linking to this package as not
                # current anymore
                from ckanext.harvest.model import harvest_object_table
                conn = Session.connection()
                u = update(harvest_object_table) \
                    .where(harvest_object_table.c.package_id ==
                           bindparam('b_package_id')) \
                    .values(current=False)
                conn.execute(u, b_package_id=new_package['id'])

                # Flag this as the current harvest object
                harvest_object.package_id = new_package['id']
                harvest_object.current = True
                harvest_object.save()

            except NotFound:
                # Package needs to be created

                # Get rid of auth audit on the context otherwise we'll get an
                # exception
                context.pop('__auth_audit', None)

                # Set name for new package to prevent name conflict, see
                # issue #117
                if package_dict.get('name', None):
                    name_source = package_dict['name']
                else:
                    name_source = package_dict['title']
                package_dict['name'] = self._gen_new_name(name_source)

                log.info('Package with GUID %s does not exist, '
                         'let\'s create it', harvest_object.guid)
                harvest_object.current = True
                harvest_object.package_id = package_dict['id']
                # Defer constraints and flush so the dataset can be indexed
                # with the harvest object id (on the after_show hook from the
                # harvester plugin)
                harvest_object.add()

                model.Session.execute(
                    'SET CONSTRAINTS harvest_object_package_id_fkey DEFERRED')
                model.Session.flush()

                new_package = get_action('package_create_rest')(
                    context, package_dict)

            Session.commit()

            return True

        except ValidationError as e:
            log.exception(e)
            self._save_object_error(
                'Invalid package with GUID %s: %r' % (harvest_object.guid,
                                                      e.error_dict),
                harvest_object, 'Import')
        except Exception as e:
            log.exception(e)
            self._save_object_error('%r' % e, harvest_object, 'Import')

        return None
|