Fixed bug with deleting harvest sources which have a custom
configuration. Added PEP-8 compliance.
This commit is contained in:
Mark Winterbottom 2015-10-30 15:15:41 +00:00
parent 55325f5940
commit 02b81187df
2 changed files with 98 additions and 74 deletions


@@ -5,8 +5,9 @@ from ckan import plugins as p
log = logging.getLogger(__name__)
def harvest_source_delete(context, data_dict):
'''
"""
Deletes an existing harvest source
This method just proxies the request to package_delete,
@@ -19,16 +20,14 @@ def harvest_source_delete(context, data_dict):
:returns: the deleted harvest source
:rtype: dictionary
'''
"""
log.info('Deleting harvest source: %r', data_dict)
p.toolkit.check_access('harvest_source_delete', context, data_dict)
p.toolkit.get_action('package_delete')(context, data_dict)
if context.get('clear_source', False):
# We need the id, the name won't work
# We need the id, the name won't work.
package_dict = p.toolkit.get_action('package_show')(context, data_dict)
p.toolkit.get_action('harvest_source_clear')(context, {'id': package_dict['id']})
p.toolkit.get_action('harvest_source_clear')(
context, {'id': package_dict['id']})

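Usage sketch (not part of this commit): deleting a source and clearing its
datasets in one call. The 'clear_source' flag is read from the context, and
the id lookup via package_show above is what lets a source name be passed
here. The source name and user are placeholders; assumes a running CKAN.

    import ckan.model as model
    import ckan.plugins.toolkit as toolkit

    context = {'model': model, 'session': model.Session,
               'user': 'admin', 'clear_source': True}
    toolkit.get_action('harvest_source_delete')(
        context, {'id': 'my-harvest-source'})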

@@ -29,12 +29,14 @@ from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject
from ckanext.harvest.logic import HarvestJobExists, NoNewHarvestJobError
from ckanext.harvest.logic.schema import harvest_source_show_package_schema
from ckanext.harvest.logic.action.get import harvest_source_show, harvest_job_list, _get_sources_for_user
from ckanext.harvest.logic.action.get import harvest_source_show, \
harvest_job_list, _get_sources_for_user
log = logging.getLogger(__name__)
def harvest_source_update(context,data_dict):
def harvest_source_update(context, data_dict):
'''
Updates an existing harvest source
@@ -83,18 +85,22 @@ def harvest_source_update(context,data_dict):
return source
def harvest_source_clear(context,data_dict):
def harvest_source_clear(context, data_dict):
'''
Clears all datasets, jobs and objects related to a harvest source, but keeps the source itself.
This is useful to clean history of long running harvest sources to start again fresh.
Clears all datasets, jobs and objects related to a harvest source, but
keeps the source itself.
This is useful to clean history of long running harvest sources to start
again fresh.
:param id: the id of the harvest source to clear
:type id: string
'''
check_access('harvest_source_clear',context,data_dict)
harvest_source_id = data_dict.get('id',None)
check_access('harvest_source_clear', context, data_dict)
harvest_source_id = data_dict.get('id', None)
source = HarvestSource.get(harvest_source_id)
if not source:
@@ -108,14 +114,18 @@ def harvest_source_clear(context,data_dict):
model = context['model']
sql = "select id from related where id in (select related_id from related_dataset where dataset_id in (select package_id from harvest_object where harvest_source_id = '{harvest_source_id}'));".format(harvest_source_id=harvest_source_id)
sql = "select id from related where id in (select related_id from " \
"related_dataset where dataset_id in (select package_id from " \
"harvest_object where harvest_source_id = " \
"'{harvest_source_id}'));".format(
harvest_source_id=harvest_source_id)
result = model.Session.execute(sql)
ids = []
for row in result:
ids.append(row[0])
related_ids = "('" + "','".join(ids) + "')"
sql = '''begin;
sql = '''begin;
update package set state = 'to_delete' where id in (select package_id from harvest_object where harvest_source_id = '{harvest_source_id}');'''.format(
harvest_source_id=harvest_source_id)
@@ -129,15 +139,15 @@ def harvest_source_clear(context,data_dict):
# Backwards-compatibility: support ResourceGroup (pre-CKAN-2.3)
else:
sql += '''
delete from resource_revision where resource_group_id in
(select id from resource_group where package_id in
delete from resource_revision where resource_group_id in
(select id from resource_group where package_id in
(select id from package where state = 'to_delete'));
delete from resource where resource_group_id in
(select id from resource_group where package_id in
delete from resource where resource_group_id in
(select id from resource_group where package_id in
(select id from package where state = 'to_delete'));
delete from resource_group_revision where package_id in
delete from resource_group_revision where package_id in
(select id from package where state = 'to_delete');
delete from resource_group where package_id in
delete from resource_group where package_id in
(select id from package where state = 'to_delete');
'''
# CKAN pre-2.5: authz models were removed in migration 078
@@ -181,10 +191,11 @@ def harvest_source_clear(context,data_dict):
return {'id': harvest_source_id}
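Usage sketch (illustrative, not from this changeset): clearing a source's
history directly. Note that, as the comment in the first file points out,
the source id is needed here; a name will not work. The id and user are
placeholders.

    import ckan.model as model
    from ckan.logic import get_action

    context = {'model': model, 'session': model.Session, 'user': 'admin'}
    get_action('harvest_source_clear')(context, {'id': 'source-id'})
    # returns {'id': 'source-id'} once datasets, jobs and objects are gone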
def harvest_source_index_clear(context,data_dict):
check_access('harvest_source_clear',context,data_dict)
harvest_source_id = data_dict.get('id',None)
def harvest_source_index_clear(context, data_dict):
check_access('harvest_source_clear', context, data_dict)
harvest_source_id = data_dict.get('id', None)
source = HarvestSource.get(harvest_source_id)
if not source:
@@ -194,8 +205,8 @@ def harvest_source_index_clear(context,data_dict):
harvest_source_id = source.id
conn = make_connection()
query = ''' +%s:"%s" +site_id:"%s" ''' % ('harvest_source_id', harvest_source_id,
config.get('ckan.site_id'))
query = ''' +%s:"%s" +site_id:"%s" ''' % (
'harvest_source_id', harvest_source_id, config.get('ckan.site_id'))
try:
conn.delete_query(query)
if asbool(config.get('ckan.search.solr_commit', 'true')):
@@ -208,7 +219,8 @@ def harvest_source_index_clear(context,data_dict):
return {'id': harvest_source_id}
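For reference, the Solr delete query assembled above expands to a string
like the following (sketch with placeholder values):

    harvest_source_id = 'abc123'
    site_id = 'default'          # i.e. config.get('ckan.site_id')
    query = ''' +%s:"%s" +site_id:"%s" ''' % (
        'harvest_source_id', harvest_source_id, site_id)
    # query == ' +harvest_source_id:"abc123" +site_id:"default" '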
def harvest_objects_import(context,data_dict):
def harvest_objects_import(context, data_dict):
'''
Reimports the current harvest objects
It performs the import stage with the last fetched objects, optionally
@@ -217,18 +229,19 @@ def harvest_objects_import(context,data_dict):
It will only affect the last fetched objects already present in the
database.
'''
log.info('Harvest objects import: %r', data_dict)
check_access('harvest_objects_import',context,data_dict)
check_access('harvest_objects_import', context, data_dict)
model = context['model']
session = context['session']
source_id = data_dict.get('source_id',None)
harvest_object_id = data_dict.get('harvest_object_id',None)
package_id_or_name = data_dict.get('package_id',None)
source_id = data_dict.get('source_id', None)
harvest_object_id = data_dict.get('harvest_object_id', None)
package_id_or_name = data_dict.get('package_id', None)
segments = context.get('segments',None)
segments = context.get('segments', None)
join_datasets = context.get('join_datasets',True)
join_datasets = context.get('join_datasets', True)
if source_id:
source = HarvestSource.get(source_id)
@@ -241,42 +254,43 @@ def harvest_objects_import(context,data_dict):
raise Exception('This harvest source is not active')
last_objects_ids = session.query(HarvestObject.id) \
.join(HarvestSource) \
.filter(HarvestObject.source==source) \
.filter(HarvestObject.current==True)
.join(HarvestSource) \
.filter(HarvestObject.source == source) \
.filter(HarvestObject.current == True)
elif harvest_object_id:
last_objects_ids = session.query(HarvestObject.id) \
.filter(HarvestObject.id==harvest_object_id)
.filter(HarvestObject.id == harvest_object_id)
elif package_id_or_name:
last_objects_ids = session.query(HarvestObject.id) \
.join(Package) \
.filter(HarvestObject.current==True) \
.filter(Package.state==u'active') \
.filter(or_(Package.id==package_id_or_name,
Package.name==package_id_or_name))
.filter(HarvestObject.current == True) \
.filter(Package.state == u'active') \
.filter(or_(Package.id == package_id_or_name,
Package.name == package_id_or_name))
join_datasets = False
else:
last_objects_ids = session.query(HarvestObject.id) \
.filter(HarvestObject.current==True)
.filter(HarvestObject.current == True)
if join_datasets:
last_objects_ids = last_objects_ids.join(Package) \
.filter(Package.state==u'active')
.filter(Package.state == u'active')
last_objects_ids = last_objects_ids.all()
last_objects_count = 0
for obj_id in last_objects_ids:
if segments and str(hashlib.md5(obj_id[0]).hexdigest())[0] not in segments:
if segments and \
str(hashlib.md5(obj_id[0]).hexdigest())[0] not in segments:
continue
obj = session.query(HarvestObject).get(obj_id)
for harvester in PluginImplementations(IHarvester):
if harvester.info()['name'] == obj.source.type:
if hasattr(harvester,'force_import'):
if hasattr(harvester, 'force_import'):
harvester.force_import = True
harvester.import_stage(obj)
break
@@ -284,6 +298,7 @@ def harvest_objects_import(context,data_dict):
log.info('Harvest objects imported: %s', last_objects_count)
return last_objects_count
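Sketch (not part of this commit) of the segment filter used in the loop
above: an object is only imported when the first hex digit of the MD5 of
its id appears in 'segments'. Placeholder ids; Python 2, as in the codebase.

    import hashlib

    segments = '01234567'   # e.g. handle only half of the hex digit space
    object_ids = ['id-one', 'id-two', 'id-three']
    to_import = [obj_id for obj_id in object_ids
                 if hashlib.md5(obj_id).hexdigest()[0] in segments]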
def _caluclate_next_run(frequency):
now = datetime.datetime.utcnow()
@@ -296,7 +311,7 @@ def _caluclate_next_run(frequency):
if frequency == 'DAILY':
return now + datetime.timedelta(days=1)
if frequency == 'MONTHLY':
if now.month in (4,6,9,11):
if now.month in (4, 6, 9, 11):
days = 30
elif now.month == 2:
if now.year % 4 == 0:
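The MONTHLY branch is truncated by the hunk above; it picks 30, 31 or a
February value via a simplified year % 4 leap test. A sketch of the same
calculation using the standard library, which also gets the century rule
right:

    import datetime
    import calendar

    now = datetime.datetime.utcnow()
    days = calendar.monthrange(now.year, now.month)[1]  # days in this month
    next_run = now + datetime.timedelta(days=days)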
@@ -325,13 +340,14 @@ def _make_scheduled_jobs(context, data_dict):
source.next_run = _caluclate_next_run(source.frequency)
source.save()
def harvest_jobs_run(context,data_dict):
def harvest_jobs_run(context, data_dict):
log.info('Harvest job run: %r', data_dict)
check_access('harvest_jobs_run',context,data_dict)
check_access('harvest_jobs_run', context, data_dict)
session = context['session']
source_id = data_dict.get('source_id',None)
source_id = data_dict.get('source_id', None)
if not source_id:
_make_scheduled_jobs(context, data_dict)
@@ -339,14 +355,15 @@ def harvest_jobs_run(context,data_dict):
context['return_objects'] = False
# Flag finished jobs as such
jobs = harvest_job_list(context,{'source_id':source_id,'status':u'Running'})
jobs = harvest_job_list(
context, {'source_id': source_id, 'status': u'Running'})
if len(jobs):
for job in jobs:
if job['gather_finished']:
objects = session.query(HarvestObject.id) \
.filter(HarvestObject.harvest_job_id==job['id']) \
.filter(and_((HarvestObject.state!=u'COMPLETE'),
(HarvestObject.state!=u'ERROR'))) \
.filter(HarvestObject.harvest_job_id == job['id']) \
.filter(and_((HarvestObject.state != u'COMPLETE'),
(HarvestObject.state != u'ERROR'))) \
.order_by(HarvestObject.import_finished.desc())
if objects.count() == 0:
@@ -354,23 +371,24 @@ def harvest_jobs_run(context,data_dict):
job_obj.status = u'Finished'
last_object = session.query(HarvestObject) \
.filter(HarvestObject.harvest_job_id==job['id']) \
.filter(HarvestObject.import_finished!=None) \
.order_by(HarvestObject.import_finished.desc()) \
.first()
.filter(HarvestObject.harvest_job_id == job['id']) \
.filter(HarvestObject.import_finished != None) \
.order_by(HarvestObject.import_finished.desc()) \
.first()
if last_object:
job_obj.finished = last_object.import_finished
job_obj.save()
# Reindex the harvest source dataset so it has the latest
# status
get_action('harvest_source_reindex')(context,
{'id': job_obj.source.id})
get_action('harvest_source_reindex')(
context, {'id': job_obj.source.id})
# resubmit old redis tasks
resubmit_jobs()
# Check if there are pending harvest jobs
jobs = harvest_job_list(context,{'source_id':source_id,'status':u'New'})
jobs = harvest_job_list(
context, {'source_id': source_id, 'status': u'New'})
if len(jobs) == 0:
log.info('No new harvest jobs.')
raise NoNewHarvestJobError('There are no new harvesting jobs')
@@ -380,7 +398,7 @@ def harvest_jobs_run(context,data_dict):
sent_jobs = []
for job in jobs:
context['detailed'] = False
source = harvest_source_show(context,{'id':job['source_id']})
source = harvest_source_show(context, {'id': job['source_id']})
if source['active']:
job_obj = HarvestJob.get(job['id'])
job_obj.status = job['status'] = u'Running'
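Usage sketch (not part of this commit): triggering a run for one source.
Omitting 'source_id' additionally creates any scheduled jobs that are due,
as the _make_scheduled_jobs call above shows. Id and user are placeholders.

    import ckan.model as model
    from ckan.logic import get_action

    context = {'model': model, 'session': model.Session, 'user': 'admin'}
    get_action('harvest_jobs_run')(context, {'source_id': 'source-id'})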
@@ -404,23 +422,25 @@ def harvest_sources_reindex(context, data_dict):
model = context['model']
packages = model.Session.query(model.Package) \
.filter(model.Package.type==DATASET_TYPE_NAME) \
.filter(model.Package.state==u'active') \
.filter(model.Package.type == DATASET_TYPE_NAME) \
.filter(model.Package.state == u'active') \
.all()
package_index = PackageSearchIndex()
reindex_context = {'defer_commit': True}
for package in packages:
get_action('harvest_source_reindex')(reindex_context, {'id': package.id})
get_action('harvest_source_reindex')(
reindex_context, {'id': package.id})
package_index.commit()
return True
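A sketch of the deferred-commit pattern used above: each source is indexed
with defer_commit=True and Solr is committed once for the whole batch.

    from ckan.lib.search.index import PackageSearchIndex

    package_index = PackageSearchIndex()
    for pkg_dict in package_dicts:   # placeholder iterable of dataset dicts
        package_index.index_package(pkg_dict, defer_commit=True)
    package_index.commit()           # single Solr commit for the batch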
@logic.side_effect_free
def harvest_source_reindex(context, data_dict):
'''Reindex a single harvest source'''
"""Reindex a single harvest source."""
harvest_source_id = logic.get_or_bust(data_dict, 'id')
defer_commit = context.get('defer_commit', False)
@@ -428,18 +448,23 @@ def harvest_source_reindex(context, data_dict):
if 'extras_as_string' in context:
del context['extras_as_string']
context.update({'ignore_auth': True})
package_dict = logic.get_action('harvest_source_show')(context,
{'id': harvest_source_id})
log.debug('Updating search index for harvest source {0}'.format(harvest_source_id))
package_dict = logic.get_action('harvest_source_show')(
context, {'id': harvest_source_id})
log.debug('Updating search index for harvest source {0}'.format(
harvest_source_id))
# Remove configuration values
new_dict = {}
if package_dict.get('config'):
if package_dict.get('config', None):
config = json.loads(package_dict['config'])
for key, value in package_dict.iteritems():
if key not in config:
new_dict[key] = value
if value:
if value and key not in config:
new_dict[key] = value
package_index = PackageSearchIndex()
package_index.index_package(new_dict, defer_commit=defer_commit)
package_index.index_package(
new_dict,
defer_commit=defer_commit)
return True
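Standalone sketch (placeholder data, Python 2) of the config-stripping fix
above: keys that come from the source's custom configuration are dropped
before the dict is indexed.

    import json

    package_dict = {'id': 'abc', 'title': 'My source',
                    'config': json.dumps({'default_tags': ['geo']}),
                    'default_tags': ['geo']}
    new_dict = {}
    if package_dict.get('config', None):
        config = json.loads(package_dict['config'])
        for key, value in package_dict.iteritems():
            if value and key not in config:
                new_dict[key] = value
    # new_dict keeps 'id' and 'title' but drops 'default_tags',
    # which was injected by the custom configuration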