diff --git a/ckanext/harvest/queue.py b/ckanext/harvest/queue.py
index e68e960..dfe1518 100644
--- a/ckanext/harvest/queue.py
+++ b/ckanext/harvest/queue.py
@@ -3,12 +3,13 @@ import datetime
 import json
 
 import pika
+import sqlalchemy
 
 from ckan.lib.base import config
 from ckan.plugins import PluginImplementations
 from ckan import model
 
-from ckanext.harvest.model import HarvestJob, HarvestObject,HarvestGatherError
+from ckanext.harvest.model import HarvestJob, HarvestObject, HarvestGatherError
 from ckanext.harvest.interfaces import IHarvester
 
 log = logging.getLogger(__name__)
@@ -221,8 +222,18 @@ def gather_callback(channel, method, header, body):
     # Get a publisher for the fetch queue
     publisher = get_fetch_publisher()
 
-    job = HarvestJob.get(id)
-
+    try:
+        job = HarvestJob.get(id)
+    except sqlalchemy.exc.OperationalError as e:
+        # Occasionally the database connection drops, e.g.
+        # "SSL connection has been closed unexpectedly".
+        log.exception(e)
+        log.error('Connection Error during gather of job %s: %r %r',
+                  id, e, e.args)
+        # Deliberately no basic_ack: the message stays unacknowledged so it
+        # can be retried later. Remove the session to drop the bad connection.
+        model.Session.remove()
+        return
     if not job:
         log.error('Harvest job does not exist: %s' % id)
         channel.basic_ack(method.delivery_tag)
@@ -313,7 +324,18 @@ def fetch_callback(channel, method, header, body):
         channel.basic_ack(method.delivery_tag)
         return False
 
-    obj = HarvestObject.get(id)
+    try:
+        obj = HarvestObject.get(id)
+    except sqlalchemy.exc.OperationalError as e:
+        # Occasionally the database connection drops, e.g.
+        # "SSL connection has been closed unexpectedly".
+        log.exception(e)
+        log.error('Connection Error during fetch of harvest object %s: %r %r',
+                  id, e, e.args)
+        # Deliberately no basic_ack: the message stays unacknowledged so it
+        # can be retried later. Remove the session to drop the bad connection.
+        model.Session.remove()
+        return
     if not obj:
         log.error('Harvest object does not exist: %s' % id)
         channel.basic_ack(method.delivery_tag)