Merge branch 'master' into 369-broken-tests

This commit is contained in:
amercader 2019-10-18 14:40:12 +02:00
commit 12196268d8
11 changed files with 128 additions and 227 deletions

View File

@ -38,4 +38,4 @@ jobs:
# stop the build if there are Python syntax errors or undefined names
- flake8 . --count --select=E901,E999,F821,F822,F823 --show-source --statistics --exclude ckan
# exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
- flake8 . --count --max-line-length=127 --statistics --exclude ckan
- flake8 . --count --max-line-length=127 --statistics --exclude ckan --exit-zero

View File

@ -14,6 +14,12 @@ Changed
-------
- Apply flake8 to be PEP-8 compliant #354
- Use ckantoolkit to clean up imports #358
- Add hook to extend the package dict in CKAN harvester
- Use CKAN core ckan.redis.url setting if present
- Remove database migration code targeting ancient versions #376
(In the unlikely event that you need to upgrade from one
of the previous DB versions just apply the changes removed
on the linked PR manually)
Fixed
-----
@ -21,6 +27,9 @@ Fixed
- Fix SSL problems for old versions of Python 2.7.x #344
- Add an 'owner_org' to the v3 package migration #348
- Fix harvest request exceptions #357
- Fix wrong toolkit reference 8e862c8
- Mark early errored jobs as finished 5ad6d86
- Resubmit awaiting objects in the DB not on Redis 5ffe6d4
*******************
1.1.4_ - 2018-10-26
@ -101,7 +110,7 @@ Added
Changed
-------
- ``gather_stage`` return empty list instead of None if errors occurred
- ``gather_stage`` return empty list instead of None if errors occurred
- Change ``redirect`` calls to ``h.redirect_to``
Fixed

View File

@ -425,6 +425,46 @@ the configuration field)::
}
Plugins can extend the default CKAN harvester and implement the ``modify_package_dict`` method in order to
modify the dataset dict generated by the harvester just before it is actually created or updated. For instance,
they might want to add or delete certain fields, or fire additional tasks based on the metadata fields.
Plugins will get the dataset dict including any processing described above (e.g. with the correct groups assigned,
replacement strings applied, etc). It will also be passed the harvest object, which contains the original, unmodified
dataset dict in the ``content`` property.
This is a simple example::
from ckanext.harvester.harvesters.ckanharvester import CKANHarvester
class MySiteCKANHarvester(CKANHarvester):
def modify_package_dict(self, package_dict, harvest_object):
# Set a default custom field
package_dict['remote_harvest'] = True
# Add tags
package_dict['tags'].append({'name': 'sdi'})
return package_dict
Remember to register your custom harvester plugin in your extension ``setup.py`` file, and load the plugin in the config file afterwards::
# setup.py
entry_points='''
[ckan.plugins]
my_site=ckanext.my_site.plugin:MySitePlugin
my_site_ckan_harvester=ckanext.my_site.harvesters:MySiteCKANHarvester
'''
# ini file
ckan.plugins = ... my_site my_site_ckan_harvester
The harvesting interface
========================
@ -610,7 +650,10 @@ harvester run_test
You can run a harvester simply using the ``run_test`` command. This is handy
for running a harvest with one command in the console and seeing all the output
in-line. It runs the gather, fetch and import stages all in the same process.
Before using the ``run_test`` command, you must ensure that you have
pip-installed the requirements from ``dev-requirements.txt``
in ``/home/ckan/ckan/lib/default/src/ckanext-harvest``.
This is useful for developing a harvester because you can insert break-points
in your harvester, and rerun a harvest without having to restart the
gather_consumer and fetch_consumer processes each time. In addition, because it

View File

@ -126,10 +126,10 @@ class ViewController(BaseController):
else:
abort(404, _('No content found'))
try:
etree.fromstring(re.sub('<\?xml(.*)\?>', '', content))
etree.fromstring(re.sub(r'<\?xml(.*)\?>', '', content))
except UnicodeEncodeError:
etree.fromstring(
re.sub('<\?xml(.*)\?>', '', content.encode('utf-8'))
re.sub(r'<\?xml(.*)\?>', '', content.encode('utf-8'))
)
response.content_type = 'application/xml; charset=utf-8'
if '<?xml' not in content.split('\n')[0]:

View File

@ -172,6 +172,13 @@ class CKANHarvester(HarvesterBase):
return config
def modify_package_dict(self, package_dict, harvest_object):
    '''
    Extension point for custom harvesters: receives the package dict
    before the actual package is created or updated, and returns the
    dict that should be used.

    This default implementation hands ``package_dict`` back untouched
    and does not look at ``harvest_object``.
    '''
    return package_dict
