Merge branch 'purge'
This commit is contained in:
commit
1bad80e273
|
@ -45,6 +45,8 @@ class Harvester(CkanCommand):
|
||||||
|
|
||||||
harvester purge_queues
|
harvester purge_queues
|
||||||
- removes all jobs from fetch and gather queue
|
- removes all jobs from fetch and gather queue
|
||||||
|
WARNING: if using Redis, this command purges all data in the current
|
||||||
|
Redis database
|
||||||
|
|
||||||
harvester [-j] [-o] [--segments={segments}] import [{source-id}]
|
harvester [-j] [-o] [--segments={segments}] import [{source-id}]
|
||||||
- perform the import stage with the last fetched objects, for a certain
|
- perform the import stage with the last fetched objects, for a certain
|
||||||
|
|
|
@ -82,10 +82,13 @@ def purge_queues():
|
||||||
if backend in ('amqp', 'ampq'):
|
if backend in ('amqp', 'ampq'):
|
||||||
channel = connection.channel()
|
channel = connection.channel()
|
||||||
channel.queue_purge(queue=get_gather_queue_name())
|
channel.queue_purge(queue=get_gather_queue_name())
|
||||||
|
log.info('AMQP queue purged: %s', get_gather_queue_name())
|
||||||
channel.queue_purge(queue=get_fetch_queue_name())
|
channel.queue_purge(queue=get_fetch_queue_name())
|
||||||
|
log.info('AMQP queue purged: %s', get_fetch_queue_name())
|
||||||
return
|
return
|
||||||
if backend == 'redis':
|
if backend == 'redis':
|
||||||
connection.flushall()
|
connection.flushdb()
|
||||||
|
log.info('Redis database flushed')
|
||||||
|
|
||||||
def resubmit_jobs():
|
def resubmit_jobs():
|
||||||
if config.get('ckan.harvest.mq.type') != 'redis':
|
if config.get('ckan.harvest.mq.type') != 'redis':
|
||||||
|
@ -95,7 +98,7 @@ def resubmit_jobs():
|
||||||
for key in harvest_object_pending:
|
for key in harvest_object_pending:
|
||||||
date_of_key = datetime.datetime.strptime(redis.get(key),
|
date_of_key = datetime.datetime.strptime(redis.get(key),
|
||||||
"%Y-%m-%d %H:%M:%S.%f")
|
"%Y-%m-%d %H:%M:%S.%f")
|
||||||
if (datetime.datetime.now() - date_of_key).seconds > 180: # 3 minuites for fetch and import max
|
if (datetime.datetime.now() - date_of_key).seconds > 180: # 3 minutes for fetch and import max
|
||||||
redis.rpush('harvest_object_id',
|
redis.rpush('harvest_object_id',
|
||||||
json.dumps({'harvest_object_id': key.split(':')[-1]})
|
json.dumps({'harvest_object_id': key.split(':')[-1]})
|
||||||
)
|
)
|
||||||
|
@ -177,7 +180,7 @@ class RedisConsumer(object):
|
||||||
def basic_ack(self, message):
|
def basic_ack(self, message):
|
||||||
self.redis.delete(self.persistance_key(message))
|
self.redis.delete(self.persistance_key(message))
|
||||||
def queue_purge(self, queue):
|
def queue_purge(self, queue):
|
||||||
self.redis.flushall()
|
self.redis.flushdb()
|
||||||
def basic_get(self, queue):
|
def basic_get(self, queue):
|
||||||
body = self.redis.lpop(self.routing_key)
|
body = self.redis.lpop(self.routing_key)
|
||||||
return (FakeMethod(body), self, body)
|
return (FakeMethod(body), self, body)
|
||||||
|
|
Loading…
Reference in New Issue