diff --git a/ckanext/harvest/harvesters/ckanharvester.py b/ckanext/harvest/harvesters/ckanharvester.py
index 9a449a0..eb335c2 100644
--- a/ckanext/harvest/harvesters/ckanharvester.py
+++ b/ckanext/harvest/harvesters/ckanharvester.py
@@ -1,6 +1,8 @@
 import urllib
 import urllib2
 
+from sqlalchemy import exists
+
 from ckan.lib.base import c
 from ckan import model
 from ckan.logic import ValidationError, NotFound, get_action
@@ -8,7 +10,7 @@ from ckan.lib.helpers import json
 from ckan.lib.munge import munge_name
 from ckan.plugins import toolkit
 
-from ckanext.harvest.model import HarvestJob, HarvestObject
+from ckanext.harvest.model import HarvestJob, HarvestObject, HarvestGatherError
 
 import logging
 log = logging.getLogger(__name__)
@@ -148,21 +150,9 @@ class CKANHarvester(HarvesterBase):
 
         self._set_config(harvest_job.source.config)
 
-        # Check if there is a previous harvest, ignoring whether it was
-        # successful (i.e. current=True) or not
-        previous_job = \
-            model.Session.query(HarvestJob) \
-                 .filter(HarvestJob.source == harvest_job.source) \
-                 .filter(HarvestJob.gather_finished != None) \
-                 .filter(HarvestJob.id != harvest_job.id) \
-                 .order_by(HarvestJob.gather_finished.desc()) \
-                 .first()
-
         # Get source URL
         remote_ckan_base_url = harvest_job.source.url.rstrip('/')
 
-        log.debug('Previous job: %r', previous_job)
-
         # Filter in/out datasets from particular organizations
         fq_terms = []
         org_filter_include = self.config.get('organizations_filter_include', [])
@@ -175,15 +165,15 @@ class CKANHarvester(HarvesterBase):
                 '-organization:%s' % org_name for org_name in org_filter_exclude)
 
         # Ideally we can request from the remote CKAN only those datasets
-        # modified since last harvest job
-        if (previous_job and
-                not previous_job.gather_errors and
-                not len(previous_job.objects) == 0 and
+        # modified since the last completely successful harvest.
+        last_error_free_job = self._last_error_free_job(harvest_job)
+        log.debug('Last error-free job: %r', last_error_free_job)
+        if (last_error_free_job and
                 not self.config.get('force_all', False)):
             get_all_packages = False
 
-            # Request only the datasets modified since last harvest job
-            last_time = previous_job.gather_started.isoformat()
+            # Request only the datasets modified since that job started
+            last_time = last_error_free_job.gather_started.isoformat()
             # Note: SOLR works in UTC, and gather_started is also UTC, so
             # this should work as long as local and remote clocks are
             # relatively accurate
@@ -295,6 +285,36 @@ class CKANHarvester(HarvesterBase):
 
         return pkg_dicts
 
+    @classmethod
+    def _last_error_free_job(cls, harvest_job):
+        """Return the most recently started other job of this source that
+        is 'Finished', has no gather errors and no object whose `current`
+        is False.
+
+        Returns None when no such job exists.
+        """
+        # TODO weed out cancelled jobs somehow.
+        # look for jobs with no gather errors
+        jobs = \
+            model.Session.query(HarvestJob) \
+                 .filter(HarvestJob.source == harvest_job.source) \
+                 .filter(HarvestJob.gather_started != None) \
+                 .filter(HarvestJob.status == 'Finished') \
+                 .filter(HarvestJob.id != harvest_job.id) \
+                 .filter(
+                     ~exists().where(
+                         HarvestGatherError.harvest_job_id == HarvestJob.id)) \
+                 .order_by(HarvestJob.gather_started.desc())
+        # now check them until we find one with no fetch/import errors
+        # (looping rather than doing sql, in case there are lots of objects
+        # and lots of jobs)
+        for job in jobs:
+            for obj in job.objects:
+                if obj.current is False:
+                    # unsuccessful, so go onto the next job
+                    break
+            else:
+                return job
+
     def fetch_stage(self, harvest_object):
         # Nothing to do here - we got the package dict in the search in the
         # gather stage
diff --git a/ckanext/harvest/logic/action/update.py b/ckanext/harvest/logic/action/update.py
index 9e871d9..52b25e2 100644
--- a/ckanext/harvest/logic/action/update.py
+++ b/ckanext/harvest/logic/action/update.py
@@ -343,16 +343,19 @@ def harvest_jobs_run(context,data_dict):
     if len(jobs):
         for job in jobs:
             if job['gather_finished']:
-                objects = session.query(HarvestObject.id) \
-                          .filter(HarvestObject.harvest_job_id==job['id']) \
-                          .filter(and_((HarvestObject.state!=u'COMPLETE'),
-                                       (HarvestObject.state!=u'ERROR'))) \
-                          .order_by(HarvestObject.import_finished.desc())
+                num_objects_in_progress = \
+                    session.query(HarvestObject.id) \
+                           .filter(HarvestObject.harvest_job_id==job['id']) \
+                           .filter(and_((HarvestObject.state!=u'COMPLETE'),
+                                        (HarvestObject.state!=u'ERROR'))) \
+                           .count()
 
-                if objects.count() == 0:
+                if num_objects_in_progress == 0:
                     job_obj = HarvestJob.get(job['id'])
                     job_obj.status = u'Finished'
+                    log.info('Marking job as finished %s %s', job_obj.source.url, job_obj.id)
 
+                    # save the time of finish, according to the last running object
                     last_object = session.query(HarvestObject) \
                           .filter(HarvestObject.harvest_job_id==job['id']) \
                           .filter(HarvestObject.import_finished!=None) \
@@ -361,6 +364,7 @@ def harvest_jobs_run(context,data_dict):
                     if last_object:
                         job_obj.finished = last_object.import_finished
                     job_obj.save()
+
                     # Reindex the harvest source dataset so it has the latest
                     # status
                     get_action('harvest_source_reindex')(context,