add jobs at certain frequencies

This commit is contained in:
kindly 2012-10-29 17:15:02 +00:00
parent 9fc0ae9937
commit 2529a17304
7 changed files with 87 additions and 9 deletions

View File

@ -15,7 +15,7 @@ class Harvester(CkanCommand):
harvester initdb
- Creates the necessary tables in the database
harvester source {url} {type} [{active}] [{user-id}] [{publisher-id}]
harvester source {url} {type} [{config}] [{active}] [{user-id}] [{publisher-id}] [{frequency}]
- create new harvest source
harvester rmsource {id}
@ -64,7 +64,7 @@ class Harvester(CkanCommand):
summary = __doc__.split('\n')[0]
usage = __doc__
max_args = 6
max_args = 8
min_args = 0
def __init__(self,name):
@ -169,11 +169,18 @@ class Harvester(CkanCommand):
publisher_id = unicode(self.args[6])
else:
publisher_id = u''
if len(self.args) >= 8:
frequency = unicode(self.args[7])
if not frequency:
frequency = None
else:
frequency = None
try:
data_dict = {
'url':url,
'type':type,
'config':config,
'frequency':frequency,
'active':active,
'user_id':user_id,
'publisher_id':publisher_id}
@ -186,9 +193,11 @@ class Harvester(CkanCommand):
sources = get_action('harvest_source_list')(context,{})
self.print_there_are('harvest source', sources)
# Create a harvest job for the new source
# Create a harvest job for the new source if not regular job.
if not data_dict['frequency']:
get_action('harvest_job_create')(context,{'source_id':source['id']})
print 'A new Harvest Job for this source has also been created'
except ValidationError,e:
print 'An error occurred:'
print str(e.error_dict)
@ -278,6 +287,7 @@ class Harvester(CkanCommand):
print ' active: %s' % source['active']
print ' user: %s' % source['user_id']
print 'publisher: %s' % source['publisher_id']
print 'frequency: %s' % source['frequency']
print ' jobs: %s' % source['status']['job_count']
print ''

View File

@ -5,3 +5,5 @@ except ImportError:
import pkgutil
__path__ = pkgutil.extend_path(__path__, __name__)
class HarvestJobExists(Exception):
pass

View File

@ -2,6 +2,7 @@ import re
import logging
from ckan.logic import NotFound, ValidationError, check_access
from ckanext.harvest.logic import HarvestJobExists
from ckan.lib.navl.dictization_functions import validate
from ckanext.harvest.model import (HarvestSource, HarvestJob, HarvestObject)
@ -32,7 +33,8 @@ def harvest_source_create(context,data_dict):
source.url = data['url'].strip()
source.type = data['type']
opt = ['active','title','description','user_id','publisher_id','config']
opt = ['active','title','description','user_id',
'publisher_id','config', 'frequency']
for o in opt:
if o in data and data[o] is not None:
source.__setattr__(o,data[o])
@ -45,6 +47,7 @@ def harvest_source_create(context,data_dict):
return harvest_source_dictize(source,context)
def harvest_job_create(context,data_dict):
log.info('Harvest job create: %r', data_dict)
check_access('harvest_job_create',context,data_dict)
@ -70,7 +73,7 @@ def harvest_job_create(context,data_dict):
exists = harvest_job_list(context,data_dict)
if len(exists):
log.warn('There is already an unrun job %r for this source %s', exists, source_id)
raise Exception('There already is an unrun job for this source')
raise HarvestJobExists('There already is an unrun job for this source')
job = HarvestJob()
job.source = source

View File

@ -2,6 +2,7 @@ import logging
from sqlalchemy import or_
from ckan.authz import Authorizer
from ckan.model import User
import datetime
from ckan.plugins import PluginImplementations
from ckanext.harvest.interfaces import IHarvester
@ -153,6 +154,7 @@ def _get_sources_for_user(context,data_dict):
user = context.get('user','')
only_active = data_dict.get('only_active',False)
only_to_run = data_dict.get('only_to_run',False)
query = session.query(HarvestSource) \
.order_by(HarvestSource.created.desc())
@ -160,6 +162,14 @@ def _get_sources_for_user(context,data_dict):
if only_active:
query = query.filter(HarvestSource.active==True) \
if only_to_run:
query = query.filter(or_(HarvestSource.frequency!=None,
HarvestSource.frequency!='')
)
query = query.filter(or_(HarvestSource.next_run<=datetime.datetime.utcnow(),
HarvestSource.next_run==None)
)
# Sysadmins will get all sources
if not Authorizer().is_sysadmin(user):
# This only applies to a non sysadmin user when using the

