Add copious logging to record what happens in harvesting.

This commit is contained in:
David Read 2012-06-08 17:09:22 +01:00
parent a5fac2ac86
commit ccf0cd3da2
7 changed files with 35 additions and 15 deletions

View File

@ -1,4 +1,5 @@
import re import re
import logging
from ckan.logic import NotFound, ValidationError, check_access from ckan.logic import NotFound, ValidationError, check_access
from ckan.lib.navl.dictization_functions import validate from ckan.lib.navl.dictization_functions import validate
@ -9,8 +10,11 @@ from ckanext.harvest.logic.dictization import (harvest_source_dictize,
harvest_job_dictize) harvest_job_dictize)
from ckanext.harvest.logic.action.get import harvest_source_list,harvest_job_list from ckanext.harvest.logic.action.get import harvest_source_list,harvest_job_list
log = logging.getLogger(__name__)
def harvest_source_create(context,data_dict): def harvest_source_create(context,data_dict):
log.info('Creating harvest source: %r', data_dict)
check_access('harvest_source_create',context,data_dict) check_access('harvest_source_create',context,data_dict)
model = context['model'] model = context['model']
@ -21,6 +25,7 @@ def harvest_source_create(context,data_dict):
if errors: if errors:
session.rollback() session.rollback()
log.warn('Harvest source does not validate: %r', errors)
raise ValidationError(errors,_error_summary(errors)) raise ValidationError(errors,_error_summary(errors))
source = HarvestSource() source = HarvestSource()
@ -36,11 +41,12 @@ def harvest_source_create(context,data_dict):
source.active = data['active'] source.active = data['active']
source.save() source.save()
log.info('Harvest source created: %s', source.id)
return harvest_source_dictize(source,context) return harvest_source_dictize(source,context)
def harvest_job_create(context,data_dict): def harvest_job_create(context,data_dict):
log.info('Harvest job create: %r', data_dict)
check_access('harvest_job_create',context,data_dict) check_access('harvest_job_create',context,data_dict)
source_id = data_dict['source_id'] source_id = data_dict['source_id']
@ -48,10 +54,12 @@ def harvest_job_create(context,data_dict):
# Check if source exists # Check if source exists
source = HarvestSource.get(source_id) source = HarvestSource.get(source_id)
if not source: if not source:
log.warn('Harvest source %s does not exist', source_id)
raise NotFound('Harvest source %s does not exist' % source_id) raise NotFound('Harvest source %s does not exist' % source_id)
# Check if the source is active # Check if the source is active
if not source.active: if not source.active:
log.warn('Harvest job cannot be created for inactive source %s', source_id)
raise Exception('Can not create jobs on inactive sources') raise Exception('Can not create jobs on inactive sources')
# Check if there already is an unrun job for this source # Check if there already is an unrun job for this source
@ -61,16 +69,18 @@ def harvest_job_create(context,data_dict):
} }
exists = harvest_job_list(context,data_dict) exists = harvest_job_list(context,data_dict)
if len(exists): if len(exists):
log.warn('There is already an unrun job %r for this source %s', exists, source_id)
raise Exception('There already is an unrun job for this source') raise Exception('There already is an unrun job for this source')
job = HarvestJob() job = HarvestJob()
job.source = source job.source = source
job.save() job.save()
log.info('Harvest job saved %s', job.id)
return harvest_job_dictize(job,context) return harvest_job_dictize(job,context)
def harvest_job_create_all(context,data_dict): def harvest_job_create_all(context,data_dict):
log.info('Harvest job create all: %r', data_dict)
check_access('harvest_job_create_all',context,data_dict) check_access('harvest_job_create_all',context,data_dict)
data_dict.update({'only_active':True}) data_dict.update({'only_active':True})
@ -92,6 +102,7 @@ def harvest_job_create_all(context,data_dict):
job = harvest_job_create(context,{'source_id':source['id']}) job = harvest_job_create(context,{'source_id':source['id']})
jobs.append(job) jobs.append(job)
log.info('Created jobs for %i harvest sources', len(jobs))
return jobs return jobs
def _error_summary(error_dict): def _error_summary(error_dict):

