Merge branch '157-version-three-apify'

amercader 2016-02-17 10:08:27 +00:00
commit 7f506913f8
8 changed files with 406 additions and 202 deletions

View File

@@ -8,12 +8,14 @@ ckanext-harvest - Remote harvesting extension
This extension provides a common harvesting framework for ckan extensions
and adds a CLI and a WUI to CKAN to manage harvesting sources and jobs.
Requires CKAN 2.0 or later.
Installation
============
This extension requires CKAN v2.0 or later, both on the CKAN it is installed
into and on the CKAN instances it harvests. However, you are unlikely to
encounter a CKAN running a version lower than 2.0.
1. The harvest extension can use two different backends. You can choose whichever
you prefer depending on your needs, but Redis has been found to be more stable
and reliable, so it is the recommended one:
@@ -690,6 +692,26 @@ following steps with the one you are using.
You can of course modify this periodicity; this `Wikipedia page <http://en.wikipedia.org/wiki/Cron#CRON_expression>`_
has a good overview of the crontab syntax.
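For example, a crontab entry along the following lines would run the harvester
every 30 minutes instead (the paster command path and config path are only
illustrative and depend on your install)::

    # m  h  dom mon dow   command
    */30 *  *   *   *     /usr/lib/ckan/default/bin/paster --plugin=ckanext-harvest harvester run --config=/etc/ckan/default/production.ini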
Tests
=====
You can run the tests like this:
cd ckanext-harvest
nosetests --reset-db --ckan --with-pylons=test-core.ini ckanext/harvest/tests
Here are some common errors and solutions:
* ``(OperationalError) no such table: harvest_object_error u'delete from "harvest_object_error"``
The database has got into a bad state. Run the tests again, but with the ``--reset-db`` parameter.
* ``(ProgrammingError) relation "harvest_object_extra" does not exist``
The database has got into a bad state. Run the tests again, but *without* the ``--reset-db`` parameter.
* ``(OperationalError) near "SET": syntax error``
You are testing with SQLite as the database, but the CKAN Harvester needs PostgreSQL. Specify ``test-core.ini`` instead of ``test.ini``.
Community
=========

View File

@@ -216,14 +216,33 @@ class HarvesterBase(SingletonPlugin):
except Exception, e:
self._save_gather_error('%r' % e.message, harvest_job)
def _create_or_update_package(self, package_dict, harvest_object):
def _create_or_update_package(self, package_dict, harvest_object,
package_dict_form='rest'):
'''
Creates a new package or updates an existing one according to the
package dictionary provided. The package dictionary should look like
the REST API response for a package:
package dictionary provided.
http://ckan.net/api/rest/package/statistics-catalunya
The package dictionary can be in one of two forms:
1. 'rest' - as seen on the RESTful API:
http://datahub.io/api/rest/dataset/1996_population_census_data_canada
This is the legacy form. It is the default to provide backward
compatibility.
* 'extras' is a dict e.g. {'theme': 'health', 'sub-theme': 'cancer'}
* 'tags' is a list of strings e.g. ['large-river', 'flood']
2. 'package_show' form, as provided by the Action API (CKAN v2.0+):
http://datahub.io/api/action/package_show?id=1996_population_census_data_canada
* 'extras' is a list of dicts
e.g. [{'key': 'theme', 'value': 'health'},
{'key': 'sub-theme', 'value': 'cancer'}]
* 'tags' is a list of dicts
e.g. [{'name': 'large-river'}, {'name': 'flood'}]
Note that the package_dict must contain an id, which will be used to
check if the package needs to be created or updated (use the remote
@@ -241,6 +260,7 @@ class HarvesterBase(SingletonPlugin):
use the output of package_show logic function (maybe keeping support
for rest api based dicts
'''
assert package_dict_form in ('rest', 'package_show')
try:
# Change default schema
schema = default_create_package_schema()
@@ -274,6 +294,7 @@ class HarvesterBase(SingletonPlugin):
# Check if package exists
try:
# _find_existing_package can be overridden if necessary
existing_package_dict = self._find_existing_package(package_dict)
# In case name has been modified when first importing. See issue #101.
@@ -286,11 +307,14 @@ class HarvesterBase(SingletonPlugin):
# Update package
context.update({'id':package_dict['id']})
package_dict.setdefault('name',
existing_package_dict['name'])
new_package = p.toolkit.get_action('package_update_rest')(context, package_dict)
existing_package_dict['name'])
new_package = p.toolkit.get_action(
'package_update' if package_dict_form == 'package_show'
else 'package_update_rest')(context, package_dict)
else:
log.info('Package with GUID %s not updated, skipping...' % harvest_object.guid)
log.info('No changes to package with GUID %s, skipping...' % harvest_object.guid)
# NB harvest_object.current/package_id are not set
return 'unchanged'
@@ -332,7 +356,9 @@ class HarvesterBase(SingletonPlugin):
model.Session.execute('SET CONSTRAINTS harvest_object_package_id_fkey DEFERRED')
model.Session.flush()
new_package = p.toolkit.get_action('package_create_rest')(context, package_dict)
new_package = p.toolkit.get_action(
'package_create' if package_dict_form == 'package_show'
else 'package_create_rest')(context, package_dict)
Session.commit()
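For illustration, a harvester whose remote source already provides dataset dicts in the Action API ('package_show') form can now pass them straight through and let the base class choose between package_create and package_update instead of the legacy *_rest actions. A minimal, hypothetical sketch (class name and dataset values are invented):

from ckanext.harvest.harvesters.base import HarvesterBase


class MyActionApiHarvester(HarvesterBase):

    def import_stage(self, harvest_object):
        # 'package_show' form: extras and tags are lists of dicts
        package_dict = {
            'id': harvest_object.guid,
            'name': 'example-dataset',
            'extras': [{'key': 'theme', 'value': 'health'}],
            'tags': [{'name': 'large-river'}],
        }
        return self._create_or_update_package(
            package_dict, harvest_object, package_dict_form='package_show')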

View File

@@ -1,16 +1,19 @@
import urllib
import urllib2
import httplib
import datetime
import socket
from sqlalchemy import exists
from ckan.lib.base import c
from ckan import model
from ckan.model import Session, Package
from ckan.logic import ValidationError, NotFound, get_action
from ckan.lib.helpers import json
from ckan.lib.munge import munge_name
from simplejson.scanner import JSONDecodeError
from ckan.plugins import toolkit
from ckanext.harvest.model import HarvestJob, HarvestObject, HarvestGatherError, \
HarvestObjectError
from ckanext.harvest.model import HarvestJob, HarvestObject, HarvestGatherError
import logging
log = logging.getLogger(__name__)
@@ -27,14 +30,11 @@ class CKANHarvester(HarvesterBase):
api_version = 2
action_api_version = 3
def _get_rest_api_offset(self):
return '/api/%d/rest' % self.api_version
def _get_action_api_offset(self):
return '/api/%d/action' % self.action_api_version
def _get_search_api_offset(self):
return '/api/%d/search' % self.api_version
return '%s/package_search' % self._get_action_api_offset()
def _get_content(self, url):
http_request = urllib2.Request(url=url)
@@ -54,10 +54,15 @@ class CKANHarvester(HarvesterBase):
raise ContentFetchError('URL error: %s' % e.reason)
except httplib.HTTPException, e:
raise ContentFetchError('HTTP Exception: %s' % e)
except socket.error, e:
raise ContentFetchError('HTTP socket error: %s' % e)
except Exception, e:
raise ContentFetchError('HTTP general exception: %s' % e)
return http_response.read()
def _get_group(self, base_url, group_name):
url = base_url + self._get_rest_api_offset() + '/group/' + munge_name(group_name)
url = base_url + self._get_action_api_offset() + '/group_show?id=' + \
munge_name(group_name)
try:
content = self._get_content(url)
return json.loads(content)
@@ -151,149 +156,215 @@ class CKANHarvester(HarvesterBase):
def gather_stage(self, harvest_job):
log.debug('In CKANHarvester gather_stage (%s)',
harvest_job.source.url)
toolkit.requires_ckan_version(min_version='2.0')
get_all_packages = True
package_ids = []
self._set_config(harvest_job.source.config)
# Check if this source has been harvested before
previous_job = Session.query(HarvestJob) \
.filter(HarvestJob.source==harvest_job.source) \
.filter(HarvestJob.gather_finished!=None) \
.filter(HarvestJob.id!=harvest_job.id) \
.order_by(HarvestJob.gather_finished.desc()) \
.limit(1).first()
# Get source URL
base_url = harvest_job.source.url.rstrip('/')
base_rest_url = base_url + self._get_rest_api_offset()
base_search_url = base_url + self._get_search_api_offset()
remote_ckan_base_url = harvest_job.source.url.rstrip('/')
# Filter in/out datasets from particular organizations
fq_terms = []
org_filter_include = self.config.get('organizations_filter_include', [])
org_filter_exclude = self.config.get('organizations_filter_exclude', [])
def get_pkg_ids_for_organizations(orgs):
pkg_ids = set()
for organization in orgs:
url = base_search_url + '/dataset?organization=%s' % organization
content = self._get_content(url)
content_json = json.loads(content)
result_count = int(content_json['count'])
pkg_ids |= set(content_json['results'])
while len(pkg_ids) < result_count or not content_json['results']:
url = base_search_url + '/dataset?organization=%s&offset=%s' % (organization, len(pkg_ids))
content = self._get_content(url)
content_json = json.loads(content)
pkg_ids |= set(content_json['results'])
return pkg_ids
include_pkg_ids = get_pkg_ids_for_organizations(org_filter_include)
exclude_pkg_ids = get_pkg_ids_for_organizations(org_filter_exclude)
if org_filter_include:
fq_terms.append(' OR '.join(
'organization:%s' % org_name for org_name in org_filter_include))
elif org_filter_exclude:
fq_terms.extend(
'-organization:%s' % org_name for org_name in org_filter_exclude)
if (previous_job and not previous_job.gather_errors and not len(previous_job.objects) == 0):
if not self.config.get('force_all',False):
get_all_packages = False
# Ideally we can request from the remote CKAN only those datasets
# modified since the last completely successful harvest.
last_error_free_job = self._last_error_free_job(harvest_job)
log.debug('Last error-free job: %r', last_error_free_job)
if (last_error_free_job and
not self.config.get('force_all', False)):
get_all_packages = False
# Request only the packages modified since last harvest job
last_time = previous_job.gather_finished.isoformat()
url = base_search_url + '/revision?since_time=%s' % last_time
# Request only the datasets modified since the last error-free job
last_time = last_error_free_job.gather_started
# Note: SOLR works in UTC, and gather_started is also UTC, so
# this should work as long as local and remote clocks are
# relatively accurate. Going back a little earlier, just in case.
get_changes_since = \
(last_time - datetime.timedelta(hours=1)).isoformat()
log.info('Searching for datasets modified since: %s UTC',
get_changes_since)
try:
content = self._get_content(url)
revision_ids = json.loads(content)
if len(revision_ids):
for revision_id in revision_ids:
url = base_rest_url + '/revision/%s' % revision_id
try:
content = self._get_content(url)
except ContentFetchError,e:
self._save_gather_error('Unable to get content for URL: %s: %s' % (url, str(e)),harvest_job)
continue
revision = json.loads(content)
package_ids = revision['packages']
else:
log.info('No revisions since last harvest %s',
last_time)
return []
except ContentNotFoundError, e:
log.info('No revisions since last harvest %s', last_time)
return []
except ContentFetchError, e:
# Any other error indicates that revision filtering is not
# working for whatever reason, so fall back to just getting
# all the packages, which is expensive but reliable.
log.info('CKAN instance %s does not support revision '
'filtering: %s',
base_url, e)
get_all_packages = True
if get_all_packages:
# Request all remote packages
url = base_rest_url + '/package'
fq_since_last_time = 'metadata_modified:[{since}Z TO *]' \
.format(since=get_changes_since)
try:
content = self._get_content(url)
package_ids = json.loads(content)
except ContentFetchError,e:
self._save_gather_error('Unable to get content for URL: %s: %s' % (url, str(e)),harvest_job)
return None
except JSONDecodeError,e:
self._save_gather_error('Unable to decode content for URL: %s: %s' % (url, str(e)),harvest_job)
pkg_dicts = self._search_for_datasets(
remote_ckan_base_url,
fq_terms + [fq_since_last_time])
except SearchError, e:
log.info('Searching for datasets changed since last time '
'gave an error: %s', e)
get_all_packages = True
if not get_all_packages and not pkg_dicts:
log.info('No datasets have been updated on the remote '
'CKAN instance since the last harvest job %s',
last_time)
return None
if org_filter_include:
package_ids = set(package_ids) & include_pkg_ids
elif org_filter_exclude:
package_ids = set(package_ids) - exclude_pkg_ids
try:
object_ids = []
if len(package_ids):
for package_id in package_ids:
# Create a new HarvestObject for this identifier
obj = HarvestObject(guid = package_id, job = harvest_job)
obj.save()
object_ids.append(obj.id)
return object_ids
else:
self._save_gather_error('No packages received for URL: %s' % url,
harvest_job)
return None
except Exception, e:
self._save_gather_error('%r'%e.message,harvest_job)
def fetch_stage(self,harvest_object):
log.debug('In CKANHarvester fetch_stage')
self._set_config(harvest_object.job.source.config)
# Get source URL
url = harvest_object.source.url.rstrip('/')
url = url + self._get_rest_api_offset() + '/package/' + harvest_object.guid
# Get contents
try:
content = self._get_content(url)
except ContentFetchError,e:
self._save_object_error('Unable to get content for package: %s: %r' % \
(url, e),harvest_object)
# Fall-back option - request all the datasets from the remote CKAN
if get_all_packages:
# Request all remote packages
try:
pkg_dicts = self._search_for_datasets(remote_ckan_base_url,
fq_terms)
except SearchError, e:
log.info('Searching for all datasets gave an error: %s', e)
self._save_gather_error(
'Unable to search remote CKAN for datasets:%s url:%s '
'terms:%s' % (e, remote_ckan_base_url, fq_terms),
harvest_job)
return None
if not pkg_dicts:
self._save_gather_error(
'No datasets found at CKAN: %s' % remote_ckan_base_url,
harvest_job)
return None
# Save the fetched contents in the HarvestObject
harvest_object.content = content
harvest_object.save()
return True
# Create harvest objects for each dataset
try:
package_ids = set()
object_ids = []
for pkg_dict in pkg_dicts:
if pkg_dict['id'] in package_ids:
log.info('Discarding duplicate dataset %s - probably due '
'to datasets being changed at the same time as '
'when the harvester was paging through',
pkg_dict['id'])
continue
package_ids.add(pkg_dict['id'])
log.debug('Creating HarvestObject for %s %s',
pkg_dict['name'], pkg_dict['id'])
obj = HarvestObject(guid=pkg_dict['id'],
job=harvest_job,
content=json.dumps(pkg_dict))
obj.save()
object_ids.append(obj.id)
return object_ids
except Exception, e:
self._save_gather_error('%r' % e.message, harvest_job)
def _search_for_datasets(self, remote_ckan_base_url, fq_terms=None):
'''Does a dataset search on a remote CKAN and returns the results.
Deals with paging to return all the results, not just the first page.
'''
base_search_url = remote_ckan_base_url + self._get_search_api_offset()
params = {'rows': '100', 'start': '0'}
# There is the worry that datasets will be changed whilst we are paging
# through them.
# * In SOLR 4.7 there is a cursor, but not using that yet
# because few CKANs are running that version yet.
# * However we sort, any names added or removed before the current
# page would cause existing names on the next page to be missed or
# double counted.
# * Another approach might be to sort by metadata_modified and always
# ask for changes since (and including) the date of the last item of
# the day before. However if the entire page is of the exact same
# time, then you end up in an infinite loop asking for the same page.
# * We choose a balanced approach of sorting by ID, which means
# datasets are only missed if some are removed, which is far less
# likely than any being added. If some are missed then it is assumed
# they will be harvested the next time anyway. When datasets are added,
# we are at risk of seeing datasets twice in the paging, so we detect
# and remove any duplicates.
params['sort'] = 'id asc'
if fq_terms:
params['fq'] = ' '.join(fq_terms)
pkg_dicts = []
pkg_ids = set()
previous_content = None
while True:
url = base_search_url + '?' + urllib.urlencode(params)
log.debug('Searching for CKAN datasets: %s', url)
try:
content = self._get_content(url)
except ContentFetchError, e:
raise SearchError(
'Error sending request to search remote '
'CKAN instance %s using URL %r. Error: %s' %
(remote_ckan_base_url, url, e))
if previous_content and content == previous_content:
raise SearchError('The paging doesn\'t seem to work. URL: %s' %
url)
try:
response_dict = json.loads(content)
except ValueError:
raise SearchError('Response from remote CKAN was not JSON: %r'
% content)
try:
pkg_dicts_page = response_dict.get('result', {}).get('results',
[])
except ValueError:
raise SearchError('Response JSON did not contain '
'result/results: %r' % response_dict)
# Weed out any datasets found on previous pages (should datasets be
# changing while we page)
ids_in_page = set(p['id'] for p in pkg_dicts_page)
duplicate_ids = ids_in_page & pkg_ids
if duplicate_ids:
pkg_dicts_page = [p for p in pkg_dicts_page
if p['id'] not in duplicate_ids]
pkg_ids |= ids_in_page
pkg_dicts.extend(pkg_dicts_page)
if len(pkg_dicts_page) == 0:
break
params['start'] = str(int(params['start']) + int(params['rows']))
return pkg_dicts
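For reference, a request built by this loop looks roughly like the sketch below; the fq terms come from the organization filters and, on incremental runs, the metadata_modified range assembled in gather_stage (the remote base URL and timestamp are invented):

import urllib

params = {
    'rows': '100',
    'start': '0',
    'sort': 'id asc',
    'fq': '-organization:org1 metadata_modified:[2015-10-23T14:51:13Z TO *]',
}
url = ('http://demo.ckan.org/api/3/action/package_search?' +
       urllib.urlencode(params))
# e.g. .../package_search?rows=100&sort=id+asc&start=0&fq=-organization%3Aorg1+... (parameter order may vary)

Note that urlencode encodes the spaces in fq once (as '+'), and the remote CKAN decodes them back; the mock server's assertion about '+TO+' exists to catch them being encoded twice.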
@classmethod
def _last_error_free_job(cls, harvest_job):
# TODO weed out cancelled jobs somehow.
# look for jobs with no gather errors
jobs = \
model.Session.query(HarvestJob) \
.filter(HarvestJob.source == harvest_job.source) \
.filter(HarvestJob.gather_started != None) \
.filter(HarvestJob.status == 'Finished') \
.filter(HarvestJob.id != harvest_job.id) \
.filter(
~exists().where(
HarvestGatherError.harvest_job_id == HarvestJob.id)) \
.order_by(HarvestJob.gather_started.desc())
# now check them until we find one with no fetch/import errors
# (looping rather than doing sql, in case there are lots of objects
# and lots of jobs)
for job in jobs:
for obj in job.objects:
if obj.current is False and \
obj.report_status != 'not modified':
# unsuccessful, so go onto the next job
break
else:
return job
def fetch_stage(self, harvest_object):
# Nothing to do here - we got the package dict in the search in the
# gather stage
return True
def import_stage(self, harvest_object):
log.debug('In CKANHarvester import_stage')
context = {'model': model, 'session': Session,
context = {'model': model, 'session': model.Session,
'user': self._get_user_name()}
if not harvest_object:
log.error('No harvest object received')
@@ -411,38 +482,39 @@ class CKANHarvester(HarvesterBase):
if default_groups:
if not 'groups' in package_dict:
package_dict['groups'] = []
package_dict['groups'].extend([g for g in default_groups if g not in package_dict['groups']])
# Find any extras whose values are not strings and try to convert
# them to strings, as non-string extras are not allowed anymore in
# CKAN 2.0.
for key in package_dict['extras'].keys():
if not isinstance(package_dict['extras'][key], basestring):
try:
package_dict['extras'][key] = json.dumps(
package_dict['extras'][key])
except TypeError:
# If converting to a string fails, just delete it.
del package_dict['extras'][key]
package_dict['groups'].extend(
[g for g in default_groups
if g not in package_dict['groups']])
# Set default extras if needed
default_extras = self.config.get('default_extras', {})
def get_extra(key, package_dict):
for extra in package_dict.get('extras', []):
if extra['key'] == key:
return extra
if default_extras:
override_extras = self.config.get('override_extras', False)
if not 'extras' in package_dict:
package_dict['extras'] = {}
for key, value in default_extras.iteritems():
if not key in package_dict['extras'] or override_extras:
# Look for replacement strings
if isinstance(value,basestring):
value = value.format(harvest_source_id=harvest_object.job.source.id,
harvest_source_url=harvest_object.job.source.url.strip('/'),
harvest_source_title=harvest_object.job.source.title,
harvest_job_id=harvest_object.job.id,
harvest_object_id=harvest_object.id,
dataset_id=package_dict['id'])
existing_extra = get_extra(key, package_dict)
if existing_extra and not override_extras:
continue # no need for the default
if existing_extra:
package_dict['extras'].remove(existing_extra)
# Look for replacement strings
if isinstance(value, basestring):
value = value.format(
harvest_source_id=harvest_object.job.source.id,
harvest_source_url=
harvest_object.job.source.url.strip('/'),
harvest_source_title=
harvest_object.job.source.title,
harvest_job_id=harvest_object.job.id,
harvest_object_id=harvest_object.id,
dataset_id=package_dict['id'])
package_dict['extras'][key] = value
package_dict['extras'].append({'key': key, 'value': value})
for resource in package_dict.get('resources', []):
# Clear remote url_type for resources (eg datastore, upload) as
@@ -455,24 +527,8 @@ class CKANHarvester(HarvesterBase):
# key.
resource.pop('revision_id', None)
result = self._create_or_update_package(package_dict,harvest_object)
if result is True and self.config.get('read_only', False) is True:
package = model.Package.get(package_dict['id'])
# Clear default permissions
model.clear_user_roles(package)
# Setup harvest user as admin
user_name = self.config.get('user',u'harvest')
user = model.User.get(user_name)
pkg_role = model.PackageRole(package=package, user=user, role=model.Role.ADMIN)
# Other users can only read
for user_name in (u'visitor',u'logged_in'):
user = model.User.get(user_name)
pkg_role = model.PackageRole(package=package, user=user, role=model.Role.READER)
result = self._create_or_update_package(
package_dict, harvest_object, package_dict_form='package_show')
return result
except ValidationError, e:
@@ -480,7 +536,8 @@ class CKANHarvester(HarvesterBase):
(harvest_object.guid, e.error_dict),
harvest_object, 'Import')
except Exception, e:
self._save_object_error('%r'%e,harvest_object,'Import')
self._save_object_error('%s' % e, harvest_object, 'Import')
class ContentFetchError(Exception):
pass
@@ -490,3 +547,7 @@ class ContentNotFoundError(ContentFetchError):
class RemoteResourceError(Exception):
pass
class SearchError(Exception):
pass
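Putting together the options this harvester reads from its source configuration (organization filtering, force_all, default extras with replacement strings), a harvest source config might look like the following sketch; the key values are invented and would be entered as JSON on the harvest source form:

import json

source_config = {
    # restrict the remote search to these organizations
    'organizations_filter_include': ['org1'],
    # True disables the incremental metadata_modified search
    'force_all': False,
    # extras added to every harvested dataset; the replacement strings are
    # substituted in import_stage
    'default_extras': {
        'harvest_url': '{harvest_source_url}/dataset/{dataset_id}',
    },
    # True lets the defaults replace extras already on the dataset
    'override_extras': False,
}
print json.dumps(source_config, indent=2)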

View File

@@ -442,17 +442,21 @@ def harvest_jobs_run(context, data_dict):
if len(jobs):
for job in jobs:
if job['gather_finished']:
objects = \
num_objects_in_progress = \
session.query(HarvestObject.id) \
.filter(HarvestObject.harvest_job_id == job['id']) \
.filter(and_((HarvestObject.state != u'COMPLETE'),
(HarvestObject.state != u'ERROR'))) \
.order_by(HarvestObject.import_finished.desc())
.count()
if objects.count() == 0:
if num_objects_in_progress == 0:
job_obj = HarvestJob.get(job['id'])
job_obj.status = u'Finished'
log.info('Marking job as finished %s %s',
job_obj.source.url, job_obj.id)
# save the time of finish, according to the last running
# object
last_object = session.query(HarvestObject) \
.filter(HarvestObject.harvest_job_id == job['id']) \
.filter(HarvestObject.import_finished != None) \
@@ -463,7 +467,7 @@ def harvest_jobs_run(context, data_dict):
else:
job_obj.finished = job['gather_finished']
job_obj.save()
log.info('Marking job as finished: %s', job_obj)
# Reindex the harvest source dataset so it has the latest
# status
get_action('harvest_source_reindex')(

View File

@@ -1,6 +1,7 @@
import json
import re
import copy
import urllib
import SimpleHTTPServer
import SocketServer
@@ -22,6 +23,8 @@ class MockCkanHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
self.test_name = None
else:
self.path = re.sub('^/([^/]+)/', '/', self.path)
if self.test_name == 'site_down':
return self.respond('Site is down', status=500)
# The API version is recorded and then removed from the path
api_version = None
@@ -36,6 +39,9 @@ class MockCkanHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
else:
dataset_refs = [d['name'] for d in DATASETS]
return self.respond_json(dataset_refs)
if self.path == '/api/action/package_list':
dataset_names = [d['name'] for d in DATASETS]
return self.respond_action(dataset_names)
if self.path.startswith('/api/rest/package/'):
dataset_ref = self.path.split('/')[-1]
dataset = self.get_dataset(dataset_ref)
@@ -47,7 +53,7 @@ class MockCkanHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
dataset_ref = params['id']
dataset = self.get_dataset(dataset_ref)
if dataset:
return self.respond_json(dataset)
return self.respond_action(dataset)
if self.path.startswith('/api/search/dataset'):
params = self.get_url_params()
if params.keys() == ['organization']:
@@ -69,6 +75,52 @@ class MockCkanHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
if rev['id'] == revision_ref:
return self.respond_json(rev)
self.respond('Cannot find revision', status=404)
# /api/3/action/package_search?fq=metadata_modified:[2015-10-23T14:51:13.282361Z TO *]&rows=1000
if self.path.startswith('/api/action/package_search'):
params = self.get_url_params()
if self.test_name == 'datasets_added':
if params['start'] == '0':
# when page 1 is retrieved, the site only has 1 dataset
datasets = [DATASETS[0]['name']]
elif params['start'] == '100':
# when page 2 is retrieved, the site now has new datasets,
# and so the second page has the original dataset, pushed
# onto this page now, plus a new one
datasets = [DATASETS[0]['name'],
DATASETS[1]['name']]
else:
datasets = []
else:
# ignore sort param for now
if 'sort' in params:
del params['sort']
if params['start'] != '0':
datasets = []
elif set(params.keys()) == set(['rows', 'start']):
datasets = ['dataset1', DATASETS[1]['name']]
elif set(params.keys()) == set(['fq', 'rows', 'start']) and \
params['fq'] == '-organization:org1':
datasets = [DATASETS[1]['name']]
elif set(params.keys()) == set(['fq', 'rows', 'start']) and \
params['fq'] == 'organization:org1':
datasets = ['dataset1']
elif set(params.keys()) == set(['fq', 'rows', 'start']) and \
'metadata_modified' in params['fq']:
assert '+TO+' not in params['fq'], \
'Spaces should not be decoded by now - seeing + '\
'means they were double encoded and SOLR doesnt like '\
'that'
datasets = [DATASETS[1]['name']]
else:
return self.respond(
'Not implemented search params %s' % params,
status=400)
out = {'count': len(datasets),
'results': [self.get_dataset(dataset_ref_)
for dataset_ref_ in datasets]}
return self.respond_action(out)
# if we wanted to serve a file from disk, then we'd call this:
#return SimpleHTTPServer.SimpleHTTPRequestHandler.do_GET(self)
@@ -90,11 +142,13 @@ class MockCkanHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
return org
def get_url_params(self):
params = self.path.split('?')[-1].split('&')
params_str = self.path.split('?')[-1]
params_unicode = urllib.unquote_plus(params_str).decode('utf8')
params = params_unicode.split('&')
return dict([param.split('=') for param in params])
def respond_action(self, result_dict, status=200):
response_dict = {'result': result_dict}
response_dict = {'result': result_dict, 'success': True}
return self.respond_json(response_dict, status=status)
def respond_json(self, content_dict, status=200):
@@ -141,6 +195,7 @@ DATASETS = [
'name': 'dataset1',
'title': 'Test Dataset1',
'owner_org': 'org1-id',
'tags': [{'name': 'test-tag'}],
'extras': []},
{
"id": "1c65c66a-fdec-4138-9c64-0f9bf087bcbb",

View File

@@ -2,7 +2,7 @@ import copy
from nose.tools import assert_equal
import json
from mock import patch
from mock import patch, MagicMock
try:
from ckan.tests.helpers import reset_db
@@ -24,6 +24,16 @@ import mock_ckan
mock_ckan.serve()
def was_last_job_considered_error_free():
last_job = model.Session.query(harvest_model.HarvestJob) \
.order_by(harvest_model.HarvestJob.created.desc()) \
.first()
job = MagicMock()
job.source = last_job.source
job.id = ''
return bool(CKANHarvester._last_error_free_job(job))
class TestCkanHarvester(object):
@classmethod
def setup(cls):
@@ -37,38 +47,40 @@ class TestCkanHarvester(object):
harvester = CKANHarvester()
obj_ids = harvester.gather_stage(job)
assert_equal(job.gather_errors, [])
assert_equal(type(obj_ids), list)
assert_equal(len(obj_ids), len(mock_ckan.DATASETS))
harvest_object = harvest_model.HarvestObject.get(obj_ids[0])
assert_equal(harvest_object.guid, mock_ckan.DATASETS[0]['id'])
assert_equal(
json.loads(harvest_object.content),
mock_ckan.DATASETS[0])
def test_fetch_normal(self):
source = HarvestSourceObj(url='http://localhost:%s/' % mock_ckan.PORT)
job = HarvestJobObj(source=source)
harvest_object = HarvestObjectObj(guid=mock_ckan.DATASETS[0]['id'],
job=job)
harvest_object = HarvestObjectObj(
guid=mock_ckan.DATASETS[0]['id'],
job=job,
content=json.dumps(mock_ckan.DATASETS[0]))
harvester = CKANHarvester()
result = harvester.fetch_stage(harvest_object)
assert_equal(harvest_object.errors, [])
assert_equal(result, True)
assert_equal(
harvest_object.content,
json.dumps(
mock_ckan.convert_dataset_to_restful_form(
mock_ckan.DATASETS[0])))
def test_import_normal(self):
org = Organization()
harvest_object = HarvestObjectObj(
guid=mock_ckan.DATASETS[0]['id'],
content=json.dumps(mock_ckan.convert_dataset_to_restful_form(
mock_ckan.DATASETS[0])),
content=json.dumps(mock_ckan.DATASETS[0]),
job__source__owner_org=org['id'])
harvester = CKANHarvester()
result = harvester.import_stage(harvest_object)
assert_equal(harvest_object.errors, [])
assert_equal(result, True)
assert harvest_object.package_id
dataset = model.Package.get(harvest_object.package_id)
@@ -90,6 +102,7 @@ class TestCkanHarvester(object):
assert_equal(result['report_status'], 'added')
assert_equal(result['dataset']['name'], mock_ckan.DATASETS[1]['name'])
assert_equal(result['errors'], [])
assert was_last_job_considered_error_free()
def test_harvest_twice(self):
run_harvest(
@@ -113,7 +126,8 @@ class TestCkanHarvester(object):
assert_equal(result['errors'], [])
# the other dataset is unchanged and not harvested
assert mock_ckan.DATASETS[1]['name'] not in result
assert mock_ckan.DATASETS[0]['id'] not in result
assert was_last_job_considered_error_free()
def test_harvest_invalid_tag(self):
from nose.plugins.skip import SkipTest; raise SkipTest()
@@ -127,7 +141,7 @@ class TestCkanHarvester(object):
assert_equal(result['dataset']['name'], mock_ckan.DATASETS[0]['name'])
def test_exclude_organizations(self):
config = {'organizations_filter_exclude': ['org1-id']}
config = {'organizations_filter_exclude': ['org1']}
results_by_guid = run_harvest(
url='http://localhost:%s' % mock_ckan.PORT,
harvester=CKANHarvester(),
@@ -136,7 +150,7 @@ class TestCkanHarvester(object):
assert mock_ckan.DATASETS[1]['id'] in results_by_guid
def test_include_organizations(self):
config = {'organizations_filter_include': ['org1-id']}
config = {'organizations_filter_include': ['org1']}
results_by_guid = run_harvest(
url='http://localhost:%s' % mock_ckan.PORT,
harvester=CKANHarvester(),
@@ -160,3 +174,20 @@ class TestCkanHarvester(object):
assert_equal(result['report_status'], 'not modified')
assert 'dataset' not in result
assert_equal(result['errors'], [])
assert was_last_job_considered_error_free()
def test_harvest_whilst_datasets_added(self):
results_by_guid = run_harvest(
url='http://localhost:%s/datasets_added' % mock_ckan.PORT,
harvester=CKANHarvester())
assert_equal(sorted(results_by_guid.keys()),
[mock_ckan.DATASETS[1]['id'],
mock_ckan.DATASETS[0]['id']])
def test_harvest_site_down(self):
results_by_guid = run_harvest(
url='http://localhost:%s/site_down' % mock_ckan.PORT,
harvester=CKANHarvester())
assert not results_by_guid
assert not was_last_job_considered_error_free()

View File

@@ -29,6 +29,10 @@ def run_harvest_job(job, harvester):
# queue.gather_callback, which determines the harvester and then calls
# gather_stage. We simply call the gather_stage.
obj_ids = queue.gather_stage(harvester, job)
if not isinstance(obj_ids, list):
# gather had nothing to do or errored. Carry on to ensure the job is
# closed properly
obj_ids = []
# The object ids are put onto the fetch queue, consumed by
# queue.fetch_callback which calls queue.fetch_and_import_stages

View File

@@ -1 +1,2 @@
factory-boy>=2
mock