[refactoring] Move common functions to lib. Adapt the CLI to use these common functions.

This commit is contained in:
Adrià Mercader 2011-04-05 11:53:39 +01:00
parent ce86cfde1b
commit e819a68f21
2 changed files with 228 additions and 172 deletions

View File

@ -4,34 +4,34 @@ from pprint import pprint
from ckan.lib.cli import CkanCommand
from ckan.model import repo
from ckanext.harvest.model import HarvestSource, HarvestingJob, HarvestedDocument
from ckanext.harvest.lib import save_extent
#from ckanext.harvest.model import HarvestSource, HarvestingJob, HarvestedDocument
#from ckanext.harvest.model import HarvestSource, HarvestJob, HarvestObject
from ckanext.harvest.lib import *
class Harvester(CkanCommand):
'''Harvests remotely mastered metadata
Usage:
harvester source {url} [{user-ref} [{publisher-ref}]]
harvester source {url} {type} [{active}] [{user-id}] [{publisher-id}]
- create new harvest source
harvester rmsource {url}
harvester rmsource {id}
- remove a harvester source (and associated jobs)
harvester sources
- lists harvest sources
harvester job {source-id} [{user-ref}]
- create new harvesting job
harvester job {source-id}
- create new harvest job
harvester rmjob {job-id}
- remove a harvesting job
- remove a harvest job
harvester jobs
- lists harvesting jobs
- lists harvest jobs
harvester run
- runs harvesting jobs
- runs harvest jobs
harvester extents
- creates or updates the extent geometry column for packages with
@ -47,12 +47,11 @@ class Harvester(CkanCommand):
summary = __doc__.split('\n')[0]
usage = __doc__
max_args = 4
max_args = 6
min_args = 0
def command(self):
self._load_config()
# Clear the 'No handlers could be found for logger "vdm"' warning message.
print ""
if len(self.args) == 0:
@ -60,43 +59,17 @@ class Harvester(CkanCommand):
sys.exit(1)
cmd = self.args[0]
if cmd == 'source':
if len(self.args) >= 2:
url = unicode(self.args[1])
else:
print self.usage
print 'Error, source url is not given.'
sys.exit(1)
if len(self.args) >= 3:
user_ref = unicode(self.args[2])
else:
user_ref = u''
if len(self.args) >= 4:
publisher_ref = unicode(self.args[3])
else:
publisher_ref = u''
self.register_harvest_source(url, user_ref, publisher_ref)
self.create_harvest_source()
elif cmd == "rmsource":
url = unicode(self.args[1])
self.remove_harvest_source(url)
self.remove_harvest_source()
elif cmd == 'sources':
self.list_harvest_sources()
elif cmd == 'job':
if len(self.args) >= 2:
source_id = unicode(self.args[1])
else:
print self.usage
print 'Error, job source is not given.'
sys.exit(1)
if len(self.args) >= 3:
user_ref = unicode(self.args[2])
else:
user_ref = u''
self.register_harvesting_job(source_id, user_ref)
self.create_harvest_job()
elif cmd == "rmjob":
job_id = unicode(self.args[1])
self.remove_harvesting_job(job_id)
self.remove_harvest_job()
elif cmd == 'jobs':
self.list_harvesting_jobs()
self.list_harvest_jobs()
elif cmd == 'run':
self.run_harvester()
elif cmd == 'extents':
@ -107,32 +80,95 @@ class Harvester(CkanCommand):
def _load_config(self):
super(Harvester, self)._load_config()
import logging
logging.basicConfig()
logger_vdm = logging.getLogger('vdm')
logger_vdm.setLevel(logging.ERROR)
def update_extents(self):
from ckan.model import PackageExtra, Package, Session
conn = Session.connection()
packages = [extra.package \
for extra in \
Session.query(PackageExtra).filter(PackageExtra.key == 'bbox-east-long').all()]
def create_harvest_source(self):
error = False
for package in packages:
try:
save_extent(package)
except:
errors = True
if error:
msg = "There was an error saving the package extent. Have you set up the package_extent table in the DB?"
if len(self.args) >= 2:
url = unicode(self.args[1])
else:
msg = "Done. Extents generated for %i packages" % len(packages)
print 'Please provide a source URL'
sys.exit(1)
if len(self.args) >= 3:
type = unicode(self.args[2])
else:
print 'Please provide a source type'
sys.exit(1)
if len(self.args) >= 4:
active = not(self.args[3].lower() == 'false' or \
self.args[3] == '0')
else:
active = True
if len(self.args) >= 5:
user_id = unicode(self.args[4])
else:
user_id = u''
if len(self.args) >= 6:
publisher_id = unicode(self.args[5])
else:
publisher_id = u''
print msg
source = create_harvest_source({
'url':url,
'type':type,
'active':active,
'user_id':user_id,
'publisher_id':publisher_id})
print 'Created new harvest source:'
self.print_harvest_source(source)
sources = get_harvest_sources()
self.print_there_are('harvest source', sources)
# Create a Harvest Job for the new Source
create_harvest_job(source.id)
print 'A new Harvest Job for this source has also been created'
def remove_harvest_source(self):
if len(self.args) >= 2:
source_id = unicode(self.args[1])
else:
print 'Please provide a source id'
sys.exit(1)
delete_harvest_source(source_id)
print 'Removed harvest source: %s' % source_id
def list_harvest_sources(self):
sources = get_harvest_sources()
self.print_harvest_sources(sources)
self.print_there_are(what="harvest source", sequence=sources)
def create_harvest_job(self):
if len(self.args) >= 2:
source_id = unicode(self.args[1])
else:
print 'Please provide a source id'
sys.exit(1)
job = create_harvest_job(source_id)
self.print_harvest_job(job)
status = u'New'
jobs = get_harvest_jobs(status=status)
self.print_there_are('harvest jobs', jobs, condition=status)
def remove_harvest_job(self):
if len(self.args) >= 2:
job_id = unicode(self.args[1])
else:
print 'Please provide a job id'
sys.exit(1)
delete_harvest_job(job_id)
print 'Removed harvest job: %s' % job_id
def list_harvest_jobs(self):
jobs = get_harvest_jobs()
self.print_harvest_jobs(jobs)
self.print_there_are(what='harvest job', sequence=jobs)
#TODO: Move to lib and implement the queue system
def run_harvester(self, *args, **kwds):
from pylons.i18n.translation import _get_translator
import pylons
@ -168,95 +204,27 @@ class Harvester(CkanCommand):
else:
print "There are no new harvesting jobs."
def remove_harvesting_job(self, job_id):
try:
job = HarvestingJob.get(job_id)
job.delete()
repo.commit_and_remove()
print "Removed job: %s" % job_id
except:
print "No such job"
#TODO: move to ckanext-?? for geo stuff
def update_extents(self):
from ckan.model import PackageExtra, Package, Session
conn = Session.connection()
packages = [extra.package \
for extra in \
Session.query(PackageExtra).filter(PackageExtra.key == 'bbox-east-long').all()]
def register_harvesting_job(self, source_id, user_ref):
if re.match('(http|file)://', source_id):
source_url = unicode(source_id)
source_id = None
sources = HarvestSource.filter(url=source_url).all()
if sources:
source = sources[0]
else:
source = self.create_harvest_source(url=source_url, user_ref=user_ref, publisher_ref=u'')
error = False
for package in packages:
try:
save_extent(package)
except:
errors = True
if error:
msg = "There was an error saving the package extent. Have you set up the package_extent table in the DB?"
else:
source = HarvestSource.get(source_id)
objects = HarvestingJob.filter(status='New', source=source)
if objects.count():
raise Exception('There is already an unrun job for the harvest source %r'%source.id)
job = HarvestingJob(
source=source,
user_ref=user_ref,
status=u"New",
)
job.save()
print "Created new harvesting job:"
self.print_harvesting_job(job)
status = u"New"
jobs = HarvestingJob.filter(status=status).all()
self.print_there_are("harvesting job", jobs, condition=status)
msg = "Done. Extents generated for %i packages" % len(packages)
def register_harvest_source(self, url, user_ref, publisher_ref):
existing = self.get_harvest_sources(url=url)
if existing:
print "Error, there is already a harvesting source for that URL"
self.print_harvest_sources(existing)
sys.exit(1)
else:
source = self.create_harvest_source(url=url, user_ref=user_ref, publisher_ref=publisher_ref)
self.register_harvesting_job(source.id, user_ref)
print "Created new harvest source:"
self.print_harvest_source(source)
sources = self.get_harvest_sources()
self.print_there_are("harvest source", sources)
def remove_harvest_source(self, url):
repo.new_revision()
sources = HarvestSource.filter(url=url)
if sources.count() == 0:
print "No such source"
else:
source = sources[0]
jobs = HarvestingJob.filter(source=source)
print "Removing %d jobs" % jobs.count()
for job in jobs:
job.delete()
source.delete()
repo.commit_and_remove()
print "Removed harvest source: %s" % url
def list_harvest_sources(self):
sources = self.get_harvest_sources()
self.print_harvest_sources(sources)
self.print_there_are(what="harvest source", sequence=sources)
def list_harvesting_jobs(self):
jobs = self.get_harvesting_jobs()
self.print_harvesting_jobs(jobs)
self.print_there_are(what="harvesting job", sequence=jobs)
def get_harvest_sources(self, **kwds):
return HarvestSource.filter(**kwds).all()
def get_harvesting_jobs(self, **kwds):
return HarvestingJob.filter(**kwds).all()
def create_harvest_source(self, **kwds):
source = HarvestSource(**kwds)
source.save()
return source
def create_harvesting_job(self, **kwds):
job = HarvestingJob(**kwds)
job.save()
return job
print msg
def print_harvest_sources(self, sources):
if sources:
@ -265,43 +233,46 @@ class Harvester(CkanCommand):
self.print_harvest_source(source)
def print_harvest_source(self, source):
print "Source id: %s" % source.id
print " url: %s" % source.url
print " user: %s" % source.user_ref
print "publisher: %s" % source.publisher_ref
print " docs: %s" % len(source.documents)
print ""
print 'Source id: %s' % source.id
print ' url: %s' % source.url
print ' type: %s' % source.type
print ' active: %s' % source.active
print ' user: %s' % source.user_id
print 'publisher: %s' % source.publisher_id
print ' objects: %s' % len(source.objects)
print ''
def print_harvesting_jobs(self, jobs):
def print_harvest_jobs(self, jobs):
if jobs:
print ""
print ''
for job in jobs:
self.print_harvesting_job(job)
self.print_harvest_job(job)
def print_harvesting_job(self, job):
print "Job id: %s" % job.id
if job.user_ref:
print " user: %s" % job.user_ref
print "status: %s" % job.status
print "source: %s" % job.source.id
print " url: %s" % job.source.url
def print_harvest_job(self, job):
print 'Job id: %s' % job.id
print 'status: %s' % job.status
print 'source: %s' % job.source.id
print ' url: %s' % job.source.url
#print "report: %s" % job.report
#TODO: print errors
'''
if job.report and job.report['added']:
for package_id in job.report['added']:
print " doc: %s" % package_id
if job.report and job.report['errors']:
for msg in job.report['errors']:
print " error: %s" % msg
print ""
'''
print ''
def print_there_are(self, what, sequence, condition=""):
def print_there_are(self, what, sequence, condition=''):
is_singular = self.is_singular(sequence)
print "There %s %s %s%s%s" % (
is_singular and "is" or "are",
print 'There %s %s %s%s%s' % (
is_singular and 'is' or 'are',
len(sequence),
condition and ("%s " % condition.lower()) or "",
condition and ('%s ' % condition.lower()) or '',
what,
not is_singular and "s" or "",
not is_singular and 's' or '',
)
def is_singular(self, sequence):

View File

@ -1,9 +1,93 @@
from ckan.model import Session
from ckan.model import repo
from ckan.lib.base import config
from ckanext.harvest.model import HarvestSource, HarvestJob
log = __import__("logging").getLogger(__name__)
def get_harvest_source(id,default=Exception,attr=None):
return HarvestSource.get(id,default=default,attr=attr)
def get_harvest_sources(**kwds):
return HarvestSource.filter(**kwds).all()
def create_harvest_source(source_dict):
if not 'url' in source_dict or not source_dict['url'] or \
not 'type' in source_dict or not source_dict['type']:
raise Exception('Missing mandatory properties: url, type')
# Check if source already exists
exists = get_harvest_sources(url=source_dict['url'])
if len(exists):
raise Exception('There is already a Harvest Source for this URL: %s' % source_dict['url'])
source = HarvestSource()
source.url = source_dict['url']
source.type = source_dict['type']
print str(source_dict['active'])
opt = ['active','description','user_id','publisher_id']
for o in opt:
if o in source_dict and source_dict[o] is not None:
source.__setattr__(o,source_dict[o])
source.save()
return source
def delete_harvest_source(source_id):
try:
source = HarvestSource.get(source_id)
except:
raise Exception('Source %s does not exist' % source_id)
source.delete()
repo.commit_and_remove()
#TODO: Jobs?
return True
def get_harvest_job(id,attr=None):
return HarvestJob.get(id,attr)
def get_harvest_jobs(**kwds):
return HarvestJob.filter(**kwds).all()
def create_harvest_job(source_id):
# Check if source exists
try:
source = get_harvest_source(source_id)
except:
raise Exception('Source %s does not exist' % source_id)
# Check if there already is an unrun job for this source
exists = get_harvest_jobs(source=source,status=u'New')
if len(exists):
raise Exception('There already is an unrun job for this source')
job = HarvestJob()
job.source = source
job.save()
return job
def delete_harvest_job(job_id):
try:
job = HarvestJob.get(job_id)
except:
raise Exception('Job %s does not exist' % job_id)
job.delete()
repo.commit_and_remove()
#TODO: objects?
return True
#TODO: move to ckanext-?? for geo stuff
def get_srid(crs):
"""Returns the SRID for the provided CRS definition
The CRS can be defined in the following formats
@ -20,6 +104,7 @@ def get_srid(crs):
return int(srid)
#TODO: move to ckanext-?? for geo stuff
def save_extent(package,extent=False):
'''Updates the package extent in the package_extent geometry column
If no extent provided (as a dict with minx,miny,maxx,maxy and srid keys),