From ba486a9482894e6216f7fe9279195beb64e07606 Mon Sep 17 00:00:00 2001 From: joetsoi Date: Wed, 27 Feb 2013 11:34:09 +0000 Subject: [PATCH 1/6] add indexing of datasets whilst harvesting --- ckanext/harvest/commands/harvester.py | 4 ++-- ckanext/harvest/harvesters/base.py | 12 +++++++++++- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/ckanext/harvest/commands/harvester.py b/ckanext/harvest/commands/harvester.py index 1088e8d..6aeeaa0 100644 --- a/ckanext/harvest/commands/harvester.py +++ b/ckanext/harvest/commands/harvester.py @@ -251,7 +251,7 @@ class Harvester(CkanCommand): self.print_harvest_job(job) jobs = get_action('harvest_job_list')(context,{'status':u'New'}) - self.print_there_are('harvest jobs', jobs, condition=u'New') + self.print_there_are('harvest job', jobs, condition=u'New') def list_harvest_jobs(self): context = {'model': model, 'user': self.admin_user['name'], 'session':model.Session} @@ -319,7 +319,7 @@ class Harvester(CkanCommand): print ' Job id: %s' % job['id'] print ' status: %s' % job['status'] print ' source: %s' % job['source_id'] - print ' objects: %s' % len(job['objects']) + print ' objects: %s' % len(job.get('objects', [])) print 'gather_errors: %s' % len(job['gather_errors']) if (len(job['gather_errors']) > 0): diff --git a/ckanext/harvest/harvesters/base.py b/ckanext/harvest/harvesters/base.py index a7876ac..913442d 100644 --- a/ckanext/harvest/harvesters/base.py +++ b/ckanext/harvest/harvesters/base.py @@ -5,6 +5,7 @@ import uuid from sqlalchemy.sql import update,and_, bindparam from sqlalchemy.exc import InvalidRequestError +from ckan import plugins as p from ckan import model from ckan.model import Session, Package from ckan.logic import ValidationError, NotFound, get_action @@ -173,8 +174,17 @@ class HarvesterBase(SingletonPlugin): package_dict['name'] = self._gen_new_name(package_dict['title']) log.info('Package with GUID %s does not exist, let\'s create it' % harvest_object.guid) + harvest_object.current = True + harvest_object.package_id = package_dict['id'] + # Defer constraints and flush so the dataset can be indexed with + # the harvest object id (on the after_show hook from the harvester + # plugin) + harvest_object.add() + + model.Session.execute('SET CONSTRAINTS harvest_object_package_id_fkey DEFERRED') + model.Session.flush() + new_package = get_action('package_create_rest')(context, package_dict) - harvest_object.package_id = new_package['id'] # Flag the other objects linking to this package as not current anymore from ckanext.harvest.model import harvest_object_table From dab98112dc917eb5ea4fd381e6164af3f555873f Mon Sep 17 00:00:00 2001 From: amercader Date: Thu, 28 Feb 2013 15:47:35 +0000 Subject: [PATCH 2/6] Fix bug in harvest job reports --- ckanext/harvest/logic/action/get.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ckanext/harvest/logic/action/get.py b/ckanext/harvest/logic/action/get.py index 055bfeb..6433ad7 100644 --- a/ckanext/harvest/logic/action/get.py +++ b/ckanext/harvest/logic/action/get.py @@ -166,7 +166,7 @@ def harvest_job_report(context, data_dict): report = { 'gather_errors': [], - 'object_errors': [] + 'object_errors': {} } # Gather errors From 9432368bea6100c9883972663c3f09c740239780 Mon Sep 17 00:00:00 2001 From: joetsoi Date: Thu, 28 Feb 2013 19:06:21 +0000 Subject: [PATCH 3/6] fix gather_stage if there is a previous job change check on gather stage to check for changed packages since last job instead of current harvest job's gather_start fix attribute look up bug fix print_job to print 0 gather_errors instead of key error --- ckanext/harvest/commands/harvester.py | 4 ++-- ckanext/harvest/harvesters/ckanharvester.py | 17 +++++++---------- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/ckanext/harvest/commands/harvester.py b/ckanext/harvest/commands/harvester.py index 6aeeaa0..efd32b3 100644 --- a/ckanext/harvest/commands/harvester.py +++ b/ckanext/harvest/commands/harvester.py @@ -321,9 +321,9 @@ class Harvester(CkanCommand): print ' source: %s' % job['source_id'] print ' objects: %s' % len(job.get('objects', [])) - print 'gather_errors: %s' % len(job['gather_errors']) + print 'gather_errors: %s' % len(job.get('gather_errors', [])) if (len(job['gather_errors']) > 0): - for error in job['gather_errors']: + for error in job.get('gather_errors', []): print ' %s' % error['message'] print '' diff --git a/ckanext/harvest/harvesters/ckanharvester.py b/ckanext/harvest/harvesters/ckanharvester.py index 4919098..ec13a90 100644 --- a/ckanext/harvest/harvesters/ckanharvester.py +++ b/ckanext/harvest/harvesters/ckanharvester.py @@ -35,15 +35,12 @@ class CKANHarvester(HarvesterBase): url = url, ) - try: - api_key = self.config.get('api_key',None) - if api_key: - http_request.add_header('Authorization',api_key) - http_response = urllib2.urlopen(http_request) + api_key = self.config.get('api_key',None) + if api_key: + http_request.add_header('Authorization',api_key) + http_response = urllib2.urlopen(http_request) - return http_response.read() - except Exception, e: - raise e + return http_response.read() def _set_config(self,config_str): if config_str: @@ -135,7 +132,7 @@ class CKANHarvester(HarvesterBase): get_all_packages = False # Request only the packages modified since last harvest job - last_time = harvest_job.gather_started.isoformat() + last_time = previous_job.gather_finished.isoformat() url = base_search_url + '/revision?since_time=%s' % last_time try: @@ -152,7 +149,7 @@ class CKANHarvester(HarvesterBase): continue revision = json.loads(content) - for package_id in revision.packages: + for package_id in revision['packages']: if not package_id in package_ids: package_ids.append(package_id) else: From bd128ab58b5e7982d73af14a46c5b0926a3a2101 Mon Sep 17 00:00:00 2001 From: amercader Date: Fri, 1 Mar 2013 12:52:58 +0000 Subject: [PATCH 4/6] Refresh session after each harvest stage Otherwise the eg the source config got cached and you needed to restart the consumers to refresh it. --- ckanext/harvest/queue.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ckanext/harvest/queue.py b/ckanext/harvest/queue.py index d59406c..48a67a1 100644 --- a/ckanext/harvest/queue.py +++ b/ckanext/harvest/queue.py @@ -143,6 +143,7 @@ def gather_callback(channel, method, header, body): except KeyError: log.error('No harvest job id received') finally: + model.Session.remove() channel.basic_ack(method.delivery_tag) @@ -214,7 +215,7 @@ def fetch_callback(channel, method, header, body): else: obj.report_status = 'new' obj.save() - + model.Session.remove() channel.basic_ack(method.delivery_tag) def get_gather_consumer(): From 182fbf054aaa3460412aa948c7ded08fc896dac1 Mon Sep 17 00:00:00 2001 From: amercader Date: Fri, 1 Mar 2013 17:25:35 +0000 Subject: [PATCH 5/6] Add XML declaration to contents if not present --- ckanext/harvest/controllers/view.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/ckanext/harvest/controllers/view.py b/ckanext/harvest/controllers/view.py index 5ac4bd8..0f6a3eb 100644 --- a/ckanext/harvest/controllers/view.py +++ b/ckanext/harvest/controllers/view.py @@ -265,6 +265,9 @@ class ViewController(BaseController): etree.fromstring(re.sub('<\?xml(.*)\?>','',content)) response.content_type = 'application/xml; charset=utf-8' + if not '\n' + content + except XMLSyntaxError: try: json.loads(obj['content']) @@ -274,7 +277,7 @@ class ViewController(BaseController): pass response.headers['Content-Length'] = len(content) - return content + return content.encode('utf-8') except NotFound: abort(404,_('Harvest object not found')) except NotAuthorized,e: From e64c8ead0fadd7ee7020448ab23eca1686f495e1 Mon Sep 17 00:00:00 2001 From: joetsoi Date: Tue, 5 Mar 2013 12:49:20 +0000 Subject: [PATCH 6/6] fix print gather_errors --- ckanext/harvest/commands/harvester.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/ckanext/harvest/commands/harvester.py b/ckanext/harvest/commands/harvester.py index efd32b3..8abde60 100644 --- a/ckanext/harvest/commands/harvester.py +++ b/ckanext/harvest/commands/harvester.py @@ -322,9 +322,8 @@ class Harvester(CkanCommand): print ' objects: %s' % len(job.get('objects', [])) print 'gather_errors: %s' % len(job.get('gather_errors', [])) - if (len(job['gather_errors']) > 0): - for error in job.get('gather_errors', []): - print ' %s' % error['message'] + for error in job.get('gather_errors', []): + print ' %s' % error['message'] print ''