Merge pull request #5 from alphagov/handle-redis-errors
Add try except around Redis set to handle corrupt redis data
This commit is contained in:
commit
4071faf221
|
@ -211,8 +211,12 @@ class RedisConsumer(object):
|
|||
def consume(self, queue):
|
||||
while True:
|
||||
key, body = self.redis.blpop(self.routing_key)
|
||||
self.redis.set(self.persistance_key(body),
|
||||
str(datetime.datetime.now()))
|
||||
try:
|
||||
self.redis.set(self.persistance_key(body), str(datetime.datetime.now()))
|
||||
except Exception as e:
|
||||
log.error("Redis Exception: %s", e)
|
||||
continue
|
||||
|
||||
yield (FakeMethod(body), self, body)
|
||||
|
||||
def persistance_key(self, message):
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
from mock import patch
|
||||
try:
|
||||
from ckan.tests.helpers import reset_db
|
||||
except ImportError:
|
||||
|
@ -301,3 +302,49 @@ class TestHarvestQueue(object):
|
|||
assert_equal(redis.llen(queue.get_fetch_routing_key()), 0)
|
||||
finally:
|
||||
redis.delete('ckanext-harvest:some-random-key')
|
||||
|
||||
|
||||
class TestHarvestCorruptRedis(object):
|
||||
@classmethod
|
||||
def setup_class(cls):
|
||||
reset_db()
|
||||
harvest_model.setup()
|
||||
|
||||
@patch('ckanext.harvest.queue.log.error')
|
||||
def test_redis_corrupt(self, mock_log_error):
|
||||
'''
|
||||
Test that corrupt Redis doesn't stop harvest process and still processes other jobs.
|
||||
'''
|
||||
if config.get('ckan.harvest.mq.type') != 'redis':
|
||||
raise SkipTest()
|
||||
redis = queue.get_connection()
|
||||
try:
|
||||
redis.set('ckanext-harvest:some-random-key-2', 'foobar')
|
||||
|
||||
# make sure queues/exchanges are created first and are empty
|
||||
gather_consumer = queue.get_gather_consumer()
|
||||
fetch_consumer = queue.get_fetch_consumer()
|
||||
gather_consumer.queue_purge(queue=queue.get_gather_queue_name())
|
||||
fetch_consumer.queue_purge(queue=queue.get_fetch_queue_name())
|
||||
|
||||
# Create some fake jobs and objects with no harvest_job_id
|
||||
gather_publisher = queue.get_gather_publisher()
|
||||
gather_publisher.send({'harvest_job_id': str(uuid.uuid4())})
|
||||
fetch_publisher = queue.get_fetch_publisher()
|
||||
fetch_publisher.send({'harvest_object_id': None})
|
||||
h_obj_id = str(uuid.uuid4())
|
||||
fetch_publisher.send({'harvest_object_id': h_obj_id})
|
||||
|
||||
# Create some fake objects
|
||||
next(gather_consumer.consume(queue.get_gather_queue_name()))
|
||||
_, _, body = next(fetch_consumer.consume(queue.get_fetch_queue_name()))
|
||||
|
||||
json_obj = json.loads(body)
|
||||
assert json_obj['harvest_object_id'] == h_obj_id
|
||||
|
||||
assert mock_log_error.call_count == 1
|
||||
args, _ = mock_log_error.call_args_list[0]
|
||||
assert "cannot concatenate 'str' and 'NoneType' objects" in args[1]
|
||||
|
||||
finally:
|
||||
redis.delete('ckanext-harvest:some-random-key-2')
|
||||
|
|
Loading…
Reference in New Issue