View File

@ -1,8 +1,10 @@
import hashlib
import logging
import datetime
from ckan.plugins import PluginImplementations
from ckan.logic import get_action
from ckanext.harvest.interfaces import IHarvester
from ckan.model import Package
@ -14,10 +16,11 @@ from ckanext.harvest.queue import get_gather_publisher
from ckanext.harvest.model import (HarvestSource, HarvestJob, HarvestObject)
from ckanext.harvest.logic.schema import default_harvest_source_schema
from ckanext.harvest.logic import HarvestJobExists
from ckanext.harvest.logic.dictization import (harvest_source_dictize,harvest_object_dictize)
from ckanext.harvest.logic.action.create import _error_summary
from ckanext.harvest.logic.action.get import harvest_source_show,harvest_job_list
from ckanext.harvest.logic.action.get import harvest_source_show, harvest_job_list, _get_sources_for_user
log = logging.getLogger(__name__)
@ -132,12 +135,56 @@ def harvest_objects_import(context,data_dict):
log.info('Harvest objects imported: %s', last_objects_count)
return last_objects_count
def _caluclate_next_run(frequency):
now = datetime.datetime.utcnow()
if frequency == 'ALWAYS':
return now
if frequency == 'WEEKLY':
return now + datetime.timedelta(weeks=1)
if frequency == 'BIWEEKLY':
return now + datetime.timedelta(weeks=2)
if frequency == 'DAILY':
return now + datetime.timedelta(days=1)
if frequency == 'MONTHLY':
if now.month in (4,6,9,11):
days = 30
elif now.month == 2:
if now.year % 4 == 0:
days = 29
else:
days = 28
else:
days = 31
return now + datetime.timedelta(days=days)
raise Exception('Frequency {freq} not recognised'.format(freq=frequency))
def _make_scheduled_jobs(context, data_dict):
data_dict = {'only_to_run': True,
'only_active': True}
sources = _get_sources_for_user(context, data_dict)
for source in sources:
data_dict = {'source_id': source.id}
try:
get_action('harvest_job_create')(context, data_dict)
except HarvestJobExists, e:
log.info('Trying to rerun job for %s skipping' % source.id)
source.next_run = _caluclate_next_run(source.frequency)
source.save()
def harvest_jobs_run(context,data_dict):
log.info('Harvest job run: %r', data_dict)
check_access('harvest_jobs_run',context,data_dict)
source_id = data_dict.get('source_id',None)
if not source_id:
_make_scheduled_jobs(context, data_dict)
# Check if there are pending harvest jobs
jobs = harvest_job_list(context,{'source_id':source_id,'status':u'New'})
if len(jobs) == 0:

View File

@ -11,7 +11,8 @@ from ckanext.harvest.logic.validators import (harvest_source_id_exists,
harvest_source_url_validator,
harvest_source_type_exists,
harvest_source_config_validator,
harvest_source_active_validator,)
harvest_source_active_validator,
harvest_source_frequency_exists)
def default_harvest_source_schema():
@ -21,6 +22,7 @@ def default_harvest_source_schema():
'type': [not_empty, unicode, harvest_source_type_exists],
'title': [ignore_missing,unicode],
'description': [ignore_missing,unicode],
'frequency': [ignore_missing,unicode, harvest_source_frequency_exists],
'active': [ignore_missing,harvest_source_active_validator],
'user_id': [ignore_missing,unicode],
'config': [ignore_missing,harvest_source_config_validator]

View File

@ -99,3 +99,7 @@ def harvest_source_active_validator(value,context):
return False
return bool(value)
def harvest_source_frequency_exists(value):
if value.upper() not in ['MONTHLY','ALWAYS','WEEKLY','BIWEEKLY','DAILY']:
raise Invalid('Frequency %s not recognised' % value)
return value.upper()