From de17e0ae8c00370abe06e87b5c77c72c7388f0a8 Mon Sep 17 00:00:00 2001 From: David Read Date: Wed, 22 Jul 2015 10:25:11 +0100 Subject: [PATCH 1/2] Catch, record and recover from temporary db problems. --- ckanext/harvest/queue.py | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/ckanext/harvest/queue.py b/ckanext/harvest/queue.py index bb1d63c..88685f8 100644 --- a/ckanext/harvest/queue.py +++ b/ckanext/harvest/queue.py @@ -209,8 +209,17 @@ def gather_callback(channel, method, header, body): # Get a publisher for the fetch queue publisher = get_fetch_publisher() - job = HarvestJob.get(id) - + try: + job = HarvestJob.get(id) + except Exception, e: + # Occasionally we see: + # sqlalchemy.exc.OperationalError "SSL connection has been closed unexpectedly" + log.exception(e) + log.error('Connection Error during gather of %s: %r %r' % (id, e, e.args)) + # By not sending the ack, it will be retried later. + # Try to clear the issue with a remove. + model.Session.remove() + return if not job: log.error('Harvest job does not exist: %s' % id) channel.basic_ack(method.delivery_tag) @@ -280,8 +289,17 @@ def fetch_callback(channel, method, header, body): channel.basic_ack(method.delivery_tag) return False - - obj = HarvestObject.get(id) + try: + obj = HarvestObject.get(id) + except Exception, e: + # Occasionally we see: sqlalchemy.exc.OperationalError + # "SSL connection has been closed unexpectedly" + log.exception(e) + log.error('Connection Error during gather of %s: %r %r' % (id, e, e.args)) + # By not sending the ack, it will be retried later. + # Try to clear the issue with a remove. + model.Session.remove() + return if not obj: log.error('Harvest object does not exist: %s' % id) channel.basic_ack(method.delivery_tag) From 1a6dca7c00b83e41b68d4adfe9c6d11b221f5936 Mon Sep 17 00:00:00 2001 From: David Read Date: Thu, 1 Oct 2015 12:30:40 +0100 Subject: [PATCH 2/2] [#148] Catch a more specific exception. --- ckanext/harvest/queue.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/ckanext/harvest/queue.py b/ckanext/harvest/queue.py index 88685f8..99384d5 100644 --- a/ckanext/harvest/queue.py +++ b/ckanext/harvest/queue.py @@ -3,18 +3,19 @@ import datetime import json import pika +import sqlalchemy from ckan.lib.base import config from ckan.plugins import PluginImplementations from ckan import model -from ckanext.harvest.model import HarvestJob, HarvestObject,HarvestGatherError +from ckanext.harvest.model import HarvestJob, HarvestObject, HarvestGatherError from ckanext.harvest.interfaces import IHarvester log = logging.getLogger(__name__) assert not log.disabled -__all__ = ['get_gather_publisher', 'get_gather_consumer', \ +__all__ = ['get_gather_publisher', 'get_gather_consumer', 'get_fetch_publisher', 'get_fetch_consumer'] PORT = 5672 @@ -211,11 +212,12 @@ def gather_callback(channel, method, header, body): try: job = HarvestJob.get(id) - except Exception, e: - # Occasionally we see: - # sqlalchemy.exc.OperationalError "SSL connection has been closed unexpectedly" + except sqlalchemy.exc.OperationalError, e: + # Occasionally we see: sqlalchemy.exc.OperationalError + # "SSL connection has been closed unexpectedly" log.exception(e) - log.error('Connection Error during gather of %s: %r %r' % (id, e, e.args)) + log.error('Connection Error during gather of job %s: %r %r', + id, e, e.args) # By not sending the ack, it will be retried later. # Try to clear the issue with a remove. model.Session.remove() @@ -291,11 +293,12 @@ def fetch_callback(channel, method, header, body): try: obj = HarvestObject.get(id) - except Exception, e: + except sqlalchemy.exc.OperationalError, e: # Occasionally we see: sqlalchemy.exc.OperationalError # "SSL connection has been closed unexpectedly" log.exception(e) - log.error('Connection Error during gather of %s: %r %r' % (id, e, e.args)) + log.error('Connection Error during gather of harvest object %s: %r %r', + id, e, e.args) # By not sending the ack, it will be retried later. # Try to clear the issue with a remove. model.Session.remove()