readd code from old branch seperating the fetch and import logic
This commit is contained in:
parent
c2a6bd14eb
commit
3adf38105e
|
@ -188,46 +188,48 @@ def fetch_callback(channel, method, header, body):
|
|||
# matches
|
||||
for harvester in PluginImplementations(IHarvester):
|
||||
if harvester.info()['name'] == obj.source.type:
|
||||
fetch_and_import_stages(harvester, obj)
|
||||
|
||||
# See if the plugin can fetch the harvest object
|
||||
obj.fetch_started = datetime.datetime.now()
|
||||
obj.state = "FETCH"
|
||||
obj.save()
|
||||
success_fetch = harvester.fetch_stage(obj)
|
||||
obj.fetch_finished = datetime.datetime.now()
|
||||
obj.save()
|
||||
if success_fetch:
|
||||
# If no errors where found, call the import method
|
||||
obj.import_started = datetime.datetime.now()
|
||||
obj.state = "IMPORT"
|
||||
obj.save()
|
||||
success_import = harvester.import_stage(obj)
|
||||
obj.import_finished = datetime.datetime.now()
|
||||
if success_import:
|
||||
obj.state = "COMPLETE"
|
||||
else:
|
||||
obj.state = "ERROR"
|
||||
obj.save()
|
||||
else:
|
||||
obj.state = "ERROR"
|
||||
obj.save()
|
||||
if obj.report_status:
|
||||
continue
|
||||
if obj.state == 'ERROR':
|
||||
obj.report_status = 'errored'
|
||||
elif obj.current == False:
|
||||
obj.report_status = 'deleted'
|
||||
elif len(model.Session.query(HarvestObject)
|
||||
.filter_by(package_id = obj.package_id)
|
||||
.limit(2)
|
||||
.all()) == 2:
|
||||
obj.report_status = 'updated'
|
||||
else:
|
||||
obj.report_status = 'added'
|
||||
obj.save()
|
||||
model.Session.remove()
|
||||
channel.basic_ack(method.delivery_tag)
|
||||
|
||||
def fetch_and_import_stages(harvester, obj):
|
||||
obj.fetch_started = datetime.datetime.now()
|
||||
obj.state = "FETCH"
|
||||
obj.save()
|
||||
success_fetch = harvester.fetch_stage(obj)
|
||||
obj.fetch_finished = datetime.datetime.now()
|
||||
obj.save()
|
||||
if success_fetch:
|
||||
# If no errors where found, call the import method
|
||||
obj.import_started = datetime.datetime.now()
|
||||
obj.state = "IMPORT"
|
||||
obj.save()
|
||||
success_import = harvester.import_stage(obj)
|
||||
obj.import_finished = datetime.datetime.now()
|
||||
if success_import:
|
||||
obj.state = "COMPLETE"
|
||||
else:
|
||||
obj.state = "ERROR"
|
||||
obj.save()
|
||||
else:
|
||||
obj.state = "ERROR"
|
||||
obj.save()
|
||||
if obj.report_status:
|
||||
continue
|
||||
if obj.state == 'ERROR':
|
||||
obj.report_status = 'errored'
|
||||
elif obj.current == False:
|
||||
obj.report_status = 'deleted'
|
||||
elif len(model.Session.query(HarvestObject)
|
||||
.filter_by(package_id = obj.package_id)
|
||||
.limit(2)
|
||||
.all()) == 2:
|
||||
obj.report_status = 'updated'
|
||||
else:
|
||||
obj.report_status = 'added'
|
||||
obj.save()
|
||||
|
||||
def get_gather_consumer():
|
||||
consumer = get_consumer('ckan.harvest.gather','harvest_job_id')
|
||||
log.debug('Gather queue consumer registered')
|
||||
|
|
Loading…
Reference in New Issue