CLI is working in both forms

This commit is contained in:
Sergey Motornyuk 2020-01-16 10:43:25 +02:00
parent be494e1af6
commit bb005415bb
19 changed files with 816 additions and 695 deletions

View File

@ -1,34 +1,32 @@
# -*- coding: utf-8 -*-
from __future__ import print_function
import click
import six
import sys
from pprint import pprint
from ckan import model
from ckan.logic import get_action, ValidationError
import ckantoolkit as tk
import click
import ckanext.harvest.utils as utils
from ckanext.harvest.logic import HarvestJobExists
def get_commands():
return [harvester]
@click.group()
def harvester():
"""Harvests remotely mastered metadata.
"""
pass
@harvester.command()
def initdb():
"""Creates the necessary tables in the database.
"""
utils.initdb()
click.secho(u'DB tables created', fg=u'green')
click.secho(u"DB tables created", fg=u"green")
@harvester.group()
def source():
@ -36,15 +34,16 @@ def source():
"""
pass
@source.command()
@click.argument(u'name')
@click.argument(u'url')
@click.argument(u'type')
@click.argument(u'title', required=False)
@click.argument(u'active', type=tk.asbool, default=True)
@click.argument(u'owner_org', required=False)
@click.argument(u'frequency', default=u'MANUAL')
@click.argument(u'config', required=False)
@click.argument(u"name")
@click.argument(u"url")
@click.argument(u"type")
@click.argument(u"title", required=False)
@click.argument(u"active", type=tk.asbool, default=True)
@click.argument(u"owner_org", required=False)
@click.argument(u"frequency", default=u"MANUAL")
@click.argument(u"config", required=False)
def create(name, url, type, title, active, owner_org, frequency, config):
"""Create new harvest source.
"""
@ -53,60 +52,62 @@ def create(name, url, type, title, active, owner_org, frequency, config):
name, url, type, title, active, owner_org, frequency, config
)
except tk.ValidationError as e:
tk.error_shout(u'Validation error:')
tk.error_shout(u"Validation error:")
for field, err in e.error_summary.items():
tk.error_shout('\t{}: {}'.format(field, err))
tk.error_shout("\t{}: {}".format(field, err))
raise click.Abort()
click.echo(result)
@source.command()
@click.argument(u'id', metavar=u'SOURCE_ID_OR_NAME')
@click.argument(u"id", metavar=u"SOURCE_ID_OR_NAME")
@click.pass_context
def show(ctx, id):
"""Shows a harvest source.
"""
flask_app = ctx.meta['flask_app']
flask_app = ctx.meta["flask_app"]
try:
with flask_app.test_request_context():
result = utils.show_harvest_source(id)
except tk.ObjectNotFound as e:
tk.error_shout(u'Source <{}> not found.'.format(id))
tk.error_shout(u"Source <{}> not found.".format(id))
raise click.Abort()
click.echo(result)
@source.command()
@click.argument(u'id', metavar=u'SOURCE_ID_OR_NAME')
@click.argument(u"id", metavar=u"SOURCE_ID_OR_NAME")
@click.pass_context
def remove(ctx, id):
"""Remove (deactivate) a harvester source, whilst leaving any related
datasets, jobs and objects.
"""
flask_app = ctx.meta['flask_app']
flask_app = ctx.meta["flask_app"]
with flask_app.test_request_context():
utils.remove_harvest_source(id)
click.secho('Removed harvest source: {0}'.format(id), fg='green')
click.secho("Removed harvest source: {0}".format(id), fg="green")
@source.command()
@click.argument(u'id', metavar=u'SOURCE_ID_OR_NAME')
@click.argument(u"id", metavar=u"SOURCE_ID_OR_NAME")
@click.pass_context
def clear(ctx, id):
"""Clears all datasets, jobs and objects related to a harvest source,
but keeps the source itself.
"""
flask_app = ctx.meta['flask_app']
flask_app = ctx.meta["flask_app"]
with flask_app.test_request_context():
utils.clear_harvest_source(id)
click.secho('Cleared harvest source: {0}'.format(id), fg='green')
click.secho("Cleared harvest source: {0}".format(id), fg="green")
@source.command()
@click.argument(u'id', metavar=u'SOURCE_ID_OR_NAME', required=False)
@click.argument(u"id", metavar=u"SOURCE_ID_OR_NAME", required=False)
@click.pass_context
def clear_history(ctx, id):
"""If no source id is given the history for all harvest sources
@ -118,15 +119,15 @@ def clear_history(ctx, id):
the history of the harvest source with the given source id.
"""
flask_app = ctx.meta['flask_app']
flask_app = ctx.meta["flask_app"]
with flask_app.test_request_context():
result = utils.clear_harvest_source_history(id)
click.secho(result, fg='green')
click.secho(result, fg="green")
@harvester.command()
@click.argument('all', required=False)
@click.argument("all", required=False)
@click.pass_context
def sources(ctx, all):
"""Lists harvest sources.
@ -134,7 +135,7 @@ def sources(ctx, all):
If 'all' is defined, it also shows the Inactive sources
"""
flask_app = ctx.meta['flask_app']
flask_app = ctx.meta["flask_app"]
with flask_app.test_request_context():
result = utils.list_sources(bool(all))
@ -142,13 +143,13 @@ def sources(ctx, all):
@harvester.command()
@click.argument('id', metavar='SOURCE_ID_OR_NAME')
@click.argument("id", metavar="SOURCE_ID_OR_NAME")
@click.pass_context
def job(ctx, id):
"""Create new harvest job and runs it (puts it on the gather queue).
"""
flask_app = ctx.meta['flask_app']
flask_app = ctx.meta["flask_app"]
with flask_app.test_request_context():
try:
result = utils.create_job(id)
@ -157,20 +158,21 @@ def job(ctx, id):
ctx.abort()
click.echo(result)
@harvester.command()
@click.pass_context
def jobs(ctx):
"""Lists harvest jobs.
"""
flask_app = ctx.meta['flask_app']
flask_app = ctx.meta["flask_app"]
with flask_app.test_request_context():
result = utils.list_jobs()
click.echo(result)
@harvester.command()
@click.argument('id', metavar='SOURCE_OR_JOB_ID')
@click.argument("id", metavar="SOURCE_OR_JOB_ID")
@click.pass_context
def job_abort(ctx, id):
"""Marks a job as "Aborted" so that the source can be restarted afresh.
@ -180,16 +182,17 @@ def job_abort(ctx, id):
are currently in the gather/fetch queues.
"""
flask_app = ctx.meta['flask_app']
flask_app = ctx.meta["flask_app"]
with flask_app.test_request_context():
try:
result = utils.abort_job(id)
except tk.ObjectNotFound as e:
tk.error_shout(u'Job not found.')
tk.error_shout(u"Job not found.")
ctx.abort()
click.echo(result)
@harvester.command()
def purge_queues():
"""Removes all jobs from fetch and gather queue.
@ -204,6 +207,7 @@ def gather_consumer():
"""
utils.gather_consumer()
@harvester.command()
def fetch_consumer():
"""Starts the consumer for the fetching queue.
@ -222,13 +226,14 @@ def run(ctx):
Finished.
"""
flask_app = ctx.meta['flask_app']
flask_app = ctx.meta["flask_app"]
with flask_app.test_request_context():
utils.run_harvester()
@harvester.command()
@click.pass_context
@click.argument('id', metavar='SOURCE_ID_OR_NAME')
@click.argument("id", metavar="SOURCE_ID_OR_NAME")
def run_test(ctx, id):
"""Runs a harvest - for testing only.
@ -238,139 +243,126 @@ def run_test(ctx, id):
fire up gather/fetch_consumer processes, as is done in production.
"""
flask_app = ctx.meta['flask_app']
flask_app = ctx.meta["flask_app"]
with flask_app.test_request_context():
utils.run_test_harvester(id)
class Harvester(object):
'''Harvests remotely mastered metadata
Usage:
@harvester.command("import")
@click.pass_context
@click.argument("id", metavar="SOURCE_ID_OR_NAME", required=False)
@click.option(
"-j",
"--no-join-datasets",
is_flag=True,
help="Do not join harvest objects to existing datasets",
)
@click.option(
"-o",
"--harvest-object-id",
help="Id of the harvest object to which perform the import stage",
)
@click.option(
"-p",
"--package-id",
help="Id of the package whose harvest object to perform the import stage for",
)
@click.option(
"-g",
"--guid",
help="Guid of the harvest object to which perform the import stage for",
)
@click.option(
"--segments",
help="""A string containing hex digits that represent which of
the 16 harvest object segments to import. e.g. 15af will run segments 1,5,a,f""",
)
def import_stage(
ctx, id, no_join_datasets, harvest_object_id, guid, package_id, segments
):
"""Perform the import stage with the last fetched objects, for a
certain source or a single harvest object.
Please note that no objects will be fetched from the remote
server. It will only affect the objects already present in the
database.
To import a particular harvest source, specify its id as an argument.
To import a particular harvest object use the -o option.
To import a particular guid use the -g option.
To import a particular package use the -p option.
You will need to specify the -j flag in cases where the datasets
are not yet created (e.g. first harvest, or all previous harvests
have failed)
The --segments flag allows to define a string containing hex
digits that represent which of the 16 harvest object segments to
import. e.g. 15af will run segments 1,5,a,f
"""
ctx.invoke(initdb)
flask_app = ctx.meta["flask_app"]
with flask_app.test_request_context():
try:
utils.import_stage(
id,
no_join_datasets,
harvest_object_id,
guid,
package_id,
segments,
)
except tk.ObjectNotFound as e:
tk.error_shout(u"Source <{}> not found.".format(id))
@harvester.command()
@click.pass_context
def clean_harvest_log(ctx):
"""Clean-up mechanism for the harvest log table.
harvester clean_harvest_log
- Clean-up mechanism for the harvest log table.
You can configure the time frame through the configuration
parameter `ckan.harvest.log_timeframe`. The default time frame is 30 days
You can configure the time frame through the configuration
parameter `ckan.harvest.log_timeframe`. The default time frame is 30
days
harvester [-j] [-o|-g|-p {id/guid}] [--segments={segments}] import [{source-id}]
- perform the import stage with the last fetched objects, for a certain
source or a single harvest object. Please note that no objects will
be fetched from the remote server. It will only affect the objects
already present in the database.
To import a particular harvest source, specify its id as an argument.
To import a particular harvest object use the -o option.
To import a particular guid use the -g option.
To import a particular package use the -p option.
You will need to specify the -j flag in cases where the datasets are
not yet created (e.g. first harvest, or all previous harvests have
failed)
The --segments flag allows to define a string containing hex digits that represent which of
the 16 harvest object segments to import. e.g. 15af will run segments 1,5,a,f
harvester job-all
- create new harvest jobs for all active sources.
harvester reindex
- reindexes the harvest source datasets
The commands should be run from the ckanext-harvest directory and expect
a development.ini file to be present. Most of the time you will
specify the config explicitly though::
paster harvester sources --config=../ckan/development.ini
'''
def __init__(self, name):
super(Harvester, self).__init__(name)
self.parser.add_option('-j', '--no-join-datasets', dest='no_join_datasets',
action='store_true', default=False, help='Do not join harvest objects to existing datasets')
self.parser.add_option('-o', '--harvest-object-id', dest='harvest_object_id',
default=False, help='Id of the harvest object to which perform the import stage')
self.parser.add_option('-p', '--package-id', dest='package_id',
default=False, help='Id of the package whose harvest object to perform the import stage for')
self.parser.add_option('-g', '--guid', dest='guid',
default=False, help='Guid of the harvest object to which perform the import stage for')
self.parser.add_option('--segments', dest='segments',
default=False, help='''A string containing hex digits that represent which of
the 16 harvest object segments to import. e.g. 15af will run segments 1,5,a,f''')
def command(self):
self._load_config()
"""
flask_app = ctx.meta["flask_app"]
with flask_app.test_request_context():
utils.clean_harvest_log()
if cmd == 'import':
self.initdb()
self.import_stage()
elif cmd == 'job-all':
self.create_harvest_job_all()
elif cmd == 'harvesters-info':
harvesters_info = tk.get_action('harvesters_info_show')()
pprint(harvesters_info)
elif cmd == 'reindex':
self.reindex()
elif cmd == 'clean_harvest_log':
self.clean_harvest_log()
else:
print('Command {0} not recognized'.format(cmd))
@harvester.command("job-all")
@click.pass_context
def job_all(ctx):
"""Create new harvest jobs for all active sources.
"""
flask_app = ctx.meta["flask_app"]
with flask_app.test_request_context():
result = utils.job_all()
click.echo(result)
@harvester.command()
@click.pass_context
def reindex(ctx):
"""Reindexes the harvest source datasets.
def import_stage(self):
if len(self.args) >= 2:
source_id_or_name = six.text_type(self.args[1])
context = {'model': model, 'session': model.Session,
'user': _admin_user()['name']}
source = tk.get_action('harvest_source_show')(
context, {'id': source_id_or_name})
source_id = source['id']
else:
source_id = None
context = {'model': model, 'session': model.Session,
'user': _admin_user()['name'],
'join_datasets': not self.options.no_join_datasets,
'segments': self.options.segments}
objs_count = tk.get_action('harvest_objects_import')(context, {
'source_id': source_id,
'harvest_object_id': self.options.harvest_object_id,
'package_id': self.options.package_id,
'guid': self.options.guid,
})
print('{0} objects reimported'.format(objs_count))
def create_harvest_job_all(self):
context = {'model': model, 'user': _admin_user()['name'], 'session': model.Session}
jobs = tk.get_action('harvest_job_create_all')(context, {})
print('Created {0} new harvest jobs'.format(len(jobs)))
def reindex(self):
context = {'model': model, 'user': _admin_user()['name']}
tk.get_action('harvest_sources_reindex')(context, {})
"""
flask_app = ctx.meta["flask_app"]
with flask_app.test_request_context():
utils.reindex()
def clean_harvest_log(self):
from datetime import datetime, timedelta
from ckantoolkit import config
from ckanext.harvest.model import clean_harvest_log
@harvester.command("harvesters_info")
@click.pass_context
def harvesters_info(ctx):
"""
# Log time frame - in days
log_timeframe = tk.asint(config.get('ckan.harvest.log_timeframe', 30))
condition = datetime.utcnow() - timedelta(days=log_timeframe)
"""
flask_app = ctx.meta["flask_app"]
with flask_app.test_request_context():
result = utils.harvesters_info()
# Delete logs older then the given date
clean_harvest_log(condition=condition)
click.echo(result)

View File

@ -1,6 +1,8 @@
try:
import pkg_resources
pkg_resources.declare_namespace(__name__)
except ImportError:
import pkgutil
__path__ = pkgutil.extend_path(__path__, __name__)

View File

@ -7,11 +7,17 @@ from ckan import model
from ckan.logic import get_action, ValidationError
from ckan.plugins import toolkit
import ckantoolkit as tk
from ckan.lib.cli import CkanCommand
from ckantoolkit import config
import ckanext.harvest.utils as utils
from ckanext.harvest.logic import HarvestJobExists
class Harvester(CkanCommand):
'''Harvests remotely mastered metadata
"""Harvests remotely mastered metadata
Usage:
@ -110,9 +116,9 @@ class Harvester(CkanCommand):
paster harvester sources --config=../ckan/development.ini
'''
"""
summary = __doc__.split('\n')[0]
summary = __doc__.split("\n")[0]
usage = __doc__
max_args = 9
min_args = 0
@ -121,119 +127,133 @@ class Harvester(CkanCommand):
super(Harvester, self).__init__(name)
self.parser.add_option('-j', '--no-join-datasets', dest='no_join_datasets',
action='store_true', default=False, help='Do not join harvest objects to existing datasets')
self.parser.add_option(
"-j",
"--no-join-datasets",
dest="no_join_datasets",
action="store_true",
default=False,
help="Do not join harvest objects to existing datasets",
)
self.parser.add_option('-o', '--harvest-object-id', dest='harvest_object_id',
default=False, help='Id of the harvest object to which perform the import stage')
self.parser.add_option(
"-o",
"--harvest-object-id",
dest="harvest_object_id",
default=False,
help="Id of the harvest object to which perform the import stage",
)
self.parser.add_option('-p', '--package-id', dest='package_id',
default=False, help='Id of the package whose harvest object to perform the import stage for')
self.parser.add_option(
"-p",
"--package-id",
dest="package_id",
default=False,
help="Id of the package whose harvest object to perform the import stage for",
)
self.parser.add_option('-g', '--guid', dest='guid',
default=False, help='Guid of the harvest object to which perform the import stage for')
self.parser.add_option(
"-g",
"--guid",
dest="guid",
default=False,
help="Guid of the harvest object to which perform the import stage for",
)
self.parser.add_option('--segments', dest='segments',
default=False, help='''A string containing hex digits that represent which of
the 16 harvest object segments to import. e.g. 15af will run segments 1,5,a,f''')
self.parser.add_option(
"--segments",
dest="segments",
default=False,
help="""A string containing hex digits that represent which of
the 16 harvest object segments to import. e.g. 15af will run segments 1,5,a,f""",
)
def command(self):
self._load_config()
# We'll need a sysadmin user to perform most of the actions
# We will use the sysadmin site user (named as the site_id)
context = {'model': model, 'session': model.Session, 'ignore_auth': True}
self.admin_user = get_action('get_site_user')(context, {})
context = {
"model": model,
"session": model.Session,
"ignore_auth": True,
}
self.admin_user = get_action("get_site_user")(context, {})
print('')
print("")
if len(self.args) == 0:
self.parser.print_usage()
sys.exit(1)
cmd = self.args[0]
if cmd == 'source':
if cmd == "source":
if len(self.args) > 2:
self.create_harvest_source()
else:
self.show_harvest_source()
elif cmd == 'rmsource':
elif cmd == "rmsource":
self.remove_harvest_source()
elif cmd == 'clearsource':
elif cmd == "clearsource":
self.clear_harvest_source()
elif cmd == 'clearsource_history':
elif cmd == "clearsource_history":
self.clear_harvest_source_history()
elif cmd == 'sources':
elif cmd == "sources":
self.list_harvest_sources()
elif cmd == 'job':
elif cmd == "job":
self.create_harvest_job()
elif cmd == 'jobs':
elif cmd == "jobs":
self.list_harvest_jobs()
elif cmd == 'job_abort':
elif cmd == "job_abort":
self.job_abort()
elif cmd == 'run':
elif cmd == "run":
self.run_harvester()
elif cmd == 'run_test':
elif cmd == "run_test":
self.run_test_harvest()
elif cmd == 'gather_consumer':
import logging
from ckanext.harvest.queue import (get_gather_consumer,
gather_callback, get_gather_queue_name)
logging.getLogger('amqplib').setLevel(logging.INFO)
consumer = get_gather_consumer()
for method, header, body in consumer.consume(queue=get_gather_queue_name()):
gather_callback(consumer, method, header, body)
elif cmd == 'fetch_consumer':
import logging
logging.getLogger('amqplib').setLevel(logging.INFO)
from ckanext.harvest.queue import (get_fetch_consumer, fetch_callback,
get_fetch_queue_name)
consumer = get_fetch_consumer()
for method, header, body in consumer.consume(queue=get_fetch_queue_name()):
fetch_callback(consumer, method, header, body)
elif cmd == 'purge_queues':
elif cmd == "gather_consumer":
utils.gather_consumer()
elif cmd == "fetch_consumer":
utils.fetch_consumer()
elif cmd == "purge_queues":
self.purge_queues()
elif cmd == 'initdb':
elif cmd == "initdb":
self.initdb()
elif cmd == 'import':
elif cmd == "import":
self.initdb()
self.import_stage()
elif cmd == 'job-all':
elif cmd == "job-all":
self.create_harvest_job_all()
elif cmd == 'harvesters-info':
harvesters_info = get_action('harvesters_info_show')()
pprint(harvesters_info)
elif cmd == 'reindex':
elif cmd == "harvesters-info":
print(utils.harvesters_info())
elif cmd == "reindex":
self.reindex()
elif cmd == 'clean_harvest_log':
elif cmd == "clean_harvest_log":
self.clean_harvest_log()
else:
print('Command {0} not recognized'.format(cmd))
print("Command {0} not recognized".format(cmd))
def _load_config(self):
super(Harvester, self)._load_config()
def initdb(self):
from ckanext.harvest.model import setup as db_setup
db_setup()
print('DB tables created')
utils.initdb()
print("DB tables created")
def create_harvest_source(self):
if len(self.args) >= 2:
name = unicode(self.args[1])
else:
print('Please provide a source name')
print("Please provide a source name")
sys.exit(1)
if len(self.args) >= 3:
url = unicode(self.args[2])
else:
print('Please provide a source URL')
print("Please provide a source URL")
sys.exit(1)
if len(self.args) >= 4:
type = unicode(self.args[3])
else:
print('Please provide a source type')
print("Please provide a source type")
sys.exit(1)
if len(self.args) >= 5:
@ -241,8 +261,9 @@ class Harvester(CkanCommand):
else:
title = None
if len(self.args) >= 6:
active = not(self.args[5].lower() == 'false' or
self.args[5] == '0')
active = not (
self.args[5].lower() == "false" or self.args[5] == "0"
)
else:
active = True
if len(self.args) >= 7:
@ -252,313 +273,127 @@ class Harvester(CkanCommand):
if len(self.args) >= 8:
frequency = unicode(self.args[7])
if not frequency:
frequency = 'MANUAL'
frequency = "MANUAL"
else:
frequency = 'MANUAL'
frequency = "MANUAL"
if len(self.args) >= 9:
config = unicode(self.args[8])
else:
config = None
try:
data_dict = {
'name': name,
'url': url,
'source_type': type,
'title': title,
'active': active,
'owner_org': owner_org,
'frequency': frequency,
'config': config,
}
context = {
'model': model,
'session': model.Session,
'user': self.admin_user['name'],
'ignore_auth': True,
}
source = get_action('harvest_source_create')(context, data_dict)
print('Created new harvest source:')
self.print_harvest_source(source)
sources = get_action('harvest_source_list')(context, {})
self.print_there_are('harvest source', sources)
# Create a harvest job for the new source if not regular job.
if not data_dict['frequency']:
get_action('harvest_job_create')(
context, {'source_id': source['id'], 'run': True})
print('A new Harvest Job for this source has also been created')
result = utils.create_harvest_source(
name, url, type, title, active, owner_org, frequency, config
)
except ValidationError as e:
print('An error occurred:')
print("An error occurred:")
print(str(e.error_dict))
raise e
print(result)
def clear_harvest_source_history(self):
source_id = None
if len(self.args) >= 2:
source_id = unicode(self.args[1])
context = {
'model': model,
'user': self.admin_user['name'],
'session': model.Session
}
if source_id is not None:
get_action('harvest_source_job_history_clear')(context, {'id': source_id})
print('Cleared job history of harvest source: {0}'.format(source_id))
else:
'''
Purge queues, because we clean all harvest jobs and
objects in the database.
'''
self.purge_queues()
cleared_sources_dicts = get_action('harvest_sources_job_history_clear')(context, {})
print('Cleared job history for all harvest sources: {0} source(s)'.format(len(cleared_sources_dicts)))
print(utils.clear_harvest_source_history(source_id))
def show_harvest_source(self):
if len(self.args) >= 2:
source_id_or_name = unicode(self.args[1])
else:
print('Please provide a source name')
print("Please provide a source name")
sys.exit(1)
context = {'model': model, 'session': model.Session,
'user': self.admin_user['name']}
source = get_action('harvest_source_show')(
context, {'id': source_id_or_name})
self.print_harvest_source(source)
print(utils.show_harvest_source(source_id_or_name))
def remove_harvest_source(self):
if len(self.args) >= 2:
source_id_or_name = unicode(self.args[1])
else:
print('Please provide a source id')
print("Please provide a source id")
sys.exit(1)
context = {'model': model, 'session': model.Session,
'user': self.admin_user['name']}
source = get_action('harvest_source_show')(
context, {'id': source_id_or_name})
get_action('harvest_source_delete')(context, {'id': source['id']})
print('Removed harvest source: {0}'.format(source_id_or_name))
utils.remove_harvest_source(source_id_or_name)
def clear_harvest_source(self):
if len(self.args) >= 2:
source_id_or_name = unicode(self.args[1])
else:
print('Please provide a source id')
print("Please provide a source id")
sys.exit(1)
context = {'model': model, 'session': model.Session,
'user': self.admin_user['name']}
source = get_action('harvest_source_show')(
context, {'id': source_id_or_name})
get_action('harvest_source_clear')(context, {'id': source['id']})
print('Cleared harvest source: {0}'.format(source_id_or_name))
utils.clear_harvest_source(source_id_or_name)
def list_harvest_sources(self):
if len(self.args) >= 2 and self.args[1] == 'all':
data_dict = {}
what = 'harvest source'
if len(self.args) >= 2 and self.args[1] == "all":
all = True
else:
data_dict = {'only_active': True}
what = 'active harvest source'
all = False
context = {'model': model, 'session': model.Session, 'user': self.admin_user['name']}
sources = get_action('harvest_source_list')(context, data_dict)
self.print_harvest_sources(sources)
self.print_there_are(what=what, sequence=sources)
print(utils.list_sources(all))
def create_harvest_job(self):
if len(self.args) >= 2:
source_id_or_name = unicode(self.args[1])
else:
print('Please provide a source id')
print("Please provide a source id")
sys.exit(1)
context = {'model': model, 'session': model.Session,
'user': self.admin_user['name']}
source = get_action('harvest_source_show')(
context, {'id': source_id_or_name})
context = {'model': model, 'session': model.Session, 'user': self.admin_user['name']}
job = get_action('harvest_job_create')(
context, {'source_id': source['id'], 'run': True})
self.print_harvest_job(job)
jobs = get_action('harvest_job_list')(context, {'status': u'New'})
self.print_there_are('harvest job', jobs, condition=u'New')
print(utils.create_job(source_id_or_name))
def list_harvest_jobs(self):
context = {'model': model, 'user': self.admin_user['name'], 'session': model.Session}
jobs = get_action('harvest_job_list')(context, {})
self.print_harvest_jobs(jobs)
self.print_there_are(what='harvest job', sequence=jobs)
print(utils.list_jobs())
def job_abort(self):
if len(self.args) >= 2:
job_or_source_id_or_name = unicode(self.args[1])
else:
print('Please provide a job id or source name/id')
print("Please provide a job id or source name/id")
sys.exit(1)
context = {'model': model, 'user': self.admin_user['name'],
'session': model.Session}
job = get_action('harvest_job_abort')(
context, {'id': job_or_source_id_or_name})
print('Job status: {0}'.format(job['status']))
print(utils.abort_job(source_id_or_name))
def run_harvester(self):
context = {'model': model, 'user': self.admin_user['name'],
'session': model.Session}
get_action('harvest_jobs_run')(context, {})
utils.run_harvester()
def run_test_harvest(self):
from ckanext.harvest import queue
from ckanext.harvest.tests import lib
from ckanext.harvest.logic import HarvestJobExists
from ckanext.harvest.model import HarvestJob
# Determine the source
if len(self.args) >= 2:
source_id_or_name = unicode(self.args[1])
else:
print('Please provide a source id')
print("Please provide a source id")
sys.exit(1)
context = {'model': model, 'session': model.Session,
'user': self.admin_user['name']}
source = get_action('harvest_source_show')(
context, {'id': source_id_or_name})
# Determine the job
try:
job_dict = get_action('harvest_job_create')(
context, {'source_id': source['id']})
except HarvestJobExists:
running_jobs = get_action('harvest_job_list')(
context, {'source_id': source['id'], 'status': 'Running'})
if running_jobs:
print('\nSource "{0}" apparently has a "Running" job:\n{1}'
.format(source.get('name') or source['id'], running_jobs))
resp = raw_input('Abort it? (y/n)')
if not resp.lower().startswith('y'):
sys.exit(1)
job_dict = get_action('harvest_job_abort')(
context, {'source_id': source['id']})
else:
print('Reusing existing harvest job')
jobs = get_action('harvest_job_list')(
context, {'source_id': source['id'], 'status': 'New'})
assert len(jobs) == 1, \
'Multiple "New" jobs for this source! {0}'.format(jobs)
job_dict = jobs[0]
job_obj = HarvestJob.get(job_dict['id'])
harvester = queue.get_harvester(source['source_type'])
assert harvester, \
'No harvester found for type: {0}'.format(source['source_type'])
lib.run_harvest_job(job_obj, harvester)
utils.run_test_harvester(source_id_or_name)
def import_stage(self):
if len(self.args) >= 2:
source_id_or_name = unicode(self.args[1])
context = {'model': model, 'session': model.Session,
'user': self.admin_user['name']}
source = get_action('harvest_source_show')(
context, {'id': source_id_or_name})
source_id = source['id']
context = {
"model": model,
"session": model.Session,
"user": self.admin_user["name"],
}
source = get_action("harvest_source_show")(
context, {"id": source_id_or_name}
)
source_id = source["id"]
else:
source_id = None
context = {'model': model, 'session': model.Session,
'user': self.admin_user['name'],
'join_datasets': not self.options.no_join_datasets,
'segments': self.options.segments}
objs_count = get_action('harvest_objects_import')(context, {
'source_id': source_id,
'harvest_object_id': self.options.harvest_object_id,
'package_id': self.options.package_id,
'guid': self.options.guid,
})
print('{0} objects reimported'.format(objs_count))
utils.import_stage(
source_id,
self.options.no_join_datasets,
self.options.harvest_object_id,
self.options.guid,
self.options.package_id,
self.options.segments,
)
def create_harvest_job_all(self):
context = {'model': model, 'user': self.admin_user['name'], 'session': model.Session}
jobs = get_action('harvest_job_create_all')(context, {})
print('Created {0} new harvest jobs'.format(len(jobs)))
print(utils.job_all())
def reindex(self):
context = {'model': model, 'user': self.admin_user['name']}
get_action('harvest_sources_reindex')(context, {})
utils.reindex()
def purge_queues(self):
from ckanext.harvest.queue import purge_queues
purge_queues()
def print_harvest_sources(self, sources):
if sources:
print('')
for source in sources:
self.print_harvest_source(source)
def print_harvest_source(self, source):
print('Source id: {0}'.format(source.get('id')))
if 'name' in source:
# 'name' is only there if the source comes from the Package
print(' name: {0}'.format(source.get('name')))
print(' url: {0}'.format(source.get('url')))
# 'type' if source comes from HarvestSource, 'source_type' if it comes
# from the Package
print(' type: {0}'.format(source.get('source_type') or
source.get('type')))
print(' active: {0}'.format(source.get('active',
source.get('state') == 'active')))
print('frequency: {0}'.format(source.get('frequency')))
print(' jobs: {0}'.format(source.get('status').get('job_count')))
print('')
def print_harvest_jobs(self, jobs):
if jobs:
print('')
for job in jobs:
self.print_harvest_job(job)
def print_harvest_job(self, job):
print(' Job id: {0}'.format(job.get('id')))
print(' status: {0}'.format(job.get('status')))
print(' source: {0}'.format(job.get('source_id')))
print(' objects: {0}'.format(len(job.get('objects', []))))
print('gather_errors: {0}'.format(len(job.get('gather_errors', []))))
for error in job.get('gather_errors', []):
print(' {0}'.format(error['message']))
print('')
def print_there_are(self, what, sequence, condition=''):
is_singular = self.is_singular(sequence)
print('There {0} {1} {2}{3}{4}'.format(
is_singular and 'is' or 'are',
len(sequence),
condition and ('{0} '.format(condition.lower())) or '',
what,
not is_singular and 's' or '',
))
def is_singular(self, sequence):
return len(sequence) == 1
utils.purge_queues()
def clean_harvest_log(self):
from datetime import datetime, timedelta
from ckanext.harvest.model import clean_harvest_log
# Log time frame - in days
log_timeframe = toolkit.asint(config.get('ckan.harvest.log_timeframe', 30))
condition = datetime.utcnow() - timedelta(days=log_timeframe)
# Delete logs older then the given date
clean_harvest_log(condition=condition)
utils.clean_harvest_log()