View File

@ -1,15 +1,19 @@
import logging
from ckan.logic import NotFound, check_access from ckan.logic import NotFound, check_access
from ckanext.harvest.model import (HarvestSource, HarvestJob) from ckanext.harvest.model import (HarvestSource, HarvestJob)
log = logging.getLogger(__name__)
def harvest_source_delete(context,data_dict): def harvest_source_delete(context,data_dict):
log.info('Deleting harvest source: %r', data_dict)
check_access('harvest_source_delete',context,data_dict) check_access('harvest_source_delete',context,data_dict)
source_id = data_dict.get('id') source_id = data_dict.get('id')
source = HarvestSource.get(source_id) source = HarvestSource.get(source_id)
if not source: if not source:
log.warn('Harvest source %s does not exist', source_id)
raise NotFound('Harvest source %s does not exist' % source_id) raise NotFound('Harvest source %s does not exist' % source_id)
# Don't actually delete the record, just flag it as inactive # Don't actually delete the record, just flag it as inactive
@ -19,8 +23,10 @@ def harvest_source_delete(context,data_dict):
# Abort any pending jobs # Abort any pending jobs
jobs = HarvestJob.filter(source=source,status=u'New') jobs = HarvestJob.filter(source=source,status=u'New')
if jobs: if jobs:
log.info('Aborting %i jobs due to deleted harvest source', jobs.count())
for job in jobs: for job in jobs:
job.status = u'Aborted' job.status = u'Aborted'
job.save() job.save()
log.info('Harvest source %s deleted', source_id)
return True return True

View File

@ -30,8 +30,10 @@ def harvest_source_update(context,data_dict):
source_id = data_dict.get('id') source_id = data_dict.get('id')
schema = context.get('schema') or default_harvest_source_schema() schema = context.get('schema') or default_harvest_source_schema()
log.info('Harvest source %s update: %r', source_id, data_dict)
source = HarvestSource.get(source_id) source = HarvestSource.get(source_id)
if not source: if not source:
log.error('Harvest source %s does not exist', source_id)
raise NotFound('Harvest source %s does not exist' % source_id) raise NotFound('Harvest source %s does not exist' % source_id)
data, errors = validate(data_dict, schema) data, errors = validate(data_dict, schema)
@ -55,6 +57,7 @@ def harvest_source_update(context,data_dict):
# Abort any pending jobs # Abort any pending jobs
if not source.active: if not source.active:
jobs = HarvestJob.filter(source=source,status=u'New') jobs = HarvestJob.filter(source=source,status=u'New')
log.info('Harvest source %s not active, so aborting %i outstanding jobs', source_id, jobs.count())
if jobs: if jobs:
for job in jobs: for job in jobs:
job.status = u'Aborted' job.status = u'Aborted'
@ -71,6 +74,7 @@ def harvest_objects_import(context,data_dict):
It will only affect the last fetched objects already present in the It will only affect the last fetched objects already present in the
database. database.
''' '''
log.info('Harvest objects import: %r', data_dict)
check_access('harvest_objects_import',context,data_dict) check_access('harvest_objects_import',context,data_dict)
model = context['model'] model = context['model']
@ -80,9 +84,11 @@ def harvest_objects_import(context,data_dict):
if source_id: if source_id:
source = HarvestSource.get(source_id) source = HarvestSource.get(source_id)
if not source: if not source:
log.error('Harvest source %s does not exist', source_id)
raise NotFound('Harvest source %s does not exist' % source_id) raise NotFound('Harvest source %s does not exist' % source_id)
if not source.active: if not source.active:
log.warn('Harvest source %s is not active.', source_id)
raise Exception('This harvest source is not active') raise Exception('This harvest source is not active')
last_objects_ids = session.query(HarvestObject.id) \ last_objects_ids = session.query(HarvestObject.id) \
@ -108,10 +114,11 @@ def harvest_objects_import(context,data_dict):
harvester.import_stage(obj) harvester.import_stage(obj)
break break
last_objects.append(harvest_object_dictize(obj,context)) last_objects.append(harvest_object_dictize(obj,context))
log.info('Harvest objects imported: %r', last_objects)
return last_objects return last_objects
def harvest_jobs_run(context,data_dict): def harvest_jobs_run(context,data_dict):
log.info('Harvest job run: %r', data_dict)
check_access('harvest_jobs_run',context,data_dict) check_access('harvest_jobs_run',context,data_dict)
source_id = data_dict.get('source_id',None) source_id = data_dict.get('source_id',None)
@ -119,6 +126,7 @@ def harvest_jobs_run(context,data_dict):
# Check if there are pending harvest jobs # Check if there are pending harvest jobs
jobs = harvest_job_list(context,{'source_id':source_id,'status':u'New'}) jobs = harvest_job_list(context,{'source_id':source_id,'status':u'New'})
if len(jobs) == 0: if len(jobs) == 0:
log.info('No new harvest jobs.')
raise Exception('There are no new harvesting jobs') raise Exception('There are no new harvesting jobs')
# Send each job to the gather queue # Send each job to the gather queue

