Jari Voutilainen 2021-03-05 08:49:10 +02:00
parent 3c83f41131
commit 078077c456
15 changed files with 81 additions and 83 deletions

View File

@@ -70,7 +70,7 @@ def show(ctx, id):
try:
with flask_app.test_request_context():
result = utils.show_harvest_source(id)
-except tk.ObjectNotFound as e:
+except tk.ObjectNotFound:
tk.error_shout(u"Source <{}> not found.".format(id))
raise click.Abort()
click.echo(result)
@@ -186,7 +186,7 @@ def job_abort(ctx, id):
with flask_app.test_request_context():
try:
result = utils.abort_job(id)
-except tk.ObjectNotFound as e:
+except tk.ObjectNotFound:
tk.error_shout(u"Job not found.")
ctx.abort()
@@ -344,7 +344,7 @@ def import_stage(
package_id,
segments,
)
-except tk.ObjectNotFound as e:
+except tk.ObjectNotFound:
tk.error_shout(u"Source <{}> not found.".format(id))

View File

@@ -420,11 +420,11 @@ class HarvesterBase(SingletonPlugin):
.filter(
HarvestJob.gather_started != None # noqa: E711
).filter(HarvestJob.status == 'Finished') \
-.filter(HarvestJob.id != harvest_job.id) \
-.filter(
-~exists().where(
-HarvestGatherError.harvest_job_id == HarvestJob.id)) \
-.order_by(HarvestJob.gather_started.desc())
+.filter(HarvestJob.id != harvest_job.id) \
+.filter(
+~exists().where(
+HarvestGatherError.harvest_job_id == HarvestJob.id)) \
+.order_by(HarvestJob.gather_started.desc())
# now check them until we find one with no fetch/import errors
# (looping rather than doing sql, in case there are lots of objects
# and lots of jobs)
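The comment closing this hunk explains why the surrounding code loops in Python rather than folding the error check into SQL. A rough sketch of that loop, assuming fetch/import errors live in HarvestObjectError rows linked to each job's objects (illustrative helper, not the extension's actual method):

    from ckan.model import Session
    from ckanext.harvest.model import HarvestObject, HarvestObjectError

    def first_error_free_job(candidate_jobs):
        # walk the jobs returned by the query above, newest first, and keep
        # the first one whose objects produced no fetch/import errors
        for job in candidate_jobs:
            error_count = Session.query(HarvestObjectError) \
                .join(HarvestObject,
                      HarvestObject.id == HarvestObjectError.harvest_object_id) \
                .filter(HarvestObject.harvest_job_id == job.id) \
                .count()
            if error_count == 0:
                return job
        return None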

View File

@@ -137,7 +137,7 @@ class CKANHarvester(HarvesterBase):
# save the dict to the config object, as we'll need it
# in the import_stage of every dataset
config_obj['default_group_dicts'].append(group)
-except NotFound as e:
+except NotFound:
raise ValueError('Default group not found')
config = json.dumps(config_obj)
@@ -424,7 +424,7 @@ class CKANHarvester(HarvesterBase):
else:
raise NotFound
-except NotFound as e:
+except NotFound:
if 'name' in group_:
data_dict = {'id': group_['name']}
group = get_action('group_show')(base_context.copy(), data_dict)
@@ -433,7 +433,7 @@ class CKANHarvester(HarvesterBase):
# Found local group
validated_groups.append({'id': group['id'], 'name': group['name']})
-except NotFound as e:
+except NotFound:
log.info('Group %s is not available', group_)
if remote_groups == 'create':
try:
@@ -473,7 +473,7 @@ class CKANHarvester(HarvesterBase):
data_dict = {'id': remote_org}
org = get_action('organization_show')(base_context.copy(), data_dict)
validated_org = org['id']
-except NotFound as e:
+except NotFound:
log.info('Organization %s is not available', remote_org)
if remote_orgs == 'create':
try:

View File

@@ -111,7 +111,7 @@ def harvest_source_show_status(context, data_dict):
.join(harvest_model.HarvestObject) \
.filter(harvest_model.HarvestObject.harvest_source_id == source.id) \
.filter(
-harvest_model.HarvestObject.current == True # noqa: E711
+harvest_model.HarvestObject.current == True # noqa: E712
).filter(model.Package.state == u'active') \
.filter(model.Package.private == False)
out['total_datasets'] = packages.count()
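The only change in this hunk corrects the noqa code: pycodestyle E711 covers comparisons to None, while E712 covers comparisons to True/False, which is what this line does. The comparison itself stays, because in SQLAlchemy filters the overloaded == operator is what builds the SQL expression; a Python identity check would evaluate immediately and silently break the filter. A hedged sketch of the alternatives, reusing names from this diff:

    from ckan.model import Session
    import ckanext.harvest.model as harvest_model

    # keeps the SQL semantics, silences the linter with the correct code
    q1 = Session.query(harvest_model.HarvestObject) \
        .filter(harvest_model.HarvestObject.current == True)  # noqa: E712

    # equivalent filter without needing a noqa comment
    q2 = Session.query(harvest_model.HarvestObject) \
        .filter(harvest_model.HarvestObject.current.is_(True))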
@@ -125,7 +125,6 @@ def harvest_source_list(context, data_dict):
TODO: Use package search
'''
organization_id = data_dict.get('organization_id')
limit = config.get('ckan.harvest.harvest_source_limit', 100)
@@ -418,10 +417,11 @@ def _get_sources_for_user(context, data_dict, organization_id=None, limit=None):
return sources
def harvest_get_notifications_recipients(context, data_dict):
""" get all recipients for a harvest source
Return a list of dicts like {'name': 'Jhon', 'email': 'jhon@source.com'} """
check_access('harvest_get_notifications_recipients', context, data_dict)
source_id = data_dict['source_id']
@@ -457,5 +457,5 @@ def harvest_get_notifications_recipients(context, data_dict):
'name': member_details['name'],
'email': member_details['email']
})
-return recipients
+return recipients

View File

@@ -12,7 +12,7 @@ from sqlalchemy import and_, or_
from six.moves.urllib.parse import urljoin
from ckan.lib.search.index import PackageSearchIndex
-from ckan.plugins import toolkit, PluginImplementations, IActions
+from ckan.plugins import toolkit, PluginImplementations
from ckan.logic import get_action
from ckanext.harvest.interfaces import IHarvester
from ckan.lib.search.common import SearchIndexError, make_connection
@@ -29,7 +29,7 @@ from ckanext.harvest.utils import (
from ckanext.harvest.queue import (
get_gather_publisher, resubmit_jobs, resubmit_objects)
-from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject, HarvestGatherError, HarvestObjectError
+from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject, HarvestGatherError
from ckanext.harvest.logic import HarvestJobExists
from ckanext.harvest.logic.dictization import harvest_job_dictize
@@ -574,7 +574,7 @@ def harvest_jobs_run(context, data_dict):
resubmits queue items if needed.
If ckanext.harvest.timeout is set:
-Check if the duration of the job is longer than ckanext.harvest.timeout,
+Check if the duration of the job is longer than ckanext.harvest.timeout,
then mark that job as finished as there is probably an underlying issue with the harvest process.
This should be called every few minutes (e.g. by a cron), or else jobs
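The lines above are a whitespace-only cleanup of the docstring that describes the timeout behaviour. As a hedged sketch of what that behaviour amounts to, assuming the configured timeout is a number of minutes (which is how the tests at the end of this commit treat it), and not the extension's exact code:

    from datetime import datetime, timedelta

    def maybe_time_out(job_obj, timeout_minutes):
        # if a running job has been idle longer than the timeout, force-finish it
        last_time = job_obj.get_last_action_time()
        if datetime.utcnow() - last_time > timedelta(minutes=int(timeout_minutes)):
            job_obj.status = u'Finished'
            job_obj.finished = datetime.utcnow()
            job_obj.save()
            return True
        return False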
@@ -617,7 +617,7 @@ def harvest_jobs_run(context, data_dict):
msg += '\tJob created: {}\n'.format(job_obj.created)
msg += '\tJob gather finished: {}\n'.format(job_obj.created)
msg += '\tJob last action time: {}\n'.format(last_time)
job_obj.status = u'Finished'
job_obj.finished = now
job_obj.save()
@@ -637,7 +637,7 @@ def harvest_jobs_run(context, data_dict):
.count()
if num_objects_in_progress == 0:
job_obj.status = u'Finished'
log.info('Marking job as finished %s %s',
job_obj.source.url, job_obj.id)
@@ -668,7 +668,7 @@ def harvest_jobs_run(context, data_dict):
notify_errors = toolkit.asbool(config.get('ckan.harvest.status_mail.errored'))
last_job_errors = status['last_job']['stats'].get('errored', 0)
log.debug('Notifications: All:{} On error:{} Errors:{}'.format(notify_all, notify_errors, last_job_errors))
if last_job_errors > 0 and (notify_all or notify_errors):
send_error_email(context, job_obj.source.id, status)
elif notify_all:
@@ -689,13 +689,13 @@ def harvest_jobs_run(context, data_dict):
def get_mail_extra_vars(context, source_id, status):
last_job = status['last_job']
source = get_action('harvest_source_show')(context, {'id': source_id})
report = get_action(
'harvest_job_report')(context, {'id': status['last_job']['id']})
obj_errors = []
job_errors = []
for harvest_object_error_key in islice(report.get('object_errors'), 0, 20):
harvest_object_error = report.get(
'object_errors')[harvest_object_error_key]['errors']
@@ -745,22 +745,25 @@ def get_mail_extra_vars(context, source_id, status):
return extra_vars
def prepare_summary_mail(context, source_id, status):
extra_vars = get_mail_extra_vars(context, source_id, status)
body = render_jinja2('emails/summary_email.txt', extra_vars)
subject = '{} - Harvesting Job Successful - Summary Notification'\
-.format(config.get('ckan.site_title'))
+.format(config.get('ckan.site_title'))
return subject, body
def prepare_error_mail(context, source_id, status):
extra_vars = get_mail_extra_vars(context, source_id, status)
body = render_jinja2('emails/error_email.txt', extra_vars)
subject = '{} - Harvesting Job - Error Notification'\
-.format(config.get('ckan.site_title'))
+.format(config.get('ckan.site_title'))
return subject, body
def send_summary_email(context, source_id, status):
subject, body = prepare_summary_mail(context, source_id, status)
recipients = toolkit.get_action('harvest_get_notifications_recipients')(context, {'source_id': source_id})
@@ -773,7 +776,7 @@ def send_error_email(context, source_id, status):
def send_mail(recipients, subject, body):
for recipient in recipients:
email = {'recipient_name': recipient['name'],
'recipient_email': recipient['email'],
@@ -927,7 +930,7 @@ def harvest_source_reindex(context, data_dict):
defer_commit = context.get('defer_commit', False)
-if 'extras_as_string'in context:
+if 'extras_as_string' in context:
del context['extras_as_string']
context.update({'ignore_auth': True})
package_dict = logic.get_action('harvest_source_show')(

View File

@@ -135,5 +135,5 @@ def harvesters_info_show(context, data_dict):
def harvest_get_notifications_recipients(context, data_dict):
-# Only sysadmins can access this
+# Only sysadmins can access this
return {'success': False}
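For readers unfamiliar with CKAN auth functions: check_access is skipped for sysadmins, so an auth function that unconditionally returns {'success': False}, as above, effectively restricts the action to sysadmins. A small hypothetical caller to illustrate:

    import ckan.plugins.toolkit as toolkit

    def recipients_for(source_id, user_name):
        # raises toolkit.NotAuthorized unless user_name is a sysadmin
        return toolkit.get_action('harvest_get_notifications_recipients')(
            {'user': user_name}, {'source_id': source_id})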

View File

@@ -1,8 +1,8 @@
-from sqlalchemy import distinct, func, text
+from sqlalchemy import func, text
-from ckan.model import Package, Group
+from ckan.model import Group
from ckan import logic
-from ckanext.harvest.model import (HarvestSource, HarvestJob, HarvestObject,
+from ckanext.harvest.model import (HarvestJob, HarvestObject,
HarvestGatherError, HarvestObjectError)
@@ -111,8 +111,6 @@ def _get_source_status(source, context):
TODO: Deprecated, use harvest_source_show_status instead
'''
-model = context.get('model')
out = dict()
job_count = HarvestJob.filter(source=source).count()

View File

@@ -224,7 +224,7 @@ def harvest_source_convert_from_config(key, data, errors, context):
if config:
try:
config_dict = json.loads(config)
-except ValueError as e:
+except ValueError:
log.error('Wrong JSON provided config, skipping')
data[key] = None
return

View File

@@ -79,19 +79,19 @@ def setup():
if "harvest_job_id_idx" not in index_names:
log.debug('Creating index for harvest_object')
Index("harvest_job_id_idx", harvest_object_table.c.harvest_job_id).create()
if "harvest_source_id_idx" not in index_names:
log.debug('Creating index for harvest source')
Index("harvest_source_id_idx", harvest_object_table.c.harvest_source_id).create()
if "package_id_idx" not in index_names:
log.debug('Creating index for package')
Index("package_id_idx", harvest_object_table.c.package_id).create()
if "guid_idx" not in index_names:
log.debug('Creating index for guid')
Index("guid_idx", harvest_object_table.c.guid).create()
index_names = [index['name'] for index in inspector.get_indexes("harvest_object_extra")]
if "harvest_object_id_idx" not in index_names:
log.debug('Creating index for harvest_object_extra')
@@ -137,10 +137,10 @@ class HarvestSource(HarvestDomainObject):
def __str__(self):
return self.__repr__().encode('ascii', 'ignore')
def get_jobs(self, status=None):
""" get the running jobs for this source """
query = Session.query(HarvestJob).filter(HarvestJob.source_id == self.id)
if status is not None:
@@ -160,54 +160,54 @@ class HarvestJob(HarvestDomainObject):
(``HarvestObjectError``) are stored in the ``harvest_object_error``
table.
'''
def get_last_finished_object(self):
''' Determine the last finished object in this job
-Helpful to know if a job is running or not and
+Helpful to know if a job is running or not and
to avoid timeouts when the source is running
'''
query = Session.query(HarvestObject)\
-.filter(HarvestObject.harvest_job_id == self.id)\
-.filter(HarvestObject.state == "COMPLETE")\
-.filter(HarvestObject.import_finished.isnot(None))\
-.order_by(HarvestObject.import_finished.desc())\
-.first()
+.filter(HarvestObject.harvest_job_id == self.id)\
+.filter(HarvestObject.state == "COMPLETE")\
+.filter(HarvestObject.import_finished.isnot(None))\
+.order_by(HarvestObject.import_finished.desc())\
+.first()
return query
def get_last_gathered_object(self):
''' Determine the last gathered object in this job
Helpful to know if a job is running or not and
to avoid timeouts when the source is running
'''
query = Session.query(HarvestObject)\
-.filter(HarvestObject.harvest_job_id == self.id)\
-.order_by(HarvestObject.gathered.desc())\
-.first()
+.filter(HarvestObject.harvest_job_id == self.id)\
+.order_by(HarvestObject.gathered.desc())\
+.first()
return query
def get_last_action_time(self):
last_object = self.get_last_finished_object()
if last_object is not None:
return last_object.import_finished
if self.gather_finished is not None:
return self.gather_finished
last_gathered_object = self.get_last_gathered_object()
if last_gathered_object is not None:
return last_gathered_object.gathered
return self.created
def get_gather_errors(self):
query = Session.query(HarvestGatherError)\
.filter(HarvestGatherError.harvest_job_id == self.id)\
.order_by(HarvestGatherError.created.desc())
return query.all()
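A short, hypothetical use of the helpers touched above, tying get_jobs() and get_last_action_time() together (the source id is a placeholder, not a real value):

    from ckanext.harvest.model import HarvestSource

    source = HarvestSource.get('source-id-goes-here')   # hypothetical id
    for job in source.get_jobs(status=u'Running'):
        print(job.id, job.get_last_action_time())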

View File

@@ -4,6 +4,7 @@ import ckan.plugins as p
import ckanext.harvest.cli as cli
import ckanext.harvest.views as views
class MixinPlugin(p.SingletonPlugin):
p.implements(p.IClick)
p.implements(p.IBlueprint)

View File

@@ -3,6 +3,7 @@
import ckan.plugins as p
from ckanext.harvest.utils import DATASET_TYPE_NAME
class MixinPlugin(p.SingletonPlugin):
p.implements(p.IRoutes, inherit=True)

View File

@@ -465,7 +465,7 @@ def fetch_callback(channel, method, header, body):
channel.basic_ack(method.delivery_tag)
return False
-# check if job has been set to finished
+# check if job has been set to finished
job = HarvestJob.get(obj.harvest_job_id)
if job.status == 'Finished':
obj.state = "ERROR"

View File

@@ -3,7 +3,6 @@ from __future__ import print_function
import json
import re
import copy
-import urllib
import six
from six.moves.urllib.parse import unquote_plus

View File

@@ -1,5 +1,4 @@
import pytest
-import six
from mock import patch
from ckanext.harvest.model import HarvestObject, HarvestObjectExtra
@@ -335,7 +334,5 @@ class TestHarvestCorruptRedis(object):
args, _ = mock_log_error.call_args_list[0]
assert "concatenate" in str(args[1])
finally:
redis.delete('ckanext-harvest:some-random-key-2')

View File

@@ -5,7 +5,6 @@ from ckan.tests import factories as ckan_factories
from ckan import model
from ckan.lib.base import config
from ckan.plugins.toolkit import get_action
from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject
from ckanext.harvest.tests import factories as harvest_factories
from ckanext.harvest.logic import HarvestJobExists
@@ -14,18 +13,18 @@ from ckanext.harvest.logic import HarvestJobExists
@pytest.mark.ckan_config('ckan.plugins', 'harvest test_action_harvester')
class TestModelFunctions:
dataset_counter = 0
def test_timeout_jobs(self):
""" Create harvest source, job and objects
Validate we read the last object finished time
Validate we raise timeout in harvest_jobs_run_action
"""
source, job = self.get_source()
-ob1 = self.add_object(job=job, source=source, state='COMPLETE', minutes_ago=10)
+self.add_object(job=job, source=source, state='COMPLETE', minutes_ago=10)
ob2 = self.add_object(job=job, source=source, state='COMPLETE', minutes_ago=5)
-ob3 = self.add_object(job=job, source=source, state='COMPLETE', minutes_ago=15)
+self.add_object(job=job, source=source, state='COMPLETE', minutes_ago=15)
assert_equal(job.get_last_finished_object(), ob2)
assert_equal(job.get_last_action_time(), ob2.import_finished)
@@ -34,14 +33,14 @@ class TestModelFunctions:
assert_equal(job.status, 'Finished')
gather_error = gather_errors[0]
assert_in('timeout', gather_error.message)
def test_no_timeout_jobs(self):
""" Test a job that don't raise timeout """
source, job = self.get_source()
-ob1 = self.add_object(job=job, source=source, state='COMPLETE', minutes_ago=10)
+self.add_object(job=job, source=source, state='COMPLETE', minutes_ago=10)
ob2 = self.add_object(job=job, source=source, state='COMPLETE', minutes_ago=5)
-ob3 = self.add_object(job=job, source=source, state='COMPLETE', minutes_ago=15)
+self.add_object(job=job, source=source, state='COMPLETE', minutes_ago=15)
assert_equal(job.get_last_finished_object(), ob2)
assert_equal(job.get_last_action_time(), ob2.import_finished)
@@ -74,10 +73,10 @@ class TestModelFunctions:
""" Test get_last_action_time at gather stage """
source, job = self.get_source()
-ob1 = self.add_object(job=job, source=source, state='WAITING')
-ob2 = self.add_object(job=job, source=source, state='WAITING')
+self.add_object(job=job, source=source, state='WAITING')
+self.add_object(job=job, source=source, state='WAITING')
ob3 = self.add_object(job=job, source=source, state='WAITING')
assert_equal(job.get_last_gathered_object(), ob3)
assert_equal(job.get_last_action_time(), ob3.gathered)
@@ -103,7 +102,7 @@ class TestModelFunctions:
config['ckan.harvest.timeout'] = timeout
harvest_jobs_run_action = get_action('harvest_jobs_run')
harvest_jobs_run_action(context, data_dict)
return job.get_gather_errors()
def get_source(self):
@@ -121,7 +120,7 @@ class TestModelFunctions:
job = harvest_factories.HarvestJobObj(source=source)
except HarvestJobExists: # not sure why
job = source.get_jobs()[0]
job.status = 'Running'
job.save()
@@ -129,7 +128,7 @@ class TestModelFunctions:
assert_in(job, jobs)
return source, job
def add_object(self, job, source, state, minutes_ago=0):
now = datetime.utcnow()
self.dataset_counter += 1