2012-08-02 19:41:59 +02:00
|
|
|
import hashlib
|
|
|
|
|
2012-02-29 16:20:35 +01:00
|
|
|
import logging
|
2012-10-29 18:15:02 +01:00
|
|
|
import datetime
|
2012-02-29 16:20:35 +01:00
|
|
|
|
2012-12-13 17:33:44 +01:00
|
|
|
from sqlalchemy import and_
|
|
|
|
|
2012-02-29 16:20:35 +01:00
|
|
|
from ckan.plugins import PluginImplementations
|
2012-10-29 18:15:02 +01:00
|
|
|
from ckan.logic import get_action
|
2012-02-29 16:20:35 +01:00
|
|
|
from ckanext.harvest.interfaces import IHarvester
|
|
|
|
|
|
|
|
from ckan.model import Package
|
2012-11-30 15:03:04 +01:00
|
|
|
from ckan import logic
|
2012-02-29 16:20:35 +01:00
|
|
|
|
2012-11-30 15:03:04 +01:00
|
|
|
from ckan.logic import NotFound, check_access
|
2012-02-29 16:20:35 +01:00
|
|
|
|
2012-11-30 15:03:04 +01:00
|
|
|
from ckanext.harvest.plugin import DATASET_TYPE_NAME
|
2012-02-29 16:20:35 +01:00
|
|
|
from ckanext.harvest.queue import get_gather_publisher
|
|
|
|
|
2012-12-13 19:33:59 +01:00
|
|
|
from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject
|
2012-10-29 18:15:02 +01:00
|
|
|
from ckanext.harvest.logic import HarvestJobExists
|
2012-11-30 15:03:04 +01:00
|
|
|
from ckanext.harvest.logic.schema import harvest_source_db_to_form_schema
|
|
|
|
|
2012-02-29 16:20:35 +01:00
|
|
|
|
2012-10-29 18:15:02 +01:00
|
|
|
from ckanext.harvest.logic.action.get import harvest_source_show, harvest_job_list, _get_sources_for_user
|
2012-02-29 16:20:35 +01:00
|
|
|
|
|
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
def harvest_source_update(context, data_dict):
    '''
    Update an existing harvest source.

    The request is proxied to ``package_update``, which updates the
    harvest source dataset and the HarvestSource object. All auth checks
    and validation are done there; this function only forces the dataset
    type and returns the stored result.

    Note that the harvest source type (ckan, waf, csw, etc) is set via
    the ``source_type`` field.

    :param id: the name or id of the harvest source to update
    :type id: string
    :param url: the URL for the harvest source
    :type url: string
    :param name: the name of the harvest source, must be between 2 and 100
        characters long and contain only lowercase alphanumeric characters
    :type name: string
    :param title: the title of the dataset (optional, default: same as
        ``name``)
    :type title: string
    :param notes: a description of the harvest source (optional)
    :type notes: string
    :param source_type: the harvester type for this source. This must be
        one of the registered harvesters, eg 'ckan', 'csw', etc.
    :type source_type: string
    :param frequency: the frequency in which this harvester should run.
        See the ``ckanext.harvest.model`` source for possible values.
        Default is 'MANUAL'
    :type frequency: string
    :param config: extra configuration options for the particular
        harvester type. Should be serialized as JSON. (optional)
    :type config: string

    :returns: the updated harvest source, as returned by ``package_show``
        with the ``harvest_source_db_to_form_schema`` schema
    :rtype: dictionary
    '''
    log.info('Updating harvest source: %r', data_dict)

    # Both dicts are modified in place: the caller's data_dict gets the
    # harvest source dataset type and the context gets the flags below.
    data_dict['type'] = DATASET_TYPE_NAME
    context['extras_as_string'] = True

    update_package = logic.get_action('package_update')
    updated_package = update_package(context, data_dict)

    # Read the dataset back through the harvest source "db to form" schema.
    context['schema'] = harvest_source_db_to_form_schema()
    show_package = logic.get_action('package_show')
    return show_package(context, updated_package)