View File

@ -36,7 +36,6 @@ harvest_object_error_table = None
def setup(): def setup():
if harvest_source_table is None: if harvest_source_table is None:
define_harvester_tables() define_harvester_tables()
log.debug('Harvest tables defined in memory') log.debug('Harvest tables defined in memory')

View File

@ -122,7 +122,7 @@ class Harvest(SingletonPlugin):
module_root = '%s.%s' % (module_root, auth_profile) module_root = '%s.%s' % (module_root, auth_profile)
auth_functions = _get_auth_functions(module_root,auth_functions) auth_functions = _get_auth_functions(module_root,auth_functions)
log.info('Using auth profile at %s' % module_root) log.debug('Using auth profile at %s' % module_root)
return auth_functions return auth_functions

View File

@ -29,7 +29,7 @@ EXCHANGE_NAME = 'ckan.harvest'
def get_carrot_connection(): def get_carrot_connection():
backend = config.get('ckan.harvest.mq.library', 'pyamqplib') backend = config.get('ckan.harvest.mq.library', 'pyamqplib')
log.info("Carrot connnection using %s backend" % backend) log.debug("Carrot connection using %s backend" % backend)
try: try:
port = int(config.get('ckan.harvest.mq.port', PORT)) port = int(config.get('ckan.harvest.mq.port', PORT))
except ValueError: except ValueError:

View File

@ -57,14 +57,10 @@
<td><a href="harvest/${source.id}"><img src="ckanext/harvest/images/icons/source_view.png" alt="View" title="View" /></a></td> <td><a href="harvest/${source.id}"><img src="ckanext/harvest/images/icons/source_view.png" alt="View" title="View" /></a></td>
<td><a href="harvest/edit/${source.id}"><img src="ckanext/harvest/images/icons/source_edit.png" alt="Edit" title="Edit" /></a></td> <td><a href="harvest/edit/${source.id}"><img src="ckanext/harvest/images/icons/source_edit.png" alt="Edit" title="Edit" /></a></td>
<td><a href="harvest/refresh/${source.id}"><img src="ckanext/harvest/images/icons/source_refresh.png" alt="Refresh" title="Refresh" /></a></td> <td><a href="harvest/refresh/${source.id}"><img src="ckanext/harvest/images/icons/source_refresh.png" alt="Refresh" title="Refresh" /></a></td>
<py:choose> <?python
<py:when test="source.url and len(source.url)>50"> from webhelpers.text import truncate
<td>${source.url[:50]}<span title="${source.url}">...</span></td> ?>
</py:when> <td title="${source.url}">${truncate(source.url, 50)}</td>
<py:otherwise>
<td>${source.url}</td>
</py:otherwise>
</py:choose>
<td>${source.type}</td> <td>${source.type}</td>
<td class="state">${source.active}</td> <td class="state">${source.active}</td>
<py:choose> <py:choose>