From b7552ba7004c505ded5b2fbf939eda1ce94708ad Mon Sep 17 00:00:00 2001 From: David Read Date: Mon, 2 Nov 2015 16:59:19 +0000 Subject: [PATCH] [#158] Try harder to use the "get datasets since time X" method of harvesting. Go back to the last completely successful harvest, rather than just consider the previous one. And that had a bug, because fetch errors were ignored, meaning one fetch error could mean that dataset never got harvested again. --- ckanext/harvest/harvesters/ckanharvester.py | 53 +++++++++++++-------- ckanext/harvest/logic/action/update.py | 16 ++++--- 2 files changed, 44 insertions(+), 25 deletions(-) diff --git a/ckanext/harvest/harvesters/ckanharvester.py b/ckanext/harvest/harvesters/ckanharvester.py index 9a449a0..eb335c2 100644 --- a/ckanext/harvest/harvesters/ckanharvester.py +++ b/ckanext/harvest/harvesters/ckanharvester.py @@ -1,6 +1,8 @@ import urllib import urllib2 +from sqlalchemy import exists + from ckan.lib.base import c from ckan import model from ckan.logic import ValidationError, NotFound, get_action @@ -8,7 +10,7 @@ from ckan.lib.helpers import json from ckan.lib.munge import munge_name from ckan.plugins import toolkit -from ckanext.harvest.model import HarvestJob, HarvestObject +from ckanext.harvest.model import HarvestJob, HarvestObject, HarvestGatherError import logging log = logging.getLogger(__name__) @@ -148,21 +150,9 @@ class CKANHarvester(HarvesterBase): self._set_config(harvest_job.source.config) - # Check if there is a previous harvest, ignoring whether it was - # successful (i.e. current=True) or not - previous_job = \ - model.Session.query(HarvestJob) \ - .filter(HarvestJob.source == harvest_job.source) \ - .filter(HarvestJob.gather_finished != None) \ - .filter(HarvestJob.id != harvest_job.id) \ - .order_by(HarvestJob.gather_finished.desc()) \ - .first() - # Get source URL remote_ckan_base_url = harvest_job.source.url.rstrip('/') - log.debug('Previous job: %r', previous_job) - # Filter in/out datasets from particular organizations fq_terms = [] org_filter_include = self.config.get('organizations_filter_include', []) @@ -175,15 +165,15 @@ class CKANHarvester(HarvesterBase): '-organization:%s' % org_name for org_name in org_filter_exclude) # Ideally we can request from the remote CKAN only those datasets - # modified since last harvest job - if (previous_job and - not previous_job.gather_errors and - not len(previous_job.objects) == 0 and + # modified since the last completely successful harvest. + last_error_free_job = self._last_error_free_job(harvest_job) + log.debug('Last error-free job: %r', last_error_free_job) + if (last_error_free_job and not self.config.get('force_all', False)): get_all_packages = False - # Request only the datasets modified since last harvest job - last_time = previous_job.gather_started.isoformat() + # Request only the datasets modified since + last_time = last_error_free_job.gather_started.isoformat() # Note: SOLR works in UTC, and gather_started is also UTC, so # this should work as long as local and remote clocks are # relatively accurate @@ -295,6 +285,31 @@ class CKANHarvester(HarvesterBase): return pkg_dicts + @classmethod + def _last_error_free_job(cls, harvest_job): + # TODO weed out cancelled jobs somehow. + # look for jobs with no gather errors + jobs = \ + model.Session.query(HarvestJob) \ + .filter(HarvestJob.source == harvest_job.source) \ + .filter(HarvestJob.gather_started != None) \ + .filter(HarvestJob.status == 'Finished') \ + .filter(HarvestJob.id != harvest_job.id) \ + .filter( + ~exists().where( + HarvestGatherError.harvest_job_id == HarvestJob.id)) \ + .order_by(HarvestJob.gather_started.desc()) + # now check them until we find one with no fetch/import errors + # (looping rather than doing sql, in case there are lots of objects + # and lots of jobs) + for job in jobs: + for obj in job.objects: + if obj.current is False: + # unsuccessful, so go onto the next job + break + else: + return job + def fetch_stage(self, harvest_object): # Nothing to do here - we got the package dict in the search in the # gather stage diff --git a/ckanext/harvest/logic/action/update.py b/ckanext/harvest/logic/action/update.py index 9e871d9..52b25e2 100644 --- a/ckanext/harvest/logic/action/update.py +++ b/ckanext/harvest/logic/action/update.py @@ -343,16 +343,19 @@ def harvest_jobs_run(context,data_dict): if len(jobs): for job in jobs: if job['gather_finished']: - objects = session.query(HarvestObject.id) \ - .filter(HarvestObject.harvest_job_id==job['id']) \ - .filter(and_((HarvestObject.state!=u'COMPLETE'), - (HarvestObject.state!=u'ERROR'))) \ - .order_by(HarvestObject.import_finished.desc()) + num_objects_in_progress = \ + session.query(HarvestObject.id) \ + .filter(HarvestObject.harvest_job_id==job['id']) \ + .filter(and_((HarvestObject.state!=u'COMPLETE'), + (HarvestObject.state!=u'ERROR'))) \ + .count() - if objects.count() == 0: + if num_objects_in_progress == 0: job_obj = HarvestJob.get(job['id']) job_obj.status = u'Finished' + log.info('Marking job as finished %s %s', job.source.url, job.id) + # save the time of finish, according to the last running object last_object = session.query(HarvestObject) \ .filter(HarvestObject.harvest_job_id==job['id']) \ .filter(HarvestObject.import_finished!=None) \ @@ -361,6 +364,7 @@ def harvest_jobs_run(context,data_dict): if last_object: job_obj.finished = last_object.import_finished job_obj.save() + # Reindex the harvest source dataset so it has the latest # status get_action('harvest_source_reindex')(context,