|
2012-02-29 16:20:35 +01:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def harvest_objects_import(context, data_dict):
    '''
    Re-run the import stage for the current harvest objects.

    Nothing is fetched from the remote server: only harvest objects that
    are already in the database and flagged as ``current`` are passed to
    the ``import_stage`` of the harvester whose name matches the type of
    the object's source.

    :param source_id: if given (in ``data_dict``), only objects belonging
        to this harvest source are re-imported (optional)
    :type source_id: string

    The following ``context`` keys are read:

    * ``model`` and ``session`` (required)
    * ``segments`` (optional): when set, an object is only processed if
      the first character of the MD5 hex digest of its id is contained
      in this value
    * ``join_datasets`` (optional, default ``True``): when true, only
      objects joined to a Package in the 'active' state are selected

    :returns: the number of harvest objects processed. An object is
        counted even if no harvester matched its source type.
    :rtype: int
    :raises NotFound: if ``source_id`` does not match a harvest source
    :raises Exception: if the harvest source is not active
    '''
    log.info('Harvest objects import: %r', data_dict)
    check_access('harvest_objects_import', context, data_dict)

    # 'model' is not used below, but the lookup is kept so that a context
    # without it still fails here with a KeyError.
    model = context['model']
    session = context['session']
    source_id = data_dict.get('source_id', None)

    segments = context.get('segments', None)
    join_datasets = context.get('join_datasets', True)

    if source_id:
        source = HarvestSource.get(source_id)
        if not source:
            log.error('Harvest source %s does not exist', source_id)
            raise NotFound('Harvest source %s does not exist' % source_id)

        if not source.active:
            log.warn('Harvest source %s is not active.', source_id)
            raise Exception('This harvest source is not active')

        id_query = (
            session.query(HarvestObject.id)
            .join(HarvestSource)
            .filter(HarvestObject.source == source)
            .filter(HarvestObject.current == True)  # noqa: E712 (SQL expr)
        )
    else:
        id_query = (
            session.query(HarvestObject.id)
            .filter(HarvestObject.current == True)  # noqa: E712 (SQL expr)
        )

    if join_datasets:
        id_query = (
            id_query.join(Package)
            .filter(Package.state == u'active')
        )

    id_rows = id_query.all()

    processed = 0
    for id_row in id_rows:
        if segments:
            # Each row holds just the object id; its MD5 digest's first
            # hex character decides whether the object is in a segment.
            first_hex_char = hashlib.md5(id_row[0]).hexdigest()[0]
            if first_hex_char not in segments:
                continue

        harvest_object = session.query(HarvestObject).get(id_row)

        for harvester in PluginImplementations(IHarvester):
            if harvester.info()['name'] != harvest_object.source.type:
                continue
            if hasattr(harvester, 'force_import'):
                harvester.force_import = True
            harvester.import_stage(harvest_object)
            # Only the first matching harvester handles the object.
            break
        processed += 1

    log.info('Harvest objects imported: %s', processed)
    return processed
|
2012-02-29 16:20:35 +01:00
|
|
|
|
2012-10-29 18:15:02 +01:00
|
|
|
def _caluclate_next_run(frequency):
|
|
|
|
|
|
|
|
now = datetime.datetime.utcnow()
|
|
|
|
if frequency == 'ALWAYS':
|
|
|
|
return now
|
|
|
|
if frequency == 'WEEKLY':
|
|
|
|
return now + datetime.timedelta(weeks=1)
|
|
|
|
if frequency == 'BIWEEKLY':
|
|
|
|
return now + datetime.timedelta(weeks=2)
|
|
|
|
if frequency == 'DAILY':
|
|
|
|
return now + datetime.timedelta(days=1)
|
|
|
|
if frequency == 'MONTHLY':
|
|
|
|
if now.month in (4,6,9,11):
|
|
|
|
days = 30
|
|
|
|
elif now.month == 2:
|
|
|
|
if now.year % 4 == 0:
|
|
|
|
days = 29
|
|
|
|
else:
|
|
|
|
days = 28
|
|
|
|
else:
|
|
|
|
days = 31
|
|
|
|
return now + datetime.timedelta(days=days)
|
|
|
|
raise Exception('Frequency {freq} not recognised'.format(freq=frequency))
|
|
|
|
|
|
|
|
|
|
|
|
def _make_scheduled_jobs(context, data_dict):
    '''
    Create harvest jobs for the scheduled sources and set their next run.

    The sources are obtained from ``_get_sources_for_user`` with the
    ``only_to_run`` and ``only_active`` flags set. For each of them a job
    is requested through the ``harvest_job_create`` action, and the
    source's ``next_run`` is recalculated from its frequency and saved.
    ``next_run`` is updated even when job creation was skipped because a
    job already existed.

    :param context: the action context, passed on to the called actions
    :type context: dictionary
    :param data_dict: not used; the queries built here do not depend on it
    :type data_dict: dictionary
    '''
    source_query = {'only_to_run': True, 'only_active': True}
    sources = _get_sources_for_user(context, source_query)

    for source in sources:
        try:
            get_action('harvest_job_create')(context,
                                             {'source_id': source.id})
        except HarvestJobExists:
            # An existing job is not an error here: just skip creation.
            log.info('Trying to rerun job for %s skipping', source.id)

        source.next_run = _caluclate_next_run(source.frequency)
        source.save()