View File

@ -21,6 +21,7 @@ import json
from ckan.lib.base import BaseController, c, request, response, render, abort
from ckanext.harvest.logic import HarvestJobExists, HarvestSourceInactiveError
import ckanext.harvest.utils as utils
from ckanext.harvest.utils import (
DATASET_TYPE_NAME
)
@ -220,19 +221,7 @@ class ViewController(BaseController):
abort(401, self.not_auth_message)
def admin(self, id):
try:
context = {'model': model, 'user': c.user}
p.toolkit.check_access('harvest_source_update', context, {'id': id})
harvest_source = p.toolkit.get_action('harvest_source_show')(
context, {'id': id}
)
return render(
'source/admin.html', extra_vars={'harvest_source': harvest_source}
)
except p.toolkit.ObjectNotFound:
abort(404, _('Harvest source not found'))
except p.toolkit.NotAuthorized:
abort(401, self.not_auth_message)
return utils.admin_view(id)
def abort_job(self, source, id):
try:

View File

@ -0,0 +1,4 @@
harvest_css:
output: ckanext-harvest/%(version)s_harvest_css.css
contents:
- styles/harvest.css

View File

@ -1,5 +1,8 @@
# -*- coding: utf-8 -*-
import hashlib
import json
import six
import logging
import datetime
@ -434,7 +437,7 @@ def harvest_objects_import(context, data_dict):
for obj_id in last_objects_ids:
if segments and \
str(hashlib.md5(obj_id[0]).hexdigest())[0] not in segments:
str(hashlib.md5(six.ensure_binary(obj_id[0])).hexdigest())[0] not in segments:
continue
obj = session.query(HarvestObject).get(obj_id)

