diff --git a/ckanext/harvest/commands/harvester.py b/ckanext/harvest/commands/harvester.py index 818530e..00fcd45 100644 --- a/ckanext/harvest/commands/harvester.py +++ b/ckanext/harvest/commands/harvester.py @@ -15,7 +15,7 @@ class Harvester(CkanCommand): harvester initdb - Creates the necessary tables in the database - harvester source {url} {type} [{active}] [{user-id}] [{publisher-id}] + harvester source {url} {type} [{config}] [{active}] [{user-id}] [{publisher-id}] [{frequency}] - create new harvest source harvester rmsource {id} @@ -64,7 +64,7 @@ class Harvester(CkanCommand): summary = __doc__.split('\n')[0] usage = __doc__ - max_args = 6 + max_args = 8 min_args = 0 def __init__(self,name): @@ -169,11 +169,18 @@ class Harvester(CkanCommand): publisher_id = unicode(self.args[6]) else: publisher_id = u'' + if len(self.args) >= 8: + frequency = unicode(self.args[7]) + if not frequency: + frequency = None + else: + frequency = None try: data_dict = { 'url':url, 'type':type, 'config':config, + 'frequency':frequency, 'active':active, 'user_id':user_id, 'publisher_id':publisher_id} @@ -186,9 +193,11 @@ class Harvester(CkanCommand): sources = get_action('harvest_source_list')(context,{}) self.print_there_are('harvest source', sources) - # Create a harvest job for the new source - get_action('harvest_job_create')(context,{'source_id':source['id']}) - print 'A new Harvest Job for this source has also been created' + # Create a harvest job for the new source if not regular job. + if not data_dict['frequency']: + get_action('harvest_job_create')(context,{'source_id':source['id']}) + print 'A new Harvest Job for this source has also been created' + except ValidationError,e: print 'An error occurred:' print str(e.error_dict) @@ -278,6 +287,7 @@ class Harvester(CkanCommand): print ' active: %s' % source['active'] print ' user: %s' % source['user_id'] print 'publisher: %s' % source['publisher_id'] + print 'frequency: %s' % source['frequency'] print ' jobs: %s' % source['status']['job_count'] print '' diff --git a/ckanext/harvest/logic/__init__.py b/ckanext/harvest/logic/__init__.py index d0ed2fc..580a57b 100644 --- a/ckanext/harvest/logic/__init__.py +++ b/ckanext/harvest/logic/__init__.py @@ -5,3 +5,5 @@ except ImportError: import pkgutil __path__ = pkgutil.extend_path(__path__, __name__) +class HarvestJobExists(Exception): + pass diff --git a/ckanext/harvest/logic/action/create.py b/ckanext/harvest/logic/action/create.py index a93b5c5..50954e6 100644 --- a/ckanext/harvest/logic/action/create.py +++ b/ckanext/harvest/logic/action/create.py @@ -2,6 +2,7 @@ import re import logging from ckan.logic import NotFound, ValidationError, check_access +from ckanext.harvest.logic import HarvestJobExists from ckan.lib.navl.dictization_functions import validate from ckanext.harvest.model import (HarvestSource, HarvestJob, HarvestObject) @@ -32,7 +33,8 @@ def harvest_source_create(context,data_dict): source.url = data['url'].strip() source.type = data['type'] - opt = ['active','title','description','user_id','publisher_id','config'] + opt = ['active','title','description','user_id', + 'publisher_id','config', 'frequency'] for o in opt: if o in data and data[o] is not None: source.__setattr__(o,data[o]) @@ -45,6 +47,7 @@ def harvest_source_create(context,data_dict): return harvest_source_dictize(source,context) + def harvest_job_create(context,data_dict): log.info('Harvest job create: %r', data_dict) check_access('harvest_job_create',context,data_dict) @@ -70,7 +73,7 @@ def harvest_job_create(context,data_dict): exists = harvest_job_list(context,data_dict) if len(exists): log.warn('There is already an unrun job %r for this source %s', exists, source_id) - raise Exception('There already is an unrun job for this source') + raise HarvestJobExists('There already is an unrun job for this source') job = HarvestJob() job.source = source diff --git a/ckanext/harvest/logic/action/get.py b/ckanext/harvest/logic/action/get.py index c319641..831d0cd 100644 --- a/ckanext/harvest/logic/action/get.py +++ b/ckanext/harvest/logic/action/get.py @@ -2,6 +2,7 @@ import logging from sqlalchemy import or_ from ckan.authz import Authorizer from ckan.model import User +import datetime from ckan.plugins import PluginImplementations from ckanext.harvest.interfaces import IHarvester @@ -153,6 +154,7 @@ def _get_sources_for_user(context,data_dict): user = context.get('user','') only_active = data_dict.get('only_active',False) + only_to_run = data_dict.get('only_to_run',False) query = session.query(HarvestSource) \ .order_by(HarvestSource.created.desc()) @@ -160,6 +162,14 @@ def _get_sources_for_user(context,data_dict): if only_active: query = query.filter(HarvestSource.active==True) \ + if only_to_run: + query = query.filter(or_(HarvestSource.frequency!=None, + HarvestSource.frequency!='') + ) + query = query.filter(or_(HarvestSource.next_run<=datetime.datetime.utcnow(), + HarvestSource.next_run==None) + ) + # Sysadmins will get all sources if not Authorizer().is_sysadmin(user): # This only applies to a non sysadmin user when using the diff --git a/ckanext/harvest/logic/action/update.py b/ckanext/harvest/logic/action/update.py index e0f5d72..6422cab 100644 --- a/ckanext/harvest/logic/action/update.py +++ b/ckanext/harvest/logic/action/update.py @@ -1,8 +1,10 @@ import hashlib import logging +import datetime from ckan.plugins import PluginImplementations +from ckan.logic import get_action from ckanext.harvest.interfaces import IHarvester from ckan.model import Package @@ -14,10 +16,11 @@ from ckanext.harvest.queue import get_gather_publisher from ckanext.harvest.model import (HarvestSource, HarvestJob, HarvestObject) from ckanext.harvest.logic.schema import default_harvest_source_schema +from ckanext.harvest.logic import HarvestJobExists from ckanext.harvest.logic.dictization import (harvest_source_dictize,harvest_object_dictize) from ckanext.harvest.logic.action.create import _error_summary -from ckanext.harvest.logic.action.get import harvest_source_show,harvest_job_list +from ckanext.harvest.logic.action.get import harvest_source_show, harvest_job_list, _get_sources_for_user log = logging.getLogger(__name__) @@ -132,12 +135,56 @@ def harvest_objects_import(context,data_dict): log.info('Harvest objects imported: %s', last_objects_count) return last_objects_count +def _caluclate_next_run(frequency): + + now = datetime.datetime.utcnow() + if frequency == 'ALWAYS': + return now + if frequency == 'WEEKLY': + return now + datetime.timedelta(weeks=1) + if frequency == 'BIWEEKLY': + return now + datetime.timedelta(weeks=2) + if frequency == 'DAILY': + return now + datetime.timedelta(days=1) + if frequency == 'MONTHLY': + if now.month in (4,6,9,11): + days = 30 + elif now.month == 2: + if now.year % 4 == 0: + days = 29 + else: + days = 28 + else: + days = 31 + return now + datetime.timedelta(days=days) + raise Exception('Frequency {freq} not recognised'.format(freq=frequency)) + + +def _make_scheduled_jobs(context, data_dict): + + data_dict = {'only_to_run': True, + 'only_active': True} + sources = _get_sources_for_user(context, data_dict) + + for source in sources: + data_dict = {'source_id': source.id} + try: + get_action('harvest_job_create')(context, data_dict) + except HarvestJobExists, e: + log.info('Trying to rerun job for %s skipping' % source.id) + + source.next_run = _caluclate_next_run(source.frequency) + source.save() + def harvest_jobs_run(context,data_dict): log.info('Harvest job run: %r', data_dict) check_access('harvest_jobs_run',context,data_dict) source_id = data_dict.get('source_id',None) + if not source_id: + _make_scheduled_jobs(context, data_dict) + # Check if there are pending harvest jobs jobs = harvest_job_list(context,{'source_id':source_id,'status':u'New'}) if len(jobs) == 0: diff --git a/ckanext/harvest/logic/schema.py b/ckanext/harvest/logic/schema.py index c95c7c8..8970947 100644 --- a/ckanext/harvest/logic/schema.py +++ b/ckanext/harvest/logic/schema.py @@ -11,7 +11,8 @@ from ckanext.harvest.logic.validators import (harvest_source_id_exists, harvest_source_url_validator, harvest_source_type_exists, harvest_source_config_validator, - harvest_source_active_validator,) + harvest_source_active_validator, + harvest_source_frequency_exists) def default_harvest_source_schema(): @@ -21,6 +22,7 @@ def default_harvest_source_schema(): 'type': [not_empty, unicode, harvest_source_type_exists], 'title': [ignore_missing,unicode], 'description': [ignore_missing,unicode], + 'frequency': [ignore_missing,unicode, harvest_source_frequency_exists], 'active': [ignore_missing,harvest_source_active_validator], 'user_id': [ignore_missing,unicode], 'config': [ignore_missing,harvest_source_config_validator] diff --git a/ckanext/harvest/logic/validators.py b/ckanext/harvest/logic/validators.py index e851649..c7a2deb 100644 --- a/ckanext/harvest/logic/validators.py +++ b/ckanext/harvest/logic/validators.py @@ -99,3 +99,7 @@ def harvest_source_active_validator(value,context): return False return bool(value) +def harvest_source_frequency_exists(value): + if value.upper() not in ['MONTHLY','ALWAYS','WEEKLY','BIWEEKLY','DAILY']: + raise Invalid('Frequency %s not recognised' % value) + return value.upper()