def gather_stage(self, harvest_job):
log.debug('In CKANHarvester gather_stage (%s)',
harvest_job.source.url)
@ -536,6 +543,8 @@ class CKANHarvester(HarvesterBase):
# key.
resource.pop('revision_id', None)
package_dict = self.modify_package_dict(package_dict, harvest_object)
result = self._create_or_update_package(
package_dict, harvest_object, package_dict_form='package_show')

View File

@ -92,7 +92,7 @@ def harvest_job_create(context, data_dict):
source = HarvestSource.get(source_id)
if not source:
log.warn('Harvest source %s does not exist', source_id)
raise toolkit.NotFound('Harvest source %s does not exist' % source_id)
raise toolkit.ObjectNotFound('Harvest source %s does not exist' % source_id)
# Check if the source is active
if not source.active:

View File

@ -22,7 +22,8 @@ from ckan.plugins import toolkit
from ckan.logic import NotFound, check_access
from ckanext.harvest.plugin import DATASET_TYPE_NAME
from ckanext.harvest.queue import get_gather_publisher, resubmit_jobs
from ckanext.harvest.queue import (
get_gather_publisher, resubmit_jobs, resubmit_objects)
from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject
from ckanext.harvest.logic import HarvestJobExists
@ -565,10 +566,14 @@ def harvest_jobs_run(context, data_dict):
else:
log.debug('Ongoing job:%s source:%s',
job['id'], job['source_id'])
log.debug('No jobs to send to the gather queue')
# resubmit old redis tasks
# Resubmit old redis tasks
resubmit_jobs()
# Resubmit pending objects missing from Redis
resubmit_objects()
return [] # merely for backwards compatibility

View File