View File

@ -5,12 +5,13 @@ import json
from logging import getLogger
from six import string_types, text_type
from sqlalchemy.util import OrderedDict
from collections import OrderedDict
from ckan import logic
from ckan import model
import ckan.plugins as p
from ckan.lib.plugins import DefaultDatasetForm
try:
from ckan.lib.plugins import DefaultTranslation
except ImportError:
@ -252,6 +253,15 @@ class Harvest(MixinPlugin, p.SingletonPlugin, DefaultDatasetForm, DefaultTransla
'harvest_read': 'harvest.read',
'harvest_edit': 'harvest.edit',
})
bp_routes = [
"delete", "refresh", "admin", "about",
"clear", "job_list", "job_show_last", "job_show",
"job_abort", "object_show"
]
mappings.update({
'harvest_' + route: 'harvester.' + route
for route in bp_routes
})
# https://github.com/ckan/ckan/pull/4521
config['ckan.legacy_route_mappings'] = json.dumps(mappings)

View File

@ -2,11 +2,18 @@
import ckan.plugins as p
import ckanext.harvest.cli as cli
import ckanext.harvest.views as views
class MixinPlugin(p.SingletonPlugin):
p.implements(p.IClick)
p.implements(p.IBlueprint)
# IClick
def get_commands(self):
return cli.get_commands()
# IBlueprint
def get_blueprint(self):
return views.get_blueprints()

