Merge branch 'master' into 369-broken-tests

This commit is contained in:
amercader 2019-08-23 14:40:50 +02:00
commit 18e8f5ce13
36 changed files with 845 additions and 735 deletions

View File

@ -20,3 +20,22 @@ script: sh bin/travis-run.sh
# the new trusty images of Travis cause build errors with psycopg2, see https://github.com/travis-ci/travis-ci/issues/8897
dist: trusty
group: deprecated-2017Q4
stages:
- Flake8
- test
jobs:
include:
- stage: Flake8
env: FLAKE8=True
install:
- bash bin/travis-build.bash
- pip install flake8==3.5.0
- pip install pycodestyle==2.3.0
script:
- flake8 --version
# stop the build if there are Python syntax errors or undefined names
- flake8 . --count --select=E901,E999,F821,F822,F823 --show-source --statistics --exclude ckan
# exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
- flake8 . --count --max-line-length=127 --statistics --exclude ckan

View File

@ -10,10 +10,17 @@ and this project adheres to `Semantic Versioning <http://semver.org/>`_.
***********
Unreleased_
***********
Changed
-------
- Apply flake8 to be PEP-8 compliant #354
- Use ckantoolkit to clean up imports #358
Fixed
-----
- harvest_source_type_exists validator should not fail if Harvester has no ``info()`` method #338
- Fix SSL problems for old versions of Python 2.7.x #344
- Add an 'owner_org' to the v3 package migration #348
- Fix harvest request exceptions #357
*******************
1.1.4_ - 2018-10-26

View File

@ -5,4 +5,3 @@ try:
except ImportError:
import pkgutil
__path__ = pkgutil.extend_path(__path__, __name__)

View File

@ -3,4 +3,4 @@ try:
pkg_resources.declare_namespace(__name__)
except ImportError:
import pkgutil
__path__ = pkgutil.extend_path(__path__, __name__)
__path__ = pkgutil.extend_path(__path__, __name__)

View File

