Allow defining segments of harvest objects to import

Useful when importing large number of objects, as it allows
parallelization
This commit is contained in:
amercader 2012-08-02 18:41:59 +01:00
parent 7011efe5dc
commit 4d2fdeac57
2 changed files with 21 additions and 2 deletions

View File

@ -40,7 +40,7 @@ class Harvester(CkanCommand):
harvester fetch_consumer
- starts the consumer for the fetching queue
harvester [-j] import [{source-id}]
harvester [-j] [--segments={segments}] import [{source-id}]
- perform the import stage with the last fetched objects, optionally belonging to a certain source.
Please note that no objects will be fetched from the remote server. It will only affect
the last fetched objects already present in the database.
@ -48,6 +48,9 @@ class Harvester(CkanCommand):
If the -j flag is provided, the objects are not joined to existing datasets. This may be useful
when importing objects for the first time.
The --segments flag allows to define a string containing hex digits that represent which of
the 16 harvest object segments to import. e.g. 15af will run segments 1,5,a,f
harvester job-all
- create new harvest jobs for all active sources.
@ -71,6 +74,11 @@ class Harvester(CkanCommand):
self.parser.add_option('-j', '--no-join-datasets', dest='no_join_datasets',
action='store_true', default=False, help='Do not join harvest objects to existing datasets')
self.parser.add_option('--segments', dest='segments',
default=False, help=
'''A string containing hex digits that represent which of
the 16 harvest object segments to import. e.g. 15af will run segments 1,5,a,f''')
def command(self):
self._load_config()
@ -237,13 +245,15 @@ class Harvester(CkanCommand):
#print 'Sent %s jobs to the gather queue' % len(jobs)
def import_stage(self):
if len(self.args) >= 2:
source_id = unicode(self.args[1])
else:
source_id = None
context = {'model': model, 'session':model.Session, 'user': self.admin_user['name'],
'join_datasets': not self.options.no_join_datasets}
'join_datasets': not self.options.no_join_datasets,
'segments': self.options.segments}
objs = get_action('harvest_objects_import')(context,{'source_id':source_id})

View File

@ -1,3 +1,5 @@
import hashlib
import logging
from ckan.plugins import PluginImplementations
@ -81,6 +83,8 @@ def harvest_objects_import(context,data_dict):
session = context['session']
source_id = data_dict.get('source_id',None)
segments = context.get('segments',None)
join_datasets = context.get('join_datasets',True)
if source_id:
@ -109,8 +113,13 @@ def harvest_objects_import(context,data_dict):
last_objects_ids = last_objects_ids.all()
last_objects = []
for obj_id in last_objects_ids:
if segments and str(hashlib.md5(obj_id[0]).hexdigest())[0] not in segments:
continue
obj = session.query(HarvestObject).get(obj_id)
for harvester in PluginImplementations(IHarvester):
if harvester.info()['name'] == obj.source.type:
if hasattr(harvester,'force_import'):