Changes the gather stage to use the v3 API

Rather than using the revisions in the v2 API, the gather stage now uses
the v3 package_search action API, so that we can extend it with proper
filters in the future.
This commit is contained in:
Ross Jones 2015-09-10 16:46:25 +01:00
parent 89b6ad2ce1
commit 6dd40bfcf9
2 changed files with 28 additions and 39 deletions

View File

@ -150,9 +150,9 @@ class HarvesterBase(SingletonPlugin):
''' '''
Creates a new package or updates an existing one according to the Creates a new package or updates an existing one according to the
package dictionary provided. The package dictionary should look like package dictionary provided. The package dictionary should look like
the REST API response for a package: the Action API response for a package:
http://ckan.net/api/rest/package/statistics-catalunya http://ckan.net/api/action/package_show?id=statistics-catalunya
Note that the package_dict must contain an id, which will be used to Note that the package_dict must contain an id, which will be used to
check if the package needs to be created or updated (use the remote check if the package needs to be created or updated (use the remote
@ -161,10 +161,6 @@ class HarvesterBase(SingletonPlugin):
If the remote server provides the modification date of the remote If the remote server provides the modification date of the remote
package, add it to package_dict['metadata_modified']. package, add it to package_dict['metadata_modified'].
TODO: Not sure it is worth keeping this function. If useful it should
use the output of package_show logic function (maybe keeping support
for rest api based dicts
''' '''
try: try:
# Change default schema # Change default schema
@ -199,6 +195,7 @@ class HarvesterBase(SingletonPlugin):
# Check if package exists # Check if package exists
data_dict = {} data_dict = {}
data_dict['id'] = package_dict['id'] data_dict['id'] = package_dict['id']
try: try:
existing_package_dict = get_action('package_show')(context, data_dict) existing_package_dict = get_action('package_show')(context, data_dict)
@ -214,7 +211,7 @@ class HarvesterBase(SingletonPlugin):
context.update({'id':package_dict['id']}) context.update({'id':package_dict['id']})
package_dict.setdefault('name', package_dict.setdefault('name',
existing_package_dict['name']) existing_package_dict['name'])
new_package = get_action('package_update_rest')(context, package_dict) new_package = get_action('package_update')(context, package_dict)
else: else:
log.info('Package with GUID %s not updated, skipping...' % harvest_object.guid) log.info('Package with GUID %s not updated, skipping...' % harvest_object.guid)
@ -258,7 +255,7 @@ class HarvesterBase(SingletonPlugin):
model.Session.execute('SET CONSTRAINTS harvest_object_package_id_fkey DEFERRED') model.Session.execute('SET CONSTRAINTS harvest_object_package_id_fkey DEFERRED')
model.Session.flush() model.Session.flush()
new_package = get_action('package_create_rest')(context, package_dict) new_package = get_action('package_create')(context, package_dict)
Session.commit() Session.commit()

View File

@ -6,6 +6,7 @@ from ckan.model import Session, Package
from ckan.logic import ValidationError, NotFound, get_action from ckan.logic import ValidationError, NotFound, get_action
from ckan.lib.helpers import json from ckan.lib.helpers import json
from ckan.lib.munge import munge_name from ckan.lib.munge import munge_name
from urlparse import urljoin
from ckanext.harvest.model import HarvestJob, HarvestObject, HarvestGatherError, \ from ckanext.harvest.model import HarvestJob, HarvestObject, HarvestGatherError, \
HarvestObjectError HarvestObjectError
@ -24,14 +25,11 @@ class CKANHarvester(HarvesterBase):
api_version = 2 api_version = 2
action_api_version = 3 action_api_version = 3
def _get_rest_api_offset(self):
return '/api/%d/rest' % self.api_version
def _get_action_api_offset(self): def _get_action_api_offset(self):
return '/api/%d/action' % self.action_api_version return '/api/%d/action' % self.action_api_version
def _get_search_api_offset(self): def _get_search_api_offset(self):
return '/api/%d/search' % self.api_version return "%s/package_search" % self._get_action_api_offset()
def _get_content(self, url): def _get_content(self, url):
http_request = urllib2.Request( http_request = urllib2.Request(
@ -46,13 +44,13 @@ class CKANHarvester(HarvesterBase):
http_response = urllib2.urlopen(http_request) http_response = urllib2.urlopen(http_request)
except urllib2.URLError, e: except urllib2.URLError, e:
raise ContentFetchError( raise ContentFetchError(
'Could not fetch url: %s, error: %s' % 'Could not fetch url: %s, error: %s' %
(url, str(e)) (url, str(e))
) )
return http_response.read() return http_response.read()
def _get_group(self, base_url, group_name): def _get_group(self, base_url, group_name):
url = base_url + self._get_rest_api_offset() + '/group/' + munge_name(group_name) url = base_url + self._get_action_api_offset() + '/group_show?id=' + munge_name(group_name)
try: try:
content = self._get_content(url) content = self._get_content(url)
return json.loads(content) return json.loads(content)
@ -157,34 +155,28 @@ class CKANHarvester(HarvesterBase):
# Get source URL # Get source URL
base_url = harvest_job.source.url.rstrip('/') base_url = harvest_job.source.url.rstrip('/')
base_rest_url = base_url + self._get_rest_api_offset()
base_search_url = base_url + self._get_search_api_offset() base_search_url = base_url + self._get_search_api_offset()
log.debug("%r", previous_job)
if (previous_job and not previous_job.gather_errors and not len(previous_job.objects) == 0): if (previous_job and not previous_job.gather_errors and not len(previous_job.objects) == 0):
if not self.config.get('force_all',False): if not self.config.get('force_all',False):
get_all_packages = False get_all_packages = False
# Request only the packages modified since last harvest job # Request only the packages modified since last harvest job
last_time = previous_job.gather_finished.isoformat() last_time = previous_job.gather_finished.isoformat()
url = base_search_url + '/revision?since_time=%s' % last_time log.info("Searching for datasets modified since: %s", last_time)
fq = "metadata_modified:[{last_check} TO *]".format(last_check=last_time)
url = base_search_url + '?fq={fq}&rows=1000'.format(fq)
try: try:
content = self._get_content(url) content = self._get_content(url)
revision_ids = json.loads(content) package_dicts = json.loads(content).get('result', {}).get('results', [])
if len(revision_ids): for package in package_dicts:
for revision_id in revision_ids: if not package['id'] in package_ids:
url = base_rest_url + '/revision/%s' % revision_id package_ids.append(package['id'])
try:
content = self._get_content(url)
except ContentFetchError,e:
self._save_gather_error('Unable to get content for URL: %s: %s' % (url, str(e)),harvest_job)
continue
revision = json.loads(content)
for package_id in revision['packages']:
if not package_id in package_ids:
package_ids.append(package_id)
else: else:
log.info('No packages have been updated on the remote CKAN instance since the last harvest job') log.info('No packages have been updated on the remote CKAN instance since the last harvest job')
return None return None
@ -197,23 +189,21 @@ class CKANHarvester(HarvesterBase):
self._save_gather_error('Unable to get content for URL: %s: %s' % (url, str(e)),harvest_job) self._save_gather_error('Unable to get content for URL: %s: %s' % (url, str(e)),harvest_job)
return None return None
if get_all_packages: if get_all_packages:
# Request all remote packages # Request all remote packages
url = base_rest_url + '/package' url = urljoin(base_url,self._get_action_api_offset() + '/package_list')
try: try:
content = self._get_content(url) content = self._get_content(url)
except ContentFetchError,e: except ContentFetchError,e:
self._save_gather_error('Unable to get content for URL: %s: %s' % (url, str(e)),harvest_job) self._save_gather_error('Unable to get content for URL: %s: %s' % (url, str(e)),harvest_job)
return None return None
package_ids = json.loads(content).get('result',[])
package_ids = json.loads(content)
try: try:
object_ids = [] object_ids = []
if len(package_ids): if len(package_ids):
for package_id in package_ids: for package_id in package_ids:
log.debug("Creating harvestjob for %s", package_id)
# Create a new HarvestObject for this identifier # Create a new HarvestObject for this identifier
obj = HarvestObject(guid = package_id, job = harvest_job) obj = HarvestObject(guid = package_id, job = harvest_job)
obj.save() obj.save()
@ -236,7 +226,7 @@ class CKANHarvester(HarvesterBase):
# Get source URL # Get source URL
url = harvest_object.source.url.rstrip('/') url = harvest_object.source.url.rstrip('/')
url = url + self._get_rest_api_offset() + '/package/' + harvest_object.guid url = url + self._get_action_api_offset() + '/package_show?id=' + harvest_object.guid
# Get contents # Get contents
try: try:
@ -246,8 +236,10 @@ class CKANHarvester(HarvesterBase):
(url, e),harvest_object) (url, e),harvest_object)
return None return None
content = json.loads(content)['result']
# Save the fetched contents in the HarvestObject # Save the fetched contents in the HarvestObject
harvest_object.content = content harvest_object.content = json.dumps(content)
harvest_object.save() harvest_object.save()
return True return True
@ -374,7 +366,7 @@ class CKANHarvester(HarvesterBase):
# Find any extras whose values are not strings and try to convert # Find any extras whose values are not strings and try to convert
# them to strings, as non-string extras are not allowed anymore in # them to strings, as non-string extras are not allowed anymore in
# CKAN 2.0. # CKAN 2.0.
for key in package_dict['extras'].keys(): for key in package_dict.get('extras', {}).keys():
if not isinstance(package_dict['extras'][key], basestring): if not isinstance(package_dict['extras'][key], basestring):
try: try:
package_dict['extras'][key] = json.dumps( package_dict['extras'][key] = json.dumps(
@ -390,7 +382,7 @@ class CKANHarvester(HarvesterBase):
if not 'extras' in package_dict: if not 'extras' in package_dict:
package_dict['extras'] = {} package_dict['extras'] = {}
for key,value in default_extras.iteritems(): for key,value in default_extras.iteritems():
if not key in package_dict['extras'] or override_extras: if not key in package_dict.get('extras', {}) or override_extras:
# Look for replacement strings # Look for replacement strings
if isinstance(value,basestring): if isinstance(value,basestring):
value = value.format(harvest_source_id=harvest_object.job.source.id, value = value.format(harvest_source_id=harvest_object.job.source.id,