From 1d8a4c17c48a2ad6a9b1af1d40fd542421eb0957 Mon Sep 17 00:00:00 2001
From: amercader
Date: Tue, 12 Feb 2013 18:29:30 +0000
Subject: [PATCH] [#8] Update harvesters for CSW, WAF and Doc sources

These are the new versions of the spatial harvesters, with significant
improvements over the previous ones.
---
 ckanext/spatial/harvesters/__init__.py |   4 +
 ckanext/spatial/harvesters/csw.py      | 178 ++++++++++++++
 ckanext/spatial/harvesters/doc.py      | 112 +++++++++
 ckanext/spatial/harvesters/waf.py      | 313 +++++++++++++++++++++++++
 setup.py                               |   4 +
 5 files changed, 611 insertions(+)
 create mode 100644 ckanext/spatial/harvesters/csw.py
 create mode 100644 ckanext/spatial/harvesters/doc.py
 create mode 100644 ckanext/spatial/harvesters/waf.py

diff --git a/ckanext/spatial/harvesters/__init__.py b/ckanext/spatial/harvesters/__init__.py
index 2e2033b..0093d42 100644
--- a/ckanext/spatial/harvesters/__init__.py
+++ b/ckanext/spatial/harvesters/__init__.py
@@ -5,3 +5,7 @@ try:
 except ImportError:
     import pkgutil
     __path__ = pkgutil.extend_path(__path__, __name__)
+
+from ckanext.spatial.harvesters.csw import CSWHarvester
+from ckanext.spatial.harvesters.waf import WAFHarvester
+from ckanext.spatial.harvesters.doc import DocHarvester
diff --git a/ckanext/spatial/harvesters/csw.py b/ckanext/spatial/harvesters/csw.py
new file mode 100644
index 0000000..7dfee6a
--- /dev/null
+++ b/ckanext/spatial/harvesters/csw.py
@@ -0,0 +1,178 @@
+import urllib
+import urlparse
+import logging
+
+from ckan import model
+from ckan.plugins.core import SingletonPlugin, implements
+
+from ckanext.harvest.interfaces import IHarvester
+from ckanext.harvest.model import HarvestObject
+from ckanext.harvest.model import HarvestObjectExtra as HOExtra
+
+from ckanext.spatial.lib.csw_client import CswService
+from ckanext.spatial.harvesters.base import SpatialHarvester, text_traceback
+
+
+class CSWHarvester(SpatialHarvester, SingletonPlugin):
+    '''
+    A Harvester for CSW servers
+    '''
+    implements(IHarvester)
+
+    csw = None
+
+    def info(self):
+        return {
+            'name': 'csw',
+            'title': 'CSW Server',
+            'description': 'A server that implements OGC\'s Catalog Service for the Web (CSW) standard'
+        }
+
+    def get_original_url(self, harvest_object_id):
+        obj = model.Session.query(HarvestObject).\
+            filter(HarvestObject.id==harvest_object_id).\
+            first()
+
+        parts = urlparse.urlparse(obj.source.url)
+
+        params = {
+            'SERVICE': 'CSW',
+            'VERSION': '2.0.2',
+            'REQUEST': 'GetRecordById',
+            'OUTPUTSCHEMA': 'http://www.isotc211.org/2005/gmd',
+            'OUTPUTFORMAT': 'application/xml',
+            'ID': obj.guid
+        }
+
+        url = urlparse.urlunparse((
+            parts.scheme,
+            parts.netloc,
+            parts.path,
+            None,
+            urllib.urlencode(params),
+            None
+        ))
+
+        return url
+
+    def gather_stage(self, harvest_job):
+        log = logging.getLogger(__name__ + '.CSW.gather')
+        log.debug('CswHarvester gather_stage for job: %r', harvest_job)
+        # Get source URL
+        url = harvest_job.source.url
+
+        self._set_source_config(harvest_job.source.config)
+
+        try:
+            self._setup_csw_client(url)
+        except Exception, e:
+            self._save_gather_error('Error contacting the CSW server: %s' % e, harvest_job)
+            return None
+
+        # Map the GUIDs of the current harvest objects for this source to
+        # their package ids, so new, changed and deleted records can be
+        # told apart later
+        query = model.Session.query(HarvestObject.guid, HarvestObject.package_id).\
+            filter(HarvestObject.current==True).\
+            filter(HarvestObject.harvest_source_id==harvest_job.source.id)
+        guid_to_package_id = {}
+
+        for guid, package_id in query:
+            guid_to_package_id[guid] = package_id
+
+        guids_in_db = set(guid_to_package_id.keys())
+
+        log.debug('Starting gathering for %s' % url)
+        guids_in_harvest = set()
+        try:
+            for identifier in self.csw.getidentifiers(page=10):
+                try:
+                    log.info('Got identifier %s from the CSW', identifier)
+                    if identifier is None:
+                        log.error('CSW returned identifier %r, skipping...' % identifier)
+                        continue
+
+                    guids_in_harvest.add(identifier)
+                except Exception, e:
+                    self._save_gather_error('Error for the identifier %s [%r]' % (identifier, e), harvest_job)
+                    continue
+
+        except Exception, e:
+            log.error('Exception: %s' % text_traceback())
+            self._save_gather_error('Error gathering the identifiers from the CSW server [%s]' % str(e), harvest_job)
+            return None
+
+        new = guids_in_harvest - guids_in_db
+        delete = guids_in_db - guids_in_harvest
+        change = guids_in_db & guids_in_harvest
+
+        ids = []
+        for guid in new:
+            obj = HarvestObject(guid=guid, job=harvest_job,
+                                extras=[HOExtra(key='status', value='new')])
+            obj.save()
+            ids.append(obj.id)
+        for guid in change:
+            obj = HarvestObject(guid=guid, job=harvest_job,
+                                package_id=guid_to_package_id[guid],
+                                extras=[HOExtra(key='status', value='change')])
+            obj.save()
+            ids.append(obj.id)
+        for guid in delete:
+            obj = HarvestObject(guid=guid, job=harvest_job,
+                                package_id=guid_to_package_id[guid],
+                                extras=[HOExtra(key='status', value='delete')])
+            model.Session.query(HarvestObject).\
+                filter_by(guid=guid).\
+                update({'current': False}, False)
+            # Save first so the object has an id before it is appended
+            obj.save()
+            ids.append(obj.id)
+
+        if len(ids) == 0:
+            self._save_gather_error('No records received from the CSW server', harvest_job)
+            return None
+
+        return ids
+
+    def fetch_stage(self, harvest_object):
+        log = logging.getLogger(__name__ + '.CSW.fetch')
+        log.debug('CswHarvester fetch_stage for object: %s', harvest_object.id)
+
+        url = harvest_object.source.url
+        try:
+            self._setup_csw_client(url)
+        except Exception, e:
+            self._save_object_error('Error contacting the CSW server: %s' % e,
+                                    harvest_object)
+            return False
+
+        identifier = harvest_object.guid
+        try:
+            record = self.csw.getrecordbyid([identifier])
+        except Exception, e:
+            self._save_object_error('Error getting the CSW record with GUID %s' % identifier, harvest_object)
+            return False
+
+        if record is None:
+            self._save_object_error('Empty record for GUID %s' % identifier,
+                                    harvest_object)
+            return False
+
+        try:
+            # Save the fetched contents in the HarvestObject
+            # Contents come from csw_client already declared and encoded as utf-8
+            harvest_object.content = record['xml']
+            harvest_object.save()
+        except Exception, e:
+            self._save_object_error('Error saving the harvest object for GUID %s [%r]' % \
+                                    (identifier, e), harvest_object)
+            return False
+
+        log.debug('XML content saved (len %s)', len(record['xml']))
+        return True
+
+    def _setup_csw_client(self, url):
+        self.csw = CswService(url)
diff --git a/ckanext/spatial/harvesters/doc.py b/ckanext/spatial/harvesters/doc.py
new file mode 100644
index 0000000..8c0f420
--- /dev/null
+++ b/ckanext/spatial/harvesters/doc.py
@@ -0,0 +1,112 @@
+import hashlib
+import logging
+
+from ckan import model
+from ckan.plugins.core import SingletonPlugin, implements
+
+from ckanext.harvest.interfaces import IHarvester
+from ckanext.harvest.model import HarvestObject
+from ckanext.harvest.model import HarvestObjectExtra as HOExtra
+
+from ckanext.spatial.harvesters.base import SpatialHarvester, guess_standard
+
+
+class DocHarvester(SpatialHarvester, SingletonPlugin):
+    '''
+    A Harvester for individual spatial metadata documents
+    TODO: Move to new logic
+    '''
+
+    implements(IHarvester)
+
+    def info(self):
+        return {
+            'name': 'single-doc',
+            'title': 'Single spatial metadata document',
+            'description': 'A single spatial metadata document'
+        }
+
+    def get_original_url(self, harvest_object_id):
+        obj = model.Session.query(HarvestObject).\
+            filter(HarvestObject.id==harvest_object_id).\
+            first()
+        if not obj:
+            return None
+
+        return obj.source.url
+
+    def gather_stage(self, harvest_job):
+        log = logging.getLogger(__name__ + '.individual.gather')
+        log.debug('DocHarvester gather_stage for job: %r', harvest_job)
+
+        self.harvest_job = harvest_job
+
+        # Get source URL
+        url = harvest_job.source.url
+
+        self._set_source_config(harvest_job.source.config)
+
+        # Get contents
+        try:
+            content = self._get_content_as_unicode(url)
+        except Exception, e:
+            self._save_gather_error('Unable to get content for URL: %s: %r' % \
+                                    (url, e), harvest_job)
+            return None
+
+        existing_object = model.Session.query(HarvestObject.guid, HarvestObject.package_id).\
+            filter(HarvestObject.current==True).\
+            filter(HarvestObject.harvest_source_id==harvest_job.source.id).\
+            first()
+
+        def create_extras(url, status):
+            return [HOExtra(key='doc_location', value=url),
+                    HOExtra(key='status', value=status)]
+
+        if not existing_object:
+            guid = hashlib.md5(url.encode('utf8', 'ignore')).hexdigest()
+            harvest_object = HarvestObject(job=harvest_job,
+                                           extras=create_extras(url, 'new'),
+                                           guid=guid
+                                           )
+        else:
+            harvest_object = HarvestObject(job=harvest_job,
+                                           extras=create_extras(url, 'change'),
+                                           guid=existing_object.guid
+                                           )
+
+        harvest_object.add()
+
+        # Check if it is an ISO document
+        document_format = guess_standard(content)
+        if document_format == 'iso':
+            harvest_object.content = content
+        else:
+            extra = HOExtra(
+                object=harvest_object,
+                key='original_document',
+                value=content)
+            extra.save()
+
+            extra = HOExtra(
+                object=harvest_object,
+                key='original_format',
+                value=document_format)
+            extra.save()
+
+        harvest_object.save()
+
+        return [harvest_object.id]
+
+    def fetch_stage(self, harvest_object):
+        # The fetching was already done in the previous stage
+        return True
diff --git a/ckanext/spatial/harvesters/waf.py b/ckanext/spatial/harvesters/waf.py
new file mode 100644
index 0000000..3c60f3c
--- /dev/null
+++ b/ckanext/spatial/harvesters/waf.py
@@ -0,0 +1,313 @@
+import logging
+import hashlib
+from urlparse import urljoin
+
+import dateutil.parser
+import pyparsing as parse
+import requests
+from sqlalchemy.orm import aliased
+from sqlalchemy.exc import DataError
+
+from ckan import model
+from ckan.plugins.core import SingletonPlugin, implements
+
+from ckanext.harvest.interfaces import IHarvester
+from ckanext.harvest.model import HarvestObject
+from ckanext.harvest.model import HarvestObjectExtra as HOExtra
+import ckanext.harvest.queue as queue
+
+from ckanext.spatial.harvesters.base import SpatialHarvester, get_extra, guess_standard
+
+
+class WAFHarvester(SpatialHarvester, SingletonPlugin):
+    '''
+    A Harvester for WAFs (Web Accessible Folders) containing spatial metadata
+    documents, e.g. Apache serving a directory of ISO 19139 files.
+    '''
+
+    implements(IHarvester)
+
+    def info(self):
+        return {
+            'name': 'waf',
+            'title': 'Web Accessible Folder (WAF)',
+            'description': 'A Web Accessible Folder (WAF) displaying a list of spatial metadata documents'
+        }
+
+    def get_original_url(self, harvest_object_id):
+        url = model.Session.query(HOExtra.value).\
+            filter(HOExtra.key=='waf_location').\
+            filter(HOExtra.harvest_object_id==harvest_object_id).\
+            first()
+
+        return url[0] if url else None
+
+    def gather_stage(self, harvest_job, collection_package_id=None):
+        log = logging.getLogger(__name__ + '.WAF.gather')
+        log.debug('WafHarvester gather_stage for job: %r', harvest_job)
+
+        self.harvest_job = harvest_job
+
+        # Get source URL
+        source_url = harvest_job.source.url
+
+        self._set_source_config(harvest_job.source.config)
+
+        # Get contents
+        try:
+            response = requests.get(source_url, timeout=60)
+            content = response.content
+            scraper = _get_scraper(response.headers.get('server'))
+        except Exception, e:
+            self._save_gather_error('Unable to get content for URL: %s: %r' % \
+                                    (source_url, e), harvest_job)
+            return None
+
+        ###### Get current harvest objects out of db ######
+
+        url_to_modified_db = {}  ## mapping of url to last_modified in db
+        url_to_ids = {}  ## mapping of url to (guid, package_id) in db
+
+        HOExtraAlias1 = aliased(HOExtra)
+        HOExtraAlias2 = aliased(HOExtra)
+        query = model.Session.query(HarvestObject.guid, HarvestObject.package_id,
+                                    HOExtraAlias1.value, HOExtraAlias2.value).\
+            join(HOExtraAlias1, HarvestObject.extras).\
+            join(HOExtraAlias2, HarvestObject.extras).\
+            filter(HOExtraAlias1.key=='waf_modified_date').\
+            filter(HOExtraAlias2.key=='waf_location').\
+            filter(HarvestObject.current==True).\
+            filter(HarvestObject.harvest_source_id==harvest_job.source.id)
+
+        for guid, package_id, modified_date, url in query:
+            url_to_modified_db[url] = modified_date
+            url_to_ids[url] = (guid, package_id)
+
+        ###### Get current list of records from source ######
+
+        url_to_modified_harvest = {}  ## mapping of url to last_modified in harvest
+        try:
+            for url, modified_date in _extract_waf(content, source_url, scraper):
+                url_to_modified_harvest[url] = modified_date
+        except Exception, e:
+            msg = 'Error extracting URLs from %s, error was %s' % (source_url, e)
+            self._save_gather_error(msg, harvest_job)
+            return None
+
+        ###### Compare source and db ######
+
+        harvest_locations = set(url_to_modified_harvest.keys())
+        old_locations = set(url_to_modified_db.keys())
+
+        new = harvest_locations - old_locations
+        delete = old_locations - harvest_locations
+        possible_changes = old_locations & harvest_locations
+        change = []
+
+        for item in possible_changes:
+            # If there is no date on either side, assume a change
+            if (not url_to_modified_harvest[item] or not url_to_modified_db[item]
+                    or url_to_modified_harvest[item] > url_to_modified_db[item]):
+                change.append(item)
+
+        def create_extras(url, date, status):
+            extras = [HOExtra(key='waf_modified_date', value=date),
+                      HOExtra(key='waf_location', value=url),
+                      HOExtra(key='status', value=status)]
+            if collection_package_id:
+                extras.append(
+                    HOExtra(key='collection_package_id',
+                            value=collection_package_id)
+                )
+            return extras
+
+        ids = []
+        for location in new:
+            guid = hashlib.md5(location.encode('utf8', 'ignore')).hexdigest()
+            obj = HarvestObject(job=harvest_job,
+                                extras=create_extras(location,
+                                                     url_to_modified_harvest[location],
+                                                     'new'),
+                                guid=guid
+                                )
+            obj.save()
+            ids.append(obj.id)
+
+        for location in change:
+            obj = HarvestObject(job=harvest_job,
+                                extras=create_extras(location,
+                                                     url_to_modified_harvest[location],
+                                                     'change'),
+                                guid=url_to_ids[location][0],
+                                package_id=url_to_ids[location][1],
+                                )
+            obj.save()
+            ids.append(obj.id)
+
+        for location in delete:
+            obj = HarvestObject(job=harvest_job,
+                                extras=create_extras('', '', 'delete'),
+                                guid=url_to_ids[location][0],
+                                package_id=url_to_ids[location][1],
+                                )
+            model.Session.query(HarvestObject).\
+                filter_by(guid=url_to_ids[location][0]).\
+                update({'current': False}, False)
+
+            obj.save()
+            ids.append(obj.id)
+
+        if len(ids) > 0:
+            log.debug('{0} objects sent to the next stage: {1} new, {2} change, {3} delete'.format(
+                len(ids), len(new), len(change), len(delete)))
+            return ids
+        else:
+            self._save_gather_error('No records to change',
+                                    harvest_job)
+            return None
+
+    def fetch_stage(self, harvest_object):
+
+        # Check harvest object status
+        status = get_extra(harvest_object, 'status')
+
+        if status == 'delete':
+            # No need to fetch anything, just pass to the import stage
+            return True
+
+        # We need to fetch the remote document
+
+        # Get location
+        url = get_extra(harvest_object, 'waf_location')
+        if not url:
+            self._save_object_error(
+                'No location defined for object {0}'.format(harvest_object.id),
+                harvest_object)
+            return False
+
+        # Get contents
+        try:
+            content = self._get_content_as_unicode(url)
+        except Exception, e:
+            msg = 'Could not harvest WAF link {0}: {1}'.format(url, e)
+            self._save_object_error(msg, harvest_object)
+            return False
+
+        # Check if it is an ISO document
+        document_format = guess_standard(content)
+        if document_format == 'iso':
+            harvest_object.content = content
+            harvest_object.save()
+        else:
+            extra = HOExtra(
+                object=harvest_object,
+                key='original_document',
+                value=content)
+            extra.save()
+
+            extra = HOExtra(
+                object=harvest_object,
+                key='original_format',
+                value=document_format)
+            extra.save()
+
+        return True
+
+
+# Grammars for scraping the directory listing HTML of the supported servers
+apache = parse.SkipTo(parse.CaselessLiteral("<a href="), include=True).suppress() \
+       + parse.quotedString.setParseAction(parse.removeQuotes).setResultsName('url') \
+       + parse.SkipTo("</a>", include=True).suppress() \
+       + parse.Optional(parse.Literal('</td><td align="right">')).suppress() \
+       + parse.Optional(parse.Combine(
+             parse.Word(parse.alphanums+'-') +
+             parse.Word(parse.alphanums+':'),
+             adjacent=False, joinString=' ').setResultsName('date')
+         )
+
+iis = parse.SkipTo("<br>").suppress() \
+    + parse.OneOrMore("<br>").suppress() \
+    + parse.Optional(parse.Combine(
+          parse.Word(parse.alphanums+'/') +
+          parse.Word(parse.alphanums+':') +
+          parse.Word(parse.alphas),
+          adjacent=False, joinString=' ').setResultsName('date')
+      ) \
+    + parse.Word(parse.nums).suppress() \
+    + parse.Literal('<a href=').suppress() \
+    + parse.quotedString.setParseAction(parse.removeQuotes).setResultsName('url')