diff --git a/ckanext/harvest/queue.py b/ckanext/harvest/queue.py index bb1d63c..88685f8 100644 --- a/ckanext/harvest/queue.py +++ b/ckanext/harvest/queue.py @@ -209,8 +209,17 @@ def gather_callback(channel, method, header, body): # Get a publisher for the fetch queue publisher = get_fetch_publisher() - job = HarvestJob.get(id) - + try: + job = HarvestJob.get(id) + except Exception, e: + # Occasionally we see: + # sqlalchemy.exc.OperationalError "SSL connection has been closed unexpectedly" + log.exception(e) + log.error('Connection Error during gather of %s: %r %r' % (id, e, e.args)) + # By not sending the ack, it will be retried later. + # Try to clear the issue with a remove. + model.Session.remove() + return if not job: log.error('Harvest job does not exist: %s' % id) channel.basic_ack(method.delivery_tag) @@ -280,8 +289,17 @@ def fetch_callback(channel, method, header, body): channel.basic_ack(method.delivery_tag) return False - - obj = HarvestObject.get(id) + try: + obj = HarvestObject.get(id) + except Exception, e: + # Occasionally we see: sqlalchemy.exc.OperationalError + # "SSL connection has been closed unexpectedly" + log.exception(e) + log.error('Connection Error during gather of %s: %r %r' % (id, e, e.args)) + # By not sending the ack, it will be retried later. + # Try to clear the issue with a remove. + model.Session.remove() + return if not obj: log.error('Harvest object does not exist: %s' % id) channel.basic_ack(method.delivery_tag)