diff --git a/ckanext/harvest/commands/harvester.py b/ckanext/harvest/commands/harvester.py index e98bf35..338bb99 100644 --- a/ckanext/harvest/commands/harvester.py +++ b/ckanext/harvest/commands/harvester.py @@ -6,7 +6,6 @@ from ckan import model from ckan.logic import get_action from ckan.lib.cli import CkanCommand -from ckanext.harvest.lib import * from ckanext.harvest.queue import get_gather_consumer, get_fetch_consumer class Harvester(CkanCommand): @@ -100,6 +99,9 @@ class Harvester(CkanCommand): self.import_stage() elif cmd == 'job-all': self.create_harvest_job_all() + elif cmd == 'harvesters-info': + harvesters_info = get_action('harvesters_info_show')() + pprint(harvesters_info) else: print 'Command %s not recognized' % cmd @@ -142,14 +144,16 @@ class Harvester(CkanCommand): else: publisher_id = u'' try: - source = create_harvest_source({ + data_dict = { 'url':url, 'type':type, 'config':config, 'active':active, 'user_id':user_id, - 'publisher_id':publisher_id}) + 'publisher_id':publisher_id} + context = {'model':model} + source = get_action('harvest_source_create')(context,data_dict) print 'Created new harvest source:' self.print_harvest_source(source) @@ -157,8 +161,8 @@ class Harvester(CkanCommand): sources = get_action('harvest_source_list')(context,{}) self.print_there_are('harvest source', sources) - # Create a Harvest Job for the new Source - create_harvest_job(source['id']) + # Create a harvest job for the new source + get_action('harvest_job_create')(context,{'source_id':source['id']}) print 'A new Harvest Job for this source has also been created' except ValidationError,e: @@ -173,8 +177,8 @@ class Harvester(CkanCommand): else: print 'Please provide a source id' sys.exit(1) - - remove_harvest_source(source_id) + context = {'model': model} + get_action('harvest_source_delete')(context,{'id':source_id}) print 'Removed harvest source: %s' % source_id def list_harvest_sources(self): @@ -212,11 +216,9 @@ class Harvester(CkanCommand): self.print_there_are(what='harvest job', sequence=jobs) def run_harvester(self): - try: - jobs = run_harvest_jobs() - except: - pass - sys.exit(0) + context = {'model': model} + jobs = get_action('harvest_jobs_run')(context,{}) + #print 'Sent %s jobs to the gather queue' % len(jobs) def import_stage(self): @@ -224,12 +226,15 @@ class Harvester(CkanCommand): source_id = unicode(self.args[1]) else: source_id = None - objs = import_last_objects(source_id) + context = {'model': model} + objs = get_action('harvest_objects_import')(context,{'source_id':source_id}) + print '%s objects reimported' % len(objs) def create_harvest_job_all(self): - jobs = create_harvest_job_all() - print "Created %s new harvest jobs" % len(jobs) + context = {'model': model} + jobs = get_action('harvest_job_create_all')(context,{}) + print 'Created %s new harvest jobs' % len(jobs) def print_harvest_sources(self, sources): if sources: diff --git a/ckanext/harvest/controllers/view.py b/ckanext/harvest/controllers/view.py index 6150df4..4c560ae 100644 --- a/ckanext/harvest/controllers/view.py +++ b/ckanext/harvest/controllers/view.py @@ -11,9 +11,6 @@ from ckan.lib.base import BaseController, c, g, request, \ from ckan.lib.navl.dictization_functions import DataError from ckan.logic import NotFound, ValidationError, get_action from ckanext.harvest.logic.schema import harvest_source_form_schema -from ckanext.harvest.lib import create_harvest_source, edit_harvest_source, \ - create_harvest_job, get_registered_harvesters_info, \ - get_harvest_object from ckan.lib.helpers import Page import logging log = logging.getLogger(__name__) @@ -44,7 +41,8 @@ class ViewController(BaseController): data = data or {} errors = errors or {} error_summary = error_summary or {} - vars = {'data': data, 'errors': errors, 'error_summary': error_summary, 'harvesters': get_registered_harvesters_info()} + harvesters_info = get_action('harvesters_info_show')() + vars = {'data': data, 'errors': errors, 'error_summary': error_summary, 'harvesters': harvesters_info} c.form = render('source/new_source_form.html', extra_vars=vars) return render('source/new.html') @@ -54,10 +52,11 @@ class ViewController(BaseController): data_dict = dict(request.params) self._check_data_dict(data_dict) - source = create_harvest_source(data_dict) + context = {'model':model} + source = get_action('harvest_source_create')(context,data_dict) # Create a harvest job for the new source - create_harvest_job(source['id']) + get_action('harvest_job_create')(context,{'source_id':source['id']}) h.flash_success(_('New harvest source added successfully.' 'A new harvest job for the source has also been created.')) @@ -87,7 +86,8 @@ class ViewController(BaseController): errors = errors or {} error_summary = error_summary or {} - vars = {'data': data, 'errors': errors, 'error_summary': error_summary, 'harvesters': get_registered_harvesters_info()} + harvesters_info = get_action('harvesters_info_show')() + vars = {'data': data, 'errors': errors, 'error_summary': error_summary, 'harvesters': harvesters_info} c.form = render('source/new_source_form.html', extra_vars=vars) return render('source/edit.html') @@ -95,9 +95,11 @@ class ViewController(BaseController): def _save_edit(self,id): try: data_dict = dict(request.params) + data_dict['id'] = id self._check_data_dict(data_dict) + context = {'model':model} - source = edit_harvest_source(id,data_dict) + source = get_action('harvest_source_update')(context,data_dict) h.flash_success(_('Harvest source edited successfully.')) redirect(h.url_for('harvest')) @@ -139,9 +141,10 @@ class ViewController(BaseController): def delete(self,id): try: - delete_harvest_source(id) + context = {'model':model} + get_action('harvest_source_delete')(context, {'id':id}) - h.flash_success(_('Harvesting source deleted successfully')) + h.flash_success(_('Harvesting source successfully inactivated')) redirect(h.url_for('harvest')) except NotFound: abort(404,_('Harvest source not found')) @@ -149,7 +152,8 @@ class ViewController(BaseController): def create_harvesting_job(self,id): try: - create_harvest_job(id) + context = {'model':model} + get_action('harvest_job_create')(context,{'source_id':id}) h.flash_success(_('Refresh requested, harvesting will take place within 15 minutes.')) except NotFound: abort(404,_('Harvest source not found')) diff --git a/ckanext/harvest/lib/__init__.py b/ckanext/harvest/lib/__init__.py deleted file mode 100644 index 1c22ec9..0000000 --- a/ckanext/harvest/lib/__init__.py +++ /dev/null @@ -1,201 +0,0 @@ -import urlparse -import re - -from ckan import model -from ckan.model import Session, repo -from ckan.model import Package -from ckan.lib.navl.dictization_functions import validate -from ckan.logic import NotFound, ValidationError - -from ckanext.harvest.logic.schema import harvest_source_form_schema -from ckanext.harvest.logic.dictization import (harvest_source_dictize, harvest_job_dictize, harvest_object_dictize) -from ckan.plugins import PluginImplementations -from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject, \ - HarvestGatherError, HarvestObjectError -from ckanext.harvest.queue import get_gather_publisher -from ckanext.harvest.interfaces import IHarvester - -import logging -log = logging.getLogger('ckanext') - -#TODO: remove! -context = {'model':model} - -def create_harvest_source(data_dict): - - schema = harvest_source_form_schema() - data, errors = validate(data_dict, schema) - - if errors: - Session.rollback() - raise ValidationError(errors,_error_summary(errors)) - - source = HarvestSource() - source.url = data['url'] - source.type = data['type'] - - opt = ['active','title','description','user_id','publisher_id','config'] - for o in opt: - if o in data and data[o] is not None: - source.__setattr__(o,data[o]) - - if 'active' in data_dict: - source.active = data['active'] - - source.save() - - return harvest_source_dictize(source,context) - -def edit_harvest_source(source_id,data_dict): - schema = harvest_source_form_schema() - - source = HarvestSource.get(source_id) - if not source: - raise NotFound('Harvest source %s does not exist' % source_id) - - # Add source id to the dict, as some validators will need it - data_dict['id'] = source.id - - data, errors = validate(data_dict, schema) - if errors: - Session.rollback() - raise ValidationError(errors,_error_summary(errors)) - - fields = ['url','title','type','description','user_id','publisher_id'] - for f in fields: - if f in data and data[f] is not None: - source.__setattr__(f,data[f]) - - if 'active' in data_dict: - source.active = data['active'] - - if 'config' in data_dict: - source.config = data['config'] - - source.save() - # Abort any pending jobs - if not source.active: - jobs = HarvestJob.filter(source=source,status=u'New') - if jobs: - for job in jobs: - job.status = u'Aborted' - job.save() - - return harvest_source_dictize(source,context) - - -def remove_harvest_source(source_id): - - source = HarvestSource.get(source_id) - if not source: - raise NotFound('Harvest source %s does not exist' % source_id) - - # Don't actually delete the record, just flag it as inactive - source.active = False - source.save() - - # Abort any pending jobs - jobs = HarvestJob.filter(source=source,status=u'New') - if jobs: - for job in jobs: - job.status = u'Aborted' - job.save() - - return True - -def create_harvest_job(source_id): - # Check if source exists - source = HarvestSource.get(source_id) - if not source: - raise NotFound('Harvest source %s does not exist' % source_id) - - # Check if the source is active - if not source.active: - raise Exception('Can not create jobs on inactive sources') - - # Check if there already is an unrun job for this source - exists = get_action('harvest_job_list')(context,{'status':u'New'}) - if len(exists): - raise Exception('There already is an unrun job for this source') - - job = HarvestJob() - job.source = source - - job.save() - return harvest_job_dictize(job,context) - -def run_harvest_jobs(): - # Check if there are pending harvest jobs - jobs = get_action('harvest_job_list')(context,{'status':u'New'}) - if len(jobs) == 0: - raise Exception('There are no new harvesting jobs') - - # Send each job to the gather queue - publisher = get_gather_publisher() - sent_jobs = [] - for job in jobs: - if job['source']['active']: - publisher.send({'harvest_job_id': job['id']}) - log.info('Sent job %s to the gather queue' % job['id']) - sent_jobs.append(job) - - publisher.close() - return sent_jobs - -def import_last_objects(source_id=None): - if source_id: - source = HarvestSource.get(source_id) - if not source: - raise NotFound('Harvest source %s does not exist' % source_id) - - if not source.active: - raise Exception('This harvest source is not active') - - last_objects_ids = Session.query(HarvestObject.id) \ - .join(HarvestSource).join(Package) \ - .filter(HarvestObject.source==source) \ - .filter(HarvestObject.current==True) \ - .filter(Package.state==u'active') \ - .all() - else: - last_objects_ids = Session.query(HarvestObject.id) \ - .join(Package) \ - .filter(HarvestObject.current==True) \ - .filter(Package.state==u'active') \ - .all() - - last_objects = [] - for obj_id in last_objects_ids: - obj = Session.query(HarvestObject).get(obj_id) - for harvester in PluginImplementations(IHarvester): - if harvester.info()['name'] == obj.source.type: - if hasattr(harvester,'force_import'): - harvester.force_import = True - harvester.import_stage(obj) - break - last_objects.append(obj) - return last_objects - -def create_harvest_job_all(): - - # Get all active sources - sources = harvest_sources_list(active=True) - jobs = [] - # Create a new job for each - for source in sources: - job = create_harvest_job(source['id']) - jobs.append(job) - - return jobs - -def get_registered_harvesters_info(): - available_harvesters = [] - for harvester in PluginImplementations(IHarvester): - info = harvester.info() - if not info or 'name' not in info: - log.error('Harvester %r does not provide the harvester name in the info response' % str(harvester)) - continue - info['show_config'] = (info.get('form_config_interface','') == 'Text') - available_harvesters.append(info) - - return available_harvesters diff --git a/ckanext/harvest/logic/action/create.py b/ckanext/harvest/logic/action/create.py new file mode 100644 index 0000000..a1a92d9 --- /dev/null +++ b/ckanext/harvest/logic/action/create.py @@ -0,0 +1,99 @@ +import re + +from ckan.logic import NotFound, ValidationError +from ckan.lib.navl.dictization_functions import validate + +from ckanext.harvest.model import (HarvestSource, HarvestJob, HarvestObject) +from ckanext.harvest.logic.schema import harvest_source_form_schema +from ckanext.harvest.logic.dictization import (harvest_source_dictize, + harvest_job_dictize) +from ckanext.harvest.logic.action.get import harvest_source_list,harvest_job_list + +def harvest_source_create(context,data_dict): + + model = context['model'] + + schema = harvest_source_form_schema() + data, errors = validate(data_dict, schema) + + if errors: + model.Session.rollback() + raise ValidationError(errors,_error_summary(errors)) + + source = HarvestSource() + source.url = data['url'] + source.type = data['type'] + + opt = ['active','title','description','user_id','publisher_id','config'] + for o in opt: + if o in data and data[o] is not None: + source.__setattr__(o,data[o]) + + if 'active' in data_dict: + source.active = data['active'] + + source.save() + + return harvest_source_dictize(source,context) + +def harvest_job_create(context,data_dict): + + source_id = data_dict['source_id'] + + # Check if source exists + source = HarvestSource.get(source_id) + if not source: + raise NotFound('Harvest source %s does not exist' % source_id) + + # Check if the source is active + if not source.active: + raise Exception('Can not create jobs on inactive sources') + + # Check if there already is an unrun job for this source + data_dict ={ + 'source_id':source_id, + 'status':u'New' + } + exists = harvest_job_list(context,data_dict) + if len(exists): + raise Exception('There already is an unrun job for this source') + + job = HarvestJob() + job.source = source + + job.save() + return harvest_job_dictize(job,context) + +def harvest_job_create_all(context,data_dict): + + data_dict.update({'only_active':True}) + + # Get all active sources + sources = harvest_source_list(context,data_dict) + jobs = [] + # Create a new job for each, if there isn't already one + for source in sources: + data_dict ={ + 'source_id':source['id'], + 'status':u'New' + } + + exists = harvest_job_list(context,data_dict) + if len(exists): + continue + + job = harvest_job_create(context,{'source_id':source['id']}) + jobs.append(job) + + return jobs + +def _error_summary(error_dict): + error_summary = {} + for key, error in error_dict.iteritems(): + error_summary[_prettify(key)] = error[0] + return error_summary + +def _prettify(field_name): + field_name = re.sub('(?