[logic] Refactor the rest of the logic functions (create,update,delete)
This commit is contained in:
parent
651474e9f1
commit
c798013752
|
@ -6,7 +6,6 @@ from ckan import model
|
|||
from ckan.logic import get_action
|
||||
|
||||
from ckan.lib.cli import CkanCommand
|
||||
from ckanext.harvest.lib import *
|
||||
from ckanext.harvest.queue import get_gather_consumer, get_fetch_consumer
|
||||
|
||||
class Harvester(CkanCommand):
|
||||
|
@ -100,6 +99,9 @@ class Harvester(CkanCommand):
|
|||
self.import_stage()
|
||||
elif cmd == 'job-all':
|
||||
self.create_harvest_job_all()
|
||||
elif cmd == 'harvesters-info':
|
||||
harvesters_info = get_action('harvesters_info_show')()
|
||||
pprint(harvesters_info)
|
||||
else:
|
||||
print 'Command %s not recognized' % cmd
|
||||
|
||||
|
@ -142,14 +144,16 @@ class Harvester(CkanCommand):
|
|||
else:
|
||||
publisher_id = u''
|
||||
try:
|
||||
source = create_harvest_source({
|
||||
data_dict = {
|
||||
'url':url,
|
||||
'type':type,
|
||||
'config':config,
|
||||
'active':active,
|
||||
'user_id':user_id,
|
||||
'publisher_id':publisher_id})
|
||||
'publisher_id':publisher_id}
|
||||
|
||||
context = {'model':model}
|
||||
source = get_action('harvest_source_create')(context,data_dict)
|
||||
print 'Created new harvest source:'
|
||||
self.print_harvest_source(source)
|
||||
|
||||
|
@ -157,8 +161,8 @@ class Harvester(CkanCommand):
|
|||
sources = get_action('harvest_source_list')(context,{})
|
||||
self.print_there_are('harvest source', sources)
|
||||
|
||||
# Create a Harvest Job for the new Source
|
||||
create_harvest_job(source['id'])
|
||||
# Create a harvest job for the new source
|
||||
get_action('harvest_job_create')(context,{'source_id':source['id']})
|
||||
print 'A new Harvest Job for this source has also been created'
|
||||
|
||||
except ValidationError,e:
|
||||
|
@ -173,8 +177,8 @@ class Harvester(CkanCommand):
|
|||
else:
|
||||
print 'Please provide a source id'
|
||||
sys.exit(1)
|
||||
|
||||
remove_harvest_source(source_id)
|
||||
context = {'model': model}
|
||||
get_action('harvest_source_delete')(context,{'id':source_id})
|
||||
print 'Removed harvest source: %s' % source_id
|
||||
|
||||
def list_harvest_sources(self):
|
||||
|
@ -212,11 +216,9 @@ class Harvester(CkanCommand):
|
|||
self.print_there_are(what='harvest job', sequence=jobs)
|
||||
|
||||
def run_harvester(self):
|
||||
try:
|
||||
jobs = run_harvest_jobs()
|
||||
except:
|
||||
pass
|
||||
sys.exit(0)
|
||||
context = {'model': model}
|
||||
jobs = get_action('harvest_jobs_run')(context,{})
|
||||
|
||||
#print 'Sent %s jobs to the gather queue' % len(jobs)
|
||||
|
||||
def import_stage(self):
|
||||
|
@ -224,12 +226,15 @@ class Harvester(CkanCommand):
|
|||
source_id = unicode(self.args[1])
|
||||
else:
|
||||
source_id = None
|
||||
objs = import_last_objects(source_id)
|
||||
context = {'model': model}
|
||||
objs = get_action('harvest_objects_import')(context,{'source_id':source_id})
|
||||
|
||||
print '%s objects reimported' % len(objs)
|
||||
|
||||
def create_harvest_job_all(self):
|
||||
jobs = create_harvest_job_all()
|
||||
print "Created %s new harvest jobs" % len(jobs)
|
||||
context = {'model': model}
|
||||
jobs = get_action('harvest_job_create_all')(context,{})
|
||||
print 'Created %s new harvest jobs' % len(jobs)
|
||||
|
||||
def print_harvest_sources(self, sources):
|
||||
if sources:
|
||||
|
|
|
@ -11,9 +11,6 @@ from ckan.lib.base import BaseController, c, g, request, \
|
|||
from ckan.lib.navl.dictization_functions import DataError
|
||||
from ckan.logic import NotFound, ValidationError, get_action
|
||||
from ckanext.harvest.logic.schema import harvest_source_form_schema
|
||||
from ckanext.harvest.lib import create_harvest_source, edit_harvest_source, \
|
||||
create_harvest_job, get_registered_harvesters_info, \
|
||||
get_harvest_object
|
||||
from ckan.lib.helpers import Page
|
||||
import logging
|
||||
log = logging.getLogger(__name__)
|
||||
|
@ -44,7 +41,8 @@ class ViewController(BaseController):
|
|||
data = data or {}
|
||||
errors = errors or {}
|
||||
error_summary = error_summary or {}
|
||||
vars = {'data': data, 'errors': errors, 'error_summary': error_summary, 'harvesters': get_registered_harvesters_info()}
|
||||
harvesters_info = get_action('harvesters_info_show')()
|
||||
vars = {'data': data, 'errors': errors, 'error_summary': error_summary, 'harvesters': harvesters_info}
|
||||
|
||||
c.form = render('source/new_source_form.html', extra_vars=vars)
|
||||
return render('source/new.html')
|
||||
|
@ -54,10 +52,11 @@ class ViewController(BaseController):
|
|||
data_dict = dict(request.params)
|
||||
self._check_data_dict(data_dict)
|
||||
|
||||
source = create_harvest_source(data_dict)
|
||||
context = {'model':model}
|
||||
source = get_action('harvest_source_create')(context,data_dict)
|
||||
|
||||
# Create a harvest job for the new source
|
||||
create_harvest_job(source['id'])
|
||||
get_action('harvest_job_create')(context,{'source_id':source['id']})
|
||||
|
||||
h.flash_success(_('New harvest source added successfully.'
|
||||
'A new harvest job for the source has also been created.'))
|
||||
|
@ -87,7 +86,8 @@ class ViewController(BaseController):
|
|||
errors = errors or {}
|
||||
error_summary = error_summary or {}
|
||||
|
||||
vars = {'data': data, 'errors': errors, 'error_summary': error_summary, 'harvesters': get_registered_harvesters_info()}
|
||||
harvesters_info = get_action('harvesters_info_show')()
|
||||
vars = {'data': data, 'errors': errors, 'error_summary': error_summary, 'harvesters': harvesters_info}
|
||||
|
||||
c.form = render('source/new_source_form.html', extra_vars=vars)
|
||||
return render('source/edit.html')
|
||||
|
@ -95,9 +95,11 @@ class ViewController(BaseController):
|
|||
def _save_edit(self,id):
|
||||
try:
|
||||
data_dict = dict(request.params)
|
||||
data_dict['id'] = id
|
||||
self._check_data_dict(data_dict)
|
||||
context = {'model':model}
|
||||
|
||||
source = edit_harvest_source(id,data_dict)
|
||||
source = get_action('harvest_source_update')(context,data_dict)
|
||||
|
||||
h.flash_success(_('Harvest source edited successfully.'))
|
||||
redirect(h.url_for('harvest'))
|
||||
|
@ -139,9 +141,10 @@ class ViewController(BaseController):
|
|||
|
||||
def delete(self,id):
|
||||
try:
|
||||
delete_harvest_source(id)
|
||||
context = {'model':model}
|
||||
get_action('harvest_source_delete')(context, {'id':id})
|
||||
|
||||
h.flash_success(_('Harvesting source deleted successfully'))
|
||||
h.flash_success(_('Harvesting source successfully inactivated'))
|
||||
redirect(h.url_for('harvest'))
|
||||
except NotFound:
|
||||
abort(404,_('Harvest source not found'))
|
||||
|
@ -149,7 +152,8 @@ class ViewController(BaseController):
|
|||
|
||||
def create_harvesting_job(self,id):
|
||||
try:
|
||||
create_harvest_job(id)
|
||||
context = {'model':model}
|
||||
get_action('harvest_job_create')(context,{'source_id':id})
|
||||
h.flash_success(_('Refresh requested, harvesting will take place within 15 minutes.'))
|
||||
except NotFound:
|
||||
abort(404,_('Harvest source not found'))
|
||||
|
|
|
@ -1,201 +0,0 @@
|
|||
import urlparse
|
||||
import re
|
||||
|
||||
from ckan import model
|
||||
from ckan.model import Session, repo
|
||||
from ckan.model import Package
|
||||
from ckan.lib.navl.dictization_functions import validate
|
||||
from ckan.logic import NotFound, ValidationError
|
||||
|
||||
from ckanext.harvest.logic.schema import harvest_source_form_schema
|
||||
from ckanext.harvest.logic.dictization import (harvest_source_dictize, harvest_job_dictize, harvest_object_dictize)
|
||||
from ckan.plugins import PluginImplementations
|
||||
from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject, \
|
||||
HarvestGatherError, HarvestObjectError
|
||||
from ckanext.harvest.queue import get_gather_publisher
|
||||
from ckanext.harvest.interfaces import IHarvester
|
||||
|
||||
import logging
|
||||
log = logging.getLogger('ckanext')
|
||||
|
||||
#TODO: remove!
|
||||
context = {'model':model}
|
||||
|
||||
def create_harvest_source(data_dict):
|
||||
|
||||
schema = harvest_source_form_schema()
|
||||
data, errors = validate(data_dict, schema)
|
||||
|
||||
if errors:
|
||||
Session.rollback()
|
||||
raise ValidationError(errors,_error_summary(errors))
|
||||
|
||||
source = HarvestSource()
|
||||
source.url = data['url']
|
||||
source.type = data['type']
|
||||
|
||||
opt = ['active','title','description','user_id','publisher_id','config']
|
||||
for o in opt:
|
||||
if o in data and data[o] is not None:
|
||||
source.__setattr__(o,data[o])
|
||||
|
||||
if 'active' in data_dict:
|
||||
source.active = data['active']
|
||||
|
||||
source.save()
|
||||
|
||||
return harvest_source_dictize(source,context)
|
||||
|
||||
def edit_harvest_source(source_id,data_dict):
|
||||
schema = harvest_source_form_schema()
|
||||
|
||||
source = HarvestSource.get(source_id)
|
||||
if not source:
|
||||
raise NotFound('Harvest source %s does not exist' % source_id)
|
||||
|
||||
# Add source id to the dict, as some validators will need it
|
||||
data_dict['id'] = source.id
|
||||
|
||||
data, errors = validate(data_dict, schema)
|
||||
if errors:
|
||||
Session.rollback()
|
||||
raise ValidationError(errors,_error_summary(errors))
|
||||
|
||||
fields = ['url','title','type','description','user_id','publisher_id']
|
||||
for f in fields:
|
||||
if f in data and data[f] is not None:
|
||||
source.__setattr__(f,data[f])
|
||||
|
||||
if 'active' in data_dict:
|
||||
source.active = data['active']
|
||||
|
||||
if 'config' in data_dict:
|
||||
source.config = data['config']
|
||||
|
||||
source.save()
|
||||
# Abort any pending jobs
|
||||
if not source.active:
|
||||
jobs = HarvestJob.filter(source=source,status=u'New')
|
||||
if jobs:
|
||||
for job in jobs:
|
||||
job.status = u'Aborted'
|
||||
job.save()
|
||||
|
||||
return harvest_source_dictize(source,context)
|
||||
|
||||
|
||||
def remove_harvest_source(source_id):
|
||||
|
||||
source = HarvestSource.get(source_id)
|
||||
if not source:
|
||||
raise NotFound('Harvest source %s does not exist' % source_id)
|
||||
|
||||
# Don't actually delete the record, just flag it as inactive
|
||||
source.active = False
|
||||
source.save()
|
||||
|
||||
# Abort any pending jobs
|
||||
jobs = HarvestJob.filter(source=source,status=u'New')
|
||||
if jobs:
|
||||
for job in jobs:
|
||||
job.status = u'Aborted'
|
||||
job.save()
|
||||
|
||||
return True
|
||||
|
||||
def create_harvest_job(source_id):
|
||||
# Check if source exists
|
||||
source = HarvestSource.get(source_id)
|
||||
if not source:
|
||||
raise NotFound('Harvest source %s does not exist' % source_id)
|
||||
|
||||
# Check if the source is active
|
||||
if not source.active:
|
||||
raise Exception('Can not create jobs on inactive sources')
|
||||
|
||||
# Check if there already is an unrun job for this source
|
||||
exists = get_action('harvest_job_list')(context,{'status':u'New'})
|
||||
if len(exists):
|
||||
raise Exception('There already is an unrun job for this source')
|
||||
|
||||
job = HarvestJob()
|
||||
job.source = source
|
||||
|
||||
job.save()
|
||||
return harvest_job_dictize(job,context)
|
||||
|
||||
def run_harvest_jobs():
|
||||
# Check if there are pending harvest jobs
|
||||
jobs = get_action('harvest_job_list')(context,{'status':u'New'})
|
||||
if len(jobs) == 0:
|
||||
raise Exception('There are no new harvesting jobs')
|
||||
|
||||
# Send each job to the gather queue
|
||||
publisher = get_gather_publisher()
|
||||
sent_jobs = []
|
||||
for job in jobs:
|
||||
if job['source']['active']:
|
||||
publisher.send({'harvest_job_id': job['id']})
|
||||
log.info('Sent job %s to the gather queue' % job['id'])
|
||||
sent_jobs.append(job)
|
||||
|
||||
publisher.close()
|
||||
return sent_jobs
|
||||
|
||||
def import_last_objects(source_id=None):
|
||||
if source_id:
|
||||
source = HarvestSource.get(source_id)
|
||||
if not source:
|
||||
raise NotFound('Harvest source %s does not exist' % source_id)
|
||||
|
||||
if not source.active:
|
||||
raise Exception('This harvest source is not active')
|
||||
|
||||
last_objects_ids = Session.query(HarvestObject.id) \
|
||||
.join(HarvestSource).join(Package) \
|
||||
.filter(HarvestObject.source==source) \
|
||||
.filter(HarvestObject.current==True) \
|
||||
.filter(Package.state==u'active') \
|
||||
.all()
|
||||
else:
|
||||
last_objects_ids = Session.query(HarvestObject.id) \
|
||||
.join(Package) \
|
||||
.filter(HarvestObject.current==True) \
|
||||
.filter(Package.state==u'active') \
|
||||
.all()
|
||||
|
||||
last_objects = []
|
||||
for obj_id in last_objects_ids:
|
||||
obj = Session.query(HarvestObject).get(obj_id)
|
||||
for harvester in PluginImplementations(IHarvester):
|
||||
if harvester.info()['name'] == obj.source.type:
|
||||
if hasattr(harvester,'force_import'):
|
||||
harvester.force_import = True
|
||||
harvester.import_stage(obj)
|
||||
break
|
||||
last_objects.append(obj)
|
||||
return last_objects
|
||||
|
||||
def create_harvest_job_all():
|
||||
|
||||
# Get all active sources
|
||||
sources = harvest_sources_list(active=True)
|
||||
jobs = []
|
||||
# Create a new job for each
|
||||
for source in sources:
|
||||
job = create_harvest_job(source['id'])
|
||||
jobs.append(job)
|
||||
|
||||
return jobs
|
||||
|
||||
def get_registered_harvesters_info():
|
||||
available_harvesters = []
|
||||
for harvester in PluginImplementations(IHarvester):
|
||||
info = harvester.info()
|
||||
if not info or 'name' not in info:
|
||||
log.error('Harvester %r does not provide the harvester name in the info response' % str(harvester))
|
||||
continue
|
||||
info['show_config'] = (info.get('form_config_interface','') == 'Text')
|
||||
available_harvesters.append(info)
|
||||
|
||||
return available_harvesters
|
|
@ -0,0 +1,99 @@
|
|||
import re
|
||||
|
||||
from ckan.logic import NotFound, ValidationError
|
||||
from ckan.lib.navl.dictization_functions import validate
|
||||
|
||||
from ckanext.harvest.model import (HarvestSource, HarvestJob, HarvestObject)
|
||||
from ckanext.harvest.logic.schema import harvest_source_form_schema
|
||||
from ckanext.harvest.logic.dictization import (harvest_source_dictize,
|
||||
harvest_job_dictize)
|
||||
from ckanext.harvest.logic.action.get import harvest_source_list,harvest_job_list
|
||||
|
||||
def harvest_source_create(context,data_dict):
|
||||
|
||||
model = context['model']
|
||||
|
||||
schema = harvest_source_form_schema()
|
||||
data, errors = validate(data_dict, schema)
|
||||
|
||||
if errors:
|
||||
model.Session.rollback()
|
||||
raise ValidationError(errors,_error_summary(errors))
|
||||
|
||||
source = HarvestSource()
|
||||
source.url = data['url']
|
||||
source.type = data['type']
|
||||
|
||||
opt = ['active','title','description','user_id','publisher_id','config']
|
||||
for o in opt:
|
||||
if o in data and data[o] is not None:
|
||||
source.__setattr__(o,data[o])
|
||||
|
||||
if 'active' in data_dict:
|
||||
source.active = data['active']
|
||||
|
||||
source.save()
|
||||
|
||||
return harvest_source_dictize(source,context)
|
||||
|
||||
def harvest_job_create(context,data_dict):
|
||||
|
||||
source_id = data_dict['source_id']
|
||||
|
||||
# Check if source exists
|
||||
source = HarvestSource.get(source_id)
|
||||
if not source:
|
||||
raise NotFound('Harvest source %s does not exist' % source_id)
|
||||
|
||||
# Check if the source is active
|
||||
if not source.active:
|
||||
raise Exception('Can not create jobs on inactive sources')
|
||||
|
||||
# Check if there already is an unrun job for this source
|
||||
data_dict ={
|
||||
'source_id':source_id,
|
||||
'status':u'New'
|
||||
}
|
||||
exists = harvest_job_list(context,data_dict)
|
||||
if len(exists):
|
||||
raise Exception('There already is an unrun job for this source')
|
||||
|
||||
job = HarvestJob()
|
||||
job.source = source
|
||||
|
||||
job.save()
|
||||
return harvest_job_dictize(job,context)
|
||||
|
||||
def harvest_job_create_all(context,data_dict):
|
||||
|
||||
data_dict.update({'only_active':True})
|
||||
|
||||
# Get all active sources
|
||||
sources = harvest_source_list(context,data_dict)
|
||||
jobs = []
|
||||
# Create a new job for each, if there isn't already one
|
||||
for source in sources:
|
||||
data_dict ={
|
||||
'source_id':source['id'],
|
||||
'status':u'New'
|
||||
}
|
||||
|
||||
exists = harvest_job_list(context,data_dict)
|
||||
if len(exists):
|
||||
continue
|
||||
|
||||
job = harvest_job_create(context,{'source_id':source['id']})
|
||||
jobs.append(job)
|
||||
|
||||
return jobs
|
||||
|
||||
def _error_summary(error_dict):
|
||||
error_summary = {}
|
||||
for key, error in error_dict.iteritems():
|
||||
error_summary[_prettify(key)] = error[0]
|
||||
return error_summary
|
||||
|
||||
def _prettify(field_name):
|
||||
field_name = re.sub('(?<!\w)[Uu]rl(?!\w)', 'URL', field_name.replace('_', ' ').capitalize())
|
||||
return field_name.replace('_', ' ')
|
||||
|
|
@ -0,0 +1,24 @@
|
|||
from ckan.logic import NotFound
|
||||
|
||||
from ckanext.harvest.model import (HarvestSource, HarvestJob)
|
||||
|
||||
|
||||
def harvest_source_delete(context,data_dict):
|
||||
|
||||
source_id = data_dict.get('id')
|
||||
source = HarvestSource.get(source_id)
|
||||
if not source:
|
||||
raise NotFound('Harvest source %s does not exist' % source_id)
|
||||
|
||||
# Don't actually delete the record, just flag it as inactive
|
||||
source.active = False
|
||||
source.save()
|
||||
|
||||
# Abort any pending jobs
|
||||
jobs = HarvestJob.filter(source=source,status=u'New')
|
||||
if jobs:
|
||||
for job in jobs:
|
||||
job.status = u'Aborted'
|
||||
job.save()
|
||||
|
||||
return True
|
|
@ -1,4 +1,8 @@
|
|||
from ckan.logic import NotFound, ValidationError
|
||||
from ckan.plugins import PluginImplementations
|
||||
from ckanext.harvest.interfaces import IHarvester
|
||||
|
||||
|
||||
from ckan.logic import NotFound
|
||||
|
||||
from ckanext.harvest.model import (HarvestSource, HarvestJob, HarvestObject)
|
||||
from ckanext.harvest.logic.dictization import (harvest_source_dictize,
|
||||
|
@ -51,14 +55,18 @@ def harvest_job_list(context,data_dict):
|
|||
|
||||
model = context['model']
|
||||
|
||||
source_id = data_dict.get('source_id',False)
|
||||
status = data_dict.get('status',False)
|
||||
|
||||
query = model.Session.query(HarvestJob)
|
||||
|
||||
if source_id:
|
||||
query = query.filter(HarvestJob.source_id==source_id)
|
||||
|
||||
if status:
|
||||
jobs = model.Session.query(HarvestJob) \
|
||||
.filter(HarvestJob.status==status) \
|
||||
.all()
|
||||
else:
|
||||
jobs = model.Session.query(HarvestJob).all()
|
||||
query = query.filter(HarvestJob.status==status)
|
||||
|
||||
jobs = query.all()
|
||||
|
||||
return [harvest_job_dictize(job,context) for job in jobs]
|
||||
|
||||
|
@ -87,3 +95,15 @@ def harvest_object_list(context,data_dict):
|
|||
objects = model.Session.query(HarvestObject).all()
|
||||
|
||||
return [getattr(obj,'id') for obj in objects]
|
||||
|
||||
def harvesters_info_show(context = {},data_dict = {}):
|
||||
available_harvesters = []
|
||||
for harvester in PluginImplementations(IHarvester):
|
||||
info = harvester.info()
|
||||
if not info or 'name' not in info:
|
||||
log.error('Harvester %r does not provide the harvester name in the info response' % str(harvester))
|
||||
continue
|
||||
info['show_config'] = (info.get('form_config_interface','') == 'Text')
|
||||
available_harvesters.append(info)
|
||||
|
||||
return available_harvesters
|
||||
|
|
|
@ -0,0 +1,125 @@
|
|||
import logging
|
||||
|
||||
from ckan.plugins import PluginImplementations
|
||||
from ckanext.harvest.interfaces import IHarvester
|
||||
|
||||
from ckan.model import Package
|
||||
|
||||
from ckan.logic import NotFound, ValidationError
|
||||
from ckan.lib.navl.dictization_functions import validate
|
||||
|
||||
from ckanext.harvest.queue import get_gather_publisher
|
||||
|
||||
from ckanext.harvest.model import (HarvestSource, HarvestJob, HarvestObject)
|
||||
from ckanext.harvest.logic.schema import harvest_source_form_schema
|
||||
from ckanext.harvest.logic.dictization import (harvest_source_dictize,harvest_object_dictize)
|
||||
|
||||
from ckanext.harvest.logic.action.create import _error_summary
|
||||
from ckanext.harvest.logic.action.get import harvest_source_show,harvest_job_list
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
def harvest_source_update(context,data_dict):
|
||||
|
||||
model = context['model']
|
||||
source_id = data_dict.get('id')
|
||||
|
||||
schema = harvest_source_form_schema()
|
||||
|
||||
source = HarvestSource.get(source_id)
|
||||
if not source:
|
||||
raise NotFound('Harvest source %s does not exist' % source_id)
|
||||
|
||||
data, errors = validate(data_dict, schema)
|
||||
if errors:
|
||||
model.Session.rollback()
|
||||
raise ValidationError(errors,_error_summary(errors))
|
||||
|
||||
fields = ['url','title','type','description','user_id','publisher_id']
|
||||
for f in fields:
|
||||
if f in data and data[f] is not None:
|
||||
source.__setattr__(f,data[f])
|
||||
|
||||
if 'active' in data_dict:
|
||||
source.active = data['active']
|
||||
|
||||
if 'config' in data_dict:
|
||||
source.config = data['config']
|
||||
|
||||
source.save()
|
||||
# Abort any pending jobs
|
||||
if not source.active:
|
||||
jobs = HarvestJob.filter(source=source,status=u'New')
|
||||
if jobs:
|
||||
for job in jobs:
|
||||
job.status = u'Aborted'
|
||||
job.save()
|
||||
|
||||
return harvest_source_dictize(source,context)
|
||||
|
||||
def harvest_objects_import(context,data_dict):
|
||||
'''
|
||||
Reimports the current harvest objects
|
||||
It performs the import stage with the last fetched objects, optionally
|
||||
belonging to a certain source.
|
||||
Please note that no objects will be fetched from the remote server.
|
||||
It will only affect the last fetched objects already present in the
|
||||
database.
|
||||
'''
|
||||
model = context['model']
|
||||
source_id = data_dict.get('source_id',None)
|
||||
|
||||
if source_id:
|
||||
source = HarvestSource.get(source_id)
|
||||
if not source:
|
||||
raise NotFound('Harvest source %s does not exist' % source_id)
|
||||
|
||||
if not source.active:
|
||||
raise Exception('This harvest source is not active')
|
||||
|
||||
last_objects_ids = model.Session.query(HarvestObject.id) \
|
||||
.join(HarvestSource).join(Package) \
|
||||
.filter(HarvestObject.source==source) \
|
||||
.filter(HarvestObject.current==True) \
|
||||
.filter(Package.state==u'active') \
|
||||
.all()
|
||||
else:
|
||||
last_objects_ids = model.Session.query(HarvestObject.id) \
|
||||
.join(Package) \
|
||||
.filter(HarvestObject.current==True) \
|
||||
.filter(Package.state==u'active') \
|
||||
.all()
|
||||
|
||||
last_objects = []
|
||||
for obj_id in last_objects_ids:
|
||||
obj = model.Session.query(HarvestObject).get(obj_id)
|
||||
for harvester in PluginImplementations(IHarvester):
|
||||
if harvester.info()['name'] == obj.source.type:
|
||||
if hasattr(harvester,'force_import'):
|
||||
harvester.force_import = True
|
||||
harvester.import_stage(obj)
|
||||
break
|
||||
last_objects.append(harvest_object_dictize(obj,context))
|
||||
return last_objects
|
||||
|
||||
def harvest_jobs_run(context,data_dict):
|
||||
|
||||
# Check if there are pending harvest jobs
|
||||
jobs = harvest_job_list(context,{'status':u'New'})
|
||||
if len(jobs) == 0:
|
||||
raise Exception('There are no new harvesting jobs')
|
||||
|
||||
# Send each job to the gather queue
|
||||
publisher = get_gather_publisher()
|
||||
sent_jobs = []
|
||||
for job in jobs:
|
||||
source = harvest_source_show(context,{'id':job['source']})
|
||||
if source['active']:
|
||||
publisher.send({'harvest_job_id': job['id']})
|
||||
log.info('Sent job %s to the gather queue' % job['id'])
|
||||
sent_jobs.append(job)
|
||||
|
||||
publisher.close()
|
||||
return sent_jobs
|
||||
|
|
@ -46,16 +46,6 @@ def harvest_object_dictize(obj, context):
|
|||
|
||||
return out
|
||||
|
||||
def _prettify(field_name):
|
||||
field_name = re.sub('(?<!\w)[Uu]rl(?!\w)', 'URL', field_name.replace('_', ' ').capitalize())
|
||||
return field_name.replace('_', ' ')
|
||||
|
||||
def _error_summary(error_dict):
|
||||
error_summary = {}
|
||||
for key, error in error_dict.iteritems():
|
||||
error_summary[_prettify(key)] = error[0]
|
||||
return error_summary
|
||||
|
||||
def _get_source_status(source, context):
|
||||
|
||||
model = context.get('model')
|
||||
|
|
|
@ -58,12 +58,27 @@ class Harvest(SingletonPlugin):
|
|||
from ckanext.harvest.logic.action.get import (harvest_source_show,
|
||||
harvest_source_list,
|
||||
harvest_job_show,
|
||||
harvest_job_list,)
|
||||
harvest_job_list,
|
||||
harvesters_info_show,)
|
||||
from ckanext.harvest.logic.action.create import (harvest_source_create,
|
||||
harvest_job_create,
|
||||
harvest_job_create_all,)
|
||||
from ckanext.harvest.logic.action.update import (harvest_source_update,
|
||||
harvest_objects_import,
|
||||
harvest_jobs_run)
|
||||
from ckanext.harvest.logic.action.delete import (harvest_source_delete,)
|
||||
|
||||
return {
|
||||
'harvest_source_show': harvest_source_show,
|
||||
'harvest_source_list': harvest_source_list,
|
||||
'harvest_job_show': harvest_job_show,
|
||||
'harvest_job_list': harvest_job_list,
|
||||
|
||||
'harvest_source_create': harvest_source_create,
|
||||
'harvest_job_create': harvest_job_create,
|
||||
'harvest_job_create_all': harvest_job_create_all,
|
||||
'harvest_source_update': harvest_source_update,
|
||||
'harvest_source_delete': harvest_source_delete,
|
||||
'harvesters_info_show': harvesters_info_show,
|
||||
'harvest_objects_import': harvest_objects_import,
|
||||
'harvest_jobs_run':harvest_jobs_run
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue