[#158] Try harder to use the "get datasets since time X" method of harvesting. Go back to the last completely successful harvest, rather than just considering the previous one. The old approach also had a bug: fetch errors were ignored, so a single fetch error could mean a dataset never got harvested again.

David Read 2015-11-02 16:59:19 +00:00
parent 1a680f3fd3
commit b7552ba700
2 changed files with 44 additions and 25 deletions
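
For orientation, the "since time X" method boils down to asking the remote CKAN's package_search action only for datasets whose metadata changed after the last good harvest, instead of re-fetching everything. A minimal hand-written sketch of that request against a plain CKAN 2.x remote (the function name and paging values are illustrative, not code from this commit):

import json
import urllib
import urllib2

def fetch_datasets_modified_since(remote_ckan_base_url, since_utc_iso):
    # Solr range query on the indexed metadata_modified field. The
    # timestamp is ISO 8601 in UTC without a zone suffix (as produced
    # by datetime.isoformat()), so a 'Z' is appended for Solr.
    fq = 'metadata_modified:[%sZ TO *]' % since_utc_iso
    params = urllib.urlencode({'fq': fq, 'rows': 100, 'start': 0})
    url = remote_ckan_base_url + '/api/3/action/package_search?' + params
    response = json.loads(urllib2.urlopen(url).read())
    return response['result']['results']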

ckanext/harvest/harvesters/ckanharvester.py

@@ -1,6 +1,8 @@
 import urllib
 import urllib2
 
+from sqlalchemy import exists
+
 from ckan.lib.base import c
 from ckan import model
 from ckan.logic import ValidationError, NotFound, get_action
@@ -8,7 +10,7 @@ from ckan.lib.helpers import json
 from ckan.lib.munge import munge_name
 from ckan.plugins import toolkit
 
-from ckanext.harvest.model import HarvestJob, HarvestObject
+from ckanext.harvest.model import HarvestJob, HarvestObject, HarvestGatherError
 
 import logging
 log = logging.getLogger(__name__)
@@ -148,21 +150,9 @@ class CKANHarvester(HarvesterBase):
         self._set_config(harvest_job.source.config)
 
-        # Check if there is a previous harvest, ignoring whether it was
-        # successful (i.e. current=True) or not
-        previous_job = \
-            model.Session.query(HarvestJob) \
-                 .filter(HarvestJob.source == harvest_job.source) \
-                 .filter(HarvestJob.gather_finished != None) \
-                 .filter(HarvestJob.id != harvest_job.id) \
-                 .order_by(HarvestJob.gather_finished.desc()) \
-                 .first()
-
         # Get source URL
         remote_ckan_base_url = harvest_job.source.url.rstrip('/')
-        log.debug('Previous job: %r', previous_job)
 
         # Filter in/out datasets from particular organizations
         fq_terms = []
         org_filter_include = self.config.get('organizations_filter_include', [])
@@ -175,15 +165,15 @@ class CKANHarvester(HarvesterBase):
             '-organization:%s' % org_name for org_name in org_filter_exclude)
 
         # Ideally we can request from the remote CKAN only those datasets
-        # modified since last harvest job
-        if (previous_job and
-                not previous_job.gather_errors and
-                not len(previous_job.objects) == 0 and
+        # modified since the last completely successful harvest.
+        last_error_free_job = self._last_error_free_job(harvest_job)
+        log.debug('Last error-free job: %r', last_error_free_job)
+        if (last_error_free_job and
                 not self.config.get('force_all', False)):
             get_all_packages = False
 
-            # Request only the datasets modified since last harvest job
-            last_time = previous_job.gather_started.isoformat()
+            # Request only the datasets modified since
+            last_time = last_error_free_job.gather_started.isoformat()
             # Note: SOLR works in UTC, and gather_started is also UTC, so
             # this should work as long as local and remote clocks are
             # relatively accurate
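
The fq_terms list seen in context above collects Solr filter-query clauses: organization include/exclude terms plus, when possible, the metadata_modified range. A rough standalone sketch of how such terms compose (the helper is simplified and hypothetical, not the harvester's code):

def build_fq_terms(org_include, org_exclude, since_utc_iso=None):
    # package_search ANDs the fq terms together
    fq_terms = []
    if org_include:
        # match datasets from any of the included organizations
        fq_terms.append(' OR '.join(
            'organization:%s' % org for org in org_include))
    elif org_exclude:
        # a leading '-' negates a Solr term
        fq_terms.extend(
            '-organization:%s' % org for org in org_exclude)
    if since_utc_iso:
        # gather_started is stored in UTC, matching Solr's clock
        fq_terms.append('metadata_modified:[%sZ TO *]' % since_utc_iso)
    return fq_terms

# build_fq_terms([], ['acme'], '2015-11-02T16:59:19')
# -> ['-organization:acme', 'metadata_modified:[2015-11-02T16:59:19Z TO *]']
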
@@ -295,6 +285,31 @@ class CKANHarvester(HarvesterBase):
 
         return pkg_dicts
 
+    @classmethod
+    def _last_error_free_job(cls, harvest_job):
+        # TODO weed out cancelled jobs somehow.
+        # look for jobs with no gather errors
+        jobs = \
+            model.Session.query(HarvestJob) \
+                 .filter(HarvestJob.source == harvest_job.source) \
+                 .filter(HarvestJob.gather_started != None) \
+                 .filter(HarvestJob.status == 'Finished') \
+                 .filter(HarvestJob.id != harvest_job.id) \
+                 .filter(
+                     ~exists().where(
+                         HarvestGatherError.harvest_job_id == HarvestJob.id)) \
+                 .order_by(HarvestJob.gather_started.desc())
+        # now check them until we find one with no fetch/import errors
+        # (looping rather than doing sql, in case there are lots of objects
+        # and lots of jobs)
+        for job in jobs:
+            for obj in job.objects:
+                if obj.current is False:
+                    # unsuccessful, so go onto the next job
+                    break
+            else:
+                return job
+
     def fetch_stage(self, harvest_object):
         # Nothing to do here - we got the package dict in the search in the
         # gather stage
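
_last_error_free_job leans on Python's for/else: the else clause runs only when the inner loop finishes without hitting break, i.e. when every harvest object in a job is current. A standalone illustration with stub classes (not the harvester's models):

class StubObject(object):
    def __init__(self, current):
        self.current = current

class StubJob(object):
    def __init__(self, name, objects):
        self.name = name
        self.objects = objects

def last_error_free(jobs):
    # jobs must be ordered newest-first, as the SQL above arranges
    for job in jobs:
        for obj in job.objects:
            if obj.current is False:
                # one failed fetch/import disqualifies the whole job
                break
        else:
            # no break occurred: every object succeeded
            return job

jobs = [StubJob('newest', [StubObject(True), StubObject(False)]),
        StubJob('older', [StubObject(True), StubObject(True)])]
print last_error_free(jobs).name  # prints 'older'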

ckanext/harvest/logic/action/update.py

@@ -343,16 +343,19 @@ def harvest_jobs_run(context,data_dict):
     if len(jobs):
         for job in jobs:
             if job['gather_finished']:
-                objects = session.query(HarvestObject.id) \
+                num_objects_in_progress = \
+                    session.query(HarvestObject.id) \
                           .filter(HarvestObject.harvest_job_id==job['id']) \
                           .filter(and_((HarvestObject.state!=u'COMPLETE'),
                                        (HarvestObject.state!=u'ERROR'))) \
-                          .order_by(HarvestObject.import_finished.desc())
-                if objects.count() == 0:
+                          .count()
+                if num_objects_in_progress == 0:
                     job_obj = HarvestJob.get(job['id'])
                     job_obj.status = u'Finished'
+                    log.info('Marking job as finished %s %s', job_obj.source.url, job_obj.id)
 
+                    # save the time of finish, according to the last running object
                     last_object = session.query(HarvestObject) \
                         .filter(HarvestObject.harvest_job_id==job['id']) \
                         .filter(HarvestObject.import_finished!=None) \
@@ -361,6 +364,7 @@ def harvest_jobs_run(context,data_dict):
                 if last_object:
                     job_obj.finished = last_object.import_finished
                 job_obj.save()
+
                 # Reindex the harvest source dataset so it has the latest
                 # status
                 get_action('harvest_source_reindex')(context,
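
Putting the second file's change together: the rewritten block counts in-progress objects in SQL rather than fetching and counting them in Python, then stamps the job's finish time from the last imported object. A hedged sketch of the same flow as a standalone helper (the function is hypothetical, with the session and models assumed from ckanext-harvest):

from sqlalchemy import and_

from ckanext.harvest.model import HarvestJob, HarvestObject

def finish_job_if_done(session, job_id):
    # a job is done when none of its objects are still in progress,
    # i.e. every object has reached COMPLETE or ERROR
    num_objects_in_progress = \
        session.query(HarvestObject.id) \
               .filter(HarvestObject.harvest_job_id == job_id) \
               .filter(and_(HarvestObject.state != u'COMPLETE',
                            HarvestObject.state != u'ERROR')) \
               .count()
    if num_objects_in_progress:
        return False
    job_obj = HarvestJob.get(job_id)
    job_obj.status = u'Finished'
    # take the finish time from the object that finished importing last
    last_object = session.query(HarvestObject) \
        .filter(HarvestObject.harvest_job_id == job_id) \
        .filter(HarvestObject.import_finished != None) \
        .order_by(HarvestObject.import_finished.desc()) \
        .first()
    if last_object:
        job_obj.finished = last_object.import_finished
    job_obj.save()
    return True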