Merge branch 'master' into dgu-fixes

commit 5fc035aa2d by kentsanggds, 2019-12-17 23:22:31 +00:00 (committed by GitHub)
GPG Key ID: 4AEE18F83AFDEB23 (no known key found for this signature in database)
26 changed files with 336 additions and 414 deletions

View File

@@ -3,13 +3,9 @@ python:
- "2.7"
env:
- CKANVERSION=master
- CKANVERSION=2.2
- CKANVERSION=2.3
- CKANVERSION=2.4
- CKANVERSION=2.5
- CKANVERSION=2.6
- CKANVERSION=2.7
- CKANVERSION=2.8
- CKANVERSION=2.7
- CKANVERSION=2.6
services:
- redis-server
- postgresql
@@ -38,4 +34,4 @@ jobs:
# stop the build if there are Python syntax errors or undefined names
- flake8 . --count --select=E901,E999,F821,F822,F823 --show-source --statistics --exclude ckan
# exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
- flake8 . --count --max-line-length=127 --statistics --exclude ckan
- flake8 . --count --max-line-length=127 --statistics --exclude ckan --exit-zero

View File

@@ -5,14 +5,32 @@ Changelog
All notable changes to this project will be documented in this file.
The format is based on `Keep a Changelog <http://keepachangelog.com>`_
and this project adheres to `Semantic Versioning <http://semver.org/>`_.
and this project adheres to `Semantic Versioning <http://semver.org/>`_
***********
Unreleased_
***********
Fixed
-----
- get message from harvest_object_error-dict #381
*******************
1.2.0_ - 2019-11-01
*******************
Changed
-------
- Apply flake8 to be PEP-8 compliant #354
- Use ckantoolkit to clean up imports #358
- Add hook to extend the package dict in CKAN harvester
- Use CKAN core ckan.redis.url setting if present
- Remove database migration code targeting ancient versions #376
(In the unlikely event that you need to upgrade from one
of the previous DB versions, just apply the changes removed
in the linked PR manually)
Fixed
-----
@@ -20,6 +38,9 @@ Fixed
- Fix SSL problems for old versions of Python 2.7.x #344
- Add an 'owner_org' to the v3 package migration #348
- Fix harvest request exceptions #357
- Fix wrong toolkit reference 8e862c8
- Mark early errored jobs as finished 5ad6d86
- Resubmit awaiting objects in the DB not on Redis 5ffe6d4
*******************
1.1.4_ - 2018-10-26
@@ -100,7 +121,7 @@ Added
Changed
-------
- ``gather_stage`` return empty list instead of None if errors occurred
- ``gather_stage`` return empty list instead of None if errors occurred
- Change ``redirect`` calls to ``h.redirect_to``
Fixed
@@ -176,7 +197,8 @@ Categories
- ``Fixed`` for any bug fixes.
- ``Security`` to invite users to upgrade in case of vulnerabilities.
.. _Unreleased: https://github.com/ckan/ckanext-harvest/compare/v1.1.4...HEAD
.. _Unreleased: https://github.com/ckan/ckanext-harvest/compare/v1.2.0...HEAD
.. _1.2.0: https://github.com/ckan/ckanext-harvest/compare/v1.1.4...v1.2.0
.. _1.1.4: https://github.com/ckan/ckanext-harvest/compare/v1.1.3...v1.1.4
.. _1.1.3: https://github.com/ckan/ckanext-harvest/compare/v1.1.2...v1.1.3
.. _1.1.2: https://github.com/ckan/ckanext-harvest/compare/v1.1.1...v1.1.2

View File

@@ -429,6 +429,46 @@ the configuration field)::
}
Plugins can extend the default CKAN harvester and implement the ``modify_package_dict`` method in order to
modify the dataset dict generated by the harvester just before it is actually created or updated. For instance,
they might want to add or delete certain fields, or fire additional tasks based on the metadata fields.
Plugins will get the dataset dict including any processing described above (e.g. with the correct groups assigned,
replacement strings applied, etc). The method will also be passed the harvest object, which contains the original, unmodified
dataset dict in the ``content`` property.
This is a simple example::
from ckanext.harvester.harvesters.ckanharvester import CKANHarvester
class MySiteCKANHarvester(CKANHarvester):
def modify_package_dict(self, package_dict, harvest_object):
# Set a default custom field
package_dict['remote_harvest'] = True
# Add tags
package_dict['tags'].append({'name': 'sdi'})
return package_dict
Remember to register your custom harvester plugin in your extension's ``setup.py`` file, and load the plugin in the configuration file afterwards::
# setup.py
entry_points='''
[ckan.plugins]
my_site=ckanext.my_site.plugin:MySitePlugin
my_site_ckan_harvester=ckanext.my_site.harvesters:MySiteCKANHarvester
'''
# ini file
ckan.plugins = ... my_site my_site_ckan_harvester
The harvesting interface
========================
@@ -614,7 +654,10 @@ harvester run_test
You can run a harvester simply using the ``run_test`` command. This is handy
for running a harvest with one command in the console and seeing all the output
in-line. It runs the gather, fetch and import stages all in the same process.
You must ensure that you have pip-installed ``dev-requirements.txt``
in ``/home/ckan/ckan/lib/default/src/ckanext-harvest`` before using the
``run_test`` command.
This is useful for developing a harvester because you can insert break-points
in your harvester, and rerun a harvest without having to restart the
gather_consumer and fetch_consumer processes each time. In addition, because it

View File

@@ -19,7 +19,12 @@ else
echo "CKAN version: ${CKAN_TAG#ckan-}"
fi
python setup.py develop
pip install -r requirements.txt --allow-all-external
if [ -f requirements-py2.txt ]
then
pip install -r requirements-py2.txt
else
pip install -r requirements.txt
fi
pip install -r dev-requirements.txt --allow-all-external
cd -

View File

@@ -1,3 +1,3 @@
#!/bin/sh -e
nosetests --ckan --nologcapture --with-pylons=subdir/test-core.ini ckanext/harvest
nosetests --ckan --nologcapture --with-pylons=subdir/test-core.ini -v ckanext/harvest

View File

@@ -126,10 +126,10 @@ class ViewController(BaseController):
else:
abort(404, _('No content found'))
try:
etree.fromstring(re.sub('<\?xml(.*)\?>', '', content))
etree.fromstring(re.sub(r'<\?xml(.*)\?>', '', content))
except UnicodeEncodeError:
etree.fromstring(
re.sub('<\?xml(.*)\?>', '', content.encode('utf-8'))
re.sub(r'<\?xml(.*)\?>', '', content.encode('utf-8'))
)
response.content_type = 'application/xml; charset=utf-8'
if '<?xml' not in content.split('\n')[0]:
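The ``r''`` prefixes added above do not change the regex at all; they only stop newer Pythons from warning about ``\?`` being an invalid *string* escape. A minimal standalone check of the unchanged behaviour:

```python
import re

# The pattern contains '\?' (a regex-escaped literal '?'). Without the
# r'' prefix, Python 3.6+ emits a DeprecationWarning for the invalid
# string escape '\?'; the raw string keeps the pattern identical.
content = '<?xml version="1.0"?><data>hello</data>'
stripped = re.sub(r'<\?xml(.*)\?>', '', content)
# stripped is now '<data>hello</data>'
```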
@@ -233,6 +233,8 @@ class ViewController(BaseController):
def abort_job(self, source, id):
try:
context = {'model': model, 'user': c.user}
p.toolkit.get_action('harvest_job_abort')(context, {'id': id})
h.flash_success(_('Harvest job stopped'))
except p.toolkit.ObjectNotFound:

View File

@@ -172,6 +172,13 @@ class CKANHarvester(HarvesterBase):
return config
def modify_package_dict(self, package_dict, harvest_object):
'''
Allows custom harvesters to modify the package dict before
creating or updating the actual package.
'''
return package_dict
def gather_stage(self, harvest_job):
log.debug('In CKANHarvester gather_stage (%s)',
harvest_job.source.url)
@@ -536,6 +543,8 @@ class CKANHarvester(HarvesterBase):
# key.
resource.pop('revision_id', None)
package_dict = self.modify_package_dict(package_dict, harvest_object)
result = self._create_or_update_package(
package_dict, harvest_object, package_dict_form='package_show')

View File

@@ -92,7 +92,7 @@ def harvest_job_create(context, data_dict):
source = HarvestSource.get(source_id)
if not source:
log.warn('Harvest source %s does not exist', source_id)
raise toolkit.NotFound('Harvest source %s does not exist' % source_id)
raise toolkit.ObjectNotFound('Harvest source %s does not exist' % source_id)
# Check if the source is active
if not source.active:

View File

@@ -22,7 +22,8 @@ from ckan.plugins import toolkit
from ckan.logic import NotFound, check_access
from ckanext.harvest.plugin import DATASET_TYPE_NAME
from ckanext.harvest.queue import get_gather_publisher, resubmit_jobs
from ckanext.harvest.queue import (
get_gather_publisher, resubmit_jobs, resubmit_objects)
from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject
from ckanext.harvest.logic import HarvestJobExists
@@ -565,10 +566,14 @@ def harvest_jobs_run(context, data_dict):
else:
log.debug('Ongoing job:%s source:%s',
job['id'], job['source_id'])
log.debug('No jobs to send to the gather queue')
# resubmit old redis tasks
# Resubmit old redis tasks
resubmit_jobs()
# Resubmit pending objects missing from Redis
resubmit_objects()
return [] # merely for backwards compatibility
@@ -605,8 +610,10 @@ def send_error_mail(context, source_id, status):
obj_error = ''
job_error = ''
for harvest_object_error in islice(report.get('object_errors'), 0, 20):
obj_error += harvest_object_error['message'] + '\n'
for harvest_object_error_key in islice(report.get('object_errors'), 0, 20):
harvest_object_error = report.get('object_errors')[harvest_object_error_key]['errors']
for error in harvest_object_error:
obj_error += error['message']
for harvest_gather_error in islice(report.get('gather_errors'), 0, 20):
job_error += harvest_gather_error['message'] + '\n'
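The reworked loop above iterates the ``object_errors`` report by key and walks each entry's ``errors`` list. A standalone sketch of that shape (the mapping structure and the helper name here are inferred from the diff, not the extension's API):

```python
from itertools import islice

# object_errors maps a harvest-object key to a dict with an 'errors'
# list, as the new loop in send_error_mail implies. Only the first
# `limit` objects are summarised, mirroring the islice() call above.
def summarise_object_errors(object_errors, limit=20):
    out = ''
    for key in islice(object_errors, 0, limit):
        for error in object_errors[key]['errors']:
            out += error['message']
    return out
```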

View File

@@ -1,4 +1,4 @@
from sqlalchemy import distinct, func
from sqlalchemy import distinct, func, text
from ckan.model import Package, Group
from ckan import logic
@@ -66,7 +66,7 @@ def harvest_job_dictize(job, context):
.join(HarvestObject) \
.filter(HarvestObject.harvest_job_id == job.id) \
.group_by(HarvestObjectError.message) \
.order_by('error_count desc') \
.order_by(text('error_count desc')) \
.limit(context.get('error_summmary_limit', 20))
out['object_error_summary'] = q.all()
q = model.Session.query(
@@ -74,7 +74,7 @@ def harvest_job_dictize(job, context):
func.count(HarvestGatherError.message).label('error_count')) \
.filter(HarvestGatherError.harvest_job_id == job.id) \
.group_by(HarvestGatherError.message) \
.order_by('error_count desc') \
.order_by(text('error_count desc')) \
.limit(context.get('error_summmary_limit', 20))
out['gather_error_summary'] = q.all()
return out

View File

@@ -90,7 +90,7 @@ def harvest_source_show_package_schema():
'creator_user_id': [],
'organization': [],
'notes': [],
'revision_id': [],
'revision_id': [ignore_missing],
'revision_timestamp': [ignore_missing],
'tracking_summary': [ignore_missing],
})

View File

@@ -1,9 +1,7 @@
import logging
import datetime
import uuid
from sqlalchemy import event
from sqlalchemy import distinct
from sqlalchemy import Table
from sqlalchemy import Column
from sqlalchemy import ForeignKey
@@ -14,12 +12,11 @@ from sqlalchemy.orm import backref, relation
from sqlalchemy.exc import InvalidRequestError
from ckan import model
from ckan import logic
from ckan.model.meta import metadata, mapper, Session
from ckan.model.meta import metadata, mapper, Session
from ckan.model.types import make_uuid
from ckan.model.domain_object import DomainObject
from ckan.model.package import Package
from ckan.lib.munge import munge_title_to_name
UPDATE_FREQUENCIES = ['MANUAL', 'MONTHLY', 'WEEKLY', 'BIWEEKLY', 'DAILY', 'ALWAYS']
@@ -72,23 +69,6 @@ def setup():
log.debug('Harvest tables already exist')
# Check if existing tables need to be updated
inspector = Inspector.from_engine(engine)
columns = inspector.get_columns('harvest_source')
column_names = [column['name'] for column in columns]
if 'title' not in column_names:
log.debug('Harvest tables need to be updated')
migrate_v2()
if 'frequency' not in column_names:
log.debug('Harvest tables need to be updated')
migrate_v3()
# Check if this instance has harvest source datasets
source_ids = Session.query(HarvestSource.id).filter_by(active=True).all()
source_package_ids = Session.query(model.Package.id).filter_by(type=u'harvest', state='active').all()
sources_to_migrate = set(source_ids) - set(source_package_ids)
if sources_to_migrate:
log.debug('Creating harvest source datasets for %i existing sources', len(sources_to_migrate))
sources_to_migrate = [s[0] for s in sources_to_migrate]
migrate_v3_create_datasets(sources_to_migrate)
# Check if harvest_log table exist - needed for existing users
if 'harvest_log' not in inspector.get_table_names():
@@ -247,94 +227,108 @@ def define_harvester_tables():
global harvest_object_error_table
global harvest_log_table
harvest_source_table = Table('harvest_source', metadata,
Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
Column('url', types.UnicodeText, nullable=False),
Column('title', types.UnicodeText, default=u''),
Column('description', types.UnicodeText, default=u''),
Column('config', types.UnicodeText, default=u''),
Column('created', types.DateTime, default=datetime.datetime.utcnow),
Column('type', types.UnicodeText, nullable=False),
Column('active', types.Boolean, default=True),
Column('user_id', types.UnicodeText, default=u''),
Column('publisher_id', types.UnicodeText, default=u''),
Column('frequency', types.UnicodeText, default=u'MANUAL'),
Column('next_run', types.DateTime),
)
harvest_source_table = Table(
'harvest_source',
metadata,
Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
Column('url', types.UnicodeText, nullable=False),
Column('title', types.UnicodeText, default=u''),
Column('description', types.UnicodeText, default=u''),
Column('config', types.UnicodeText, default=u''),
Column('created', types.DateTime, default=datetime.datetime.utcnow),
Column('type', types.UnicodeText, nullable=False),
Column('active', types.Boolean, default=True),
Column('user_id', types.UnicodeText, default=u''),
Column('publisher_id', types.UnicodeText, default=u''),
Column('frequency', types.UnicodeText, default=u'MANUAL'),
Column('next_run', types.DateTime),
)
# Was harvesting_job
harvest_job_table = Table('harvest_job', metadata,
Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
Column('created', types.DateTime, default=datetime.datetime.utcnow),
Column('gather_started', types.DateTime),
Column('gather_finished', types.DateTime),
Column('finished', types.DateTime),
Column('source_id', types.UnicodeText, ForeignKey('harvest_source.id')),
# status: New, Running, Finished
Column('status', types.UnicodeText, default=u'New', nullable=False),
)
harvest_job_table = Table(
'harvest_job',
metadata,
Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
Column('created', types.DateTime, default=datetime.datetime.utcnow),
Column('gather_started', types.DateTime),
Column('gather_finished', types.DateTime),
Column('finished', types.DateTime),
Column('source_id', types.UnicodeText, ForeignKey('harvest_source.id')),
# status: New, Running, Finished
Column('status', types.UnicodeText, default=u'New', nullable=False),
)
# A harvest_object contains a representation of one dataset during a
# particular harvest
harvest_object_table = Table('harvest_object', metadata,
Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
# The guid is the 'identity' of the dataset, according to the source.
# So if you reharvest it, then the harvester knows which dataset to
# update because of this identity. The identity needs to be unique
# within this CKAN.
Column('guid', types.UnicodeText, default=u''),
# When you harvest a dataset multiple times, only the latest
# successfully imported harvest_object should be flagged 'current'.
# The import_stage usually reads and writes it.
Column('current', types.Boolean, default=False),
Column('gathered', types.DateTime, default=datetime.datetime.utcnow),
Column('fetch_started', types.DateTime),
Column('content', types.UnicodeText, nullable=True),
Column('fetch_finished', types.DateTime),
Column('import_started', types.DateTime),
Column('import_finished', types.DateTime),
# state: WAITING, FETCH, IMPORT, COMPLETE, ERROR
Column('state', types.UnicodeText, default=u'WAITING'),
Column('metadata_modified_date', types.DateTime),
Column('retry_times', types.Integer, default=0),
Column('harvest_job_id', types.UnicodeText, ForeignKey('harvest_job.id')),
Column('harvest_source_id', types.UnicodeText, ForeignKey('harvest_source.id')),
Column('package_id', types.UnicodeText, ForeignKey('package.id', deferrable=True),
nullable=True),
# report_status: 'added', 'updated', 'not modified', 'deleted', 'errored'
Column('report_status', types.UnicodeText, nullable=True),
Index('harvest_job_id_idx', 'harvest_job_id'),
)
harvest_object_table = Table(
'harvest_object',
metadata,
Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
# The guid is the 'identity' of the dataset, according to the source.
# So if you reharvest it, then the harvester knows which dataset to
# update because of this identity. The identity needs to be unique
# within this CKAN.
Column('guid', types.UnicodeText, default=u''),
# When you harvest a dataset multiple times, only the latest
# successfully imported harvest_object should be flagged 'current'.
# The import_stage usually reads and writes it.
Column('current', types.Boolean, default=False),
Column('gathered', types.DateTime, default=datetime.datetime.utcnow),
Column('fetch_started', types.DateTime),
Column('content', types.UnicodeText, nullable=True),
Column('fetch_finished', types.DateTime),
Column('import_started', types.DateTime),
Column('import_finished', types.DateTime),
# state: WAITING, FETCH, IMPORT, COMPLETE, ERROR
Column('state', types.UnicodeText, default=u'WAITING'),
Column('metadata_modified_date', types.DateTime),
Column('retry_times', types.Integer, default=0),
Column('harvest_job_id', types.UnicodeText, ForeignKey('harvest_job.id')),
Column('harvest_source_id', types.UnicodeText, ForeignKey('harvest_source.id')),
Column('package_id', types.UnicodeText, ForeignKey('package.id', deferrable=True),
nullable=True),
# report_status: 'added', 'updated', 'not modified', 'deleted', 'errored'
Column('report_status', types.UnicodeText, nullable=True),
Index('harvest_job_id_idx', 'harvest_job_id'),
)
# New table
harvest_object_extra_table = Table('harvest_object_extra', metadata,
Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
Column('harvest_object_id', types.UnicodeText, ForeignKey('harvest_object.id')),
Column('key', types.UnicodeText),
Column('value', types.UnicodeText),
)
harvest_object_extra_table = Table(
'harvest_object_extra',
metadata,
Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
Column('harvest_object_id', types.UnicodeText, ForeignKey('harvest_object.id')),
Column('key', types.UnicodeText),
Column('value', types.UnicodeText),
)
# New table
harvest_gather_error_table = Table('harvest_gather_error', metadata,
Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
Column('harvest_job_id', types.UnicodeText, ForeignKey('harvest_job.id')),
Column('message', types.UnicodeText),
Column('created', types.DateTime, default=datetime.datetime.utcnow),
)
harvest_gather_error_table = Table(
'harvest_gather_error',
metadata,
Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
Column('harvest_job_id', types.UnicodeText, ForeignKey('harvest_job.id')),
Column('message', types.UnicodeText),
Column('created', types.DateTime, default=datetime.datetime.utcnow),
)
# New table
harvest_object_error_table = Table('harvest_object_error', metadata,
Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
Column('harvest_object_id', types.UnicodeText, ForeignKey('harvest_object.id')),
Column('message', types.UnicodeText),
Column('stage', types.UnicodeText),
Column('line', types.Integer),
Column('created', types.DateTime, default=datetime.datetime.utcnow),
)
harvest_object_error_table = Table(
'harvest_object_error',
metadata,
Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
Column('harvest_object_id', types.UnicodeText, ForeignKey('harvest_object.id')),
Column('message', types.UnicodeText),
Column('stage', types.UnicodeText),
Column('line', types.Integer),
Column('created', types.DateTime, default=datetime.datetime.utcnow),
)
# Harvest Log table
harvest_log_table = Table('harvest_log', metadata,
Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
Column('content', types.UnicodeText, nullable=False),
Column('level', types.Enum('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL', name='log_level')),
Column('created', types.DateTime, default=datetime.datetime.utcnow),
)
harvest_log_table = Table(
'harvest_log',
metadata,
Column('id', types.UnicodeText, primary_key=True, default=make_uuid),
Column('content', types.UnicodeText, nullable=False),
Column('level', types.Enum('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL', name='log_level')),
Column('created', types.DateTime, default=datetime.datetime.utcnow),
)
mapper(
HarvestSource,
@@ -418,100 +412,6 @@ def define_harvester_tables():
event.listen(HarvestObject, 'before_insert', harvest_object_before_insert_listener)
def migrate_v2():
log.debug('Migrating harvest tables to v2. This may take a while...')
conn = Session.connection()
statements = '''
ALTER TABLE harvest_source ADD COLUMN title text;
ALTER TABLE harvest_object ADD COLUMN current boolean;
ALTER TABLE harvest_object ADD COLUMN harvest_source_id text;
ALTER TABLE harvest_object ADD CONSTRAINT harvest_object_harvest_source_id_fkey FOREIGN KEY (harvest_source_id)
REFERENCES harvest_source(id);
UPDATE harvest_object o SET harvest_source_id = j.source_id FROM harvest_job j WHERE o.harvest_job_id = j.id;
'''
conn.execute(statements)
# Flag current harvest_objects
guids = Session.query(distinct(HarvestObject.guid)) \
.join(Package) \
.filter(
HarvestObject.package != None # noqa: E711
).filter(Package.state == u'active')
update_statement = '''
UPDATE harvest_object
SET current = TRUE
WHERE id = (
SELECT o.id
FROM harvest_object o JOIN package p ON p.id = o.package_id
WHERE o.package_id IS NOT null AND p.state = 'active'
AND o.guid = '%s'
ORDER BY metadata_modified_date DESC, fetch_finished DESC, gathered DESC
LIMIT 1)
'''
for guid in guids:
conn.execute(update_statement % guid)
conn.execute('UPDATE harvest_object SET current = FALSE WHERE current IS NOT TRUE')
Session.commit()
log.info('Harvest tables migrated to v2')
def migrate_v3():
log.debug('Migrating harvest tables to v3. This may take a while...')
conn = Session.connection()
statement = """CREATE TABLE harvest_object_extra (
id text NOT NULL,
harvest_object_id text,
"key" text,
"value" text
);
ALTER TABLE harvest_object
ADD COLUMN import_started timestamp without time zone,
ADD COLUMN import_finished timestamp without time zone,
ADD COLUMN "state" text,
ADD COLUMN "report_status" text;
ALTER TABLE harvest_source
ADD COLUMN frequency text,
ADD COLUMN next_run timestamp without time zone;
ALTER TABLE harvest_job
ADD COLUMN finished timestamp without time zone;
ALTER TABLE harvest_object_extra
ADD CONSTRAINT harvest_object_extra_pkey PRIMARY KEY (id);
ALTER TABLE harvest_object_extra
ADD CONSTRAINT harvest_object_extra_harvest_object_id_fkey FOREIGN KEY (harvest_object_id) REFERENCES harvest_object(id);
UPDATE harvest_object set state = 'COMPLETE' where package_id is not null;
UPDATE harvest_object set state = 'ERROR' where package_id is null;
UPDATE harvest_object set retry_times = 0;
UPDATE harvest_object set report_status = 'updated' where package_id is not null;
UPDATE harvest_object set report_status = 'errored' where package_id is null;
UPDATE harvest_source set frequency = 'MANUAL';
ALTER TABLE harvest_object DROP CONSTRAINT harvest_object_package_id_fkey;
ALTER TABLE harvest_object
ADD CONSTRAINT harvest_object_package_id_fkey FOREIGN KEY (package_id) REFERENCES package(id) DEFERRABLE;
ALTER TABLE harvest_object_error
ADD COLUMN line integer;
"""
conn.execute(statement)
Session.commit()
log.info('Harvest tables migrated to v3')
class PackageIdHarvestSourceIdMismatch(Exception):
"""
The package created for the harvest source must match the id of the
@@ -520,89 +420,9 @@ class PackageIdHarvestSourceIdMismatch(Exception):
pass
def migrate_v3_create_datasets(source_ids=None):
import pylons
from paste.registry import Registry
from ckan.lib.cli import MockTranslator
registry = Registry()
registry.prepare()
registry.register(pylons.translator, MockTranslator())
sources = []
if not source_ids:
sources = model.Session.query(HarvestSource).all()
else:
sources = model.Session.query(HarvestSource) \
.filter(HarvestSource.id.in_(source_ids)) \
.all()
if not sources:
log.debug('No harvest sources to migrate')
return
site_user_name = logic.get_action('get_site_user')({'model': model, 'ignore_auth': True}, {})['name']
context = {'model': model,
'session': model.Session,
'user': site_user_name, # TODO: auth of existing sources?
'return_id_only': True,
'extras_as_string': True,
}
def gen_new_name(title):
name = munge_title_to_name(title).replace('_', '-')
while '--' in name:
name = name.replace('--', '-')
pkg_obj = Session.query(Package).filter(Package.name == name).first()
if pkg_obj:
return name + str(uuid.uuid4())[:5]
else:
return name
for source in sources:
if 'id' in context:
del context['id']
if 'package' in context:
del context['package']
# Check if package already exists
try:
logic.get_action('package_show')(context, {'id': source.id})
continue
except logic.NotFound:
pass
package_dict = {
'id': source.id,
'name': gen_new_name(source.title) if source.title else source.id,
'title': source.title if source.title else source.url,
'notes': source.description,
'url': source.url,
'type': 'harvest',
'source_type': source.type,
'config': source.config,
'frequency': source.frequency,
'owner_org': source.publisher_id,
}
context['message'] = 'Created package for harvest source {0}'.format(source.id)
try:
new_package_id = logic.get_action('package_create')(context, package_dict)
if new_package_id != source.id or not context['return_id_only']:
# this check only makes sense if we are sure we are returning
# the package id not the package object
raise PackageIdHarvestSourceIdMismatch
log.info('Created new package for source {0} ({1})'.format(source.id, source.url))
except logic.ValidationError, e:
log.error('Validation Error: %s' % str(e.error_summary))
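The removed migration helper ``gen_new_name`` above deduplicated dataset names by appending a short random hex suffix when a name was already taken. The core of that logic, as a standalone sketch (the function name here is illustrative):

```python
import uuid

# Munge-free sketch of the name deduplication in the removed helper:
# if the candidate name is taken, append the first 5 hex characters of
# a fresh UUID; otherwise keep the name as-is.
def dedupe_name(name, taken_names):
    if name in taken_names:
        return name + str(uuid.uuid4())[:5]
    return name
```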
def clean_harvest_log(condition):
Session.query(HarvestLog).filter(HarvestLog.created <= condition)\
.delete(synchronize_session=False)
Session.query(HarvestLog).filter(HarvestLog.created <= condition) \
.delete(synchronize_session=False)
try:
Session.commit()
except InvalidRequestError:

View File

@@ -219,28 +219,41 @@ class Harvest(p.SingletonPlugin, DefaultDatasetForm, DefaultTranslation):
controller = 'ckanext.harvest.controllers.view:ViewController'
map.connect('{0}_delete'.format(DATASET_TYPE_NAME), '/' + DATASET_TYPE_NAME + '/delete/:id',
controller=controller, action='delete')
controller=controller,
action='delete')
map.connect('{0}_refresh'.format(DATASET_TYPE_NAME), '/' + DATASET_TYPE_NAME + '/refresh/:id',
controller=controller, action='refresh')
controller=controller,
action='refresh')
map.connect('{0}_admin'.format(DATASET_TYPE_NAME), '/' + DATASET_TYPE_NAME + '/admin/:id',
controller=controller, action='admin')
controller=controller,
action='admin')
map.connect('{0}_about'.format(DATASET_TYPE_NAME), '/' + DATASET_TYPE_NAME + '/about/:id',
controller=controller, action='about')
controller=controller,
action='about')
map.connect('{0}_clear'.format(DATASET_TYPE_NAME), '/' + DATASET_TYPE_NAME + '/clear/:id',
controller=controller, action='clear')
controller=controller,
action='clear')
map.connect('harvest_job_list', '/' + DATASET_TYPE_NAME + '/{source}/job', controller=controller, action='list_jobs')
map.connect('harvest_job_list', '/' + DATASET_TYPE_NAME + '/{source}/job',
controller=controller,
action='list_jobs')
map.connect('harvest_job_show_last', '/' + DATASET_TYPE_NAME + '/{source}/job/last',
controller=controller, action='show_last_job')
controller=controller,
action='show_last_job')
map.connect('harvest_job_show', '/' + DATASET_TYPE_NAME + '/{source}/job/{id}',
controller=controller, action='show_job')
controller=controller,
action='show_job')
map.connect('harvest_job_abort', '/' + DATASET_TYPE_NAME + '/{source}/job/{id}/abort',
controller=controller, action='abort_job')
controller=controller,
action='abort_job')
map.connect('harvest_object_show', '/' + DATASET_TYPE_NAME + '/object/:id',
controller=controller, action='show_object')
controller=controller,
action='show_object')
map.connect('harvest_object_for_dataset_show', '/dataset/harvest_object/:id',
controller=controller, action='show_object', ref_type='dataset')
controller=controller,
action='show_object',
ref_type='dataset')
return map

View File

@@ -2,6 +2,8 @@ import logging
import datetime
import json
import redis
import pika
import sqlalchemy
@@ -64,11 +66,14 @@ def get_connection_amqp():
def get_connection_redis():
import redis
return redis.StrictRedis(host=config.get('ckan.harvest.mq.hostname', HOSTNAME),
port=int(config.get('ckan.harvest.mq.port', REDIS_PORT)),
password=config.get('ckan.harvest.mq.password', None),
db=int(config.get('ckan.harvest.mq.redis_db', REDIS_DB)))
if not config.get('ckan.harvest.mq.hostname') and config.get('ckan.redis.url'):
return redis.StrictRedis.from_url(config['ckan.redis.url'])
else:
return redis.StrictRedis(
host=config.get('ckan.harvest.mq.hostname', HOSTNAME),
port=int(config.get('ckan.harvest.mq.port', REDIS_PORT)),
password=config.get('ckan.harvest.mq.password', None),
db=int(config.get('ckan.harvest.mq.redis_db', REDIS_DB)))
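The new branch above prefers CKAN core's ``ckan.redis.url`` when no harvest-specific hostname is configured. The selection order can be sketched on its own (a simplification with illustrative names, not the extension's API):

```python
# Sketch of the fallback order introduced above: an explicit
# ckan.harvest.mq.hostname wins; otherwise ckan.redis.url is used;
# otherwise the old host/port defaults apply.
def redis_target(config):
    if not config.get('ckan.harvest.mq.hostname') and config.get('ckan.redis.url'):
        return ('url', config['ckan.redis.url'])
    return ('host', config.get('ckan.harvest.mq.hostname', 'localhost'),
            int(config.get('ckan.harvest.mq.port', 6379)))
```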
def get_gather_queue_name():
@@ -83,12 +88,12 @@ def get_fetch_queue_name():
def get_gather_routing_key():
return 'ckanext-harvest:{0}:harvest_job_id'.format(
config.get('ckan.site_id', 'default'))
config.get('ckan.site_id', 'default'))
def get_fetch_routing_key():
return 'ckanext-harvest:{0}:harvest_object_id'.format(
config.get('ckan.site_id', 'default'))
config.get('ckan.site_id', 'default'))
def purge_queues():
@@ -142,6 +147,25 @@ def resubmit_jobs():
redis.delete(key)
def resubmit_objects():
'''
Resubmit all WAITING objects on the DB that are not present in Redis
'''
if config.get('ckan.harvest.mq.type') != 'redis':
return
redis = get_connection()
publisher = get_fetch_publisher()
waiting_objects = model.Session.query(HarvestObject.id) \
.filter_by(state='WAITING') \
.all()
for object_id in waiting_objects:
if not redis.get(object_id):
log.debug('Re-sent object {} to the fetch queue'.format(object_id[0]))
publisher.send({'harvest_object_id': object_id[0]})
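``resubmit_objects`` re-queues WAITING objects whose ids are missing from Redis. A standalone sketch of that filter with in-memory stand-ins for Redis and the fetch publisher (names here are illustrative, not the extension's API):

```python
# waiting_ids: ids of harvest objects in state WAITING (from the DB);
# redis_keys: keys currently present in Redis; sent: a plain list acting
# as a stand-in for the fetch publisher's queue.
def resubmit_missing(waiting_ids, redis_keys, sent):
    for object_id in waiting_ids:
        if object_id not in redis_keys:
            sent.append({'harvest_object_id': object_id})
```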
class Publisher(object):
def __init__(self, connection, channel, exchange, routing_key):
self.connection = connection
@@ -150,13 +174,14 @@ class Publisher(object):
self.routing_key = routing_key
def send(self, body, **kw):
return self.channel.basic_publish(self.exchange,
self.routing_key,
json.dumps(body),
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
),
**kw)
return self.channel.basic_publish(
self.exchange,
self.routing_key,
json.dumps(body),
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
),
**kw)
def close(self):
self.connection.close()
@@ -276,6 +301,7 @@ def get_consumer(queue_name, routing_key):
def gather_callback(channel, method, header, body):
try:
id = json.loads(body)['harvest_job_id']
log.debug('Received harvest job id: %s' % id)
@@ -307,7 +333,6 @@ def gather_callback(channel, method, header, body):
# the Harvester interface, only if the source type
# matches
harvester = get_harvester(job.source.type)
if harvester:
try:
harvest_object_ids = gather_stage(harvester, job)
@@ -328,7 +353,7 @@ def gather_callback(channel, method, header, body):
return False
log.debug('Received from plugin gather_stage: {0} objects (first: {1} last: {2})'.format(
len(harvest_object_ids), harvest_object_ids[:1], harvest_object_ids[-1:]))
len(harvest_object_ids), harvest_object_ids[:1], harvest_object_ids[-1:]))
for id in harvest_object_ids:
# Send the id to the fetch queue
publisher.send({'harvest_object_id': id})
@@ -342,6 +367,10 @@ def gather_callback(channel, method, header, body):
err = HarvestGatherError(message=msg, job=job)
err.save()
log.error(msg)
job.status = u'Finished'
job.save()
log.info('Marking job as finished due to error: %s %s',
job.source.url, job.id)
model.Session.remove()
publisher.close()

View File

@@ -1,9 +1,6 @@
import factory
import ckanext.harvest.model as harvest_model
try:
from ckan.new_tests.factories import _get_action_user_name
except ImportError:
from ckan.tests.factories import _get_action_user_name
from ckantoolkit.tests.factories import _get_action_user_name
from ckan.plugins import toolkit
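These hunks collapse the old two-way `try`/`except ImportError` imports into a single `ckantoolkit` import. The pattern being removed looked like the snippet below, shown here with stdlib modules so it runs standalone (the CKAN module paths themselves are in the hunks above):

```python
# Old compatibility style: try the newer location first,
# then fall back to the legacy one.
try:
    from collections.abc import Mapping   # newer location (Python 3.3+)
except ImportError:
    from collections import Mapping       # legacy location

assert issubclass(dict, Mapping)
```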


@ -30,10 +30,10 @@ class MockCkanHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
# The API version is recorded and then removed from the path
api_version = None
version_match = re.match('^/api/(\d)', self.path)
version_match = re.match(r'^/api/(\d)', self.path)
if version_match:
api_version = int(version_match.groups()[0])
self.path = re.sub('^/api/(\d)/', '/api/', self.path)
self.path = re.sub(r'^/api/(\d)/', '/api/', self.path)
if self.path == '/api/rest/package':
if api_version == 2:
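The `r''` prefixes added above keep `\d` from being treated as an invalid escape sequence in a plain string (a `DeprecationWarning` on newer Pythons); the matching behaviour is identical. For example:

```python
import re

# In a raw string, the backslash in \d reaches the re module unchanged.
m = re.match(r'^/api/(\d)', '/api/3/rest/package')
print(m.group(1))  # → 3

# Stripping the version segment, as the handler does:
print(re.sub(r'^/api/(\d)/', '/api/', '/api/3/rest/package'))  # → /api/rest/package
```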


@ -4,13 +4,7 @@ from nose.tools import assert_equal, assert_in
from ckanext.harvest import model as harvest_model
from ckanext.harvest.harvesters.base import HarvesterBase, munge_tag
from mock import patch
try:
from ckan.tests import helpers
from ckan.tests import factories
except ImportError:
from ckan.new_tests import helpers
from ckan.new_tests import factories
from ckantoolkit.tests import helpers, factories
_ensure_name_is_unique = HarvesterBase._ensure_name_is_unique
@ -51,7 +45,7 @@ class TestGenNewName(object):
factories.Dataset(name='trees')
new_name = HarvesterBase._gen_new_name('Trees')
assert re.match('trees[\da-f]{5}', new_name)
assert re.match(r'trees[\da-f]{5}', new_name)
@patch.dict('ckanext.harvest.harvesters.base.config',
{'ckanext.harvest.default_dataset_name_append': 'random-hex'})
@ -129,7 +123,7 @@ class TestEnsureNameIsUnique(object):
factories.Dataset(name='trees')
name = _ensure_name_is_unique('trees', append_type='random-hex')
# e.g. 'trees0b53f'
assert re.match('trees[\da-f]{5}', name)
assert re.match(r'trees[\da-f]{5}', name)
# taken from ckan/tests/lib/test_munge.py
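The `random-hex` append type checked by these assertions adds five hexadecimal characters to the munged title. A minimal illustration of what the regex accepts (not the extension's exact implementation):

```python
import re
import uuid

def gen_name(title):
    # Lowercase the title and append 5 random hex characters,
    # mimicking the 'random-hex' append type tested above.
    return title.lower() + uuid.uuid4().hex[:5]

name = gen_name('Trees')
assert re.match(r'trees[\da-f]{5}$', name)
```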


@ -5,12 +5,8 @@ import json
from mock import patch, MagicMock, Mock
from requests.exceptions import HTTPError, RequestException
try:
from ckan.tests.helpers import reset_db, call_action
from ckan.tests.factories import Organization, Group
except ImportError:
from ckan.new_tests.helpers import reset_db, call_action
from ckan.new_tests.factories import Organization, Group
from ckantoolkit.tests.helpers import reset_db, call_action
from ckantoolkit.tests.factories import Organization, Group
from ckan import model
from ckan.plugins import toolkit
@ -309,18 +305,10 @@ class TestCkanHarvester(object):
'encoding': 'utf8',
'harvest_url': '{harvest_source_url}/dataset/{dataset_id}'
}}
tmp_c = toolkit.c
try:
# c.user is used by the validation (annoying),
# however patch doesn't work because it's a weird
# StackedObjectProxy, so we swap it manually
toolkit.c = MagicMock(user='')
results_by_guid = run_harvest(
url='http://localhost:%s' % mock_ckan.PORT,
harvester=CKANHarvester(),
config=json.dumps(config))
finally:
toolkit.c = tmp_c
results_by_guid = run_harvest(
url='http://localhost:%s' % mock_ckan.PORT,
harvester=CKANHarvester(),
config=json.dumps(config))
assert_equal(results_by_guid['dataset1-id']['errors'], [])
extras = results_by_guid['dataset1-id']['dataset']['extras']
extras_dict = dict((e['key'], e['value']) for e in extras)


@ -2,30 +2,11 @@ import json
import factories
import unittest
from mock import patch
from nose.tools import assert_equal, assert_raises
from nose.tools import assert_equal, assert_raises, assert_in
from nose.plugins.skip import SkipTest
try:
from ckan.tests import factories as ckan_factories
from ckan.tests.helpers import (_get_test_app, reset_db,
FunctionalTestBase)
except ImportError:
from ckan.new_tests import factories as ckan_factories
from ckan.new_tests.helpers import (_get_test_app, reset_db,
FunctionalTestBase)
try:
from ckan.tests.helpers import assert_in
except ImportError:
try:
from ckan.new_tests.helpers import assert_in
except ImportError:
# for ckan 2.2
try:
from nose.tools import assert_in
except ImportError:
# Python 2.6 doesn't have it
def assert_in(a, b, msg=None):
assert a in b, msg or '%r was not in %r' % (a, b)
from ckantoolkit.tests import factories as ckan_factories
from ckantoolkit.tests.helpers import _get_test_app, reset_db, FunctionalTestBase
from ckan import plugins as p
from ckan.plugins import toolkit
@ -33,7 +14,7 @@ from ckan import model
from ckanext.harvest.interfaces import IHarvester
import ckanext.harvest.model as harvest_model
from ckanext.harvest.model import HarvestGatherError, HarvestJob
from ckanext.harvest.model import HarvestGatherError, HarvestObjectError, HarvestObject, HarvestJob
from ckanext.harvest.logic import HarvestJobExists
from ckanext.harvest.logic.action.update import send_error_mail
@ -732,6 +713,39 @@ class TestHarvestErrorMail(FunctionalTestBase):
assert_equal(1, status['last_job']['stats']['errored'])
assert mock_mailer_mail_recipient.called
@patch('ckan.lib.mailer.mail_recipient')
def test_error_mail_sent_with_object_error(self, mock_mailer_mail_recipient):
context, harvest_source, harvest_job = self._create_harvest_source_and_job_if_not_existing()
data_dict = {
'guid': 'guid',
'content': 'content',
'job_id': harvest_job['id'],
'extras': {'a key': 'a value'},
'source_id': harvest_source['id']
}
harvest_object = toolkit.get_action('harvest_object_create')(
context, data_dict)
harvest_object_model = HarvestObject.get(harvest_object['id'])
# create a HarvestObjectError
msg = 'HarvestObjectError occurred: %s' % harvest_job['id']
harvest_object_error = HarvestObjectError(message=msg, object=harvest_object_model)
harvest_object_error.save()
status = toolkit.get_action('harvest_source_show_status')(context, {'id': harvest_source['id']})
send_error_mail(
context,
harvest_source['id'],
status
)
assert_equal(1, status['last_job']['stats']['errored'])
assert mock_mailer_mail_recipient.called
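The new test follows the same shape as the existing error-mail tests: create an error record, call `send_error_mail`, and assert the patched mailer fired. The mocking pattern in isolation, with illustrative stand-ins rather than the real CKAN mailer:

```python
from unittest import mock  # 'from mock import patch' on Python 2

def send_error_mail(mailer, status):
    # Send a mail only if the last job recorded errors
    # (a simplified stand-in for the extension's function).
    if status['last_job']['stats'].get('errored', 0) > 0:
        mailer('admin@example.com', 'Harvest errors', str(status))

mailer = mock.Mock()
send_error_mail(mailer, {'last_job': {'stats': {'errored': 1}}})
assert mailer.called
```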
@patch('ckan.lib.mailer.mail_recipient')
def test_error_mail_sent_with_org(self, mock_mailer_mail_recipient):
context, harvest_source, job = self._create_harvest_source_with_owner_org_and_job_if_not_existing()
@ -755,13 +769,14 @@ class TestHarvestErrorMail(FunctionalTestBase):
assert_equal(2, mock_mailer_mail_recipient.call_count)
class TestHarvestDBLog(unittest.TestCase):
# Skip for now as the Harvest DB log doesn't work on CKAN 2.9
class XXTestHarvestDBLog(unittest.TestCase):
@classmethod
def setup_class(cls):
reset_db()
harvest_model.setup()
def test_harvest_db_logger(self):
def xxtest_harvest_db_logger(self):
# Create source and check if harvest_log table is populated
data_dict = SOURCE_DICT.copy()
data_dict['source_type'] = 'test'


@ -1,23 +1,8 @@
from ckan.lib.helpers import url_for
try:
from ckan.tests import helpers, factories
except ImportError:
from ckan.new_tests import helpers, factories
from ckantoolkit.tests import helpers, factories
from ckanext.harvest.tests import factories as harvest_factories
try:
from ckan.tests.helpers import assert_in
except ImportError:
# for ckan 2.2
try:
from nose.tools import assert_in
except ImportError:
# Python 2.6 doesn't have it
def assert_in(a, b, msg=None):
assert a in b, msg or '%r was not in %r' % (a, b)
from nose.tools import assert_in
import ckanext.harvest.model as harvest_model


@ -1,8 +1,4 @@
from mock import patch
try:
from ckan.tests.helpers import reset_db
except ImportError:
from ckan.new_tests.helpers import reset_db
from ckantoolkit.tests.helpers import reset_db
import ckanext.harvest.model as harvest_model
from ckanext.harvest.model import HarvestObject, HarvestObjectExtra
from ckanext.harvest.interfaces import IHarvester


@ -4,11 +4,8 @@
import json
from nose.tools import assert_equal
from ckantoolkit.tests.helpers import reset_db
try:
from ckan.tests.helpers import reset_db
except ImportError:
from ckan.new_tests.helpers import reset_db
from ckan import model
from ckan import plugins as p
from ckan.plugins import toolkit


@ -2,3 +2,4 @@ pika==0.9.8
redis==2.10.1
requests==2.20.0
pyOpenSSL==18.0.0
ckantoolkit==0.0.3


@ -22,3 +22,6 @@ previous = true
domain = ckanext-harvest
directory = i18n
statistics = true
[flake8]
max-line-length = 127


@ -1,6 +1,6 @@
from setuptools import setup, find_packages
version = '1.1.4'
version = '1.2.0'
setup(
name='ckanext-harvest',


@ -24,7 +24,7 @@ ckan.legacy_templates = false
# Logging configuration
[loggers]
keys = root, ckan, sqlalchemy, ckan_harvester
keys = root, ckan, sqlalchemy
[handlers]
keys = console, dblog