Merge branch 'master' of github.com:ckan/ckanext-harvest into 180-fix-modified-date

Author: David Read, 2015-11-04 09:35:23 +00:00, commit 46e7e182f5
5 changed files with 306 additions and 156 deletions

View File

@ -148,7 +148,7 @@ def _check_for_existing_jobs(context, source_id):
return exist
def harvest_object_create(context, data_dict):
""" Create a new harvest object
''' Create a new harvest object
:type guid: string (optional)
:type content: string (optional)
@ -156,7 +156,7 @@ def harvest_object_create(context, data_dict):
:type source_id: string (optional)
:type package_id: string (optional)
:type extras: dict (optional)
"""
'''
check_access('harvest_object_create', context, data_dict)
data, errors = _validate(data_dict, harvest_object_create_schema(), context)
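
For context, a minimal sketch of calling this action through the plugins toolkit. The exact context requirements and which parameter combinations are valid are assumptions not shown in this diff; all identifiers below are placeholders.

    from ckan.plugins import toolkit

    # Hypothetical call; only the fields named in the docstring above
    # (guid, content, source_id, package_id, extras) are taken from the diff.
    harvest_object = toolkit.get_action('harvest_object_create')(
        {'user': 'a-sysadmin-name'},
        {
            'guid': 'remote-record-1',            # identifier on the remote source
            'content': '<record>...</record>',    # harvested payload
            'source_id': 'my-harvest-source-id',
            'extras': {'status': 'new'},          # dict of extra key/values
        })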

View File

@ -2,12 +2,11 @@ import logging
from ckan import plugins as p
log = logging.getLogger(__name__)
def harvest_source_delete(context, data_dict):
'''
Deletes an existing harvest source
'''Deletes an existing harvest source
This method just proxies the request to package_delete,
which will delete the actual harvest type dataset and the
@ -15,10 +14,6 @@ def harvest_source_delete(context, data_dict):
:param id: the name or id of the harvest source to delete
:type id: string
:returns: the newly created harvest source
:rtype: dictionary
'''
log.info('Deleting harvest source: %r', data_dict)
@ -28,7 +23,8 @@ def harvest_source_delete(context, data_dict):
if context.get('clear_source', False):
# We need the id, the name won't work
# We need the id. The name won't work.
package_dict = p.toolkit.get_action('package_show')(context, data_dict)
p.toolkit.get_action('harvest_source_clear')(context, {'id': package_dict['id']})
p.toolkit.get_action('harvest_source_clear')(
context, {'id': package_dict['id']})
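
A minimal usage sketch of the behaviour shown above: setting 'clear_source' in the context makes the delete also call harvest_source_clear after resolving the name to an id via package_show. Names below are placeholders.

    from ckan.plugins import toolkit

    context = {'user': 'a-sysadmin-name',
               'clear_source': True}   # also wipe the source's jobs, objects and datasets
    # 'id' can be the source name; the action looks up the real id first.
    toolkit.get_action('harvest_source_delete')(context, {'id': 'my-harvest-source'})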

View File

@ -29,11 +29,12 @@ from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject
from ckanext.harvest.logic import HarvestJobExists, NoNewHarvestJobError
from ckanext.harvest.logic.dictization import harvest_job_dictize
from ckanext.harvest.logic.action.get import harvest_source_show, harvest_job_list, _get_sources_for_user
from ckanext.harvest.logic.action.get import (
harvest_source_show, harvest_job_list, _get_sources_for_user)
log = logging.getLogger(__name__)
def harvest_source_update(context, data_dict):
'''
Updates an existing harvest source
@ -69,10 +70,8 @@ def harvest_source_update(context,data_dict):
type. Should be a serialized as JSON. (optional)
:type config: string
:returns: the newly created harvest source
:rtype: dictionary
'''
log.info('Updating harvest source: %r', data_dict)
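
For illustration, a hedged sketch of calling harvest_source_update with a JSON-serialised config as described in the docstring; which other fields are mandatory is not shown in this hunk.

    import json
    from ckan.plugins import toolkit

    updated = toolkit.get_action('harvest_source_update')(
        {'user': 'a-sysadmin-name'},
        {
            'id': 'my-harvest-source-id',
            'url': 'http://example.com/catalogue',
            'source_type': 'ckan',                       # must match a registered harvester
            'frequency': 'WEEKLY',
            'config': json.dumps({'api_version': 3}),    # serialised as JSON, per the docstring
        })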
@ -92,11 +91,11 @@ def harvest_source_clear(context, data_dict):
:param id: the id of the harvest source to clear
:type id: string
'''
check_access('harvest_source_clear', context, data_dict)
harvest_source_id = data_dict.get('id', None)
harvest_source_id = data_dict.get('id')
source = HarvestSource.get(harvest_source_id)
if not source:
@ -110,7 +109,11 @@ def harvest_source_clear(context, data_dict):
model = context['model']
sql = "select id from related where id in (select related_id from related_dataset where dataset_id in (select package_id from harvest_object where harvest_source_id = '{harvest_source_id}'));".format(harvest_source_id=harvest_source_id)
sql = '''select id from related where id in (
select related_id from related_dataset where dataset_id in (
select package_id from harvest_object
where harvest_source_id = '{harvest_source_id}'));'''.format(
harvest_source_id=harvest_source_id)
result = model.Session.execute(sql)
ids = []
for row in result:
@ -118,60 +121,84 @@ def harvest_source_clear(context, data_dict):
related_ids = "('" + "','".join(ids) + "')"
sql = '''begin;
update package set state = 'to_delete' where id in (select package_id from harvest_object where harvest_source_id = '{harvest_source_id}');'''.format(
update package set state = 'to_delete' where id in (
select package_id from harvest_object
where harvest_source_id = '{harvest_source_id}');'''.format(
harvest_source_id=harvest_source_id)
# CKAN-2.3 or above: delete resource views, resource revisions & resources
if toolkit.check_ckan_version(min_version='2.3'):
sql += '''
delete from resource_view where resource_id in (select id from resource where package_id in (select id from package where state = 'to_delete' ));
delete from resource_revision where package_id in (select id from package where state = 'to_delete' );
delete from resource where package_id in (select id from package where state = 'to_delete' );
delete from resource_view where resource_id in (
select id from resource where package_id in (
select id from package where state = 'to_delete'));
delete from resource_revision where package_id in (
select id from package where state = 'to_delete');
delete from resource where package_id in (
select id from package where state = 'to_delete');
'''
# Backwards-compatibility: support ResourceGroup (pre-CKAN-2.3)
else:
sql += '''
delete from resource_revision where resource_group_id in
(select id from resource_group where package_id in
(select id from package where state = 'to_delete'));
delete from resource where resource_group_id in
(select id from resource_group where package_id in
(select id from package where state = 'to_delete'));
delete from resource_group_revision where package_id in
(select id from package where state = 'to_delete');
delete from resource_group where package_id in
(select id from package where state = 'to_delete');
delete from resource_revision where resource_group_id in (
select id from resource_group where package_id in (
select id from package where state = 'to_delete'));
delete from resource where resource_group_id in (
select id from resource_group where package_id in (
select id from package where state = 'to_delete'));
delete from resource_group_revision where package_id in (
select id from package where state = 'to_delete');
delete from resource_group where package_id in (
select id from package where state = 'to_delete');
'''
# CKAN pre-2.5: authz models were removed in migration 078
if toolkit.check_ckan_version(max_version='2.4.99'):
sql += '''
delete from package_role where package_id in
(select id from package where state = 'to_delete');
delete from user_object_role where id not in
(select user_object_role_id from package_role) and context = 'Package';
delete from package_role where package_id in (
select id from package where state = 'to_delete');
delete from user_object_role where id not in (
select user_object_role_id from package_role)
and context = 'Package';
'''
sql += '''
delete from harvest_object_error where harvest_object_id in (select id from harvest_object where harvest_source_id = '{harvest_source_id}');
delete from harvest_object_extra where harvest_object_id in (select id from harvest_object where harvest_source_id = '{harvest_source_id}');
delete from harvest_object_error where harvest_object_id in (
select id from harvest_object
where harvest_source_id = '{harvest_source_id}');
delete from harvest_object_extra where harvest_object_id in (
select id from harvest_object
where harvest_source_id = '{harvest_source_id}');
delete from harvest_object where harvest_source_id = '{harvest_source_id}';
delete from harvest_gather_error where harvest_job_id in (select id from harvest_job where source_id = '{harvest_source_id}');
delete from harvest_gather_error where harvest_job_id in (
select id from harvest_job where source_id = '{harvest_source_id}');
delete from harvest_job where source_id = '{harvest_source_id}';
delete from package_tag_revision where package_id in (select id from package where state = 'to_delete');
delete from member_revision where table_id in (select id from package where state = 'to_delete');
delete from package_extra_revision where package_id in (select id from package where state = 'to_delete');
delete from package_revision where id in (select id from package where state = 'to_delete');
delete from package_tag where package_id in (select id from package where state = 'to_delete');
delete from package_extra where package_id in (select id from package where state = 'to_delete');
delete from package_relationship_revision where subject_package_id in (select id from package where state = 'to_delete');
delete from package_relationship_revision where object_package_id in (select id from package where state = 'to_delete');
delete from package_relationship where subject_package_id in (select id from package where state = 'to_delete');
delete from package_relationship where object_package_id in (select id from package where state = 'to_delete');
delete from member where table_id in (select id from package where state = 'to_delete');
delete from related_dataset where dataset_id in (select id from package where state = 'to_delete');
delete from package_tag_revision where package_id in (
select id from package where state = 'to_delete');
delete from member_revision where table_id in (
select id from package where state = 'to_delete');
delete from package_extra_revision where package_id in (
select id from package where state = 'to_delete');
delete from package_revision where id in (
select id from package where state = 'to_delete');
delete from package_tag where package_id in (
select id from package where state = 'to_delete');
delete from package_extra where package_id in (
select id from package where state = 'to_delete');
delete from package_relationship_revision where subject_package_id in (
select id from package where state = 'to_delete');
delete from package_relationship_revision where object_package_id in (
select id from package where state = 'to_delete');
delete from package_relationship where subject_package_id in (
select id from package where state = 'to_delete');
delete from package_relationship where object_package_id in (
select id from package where state = 'to_delete');
delete from member where table_id in (
select id from package where state = 'to_delete');
delete from related_dataset where dataset_id in (
select id from package where state = 'to_delete');
delete from related where id in {related_ids};
delete from package where id in (select id from package where state = 'to_delete');
delete from package where id in (
select id from package where state = 'to_delete');
commit;
'''.format(
harvest_source_id=harvest_source_id, related_ids=related_ids)
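
As a concrete illustration of the string building used above: the related_ids fragment joins the collected ids into a SQL tuple literal that is later substituted into the final delete statement.

    # Worked example with made-up ids:
    ids = ['abc', 'def']
    related_ids = "('" + "','".join(ids) + "')"
    assert related_ids == "('abc','def')"
    # ...which ends up in the statement:
    #   delete from related where id in ('abc','def');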
@ -183,6 +210,7 @@ def harvest_source_clear(context, data_dict):
return {'id': harvest_source_id}
def harvest_source_index_clear(context, data_dict):
'''
Clears all datasets, jobs and objects related to a harvest source, but
@ -194,7 +222,7 @@ def harvest_source_index_clear(context,data_dict):
'''
check_access('harvest_source_clear', context, data_dict)
harvest_source_id = data_dict.get('id',None)
harvest_source_id = data_dict.get('id')
source = HarvestSource.get(harvest_source_id)
if not source:
@ -204,8 +232,8 @@ def harvest_source_index_clear(context,data_dict):
harvest_source_id = source.id
conn = make_connection()
query = ''' +%s:"%s" +site_id:"%s" ''' % ('harvest_source_id', harvest_source_id,
config.get('ckan.site_id'))
query = ''' +%s:"%s" +site_id:"%s" ''' % (
'harvest_source_id', harvest_source_id, config.get('ckan.site_id'))
try:
conn.delete_query(query)
if asbool(config.get('ckan.search.solr_commit', 'true')):
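
For context, the query assembled above expands to a Solr delete-by-query filter on the harvest_source_id and site_id fields; the values here are placeholders.

    harvest_source_id = 'abc-123'            # illustrative values only
    site_id = 'default'
    query = ''' +%s:"%s" +site_id:"%s" ''' % (
        'harvest_source_id', harvest_source_id, site_id)
    # query == ' +harvest_source_id:"abc-123" +site_id:"default" '
    # conn.delete_query(query) then drops the matching documents from the
    # search index while leaving the database untouched.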
@ -244,11 +272,11 @@ def harvest_objects_import(context, data_dict):
model = context['model']
session = context['session']
source_id = data_dict.get('source_id',None)
harvest_object_id = data_dict.get('harvest_object_id',None)
package_id_or_name = data_dict.get('package_id',None)
source_id = data_dict.get('source_id')
harvest_object_id = data_dict.get('harvest_object_id')
package_id_or_name = data_dict.get('package_id')
segments = context.get('segments',None)
segments = context.get('segments')
join_datasets = context.get('join_datasets', True)
@ -262,16 +290,19 @@ def harvest_objects_import(context, data_dict):
log.warn('Harvest source %s is not active.', source_id)
raise Exception('This harvest source is not active')
last_objects_ids = session.query(HarvestObject.id) \
last_objects_ids = \
session.query(HarvestObject.id) \
.join(HarvestSource) \
.filter(HarvestObject.source == source) \
.filter(HarvestObject.current == True)
elif harvest_object_id:
last_objects_ids = session.query(HarvestObject.id) \
last_objects_ids = \
session.query(HarvestObject.id) \
.filter(HarvestObject.id == harvest_object_id)
elif package_id_or_name:
last_objects_ids = session.query(HarvestObject.id) \
last_objects_ids = \
session.query(HarvestObject.id) \
.join(Package) \
.filter(HarvestObject.current == True) \
.filter(Package.state == u'active') \
@ -279,7 +310,8 @@ def harvest_objects_import(context, data_dict):
Package.name == package_id_or_name))
join_datasets = False
else:
last_objects_ids = session.query(HarvestObject.id) \
last_objects_ids = \
session.query(HarvestObject.id) \
.filter(HarvestObject.current == True)
if join_datasets:
@ -291,7 +323,8 @@ def harvest_objects_import(context, data_dict):
last_objects_count = 0
for obj_id in last_objects_ids:
if segments and str(hashlib.md5(obj_id[0]).hexdigest())[0] not in segments:
if segments and \
str(hashlib.md5(obj_id[0]).hexdigest())[0] not in segments:
continue
obj = session.query(HarvestObject).get(obj_id)
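
To make the segments check above concrete: each object id is hashed and only ids whose first hex digit appears in the requested segment string are imported, so several workers can split a large import between them. A small sketch with made-up ids (Python 2, as in this codebase; Python 3 would need bytes for md5):

    import hashlib

    segments = '01234567'                              # this worker takes hex digits 0-7
    rows = [('11111111-aaaa',), ('22222222-bbbb',)]    # illustrative (id,) result rows
    for obj_id in rows:
        if hashlib.md5(obj_id[0]).hexdigest()[0] not in segments:
            continue                                   # left to the '89abcdef' worker
        # ...the real loop imports this object here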
@ -306,7 +339,8 @@ def harvest_objects_import(context, data_dict):
log.info('Harvest objects imported: %s', last_objects_count)
return last_objects_count
def _caluclate_next_run(frequency):
def _calculate_next_run(frequency):
now = datetime.datetime.utcnow()
if frequency == 'ALWAYS':
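
The body of _calculate_next_run lies mostly outside this hunk; as a hedged sketch of what the frequency handling looks like (only the 'ALWAYS' branch is visible above, the other offsets are assumptions):

    import datetime

    def _calculate_next_run_sketch(frequency):
        # Hypothetical reconstruction for illustration only.
        now = datetime.datetime.utcnow()
        if frequency == 'ALWAYS':
            return now
        if frequency == 'DAILY':
            return now + datetime.timedelta(days=1)
        if frequency == 'WEEKLY':
            return now + datetime.timedelta(weeks=1)
        if frequency == 'MONTHLY':
            return now + datetime.timedelta(days=30)   # rough month
        raise Exception('Frequency {0} not recognised'.format(frequency))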
@ -341,19 +375,20 @@ def _make_scheduled_jobs(context, data_dict):
data_dict = {'source_id': source.id}
try:
get_action('harvest_job_create')(context, data_dict)
except HarvestJobExists, e:
except HarvestJobExists:
log.info('Trying to rerun job for %s skipping' % source.id)
source.next_run = _caluclate_next_run(source.frequency)
source.next_run = _calculate_next_run(source.frequency)
source.save()
def harvest_jobs_run(context, data_dict):
log.info('Harvest job run: %r', data_dict)
check_access('harvest_jobs_run', context, data_dict)
session = context['session']
source_id = data_dict.get('source_id',None)
source_id = data_dict.get('source_id')
if not source_id:
_make_scheduled_jobs(context, data_dict)
@ -361,11 +396,13 @@ def harvest_jobs_run(context,data_dict):
context['return_objects'] = False
# Flag finished jobs as such
jobs = harvest_job_list(context,{'source_id':source_id,'status':u'Running'})
jobs = harvest_job_list(
context, {'source_id': source_id, 'status': u'Running'})
if len(jobs):
for job in jobs:
if job['gather_finished']:
objects = session.query(HarvestObject.id) \
objects = \
session.query(HarvestObject.id) \
.filter(HarvestObject.harvest_job_id == job['id']) \
.filter(and_((HarvestObject.state != u'COMPLETE'),
(HarvestObject.state != u'ERROR'))) \
@ -385,14 +422,15 @@ def harvest_jobs_run(context,data_dict):
job_obj.save()
# Reindex the harvest source dataset so it has the latest
# status
get_action('harvest_source_reindex')(context,
{'id': job_obj.source.id})
get_action('harvest_source_reindex')(
context, {'id': job_obj.source.id})
# resubmit old redis tasks
resubmit_jobs()
# Check if there are pending harvest jobs
jobs = harvest_job_list(context,{'source_id':source_id,'status':u'New'})
jobs = harvest_job_list(
context, {'source_id': source_id, 'status': u'New'})
if len(jobs) == 0:
log.info('No new harvest jobs.')
raise NoNewHarvestJobError('There are no new harvesting jobs')
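
A minimal sketch of invoking this action directly (it is normally triggered by the harvester 'run' command); context keys and ids are placeholders.

    from ckan.plugins import toolkit

    # Without a source_id the action first creates any due scheduled jobs
    # (via _make_scheduled_jobs), marks finished jobs, resubmits stale redis
    # tasks and raises NoNewHarvestJobError if nothing is pending.
    toolkit.get_action('harvest_jobs_run')({'user': 'a-sysadmin-name'}, {})

    # Or limit it to one source:
    toolkit.get_action('harvest_jobs_run')(
        {'user': 'a-sysadmin-name'}, {'source_id': 'my-harvest-source-id'})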
@ -431,7 +469,7 @@ def harvest_job_abort(context, data_dict):
model = context['model']
source_id = data_dict.get('source_id', None)
source_id = data_dict.get('source_id')
source = harvest_source_show(context, {'id': source_id})
# HarvestJob set status to 'Finished'
@ -492,12 +530,14 @@ def harvest_sources_reindex(context, data_dict):
reindex_context = {'defer_commit': True}
for package in packages:
get_action('harvest_source_reindex')(reindex_context, {'id': package.id})
get_action('harvest_source_reindex')(
reindex_context, {'id': package.id})
package_index.commit()
return True
@logic.side_effect_free
def harvest_source_reindex(context, data_dict):
'''Reindex a single harvest source'''
@ -508,8 +548,8 @@ def harvest_source_reindex(context, data_dict):
if 'extras_as_string'in context:
del context['extras_as_string']
context.update({'ignore_auth': True})
package_dict = logic.get_action('harvest_source_show')(context,
{'id': harvest_source_id})
package_dict = logic.get_action('harvest_source_show')(
context, {'id': harvest_source_id})
log.debug('Updating search index for harvest source: {0}'.format(
package_dict.get('name') or harvest_source_id))
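
For completeness, a one-line usage sketch of the reindex action shown above; the id is a placeholder and the action sets ignore_auth itself, as the diff shows.

    from ckan.plugins import toolkit

    # Rebuild the search-index entry for a single harvest source dataset.
    toolkit.get_action('harvest_source_reindex')({}, {'id': 'my-harvest-source-id'})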

View File

@ -14,22 +14,25 @@ from ckan.lib.navl.validators import keep_extras
log = logging.getLogger(__name__)
def harvest_source_id_exists(value, context):
result = HarvestSource.get(value,None)
result = HarvestSource.get(value)
if not result:
raise Invalid('Harvest Source with id %r does not exist.' % str(value))
return value
def harvest_job_exists(value, context):
"""Check if a harvest job exists and returns the model if it does"""
result = HarvestJob.get(value, None)
'''Check if a harvest job exists and returns the model if it does'''
result = HarvestJob.get(value)
if not result:
raise Invalid('Harvest Job with id %r does not exist.' % str(value))
return result
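
These validators normally run inside a navl schema; as a hedged sketch of harvest_job_exists called directly (assuming a configured database session, ids are placeholders):

    from ckanext.harvest.logic.validators import harvest_job_exists

    job = harvest_job_exists('existing-job-id', {})     # returns the HarvestJob model object
    try:
        harvest_job_exists('no-such-job', {})
    except Exception as exc:                            # raises Invalid(...), as defined above
        print(exc)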
def _normalize_url(url):
o = urlparse.urlparse(url)
@ -55,18 +58,28 @@ def _normalize_url(url):
return check_url
def harvest_source_url_validator(key, data, errors, context):
package = context.get("package")
'''Validate the provided harvest source URL
Checks that the URL & config combination are unique to this HarvestSource.
'''
package = context.get('package')
if package:
package_id = package.id
else:
package_id = data.get(key[:-1] + ("id",))
package_id = data.get(key[:-1] + ('id',))
try:
new_config = data.get(key[:-1] + ('config',))
except:
new_config = None
new_url = _normalize_url(data[key])
#pkg_id = data.get(('id',),'')
q = model.Session.query(model.Package.url, model.Package.state) \
q = model.Session.query(model.Package.id, model.Package.url) \
.filter(model.Package.type == DATASET_TYPE_NAME)
if package_id:
@ -75,13 +88,22 @@ def harvest_source_url_validator(key,data,errors,context):
existing_sources = q.all()
for url, state in existing_sources:
for id_, url in existing_sources:
url = _normalize_url(url)
if url == new_url:
raise Invalid('There already is a Harvest Source for this URL: %s' % data[key])
conf = model.Session.query(HarvestSource.config).filter(
HarvestSource.id == id_).first()
if conf:
conf = conf[0]
else:
conf = None
if url == new_url and conf == new_config:
raise Invalid('There already is a Harvest Source for this URL (& '
'config): url=%s config=%s' % (new_url, new_config))
return data[key]
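
To illustrate the new uniqueness rule above: a duplicate is now only rejected when both the normalised URL and the config match an existing source. Hypothetical outcomes, assuming an existing source with url='http://example.com/csw' and no config:

    # new url                      new config              outcome
    # 'http://example.com/csw'     None                    Invalid (same URL and config)
    # 'http://example.com/csw'     '{"groups": ["geo"]}'   accepted (same URL, different config)
    # 'http://example.com/other'   None                    accepted (different URL)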
def harvest_source_type_exists(value, context):
# TODO: use new description interface
@ -90,16 +112,18 @@ def harvest_source_type_exists(value,context):
for harvester in PluginImplementations(IHarvester):
info = harvester.info()
if not info or 'name' not in info:
log.error('Harvester %r does not provide the harvester name in the info response' % str(harvester))
log.error('Harvester %s does not provide the harvester name in '
'the info response' % harvester)
continue
available_types.append(info['name'])
if not value in available_types:
raise Invalid('Unknown harvester type: %s. Have you registered a harvester for this type?' % value)
raise Invalid('Unknown harvester type: %s. Have you registered a '
'harvester for this type?' % value)
return value
def harvest_source_config_validator(key, data, errors, context):
harvester_type = data.get(('source_type',), '')
for harvester in PluginImplementations(IHarvester):
@ -109,16 +133,19 @@ def harvest_source_config_validator(key,data,errors,context):
try:
return harvester.validate_config(data[key])
except Exception, e:
raise Invalid('Error parsing the configuration options: %s' % str(e))
raise Invalid('Error parsing the configuration options: %s'
% e)
else:
return data[key]
def keep_not_empty_extras(key, data, errors, context):
extras = data.pop(key, {})
for extras_key, value in extras.iteritems():
if value:
data[key[:-1] + (extras_key,)] = value
def harvest_source_extra_validator(key, data, errors, context):
harvester_type = data.get(('source_type',), '')
@ -152,8 +179,8 @@ def harvest_source_extra_validator(key,data,errors,context):
for key, value in extra_errors.iteritems():
errors[(key,)] = value
## need to get config out of extras as __extra runs
## after rest of validation
# need to get config out of extras as __extra runs
# after rest of validation
package_extras = data.get(('extras',), [])
for num, extra in enumerate(list(package_extras)):
@ -177,6 +204,7 @@ def harvest_source_extra_validator(key,data,errors,context):
if package_extras:
data[('extras',)] = package_extras
def harvest_source_convert_from_config(key, data, errors, context):
config = data[key]
if config:
@ -184,6 +212,7 @@ def harvest_source_convert_from_config(key,data,errors,context):
for key, value in config_dict.iteritems():
data[(key,)] = value
def harvest_source_active_validator(value, context):
if isinstance(value, basestring):
if value.lower() == 'true':
@ -192,6 +221,7 @@ def harvest_source_active_validator(value,context):
return False
return bool(value)
def harvest_source_frequency_exists(value):
if value == '':
value = 'MANUAL'
@ -205,6 +235,7 @@ def dataset_type_exists(value):
value = DATASET_TYPE_NAME
return value
def harvest_object_extras_validator(value, context):
if not isinstance(value, dict):
raise Invalid('extras must be a dict')

View File

@ -2,6 +2,7 @@ import json
import copy
import factories
import unittest
from nose.tools import assert_equal, assert_raises
try:
from ckan.tests import factories as ckan_factories
@ -12,6 +13,7 @@ except ImportError:
from ckan import plugins as p
from ckan.plugins import toolkit
from ckan import model
import ckan.lib.search as search
from ckanext.harvest.interfaces import IHarvester
import ckanext.harvest.model as harvest_model
@ -127,6 +129,32 @@ class FunctionalTestBaseWithoutClearBetweenTests(object):
pass
SOURCE_DICT = {
"url": "http://test.action.com",
"name": "test-source-action",
"title": "Test source action",
"notes": "Test source action desc",
"source_type": "test-for-action",
"frequency": "MANUAL",
"config": json.dumps({"custom_option": ["a", "b"]})
}
class ActionBase(object):
@classmethod
def setup_class(cls):
if not p.plugin_loaded('test_action_harvester'):
p.load('test_action_harvester')
def setup(self):
reset_db()
harvest_model.setup()
@classmethod
def teardown_class(cls):
p.unload('test_action_harvester')
class HarvestSourceActionBase(FunctionalTestBaseWithoutClearBetweenTests):
@classmethod
@ -136,15 +164,7 @@ class HarvestSourceActionBase(FunctionalTestBaseWithoutClearBetweenTests):
cls.sysadmin = ckan_factories.Sysadmin()
cls.default_source_dict = {
"url": "http://test.action.com",
"name": "test-source-action",
"title": "Test source action",
"notes": "Test source action desc",
"source_type": "test-for-action",
"frequency": "MANUAL",
"config": json.dumps({"custom_option": ["a", "b"]})
}
cls.default_source_dict = SOURCE_DICT
if not p.plugin_loaded('test_action_harvester'):
p.load('test_action_harvester')
@ -287,6 +307,69 @@ class TestHarvestSourceActionUpdate(HarvestSourceActionBase):
assert source.type == source_dict['source_type']
class TestActions(ActionBase):
def test_harvest_source_clear(self):
source = factories.HarvestSourceObj(**SOURCE_DICT)
job = factories.HarvestJobObj(source=source)
dataset = ckan_factories.Dataset()
object_ = factories.HarvestObjectObj(job=job, source=source,
package_id=dataset['id'])
context = {'model': model, 'session': model.Session,
'ignore_auth': True, 'user': ''}
result = toolkit.get_action('harvest_source_clear')(
context, {'id': source.id})
assert_equal(result, {'id': source.id})
source = harvest_model.HarvestSource.get(source.id)
assert source
assert_equal(harvest_model.HarvestJob.get(job.id), None)
assert_equal(harvest_model.HarvestObject.get(object_.id), None)
assert_equal(model.Package.get(dataset['id']), None)
def test_harvest_source_create_twice_with_unique_url(self):
# don't use factory because it looks for the existing source
data_dict = SOURCE_DICT
site_user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {})['name']
toolkit.get_action('harvest_source_create')(
{'user': site_user}, data_dict)
data_dict['name'] = 'another-source1'
data_dict['url'] = 'http://another-url'
toolkit.get_action('harvest_source_create')(
{'user': site_user}, data_dict)
def test_harvest_source_create_twice_with_same_url(self):
# don't use factory because it looks for the existing source
data_dict = SOURCE_DICT
site_user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {})['name']
toolkit.get_action('harvest_source_create')(
{'user': site_user}, data_dict)
data_dict['name'] = 'another-source2'
assert_raises(toolkit.ValidationError,
toolkit.get_action('harvest_source_create'),
{'user': site_user}, data_dict)
def test_harvest_source_create_twice_with_unique_url_and_config(self):
# don't use factory because it looks for the existing source
data_dict = SOURCE_DICT
site_user = toolkit.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {})['name']
toolkit.get_action('harvest_source_create')(
{'user': site_user}, data_dict)
data_dict['name'] = 'another-source3'
data_dict['config'] = '{"something": "new"}'
toolkit.get_action('harvest_source_create')(
{'user': site_user}, data_dict)
class TestHarvestObject(unittest.TestCase):
@classmethod
def setup_class(cls):