@ -1,9 +1,7 @@
import logging
import datetime
import uuid
from sqlalchemy import event
from sqlalchemy import distinct
from sqlalchemy import Table
from sqlalchemy import Column
from sqlalchemy import ForeignKey
@ -14,12 +12,11 @@ from sqlalchemy.orm import backref, relation
from sqlalchemy.exc import InvalidRequestError
from ckan import model
from ckan import logic
from ckan.model.meta import metadata, mapper, Session
from ckan.model.meta import metadata, mapper, Session
from ckan.model.types import make_uuid
from ckan.model.domain_object import DomainObject
from ckan.model.package import Package
from ckan.lib.munge import munge_title_to_name
UPDATE_FREQUENCIES = ['MANUAL', 'MONTHLY', 'WEEKLY', 'BIWEEKLY', 'DAILY', 'ALWAYS']
@ -72,23 +69,6 @@ def setup():
log.debug('Harvest tables already exist')
# Check if existing tables need to be updated
inspector = Inspector.from_engine(engine)
columns = inspector.get_columns('harvest_source')
column_names = [column['name'] for column in columns]
if 'title' not in column_names:
log.debug('Harvest tables need to be updated')
migrate_v2()
if 'frequency' not in column_names:
log.debug('Harvest tables need to be updated')
migrate_v3()
# Check if this instance has harvest source datasets
source_ids = Session.query(HarvestSource.id).filter_by(active=True).all()
source_package_ids = Session.query(model.Package.id).filter_by(type=u'harvest', state='active').all()
sources_to_migrate = set(source_ids) - set(source_package_ids)
if sources_to_migrate:
log.debug('Creating harvest source datasets for %i existing sources', len(sources_to_migrate))
sources_to_migrate = [s[0] for s in sources_to_migrate]
migrate_v3_create_datasets(sources_to_migrate)
# Check if harvest_log table exist - needed for existing users
if 'harvest_log' not in inspector.get_table_names():
@ -432,100 +412,6 @@ def define_harvester_tables():
event.listen(HarvestObject, 'before_insert', harvest_object_before_insert_listener)
def migrate_v2():
'''
Upgrade the harvest tables to the v2 layout.

Adds ``title`` to ``harvest_source`` and ``current`` /
``harvest_source_id`` (with its foreign key) to ``harvest_object``,
back-fills ``harvest_source_id`` from the object's job, then sets the
``current`` flag on every harvest object and commits the session.
'''
log.debug('Migrating harvest tables to v2. This may take a while...')
conn = Session.connection()
statements = '''
ALTER TABLE harvest_source ADD COLUMN title text;
ALTER TABLE harvest_object ADD COLUMN current boolean;
ALTER TABLE harvest_object ADD COLUMN harvest_source_id text;
ALTER TABLE harvest_object ADD CONSTRAINT harvest_object_harvest_source_id_fkey FOREIGN KEY (harvest_source_id)
REFERENCES harvest_source(id);
UPDATE harvest_object o SET harvest_source_id = j.source_id FROM harvest_job j WHERE o.harvest_job_id = j.id;
'''
conn.execute(statements)
# Flag current harvest_objects
# First collect the distinct guids of harvest objects that are linked to
# an active package
guids = Session.query(distinct(HarvestObject.guid)) \
.join(Package) \
.filter(
HarvestObject.package != None # noqa: E711
).filter(Package.state == u'active')
# For a given guid, only the first matching object (ordered by
# metadata_modified_date, fetch_finished and gathered, newest first)
# gets current = TRUE
update_statement = '''
UPDATE harvest_object
SET current = TRUE
WHERE id = (
SELECT o.id
FROM harvest_object o JOIN package p ON p.id = o.package_id
WHERE o.package_id IS NOT null AND p.state = 'active'
AND o.guid = '%s'
ORDER BY metadata_modified_date DESC, fetch_finished DESC, gathered DESC
LIMIT 1)
'''
for guid in guids:
# NOTE(review): the guid is interpolated straight into the SQL text;
# a bound parameter would be safer if guids can contain quotes
conn.execute(update_statement % guid)
# Every object that was not flagged above is explicitly set to FALSE
conn.execute('UPDATE harvest_object SET current = FALSE WHERE current IS NOT TRUE')
Session.commit()
log.info('Harvest tables migrated to v2')
def migrate_v3():
'''
Upgrade the harvest tables to the v3 layout.

Runs a single SQL script that creates ``harvest_object_extra`` (with
its primary and foreign keys), adds the new columns to
``harvest_object``, ``harvest_source``, ``harvest_job`` and
``harvest_object_error``, back-fills ``state``, ``retry_times``,
``report_status`` and ``frequency``, and recreates the
``harvest_object.package_id`` foreign key as DEFERRABLE. The session
is committed afterwards.
'''
log.debug('Migrating harvest tables to v3. This may take a while...')
conn = Session.connection()
# Objects with a package are back-filled as COMPLETE / 'updated', objects
# without one as ERROR / 'errored'; all sources default to MANUAL frequency
statement = """CREATE TABLE harvest_object_extra (
id text NOT NULL,
harvest_object_id text,
"key" text,
"value" text
);
ALTER TABLE harvest_object
ADD COLUMN import_started timestamp without time zone,
ADD COLUMN import_finished timestamp without time zone,
ADD COLUMN "state" text,
ADD COLUMN "report_status" text;
ALTER TABLE harvest_source
ADD COLUMN frequency text,
ADD COLUMN next_run timestamp without time zone;
ALTER TABLE harvest_job
ADD COLUMN finished timestamp without time zone;
ALTER TABLE harvest_object_extra
ADD CONSTRAINT harvest_object_extra_pkey PRIMARY KEY (id);
ALTER TABLE harvest_object_extra
ADD CONSTRAINT harvest_object_extra_harvest_object_id_fkey FOREIGN KEY (harvest_object_id) REFERENCES harvest_object(id);
UPDATE harvest_object set state = 'COMPLETE' where package_id is not null;
UPDATE harvest_object set state = 'ERROR' where package_id is null;
UPDATE harvest_object set retry_times = 0;
UPDATE harvest_object set report_status = 'updated' where package_id is not null;
UPDATE harvest_object set report_status = 'errored' where package_id is null;
UPDATE harvest_source set frequency = 'MANUAL';
ALTER TABLE harvest_object DROP CONSTRAINT harvest_object_package_id_fkey;
ALTER TABLE harvest_object
ADD CONSTRAINT harvest_object_package_id_fkey FOREIGN KEY (package_id) REFERENCES package(id) DEFERRABLE;
ALTER TABLE harvest_object_error
ADD COLUMN line integer;
"""
conn.execute(statement)
Session.commit()
log.info('Harvest tables migrated to v3')
class PackageIdHarvestSourceIdMismatch(Exception):
"""
The package created for the harvest source must match the id of the
@ -534,89 +420,9 @@ class PackageIdHarvestSourceIdMismatch(Exception):
pass
def migrate_v3_create_datasets(source_ids=None):
'''
Create a ``harvest`` type package for each harvest source that does not
have a package with the same id yet.

:param source_ids: ids of the harvest sources to process; when empty or
None, all harvest sources are processed.
:raises PackageIdHarvestSourceIdMismatch: if ``package_create`` returns
something other than the source id, or ``return_id_only`` is no longer
set in the context.
'''
import pylons
from paste.registry import Registry
from ckan.lib.cli import MockTranslator
# Register a mock translator on a fresh paste registry (presumably so the
# action functions can be called outside a web request -- TODO confirm)
registry = Registry()
registry.prepare()
registry.register(pylons.translator, MockTranslator())
sources = []
if not source_ids:
sources = model.Session.query(HarvestSource).all()
else:
sources = model.Session.query(HarvestSource) \
.filter(HarvestSource.id.in_(source_ids)) \
.all()
if not sources:
log.debug('No harvest sources to migrate')
return
# All actions below run as the site user
site_user_name = logic.get_action('get_site_user')({'model': model, 'ignore_auth': True}, {})['name']
context = {'model': model,
'session': model.Session,
'user': site_user_name, # TODO: auth of existing sources?
'return_id_only': True,
'extras_as_string': True,
}
# Build a package name from the source title: munge it, use dashes instead
# of underscores, collapse repeated dashes, and append 5 characters of a
# random UUID if a package with that name already exists
def gen_new_name(title):
name = munge_title_to_name(title).replace('_', '-')
while '--' in name:
name = name.replace('--', '-')
pkg_obj = Session.query(Package).filter(Package.name == name).first()
if pkg_obj:
return name + str(uuid.uuid4())[:5]
else:
return name
for source in sources:
# The context dict is shared between iterations, so drop the 'id' and
# 'package' keys (presumably left there by earlier action calls --
# TODO confirm) before handling the next source
if 'id' in context:
del context['id']
if 'package' in context:
del context['package']
# Check if package already exists
try:
logic.get_action('package_show')(context, {'id': source.id})
# A package with the source id exists, nothing to do for this source
continue
except logic.NotFound:
pass
# The package reuses the source id; name and title fall back to the
# source id / url when the source has no title
package_dict = {
'id': source.id,
'name': gen_new_name(source.title) if source.title else source.id,
'title': source.title if source.title else source.url,
'notes': source.description,
'url': source.url,
'type': 'harvest',
'source_type': source.type,
'config': source.config,
'frequency': source.frequency,
'owner_org': source.publisher_id,
}
context['message'] = 'Created package for harvest source {0}'.format(source.id)
try:
new_package_id = logic.get_action('package_create')(context, package_dict)
if new_package_id != source.id or not context['return_id_only']:
# this check only makes sense if we are sure we are returning
# the package id not the package object
raise PackageIdHarvestSourceIdMismatch
log.info('Created new package for source {0} ({1})'.format(source.id, source.url))
# NOTE: Python 2 only ``except`` syntax. Validation errors are logged and
# not re-raised
except logic.ValidationError, e:
log.error('Validation Error: %s' % str(e.error_summary))
def clean_harvest_log(condition):
Session.query(HarvestLog).filter(HarvestLog.created <= condition)\
.delete(synchronize_session=False)
Session.query(HarvestLog).filter(HarvestLog.created <= condition) \
.delete(synchronize_session=False)
try:
Session.commit()
except InvalidRequestError:

View File

@ -2,6 +2,8 @@ import logging
import datetime
import json
import redis
import pika
import sqlalchemy
@ -64,11 +66,14 @@ def get_connection_amqp():
def get_connection_redis():
import redis
return redis.StrictRedis(host=config.get('ckan.harvest.mq.hostname', HOSTNAME),
port=int(config.get('ckan.harvest.mq.port', REDIS_PORT)),
password=config.get('ckan.harvest.mq.password', None),
db=int(config.get('ckan.harvest.mq.redis_db', REDIS_DB)))
if not config.get('ckan.harvest.mq.hostname') and config.get('ckan.redis.url'):
return redis.StrictRedis.from_url(config['ckan.redis.url'])
else:
return redis.StrictRedis(
host=config.get('ckan.harvest.mq.hostname', HOSTNAME),
port=int(config.get('ckan.harvest.mq.port', REDIS_PORT)),
password=config.get('ckan.harvest.mq.password', None),
db=int(config.get('ckan.harvest.mq.redis_db', REDIS_DB)))
def get_gather_queue_name():
@ -83,12 +88,12 @@ def get_fetch_queue_name():
def get_gather_routing_key():
return 'ckanext-harvest:{0}:harvest_job_id'.format(
config.get('ckan.site_id', 'default'))
config.get('ckan.site_id', 'default'))
def get_fetch_routing_key():
return 'ckanext-harvest:{0}:harvest_object_id'.format(
config.get('ckan.site_id', 'default'))
config.get('ckan.site_id', 'default'))
def purge_queues():
@ -142,6 +147,25 @@ def resubmit_jobs():
redis.delete(key)
def resubmit_objects():
    '''
    Resubmit all WAITING objects on the DB that are not present in Redis

    Does nothing unless the ``ckan.harvest.mq.type`` setting is ``redis``.
    Every harvest object in the ``WAITING`` state whose id is not found in
    Redis is sent to the fetch queue again.
    '''
    if config.get('ckan.harvest.mq.type') != 'redis':
        return

    # Not called ``redis`` so the module-level ``redis`` import is not
    # shadowed inside this function
    redis_conn = get_connection()
    publisher = get_fetch_publisher()

    waiting_objects = model.Session.query(HarvestObject.id) \
        .filter_by(state='WAITING') \
        .all()

    for row in waiting_objects:
        # The query selects a single column, so each result is a one-element
        # row: look up (and resend) the id itself, not the whole row
        object_id = row[0]
        # NOTE(review): assumes the Redis key is the bare object id --
        # confirm against the key the fetch publisher actually stores
        if not redis_conn.get(object_id):
            log.debug('Re-sent object {} to the fetch queue'.format(object_id))
            publisher.send({'harvest_object_id': object_id})
class Publisher(object):
def __init__(self, connection, channel, exchange, routing_key):
self.connection = connection
@ -150,13 +174,14 @@ class Publisher(object):
self.routing_key = routing_key
def send(self, body, **kw):
return self.channel.basic_publish(self.exchange,
self.routing_key,
json.dumps(body),
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
),
**kw)
return self.channel.basic_publish(
self.exchange,
self.routing_key,
json.dumps(body),
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
),
**kw)
def close(self):
self.connection.close()
@ -272,6 +297,7 @@ def get_consumer(queue_name, routing_key):
def gather_callback(channel, method, header, body):
try:
id = json.loads(body)['harvest_job_id']
log.debug('Received harvest job id: %s' % id)
@ -303,7 +329,6 @@ def gather_callback(channel, method, header, body):
# the Harvester interface, only if the source type
# matches
harvester = get_harvester(job.source.type)
if harvester:
try:
harvest_object_ids = gather_stage(harvester, job)
@ -324,7 +349,7 @@ def gather_callback(channel, method, header, body):
return False
log.debug('Received from plugin gather_stage: {0} objects (first: {1} last: {2})'.format(
len(harvest_object_ids), harvest_object_ids[:1], harvest_object_ids[-1:]))
len(harvest_object_ids), harvest_object_ids[:1], harvest_object_ids[-1:]))
for id in harvest_object_ids:
# Send the id to the fetch queue
publisher.send({'harvest_object_id': id})
@ -338,6 +363,10 @@ def gather_callback(channel, method, header, body):
err = HarvestGatherError(message=msg, job=job)
err.save()
log.error(msg)
job.status = u'Finished'
job.save()
log.info('Marking job as finished due to error: %s %s',
job.source.url, job.id)
model.Session.remove()
publisher.close()
@ -437,7 +466,7 @@ def fetch_and_import_stages(harvester, obj):
obj.import_finished = datetime.datetime.utcnow()
if success_import:
obj.state = "COMPLETE"
if success_import is 'unchanged':
if success_import == 'unchanged':
obj.report_status = 'not modified'
obj.save()
return

View File

@ -30,10 +30,10 @@ class MockCkanHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
# The API version is recorded and then removed from the path
api_version = None
version_match = re.match('^/api/(\d)', self.path)
version_match = re.match(r'^/api/(\d)', self.path)
if version_match:
api_version = int(version_match.groups()[0])
self.path = re.sub('^/api/(\d)/', '/api/', self.path)
self.path = re.sub(r'^/api/(\d)/', '/api/', self.path)
if self.path == '/api/rest/package':
if api_version == 2:

View File

@ -45,7 +45,7 @@ class TestGenNewName(object):
factories.Dataset(name='trees')
new_name = HarvesterBase._gen_new_name('Trees')
assert re.match('trees[\da-f]{5}', new_name)
assert re.match(r'trees[\da-f]{5}', new_name)
@patch.dict('ckanext.harvest.harvesters.base.config',
{'ckanext.harvest.default_dataset_name_append': 'random-hex'})
@ -123,7 +123,7 @@ class TestEnsureNameIsUnique(object):
factories.Dataset(name='trees')
name = _ensure_name_is_unique('trees', append_type='random-hex')
# e.g. 'trees0b53f'
assert re.match('trees[\da-f]{5}', name)
assert re.match(r'trees[\da-f]{5}', name)
# taken from ckan/tests/lib/test_munge.py