commit
2ff017d9de
|
@ -148,7 +148,7 @@ def _check_for_existing_jobs(context, source_id):
|
||||||
return exist
|
return exist
|
||||||
|
|
||||||
def harvest_object_create(context, data_dict):
|
def harvest_object_create(context, data_dict):
|
||||||
""" Create a new harvest object
|
''' Create a new harvest object
|
||||||
|
|
||||||
:type guid: string (optional)
|
:type guid: string (optional)
|
||||||
:type content: string (optional)
|
:type content: string (optional)
|
||||||
|
@ -156,7 +156,7 @@ def harvest_object_create(context, data_dict):
|
||||||
:type source_id: string (optional)
|
:type source_id: string (optional)
|
||||||
:type package_id: string (optional)
|
:type package_id: string (optional)
|
||||||
:type extras: dict (optional)
|
:type extras: dict (optional)
|
||||||
"""
|
'''
|
||||||
check_access('harvest_object_create', context, data_dict)
|
check_access('harvest_object_create', context, data_dict)
|
||||||
data, errors = _validate(data_dict, harvest_object_create_schema(), context)
|
data, errors = _validate(data_dict, harvest_object_create_schema(), context)
|
||||||
|
|
||||||
|
|
|
@ -2,9 +2,9 @@ import logging
|
||||||
|
|
||||||
from ckan import plugins as p
|
from ckan import plugins as p
|
||||||
|
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def harvest_source_delete(context, data_dict):
|
def harvest_source_delete(context, data_dict):
|
||||||
'''
|
'''
|
||||||
Deletes an existing harvest source
|
Deletes an existing harvest source
|
||||||
|
@ -31,4 +31,5 @@ def harvest_source_delete(context, data_dict):
|
||||||
# We need the id, the name won't work
|
# We need the id, the name won't work
|
||||||
package_dict = p.toolkit.get_action('package_show')(context, data_dict)
|
package_dict = p.toolkit.get_action('package_show')(context, data_dict)
|
||||||
|
|
||||||
p.toolkit.get_action('harvest_source_clear')(context, {'id': package_dict['id']})
|
p.toolkit.get_action('harvest_source_clear')(
|
||||||
|
context, {'id': package_dict['id']})
|
||||||
|
|
|
@ -29,12 +29,13 @@ from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject
|
||||||
from ckanext.harvest.logic import HarvestJobExists, NoNewHarvestJobError
|
from ckanext.harvest.logic import HarvestJobExists, NoNewHarvestJobError
|
||||||
from ckanext.harvest.logic.dictization import harvest_job_dictize
|
from ckanext.harvest.logic.dictization import harvest_job_dictize
|
||||||
|
|
||||||
from ckanext.harvest.logic.action.get import harvest_source_show, harvest_job_list, _get_sources_for_user
|
from ckanext.harvest.logic.action.get import (
|
||||||
|
harvest_source_show, harvest_job_list, _get_sources_for_user)
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
def harvest_source_update(context,data_dict):
|
|
||||||
|
def harvest_source_update(context, data_dict):
|
||||||
'''
|
'''
|
||||||
Updates an existing harvest source
|
Updates an existing harvest source
|
||||||
|
|
||||||
|
@ -69,10 +70,8 @@ def harvest_source_update(context,data_dict):
|
||||||
type. Should be a serialized as JSON. (optional)
|
type. Should be a serialized as JSON. (optional)
|
||||||
:type config: string
|
:type config: string
|
||||||
|
|
||||||
|
|
||||||
:returns: the newly created harvest source
|
:returns: the newly created harvest source
|
||||||
:rtype: dictionary
|
:rtype: dictionary
|
||||||
|
|
||||||
'''
|
'''
|
||||||
log.info('Updating harvest source: %r', data_dict)
|
log.info('Updating harvest source: %r', data_dict)
|
||||||
|
|
||||||
|
@ -94,9 +93,10 @@ def harvest_source_clear(context, data_dict):
|
||||||
:type id: string
|
:type id: string
|
||||||
|
|
||||||
'''
|
'''
|
||||||
check_access('harvest_source_clear',context,data_dict)
|
|
||||||
|
|
||||||
harvest_source_id = data_dict.get('id', None)
|
check_access('harvest_source_clear', context, data_dict)
|
||||||
|
|
||||||
|
harvest_source_id = data_dict.get('id')
|
||||||
|
|
||||||
source = HarvestSource.get(harvest_source_id)
|
source = HarvestSource.get(harvest_source_id)
|
||||||
if not source:
|
if not source:
|
||||||
|
@ -110,7 +110,11 @@ def harvest_source_clear(context, data_dict):
|
||||||
|
|
||||||
model = context['model']
|
model = context['model']
|
||||||
|
|
||||||
sql = "select id from related where id in (select related_id from related_dataset where dataset_id in (select package_id from harvest_object where harvest_source_id = '{harvest_source_id}'));".format(harvest_source_id=harvest_source_id)
|
sql = '''select id from related where id in (
|
||||||
|
select related_id from related_dataset where dataset_id in (
|
||||||
|
select package_id from harvest_object
|
||||||
|
where harvest_source_id = '{harvest_source_id}'));'''.format(
|
||||||
|
harvest_source_id=harvest_source_id)
|
||||||
result = model.Session.execute(sql)
|
result = model.Session.execute(sql)
|
||||||
ids = []
|
ids = []
|
||||||
for row in result:
|
for row in result:
|
||||||
|
@ -118,60 +122,84 @@ def harvest_source_clear(context, data_dict):
|
||||||
related_ids = "('" + "','".join(ids) + "')"
|
related_ids = "('" + "','".join(ids) + "')"
|
||||||
|
|
||||||
sql = '''begin;
|
sql = '''begin;
|
||||||
update package set state = 'to_delete' where id in (select package_id from harvest_object where harvest_source_id = '{harvest_source_id}');'''.format(
|
update package set state = 'to_delete' where id in (
|
||||||
|
select package_id from harvest_object
|
||||||
|
where harvest_source_id = '{harvest_source_id}');'''.format(
|
||||||
harvest_source_id=harvest_source_id)
|
harvest_source_id=harvest_source_id)
|
||||||
|
|
||||||
# CKAN-2.3 or above: delete resource views, resource revisions & resources
|
# CKAN-2.3 or above: delete resource views, resource revisions & resources
|
||||||
if toolkit.check_ckan_version(min_version='2.3'):
|
if toolkit.check_ckan_version(min_version='2.3'):
|
||||||
sql += '''
|
sql += '''
|
||||||
delete from resource_view where resource_id in (select id from resource where package_id in (select id from package where state = 'to_delete' ));
|
delete from resource_view where resource_id in (
|
||||||
delete from resource_revision where package_id in (select id from package where state = 'to_delete' );
|
select id from resource where package_id in (
|
||||||
delete from resource where package_id in (select id from package where state = 'to_delete' );
|
select id from package where state = 'to_delete'));
|
||||||
|
delete from resource_revision where package_id in (
|
||||||
|
select id from package where state = 'to_delete');
|
||||||
|
delete from resource where package_id in (
|
||||||
|
select id from package where state = 'to_delete');
|
||||||
'''
|
'''
|
||||||
# Backwards-compatibility: support ResourceGroup (pre-CKAN-2.3)
|
# Backwards-compatibility: support ResourceGroup (pre-CKAN-2.3)
|
||||||
else:
|
else:
|
||||||
sql += '''
|
sql += '''
|
||||||
delete from resource_revision where resource_group_id in
|
delete from resource_revision where resource_group_id in (
|
||||||
(select id from resource_group where package_id in
|
select id from resource_group where package_id in (
|
||||||
(select id from package where state = 'to_delete'));
|
select id from package where state = 'to_delete'));
|
||||||
delete from resource where resource_group_id in
|
delete from resource where resource_group_id in (
|
||||||
(select id from resource_group where package_id in
|
select id from resource_group where package_id in (
|
||||||
(select id from package where state = 'to_delete'));
|
select id from package where state = 'to_delete'));
|
||||||
delete from resource_group_revision where package_id in
|
delete from resource_group_revision where package_id in (
|
||||||
(select id from package where state = 'to_delete');
|
select id from package where state = 'to_delete');
|
||||||
delete from resource_group where package_id in
|
delete from resource_group where package_id in (
|
||||||
(select id from package where state = 'to_delete');
|
select id from package where state = 'to_delete');
|
||||||
'''
|
'''
|
||||||
# CKAN pre-2.5: authz models were removed in migration 078
|
# CKAN pre-2.5: authz models were removed in migration 078
|
||||||
if toolkit.check_ckan_version(max_version='2.4.99'):
|
if toolkit.check_ckan_version(max_version='2.4.99'):
|
||||||
sql += '''
|
sql += '''
|
||||||
delete from package_role where package_id in
|
delete from package_role where package_id in (
|
||||||
(select id from package where state = 'to_delete');
|
select id from package where state = 'to_delete');
|
||||||
delete from user_object_role where id not in
|
delete from user_object_role where id not in (
|
||||||
(select user_object_role_id from package_role) and context = 'Package';
|
select user_object_role_id from package_role)
|
||||||
|
and context = 'Package';
|
||||||
'''
|
'''
|
||||||
|
|
||||||
|
|
||||||
sql += '''
|
sql += '''
|
||||||
delete from harvest_object_error where harvest_object_id in (select id from harvest_object where harvest_source_id = '{harvest_source_id}');
|
delete from harvest_object_error where harvest_object_id in (
|
||||||
delete from harvest_object_extra where harvest_object_id in (select id from harvest_object where harvest_source_id = '{harvest_source_id}');
|
select id from harvest_object
|
||||||
|
where harvest_source_id = '{harvest_source_id}');
|
||||||
|
delete from harvest_object_extra where harvest_object_id in (
|
||||||
|
select id from harvest_object
|
||||||
|
where harvest_source_id = '{harvest_source_id}');
|
||||||
delete from harvest_object where harvest_source_id = '{harvest_source_id}';
|
delete from harvest_object where harvest_source_id = '{harvest_source_id}';
|
||||||
delete from harvest_gather_error where harvest_job_id in (select id from harvest_job where source_id = '{harvest_source_id}');
|
delete from harvest_gather_error where harvest_job_id in (
|
||||||
|
select id from harvest_job where source_id = '{harvest_source_id}');
|
||||||
delete from harvest_job where source_id = '{harvest_source_id}';
|
delete from harvest_job where source_id = '{harvest_source_id}';
|
||||||
delete from package_tag_revision where package_id in (select id from package where state = 'to_delete');
|
delete from package_tag_revision where package_id in (
|
||||||
delete from member_revision where table_id in (select id from package where state = 'to_delete');
|
select id from package where state = 'to_delete');
|
||||||
delete from package_extra_revision where package_id in (select id from package where state = 'to_delete');
|
delete from member_revision where table_id in (
|
||||||
delete from package_revision where id in (select id from package where state = 'to_delete');
|
select id from package where state = 'to_delete');
|
||||||
delete from package_tag where package_id in (select id from package where state = 'to_delete');
|
delete from package_extra_revision where package_id in (
|
||||||
delete from package_extra where package_id in (select id from package where state = 'to_delete');
|
select id from package where state = 'to_delete');
|
||||||
delete from package_relationship_revision where subject_package_id in (select id from package where state = 'to_delete');
|
delete from package_revision where id in (
|
||||||
delete from package_relationship_revision where object_package_id in (select id from package where state = 'to_delete');
|
select id from package where state = 'to_delete');
|
||||||
delete from package_relationship where subject_package_id in (select id from package where state = 'to_delete');
|
delete from package_tag where package_id in (
|
||||||
delete from package_relationship where object_package_id in (select id from package where state = 'to_delete');
|
select id from package where state = 'to_delete');
|
||||||
delete from member where table_id in (select id from package where state = 'to_delete');
|
delete from package_extra where package_id in (
|
||||||
delete from related_dataset where dataset_id in (select id from package where state = 'to_delete');
|
select id from package where state = 'to_delete');
|
||||||
|
delete from package_relationship_revision where subject_package_id in (
|
||||||
|
select id from package where state = 'to_delete');
|
||||||
|
delete from package_relationship_revision where object_package_id in (
|
||||||
|
select id from package where state = 'to_delete');
|
||||||
|
delete from package_relationship where subject_package_id in (
|
||||||
|
select id from package where state = 'to_delete');
|
||||||
|
delete from package_relationship where object_package_id in (
|
||||||
|
select id from package where state = 'to_delete');
|
||||||
|
delete from member where table_id in (
|
||||||
|
select id from package where state = 'to_delete');
|
||||||
|
delete from related_dataset where dataset_id in (
|
||||||
|
select id from package where state = 'to_delete');
|
||||||
delete from related where id in {related_ids};
|
delete from related where id in {related_ids};
|
||||||
delete from package where id in (select id from package where state = 'to_delete');
|
delete from package where id in (
|
||||||
|
select id from package where state = 'to_delete');
|
||||||
commit;
|
commit;
|
||||||
'''.format(
|
'''.format(
|
||||||
harvest_source_id=harvest_source_id, related_ids=related_ids)
|
harvest_source_id=harvest_source_id, related_ids=related_ids)
|
||||||
|
@ -183,7 +211,8 @@ def harvest_source_clear(context, data_dict):
|
||||||
|
|
||||||
return {'id': harvest_source_id}
|
return {'id': harvest_source_id}
|
||||||
|
|
||||||
def harvest_source_index_clear(context,data_dict):
|
|
||||||
|
def harvest_source_index_clear(context, data_dict):
|
||||||
'''
|
'''
|
||||||
Clears all datasets, jobs and objects related to a harvest source, but
|
Clears all datasets, jobs and objects related to a harvest source, but
|
||||||
keeps the source itself. This is useful to clean history of long running
|
keeps the source itself. This is useful to clean history of long running
|
||||||
|
@ -193,8 +222,8 @@ def harvest_source_index_clear(context,data_dict):
|
||||||
:type id: string
|
:type id: string
|
||||||
'''
|
'''
|
||||||
|
|
||||||
check_access('harvest_source_clear',context,data_dict)
|
check_access('harvest_source_clear', context, data_dict)
|
||||||
harvest_source_id = data_dict.get('id',None)
|
harvest_source_id = data_dict.get('id')
|
||||||
|
|
||||||
source = HarvestSource.get(harvest_source_id)
|
source = HarvestSource.get(harvest_source_id)
|
||||||
if not source:
|
if not source:
|
||||||
|
@ -204,8 +233,8 @@ def harvest_source_index_clear(context,data_dict):
|
||||||
harvest_source_id = source.id
|
harvest_source_id = source.id
|
||||||
|
|
||||||
conn = make_connection()
|
conn = make_connection()
|
||||||
query = ''' +%s:"%s" +site_id:"%s" ''' % ('harvest_source_id', harvest_source_id,
|
query = ''' +%s:"%s" +site_id:"%s" ''' % (
|
||||||
config.get('ckan.site_id'))
|
'harvest_source_id', harvest_source_id, config.get('ckan.site_id'))
|
||||||
try:
|
try:
|
||||||
conn.delete_query(query)
|
conn.delete_query(query)
|
||||||
if asbool(config.get('ckan.search.solr_commit', 'true')):
|
if asbool(config.get('ckan.search.solr_commit', 'true')):
|
||||||
|
@ -240,17 +269,17 @@ def harvest_objects_import(context, data_dict):
|
||||||
:type package_id: string
|
:type package_id: string
|
||||||
'''
|
'''
|
||||||
log.info('Harvest objects import: %r', data_dict)
|
log.info('Harvest objects import: %r', data_dict)
|
||||||
check_access('harvest_objects_import',context,data_dict)
|
check_access('harvest_objects_import', context, data_dict)
|
||||||
|
|
||||||
model = context['model']
|
model = context['model']
|
||||||
session = context['session']
|
session = context['session']
|
||||||
source_id = data_dict.get('source_id',None)
|
source_id = data_dict.get('source_id')
|
||||||
harvest_object_id = data_dict.get('harvest_object_id',None)
|
harvest_object_id = data_dict.get('harvest_object_id')
|
||||||
package_id_or_name = data_dict.get('package_id',None)
|
package_id_or_name = data_dict.get('package_id')
|
||||||
|
|
||||||
segments = context.get('segments',None)
|
segments = context.get('segments')
|
||||||
|
|
||||||
join_datasets = context.get('join_datasets',True)
|
join_datasets = context.get('join_datasets', True)
|
||||||
|
|
||||||
if source_id:
|
if source_id:
|
||||||
source = HarvestSource.get(source_id)
|
source = HarvestSource.get(source_id)
|
||||||
|
@ -262,43 +291,48 @@ def harvest_objects_import(context, data_dict):
|
||||||
log.warn('Harvest source %s is not active.', source_id)
|
log.warn('Harvest source %s is not active.', source_id)
|
||||||
raise Exception('This harvest source is not active')
|
raise Exception('This harvest source is not active')
|
||||||
|
|
||||||
last_objects_ids = session.query(HarvestObject.id) \
|
last_objects_ids = \
|
||||||
.join(HarvestSource) \
|
session.query(HarvestObject.id) \
|
||||||
.filter(HarvestObject.source==source) \
|
.join(HarvestSource) \
|
||||||
.filter(HarvestObject.current==True)
|
.filter(HarvestObject.source == source) \
|
||||||
|
.filter(HarvestObject.current == True)
|
||||||
|
|
||||||
elif harvest_object_id:
|
elif harvest_object_id:
|
||||||
last_objects_ids = session.query(HarvestObject.id) \
|
last_objects_ids = \
|
||||||
.filter(HarvestObject.id==harvest_object_id)
|
session.query(HarvestObject.id) \
|
||||||
|
.filter(HarvestObject.id == harvest_object_id)
|
||||||
elif package_id_or_name:
|
elif package_id_or_name:
|
||||||
last_objects_ids = session.query(HarvestObject.id) \
|
last_objects_ids = \
|
||||||
.join(Package) \
|
session.query(HarvestObject.id) \
|
||||||
.filter(HarvestObject.current==True) \
|
.join(Package) \
|
||||||
.filter(Package.state==u'active') \
|
.filter(HarvestObject.current == True) \
|
||||||
.filter(or_(Package.id==package_id_or_name,
|
.filter(Package.state == u'active') \
|
||||||
Package.name==package_id_or_name))
|
.filter(or_(Package.id == package_id_or_name,
|
||||||
|
Package.name == package_id_or_name))
|
||||||
join_datasets = False
|
join_datasets = False
|
||||||
else:
|
else:
|
||||||
last_objects_ids = session.query(HarvestObject.id) \
|
last_objects_ids = \
|
||||||
.filter(HarvestObject.current==True)
|
session.query(HarvestObject.id) \
|
||||||
|
.filter(HarvestObject.current == True)
|
||||||
|
|
||||||
if join_datasets:
|
if join_datasets:
|
||||||
last_objects_ids = last_objects_ids.join(Package) \
|
last_objects_ids = last_objects_ids.join(Package) \
|
||||||
.filter(Package.state==u'active')
|
.filter(Package.state == u'active')
|
||||||
|
|
||||||
last_objects_ids = last_objects_ids.all()
|
last_objects_ids = last_objects_ids.all()
|
||||||
|
|
||||||
last_objects_count = 0
|
last_objects_count = 0
|
||||||
|
|
||||||
for obj_id in last_objects_ids:
|
for obj_id in last_objects_ids:
|
||||||
if segments and str(hashlib.md5(obj_id[0]).hexdigest())[0] not in segments:
|
if segments and \
|
||||||
|
str(hashlib.md5(obj_id[0]).hexdigest())[0] not in segments:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
obj = session.query(HarvestObject).get(obj_id)
|
obj = session.query(HarvestObject).get(obj_id)
|
||||||
|
|
||||||
for harvester in PluginImplementations(IHarvester):
|
for harvester in PluginImplementations(IHarvester):
|
||||||
if harvester.info()['name'] == obj.source.type:
|
if harvester.info()['name'] == obj.source.type:
|
||||||
if hasattr(harvester,'force_import'):
|
if hasattr(harvester, 'force_import'):
|
||||||
harvester.force_import = True
|
harvester.force_import = True
|
||||||
harvester.import_stage(obj)
|
harvester.import_stage(obj)
|
||||||
break
|
break
|
||||||
|
@ -306,7 +340,8 @@ def harvest_objects_import(context, data_dict):
|
||||||
log.info('Harvest objects imported: %s', last_objects_count)
|
log.info('Harvest objects imported: %s', last_objects_count)
|
||||||
return last_objects_count
|
return last_objects_count
|
||||||
|
|
||||||
def _caluclate_next_run(frequency):
|
|
||||||
|
def _calculate_next_run(frequency):
|
||||||
|
|
||||||
now = datetime.datetime.utcnow()
|
now = datetime.datetime.utcnow()
|
||||||
if frequency == 'ALWAYS':
|
if frequency == 'ALWAYS':
|
||||||
|
@ -318,7 +353,7 @@ def _caluclate_next_run(frequency):
|
||||||
if frequency == 'DAILY':
|
if frequency == 'DAILY':
|
||||||
return now + datetime.timedelta(days=1)
|
return now + datetime.timedelta(days=1)
|
||||||
if frequency == 'MONTHLY':
|
if frequency == 'MONTHLY':
|
||||||
if now.month in (4,6,9,11):
|
if now.month in (4, 6, 9, 11):
|
||||||
days = 30
|
days = 30
|
||||||
elif now.month == 2:
|
elif now.month == 2:
|
||||||
if now.year % 4 == 0:
|
if now.year % 4 == 0:
|
||||||
|
@ -341,19 +376,20 @@ def _make_scheduled_jobs(context, data_dict):
|
||||||
data_dict = {'source_id': source.id}
|
data_dict = {'source_id': source.id}
|
||||||
try:
|
try:
|
||||||
get_action('harvest_job_create')(context, data_dict)
|
get_action('harvest_job_create')(context, data_dict)
|
||||||
except HarvestJobExists, e:
|
except HarvestJobExists:
|
||||||
log.info('Trying to rerun job for %s skipping' % source.id)
|
log.info('Trying to rerun job for %s skipping' % source.id)
|
||||||
|
|
||||||
source.next_run = _caluclate_next_run(source.frequency)
|
source.next_run = _calculate_next_run(source.frequency)
|
||||||
source.save()
|
source.save()
|
||||||
|
|
||||||
def harvest_jobs_run(context,data_dict):
|
|
||||||
|
def harvest_jobs_run(context, data_dict):
|
||||||
log.info('Harvest job run: %r', data_dict)
|
log.info('Harvest job run: %r', data_dict)
|
||||||
check_access('harvest_jobs_run',context,data_dict)
|
check_access('harvest_jobs_run', context, data_dict)
|
||||||
|
|
||||||
session = context['session']
|
session = context['session']
|
||||||
|
|
||||||
source_id = data_dict.get('source_id',None)
|
source_id = data_dict.get('source_id')
|
||||||
|
|
||||||
if not source_id:
|
if not source_id:
|
||||||
_make_scheduled_jobs(context, data_dict)
|
_make_scheduled_jobs(context, data_dict)
|
||||||
|
@ -361,38 +397,41 @@ def harvest_jobs_run(context,data_dict):
|
||||||
context['return_objects'] = False
|
context['return_objects'] = False
|
||||||
|
|
||||||
# Flag finished jobs as such
|
# Flag finished jobs as such
|
||||||
jobs = harvest_job_list(context,{'source_id':source_id,'status':u'Running'})
|
jobs = harvest_job_list(
|
||||||
|
context, {'source_id': source_id, 'status': u'Running'})
|
||||||
if len(jobs):
|
if len(jobs):
|
||||||
for job in jobs:
|
for job in jobs:
|
||||||
if job['gather_finished']:
|
if job['gather_finished']:
|
||||||
objects = session.query(HarvestObject.id) \
|
objects = \
|
||||||
.filter(HarvestObject.harvest_job_id==job['id']) \
|
session.query(HarvestObject.id) \
|
||||||
.filter(and_((HarvestObject.state!=u'COMPLETE'),
|
.filter(HarvestObject.harvest_job_id == job['id']) \
|
||||||
(HarvestObject.state!=u'ERROR'))) \
|
.filter(and_((HarvestObject.state != u'COMPLETE'),
|
||||||
.order_by(HarvestObject.import_finished.desc())
|
(HarvestObject.state != u'ERROR'))) \
|
||||||
|
.order_by(HarvestObject.import_finished.desc())
|
||||||
|
|
||||||
if objects.count() == 0:
|
if objects.count() == 0:
|
||||||
job_obj = HarvestJob.get(job['id'])
|
job_obj = HarvestJob.get(job['id'])
|
||||||
job_obj.status = u'Finished'
|
job_obj.status = u'Finished'
|
||||||
|
|
||||||
last_object = session.query(HarvestObject) \
|
last_object = session.query(HarvestObject) \
|
||||||
.filter(HarvestObject.harvest_job_id==job['id']) \
|
.filter(HarvestObject.harvest_job_id == job['id']) \
|
||||||
.filter(HarvestObject.import_finished!=None) \
|
.filter(HarvestObject.import_finished != None) \
|
||||||
.order_by(HarvestObject.import_finished.desc()) \
|
.order_by(HarvestObject.import_finished.desc()) \
|
||||||
.first()
|
.first()
|
||||||
if last_object:
|
if last_object:
|
||||||
job_obj.finished = last_object.import_finished
|
job_obj.finished = last_object.import_finished
|
||||||
job_obj.save()
|
job_obj.save()
|
||||||
# Reindex the harvest source dataset so it has the latest
|
# Reindex the harvest source dataset so it has the latest
|
||||||
# status
|
# status
|
||||||
get_action('harvest_source_reindex')(context,
|
get_action('harvest_source_reindex')(
|
||||||
{'id': job_obj.source.id})
|
context, {'id': job_obj.source.id})
|
||||||
|
|
||||||
# resubmit old redis tasks
|
# resubmit old redis tasks
|
||||||
resubmit_jobs()
|
resubmit_jobs()
|
||||||
|
|
||||||
# Check if there are pending harvest jobs
|
# Check if there are pending harvest jobs
|
||||||
jobs = harvest_job_list(context,{'source_id':source_id,'status':u'New'})
|
jobs = harvest_job_list(
|
||||||
|
context, {'source_id': source_id, 'status': u'New'})
|
||||||
if len(jobs) == 0:
|
if len(jobs) == 0:
|
||||||
log.info('No new harvest jobs.')
|
log.info('No new harvest jobs.')
|
||||||
raise NoNewHarvestJobError('There are no new harvesting jobs')
|
raise NoNewHarvestJobError('There are no new harvesting jobs')
|
||||||
|
@ -402,7 +441,7 @@ def harvest_jobs_run(context,data_dict):
|
||||||
sent_jobs = []
|
sent_jobs = []
|
||||||
for job in jobs:
|
for job in jobs:
|
||||||
context['detailed'] = False
|
context['detailed'] = False
|
||||||
source = harvest_source_show(context,{'id':job['source_id']})
|
source = harvest_source_show(context, {'id': job['source_id']})
|
||||||
if source['active']:
|
if source['active']:
|
||||||
job_obj = HarvestJob.get(job['id'])
|
job_obj = HarvestJob.get(job['id'])
|
||||||
job_obj.status = job['status'] = u'Running'
|
job_obj.status = job['status'] = u'Running'
|
||||||
|
@ -431,7 +470,7 @@ def harvest_job_abort(context, data_dict):
|
||||||
|
|
||||||
model = context['model']
|
model = context['model']
|
||||||
|
|
||||||
source_id = data_dict.get('source_id', None)
|
source_id = data_dict.get('source_id')
|
||||||
source = harvest_source_show(context, {'id': source_id})
|
source = harvest_source_show(context, {'id': source_id})
|
||||||
|
|
||||||
# HarvestJob set status to 'Finished'
|
# HarvestJob set status to 'Finished'
|
||||||
|
@ -484,20 +523,22 @@ def harvest_sources_reindex(context, data_dict):
|
||||||
model = context['model']
|
model = context['model']
|
||||||
|
|
||||||
packages = model.Session.query(model.Package) \
|
packages = model.Session.query(model.Package) \
|
||||||
.filter(model.Package.type==DATASET_TYPE_NAME) \
|
.filter(model.Package.type == DATASET_TYPE_NAME) \
|
||||||
.filter(model.Package.state==u'active') \
|
.filter(model.Package.state == u'active') \
|
||||||
.all()
|
.all()
|
||||||
|
|
||||||
package_index = PackageSearchIndex()
|
package_index = PackageSearchIndex()
|
||||||
|
|
||||||
reindex_context = {'defer_commit': True}
|
reindex_context = {'defer_commit': True}
|
||||||
for package in packages:
|
for package in packages:
|
||||||
get_action('harvest_source_reindex')(reindex_context, {'id': package.id})
|
get_action('harvest_source_reindex')(
|
||||||
|
reindex_context, {'id': package.id})
|
||||||
|
|
||||||
package_index.commit()
|
package_index.commit()
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
@logic.side_effect_free
|
@logic.side_effect_free
|
||||||
def harvest_source_reindex(context, data_dict):
|
def harvest_source_reindex(context, data_dict):
|
||||||
'''Reindex a single harvest source'''
|
'''Reindex a single harvest source'''
|
||||||
|
@ -508,8 +549,8 @@ def harvest_source_reindex(context, data_dict):
|
||||||
if 'extras_as_string'in context:
|
if 'extras_as_string'in context:
|
||||||
del context['extras_as_string']
|
del context['extras_as_string']
|
||||||
context.update({'ignore_auth': True})
|
context.update({'ignore_auth': True})
|
||||||
package_dict = logic.get_action('harvest_source_show')(context,
|
package_dict = logic.get_action('harvest_source_show')(
|
||||||
{'id': harvest_source_id})
|
context, {'id': harvest_source_id})
|
||||||
log.debug('Updating search index for harvest source: {0}'.format(
|
log.debug('Updating search index for harvest source: {0}'.format(
|
||||||
package_dict.get('name') or harvest_source_id))
|
package_dict.get('name') or harvest_source_id))
|
||||||
|
|
||||||
|
|
|
@ -14,22 +14,25 @@ from ckan.lib.navl.validators import keep_extras
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def harvest_source_id_exists(value, context):
|
def harvest_source_id_exists(value, context):
|
||||||
|
|
||||||
result = HarvestSource.get(value,None)
|
result = HarvestSource.get(value)
|
||||||
|
|
||||||
if not result:
|
if not result:
|
||||||
raise Invalid('Harvest Source with id %r does not exist.' % str(value))
|
raise Invalid('Harvest Source with id %r does not exist.' % str(value))
|
||||||
return value
|
return value
|
||||||
|
|
||||||
|
|
||||||
def harvest_job_exists(value, context):
|
def harvest_job_exists(value, context):
|
||||||
"""Check if a harvest job exists and returns the model if it does"""
|
'''Check if a harvest job exists and returns the model if it does'''
|
||||||
result = HarvestJob.get(value, None)
|
result = HarvestJob.get(value)
|
||||||
|
|
||||||
if not result:
|
if not result:
|
||||||
raise Invalid('Harvest Job with id %r does not exist.' % str(value))
|
raise Invalid('Harvest Job with id %r does not exist.' % str(value))
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
def _normalize_url(url):
|
def _normalize_url(url):
|
||||||
o = urlparse.urlparse(url)
|
o = urlparse.urlparse(url)
|
||||||
|
|
||||||
|
@ -48,14 +51,15 @@ def _normalize_url(url):
|
||||||
path = o.path.rstrip('/')
|
path = o.path.rstrip('/')
|
||||||
|
|
||||||
check_url = urlparse.urlunparse((
|
check_url = urlparse.urlunparse((
|
||||||
o.scheme,
|
o.scheme,
|
||||||
netloc,
|
netloc,
|
||||||
path,
|
path,
|
||||||
None,None,None))
|
None, None, None))
|
||||||
|
|
||||||
return check_url
|
return check_url
|
||||||
|
|
||||||
def harvest_source_url_validator(key,data,errors,context):
|
|
||||||
|
def harvest_source_url_validator(key, data, errors, context):
|
||||||
package = context.get("package")
|
package = context.get("package")
|
||||||
|
|
||||||
if package:
|
if package:
|
||||||
|
@ -67,22 +71,24 @@ def harvest_source_url_validator(key,data,errors,context):
|
||||||
#pkg_id = data.get(('id',),'')
|
#pkg_id = data.get(('id',),'')
|
||||||
|
|
||||||
q = model.Session.query(model.Package.url, model.Package.state) \
|
q = model.Session.query(model.Package.url, model.Package.state) \
|
||||||
.filter(model.Package.type==DATASET_TYPE_NAME)
|
.filter(model.Package.type == DATASET_TYPE_NAME)
|
||||||
|
|
||||||
if package_id:
|
if package_id:
|
||||||
# When editing a source we need to avoid its own URL
|
# When editing a source we need to avoid its own URL
|
||||||
q = q.filter(model.Package.id!=package_id)
|
q = q.filter(model.Package.id != package_id)
|
||||||
|
|
||||||
existing_sources = q.all()
|
existing_sources = q.all()
|
||||||
|
|
||||||
for url, state in existing_sources:
|
for url, state in existing_sources:
|
||||||
url = _normalize_url(url)
|
url = _normalize_url(url)
|
||||||
if url == new_url:
|
if url == new_url:
|
||||||
raise Invalid('There already is a Harvest Source for this URL: %s' % data[key])
|
raise Invalid('There already is a Harvest Source for this URL: %s'
|
||||||
|
% data[key])
|
||||||
|
|
||||||
return data[key]
|
return data[key]
|
||||||
|
|
||||||
def harvest_source_type_exists(value,context):
|
|
||||||
|
def harvest_source_type_exists(value, context):
|
||||||
#TODO: use new description interface
|
#TODO: use new description interface
|
||||||
|
|
||||||
# Get all the registered harvester types
|
# Get all the registered harvester types
|
||||||
|
@ -90,18 +96,20 @@ def harvest_source_type_exists(value,context):
|
||||||
for harvester in PluginImplementations(IHarvester):
|
for harvester in PluginImplementations(IHarvester):
|
||||||
info = harvester.info()
|
info = harvester.info()
|
||||||
if not info or 'name' not in info:
|
if not info or 'name' not in info:
|
||||||
log.error('Harvester %r does not provide the harvester name in the info response' % str(harvester))
|
log.error('Harvester %s does not provide the harvester name in '
|
||||||
|
'the info response' % harvester)
|
||||||
continue
|
continue
|
||||||
available_types.append(info['name'])
|
available_types.append(info['name'])
|
||||||
|
|
||||||
|
|
||||||
if not value in available_types:
|
if not value in available_types:
|
||||||
raise Invalid('Unknown harvester type: %s. Have you registered a harvester for this type?' % value)
|
raise Invalid('Unknown harvester type: %s. Have you registered a '
|
||||||
|
'harvester for this type?' % value)
|
||||||
|
|
||||||
return value
|
return value
|
||||||
|
|
||||||
def harvest_source_config_validator(key,data,errors,context):
|
|
||||||
harvester_type = data.get(('source_type',),'')
|
def harvest_source_config_validator(key, data, errors, context):
|
||||||
|
harvester_type = data.get(('source_type',), '')
|
||||||
for harvester in PluginImplementations(IHarvester):
|
for harvester in PluginImplementations(IHarvester):
|
||||||
info = harvester.info()
|
info = harvester.info()
|
||||||
if info['name'] == harvester_type:
|
if info['name'] == harvester_type:
|
||||||
|
@ -109,21 +117,24 @@ def harvest_source_config_validator(key,data,errors,context):
|
||||||
try:
|
try:
|
||||||
return harvester.validate_config(data[key])
|
return harvester.validate_config(data[key])
|
||||||
except Exception, e:
|
except Exception, e:
|
||||||
raise Invalid('Error parsing the configuration options: %s' % str(e))
|
raise Invalid('Error parsing the configuration options: %s'
|
||||||
|
% e)
|
||||||
else:
|
else:
|
||||||
return data[key]
|
return data[key]
|
||||||
|
|
||||||
|
|
||||||
def keep_not_empty_extras(key, data, errors, context):
|
def keep_not_empty_extras(key, data, errors, context):
|
||||||
extras = data.pop(key, {})
|
extras = data.pop(key, {})
|
||||||
for extras_key, value in extras.iteritems():
|
for extras_key, value in extras.iteritems():
|
||||||
if value:
|
if value:
|
||||||
data[key[:-1] + (extras_key,)] = value
|
data[key[:-1] + (extras_key,)] = value
|
||||||
|
|
||||||
def harvest_source_extra_validator(key,data,errors,context):
|
|
||||||
harvester_type = data.get(('source_type',),'')
|
|
||||||
|
|
||||||
#gather all extra fields to use as whitelist of what
|
def harvest_source_extra_validator(key, data, errors, context):
|
||||||
#can be added to top level data_dict
|
harvester_type = data.get(('source_type',), '')
|
||||||
|
|
||||||
|
# gather all extra fields to use as whitelist of what
|
||||||
|
# can be added to top level data_dict
|
||||||
all_extra_fields = set()
|
all_extra_fields = set()
|
||||||
for harvester in PluginImplementations(IHarvester):
|
for harvester in PluginImplementations(IHarvester):
|
||||||
if not hasattr(harvester, 'extra_schema'):
|
if not hasattr(harvester, 'extra_schema'):
|
||||||
|
@ -152,8 +163,8 @@ def harvest_source_extra_validator(key,data,errors,context):
|
||||||
for key, value in extra_errors.iteritems():
|
for key, value in extra_errors.iteritems():
|
||||||
errors[(key,)] = value
|
errors[(key,)] = value
|
||||||
|
|
||||||
## need to get config out of extras as __extra runs
|
# need to get config out of extras as __extra runs
|
||||||
## after rest of validation
|
# after rest of validation
|
||||||
package_extras = data.get(('extras',), [])
|
package_extras = data.get(('extras',), [])
|
||||||
|
|
||||||
for num, extra in enumerate(list(package_extras)):
|
for num, extra in enumerate(list(package_extras)):
|
||||||
|
@ -177,21 +188,24 @@ def harvest_source_extra_validator(key,data,errors,context):
|
||||||
if package_extras:
|
if package_extras:
|
||||||
data[('extras',)] = package_extras
|
data[('extras',)] = package_extras
|
||||||
|
|
||||||
def harvest_source_convert_from_config(key,data,errors,context):
|
|
||||||
|
def harvest_source_convert_from_config(key, data, errors, context):
|
||||||
config = data[key]
|
config = data[key]
|
||||||
if config:
|
if config:
|
||||||
config_dict = json.loads(config)
|
config_dict = json.loads(config)
|
||||||
for key, value in config_dict.iteritems():
|
for key, value in config_dict.iteritems():
|
||||||
data[(key,)] = value
|
data[(key,)] = value
|
||||||
|
|
||||||
def harvest_source_active_validator(value,context):
|
|
||||||
if isinstance(value,basestring):
|
def harvest_source_active_validator(value, context):
|
||||||
|
if isinstance(value, basestring):
|
||||||
if value.lower() == 'true':
|
if value.lower() == 'true':
|
||||||
return True
|
return True
|
||||||
else:
|
else:
|
||||||
return False
|
return False
|
||||||
return bool(value)
|
return bool(value)
|
||||||
|
|
||||||
|
|
||||||
def harvest_source_frequency_exists(value):
|
def harvest_source_frequency_exists(value):
|
||||||
if value == '':
|
if value == '':
|
||||||
value = 'MANUAL'
|
value = 'MANUAL'
|
||||||
|
@ -205,6 +219,7 @@ def dataset_type_exists(value):
|
||||||
value = DATASET_TYPE_NAME
|
value = DATASET_TYPE_NAME
|
||||||
return value
|
return value
|
||||||
|
|
||||||
|
|
||||||
def harvest_object_extras_validator(value, context):
|
def harvest_object_extras_validator(value, context):
|
||||||
if not isinstance(value, dict):
|
if not isinstance(value, dict):
|
||||||
raise Invalid('extras must be a dict')
|
raise Invalid('extras must be a dict')
|
||||||
|
|
|
@ -2,6 +2,7 @@ import json
|
||||||
import copy
|
import copy
|
||||||
import factories
|
import factories
|
||||||
import unittest
|
import unittest
|
||||||
|
from nose.tools import assert_equal
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from ckan.tests import factories as ckan_factories
|
from ckan.tests import factories as ckan_factories
|
||||||
|
@ -127,6 +128,30 @@ class FunctionalTestBaseWithoutClearBetweenTests(object):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
SOURCE_DICT = {
|
||||||
|
"url": "http://test.action.com",
|
||||||
|
"name": "test-source-action",
|
||||||
|
"title": "Test source action",
|
||||||
|
"notes": "Test source action desc",
|
||||||
|
"source_type": "test-for-action",
|
||||||
|
"frequency": "MANUAL",
|
||||||
|
"config": json.dumps({"custom_option": ["a", "b"]})
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class ActionBase(object):
|
||||||
|
@classmethod
|
||||||
|
def setup_class(cls):
|
||||||
|
reset_db()
|
||||||
|
harvest_model.setup()
|
||||||
|
if not p.plugin_loaded('test_action_harvester'):
|
||||||
|
p.load('test_action_harvester')
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def teardown_class(cls):
|
||||||
|
p.unload('test_action_harvester')
|
||||||
|
|
||||||
|
|
||||||
class HarvestSourceActionBase(FunctionalTestBaseWithoutClearBetweenTests):
|
class HarvestSourceActionBase(FunctionalTestBaseWithoutClearBetweenTests):
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
@ -136,15 +161,7 @@ class HarvestSourceActionBase(FunctionalTestBaseWithoutClearBetweenTests):
|
||||||
|
|
||||||
cls.sysadmin = ckan_factories.Sysadmin()
|
cls.sysadmin = ckan_factories.Sysadmin()
|
||||||
|
|
||||||
cls.default_source_dict = {
|
cls.default_source_dict = SOURCE_DICT
|
||||||
"url": "http://test.action.com",
|
|
||||||
"name": "test-source-action",
|
|
||||||
"title": "Test source action",
|
|
||||||
"notes": "Test source action desc",
|
|
||||||
"source_type": "test-for-action",
|
|
||||||
"frequency": "MANUAL",
|
|
||||||
"config": json.dumps({"custom_option": ["a", "b"]})
|
|
||||||
}
|
|
||||||
|
|
||||||
if not p.plugin_loaded('test_action_harvester'):
|
if not p.plugin_loaded('test_action_harvester'):
|
||||||
p.load('test_action_harvester')
|
p.load('test_action_harvester')
|
||||||
|
@ -287,6 +304,27 @@ class TestHarvestSourceActionUpdate(HarvestSourceActionBase):
|
||||||
assert source.type == source_dict['source_type']
|
assert source.type == source_dict['source_type']
|
||||||
|
|
||||||
|
|
||||||
|
class TestActions(ActionBase):
|
||||||
|
def test_harvest_source_clear(self):
|
||||||
|
source = factories.HarvestSourceObj(**SOURCE_DICT)
|
||||||
|
job = factories.HarvestJobObj(source=source)
|
||||||
|
dataset = ckan_factories.Dataset()
|
||||||
|
object_ = factories.HarvestObjectObj(job=job, source=source,
|
||||||
|
package_id=dataset['id'])
|
||||||
|
|
||||||
|
context = {'model': model, 'session': model.Session,
|
||||||
|
'ignore_auth': True, 'user': ''}
|
||||||
|
result = toolkit.get_action('harvest_source_clear')(
|
||||||
|
context, {'id': source.id})
|
||||||
|
|
||||||
|
assert_equal(result, {'id': source.id})
|
||||||
|
source = harvest_model.HarvestSource.get(source.id)
|
||||||
|
assert source
|
||||||
|
assert_equal(harvest_model.HarvestJob.get(job.id), None)
|
||||||
|
assert_equal(harvest_model.HarvestObject.get(object_.id), None)
|
||||||
|
assert_equal(model.Package.get(dataset['id']), None)
|
||||||
|
|
||||||
|
|
||||||
class TestHarvestObject(unittest.TestCase):
|
class TestHarvestObject(unittest.TestCase):
|
||||||
@classmethod
|
@classmethod
|
||||||
def setup_class(cls):
|
def setup_class(cls):
|
||||||
|
|
Loading…
Reference in New Issue