Add command to reimport existing harvest objects
This commit is contained in:
parent
f7c6854a1d
commit
e320d0588f
|
@ -38,7 +38,12 @@ class Harvester(CkanCommand):
|
|||
|
||||
harvester fetch_consumer
|
||||
- starts the consumer for the fetching queue
|
||||
|
||||
|
||||
harvester import [{source-id}]
|
||||
- perform the import stage with the last fetched objects, optionally belonging to a certain source.
|
||||
Please note that no objects will be fetched from the remote server. It will only affect
|
||||
the last fetched objects already present in the database.
|
||||
|
||||
The commands should be run from the ckanext-harvest directory and expect
|
||||
a development.ini file to be present. Most of the time you will
|
||||
specify the config explicitly though::
|
||||
|
@ -82,9 +87,10 @@ class Harvester(CkanCommand):
|
|||
logging.getLogger('amqplib').setLevel(logging.INFO)
|
||||
consumer = get_fetch_consumer()
|
||||
consumer.wait()
|
||||
elif cmd == "initdb":
|
||||
elif cmd == 'initdb':
|
||||
self.initdb()
|
||||
|
||||
elif cmd == 'import':
|
||||
self.import_stage()
|
||||
else:
|
||||
print 'Command %s not recognized' % cmd
|
||||
|
||||
|
@ -185,9 +191,16 @@ class Harvester(CkanCommand):
|
|||
jobs = run_harvest_jobs()
|
||||
except:
|
||||
pass
|
||||
sys.exit(1)
|
||||
sys.exit(0)
|
||||
#print 'Sent %s jobs to the gather queue' % len(jobs)
|
||||
|
||||
def import_stage(self):
|
||||
if len(self.args) >= 2:
|
||||
source_id = unicode(self.args[1])
|
||||
else:
|
||||
source_id = None
|
||||
import_last_objects(source_id)
|
||||
|
||||
def print_harvest_sources(self, sources):
|
||||
if sources:
|
||||
print ''
|
||||
|
|
|
@ -2,10 +2,11 @@ import urlparse
|
|||
from sqlalchemy import distinct,func
|
||||
from ckan.model import Session, repo
|
||||
from ckan.model import Package
|
||||
|
||||
from ckan.plugins import PluginImplementations
|
||||
from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject, \
|
||||
HarvestGatherError, HarvestObjectError
|
||||
from ckanext.harvest.queue import get_gather_publisher
|
||||
from ckanext.harvest.interfaces import IHarvester
|
||||
|
||||
log = __import__("logging").getLogger(__name__)
|
||||
|
||||
|
@ -314,3 +315,37 @@ def get_harvest_objects(**kwds):
|
|||
objects = HarvestObject.filter(**kwds).all()
|
||||
return [_object_as_dict(obj) for obj in objects]
|
||||
|
||||
def import_last_objects(source_id=None):
|
||||
if source_id:
|
||||
try:
|
||||
source = HarvestSource.get(source_id)
|
||||
except:
|
||||
raise Exception('Source %s does not exist' % source_id)
|
||||
last_objects = Session.query(HarvestObject) \
|
||||
.join(HarvestJob) \
|
||||
.filter(HarvestJob.source==source) \
|
||||
.filter(HarvestObject.package!=None) \
|
||||
.order_by(HarvestObject.guid) \
|
||||
.order_by(HarvestObject.reference_date.desc()) \
|
||||
.order_by(HarvestObject.created.desc()) \
|
||||
.all()
|
||||
else:
|
||||
last_objects = Session.query(HarvestObject) \
|
||||
.filter(HarvestObject.package!=None) \
|
||||
.order_by(HarvestObject.guid) \
|
||||
.order_by(HarvestObject.reference_date.desc()) \
|
||||
.order_by(HarvestObject.created.desc()) \
|
||||
.all()
|
||||
|
||||
|
||||
last_obj_guid = ''
|
||||
imported_objects = []
|
||||
for obj in last_objects:
|
||||
if obj.guid != last_obj_guid:
|
||||
imported_objects.append(obj)
|
||||
for harvester in PluginImplementations(IHarvester):
|
||||
if harvester.get_type() == obj.job.source.type:
|
||||
harvester.import_stage(obj)
|
||||
last_obj_guid = obj.guid
|
||||
|
||||
return imported_objects
|
||||
|
|
Loading…
Reference in New Issue