View File

@ -1,10 +1,7 @@
# -*- coding: utf-8 -*-
import ckan.plugins as p
from ckanext.harvest.utils import (
DATASET_TYPE_NAME
)
from ckanext.harvest.utils import DATASET_TYPE_NAME
class MixinPlugin(p.SingletonPlugin):
p.implements(p.IRoutes, inherit=True)
@ -15,43 +12,76 @@ class MixinPlugin(p.SingletonPlugin):
# Most of the routes are defined via the IDatasetForm interface
# (ie they are the ones for a package type)
controller = 'ckanext.harvest.controllers.view:ViewController'
controller = "ckanext.harvest.controllers.view:ViewController"
map.connect('{0}_delete'.format(DATASET_TYPE_NAME), '/' + DATASET_TYPE_NAME + '/delete/:id',
controller=controller,
action='delete')
map.connect('{0}_refresh'.format(DATASET_TYPE_NAME), '/' + DATASET_TYPE_NAME + '/refresh/:id',
controller=controller,
action='refresh')
map.connect('{0}_admin'.format(DATASET_TYPE_NAME), '/' + DATASET_TYPE_NAME + '/admin/:id',
controller=controller,
action='admin')
map.connect('{0}_about'.format(DATASET_TYPE_NAME), '/' + DATASET_TYPE_NAME + '/about/:id',
controller=controller,
action='about')
map.connect('{0}_clear'.format(DATASET_TYPE_NAME), '/' + DATASET_TYPE_NAME + '/clear/:id',
controller=controller,
action='clear')
map.connect(
"{0}_delete".format(DATASET_TYPE_NAME),
"/" + DATASET_TYPE_NAME + "/delete/:id",
controller=controller,
action="delete",
)
map.connect(
"{0}_refresh".format(DATASET_TYPE_NAME),
"/" + DATASET_TYPE_NAME + "/refresh/:id",
controller=controller,
action="refresh",
)
map.connect(
"{0}_admin".format(DATASET_TYPE_NAME),
"/" + DATASET_TYPE_NAME + "/admin/:id",
controller=controller,
action="admin",
)
map.connect(
"{0}_about".format(DATASET_TYPE_NAME),
"/" + DATASET_TYPE_NAME + "/about/:id",
controller=controller,
action="about",
)
map.connect(
"{0}_clear".format(DATASET_TYPE_NAME),
"/" + DATASET_TYPE_NAME + "/clear/:id",
controller=controller,
action="clear",
)
map.connect('harvest_job_list', '/' + DATASET_TYPE_NAME + '/{source}/job',
controller=controller,
action='list_jobs')
map.connect('harvest_job_show_last', '/' + DATASET_TYPE_NAME + '/{source}/job/last',
controller=controller,
action='show_last_job')
map.connect('harvest_job_show', '/' + DATASET_TYPE_NAME + '/{source}/job/{id}',
controller=controller,
action='show_job')
map.connect('harvest_job_abort', '/' + DATASET_TYPE_NAME + '/{source}/job/{id}/abort',
controller=controller,
action='abort_job')
map.connect(
"harvest_job_list",
"/" + DATASET_TYPE_NAME + "/{source}/job",
controller=controller,
action="list_jobs",
)
map.connect(
"harvest_job_show_last",
"/" + DATASET_TYPE_NAME + "/{source}/job/last",
controller=controller,
action="show_last_job",
)
map.connect(
"harvest_job_show",
"/" + DATASET_TYPE_NAME + "/{source}/job/{id}",
controller=controller,
action="show_job",
)
map.connect(
"harvest_job_abort",
"/" + DATASET_TYPE_NAME + "/{source}/job/{id}/abort",
controller=controller,
action="abort_job",
)
map.connect('harvest_object_show', '/' + DATASET_TYPE_NAME + '/object/:id',
controller=controller,
action='show_object')
map.connect('harvest_object_for_dataset_show', '/dataset/harvest_object/:id',
controller=controller,
action='show_object',
ref_type='dataset')
map.connect(
"harvest_object_show",
"/" + DATASET_TYPE_NAME + "/object/:id",
controller=controller,
action="show_object",
)
map.connect(
"harvest_object_for_dataset_show",
"/dataset/harvest_object/:id",
controller=controller,
action="show_object",
ref_type="dataset",
)
return map