@ -122,20 +122,19 @@ class Harvester(CkanCommand):
super(Harvester, self).__init__(name)
self.parser.add_option('-j', '--no-join-datasets', dest='no_join_datasets',
action='store_true', default=False, help='Do not join harvest objects to existing datasets')
action='store_true', default=False, help='Do not join harvest objects to existing datasets')
self.parser.add_option('-o', '--harvest-object-id', dest='harvest_object_id',
default=False, help='Id of the harvest object to which perform the import stage')
default=False, help='Id of the harvest object to which perform the import stage')
self.parser.add_option('-p', '--package-id', dest='package_id',
default=False, help='Id of the package whose harvest object to perform the import stage for')
default=False, help='Id of the package whose harvest object to perform the import stage for')
self.parser.add_option('-g', '--guid', dest='guid',
default=False, help='Guid of the harvest object to which perform the import stage for')
default=False, help='Guid of the harvest object to which perform the import stage for')
self.parser.add_option('--segments', dest='segments',
default=False, help=
'''A string containing hex digits that represent which of
default=False, help='''A string containing hex digits that represent which of
the 16 harvest object segments to import. e.g. 15af will run segments 1,5,a,f''')
def command(self):
@ -178,7 +177,7 @@ class Harvester(CkanCommand):
elif cmd == 'gather_consumer':
import logging
from ckanext.harvest.queue import (get_gather_consumer,
gather_callback, get_gather_queue_name)
gather_callback, get_gather_queue_name)
logging.getLogger('amqplib').setLevel(logging.INFO)
consumer = get_gather_consumer()
for method, header, body in consumer.consume(queue=get_gather_queue_name()):
@ -187,10 +186,10 @@ class Harvester(CkanCommand):
import logging
logging.getLogger('amqplib').setLevel(logging.INFO)
from ckanext.harvest.queue import (get_fetch_consumer, fetch_callback,
get_fetch_queue_name)
get_fetch_queue_name)
consumer = get_fetch_consumer()
for method, header, body in consumer.consume(queue=get_fetch_queue_name()):
fetch_callback(consumer, method, header, body)
fetch_callback(consumer, method, header, body)
elif cmd == 'purge_queues':
self.purge_queues()
elif cmd == 'initdb':
@ -242,8 +241,8 @@ class Harvester(CkanCommand):
else:
title = None
if len(self.args) >= 6:
active = not(self.args[5].lower() == 'false' or \
self.args[5] == '0')
active = not(self.args[5].lower() == 'false' or
self.args[5] == '0')
else:
active = True
if len(self.args) >= 7:
@ -293,9 +292,9 @@ class Harvester(CkanCommand):
print('A new Harvest Job for this source has also been created')
except ValidationError as e:
print('An error occurred:')
print(str(e.error_dict))
raise e
print('An error occurred:')
print(str(e.error_dict))
raise e
def clear_harvest_source_history(self):
source_id = None
@ -319,7 +318,6 @@ class Harvester(CkanCommand):
cleared_sources_dicts = get_action('harvest_sources_job_history_clear')(context, {})
print('Cleared job history for all harvest sources: {0} source(s)'.format(len(cleared_sources_dicts)))
def show_harvest_source(self):
if len(self.args) >= 2:
@ -442,7 +440,7 @@ class Harvester(CkanCommand):
context, {'source_id': source['id'], 'status': 'Running'})
if running_jobs:
print('\nSource "{0}" apparently has a "Running" job:\n{1}'
.format(source.get('name') or source['id'], running_jobs))
.format(source.get('name') or source['id'], running_jobs))
resp = raw_input('Abort it? (y/n)')
if not resp.lower().startswith('y'):
sys.exit(1)
@ -516,9 +514,9 @@ class Harvester(CkanCommand):
# 'type' if source comes from HarvestSource, 'source_type' if it comes
# from the Package
print(' type: {0}'.format(source.get('source_type') or
source.get('type')))
source.get('type')))
print(' active: {0}'.format(source.get('active',
source.get('state') == 'active')))
source.get('state') == 'active')))
print('frequency: {0}'.format(source.get('frequency')))
print(' jobs: {0}'.format(source.get('status').get('job_count')))
print('')

View File

@ -4,4 +4,3 @@ try:
except ImportError:
import pkgutil
__path__ = pkgutil.extend_path(__path__, __name__)

View File

@ -15,7 +15,8 @@ from pylons.i18n import _
from ckan import model
import ckan.plugins as p
import ckan.lib.helpers as h, json
import ckan.lib.helpers as h
import json
from ckan.lib.base import BaseController, c, request, response, render, abort
from ckanext.harvest.logic import HarvestJobExists, HarvestSourceInactiveError
@ -131,7 +132,7 @@ class ViewController(BaseController):
re.sub('<\?xml(.*)\?>', '', content.encode('utf-8'))
)
response.content_type = 'application/xml; charset=utf-8'
if not '<?xml' in content.split('\n')[0]:
if '<?xml' not in content.split('\n')[0]:
content = u'<?xml version="1.0" encoding="UTF-8"?>\n' + content
except xml_parser_exception:
@ -232,6 +233,8 @@ class ViewController(BaseController):
def abort_job(self, source, id):
try:
context = {'model': model, 'user': c.user}
p.toolkit.get_action('harvest_job_abort')(context, {'id': id})
h.flash_success(_('Harvest job stopped'))
except p.toolkit.ObjectNotFound:

View File

@ -1,3 +1,4 @@
from ckanext.harvest.harvesters.ckanharvester import CKANHarvester
from ckanext.harvest.harvesters.base import HarvesterBase
__all__ = ['CKANHarvester', 'HarvesterBase']

View File

@ -217,12 +217,12 @@ class HarvesterBase(SingletonPlugin):
if len(remote_ids):
for remote_id in remote_ids:
# Create a new HarvestObject for this identifier
obj = HarvestObject(guid = remote_id, job = harvest_job)
obj = HarvestObject(guid=remote_id, job=harvest_job)
obj.save()
object_ids.append(obj.id)
return object_ids
else:
self._save_gather_error('No remote datasets could be identified', harvest_job)
self._save_gather_error('No remote datasets could be identified', harvest_job)
except Exception, e:
self._save_gather_error('%r' % e.message, harvest_job)
@ -309,11 +309,11 @@ class HarvesterBase(SingletonPlugin):
package_dict['name'] = existing_package_dict['name']
# Check modified date
if not 'metadata_modified' in package_dict or \
if 'metadata_modified' not in package_dict or \
package_dict['metadata_modified'] > existing_package_dict.get('metadata_modified'):
log.info('Package with GUID %s exists and needs to be updated' % harvest_object.guid)
# Update package
context.update({'id':package_dict['id']})
context.update({'id': package_dict['id']})
package_dict.setdefault('name',
existing_package_dict['name'])
@ -329,9 +329,9 @@ class HarvesterBase(SingletonPlugin):
# Flag the other objects linking to this package as not current anymore
from ckanext.harvest.model import harvest_object_table
conn = Session.connection()
u = update(harvest_object_table) \
.where(harvest_object_table.c.package_id==bindparam('b_package_id')) \
.values(current=False)
u = update(harvest_object_table)\
.where(harvest_object_table.c.package_id == bindparam('b_package_id')) \
.values(current=False)
conn.execute(u, b_package_id=new_package['id'])
# Flag this as the current harvest object
@ -374,10 +374,11 @@ class HarvesterBase(SingletonPlugin):
except p.toolkit.ValidationError, e:
log.exception(e)
self._save_object_error('Invalid package with GUID %s: %r'%(harvest_object.guid,e.error_dict),harvest_object,'Import')
self._save_object_error('Invalid package with GUID %s: %r' % (harvest_object.guid, e.error_dict),
harvest_object, 'Import')
except Exception, e:
log.exception(e)
self._save_object_error('%r'%e,harvest_object,'Import')
self._save_object_error('%r' % e, harvest_object, 'Import')
return None
@ -394,17 +395,17 @@ class HarvesterBase(SingletonPlugin):
# update the dict and return it
tag_dict[key] = newvalue
return tag_dict
# assume it's in the package_show form
# assume it's in the package_show form
tags = [_update_tag(t, 'name', munge_tag(t['name'])) for t in tags if munge_tag(t['name']) != '']
except TypeError: # a TypeError is raised if `t` above is a string
# REST format: 'tags' is a list of strings
tags = [munge_tag(t) for t in tags if munge_tag(t) != '']
tags = list(set(tags))
return tags
return tags
except TypeError: # a TypeError is raised if `t` above is a string
# REST format: 'tags' is a list of strings
tags = [munge_tag(t) for t in tags if munge_tag(t) != '']
tags = list(set(tags))
return tags
return tags
@classmethod
def last_error_free_job(cls, harvest_job):
@ -413,8 +414,9 @@ class HarvesterBase(SingletonPlugin):
jobs = \
model.Session.query(HarvestJob) \
.filter(HarvestJob.source == harvest_job.source) \
.filter(HarvestJob.gather_started != None) \
.filter(HarvestJob.status == 'Finished') \
.filter(
HarvestJob.gather_started != None # noqa: E711
).filter(HarvestJob.status == 'Finished') \
.filter(HarvestJob.id != harvest_job.id) \
.filter(
~exists().where(
@ -431,4 +433,3 @@ class HarvesterBase(SingletonPlugin):
break
else:
return job

View File

@ -1,5 +1,5 @@
import requests
from requests.exceptions import RequestException
from requests.exceptions import HTTPError, RequestException
import datetime
from urllib3.contrib import pyopenssl
@ -43,8 +43,10 @@ class CKANHarvester(HarvesterBase):
try:
http_request = requests.get(url, headers=headers)
except HTTPError as e:
raise ContentFetchError('HTTP error: %s %s' % (e.response.status_code, e.request.url))
except RequestException as e:
raise ContentFetchError('HTTP error: %s' % e.code)
raise ContentFetchError('Request error: %s' % e)
except Exception as e:
raise ContentFetchError('HTTP general exception: %s' % e)
return http_request.text
@ -142,20 +144,20 @@ class CKANHarvester(HarvesterBase):
raise ValueError('default_extras must be a dictionary')
if 'organizations_filter_include' in config_obj \
and 'organizations_filter_exclude' in config_obj:
and 'organizations_filter_exclude' in config_obj:
raise ValueError('Harvest configuration cannot contain both '
'organizations_filter_include and organizations_filter_exclude')
'organizations_filter_include and organizations_filter_exclude')
if 'groups_filter_include' in config_obj \
and 'groups_filter_exclude' in config_obj:
and 'groups_filter_exclude' in config_obj:
raise ValueError('Harvest configuration cannot contain both '
'groups_filter_include and groups_filter_exclude')
'groups_filter_include and groups_filter_exclude')
if 'user' in config_obj:
# Check if user exists
context = {'model': model, 'user': toolkit.c.user}
try:
user = get_action('user_show')(
get_action('user_show')(
context, {'id': config_obj.get('user')})
except NotFound:
raise ValueError('User not found')
@ -388,17 +390,17 @@ class CKANHarvester(HarvesterBase):
# Set default tags if needed
default_tags = self.config.get('default_tags', [])
if default_tags:
if not 'tags' in package_dict:
if 'tags' not in package_dict:
package_dict['tags'] = []
package_dict['tags'].extend(
[t for t in default_tags if t not in package_dict['tags']])
remote_groups = self.config.get('remote_groups', None)
if not remote_groups in ('only_local', 'create'):
if remote_groups not in ('only_local', 'create'):
# Ignore remote groups
package_dict.pop('groups', None)
else:
if not 'groups' in package_dict:
if 'groups' not in package_dict:
package_dict['groups'] = []
# check if remote groups exist locally, otherwise remove
@ -446,11 +448,11 @@ class CKANHarvester(HarvesterBase):
remote_orgs = self.config.get('remote_orgs', None)
if not remote_orgs in ('only_local', 'create'):
if remote_orgs not in ('only_local', 'create'):
# Assign dataset to the source organization
package_dict['owner_org'] = local_org
else:
if not 'owner_org' in package_dict:
if 'owner_org' not in package_dict:
package_dict['owner_org'] = None
# check if remote org exist locally, otherwise remove
@ -473,7 +475,8 @@ class CKANHarvester(HarvesterBase):
# this especially targets older versions of CKAN
org = self._get_group(harvest_object.source.url, remote_org)
for key in ['packages', 'created', 'users', 'groups', 'tags', 'extras', 'display_name', 'type']:
for key in ['packages', 'created', 'users', 'groups', 'tags',
'extras', 'display_name', 'type']:
org.pop(key, None)
get_action('organization_create')(base_context.copy(), org)
log.info('Organization %s has been newly created', remote_org)
@ -486,7 +489,7 @@ class CKANHarvester(HarvesterBase):
# Set default groups if needed
default_groups = self.config.get('default_groups', [])
if default_groups:
if not 'groups' in package_dict:
if 'groups' not in package_dict:
package_dict['groups'] = []
existing_group_ids = [g['id'] for g in package_dict['groups']]
package_dict['groups'].extend(
@ -495,13 +498,14 @@ class CKANHarvester(HarvesterBase):
# Set default extras if needed
default_extras = self.config.get('default_extras', {})
def get_extra(key, package_dict):
for extra in package_dict.get('extras', []):
if extra['key'] == key:
return extra
if default_extras:
override_extras = self.config.get('override_extras', False)
if not 'extras' in package_dict:
if 'extras' not in package_dict:
package_dict['extras'] = []
for key, value in default_extras.iteritems():
existing_extra = get_extra(key, package_dict)
@ -513,10 +517,8 @@ class CKANHarvester(HarvesterBase):
if isinstance(value, basestring):
value = value.format(
harvest_source_id=harvest_object.job.source.id,
harvest_source_url=
harvest_object.job.source.url.strip('/'),
harvest_source_title=
harvest_object.job.source.title,
harvest_source_url=harvest_object.job.source.url.strip('/'),
harvest_source_title=harvest_object.job.source.title,
harvest_job_id=harvest_object.job.id,
harvest_object_id=harvest_object.id,
dataset_id=package_dict['id'])
@ -549,9 +551,11 @@ class CKANHarvester(HarvesterBase):
class ContentFetchError(Exception):
pass
class ContentNotFoundError(ContentFetchError):
pass
class RemoteResourceError(Exception):
pass

View File

@ -37,7 +37,7 @@ def package_list_for_source(source_id):
page = int(request.params.get('page', 1))
fq = '+harvest_source_id:"{0}"'.format(source_id)
search_dict = {
'fq' : fq,
'fq': fq,
'rows': limit,
'sort': 'metadata_modified desc',
'start': (page - 1) * limit,
@ -48,13 +48,14 @@ def package_list_for_source(source_id):
owner_org = harvest_source.get('owner_org', '')
if owner_org:
user_member_of_orgs = [org['id'] for org
in h.organizations_available('read')]
in h.organizations_available('read')]
if (harvest_source and owner_org in user_member_of_orgs):
context['ignore_capacity_check'] = True
query = logic.get_action('package_search')(context, search_dict)
base_url = h.url_for('{0}_read'.format(DATASET_TYPE_NAME), id=source_id)
def pager_url(q=None, page=None):
url = base_url
if page:
@ -78,6 +79,7 @@ def package_list_for_source(source_id):
return out
def package_count_for_source(source_id):
'''
Returns the current package count for datasets associated with the given
@ -89,20 +91,24 @@ def package_count_for_source(source_id):
result = logic.get_action('package_search')(context, search_dict)
return result.get('count', 0)
def harvesters_info():
context = {'model': model, 'user': p.toolkit.c.user or p.toolkit.c.author}
return logic.get_action('harvesters_info_show')(context,{})
return logic.get_action('harvesters_info_show')(context, {})
def harvester_types():
harvesters = harvesters_info()
return [{'text': p.toolkit._(h['title']), 'value': h['name']}
for h in harvesters]
def harvest_frequencies():
return [{'text': p.toolkit._(f.title()), 'value': f}
for f in UPDATE_FREQUENCIES]
def link_for_harvest_object(id=None, guid=None, text=None):
if not id and not guid:
@ -110,7 +116,7 @@ def link_for_harvest_object(id=None, guid=None, text=None):
if guid:
context = {'model': model, 'user': p.toolkit.c.user or p.toolkit.c.author}
obj =logic.get_action('harvest_object_show')(context, {'id': guid, 'attr': 'guid'})
obj = logic.get_action('harvest_object_show')(context, {'id': guid, 'attr': 'guid'})
id = obj.id
url = h.url_for('harvest_object_show', id=id)
@ -119,6 +125,7 @@ def link_for_harvest_object(id=None, guid=None, text=None):
return p.toolkit.literal(link)
def harvest_source_extra_fields():
fields = {}
for harvester in p.PluginImplementations(IHarvester):

View File

@ -2,9 +2,10 @@ from logging import Handler, NOTSET
from ckanext.harvest.model import HarvestLog
class DBLogHandler(Handler):
def __init__(self, level=NOTSET):
super(DBLogHandler,self).__init__(level=level)
super(DBLogHandler, self).__init__(level=level)
def emit(self, record):
try:
@ -12,5 +13,5 @@ class DBLogHandler(Handler):
msg = self.format(record)
obj = HarvestLog(level=level, content=msg)
obj.save()
except Exception as exc:
pass
except Exception:
pass

View File

@ -4,4 +4,3 @@ try:
except ImportError:
import pkgutil
__path__ = pkgutil.extend_path(__path__, __name__)

View File

@ -1,5 +1,4 @@
import logging
from itertools import groupby
from sqlalchemy import or_
from ckan.model import User
import datetime
@ -21,6 +20,7 @@ from ckanext.harvest.logic.dictization import (harvest_source_dictize,
log = logging.getLogger(__name__)
@side_effect_free
def harvest_source_show(context, data_dict):
'''
@ -57,6 +57,7 @@ def harvest_source_show(context, data_dict):
return source_dict
@side_effect_free
def harvest_source_show_status(context, data_dict):
'''
@ -75,7 +76,6 @@ def harvest_source_show_status(context, data_dict):
p.toolkit.check_access('harvest_source_show_status', context, data_dict)
model = context.get('model')
source = harvest_model.HarvestSource.get(data_dict['id'])
@ -98,7 +98,7 @@ def harvest_source_show_status(context, data_dict):
# Get the most recent job
last_job = harvest_model.HarvestJob.filter(source=source) \
.order_by(harvest_model.HarvestJob.created.desc()).first()
.order_by(harvest_model.HarvestJob.created.desc()).first()
if not last_job:
return out
@ -107,26 +107,24 @@ def harvest_source_show_status(context, data_dict):
# Overall statistics
packages = model.Session.query(model.Package) \
.join(harvest_model.HarvestObject) \
.filter(harvest_model.HarvestObject.harvest_source_id==source.id) \
.filter(harvest_model.HarvestObject.current==True) \
.filter(model.Package.state==u'active') \
.filter(model.Package.private==False)
.join(harvest_model.HarvestObject) \
.filter(harvest_model.HarvestObject.harvest_source_id == source.id) \
.filter(
harvest_model.HarvestObject.current == True # noqa: E711
).filter(model.Package.state == u'active') \
.filter(model.Package.private == False)
out['total_datasets'] = packages.count()
return out
@side_effect_free
def harvest_source_list(context, data_dict):
'''
TODO: Use package search
'''
check_access('harvest_source_list',context,data_dict)
model = context['model']
session = context['session']
user = context.get('user','')
check_access('harvest_source_list', context, data_dict)
sources = _get_sources_for_user(context, data_dict)
@ -136,18 +134,19 @@ def harvest_source_list(context, data_dict):
@side_effect_free
def harvest_job_show(context,data_dict):
def harvest_job_show(context, data_dict):
check_access('harvest_job_show',context,data_dict)
check_access('harvest_job_show', context, data_dict)
id = data_dict.get('id')
attr = data_dict.get('attr',None)
attr = data_dict.get('attr', None)
job = HarvestJob.get(id,attr=attr)
job = HarvestJob.get(id, attr=attr)
if not job:
raise NotFound
return harvest_job_dictize(job,context)
return harvest_job_dictize(job, context)
@side_effect_free
def harvest_job_report(context, data_dict):
@ -168,9 +167,9 @@ def harvest_job_report(context, data_dict):
# Gather errors
q = model.Session.query(harvest_model.HarvestGatherError) \
.join(harvest_model.HarvestJob) \
.filter(harvest_model.HarvestGatherError.harvest_job_id==job.id) \
.order_by(harvest_model.HarvestGatherError.created.desc())
.join(harvest_model.HarvestJob) \
.filter(harvest_model.HarvestGatherError.harvest_job_id == job.id) \
.order_by(harvest_model.HarvestGatherError.created.desc())
for error in q.all():
report['gather_errors'].append({
@ -184,16 +183,16 @@ def harvest_job_report(context, data_dict):
original_url_builder = None
for harvester in PluginImplementations(IHarvester):
if harvester.info()['name'] == job.source.type:
if hasattr(harvester, 'get_original_url'):
if hasattr(harvester, 'get_original_url'):
original_url_builder = harvester.get_original_url
q = model.Session.query(harvest_model.HarvestObjectError, harvest_model.HarvestObject.guid) \
.join(harvest_model.HarvestObject) \
.filter(harvest_model.HarvestObject.harvest_job_id==job.id) \
.order_by(harvest_model.HarvestObjectError.harvest_object_id)
.join(harvest_model.HarvestObject) \
.filter(harvest_model.HarvestObject.harvest_job_id == job.id) \
.order_by(harvest_model.HarvestObjectError.harvest_object_id)
for error, guid in q.all():
if not error.harvest_object_id in report['object_errors']:
if error.harvest_object_id not in report['object_errors']:
report['object_errors'][error.harvest_object_id] = {
'guid': guid,
'errors': []
@ -211,29 +210,29 @@ def harvest_job_report(context, data_dict):
return report
@side_effect_free
def harvest_job_list(context,data_dict):
def harvest_job_list(context, data_dict):
'''Returns a list of jobs and details of objects and errors.
:param status: filter by e.g. "New" or "Finished" jobs
:param source_id: filter by a harvest source
'''
check_access('harvest_job_list',context,data_dict)
check_access('harvest_job_list', context, data_dict)
model = context['model']
session = context['session']
source_id = data_dict.get('source_id',False)
source_id = data_dict.get('source_id', False)
status = data_dict.get('status', False)
query = session.query(HarvestJob)
if source_id:
query = query.filter(HarvestJob.source_id==source_id)
query = query.filter(HarvestJob.source_id == source_id)
if status:
query = query.filter(HarvestJob.status==status)
query = query.filter(HarvestJob.status == status)
query = query.order_by(HarvestJob.created.desc())
@ -242,8 +241,9 @@ def harvest_job_list(context,data_dict):
context['return_error_summary'] = False
return [harvest_job_dictize(job, context) for job in jobs]
@side_effect_free
def harvest_object_show(context,data_dict):
def harvest_object_show(context, data_dict):
p.toolkit.check_access('harvest_object_show', context, data_dict)
@ -251,8 +251,8 @@ def harvest_object_show(context,data_dict):
dataset_id = data_dict.get('dataset_id')
if id:
attr = data_dict.get('attr',None)
obj = HarvestObject.get(id,attr=attr)
attr = data_dict.get('attr', None)
obj = HarvestObject.get(id, attr=attr)
elif dataset_id:
model = context['model']
@ -261,9 +261,10 @@ def harvest_object_show(context,data_dict):
raise p.toolkit.ObjectNotFound('Dataset not found')
obj = model.Session.query(HarvestObject) \
.filter(HarvestObject.package_id == pkg.id) \
.filter(HarvestObject.current == True) \
.first()
.filter(HarvestObject.package_id == pkg.id) \
.filter(
HarvestObject.current == True # noqa: E711
).first()
else:
raise p.toolkit.ValidationError(
'Please provide either an "id" or a "dataset_id" parameter')
@ -271,37 +272,39 @@ def harvest_object_show(context,data_dict):
if not obj:
raise p.toolkit.ObjectNotFound('Harvest object not found')
return harvest_object_dictize(obj, context)
@side_effect_free
def harvest_object_list(context,data_dict):
def harvest_object_list(context, data_dict):
check_access('harvest_object_list',context,data_dict)
check_access('harvest_object_list', context, data_dict)
model = context['model']
session = context['session']
only_current = data_dict.get('only_current',True)
source_id = data_dict.get('source_id',False)
only_current = data_dict.get('only_current', True)
source_id = data_dict.get('source_id', False)
query = session.query(HarvestObject)
if source_id:
query = query.filter(HarvestObject.source_id==source_id)
query = query.filter(HarvestObject.source_id == source_id)
if only_current:
query = query.filter(HarvestObject.current==True)
query = query.filter(
HarvestObject.current == True # noqa: E712
)
objects = query.all()
return [getattr(obj,'id') for obj in objects]
return [getattr(obj, 'id') for obj in objects]
@side_effect_free
def harvesters_info_show(context,data_dict):
def harvesters_info_show(context, data_dict):
'''Returns details of the installed harvesters.'''
check_access('harvesters_info_show',context,data_dict)
check_access('harvesters_info_show', context, data_dict)
available_harvesters = []
for harvester in PluginImplementations(IHarvester):
@ -309,13 +312,14 @@ def harvesters_info_show(context,data_dict):
if not info or 'name' not in info:
log.error('Harvester %r does not provide the harvester name in the info response' % str(harvester))
continue
info['show_config'] = (info.get('form_config_interface','') == 'Text')
info['show_config'] = (info.get('form_config_interface', '') == 'Text')
available_harvesters.append(info)
return available_harvesters
@side_effect_free
def harvest_log_list(context,data_dict):
def harvest_log_list(context, data_dict):
'''Returns a list of harvester log entries.
:param limit: number of logs to be shown default: 100
@ -325,57 +329,59 @@ def harvest_log_list(context,data_dict):
check_access('harvest_log_list', context, data_dict)
model = context['model']
session = context['session']
try:
limit = int(data_dict.get('limit', 100))
except ValueError:
limit = 100
if data_dict.get('per_page', False):
try:
limit = int(data_dict.get('per_page', 100))
except ValueError:
limit = 100
try:
offset = int(data_dict.get('offset', 0))
except ValueError:
offset = 0
level = data_dict.get('level', None)
query = session.query(HarvestLog)
if level is not None:
query = query.filter(HarvestLog.level==level.upper())
query = query.filter(HarvestLog.level == level.upper())
query = query.order_by(HarvestLog.created.desc())
logs = query.offset(offset).limit(limit).all()
out = [harvest_log_dictize(obj, context) for obj in logs]
out = [harvest_log_dictize(obj, context) for obj in logs]
return out
def _get_sources_for_user(context,data_dict):
model = context['model']
def _get_sources_for_user(context, data_dict):
session = context['session']
user = context.get('user','')
user = context.get('user', '')
only_active = data_dict.get('only_active',False)
only_to_run = data_dict.get('only_to_run',False)
only_active = data_dict.get('only_active', False)
only_to_run = data_dict.get('only_to_run', False)
query = session.query(HarvestSource) \
.order_by(HarvestSource.created.desc())
.order_by(HarvestSource.created.desc())
if only_active:
query = query.filter(HarvestSource.active==True) \
query = query.filter(
HarvestSource.active == True # noqa: E712
) \
if only_to_run:
query = query.filter(HarvestSource.frequency!='MANUAL')
query = query.filter(or_(HarvestSource.next_run<=datetime.datetime.utcnow(),
HarvestSource.next_run==None)
query = query.filter(HarvestSource.frequency != 'MANUAL')
query = query.filter(or_(HarvestSource.next_run <= datetime.datetime.utcnow(),
HarvestSource.next_run == None # noqa: E711
)
)
user_obj = User.get(user)
@ -389,7 +395,7 @@ def _get_sources_for_user(context,data_dict):
publisher_filters = []
publishers_for_the_user = user_obj.get_groups(u'publisher')
for publisher_id in [g.id for g in publishers_for_the_user]:
publisher_filters.append(HarvestSource.publisher_id==publisher_id)
publisher_filters.append(HarvestSource.publisher_id == publisher_id)
if len(publisher_filters):
query = query.filter(or_(*publisher_filters))
@ -403,4 +409,3 @@ def _get_sources_for_user(context,data_dict):
sources = query.all()
return sources

View File

@ -279,10 +279,13 @@ def harvest_source_job_history_clear(context, data_dict):
model = context['model']
sql = '''begin;
delete from harvest_object_error where harvest_object_id in (select id from harvest_object where harvest_source_id = '{harvest_source_id}');
delete from harvest_object_extra where harvest_object_id in (select id from harvest_object where harvest_source_id = '{harvest_source_id}');
delete from harvest_object_error where harvest_object_id
in (select id from harvest_object where harvest_source_id = '{harvest_source_id}');
delete from harvest_object_extra where harvest_object_id
in (select id from harvest_object where harvest_source_id = '{harvest_source_id}');
delete from harvest_object where harvest_source_id = '{harvest_source_id}';
delete from harvest_gather_error where harvest_job_id in (select id from harvest_job where source_id = '{harvest_source_id}');
delete from harvest_gather_error where harvest_job_id
in (select id from harvest_job where source_id = '{harvest_source_id}');
delete from harvest_job where source_id = '{harvest_source_id}';
commit;
'''.format(harvest_source_id=harvest_source_id)
@ -367,7 +370,6 @@ def harvest_objects_import(context, data_dict):
log.info('Harvest objects import: %r', data_dict)
check_access('harvest_objects_import', context, data_dict)
model = context['model']
session = context['session']
source_id = data_dict.get('source_id')
guid = data_dict.get('guid')
@ -382,7 +384,7 @@ def harvest_objects_import(context, data_dict):
last_objects_ids = \
session.query(HarvestObject.id) \
.filter(HarvestObject.guid == guid) \
.filter(HarvestObject.current == True)
.filter(HarvestObject.current == True) # noqa: E712
elif source_id:
source = HarvestSource.get(source_id)
@ -398,7 +400,7 @@ def harvest_objects_import(context, data_dict):
session.query(HarvestObject.id) \
.join(HarvestSource) \
.filter(HarvestObject.source == source) \
.filter(HarvestObject.current == True)
.filter(HarvestObject.current == True) # noqa: E712
elif harvest_object_id:
last_objects_ids = \
@ -408,15 +410,16 @@ def harvest_objects_import(context, data_dict):
last_objects_ids = \
session.query(HarvestObject.id) \
.join(Package) \
.filter(HarvestObject.current == True) \
.filter(Package.state == u'active') \
.filter(
HarvestObject.current == True # noqa: E712
).filter(Package.state == u'active') \
.filter(or_(Package.id == package_id_or_name,
Package.name == package_id_or_name))
join_datasets = False
else:
last_objects_ids = \
session.query(HarvestObject.id) \
.filter(HarvestObject.current == True)
.filter(HarvestObject.current == True) # noqa: E712
if join_datasets:
last_objects_ids = last_objects_ids.join(Package) \
@ -539,8 +542,9 @@ def harvest_jobs_run(context, data_dict):
# object
last_object = session.query(HarvestObject) \
.filter(HarvestObject.harvest_job_id == job['id']) \
.filter(HarvestObject.import_finished != None) \
.order_by(HarvestObject.import_finished.desc()) \
.filter(
HarvestObject.import_finished != None # noqa: E711
).order_by(HarvestObject.import_finished.desc()) \
.first()
if last_object:
job_obj.finished = last_object.import_finished
@ -555,7 +559,8 @@ def harvest_jobs_run(context, data_dict):
status = get_action('harvest_source_show_status')(context, {'id': job_obj.source.id})
if toolkit.asbool(config.get('ckan.harvest.status_mail.errored')) and (status['last_job']['stats']['errored']):
if toolkit.asbool(config.get('ckan.harvest.status_mail.errored'))\
and (status['last_job']['stats']['errored']):
send_error_mail(context, job_obj.source.id, status)
else:
log.debug('Ongoing job:%s source:%s',
@ -575,7 +580,8 @@ def send_error_mail(context, source_id, status):
ckan_site_url = config.get('ckan.site_url')
job_url = toolkit.url_for('harvest_job_show', source=source['id'], id=last_job['id'])
msg = toolkit._('This is a failure-notification of the latest harvest job ({0}) set-up in {1}.').format(job_url, ckan_site_url)
msg = toolkit._('This is a failure-notification of the latest harvest job ({0}) set-up in {1}.')\
.format(job_url, ckan_site_url)
msg += '\n\n'
msg += toolkit._('Harvest Source: {0}').format(source['title']) + '\n'
@ -619,13 +625,17 @@ def send_error_mail(context, source_id, status):
if obj_error or job_error:
msg += '\n--\n'
msg += toolkit._('You are receiving this email because you are currently set-up as Administrator for {0}. Please do not reply to this email as it was sent from a non-monitored address.').format(config.get('ckan.site_title'))
msg += toolkit._('You are receiving this email because you are currently set-up as Administrator for {0}.'
' Please do not reply to this email as it was sent from a non-monitored address.')\
.format(config.get('ckan.site_title'))
recipients = []
# gather sysadmins
model = context['model']
sysadmins = model.Session.query(model.User).filter(model.User.sysadmin == True).all()
sysadmins = model.Session.query(model.User).filter(
model.User.sysadmin == True # noqa: E712
).all()
for sysadmin in sysadmins:
recipients.append({
'name': sysadmin.name,
@ -796,6 +806,7 @@ def harvest_source_reindex(context, data_dict):
'''Reindex a single harvest source'''
harvest_source_id = logic.get_or_bust(data_dict, 'id')
defer_commit = context.get('defer_commit', False)
if 'extras_as_string'in context:

View File

@ -1,6 +1,7 @@
from ckan.plugins import toolkit as pt
from ckanext.harvest import model as harvest_model
def user_is_sysadmin(context):
'''
Checks if the user defined in the context is a sysadmin
@ -15,12 +16,13 @@ def user_is_sysadmin(context):
return user_obj.sysadmin
def _get_object(context, data_dict, name, class_name):
'''
return the named item if in the data_dict, or get it from
model.class_name
'''
if not name in context:
if name not in context:
id = data_dict.get('id', None)
obj = getattr(harvest_model, class_name).get(id)
if not obj:
@ -29,11 +31,14 @@ def _get_object(context, data_dict, name, class_name):
obj = context[name]
return obj
def get_source_object(context, data_dict = {}):
def get_source_object(context, data_dict={}):
return _get_object(context, data_dict, 'source', 'HarvestSource')
def get_job_object(context, data_dict = {}):
def get_job_object(context, data_dict={}):
return _get_object(context, data_dict, 'job', 'HarvestJob')
def get_obj_object(context, data_dict = {}):
def get_obj_object(context, data_dict={}):
return _get_object(context, data_dict, 'obj', 'HarvestObject')

View File

@ -53,6 +53,7 @@ def harvest_job_create_all(context, data_dict):
else:
return {'success': True}
def harvest_object_create(context, data_dict):
"""
Auth check for creating a harvest object
@ -61,4 +62,3 @@ def harvest_object_create(context, data_dict):
"""
# sysadmins can run all actions if we've got to this point we're not a sysadmin
return {'success': False, 'msg': pt._('Only the sysadmins can create harvest objects')}

View File

@ -3,7 +3,6 @@ from ckan.plugins import toolkit as pt
from ckanext.harvest.logic.auth import get_job_object
def auth_allow_anonymous_access(auth_function):
'''
Local version of the auth_allow_anonymous_access decorator that only
@ -42,6 +41,7 @@ def harvest_source_show(context, data_dict):
'msg': pt._('User {0} not authorized to read harvest source {1}')
.format(user, source_id)}
@auth_allow_anonymous_access
def harvest_source_show_status(context, data_dict):
'''
@ -51,6 +51,7 @@ def harvest_source_show_status(context, data_dict):
'''
return harvest_source_show(context, data_dict)
@auth_allow_anonymous_access
def harvest_source_list(context, data_dict):
'''
@ -105,7 +106,6 @@ def harvest_job_list(context, data_dict):
.format(user, source_id)}
@auth_allow_anonymous_access
def harvest_object_show(context, data_dict):
'''

View File

@ -27,6 +27,7 @@ def harvest_source_update(context, data_dict):
return {'success': False,
'msg': pt._('User {0} not authorized to update harvest source {1}').format(user, source_id)}
def harvest_sources_clear(context, data_dict):
'''
Authorization check for clearing history for all harvest sources
@ -38,6 +39,7 @@ def harvest_sources_clear(context, data_dict):
else:
return {'success': True}
def harvest_source_clear(context, data_dict):
'''
Authorization check for clearing a harvest source
@ -46,6 +48,7 @@ def harvest_source_clear(context, data_dict):
'''
return harvest_source_update(context, data_dict)
def harvest_objects_import(context, data_dict):
'''
Authorization check reimporting all harvest objects
@ -101,6 +104,7 @@ def harvest_sources_reindex(context, data_dict):
else:
return {'success': True}
def harvest_source_reindex(context, data_dict):
'''
Authorization check for reindexing a harvest source

View File

@ -1,4 +1,4 @@
from sqlalchemy import distinct, func
from sqlalchemy import distinct, func, text
from ckan.model import Package, Group
from ckan import logic
@ -66,7 +66,7 @@ def harvest_job_dictize(job, context):
.join(HarvestObject) \
.filter(HarvestObject.harvest_job_id == job.id) \
.group_by(HarvestObjectError.message) \
.order_by('error_count desc') \
.order_by(text('error_count desc')) \
.limit(context.get('error_summmary_limit', 20))
out['object_error_summary'] = q.all()
q = model.Session.query(
@ -74,7 +74,7 @@ def harvest_job_dictize(job, context):
func.count(HarvestGatherError.message).label('error_count')) \
.filter(HarvestGatherError.harvest_job_id == job.id) \
.group_by(HarvestGatherError.message) \
.order_by('error_count desc') \
.order_by(text('error_count desc')) \
.limit(context.get('error_summmary_limit', 20))
out['gather_error_summary'] = q.all()
return out
@ -98,12 +98,14 @@ def harvest_object_dictize(obj, context):
return out
def harvest_log_dictize(obj, context):
out = obj.as_dict()
del out['id']
return out
def _get_source_status(source, context):
'''
TODO: Deprecated, use harvest_source_show_status instead
@ -140,7 +142,7 @@ def _get_source_status(source, context):
.order_by(HarvestJob.created.desc()).first()
if last_job:
#TODO: Should we encode the dates as strings?
# TODO: Should we encode the dates as strings?
out['last_harvest_request'] = str(last_job.gather_finished)
# Overall statistics
@ -148,8 +150,9 @@ def _get_source_status(source, context):
Package.name) \
.join(Package).join(HarvestSource) \
.filter(HarvestObject.source == source) \
.filter(HarvestObject.current == True) \
.filter(Package.state == u'active')
.filter(
HarvestObject.current == True # noqa: E711
).filter(Package.state == u'active')
out['overall_statistics']['added'] = packages.count()

View File

@ -5,7 +5,6 @@ from ckan.logic.validators import (package_id_exists,
name_validator,
owner_org_validator,
package_name_validator,
ignore_not_package_admin,
boolean_validator,
)
from ckan.logic.converters import convert_to_extras, convert_from_extras
@ -28,6 +27,7 @@ from ckanext.harvest.logic.validators import (harvest_source_url_validator,
harvest_object_extras_validator,
)
def harvest_source_schema():
schema = {
@ -56,9 +56,9 @@ def harvest_source_schema():
from ckan.logic.validators import datasets_with_no_organization_cannot_be_private
schema['private'].append(datasets_with_no_organization_cannot_be_private)
return schema
def harvest_source_create_package_schema():
schema = harvest_source_schema()
@ -68,6 +68,7 @@ def harvest_source_create_package_schema():
return schema
def harvest_source_update_package_schema():
schema = harvest_source_create_package_schema()
@ -75,6 +76,7 @@ def harvest_source_update_package_schema():
return schema
def harvest_source_show_package_schema():
schema = harvest_source_schema()
@ -97,6 +99,7 @@ def harvest_source_show_package_schema():
return schema
def harvest_object_create_schema():
schema = {
'guid': [ignore_missing, unicode],
@ -108,4 +111,3 @@ def harvest_object_create_schema():
'extras': [ignore_missing, harvest_object_extras_validator],
}
return schema

View File

@ -10,8 +10,6 @@ from ckanext.harvest.plugin import DATASET_TYPE_NAME
from ckanext.harvest.model import HarvestSource, UPDATE_FREQUENCIES, HarvestJob
from ckanext.harvest.interfaces import IHarvester
from ckan.lib.navl.validators import keep_extras
log = logging.getLogger(__name__)
@ -77,7 +75,7 @@ def harvest_source_url_validator(key, data, errors, context):
try:
new_config = data.get(key[:-1] + ('config',))
except:
except Exception:
new_config = None
new_url = _normalize_url(data[key])
@ -123,7 +121,7 @@ def harvest_source_type_exists(value, context):
continue
available_types.append(info['name'])
if not value in available_types:
if value not in available_types:
raise Invalid('Unknown harvester type: %s. Registered types: %r' %
(value, available_types))

View File

@ -21,7 +21,7 @@ from ckan.model.domain_object import DomainObject
from ckan.model.package import Package
from ckan.lib.munge import munge_title_to_name
UPDATE_FREQUENCIES = ['MANUAL','MONTHLY','WEEKLY','BIWEEKLY','DAILY', 'ALWAYS']
UPDATE_FREQUENCIES = ['MANUAL', 'MONTHLY', 'WEEKLY', 'BIWEEKLY', 'DAILY', 'ALWAYS']
log = logging.getLogger(__name__)
@ -65,7 +65,7 @@ def setup():
harvest_object_error_table.create()
harvest_object_extra_table.create()
harvest_log_table.create()
log.debug('Harvest tables created')
else:
from ckan.model.meta import engine
@ -74,10 +74,10 @@ def setup():
inspector = Inspector.from_engine(engine)
columns = inspector.get_columns('harvest_source')
column_names = [column['name'] for column in columns]
if not 'title' in column_names:
if 'title' not in column_names:
log.debug('Harvest tables need to be updated')
migrate_v2()
if not 'frequency' in column_names:
if 'frequency' not in column_names:
log.debug('Harvest tables need to be updated')
migrate_v3()
@ -89,14 +89,14 @@ def setup():
log.debug('Creating harvest source datasets for %i existing sources', len(sources_to_migrate))
sources_to_migrate = [s[0] for s in sources_to_migrate]
migrate_v3_create_datasets(sources_to_migrate)
# Check if harvest_log table exist - needed for existing users
if not 'harvest_log' in inspector.get_table_names():
if 'harvest_log' not in inspector.get_table_names():
harvest_log_table.create()
# Check if harvest_object has a index
index_names = [index['name'] for index in inspector.get_indexes("harvest_object")]
if not "harvest_job_id_idx" in index_names:
if "harvest_job_id_idx" not in index_names:
log.debug('Creating index for harvest_object')
Index("harvest_job_id_idx", harvest_object_table.c.harvest_job_id).create()
@ -104,6 +104,7 @@ def setup():
class HarvestError(Exception):
pass
class HarvestDomainObject(DomainObject):
'''Convenience methods for searching objects
'''
@ -112,7 +113,7 @@ class HarvestDomainObject(DomainObject):
@classmethod
def get(cls, key, default=None, attr=None):
'''Finds a single entity in the register.'''
if attr == None:
if attr is None:
attr = cls.key_attr
kwds = {attr: key}
o = cls.filter(**kwds).first()
@ -154,6 +155,7 @@ class HarvestJob(HarvestDomainObject):
'''
pass
class HarvestObject(HarvestDomainObject):
'''A Harvest Object is created every time an element is fetched from a
harvest source. Its contents can be processed and imported to ckan
@ -161,9 +163,11 @@ class HarvestObject(HarvestDomainObject):
'''
class HarvestObjectExtra(HarvestDomainObject):
'''Extra key value data for Harvest objects'''
class HarvestGatherError(HarvestDomainObject):
'''Gather errors are raised during the **gather** stage of a harvesting
job.
@ -197,15 +201,15 @@ class HarvestObjectError(HarvestDomainObject):
stage=stage, line=line)
try:
err.save()
except InvalidRequestError, e:
except InvalidRequestError:
# Clear any in-progress sqlalchemy transactions
try:
Session.rollback()
except:
except Exception:
pass
try:
Session.remove()
except:
except Exception:
pass
err.save()
finally:
@ -213,13 +217,15 @@ class HarvestObjectError(HarvestDomainObject):
if line else message
log.debug(log_message)
class HarvestLog(HarvestDomainObject):
'''HarvestLog objects are created each time something is logged
using python's standard logging module
'''
pass
def harvest_object_before_insert_listener(mapper,connection,target):
def harvest_object_before_insert_listener(mapper, connection, target):
'''
For compatibility with old harvesters, check if the source id has
been set, and set it automatically from the job if not.
@ -241,22 +247,26 @@ def define_harvester_tables():
global harvest_object_error_table
global harvest_log_table
harvest_source_table = Table('harvest_source', metadata,
harvest_source_table = Table(
'harvest_source',
metadata,
Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
Column('url', types.UnicodeText, nullable=False),
Column('title', types.UnicodeText, default=u''),
Column('description', types.UnicodeText, default=u''),
Column('config', types.UnicodeText, default=u''),
Column('created', types.DateTime, default=datetime.datetime.utcnow),
Column('type',types.UnicodeText,nullable=False),
Column('active',types.Boolean,default=True),
Column('type', types.UnicodeText, nullable=False),
Column('active', types.Boolean, default=True),
Column('user_id', types.UnicodeText, default=u''),
Column('publisher_id', types.UnicodeText, default=u''),
Column('frequency', types.UnicodeText, default=u'MANUAL'),
Column('next_run', types.DateTime),
)
# Was harvesting_job
harvest_job_table = Table('harvest_job', metadata,
harvest_job_table = Table(
'harvest_job',
metadata,
Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
Column('created', types.DateTime, default=datetime.datetime.utcnow),
Column('gather_started', types.DateTime),
@ -268,7 +278,9 @@ def define_harvester_tables():
)
# A harvest_object contains a representation of one dataset during a
# particular harvest
harvest_object_table = Table('harvest_object', metadata,
harvest_object_table = Table(
'harvest_object',
metadata,
Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
# The guid is the 'identity' of the dataset, according to the source.
# So if you reharvest it, then the harvester knows which dataset to
@ -278,7 +290,7 @@ def define_harvester_tables():
# When you harvest a dataset multiple times, only the latest
# successfully imported harvest_object should be flagged 'current'.
# The import_stage usually reads and writes it.
Column('current',types.Boolean,default=False),
Column('current', types.Boolean, default=False),
Column('gathered', types.DateTime, default=datetime.datetime.utcnow),
Column('fetch_started', types.DateTime),
Column('content', types.UnicodeText, nullable=True),
@ -288,32 +300,39 @@ def define_harvester_tables():
# state: WAITING, FETCH, IMPORT, COMPLETE, ERROR
Column('state', types.UnicodeText, default=u'WAITING'),
Column('metadata_modified_date', types.DateTime),
Column('retry_times',types.Integer, default=0),
Column('retry_times', types.Integer, default=0),
Column('harvest_job_id', types.UnicodeText, ForeignKey('harvest_job.id')),
Column('harvest_source_id', types.UnicodeText, ForeignKey('harvest_source.id')),
Column('package_id', types.UnicodeText, ForeignKey('package.id', deferrable=True), nullable=True),
Column('package_id', types.UnicodeText, ForeignKey('package.id', deferrable=True),
nullable=True),
# report_status: 'added', 'updated', 'not modified', 'deleted', 'errored'
Column('report_status', types.UnicodeText, nullable=True),
Index('harvest_job_id_idx', 'harvest_job_id'),
)
# New table
harvest_object_extra_table = Table('harvest_object_extra', metadata,
harvest_object_extra_table = Table(
'harvest_object_extra',
metadata,
Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
Column('harvest_object_id', types.UnicodeText, ForeignKey('harvest_object.id')),
Column('key',types.UnicodeText),
Column('key', types.UnicodeText),
Column('value', types.UnicodeText),
)
# New table
harvest_gather_error_table = Table('harvest_gather_error',metadata,
harvest_gather_error_table = Table(
'harvest_gather_error',
metadata,
Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
Column('harvest_job_id', types.UnicodeText, ForeignKey('harvest_job.id')),
Column('message', types.UnicodeText),
Column('created', types.DateTime, default=datetime.datetime.utcnow),
)
# New table
harvest_object_error_table = Table('harvest_object_error',metadata,
harvest_object_error_table = Table(
'harvest_object_error',
metadata,
Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
Column('harvest_object_id', types.UnicodeText, ForeignKey('harvest_object.id')),
Column('message', types.UnicodeText),
@ -322,7 +341,9 @@ def define_harvester_tables():
Column('created', types.DateTime, default=datetime.datetime.utcnow),
)
# Harvest Log table
harvest_log_table = Table('harvest_log', metadata,
harvest_log_table = Table(
'harvest_log',
metadata,
Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
Column('content', types.UnicodeText, nullable=False),
Column('level', types.Enum('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL', name='log_level')),
@ -351,7 +372,7 @@ def define_harvester_tables():
HarvestObject,
harvest_object_table,
properties={
'package':relation(
'package': relation(
Package,
lazy=True,
backref='harvest_objects',
@ -374,7 +395,7 @@ def define_harvester_tables():
HarvestGatherError,
harvest_gather_error_table,
properties={
'job':relation(
'job': relation(
HarvestJob,
backref='gather_errors'
),
@ -385,7 +406,7 @@ def define_harvester_tables():
HarvestObjectError,
harvest_object_error_table,
properties={
'object':relation(
'object': relation(
HarvestObject,
backref=backref('errors', cascade='all,delete-orphan')
),
@ -396,13 +417,13 @@ def define_harvester_tables():
HarvestObjectExtra,
harvest_object_extra_table,
properties={
'object':relation(
'object': relation(
HarvestObject,
backref=backref('extras', cascade='all,delete-orphan')
),
},
)
mapper(
HarvestLog,
harvest_log_table,
@ -410,6 +431,7 @@ def define_harvester_tables():
event.listen(HarvestObject, 'before_insert', harvest_object_before_insert_listener)
def migrate_v2():
log.debug('Migrating harvest tables to v2. This may take a while...')
conn = Session.connection()
@ -419,7 +441,8 @@ def migrate_v2():
ALTER TABLE harvest_object ADD COLUMN current boolean;
ALTER TABLE harvest_object ADD COLUMN harvest_source_id text;
ALTER TABLE harvest_object ADD CONSTRAINT harvest_object_harvest_source_id_fkey FOREIGN KEY (harvest_source_id) REFERENCES harvest_source(id);
ALTER TABLE harvest_object ADD CONSTRAINT harvest_object_harvest_source_id_fkey FOREIGN KEY (harvest_source_id)
REFERENCES harvest_source(id);
UPDATE harvest_object o SET harvest_source_id = j.source_id FROM harvest_job j WHERE o.harvest_job_id = j.id;
'''
@ -427,9 +450,10 @@ def migrate_v2():
# Flag current harvest_objects
guids = Session.query(distinct(HarvestObject.guid)) \
.join(Package) \
.filter(HarvestObject.package!=None) \
.filter(Package.state==u'active')
.join(Package) \
.filter(
HarvestObject.package != None # noqa: E711
).filter(Package.state == u'active')
update_statement = '''
UPDATE harvest_object
@ -456,31 +480,31 @@ def migrate_v3():
log.debug('Migrating harvest tables to v3. This may take a while...')
conn = Session.connection()
statement = """CREATE TABLE harvest_object_extra (
id text NOT NULL,
harvest_object_id text,
"key" text,
"value" text
statement = """CREATE TABLE harvest_object_extra (
id text NOT NULL,
harvest_object_id text,
"key" text,
"value" text
);
ALTER TABLE harvest_object
ADD COLUMN import_started timestamp without time zone,
ADD COLUMN import_finished timestamp without time zone,
ADD COLUMN "state" text,
ADD COLUMN "report_status" text;
ADD COLUMN import_started timestamp without time zone,
ADD COLUMN import_finished timestamp without time zone,
ADD COLUMN "state" text,
ADD COLUMN "report_status" text;
ALTER TABLE harvest_source
ADD COLUMN frequency text,
ADD COLUMN frequency text,
ADD COLUMN next_run timestamp without time zone;
ALTER TABLE harvest_job
ADD COLUMN finished timestamp without time zone;
ALTER TABLE harvest_object_extra
ADD CONSTRAINT harvest_object_extra_pkey PRIMARY KEY (id);
ADD CONSTRAINT harvest_object_extra_pkey PRIMARY KEY (id);
ALTER TABLE harvest_object_extra
ADD CONSTRAINT harvest_object_extra_harvest_object_id_fkey FOREIGN KEY (harvest_object_id) REFERENCES harvest_object(id);
ADD CONSTRAINT harvest_object_extra_harvest_object_id_fkey FOREIGN KEY (harvest_object_id) REFERENCES harvest_object(id);
UPDATE harvest_object set state = 'COMPLETE' where package_id is not null;
UPDATE harvest_object set state = 'ERROR' where package_id is null;
@ -494,13 +518,14 @@ ALTER TABLE harvest_object
ADD CONSTRAINT harvest_object_package_id_fkey FOREIGN KEY (package_id) REFERENCES package(id) DEFERRABLE;
ALTER TABLE harvest_object_error
ADD COLUMN line integer;
ADD COLUMN line integer;
"""
conn.execute(statement)
Session.commit()
log.info('Harvest tables migrated to v3')
class PackageIdHarvestSourceIdMismatch(Exception):
"""
The package created for the harvest source must match the id of the
@ -508,6 +533,7 @@ class PackageIdHarvestSourceIdMismatch(Exception):
"""
pass
def migrate_v3_create_datasets(source_ids=None):
import pylons
from paste.registry import Registry
@ -530,15 +556,14 @@ def migrate_v3_create_datasets(source_ids=None):
log.debug('No harvest sources to migrate')
return
site_user_name = logic.get_action('get_site_user')({'model': model, 'ignore_auth': True},{})['name']
site_user_name = logic.get_action('get_site_user')({'model': model, 'ignore_auth': True}, {})['name']
context = {'model': model,
'session': model.Session,
'user': site_user_name, # TODO: auth of existing sources?
'user': site_user_name, # TODO: auth of existing sources?
'return_id_only': True,
'extras_as_string': True,
}
}
def gen_new_name(title):
name = munge_title_to_name(title).replace('_', '-')
@ -585,9 +610,10 @@ def migrate_v3_create_datasets(source_ids=None):
raise PackageIdHarvestSourceIdMismatch
log.info('Created new package for source {0} ({1})'.format(source.id, source.url))
except logic.ValidationError,e:
except logic.ValidationError, e:
log.error('Validation Error: %s' % str(e.error_summary))
def clean_harvest_log(condition):
Session.query(HarvestLog).filter(HarvestLog.created <= condition)\
.delete(synchronize_session=False)

View File

@ -14,16 +14,11 @@ except ImportError:
class DefaultTranslation():
pass
from ckan.lib.navl import dictization_functions
from ckanext.harvest import logic as harvest_logic
from ckanext.harvest.model import setup as model_setup
from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject
from ckanext.harvest.log import DBLogHandler
log = getLogger(__name__)
assert not log.disabled
@ -44,10 +39,9 @@ class Harvest(p.SingletonPlugin, DefaultDatasetForm, DefaultTranslation):
if p.toolkit.check_ckan_version(min_version='2.5.0'):
p.implements(p.ITranslation, inherit=True)
startup = False
## IPackageController
# IPackageController
def after_create(self, context, data_dict):
if 'type' in data_dict and data_dict['type'] == DATASET_TYPE_NAME and not self.startup:
@ -71,13 +65,12 @@ class Harvest(p.SingletonPlugin, DefaultDatasetForm, DefaultTranslation):
# check_ckan_version should be more clever than this
if p.toolkit.check_ckan_version(max_version='2.1.99') and (
not 'type' in data_dict or data_dict['type'] != DATASET_TYPE_NAME):
'type' not in data_dict or data_dict['type'] != DATASET_TYPE_NAME):
# This is a normal dataset, check if it was harvested and if so, add
# info about the HarvestObject and HarvestSource
harvest_object = model.Session.query(HarvestObject) \
.filter(HarvestObject.package_id==data_dict['id']) \
.filter(HarvestObject.current==True) \
.first()
.filter(HarvestObject.package_id == data_dict['id']) \
.filter(HarvestObject.current==True).first() # noqa
if harvest_object:
for key, value in [
@ -88,7 +81,6 @@ class Harvest(p.SingletonPlugin, DefaultDatasetForm, DefaultTranslation):
_add_extra(data_dict, key, value)
return data_dict
def before_search(self, search_params):
'''Prevents the harvesters being shown in dataset search results.'''
@ -118,14 +110,13 @@ class Harvest(p.SingletonPlugin, DefaultDatasetForm, DefaultTranslation):
data_dict['status'] = status_action(context, {'id': source.id})
elif not 'type' in data_dict or data_dict['type'] != DATASET_TYPE_NAME:
elif 'type' not in data_dict or data_dict['type'] != DATASET_TYPE_NAME:
# This is a normal dataset, check if it was harvested and if so, add
# info about the HarvestObject and HarvestSource
harvest_object = model.Session.query(HarvestObject) \
.filter(HarvestObject.package_id==data_dict['id']) \
.filter(HarvestObject.current==True) \
.first()
.filter(HarvestObject.package_id == data_dict['id']) \
.filter(HarvestObject.current == True).first() # noqa
# If the harvest extras are there, remove them. This can happen eg
# when calling package_update or resource_update, which call
@ -135,7 +126,6 @@ class Harvest(p.SingletonPlugin, DefaultDatasetForm, DefaultTranslation):
if not e['key']
in ('harvest_object_id', 'harvest_source_id', 'harvest_source_title',)]
# We only want to add these extras at index time so they are part
# of the cached data_dict used to display, search results etc. We
# don't want them added when editing the dataset, otherwise we get
@ -152,7 +142,7 @@ class Harvest(p.SingletonPlugin, DefaultDatasetForm, DefaultTranslation):
return data_dict
## IDatasetForm
# IDatasetForm
def is_fallback(self):
return False
@ -228,20 +218,42 @@ class Harvest(p.SingletonPlugin, DefaultDatasetForm, DefaultTranslation):
# (ie they are the ones for a package type)
controller = 'ckanext.harvest.controllers.view:ViewController'
map.connect('{0}_delete'.format(DATASET_TYPE_NAME), '/' + DATASET_TYPE_NAME + '/delete/:id',controller=controller, action='delete')
map.connect('{0}_refresh'.format(DATASET_TYPE_NAME), '/' + DATASET_TYPE_NAME + '/refresh/:id',controller=controller,
action='refresh')
map.connect('{0}_admin'.format(DATASET_TYPE_NAME), '/' + DATASET_TYPE_NAME + '/admin/:id', controller=controller, action='admin')
map.connect('{0}_about'.format(DATASET_TYPE_NAME), '/' + DATASET_TYPE_NAME + '/about/:id', controller=controller, action='about')
map.connect('{0}_clear'.format(DATASET_TYPE_NAME), '/' + DATASET_TYPE_NAME + '/clear/:id', controller=controller, action='clear')
map.connect('{0}_delete'.format(DATASET_TYPE_NAME), '/' + DATASET_TYPE_NAME + '/delete/:id',
controller=controller,
action='delete')
map.connect('{0}_refresh'.format(DATASET_TYPE_NAME), '/' + DATASET_TYPE_NAME + '/refresh/:id',
controller=controller,
action='refresh')
map.connect('{0}_admin'.format(DATASET_TYPE_NAME), '/' + DATASET_TYPE_NAME + '/admin/:id',
controller=controller,
action='admin')
map.connect('{0}_about'.format(DATASET_TYPE_NAME), '/' + DATASET_TYPE_NAME + '/about/:id',
controller=controller,
action='about')
map.connect('{0}_clear'.format(DATASET_TYPE_NAME), '/' + DATASET_TYPE_NAME + '/clear/:id',
controller=controller,
action='clear')
map.connect('harvest_job_list', '/' + DATASET_TYPE_NAME + '/{source}/job', controller=controller, action='list_jobs')
map.connect('harvest_job_show_last', '/' + DATASET_TYPE_NAME + '/{source}/job/last', controller=controller, action='show_last_job')
map.connect('harvest_job_show', '/' + DATASET_TYPE_NAME + '/{source}/job/{id}', controller=controller, action='show_job')
map.connect('harvest_job_abort', '/' + DATASET_TYPE_NAME + '/{source}/job/{id}/abort', controller=controller, action='abort_job')
map.connect('harvest_job_list', '/' + DATASET_TYPE_NAME + '/{source}/job',
controller=controller,
action='list_jobs')
map.connect('harvest_job_show_last', '/' + DATASET_TYPE_NAME + '/{source}/job/last',
controller=controller,
action='show_last_job')
map.connect('harvest_job_show', '/' + DATASET_TYPE_NAME + '/{source}/job/{id}',
controller=controller,
action='show_job')
map.connect('harvest_job_abort', '/' + DATASET_TYPE_NAME + '/{source}/job/{id}/abort',
controller=controller,
action='abort_job')
map.connect('harvest_object_show', '/' + DATASET_TYPE_NAME + '/object/:id', controller=controller, action='show_object')
map.connect('harvest_object_for_dataset_show', '/dataset/harvest_object/:id', controller=controller, action='show_object', ref_type='dataset')
map.connect('harvest_object_show', '/' + DATASET_TYPE_NAME + '/object/:id',
controller=controller,
action='show_object')
map.connect('harvest_object_for_dataset_show', '/dataset/harvest_object/:id',
controller=controller,
action='show_object',
ref_type='dataset')
return map
@ -270,7 +282,7 @@ class Harvest(p.SingletonPlugin, DefaultDatasetForm, DefaultTranslation):
# https://github.com/ckan/ckan/pull/4521
config['ckan.legacy_route_mappings'] = json.dumps(mappings)
## IActions
# IActions
def get_actions(self):
@ -279,7 +291,7 @@ class Harvest(p.SingletonPlugin, DefaultDatasetForm, DefaultTranslation):
return action_functions
## IAuthFunctions
# IAuthFunctions
def get_auth_functions(self):
@ -288,7 +300,7 @@ class Harvest(p.SingletonPlugin, DefaultDatasetForm, DefaultTranslation):
return auth_functions
## ITemplateHelpers
# ITemplateHelpers
def get_helpers(self):
from ckanext.harvest import helpers as harvest_helpers
@ -306,31 +318,33 @@ class Harvest(p.SingletonPlugin, DefaultDatasetForm, DefaultTranslation):
def dataset_facets(self, facets_dict, package_type):
if package_type <> 'harvest':
if package_type != 'harvest':
return facets_dict
return OrderedDict([('frequency', 'Frequency'),
('source_type','Type'),
])
('source_type', 'Type'),
])
def organization_facets(self, facets_dict, organization_type, package_type):
if package_type <> 'harvest':
if package_type != 'harvest':
return facets_dict
return OrderedDict([('frequency', 'Frequency'),
('source_type','Type'),
])
('source_type', 'Type'),
])
def _add_extra(data_dict, key, value):
if not 'extras' in data_dict:
if 'extras' not in data_dict:
data_dict['extras'] = []
data_dict['extras'].append({
'key': key, 'value': value, 'state': u'active'
})
def _get_logic_functions(module_root, logic_functions = {}):
def _get_logic_functions(module_root, logic_functions={}):
for module_name in ['get', 'create', 'update', 'patch', 'delete']:
module_path = '%s.%s' % (module_root, module_name,)
@ -341,12 +355,13 @@ def _get_logic_functions(module_root, logic_functions = {}):
module = getattr(module, part)
for key, value in module.__dict__.items():
if not key.startswith('_') and (hasattr(value, '__call__')
and (value.__module__ == module_path)):
if not key.startswith('_') and (hasattr(value, '__call__')
and (value.__module__ == module_path)):
logic_functions[key] = value
return logic_functions
def _create_harvest_source_object(context, data_dict):
'''
Creates an actual HarvestSource object with the data dict
@ -375,7 +390,7 @@ def _create_harvest_source_object(context, data_dict):
'publisher_id', 'config', 'frequency']
for o in opt:
if o in data_dict and data_dict[o] is not None:
source.__setattr__(o,data_dict[o])
source.__setattr__(o, data_dict[o])
source.active = not data_dict.get('state', None) == 'deleted'
@ -385,6 +400,7 @@ def _create_harvest_source_object(context, data_dict):
return source
def _update_harvest_source_object(context, data_dict):
'''
Updates an actual HarvestSource object with the data dict
@ -406,14 +422,13 @@ def _update_harvest_source_object(context, data_dict):
log.error('Harvest source %s does not exist', source_id)
raise logic.NotFound('Harvest source %s does not exist' % source_id)
fields = ['url', 'title', 'description', 'user_id',
'publisher_id', 'frequency']
for f in fields:
if f in data_dict and data_dict[f] is not None:
if f == 'url':
data_dict[f] = data_dict[f].strip()
source.__setattr__(f,data_dict[f])
source.__setattr__(f, data_dict[f])
# Avoids clashes with the dataset type
if 'source_type' in data_dict:
@ -424,14 +439,14 @@ def _update_harvest_source_object(context, data_dict):
# Don't change state unless explicitly set in the dict
if 'state' in data_dict:
source.active = data_dict.get('state') == 'active'
source.active = data_dict.get('state') == 'active'
# Don't commit yet, let package_create do it
source.add()
# Abort any pending jobs
if not source.active:
jobs = HarvestJob.filter(source=source,status=u'New')
jobs = HarvestJob.filter(source=source, status=u'New')
log.info('Harvest source %s not active, so aborting %i outstanding jobs', source_id, jobs.count())
if jobs:
for job in jobs:
@ -440,6 +455,7 @@ def _update_harvest_source_object(context, data_dict):
return source
def _delete_harvest_source_object(context, data_dict):
'''
Deletes an actual HarvestSource object with the id provided on the
@ -479,6 +495,7 @@ def _delete_harvest_source_object(context, data_dict):
return source
def _configure_db_logger(config):
# Log scope
#

View File

@ -32,6 +32,7 @@ REDIS_DB = 0
EXCHANGE_TYPE = 'direct'
EXCHANGE_NAME = 'ckan.harvest'
def get_connection():
backend = config.get('ckan.harvest.mq.type', MQ_TYPE)
if backend in ('amqp', 'ampq'): # "ampq" is for compat with old typo
@ -40,6 +41,7 @@ def get_connection():
return get_connection_redis()
raise Exception('not a valid queue type %s' % backend)
def get_connection_amqp():
try:
port = int(config.get('ckan.harvest.mq.port', PORT))
@ -60,12 +62,13 @@ def get_connection_amqp():
return pika.BlockingConnection(parameters)
def get_connection_redis():
import redis
return redis.StrictRedis(host=config.get('ckan.harvest.mq.hostname', HOSTNAME),
port=int(config.get('ckan.harvest.mq.port', REDIS_PORT)),
password=config.get('ckan.harvest.mq.password', None),
db=int(config.get('ckan.harvest.mq.redis_db', REDIS_DB)))
port=int(config.get('ckan.harvest.mq.port', REDIS_PORT)),
password=config.get('ckan.harvest.mq.password', None),
db=int(config.get('ckan.harvest.mq.redis_db', REDIS_DB)))
def get_gather_queue_name():
@ -77,6 +80,7 @@ def get_fetch_queue_name():
return 'ckan.harvest.{0}.fetch'.format(config.get('ckan.site_id',
'default'))
def get_gather_routing_key():
return 'ckanext-harvest:{0}:harvest_job_id'.format(
config.get('ckan.site_id', 'default'))
@ -121,8 +125,8 @@ def resubmit_jobs():
# 3 minutes for fetch and import max
if (datetime.datetime.now() - date_of_key).seconds > 180:
redis.rpush(get_fetch_routing_key(),
json.dumps({'harvest_object_id': key.split(':')[-1]})
)
json.dumps({'harvest_object_id': key.split(':')[-1]})
)
redis.delete(key)
# gather queue
@ -133,31 +137,36 @@ def resubmit_jobs():
# 3 hours for a gather
if (datetime.datetime.now() - date_of_key).seconds > 7200:
redis.rpush(get_gather_routing_key(),
json.dumps({'harvest_job_id': key.split(':')[-1]})
)
json.dumps({'harvest_job_id': key.split(':')[-1]})
)
redis.delete(key)
class Publisher(object):
def __init__(self, connection, channel, exchange, routing_key):
self.connection = connection
self.channel = channel
self.exchange = exchange
self.routing_key = routing_key
def send(self, body, **kw):
return self.channel.basic_publish(self.exchange,
self.routing_key,
json.dumps(body),
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
delivery_mode=2, # make message persistent
),
**kw)
def close(self):
self.connection.close()
class RedisPublisher(object):
def __init__(self, redis, routing_key):
self.redis = redis ## not used
self.redis = redis # not used
self.routing_key = routing_key
def send(self, body, **kw):
value = json.dumps(body)
# remove if already there
@ -168,6 +177,7 @@ class RedisPublisher(object):
def close(self):
return
def get_publisher(routing_key):
connection = get_connection()
backend = config.get('ckan.harvest.mq.type', MQ_TYPE)
@ -317,7 +327,7 @@ def gather_callback(channel, method, header, body):
len(harvest_object_ids), harvest_object_ids[:1], harvest_object_ids[-1:]))
for id in harvest_object_ids:
# Send the id to the fetch queue
publisher.send({'harvest_object_id':id})
publisher.send({'harvest_object_id': id})
log.debug('Sent {0} objects to the fetch queue'.format(len(harvest_object_ids)))
else:
@ -325,7 +335,7 @@ def gather_callback(channel, method, header, body):
# * remove a harvester and it still has sources that are then refreshed
# * add a new harvester and restart CKAN but not the gather queue.
msg = 'System error - No harvester could be found for source type %s' % job.source.type
err = HarvestGatherError(message=msg,job=job)
err = HarvestGatherError(message=msg, job=job)
err.save()
log.error(msg)
@ -410,6 +420,7 @@ def fetch_callback(channel, method, header, body):
model.Session.remove()
channel.basic_ack(method.delivery_tag)
def fetch_and_import_stages(harvester, obj):
obj.fetch_started = datetime.datetime.utcnow()
obj.state = "FETCH"
@ -443,12 +454,14 @@ def fetch_and_import_stages(harvester, obj):
obj.save()
if obj.state == 'ERROR':
obj.report_status = 'errored'
elif obj.current == False:
elif obj.current is False:
obj.report_status = 'deleted'
elif len(model.Session.query(HarvestObject)
.filter_by(package_id = obj.package_id)
.limit(2)
.all()) == 2:
elif len(
model.Session.query(HarvestObject)
.filter_by(package_id=obj.package_id)
.limit(2)
.all()
) == 2:
obj.report_status = 'updated'
else:
obj.report_status = 'added'

View File

@ -1,9 +1,6 @@
import factory
import ckanext.harvest.model as harvest_model
try:
from ckan.new_tests.factories import _get_action_user_name
except ImportError:
from ckan.tests.factories import _get_action_user_name
from ckantoolkit.tests.factories import _get_action_user_name
from ckan.plugins import toolkit
@ -71,7 +68,7 @@ class HarvestObject(factory.Factory):
FACTORY_FOR = harvest_model.HarvestObject
_return_type = 'dict'
#source = factory.SubFactory(HarvestSourceObj)
# source = factory.SubFactory(HarvestSourceObj)
job = factory.SubFactory(HarvestJobObj)
@classmethod

View File

@ -138,7 +138,7 @@ class MockCkanHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
return self.respond_action(out)
# if we wanted to server a file from disk, then we'd call this:
#return SimpleHTTPServer.SimpleHTTPRequestHandler.do_GET(self)
# return SimpleHTTPServer.SimpleHTTPRequestHandler.do_GET(self)
self.respond('Mock CKAN doesnt recognize that call', status=400)
@ -188,7 +188,7 @@ def serve(port=PORT):
'''Runs a CKAN-alike app (over HTTP) that is used for harvesting tests'''
# Choose the directory to serve files from
#os.chdir(os.path.join(os.path.dirname(os.path.abspath(__file__)),
# os.chdir(os.path.join(os.path.dirname(os.path.abspath(__file__)),
# 'mock_ckan_files'))
class TestServer(SocketServer.TCPServer):
@ -220,276 +220,274 @@ DATASETS = [
'groups': [{'id': 'group1-id', 'name': 'group1'}],
'extras': []},
{
"id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"name": "cabinet-office-energy-use",
"private": False,
"maintainer_email": None,
"revision_timestamp": "2010-11-23T22:34:55.089925",
"organization":
{
"description": "The Cabinet Office supports the Prime Minister and Deputy Prime Minister, and ensure the effective running of government. We are also the corporate headquarters for government, in partnership with HM Treasury, and we take the lead in certain critical policy areas.\r\nCO is a ministerial department, supported by 18 agencies and public bodies\r\n\r\nYou can find out more at https://www.gov.uk/government/organisations/cabinet-office",
"created": "2012-06-27T14:48:40.244951",
"title": "Cabinet Office",
"name": "cabinet-office",
"revision_timestamp": "2013-04-02T14:27:23.086886",
"is_organization": True,
"id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"name": "cabinet-office-energy-use",
"private": False,
"maintainer_email": None,
"revision_timestamp": "2010-11-23T22:34:55.089925",
"organization":
{
"description": "The Cabinet Office supports the Prime Minister and Deputy Prime Minister,"
" and ensure the effective running of government. We are also the corporate"
" headquarters for government, in partnership with HM Treasury, and we take"
" the lead in certain critical policy areas.\r\nCO is a ministerial department,"
" supported by 18 agencies and public bodies\r\n\r\nYou can find out more at"
" https://www.gov.uk/government/organisations/cabinet-office",
"created": "2012-06-27T14:48:40.244951",
"title": "Cabinet Office",
"name": "cabinet-office",
"revision_timestamp": "2013-04-02T14:27:23.086886",
"is_organization": True,
"state": "active",
"image_url": "",
"revision_id": "4be8825d-d3f4-4fb2-b80b-43e36f574c05",
"type": "organization",
"id": "aa1e068a-23da-4563-b9c2-2cad272b663e",
"approval_status": "pending"
},
"update_frequency": "other",
"metadata_created": "2010-08-02T09:19:47.600853",
"last_major_modification": "2010-08-02T09:19:47.600853",
"metadata_modified": "2014-05-09T22:00:01.486366",
"temporal_granularity": "",
"author_email": None,
"geographic_granularity": "point",
"geographic_coverage": [],
"state": "active",
"image_url": "",
"revision_id": "4be8825d-d3f4-4fb2-b80b-43e36f574c05",
"type": "organization",
"id": "aa1e068a-23da-4563-b9c2-2cad272b663e",
"approval_status": "pending"
},
"update_frequency": "other",
"metadata_created": "2010-08-02T09:19:47.600853",
"last_major_modification": "2010-08-02T09:19:47.600853",
"metadata_modified": "2014-05-09T22:00:01.486366",
"temporal_granularity": "",
"author_email": None,
"geographic_granularity": "point",
"geographic_coverage": [ ],
"state": "active",
"version": None,
"temporal_coverage-to": "",
"license_id": "uk-ogl",
"type": "dataset",
"published_via": "",
"resources":
[
{
"content_length": "69837",
"cache_url": "http://data.gov.uk/data/resource_cache/f1/f156019d-ea88-46a6-8fa3-3d12582e2161/elec00.csv",
"hash": "6f1e452320dafbe9a5304ac77ed7a4ff79bfafc3",
"description": "70 Whitehall energy data",
"cache_last_updated": "2013-06-19T00:59:42.762642",
"url": "http://data.carbonculture.net/orgs/cabinet-office/70-whitehall/reports/elec00.csv",
"openness_score_failure_count": "0",
"format": "CSV",
"cache_filepath": "/mnt/shared/ckan_resource_cache/f1/f156019d-ea88-46a6-8fa3-3d12582e2161/elec00.csv",
"version": None,
"temporal_coverage-to": "",
"license_id": "uk-ogl",
"type": "dataset",
"published_via": "",
"resources":
[
{
"content_length": "69837",
"cache_url": "http://data.gov.uk/data/resource_cache/f1/f156019d-ea88-46a6-8fa3-3d12582e2161/elec00.csv",
"hash": "6f1e452320dafbe9a5304ac77ed7a4ff79bfafc3",
"description": "70 Whitehall energy data",
"cache_last_updated": "2013-06-19T00:59:42.762642",
"url": "http://data.carbonculture.net/orgs/cabinet-office/70-whitehall/reports/elec00.csv",
"openness_score_failure_count": "0",
"format": "CSV",
"cache_filepath": "/mnt/shared/ckan_resource_cache/f1/f156019d-ea88-46a6-8fa3-3d12582e2161/elec00.csv",
"tracking_summary":
{
"total": 0,
"recent": 0
},
"last_modified": "2014-05-09T23:00:01.435211",
"mimetype": "text/csv",
"content_type": "text/csv",
"openness_score": "3",
"openness_score_reason": "open and standardized format",
"position": 0,
"revision_id": "4fca759e-d340-4e64-b75e-22ee1d42c2b4",
"id": "f156019d-ea88-46a6-8fa3-3d12582e2161",
"size": 299107
}
],
"num_resources": 1,
"tags":
[
{
"vocabulary_id": None,
"display_name": "consumption",
"name": "consumption",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"id": "84ce26de-6711-4e85-9609-f7d8a87b0fc8"
},
{
"vocabulary_id": None,
"display_name": "energy",
"name": "energy",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"id": "9f2ae723-602f-4290-80c4-6637ad617a45"
}
],
"precision": "",
"tracking_summary":
{
"total": 0,
"recent": 0
},
"last_modified": "2014-05-09T23:00:01.435211",
"mimetype": "text/csv",
"content_type": "text/csv",
"openness_score": "3",
"openness_score_reason": "open and standardized format",
"position": 0,
"revision_id": "4fca759e-d340-4e64-b75e-22ee1d42c2b4",
"id": "f156019d-ea88-46a6-8fa3-3d12582e2161",
"size": 299107
}
],
"num_resources": 1,
"tags":
[
{
"vocabulary_id": None,
"display_name": "consumption",
"name": "consumption",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"id": "84ce26de-6711-4e85-9609-f7d8a87b0fc8"
},
{
"vocabulary_id": None,
"display_name": "energy",
"name": "energy",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"id": "9f2ae723-602f-4290-80c4-6637ad617a45"
"taxonomy_url": "",
"groups": [{"id": "remote-group-id", "name": "remote-group"}],
"creator_user_id": None,
"national_statistic": "no",
"relationships_as_subject": [],
"num_tags": 8,
"update_frequency-other": "Real-time",
"isopen": True,
"url": "http://www.carbonculture.net/orgs/cabinet-office/70-whitehall/",
"notes": "Cabinet Office head office energy use updated from on-site meters showing use, cost and carbon impact.",
"owner_org": "aa1e068a-23da-4563-b9c2-2cad272b663e",
"theme-secondary":
[
"Environment"
],
"extras":
[
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "categories",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "6813d71b-785b-4f56-b296-1b2acb34eed6"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "2010-07-30",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "date_released",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "515f638b-e2cf-40a6-a8a7-cbc8001269e3"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "date_updated",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "bff63465-4f96-44e7-bb87-6e66fff5e596"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "000000: ",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "geographic_coverage",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "414bcd35-b628-4218-99e2-639615183df8"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "point",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "geographic_granularity",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "c7b460dd-c61f-4cd2-90c2-eceb6c91fe9b"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "no",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "national_statistic",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "9f04b202-3646-49be-b69e-7fa997399ff3"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "{\"status\": \"final\", \"source\": \"Automatically awarded by ODI\","
" \"certification_type\": \"automatically awarded\", \"level\": \"raw\","
" \"title\": \"Cabinet Office 70 Whitehall energy use\","
" \"created_at\": \"2014-10-28T12:25:57Z\", \"jurisdiction\": \"GB\","
" \"certificate_url\": \"https://certificates.theodi.org/datasets/5480/certificates/17922\","
" \"badge_url\": \"https://certificates.theodi.org/datasets/5480/certificates/17922/badge.png\","
" \"cert_title\": \"Basic Level Certificate\"}",
"revision_timestamp": "2014-11-12T02:52:35.048060",
"state": "active",
"key": "odi-certificate",
"revision_id": "eae9763b-e258-4d76-9ec2-7f5baf655394",
"id": "373a3cbb-d9c0-45a6-9a78-b95c86398766"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "temporal_coverage-from",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "39f72eed-6f76-4733-b636-7541cee3404f"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "temporal_coverage-to",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "818e2c8f-fee0-49da-8bea-ea3c9401ece5"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "temporal_granularity",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "f868b950-d3ce-4fbe-88ca-5cbc4b672320"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "Towns & Cities",
"revision_timestamp": "2015-03-16T18:10:08.802815",
"state": "active",
"key": "theme-primary",
"revision_id": "fc2b6630-84f8-4c88-8ac7-0ca275b2bc97",
"id": "bdcf00fd-3248-4c2f-9cf8-b90706c88e8d"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "[\"Environment\"]",
"revision_timestamp": "2015-04-08T20:57:04.895214",
"state": "active",
"key": "theme-secondary",
"revision_id": "c2c48530-ff82-4af1-9373-cdc64d5bc83c",
"id": "417482c5-a9c0-4430-8c4e-0c76e59fe44f"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "Real-time",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "update_frequency",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "e8ad4837-514e-4446-81a2-ffacfa7cf683"
}
],
"license_url": "http://www.nationalarchives.gov.uk/doc/open-government-licence/version/3/",
"individual_resources":
[
{
"content_length": "69837",
"cache_url": "http://data.gov.uk/data/resource_cache/f1/f156019d-ea88-46a6-8fa3-3d12582e2161/elec00.csv",
"hash": "6f1e452320dafbe9a5304ac77ed7a4ff79bfafc3",
"description": "70 Whitehall energy data",
"cache_last_updated": "2013-06-19T00:59:42.762642",
"url": "http://data.carbonculture.net/orgs/cabinet-office/70-whitehall/reports/elec00.csv",
"openness_score_failure_count": "0",
"format": "CSV",
"cache_filepath": "/mnt/shared/ckan_resource_cache/f1/f156019d-ea88-46a6-8fa3-3d12582e2161/elec00.csv",
"tracking_summary":
{
"total": 0,
"recent": 0
},
"last_modified": "2014-05-09T23:00:01.435211",
"mimetype": "text/csv",
"content_type": "text/csv",
"openness_score": "3",
"openness_score_reason": "open and standardized format",
"position": 0,
"revision_id": "4fca759e-d340-4e64-b75e-22ee1d42c2b4",
"id": "f156019d-ea88-46a6-8fa3-3d12582e2161",
"size": 299107
}
],
"title": "Cabinet Office 70 Whitehall energy use",
"revision_id": "3bd6ced3-35b2-4b20-94e2-c596e24bc375",
"date_released": "30/7/2010",
"theme-primary": "Towns & Cities"
}
],
"precision": "",
"tracking_summary":
{
"total": 0,
"recent": 0
},
"taxonomy_url": "",
"groups": [{"id": "remote-group-id", "name": "remote-group"}],
"groups": [],
"creator_user_id": None,
"national_statistic": "no",
"relationships_as_subject": [],
"num_tags": 8,
"update_frequency-other": "Real-time",
"isopen": True,
"url": "http://www.carbonculture.net/orgs/cabinet-office/70-whitehall/",
"notes": "Cabinet Office head office energy use updated from on-site meters showing use, cost and carbon impact.",
"owner_org": "aa1e068a-23da-4563-b9c2-2cad272b663e",
"theme-secondary":
[
"Environment"
],
"extras":
[
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "categories",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "6813d71b-785b-4f56-b296-1b2acb34eed6"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "2010-07-30",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "date_released",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "515f638b-e2cf-40a6-a8a7-cbc8001269e3"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "date_updated",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "bff63465-4f96-44e7-bb87-6e66fff5e596"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "000000: ",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "geographic_coverage",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "414bcd35-b628-4218-99e2-639615183df8"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "point",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "geographic_granularity",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "c7b460dd-c61f-4cd2-90c2-eceb6c91fe9b"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "no",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "national_statistic",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "9f04b202-3646-49be-b69e-7fa997399ff3"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "{\"status\": \"final\", \"source\": \"Automatically awarded by ODI\", \"certification_type\": \"automatically awarded\", \"level\": \"raw\", \"title\": \"Cabinet Office 70 Whitehall energy use\", \"created_at\": \"2014-10-28T12:25:57Z\", \"jurisdiction\": \"GB\", \"certificate_url\": \"https://certificates.theodi.org/datasets/5480/certificates/17922\", \"badge_url\": \"https://certificates.theodi.org/datasets/5480/certificates/17922/badge.png\", \"cert_title\": \"Basic Level Certificate\"}",
"revision_timestamp": "2014-11-12T02:52:35.048060",
"state": "active",
"key": "odi-certificate",
"revision_id": "eae9763b-e258-4d76-9ec2-7f5baf655394",
"id": "373a3cbb-d9c0-45a6-9a78-b95c86398766"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "temporal_coverage-from",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "39f72eed-6f76-4733-b636-7541cee3404f"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "temporal_coverage-to",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "818e2c8f-fee0-49da-8bea-ea3c9401ece5"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "temporal_granularity",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "f868b950-d3ce-4fbe-88ca-5cbc4b672320"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "Towns & Cities",
"revision_timestamp": "2015-03-16T18:10:08.802815",
"state": "active",
"key": "theme-primary",
"revision_id": "fc2b6630-84f8-4c88-8ac7-0ca275b2bc97",
"id": "bdcf00fd-3248-4c2f-9cf8-b90706c88e8d"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "[\"Environment\"]",
"revision_timestamp": "2015-04-08T20:57:04.895214",
"state": "active",
"key": "theme-secondary",
"revision_id": "c2c48530-ff82-4af1-9373-cdc64d5bc83c",
"id": "417482c5-a9c0-4430-8c4e-0c76e59fe44f"
},
{
"package_id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",
"value": "Real-time",
"revision_timestamp": "2010-08-02T09:19:47.600853",
"state": "active",
"key": "update_frequency",
"revision_id": "08bac459-1d44-44fb-b388-20f4d8394364",
"id": "e8ad4837-514e-4446-81a2-ffacfa7cf683"
}
],
"license_url": "http://www.nationalarchives.gov.uk/doc/open-government-licence/version/3/",
"individual_resources":
[
{
"content_length": "69837",
"cache_url": "http://data.gov.uk/data/resource_cache/f1/f156019d-ea88-46a6-8fa3-3d12582e2161/elec00.csv",
"hash": "6f1e452320dafbe9a5304ac77ed7a4ff79bfafc3",
"description": "70 Whitehall energy data",
"cache_last_updated": "2013-06-19T00:59:42.762642",
"url": "http://data.carbonculture.net/orgs/cabinet-office/70-whitehall/reports/elec00.csv",
"openness_score_failure_count": "0",
"format": "CSV",
"cache_filepath": "/mnt/shared/ckan_resource_cache/f1/f156019d-ea88-46a6-8fa3-3d12582e2161/elec00.csv",
"tracking_summary":
{
"total": 0,
"recent": 0
},
"last_modified": "2014-05-09T23:00:01.435211",
"mimetype": "text/csv",
"content_type": "text/csv",
"openness_score": "3",
"openness_score_reason": "open and standardized format",
"position": 0,
"revision_id": "4fca759e-d340-4e64-b75e-22ee1d42c2b4",
"id": "f156019d-ea88-46a6-8fa3-3d12582e2161",
"size": 299107
}
],
"title": "Cabinet Office 70 Whitehall energy use",
"revision_id": "3bd6ced3-35b2-4b20-94e2-c596e24bc375",
"date_released": "30/7/2010",
"theme-primary": "Towns & Cities"
}
]
INVALID_TAGS = [
@ -520,26 +518,26 @@ GROUPS = [
REVISIONS = [
{
"id": "23daf2eb-d7ec-4d86-a844-3924acd311ea",
"timestamp": "2015-10-21T09:50:08.160045",
"message": "REST API: Update object dataset1",
"author": "ross",
"approved_timestamp": None,
"packages":
[
DATASETS[1]['id']
],
"groups": [ ]
"id": "23daf2eb-d7ec-4d86-a844-3924acd311ea",
"timestamp": "2015-10-21T09:50:08.160045",
"message": "REST API: Update object dataset1",
"author": "ross",
"approved_timestamp": None,
"packages":
[
DATASETS[1]['id']
],
"groups": []
},
{
"id": "8254a293-10db-4af2-9dfa-6a1f06ee899c",
"timestamp": "2015-10-21T09:46:21.198021",
"message": "REST API: Update object dataset1",
"author": "ross",
"approved_timestamp": None,
"packages":
[
DATASETS[1]['id']
],
"groups": [ ]
"id": "8254a293-10db-4af2-9dfa-6a1f06ee899c",
"timestamp": "2015-10-21T09:46:21.198021",
"message": "REST API: Update object dataset1",
"author": "ross",
"approved_timestamp": None,
"packages":
[
DATASETS[1]['id']
],
"groups": []
}]

View File

@ -4,13 +4,7 @@ from nose.tools import assert_equal, assert_in
from ckanext.harvest import model as harvest_model
from ckanext.harvest.harvesters.base import HarvesterBase, munge_tag
from mock import patch
try:
from ckan.tests import helpers
from ckan.tests import factories
except ImportError:
from ckan.new_tests import helpers
from ckan.new_tests import factories
from ckantoolkit.tests import helpers, factories
_ensure_name_is_unique = HarvesterBase._ensure_name_is_unique
@ -44,15 +38,15 @@ class TestGenNewName(object):
assert_equal(
HarvesterBase._gen_new_name('Trees'),
'trees1')
@patch.dict('ckanext.harvest.harvesters.base.config',
{'ckanext.harvest.default_dataset_name_append': 'random-hex'})
def test_random_config(self):
factories.Dataset(name='trees')
new_name = HarvesterBase._gen_new_name('Trees')
new_name = HarvesterBase._gen_new_name('Trees')
assert re.match('trees[\da-f]{5}', new_name)
@patch.dict('ckanext.harvest.harvesters.base.config',
{'ckanext.harvest.default_dataset_name_append': 'random-hex'})
def test_config_override(self):
@ -138,7 +132,7 @@ class TestMungeTag:
# (original, expected)
munge_list = [
('unchanged', 'unchanged'),
#('s', 's_'), # too short
# ('s', 's_'), # too short
('some spaces here', 'some-spaces--here'),
('random:other%characters&_.here', 'randomothercharactershere'),
('river-water-dashes', 'river-water-dashes'),
@ -157,17 +151,17 @@ class TestMungeTag:
assert_equal(first_munge, exp)
second_munge = munge_tag(first_munge)
assert_equal(second_munge, exp)
def test_clean_tags_package_show(self):
instance = HarvesterBase()
tags_as_dict = [{u'vocabulary_id': None,
u'state': u'active',
u'display_name': name,
u'id': u'073080c8-fef2-4743-9c9e-6216019f8b3d',
u'name': name} for name,exp in self.munge_list]
tags_as_dict = [{u'vocabulary_id': None,
u'state': u'active',
u'display_name': name,
u'id': u'073080c8-fef2-4743-9c9e-6216019f8b3d',
u'name': name} for name, exp in self.munge_list]
clean_tags = HarvesterBase._clean_tags(instance, tags_as_dict)
idx = 0
for _, exp in self.munge_list:
tag = clean_tags[idx]
@ -176,11 +170,11 @@ class TestMungeTag:
def test_clean_tags_rest(self):
instance = HarvesterBase()
tags_as_str = [name for name,exp in self.munge_list]
tags_as_str = [name for name, exp in self.munge_list]
clean_tags = HarvesterBase._clean_tags(instance, tags_as_str)
assert_equal(len(clean_tags), len(tags_as_str))
for _, exp in self.munge_list:
assert_in(exp, clean_tags)

View File

@ -2,18 +2,15 @@ import copy
from nose.tools import assert_equal, assert_raises, assert_in
import json
from mock import patch, MagicMock
from mock import patch, MagicMock, Mock
from requests.exceptions import HTTPError, RequestException
try:
from ckan.tests.helpers import reset_db, call_action
from ckan.tests.factories import Organization, Group
except ImportError:
from ckan.new_tests.helpers import reset_db, call_action
from ckan.new_tests.factories import Organization, Group
from ckantoolkit.tests.helpers import reset_db, call_action
from ckantoolkit.tests.factories import Organization, Group
from ckan import model
from ckan.plugins import toolkit
from ckanext.harvest.harvesters.ckanharvester import ContentFetchError
from ckanext.harvest.tests.factories import (HarvestSourceObj, HarvestJobObj,
HarvestObjectObj)
from ckanext.harvest.tests.lib import run_harvest
@ -133,7 +130,8 @@ class TestCkanHarvester(object):
assert was_last_job_considered_error_free()
def test_harvest_invalid_tag(self):
from nose.plugins.skip import SkipTest; raise SkipTest()
from nose.plugins.skip import SkipTest
raise SkipTest()
results_by_guid = run_harvest(
url='http://localhost:%s/invalid_tag' % mock_ckan.PORT,
harvester=CKANHarvester())
@ -329,3 +327,40 @@ class TestCkanHarvester(object):
config=json.dumps(config))
assert_in('default_extras must be a dictionary',
str(harvest_context.exception))
@patch('ckanext.harvest.harvesters.ckanharvester.pyopenssl.inject_into_urllib3')
@patch('ckanext.harvest.harvesters.ckanharvester.CKANHarvester.config')
@patch('ckanext.harvest.harvesters.ckanharvester.requests.get', side_effect=RequestException('Test exception'))
def test_get_content_handles_request_exception(
self, mock_requests_get, mock_config, mock_pyopenssl_inject
):
mock_config.return_value = {}
harvester = CKANHarvester()
with assert_raises(ContentFetchError) as context:
harvester._get_content("http://test.example.gov.uk")
assert str(context.exception) == 'Request error: Test exception'
class MockHTTPError(HTTPError):
def __init__(self):
self.response = Mock()
self.response.status_code = 404
self.request = Mock()
self.request.url = "http://test.example.gov.uk"
@patch('ckanext.harvest.harvesters.ckanharvester.pyopenssl.inject_into_urllib3')
@patch('ckanext.harvest.harvesters.ckanharvester.CKANHarvester.config')
@patch('ckanext.harvest.harvesters.ckanharvester.requests.get', side_effect=MockHTTPError())
def test_get_content_handles_http_error(
self, mock_requests_get, mock_config, mock_pyopenssl_inject
):
mock_config.return_value = {}
harvester = CKANHarvester()
with assert_raises(ContentFetchError) as context:
harvester._get_content("http://test.example.gov.uk")
assert str(context.exception) == 'HTTP error: 404 http://test.example.gov.uk'

View File

@ -2,25 +2,11 @@ import json
import factories
import unittest
from mock import patch
from nose.tools import assert_equal, assert_raises
from nose.tools import assert_equal, assert_raises, assert_in
from nose.plugins.skip import SkipTest
try:
from nose.tools import assert_in
except ImportError:
from ckan.tests.helpers import assert_in
except ImportError:
from ckan.new_tests.helpers import assert_in
try:
import ipdb; ipdb.set_trace()
from ckan.tests import factories as ckan_factories
from ckan.tests.helpers import (_get_test_app, reset_db,
FunctionalTestBase)
except ImportError:
from ckan.new_tests import factories as ckan_factories
from ckan.new_tests.helpers import (_get_test_app, reset_db,
FunctionalTestBase)
from ckantoolkit.tests import factories as ckan_factories
from ckantoolkit.tests.helpers import _get_test_app, reset_db, FunctionalTestBase
from ckan import plugins as p
from ckan.plugins import toolkit
@ -255,7 +241,7 @@ class TestHarvestSourceActionCreate(HarvestSourceActionBase):
for key in source_dict.keys():
assert_equal(source_dict[key], result[key])
# Check that source was actually created
source = harvest_model.HarvestSource.get(result['id'])
assert_equal(source.url, source_dict['url'])
@ -409,11 +395,11 @@ class TestActions(ActionBase):
job_1 = factories.HarvestJobObj(source=source_1)
dataset_1 = ckan_factories.Dataset()
object_1_ = factories.HarvestObjectObj(job=job_1, source=source_1,
package_id=dataset_1['id'])
package_id=dataset_1['id'])
job_2 = factories.HarvestJobObj(source=source_2)
dataset_2 = ckan_factories.Dataset()
object_2_ = factories.HarvestObjectObj(job=job_2, source=source_2,
package_id=dataset_2['id'])
package_id=dataset_2['id'])
# execute
context = {'model': model, 'session': model.Session,
@ -755,7 +741,7 @@ class TestHarvestDBLog(unittest.TestCase):
def setup_class(cls):
reset_db()
harvest_model.setup()
def test_harvest_db_logger(self):
# Create source and check if harvest_log table is populated
data_dict = SOURCE_DICT.copy()
@ -763,11 +749,11 @@ class TestHarvestDBLog(unittest.TestCase):
source = factories.HarvestSourceObj(**data_dict)
content = 'Harvest source created: %s' % source.id
log = harvest_model.Session.query(harvest_model.HarvestLog).\
filter(harvest_model.HarvestLog.content==content).first()
filter(harvest_model.HarvestLog.content == content).first()
self.assertIsNotNone(log)
self.assertEqual(log.level, 'INFO')
context = {
'model': model,
'session': model.Session,
@ -780,7 +766,7 @@ class TestHarvestDBLog(unittest.TestCase):
self.assertIn('content', data[0])
self.assertIn('created', data[0])
self.assertTrue(data[0]['created'] > data[1]['created'])
per_page = 1
data = toolkit.get_action('harvest_log_list')(context, {'level': 'info', 'per_page': per_page})
self.assertEqual(len(data), per_page)

View File

@ -1,13 +1,11 @@
import logging
from pprint import pprint
from nose.plugins.skip import SkipTest;
from nose.plugins.skip import SkipTest
from ckan import model
from ckan.model import Package, Session
from ckan.lib.helpers import url_for,json
from ckan.model import Session
from ckan.lib.base import config
#TODO: remove references to old tests
# TODO: remove references to old tests
try:
from ckan.tests import CreateTestData
except ImportError:
@ -17,7 +15,6 @@ try:
except ImportError:
from ckan.tests.legacy.functional.base import FunctionalTestCase
from ckanext.harvest.plugin import Harvest
from ckanext.harvest.model import HarvestSource, HarvestJob, setup as harvest_model_setup
log = logging.getLogger(__name__)
@ -33,11 +30,11 @@ class HarvestAuthBaseCase():
def teardown_class(cls):
pass
def _test_auth_not_allowed(self,user_name = None, source = None, status = 401):
def _test_auth_not_allowed(self, user_name=None, source=None, status=401):
if not source:
# Create harvest source
source = HarvestSource(url=u'http://test-source.com',type='ckan')
source = HarvestSource(url=u'http://test-source.com', type='ckan')
Session.add(source)
Session.commit()
@ -47,19 +44,19 @@ class HarvestAuthBaseCase():
extra_environ = {}
# List
res = self.app.get('/harvest', status=status, extra_environ=extra_environ)
self.app.get('/harvest', status=status, extra_environ=extra_environ)
# Create
res = self.app.get('/harvest/new', status=status, extra_environ=extra_environ)
self.app.get('/harvest/new', status=status, extra_environ=extra_environ)
# Read
res = self.app.get('/harvest/%s' % source.id, status=status, extra_environ=extra_environ)
self.app.get('/harvest/%s' % source.id, status=status, extra_environ=extra_environ)
# Edit
res = self.app.get('/harvest/edit/%s' % source.id, status=status, extra_environ=extra_environ)
self.app.get('/harvest/edit/%s' % source.id, status=status, extra_environ=extra_environ)
# Refresh
res = self.app.get('/harvest/refresh/%s' % source.id, status=status, extra_environ=extra_environ)
self.app.get('/harvest/refresh/%s' % source.id, status=status, extra_environ=extra_environ)
def _test_auth_allowed(self,user_name,auth_profile=None):
def _test_auth_allowed(self, user_name, auth_profile=None):
extra_environ={'REMOTE_USER': user_name.encode('utf8')}
extra_environ = {'REMOTE_USER': user_name.encode('utf8')}
# List
res = self.app.get('/harvest', extra_environ=extra_environ)
@ -71,7 +68,7 @@ class HarvestAuthBaseCase():
if auth_profile == 'publisher':
assert 'publisher_id' in res
else:
assert not 'publisher_id' in res
assert 'publisher_id' not in res
fv = res.forms['source-new']
fv['url'] = u'http://test-source.com'
@ -84,7 +81,7 @@ class HarvestAuthBaseCase():
fv['publisher_id'] = self.publisher1.id
res = fv.submit('save', extra_environ=extra_environ)
assert not 'Error' in res, res
assert 'Error' not in res, res
source = Session.query(HarvestSource).first()
assert source.url == u'http://test-source.com'
@ -102,13 +99,13 @@ class HarvestAuthBaseCase():
if auth_profile == 'publisher':
assert 'publisher_id' in res
else:
assert not 'publisher_id' in res
assert 'publisher_id' not in res
fv = res.forms['source-new']
fv['title'] = u'Test harvest source Updated'
res = fv.submit('save', extra_environ=extra_environ)
assert not 'Error' in res, res
assert 'Error' not in res, res
source = Session.query(HarvestSource).first()
assert source.title == u'Test harvest source Updated'
@ -120,16 +117,14 @@ class HarvestAuthBaseCase():
assert job.source_id == source.id
class TestAuthDefaultProfile(FunctionalTestCase,HarvestAuthBaseCase):
class TestAuthDefaultProfile(FunctionalTestCase, HarvestAuthBaseCase):
@classmethod
def setup_class(cls):
if (config.get('ckan.harvest.auth.profile','') != ''):
if (config.get('ckan.harvest.auth.profile', '') != ''):
raise SkipTest('Skipping default auth profile tests. Set ckan.harvest.auth.profile = \'\' to run them')
super(TestAuthDefaultProfile,cls).setup_class()
super(TestAuthDefaultProfile, cls).setup_class()
def setup(self):
CreateTestData.create()
@ -148,40 +143,41 @@ class TestAuthDefaultProfile(FunctionalTestCase,HarvestAuthBaseCase):
def test_auth_default_profile_notloggedin(self):
self._test_auth_not_allowed(status=302)
class TestAuthPublisherProfile(FunctionalTestCase,HarvestAuthBaseCase):
class TestAuthPublisherProfile(FunctionalTestCase, HarvestAuthBaseCase):
@classmethod
def setup_class(cls):
if (config.get('ckan.harvest.auth.profile') != 'publisher'):
raise SkipTest('Skipping publisher auth profile tests. Set ckan.harvest.auth.profile = \'publisher\' to run them')
super(TestAuthPublisherProfile,cls).setup_class()
super(TestAuthPublisherProfile, cls).setup_class()
def setup(self):
model.Session.remove()
CreateTestData.create(auth_profile='publisher')
self.sysadmin_user = model.User.get('testsysadmin')
self.normal_user = model.User.get('annafan') # Does not belong to a publisher
self.normal_user = model.User.get('annafan') # Does not belong to a publisher
self.publisher1_user = model.User.by_name('russianfan')
self.publisher2_user = model.User.by_name('tester')
# Create two Publishers
rev = model.repo.new_revision()
self.publisher1 = model.Group(name=u'test-publisher1',title=u'Test Publihser 1',type=u'publisher')
model.repo.new_revision()
self.publisher1 = model.Group(name=u'test-publisher1', title=u'Test Publihser 1', type=u'publisher')
Session.add(self.publisher1)
self.publisher2 = model.Group(name=u'test-publisher2',title=u'Test Publihser 2',type=u'publisher')
self.publisher2 = model.Group(name=u'test-publisher2', title=u'Test Publihser 2', type=u'publisher')
Session.add(self.publisher2)
member1 = model.Member(table_name = 'user',
table_id = self.publisher1_user.id,
group=self.publisher1,
capacity='admin')
member1 = model.Member(table_name='user',
table_id=self.publisher1_user.id,
group=self.publisher1,
capacity='admin')
Session.add(member1)
member2 = model.Member(table_name = 'user',
table_id = self.publisher2_user.id,
group=self.publisher2,
capacity='admin')
member2 = model.Member(table_name='user',
table_id=self.publisher2_user.id,
group=self.publisher2,
capacity='admin')
Session.add(member2)
Session.commit()
@ -196,15 +192,15 @@ class TestAuthPublisherProfile(FunctionalTestCase,HarvestAuthBaseCase):
self._test_auth_not_allowed(status=302)
def test_auth_publisher_profile_sysadmin(self):
self._test_auth_allowed(self.sysadmin_user.name,auth_profile='publisher')
self._test_auth_allowed(self.sysadmin_user.name, auth_profile='publisher')
def test_auth_publisher_profile_publisher(self):
self._test_auth_allowed(self.publisher1_user.name,auth_profile='publisher')
self._test_auth_allowed(self.publisher1_user.name, auth_profile='publisher')
def test_auth_publisher_profile_different_publisher(self):
# Create a source for publisher 1
source = HarvestSource(url=u'http://test-source.com',type='ckan',
source = HarvestSource(url=u'http://test-source.com', type='ckan',
publisher_id=self.publisher1.id)
Session.add(source)
Session.commit()
@ -227,4 +223,3 @@ class TestAuthPublisherProfile(FunctionalTestCase,HarvestAuthBaseCase):
res = self.app.get('/harvest/edit/%s' % source.id, status=status, extra_environ=extra_environ)
# Refresh
res = self.app.get('/harvest/refresh/%s' % source.id, status=status, extra_environ=extra_environ)

View File

@ -1,23 +1,8 @@
from ckan.lib.helpers import url_for
try:
from ckan.tests import helpers, factories
except ImportError:
from ckan.new_tests import helpers, factories
from ckantoolkit.tests import helpers, factories
from ckanext.harvest.tests import factories as harvest_factories
try:
from ckan.tests.helpers import assert_in
except ImportError:
# for ckan 2.2
try:
from nose.tools import assert_in
except ImportError:
# Python 2.6 doesn't have it
def assert_in(a, b, msg=None):
assert a in b, msg or '%r was not in %r' % (a, b)
from nose.tools import assert_in
import ckanext.harvest.model as harvest_model

View File

@ -1,7 +1,4 @@
try:
from ckan.tests.helpers import reset_db
except ImportError:
from ckan.new_tests.helpers import reset_db
from ckantoolkit.tests.helpers import reset_db
import ckanext.harvest.model as harvest_model
from ckanext.harvest.model import HarvestObject, HarvestObjectExtra
from ckanext.harvest.interfaces import IHarvester
@ -18,34 +15,35 @@ import uuid
class MockHarvester(SingletonPlugin):
implements(IHarvester)
def info(self):
return {'name': 'test', 'title': 'test', 'description': 'test'}
def gather_stage(self, harvest_job):
if harvest_job.source.url.startswith('basic_test'):
obj = HarvestObject(guid = 'test1', job = harvest_job)
obj = HarvestObject(guid='test1', job=harvest_job)
obj.extras.append(HarvestObjectExtra(key='key', value='value'))
obj2 = HarvestObject(guid = 'test2', job = harvest_job)
obj3 = HarvestObject(guid = 'test_to_delete', job = harvest_job)
obj2 = HarvestObject(guid='test2', job=harvest_job)
obj3 = HarvestObject(guid='test_to_delete', job=harvest_job)
obj.add()
obj2.add()
obj3.save() # this will commit both
obj3.save() # this will commit both
return [obj.id, obj2.id, obj3.id]
return []
def fetch_stage(self, harvest_object):
assert_equal(harvest_object.state, "FETCH")
assert harvest_object.fetch_started != None
assert harvest_object.fetch_started is not None
harvest_object.content = json.dumps({'name': harvest_object.guid})
harvest_object.save()
return True
def import_stage(self, harvest_object):
assert_equal(harvest_object.state, "IMPORT")
assert harvest_object.fetch_finished != None
assert harvest_object.import_started != None
assert harvest_object.fetch_finished is not None
assert harvest_object.import_started is not None
user = logic.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {}
@ -68,9 +66,10 @@ class MockHarvester(SingletonPlugin):
# set previous objects to not current
previous_object = model.Session.query(HarvestObject) \
.filter(HarvestObject.guid==harvest_object.guid) \
.filter(HarvestObject.current==True) \
.first()
.filter(HarvestObject.guid == harvest_object.guid) \
.filter(
HarvestObject.current == True # noqa: E712
).first()
if previous_object:
previous_object.current = False
previous_object.save()
@ -95,13 +94,12 @@ class TestHarvestQueue(object):
def test_01_basic_harvester(self):
### make sure queues/exchanges are created first and are empty
# make sure queues/exchanges are created first and are empty
consumer = queue.get_gather_consumer()
consumer_fetch = queue.get_fetch_consumer()
consumer.queue_purge(queue=queue.get_gather_queue_name())
consumer_fetch.queue_purge(queue=queue.get_fetch_queue_name())
user = logic.get_action('get_site_user')(
{'model': model, 'ignore_auth': True}, {}
)['name']
@ -140,7 +138,7 @@ class TestHarvestQueue(object):
{'id': job_id}
)['status'] == u'Running'
## pop on item off the queue and run the callback
# pop on item off the queue and run the callback
reply = consumer.basic_get(queue='ckan.harvest.gather')
queue.gather_callback(consumer, *reply)
@ -152,11 +150,10 @@ class TestHarvestQueue(object):
assert all_objects[1].state == 'WAITING'
assert all_objects[2].state == 'WAITING'
assert len(model.Session.query(HarvestObject).all()) == 3
assert len(model.Session.query(HarvestObjectExtra).all()) == 1
## do three times as three harvest objects
# do three times as three harvest objects
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
queue.fetch_callback(consumer_fetch, *reply)
reply = consumer_fetch.basic_get(queue='ckan.harvest.fetch')
@ -165,8 +162,8 @@ class TestHarvestQueue(object):
queue.fetch_callback(consumer_fetch, *reply)
count = model.Session.query(model.Package) \
.filter(model.Package.type=='dataset') \
.count()
.filter(model.Package.type == 'dataset') \
.count()
assert count == 3
all_objects = model.Session.query(HarvestObject).filter_by(current=True).all()
@ -178,10 +175,10 @@ class TestHarvestQueue(object):
assert_equal(all_objects[2].state, 'COMPLETE')
assert_equal(all_objects[2].report_status, 'added')
## fire run again to check if job is set to Finished
# fire run again to check if job is set to Finished
logic.get_action('harvest_jobs_run')(
context,
{'source_id':harvest_source['id']}
{'source_id': harvest_source['id']}
)
harvest_job = logic.get_action('harvest_job_show')(
@ -197,12 +194,12 @@ class TestHarvestQueue(object):
{'id': harvest_source['id']}
)
assert_equal(harvest_source_dict['status']['last_job']['stats'], {'added': 3, 'updated': 0, 'not modified': 0, 'errored': 0, 'deleted': 0})
assert_equal(harvest_source_dict['status']['last_job']['stats'], {'added': 3, 'updated': 0,
'not modified': 0, 'errored': 0, 'deleted': 0})
assert_equal(harvest_source_dict['status']['total_datasets'], 3)
assert_equal(harvest_source_dict['status']['job_count'], 1)
########### Second run ########################
# Second run
harvest_job = logic.get_action('harvest_job_create')(
context,
{'source_id': harvest_source['id'], 'run': True}
@ -214,7 +211,7 @@ class TestHarvestQueue(object):
{'id': job_id}
)['status'] == u'Running'
## pop on item off the queue and run the callback
# pop on item off the queue and run the callback
reply = consumer.basic_get(queue='ckan.harvest.gather')
queue.gather_callback(consumer, *reply)
@ -230,8 +227,8 @@ class TestHarvestQueue(object):
queue.fetch_callback(consumer_fetch, *reply)
count = model.Session.query(model.Package) \
.filter(model.Package.type=='dataset') \
.count()
.filter(model.Package.type == 'dataset') \
.count()
assert_equal(count, 3)
all_objects = model.Session.query(HarvestObject).filter_by(report_status='added').all()
@ -246,7 +243,7 @@ class TestHarvestQueue(object):
# run to make sure job is marked as finshed
logic.get_action('harvest_jobs_run')(
context,
{'source_id':harvest_source['id']}
{'source_id': harvest_source['id']}
)
harvest_job = logic.get_action('harvest_job_show')(
@ -260,11 +257,11 @@ class TestHarvestQueue(object):
{'id': harvest_source['id']}
)
assert_equal(harvest_source_dict['status']['last_job']['stats'], {'added': 0, 'updated': 2, 'not modified': 0, 'errored': 0, 'deleted': 1})
assert_equal(harvest_source_dict['status']['last_job']['stats'], {'added': 0, 'updated': 2,
'not modified': 0, 'errored': 0, 'deleted': 1})
assert_equal(harvest_source_dict['status']['total_datasets'], 2)
assert_equal(harvest_source_dict['status']['job_count'], 2)
def test_redis_queue_purging(self):
'''
Test that Redis queue purging doesn't purge the wrong keys.

View File

@ -4,11 +4,8 @@
import json
from nose.tools import assert_equal
from ckantoolkit.tests.helpers import reset_db
try:
from ckan.tests.helpers import reset_db
except ImportError:
from ckan.new_tests.helpers import reset_db
from ckan import model
from ckan import plugins as p
from ckan.plugins import toolkit

View File

@ -2,3 +2,4 @@ pika==0.9.8
redis==2.10.1
requests==2.20.0
pyOpenSSL==18.0.0
ckantoolkit==0.0.3

View File

@ -22,3 +22,6 @@ previous = true
domain = ckanext-harvest
directory = i18n
statistics = true
[flake8]
max-line-length = 127