|
|
|
|
|
2012-02-29 16:20:35 +01:00
|
|
|
def harvest_jobs_run(context, data_dict):
    '''
    Flag finished harvest jobs and send the new ones to the gather queue.

    The function works in three steps:

    1. If no ``source_id`` is given, jobs are created for the scheduled
       sources (see ``_make_scheduled_jobs``).
    2. Every 'Running' job whose gather stage has finished and which has
       no harvest objects left in a state other than 'COMPLETE' or
       'ERROR' is marked as 'Finished', and the dataset of its harvest
       source is reindexed.
    3. Every 'New' job whose source is active is marked as 'Running' and
       sent to the gather queue.

    :param source_id: restrict the run to the jobs of this harvest source
        (optional)
    :type source_id: string

    :returns: the job dictionaries that were sent to the gather queue
    :rtype: list of dictionaries
    :raises Exception: if there are no new harvest jobs
    '''
    log.info('Harvest job run: %r', data_dict)
    check_access('harvest_jobs_run', context, data_dict)

    session = context['session']
    source_id = data_dict.get('source_id', None)

    if not source_id:
        _make_scheduled_jobs(context, data_dict)

    context['return_objects'] = False

    # Flag finished jobs as such
    running_jobs = harvest_job_list(
        context, {'source_id': source_id, 'status': u'Running'})
    for job in running_jobs:
        if not job['gather_finished']:
            continue

        # Objects of this job that are neither completed nor errored
        unfinished_objects = (
            session.query(HarvestObject.id)
            .filter(HarvestObject.harvest_job_id == job['id'])
            .filter(and_((HarvestObject.state != u'COMPLETE'),
                         (HarvestObject.state != u'ERROR')))
        )
        if unfinished_objects.count() != 0:
            continue

        job_obj = HarvestJob.get(job['id'])
        job_obj.status = u'Finished'
        job_obj.save()

        # Reindex the harvest source dataset so it has the latest
        # status
        # NOTE(review): this changes the shared context, so 'validate'
        # and 'ignore_auth' stay set for the calls further down and for
        # the caller -- confirm this is intended.
        context.pop('extras_as_string', None)
        context.update({'validate': False, 'ignore_auth': True})
        package_dict = logic.get_action('package_show')(
            context, {'id': job_obj.source.id})

        if package_dict:
            from ckan.lib.search.index import PackageSearchIndex
            PackageSearchIndex().index_package(package_dict)

    # Check if there are pending harvest jobs
    jobs = harvest_job_list(
        context, {'source_id': source_id, 'status': u'New'})
    if not jobs:
        log.info('No new harvest jobs.')
        raise Exception('There are no new harvesting jobs')

    # Send each job to the gather queue
    publisher = get_gather_publisher()
    sent_jobs = []
    try:
        for job in jobs:
            context['detailed'] = False
            source = harvest_source_show(context, {'id': job['source']})
            if not source['active']:
                continue

            job_obj = HarvestJob.get(job['id'])
            job_obj.status = job['status'] = u'Running'
            job_obj.save()

            publisher.send({'harvest_job_id': job['id']})
            log.info('Sent job %s to the gather queue', job['id'])
            sent_jobs.append(job)
    finally:
        # Close the publisher even if showing, saving or sending failed.
        publisher.close()

    return sent_jobs
|
|
|
|
|