View File

@ -0,0 +1,8 @@
main:
filters: rjsmin
output: ckanext-harvest/%(version)s_harvest_extra_fieldmain.js
extra:
preload:
- base/main
contents:
- extra_fields.js

View File

@ -1,6 +1,9 @@
{% ckan_extends %}
{% block styles %}
{{ super() }}
{% resource 'ckanext-harvest/styles/harvest.css' %}
{{ super() }}
{% set type = 'asset' if h.ckan_version() > '2.9' else 'resource' %}
{% include 'harvest/snippets/harvest_' ~ type ~ '.html' %}
{% endblock %}

View File

@ -0,0 +1 @@
{% asset 'ckanext-harvest/harvest_css' %}

View File

@ -0,0 +1 @@
{% asset 'harvest-extra-field/main' %}

View File

@ -0,0 +1 @@
{% resource 'harvest-extra-field/main' %}

View File

@ -6,7 +6,7 @@
{% if harvest_source.status and harvest_source.status.last_job %}
{% snippet "snippets/job_details.html", job=harvest_source.status.last_job %}
<div class="form-actions">
<a href="{{ h.url_for(controller='ckanext.harvest.controllers.view:ViewController', action='show_last_job', source=harvest_source.name) }}" class="btn pull-right btn-default">
<a href="{{ h.url_for('harvest_job_show_last', source=harvest_source.name) }}" class="btn pull-right btn-default">
<i class="fa fa-briefcase icon-briefcase"></i>
{{ _('View full job report') }}
</a>

