From 3adf38105e126cf1c0ac8082ccb62c4e42ce780b Mon Sep 17 00:00:00 2001 From: kindly Date: Tue, 19 Mar 2013 01:16:43 +0000 Subject: [PATCH] readd code from old branch seperating the fetch and import logic --- ckanext/harvest/queue.py | 74 +++++++++++++++++++++------------------- 1 file changed, 38 insertions(+), 36 deletions(-) diff --git a/ckanext/harvest/queue.py b/ckanext/harvest/queue.py index 7e04470..65118c7 100644 --- a/ckanext/harvest/queue.py +++ b/ckanext/harvest/queue.py @@ -188,46 +188,48 @@ def fetch_callback(channel, method, header, body): # matches for harvester in PluginImplementations(IHarvester): if harvester.info()['name'] == obj.source.type: + fetch_and_import_stages(harvester, obj) - # See if the plugin can fetch the harvest object - obj.fetch_started = datetime.datetime.now() - obj.state = "FETCH" - obj.save() - success_fetch = harvester.fetch_stage(obj) - obj.fetch_finished = datetime.datetime.now() - obj.save() - if success_fetch: - # If no errors where found, call the import method - obj.import_started = datetime.datetime.now() - obj.state = "IMPORT" - obj.save() - success_import = harvester.import_stage(obj) - obj.import_finished = datetime.datetime.now() - if success_import: - obj.state = "COMPLETE" - else: - obj.state = "ERROR" - obj.save() - else: - obj.state = "ERROR" - obj.save() - if obj.report_status: - continue - if obj.state == 'ERROR': - obj.report_status = 'errored' - elif obj.current == False: - obj.report_status = 'deleted' - elif len(model.Session.query(HarvestObject) - .filter_by(package_id = obj.package_id) - .limit(2) - .all()) == 2: - obj.report_status = 'updated' - else: - obj.report_status = 'added' - obj.save() model.Session.remove() channel.basic_ack(method.delivery_tag) +def fetch_and_import_stages(harvester, obj): + obj.fetch_started = datetime.datetime.now() + obj.state = "FETCH" + obj.save() + success_fetch = harvester.fetch_stage(obj) + obj.fetch_finished = datetime.datetime.now() + obj.save() + if success_fetch: + # If no errors where found, call the import method + obj.import_started = datetime.datetime.now() + obj.state = "IMPORT" + obj.save() + success_import = harvester.import_stage(obj) + obj.import_finished = datetime.datetime.now() + if success_import: + obj.state = "COMPLETE" + else: + obj.state = "ERROR" + obj.save() + else: + obj.state = "ERROR" + obj.save() + if obj.report_status: + continue + if obj.state == 'ERROR': + obj.report_status = 'errored' + elif obj.current == False: + obj.report_status = 'deleted' + elif len(model.Session.query(HarvestObject) + .filter_by(package_id = obj.package_id) + .limit(2) + .all()) == 2: + obj.report_status = 'updated' + else: + obj.report_status = 'added' + obj.save() + def get_gather_consumer(): consumer = get_consumer('ckan.harvest.gather','harvest_job_id') log.debug('Gather queue consumer registered')