harvester-d4science/ckanext/harvest/logic/action/update.py

135 lines
4.4 KiB
Python
Raw Normal View History

import logging
from ckan.plugins import PluginImplementations
from ckanext.harvest.interfaces import IHarvester
from ckan.model import Package
from ckan.logic import NotFound, ValidationError, check_access
from ckan.lib.navl.dictization_functions import validate
from ckanext.harvest.queue import get_gather_publisher
from ckanext.harvest.model import (HarvestSource, HarvestJob, HarvestObject)
from ckanext.harvest.logic.schema import harvest_source_form_schema
from ckanext.harvest.logic.dictization import (harvest_source_dictize,harvest_object_dictize)
from ckanext.harvest.logic.action.create import _error_summary
from ckanext.harvest.logic.action.get import harvest_source_show,harvest_job_list
# Module-level logger named after this module.
log = logging.getLogger(__name__)
def harvest_source_update(context, data_dict):
    '''
    Update an existing harvest source from the values in data_dict.

    The source is looked up by data_dict['id'] and data_dict is validated
    against harvest_source_form_schema(). The validated values for url,
    title, type, description, user_id and publisher_id are copied onto the
    source when they are present and not None. 'active' and 'config' are
    copied whenever the key is present in data_dict. If the source is not
    active after saving, its jobs with status u'New' are set to u'Aborted'.

    :param context: action context; 'session' is read from it, and it is
        passed on to check_access and harvest_source_dictize
    :param data_dict: new values for the source, including its 'id'
    :returns: the result of harvest_source_dictize for the updated source
    :raises NotFound: if no harvest source is found for the given id
    :raises ValidationError: if validation reports errors (the session is
        rolled back before raising)
    '''
    check_access('harvest_source_update', context, data_dict)

    session = context['session']

    source_id = data_dict.get('id')
    schema = harvest_source_form_schema()

    source = HarvestSource.get(source_id)
    if not source:
        raise NotFound('Harvest source %s does not exist' % source_id)

    data, errors = validate(data_dict, schema)
    if errors:
        session.rollback()
        raise ValidationError(errors, _error_summary(errors))

    # Plain fields: only overwrite when a non-None validated value exists.
    fields = ['url', 'title', 'type', 'description', 'user_id', 'publisher_id']
    for field in fields:
        if field in data and data[field] is not None:
            setattr(source, field, data[field])

    # These two are keyed on the raw input so that an explicitly supplied
    # value is applied even when the validated value is falsy or None.
    if 'active' in data_dict:
        source.active = data['active']

    if 'config' in data_dict:
        source.config = data['config']

    source.save()

    # Abort any pending jobs
    if not source.active:
        jobs = HarvestJob.filter(source=source, status=u'New')
        if jobs:
            for job in jobs:
                job.status = u'Aborted'
                job.save()

    return harvest_source_dictize(source, context)
def harvest_objects_import(context, data_dict):
    '''
    Reimports the current harvest objects

    It performs the import stage with the last fetched objects, optionally
    belonging to a certain source.

    Please note that no objects will be fetched from the remote server.
    It will only affect the last fetched objects already present in the
    database.

    :param context: action context; 'session' is read from it, and it is
        passed on to check_access and harvest_object_dictize
    :param data_dict: may contain 'source_id' to restrict the import to the
        objects of that harvest source
    :returns: a list with the result of harvest_object_dictize for every
        object selected for import
    :raises NotFound: if 'source_id' is given but no such source is found
    :raises Exception: if the given source is not active
    '''
    check_access('harvest_objects_import', context, data_dict)

    session = context['session']
    source_id = data_dict.get('source_id')

    if source_id:
        source = HarvestSource.get(source_id)
        if not source:
            raise NotFound('Harvest source %s does not exist' % source_id)

        if not source.active:
            raise Exception('This harvest source is not active')

        query = session.query(HarvestObject.id) \
            .join(HarvestSource).join(Package) \
            .filter(HarvestObject.source == source)
    else:
        query = session.query(HarvestObject.id) \
            .join(Package)

    # Common restriction for both cases: only current objects whose
    # package is active. '== True' is required to build the SQL expression.
    last_objects_ids = query \
        .filter(HarvestObject.current == True) \
        .filter(Package.state == u'active') \
        .all()

    last_objects = []
    for obj_id in last_objects_ids:
        obj = session.query(HarvestObject).get(obj_id)

        # Hand the object to the first harvester whose name matches the
        # type of the object's source.
        for harvester in PluginImplementations(IHarvester):
            if harvester.info()['name'] == obj.source.type:
                # NOTE(review): force_import is set on the harvester object
                # and is not reset in this function - confirm this is
                # intended.
                if hasattr(harvester, 'force_import'):
                    harvester.force_import = True
                harvester.import_stage(obj)
                break

        last_objects.append(harvest_object_dictize(obj, context))

    return last_objects
def harvest_jobs_run(context, data_dict):
    '''
    Send the new harvest jobs of active sources to the gather queue.

    Jobs with status u'New' are listed with harvest_job_list. For each job
    whose source is active, a message with the job id is sent through the
    gather publisher.

    :param context: action context, passed on to check_access,
        harvest_job_list and harvest_source_show
    :param data_dict: only passed on to check_access
    :returns: the list of job dicts that were sent to the gather queue
    :raises Exception: if there are no jobs with status u'New'
    '''
    check_access('harvest_jobs_run', context, data_dict)

    # Check if there are pending harvest jobs
    jobs = harvest_job_list(context, {'status': u'New'})
    if not jobs:
        raise Exception('There are no new harvesting jobs')

    # Send each job to the gather queue
    publisher = get_gather_publisher()
    sent_jobs = []
    try:
        for job in jobs:
            source = harvest_source_show(context, {'id': job['source']})
            if source['active']:
                publisher.send({'harvest_job_id': job['id']})
                log.info('Sent job %s to the gather queue', job['id'])
                sent_jobs.append(job)
    finally:
        # Close the publisher even if looking up a source or sending fails.
        publisher.close()

    return sent_jobs