View File

@ -1,6 +1,5 @@
{% extends "source/admin_base.html" %}
{% set controller = 'ckanext.harvest.controllers.view:ViewController' %}
{% block subtitle %}{{ _('Harvest Jobs')}} - {{ super() }}{% endblock %}
@ -17,7 +16,7 @@
<li class="dataset-item">
<div class="dataset-content">
<h3 class="dataset-heading">
<a href="{{ h.url_for(controller=controller, action='show_job', source=harvest_source.name, id=job.id) }}">
<a href="{{ h.url_for('harvest_job_show', source=harvest_source.name, id=job.id) }}">
{{ _('Job: ') }} {{ job.id }}
</a>
{% if job.status != 'Finished' %}
@ -66,4 +65,3 @@
</div>
{% endblock %}

View File

@ -1,5 +1,8 @@
{% import 'macros/form.html' as form %}
{% resource 'harvest-extra-field/main' %}
{% set type = 'asset' if h.ckan_version() > '2.9' else 'resource' %}
{% include 'harvest/snippets/harvest_extra_field_' ~ type ~ '.html' %}
<form id="source-new" class="form-horizontal dataset-form {{ h.bootstrap_version() }}" method="post" >

View File

@ -1,18 +1,24 @@
# -*- coding: utf-8 -*-
from __future__ import print_function
import sys
import ckan.plugins.toolkit as tk
import ckan.model as model
from six import StringIO
import ckan.plugins.toolkit as tk
import six
from six import StringIO
DATASET_TYPE_NAME = 'harvest'
DATASET_TYPE_NAME = "harvest"
###############################################################################
# CLI #
###############################################################################
def _admin_user():
context = {'model': model, 'session': model.Session, 'ignore_auth': True}
return tk.get_action('get_site_user')(context, {})
context = {"model": model, "session": model.Session, "ignore_auth": True}
return tk.get_action("get_site_user")(context, {})
def _print_harvest_sources(sources, output):
@ -26,117 +32,133 @@ def _print_harvest_jobs(jobs, output):
def _print_harvest_job(job, output):
print(('\tJob id: {}\n'
'\t\tstatus: {}\n'
'\t\tsource: {}\n'
'\t\tobjects: {}').format(job.get('id'), job.get('status'),
job.get('source_id'),
len(job.get('objects', []))),
file=output)
print(
(
"\tJob id: {}\n"
"\t\tstatus: {}\n"
"\t\tsource: {}\n"
"\t\tobjects: {}"
).format(
job.get("id"),
job.get("status"),
job.get("source_id"),
len(job.get("objects", [])),
),
file=output,
)
print('\t\tgather_errors: {0}'.format(len(job.get('gather_errors', []))),
file=output)
for error in job.get('gather_errors', []):
print('\t\t\t{0}'.format(error['message']), file=output)
print(
"\t\tgather_errors: {0}".format(len(job.get("gather_errors", []))),
file=output,
)
for error in job.get("gather_errors", []):
print("\t\t\t{0}".format(error["message"]), file=output)
def _print_harvest_source(source, output):
print('Source id: {0}'.format(source.get('id')), file=output)
if 'name' in source:
print("Source id: {0}".format(source.get("id")), file=output)
if "name" in source:
# 'name' is only there if the source comes from the Package
print('\tname: {}'.format(source['name']), file=output)
print("\tname: {}".format(source["name"]), file=output)
data_dict = {
'url': source.get('url'),
"url": source.get("url"),
# 'type' if source comes from HarvestSource, 'source_type' if
# it comes from the Package
'type': source.get('source_type') or source.get('type'),
'active': source.get('active',
source.get('state') == 'active'),
'frequency': source.get('frequency'),
'jobs': source.get('status').get('job_count')
"type": source.get("source_type") or source.get("type"),
"active": source.get("active", source.get("state") == "active"),
"frequency": source.get("frequency"),
"jobs": source.get("status").get("job_count"),
}
print(('\turl: {url}\n'
'\ttype: {type}\n'
'\tactive: {active}\n'
'\tfrequency: {frequency}\n'
'\tjobs: {jobs}\n'
'\n').format(**data_dict),
file=output)
print(
(
"\turl: {url}\n"
"\ttype: {type}\n"
"\tactive: {active}\n"
"\tfrequency: {frequency}\n"
"\tjobs: {jobs}\n"
"\n"
).format(**data_dict),
file=output,
)
def _there_are(what, sequence, condition=''):
def _there_are(what, sequence, condition=""):
is_singular = len(sequence) == 1
return ('There {0} {1} {2}{3}{4}'.format(
is_singular and 'is' or 'are',
return "There {0} {1} {2}{3}{4}".format(
is_singular and "is" or "are",
len(sequence),
condition and ('{0} '.format(condition.lower())) or '',
condition and ("{0} ".format(condition.lower())) or "",
what,
not is_singular and 's' or '',
))
not is_singular and "s" or "",
)
def initdb():
from ckanext.harvest.model import setup as db_setup
db_setup()
def create_harvest_source(name,
url,
type,
title=None,
active=True,
owner_org=None,
frequency='MANUAL',
config=None):
def create_harvest_source(
name,
url,
type,
title=None,
active=True,
owner_org=None,
frequency="MANUAL",
config=None,
):
output = StringIO()
data_dict = {
'name': name,
'url': url,
'source_type': type,
'title': title,
'active': active,
'owner_org': owner_org,
'frequency': frequency,
'config': config,
"name": name,
"url": url,
"source_type": type,
"title": title,
"active": active,
"owner_org": owner_org,
"frequency": frequency,
"config": config,
}
context = {
'model': model,
'session': model.Session,
'user': _admin_user()['name'],
'ignore_auth': True,
"model": model,
"session": model.Session,
"user": _admin_user()["name"],
"ignore_auth": True,
}
source = tk.get_action('harvest_source_create')(context, data_dict)
source = tk.get_action("harvest_source_create")(context, data_dict)
print('Created new harvest source:', file=output)
print("Created new harvest source:", file=output)
_print_harvest_source(source, output)
sources = tk.get_action('harvest_source_list')(context, {})
print(_there_are('harvest source', sources), file=output)
sources = tk.get_action("harvest_source_list")(context, {})
print(_there_are("harvest source", sources), file=output)
# Create a harvest job for the new source if not regular job.
if not data_dict['frequency']:
tk.get_action('harvest_job_create')(context, {
'source_id': source['id'],
'run': True
})
print('A new Harvest Job for this source has also been created',
file=output)
if not data_dict["frequency"]:
tk.get_action("harvest_job_create")(
context, {"source_id": source["id"], "run": True}
)
print(
"A new Harvest Job for this source has also been created",
file=output,
)
return output.getvalue()
def show_harvest_source(source_id_or_name):
context = {
'model': model,
'session': model.Session,
'user': _admin_user()['name']
"model": model,
"session": model.Session,
"user": _admin_user()["name"],
}
source = tk.get_action('harvest_source_show')(context, {
'id': source_id_or_name
})
source = tk.get_action("harvest_source_show")(
context, {"id": source_id_or_name}
)
output = StringIO()
_print_harvest_source(source, output)
return output.getvalue()
@ -144,69 +166,72 @@ def show_harvest_source(source_id_or_name):
def remove_harvest_source(source_id_or_name):
context = {
'model': model,
'session': model.Session,
'user': _admin_user()['name']
"model": model,
"session": model.Session,
"user": _admin_user()["name"],
}
source = tk.get_action('harvest_source_show')(context, {
'id': source_id_or_name
})
tk.get_action('harvest_source_delete')(context, {'id': source['id']})
source = tk.get_action("harvest_source_show")(
context, {"id": source_id_or_name}
)
tk.get_action("harvest_source_delete")(context, {"id": source["id"]})
def clear_harvest_source(source_id_or_name):
context = {
'model': model,
'session': model.Session,
'user': _admin_user()['name']
"model": model,
"session": model.Session,
"user": _admin_user()["name"],
}
source = tk.get_action('harvest_source_show')(context, {
'id': source_id_or_name
})
tk.get_action('harvest_source_clear')(context, {'id': source['id']})
source = tk.get_action("harvest_source_show")(
context, {"id": source_id_or_name}
)
tk.get_action("harvest_source_clear")(context, {"id": source["id"]})
def clear_harvest_source_history(source_id):
context = {
'model': model,
'user': _admin_user()['name'],
'session': model.Session
"model": model,
"user": _admin_user()["name"],
"session": model.Session,
}
if source_id is not None:
tk.get_action('harvest_source_job_history_clear')(context, {
'id': source_id
})
return ('Cleared job history of harvest source: {0}'.format(source_id))
tk.get_action("harvest_source_job_history_clear")(
context, {"id": source_id}
)
return "Cleared job history of harvest source: {0}".format(source_id)
else:
# Purge queues, because we clean all harvest jobs and
# objects in the database.
purge_queues()
cleared_sources_dicts = tk.get_action(
'harvest_sources_job_history_clear')(context, {})
return ('Cleared job history for all harvest sources: {0} source(s)'.
format(len(cleared_sources_dicts)))
"harvest_sources_job_history_clear"
)(context, {})
return "Cleared job history for all harvest sources: {0} source(s)".format(
len(cleared_sources_dicts)
)
def purge_queues():
from ckanext.harvest.queue import purge_queues as purge
purge()
def list_sources(all):
if all:
data_dict = {}
what = 'harvest source'
what = "harvest source"
else:
data_dict = {'only_active': True}
what = 'active harvest source'
data_dict = {"only_active": True}
what = "active harvest source"
context = {
'model': model,
'session': model.Session,
'user': _admin_user()['name']
"model": model,
"session": model.Session,
"user": _admin_user()["name"],
}
sources = tk.get_action('harvest_source_list')(context, data_dict)
sources = tk.get_action("harvest_source_list")(context, data_dict)
output = StringIO()
_print_harvest_sources(sources, output)
print(_there_are(what, sources), file=output)
@ -215,73 +240,82 @@ def list_sources(all):
def create_job(source_id_or_name):
context = {
'model': model,
'session': model.Session,
'user': _admin_user()['name']
"model": model,
"session": model.Session,
"user": _admin_user()["name"],
}
source = tk.get_action('harvest_source_show')(context, {
'id': source_id_or_name
})
source = tk.get_action("harvest_source_show")(
context, {"id": source_id_or_name}
)
context = {
'model': model,
'session': model.Session,
'user': _admin_user()['name']
"model": model,
"session": model.Session,
"user": _admin_user()["name"],
}
job = tk.get_action('harvest_job_create')(context, {
'source_id': source['id'],
'run': True
})
job = tk.get_action("harvest_job_create")(
context, {"source_id": source["id"], "run": True}
)
output = StringIO()
_print_harvest_job(job, output)
jobs = tk.get_action('harvest_job_list')(context, {'status': u'New'})
print(_there_are('harvest job', jobs, condition=u'New'), file=output)
jobs = tk.get_action("harvest_job_list")(context, {"status": u"New"})
print(_there_are("harvest job", jobs, condition=u"New"), file=output)
return output.getvalue()
def list_jobs():
context = {
'model': model,
'user': _admin_user()['name'],
'session': model.Session
"model": model,
"user": _admin_user()["name"],
"session": model.Session,
}
jobs = tk.get_action('harvest_job_list')(context, {})
jobs = tk.get_action("harvest_job_list")(context, {})
output = StringIO()
_print_harvest_jobs(jobs, output)
print(_there_are(what='harvest job', sequence=jobs), file=output)
print(_there_are(what="harvest job", sequence=jobs), file=output)
return output.getvalue()
def abort_job(job_or_source_id_or_name):
context = {
'model': model,
'user': _admin_user()['name'],
'session': model.Session
"model": model,
"user": _admin_user()["name"],
"session": model.Session,
}
job = tk.get_action('harvest_job_abort')(context, {
'id': job_or_source_id_or_name
})
return ('Job status: {0}'.format(job['status']))
job = tk.get_action("harvest_job_abort")(
context, {"id": job_or_source_id_or_name}
)
return "Job status: {0}".format(job["status"])
def gather_consumer():
import logging
from ckanext.harvest.queue import (get_gather_consumer, gather_callback,
get_gather_queue_name)
logging.getLogger('amqplib').setLevel(logging.INFO)
from ckanext.harvest.queue import (
get_gather_consumer,
gather_callback,
get_gather_queue_name,
)
logging.getLogger("amqplib").setLevel(logging.INFO)
consumer = get_gather_consumer()
for method, header, body in consumer.consume(
queue=get_gather_queue_name()):
queue=get_gather_queue_name()
):
gather_callback(consumer, method, header, body)
def fetch_consumer():
import logging
logging.getLogger('amqplib').setLevel(logging.INFO)
from ckanext.harvest.queue import (get_fetch_consumer, fetch_callback,
get_fetch_queue_name)
logging.getLogger("amqplib").setLevel(logging.INFO)
from ckanext.harvest.queue import (
get_fetch_consumer,
fetch_callback,
get_fetch_queue_name,
)
consumer = get_fetch_consumer()
for method, header, body in consumer.consume(queue=get_fetch_queue_name()):
fetch_callback(consumer, method, header, body)
@ -289,11 +323,11 @@ def fetch_consumer():
def run_harvester():
context = {
'model': model,
'user': _admin_user()['name'],
'session': model.Session
"model": model,
"user": _admin_user()["name"],
"session": model.Session,
}
tk.get_action('harvest_jobs_run')(context, {})
tk.get_action("harvest_jobs_run")(context, {})
def run_test_harvester(source_id_or_name):
@ -303,52 +337,151 @@ def run_test_harvester(source_id_or_name):
from ckanext.harvest.model import HarvestJob
context = {
'model': model,
'session': model.Session,
'user': _admin_user()['name']
"model": model,
"session": model.Session,
"user": _admin_user()["name"],
}
source = tk.get_action('harvest_source_show')(context, {
'id': source_id_or_name
})
source = tk.get_action("harvest_source_show")(
context, {"id": source_id_or_name}
)
# Determine the job
try:
job_dict = tk.get_action('harvest_job_create')(
context, {
'source_id': source['id']
})
job_dict = tk.get_action("harvest_job_create")(
context, {"source_id": source["id"]}
)
except HarvestJobExists:
running_jobs = tk.get_action('harvest_job_list')(
context, {
'source_id': source['id'],
'status': 'Running'
})
running_jobs = tk.get_action("harvest_job_list")(
context, {"source_id": source["id"], "status": "Running"}
)
if running_jobs:
print('\nSource "{0}" apparently has a "Running" job:\n{1}'.format(
source.get('name') or source['id'], running_jobs))
print(
'\nSource "{0}" apparently has a "Running" job:\n{1}'.format(
source.get("name") or source["id"], running_jobs
)
)
if six.PY2:
resp = raw_input('Abort it? (y/n)')
resp = raw_input("Abort it? (y/n)")
else:
resp = input('Abort it? (y/n)')
if not resp.lower().startswith('y'):
resp = input("Abort it? (y/n)")
if not resp.lower().startswith("y"):
sys.exit(1)
job_dict = tk.get_action('harvest_job_abort')(
context, {
'source_id': source['id']
})
job_dict = tk.get_action("harvest_job_abort")(
context, {"source_id": source["id"]}
)
else:
print('Reusing existing harvest job')
jobs = tk.get_action('harvest_job_list')(context, {
'source_id': source['id'],
'status': 'New'
})
assert len(jobs) == 1, \
'Multiple "New" jobs for this source! {0}'.format(jobs)
print("Reusing existing harvest job")
jobs = tk.get_action("harvest_job_list")(
context, {"source_id": source["id"], "status": "New"}
)
assert (
len(jobs) == 1
), 'Multiple "New" jobs for this source! {0}'.format(jobs)
job_dict = jobs[0]
job_obj = HarvestJob.get(job_dict['id'])
job_obj = HarvestJob.get(job_dict["id"])
harvester = queue.get_harvester(source['source_type'])
assert harvester, \
'No harvester found for type: {0}'.format(source['source_type'])
harvester = queue.get_harvester(source["source_type"])
assert harvester, "No harvester found for type: {0}".format(
source["source_type"]
)
lib.run_harvest_job(job_obj, harvester)
def import_stage(
source_id_or_name,
no_join_datasets,
harvest_object_id,
guid,
package_id,
segments,
):
if source_id_or_name:
context = {
"model": model,
"session": model.Session,
"user": _admin_user()["name"],
}
source = tk.get_action("harvest_source_show")(
context, {"id": source_id_or_name}
)
source_id = source["id"]
else:
source_id = None
context = {
"model": model,
"session": model.Session,
"user": _admin_user()["name"],
"join_datasets": not no_join_datasets,
"segments": segments,
}
objs_count = tk.get_action("harvest_objects_import")(
context,
{
"source_id": source_id,
"harvest_object_id": harvest_object_id,
"package_id": package_id,
"guid": guid,
},
)
print("{0} objects reimported".format(objs_count))
def job_all():
context = {
"model": model,
"user": _admin_user()["name"],
"session": model.Session,
}
jobs = tk.get_action("harvest_job_create_all")(context, {})
return "Created {0} new harvest jobs".format(len(jobs))
def reindex():
context = {"model": model, "user": _admin_user()["name"]}
tk.get_action("harvest_sources_reindex")(context, {})
def clean_harvest_log():
from datetime import datetime, timedelta
from ckantoolkit import config
from ckanext.harvest.model import clean_harvest_log
# Log time frame - in days
log_timeframe = tk.asint(config.get("ckan.harvest.log_timeframe", 30))
condition = datetime.utcnow() - timedelta(days=log_timeframe)
# Delete logs older then the given date
clean_harvest_log(condition=condition)
def harvesters_info():
harvesters_info = tk.get_action("harvesters_info_show")()
return harvesters_info
###############################################################################
# Controller #
###############################################################################
def _not_auth_message():
return tk._('Not authorized to see this page')
def admin_view(id):
try:
context = {'model': model, 'user': tk.c.user}
tk.check_access('harvest_source_update', context, {'id': id})
harvest_source = tk.get_action('harvest_source_show')(
context, {'id': id}
)
return tk.render(
'source/admin.html', extra_vars={'harvest_source': harvest_source}
)
except tk.ObjectNotFound:
return tk.abort(404, _('Harvest source not found'))
except tk.NotAuthorized:
return tk.abort(401, _not_auth_message())

101
ckanext/harvest/views.py Normal file
View File

@ -0,0 +1,101 @@
# -*- coding: utf-8 -*-
from flask import Blueprint
import ckantoolkit as tk
import ckanext.harvest.utils as utils
harvest = Blueprint("harvester", __name__)
@harvest.before_request
def before_request():
tk.c.dataset_type = utils.DATASET_TYPE_NAME
def delete(id):
return "delete"
def refresh(id):
return "refresh"
def admin(id):
return utils.admin_view(id)
def about(id):
return "about"
def clear(id):
return "clear"
def job_list(source):
return "job_list"
def job_show_last(source):
return "job_show_last"
def job_show(source, id):
return "job_show"
def job_abort(source, id):
return "job_abort"
def object_show(id, ref_type):
return "object_show {}".format(ref_type)
harvest.add_url_rule(
"/" + utils.DATASET_TYPE_NAME + "/delete/<id>", view_func=delete,
)
harvest.add_url_rule(
"/" + utils.DATASET_TYPE_NAME + "/refresh/<id>", view_func=refresh,
)
harvest.add_url_rule(
"/" + utils.DATASET_TYPE_NAME + "/admin/<id>", view_func=admin,
)
harvest.add_url_rule(
"/" + utils.DATASET_TYPE_NAME + "/about/<id>", view_func=about,
)
harvest.add_url_rule(
"/" + utils.DATASET_TYPE_NAME + "/clear/<id>", view_func=clear,
)
harvest.add_url_rule(
"/" + utils.DATASET_TYPE_NAME + "/<source>/job", view_func=job_list,
)
harvest.add_url_rule(
"/" + utils.DATASET_TYPE_NAME + "/<source>/job/last",
view_func=job_show_last,
)
harvest.add_url_rule(
"/" + utils.DATASET_TYPE_NAME + "/<source>/job/<id>", view_func=job_show,
)
harvest.add_url_rule(
"/" + utils.DATASET_TYPE_NAME + "/<source>/job/<id>/abort",
view_func=job_abort,
)
harvest.add_url_rule(
"/" + utils.DATASET_TYPE_NAME + "/object/<id>",
view_func=object_show,
defaults={"ref_type": "object"},
)
harvest.add_url_rule(
"/dataset/harvest_object/<id>",
view_func=object_show,
defaults={"ref_type": "dataset"},
)
def get_blueprints():
# import ipdb; ipdb.set_trace()
return [harvest]