From c2bfdd16fc809a11454955e9b7604b9726d279b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adri=C3=A0=20Mercader?= Date: Mon, 14 Mar 2011 15:19:58 +0000 Subject: [PATCH 1/2] #1030 Move harvesting model to ckanext-harvest --- ckanext/harvest/commands/harvester.py | 26 +- ckanext/harvest/model/__init__.py | 791 ++++++++++++++++++++++++++ 2 files changed, 800 insertions(+), 17 deletions(-) create mode 100644 ckanext/harvest/model/__init__.py diff --git a/ckanext/harvest/commands/harvester.py b/ckanext/harvest/commands/harvester.py index b29d8f0..4d7b7e2 100644 --- a/ckanext/harvest/commands/harvester.py +++ b/ckanext/harvest/commands/harvester.py @@ -3,6 +3,8 @@ import re from pprint import pprint from ckan.lib.cli import CkanCommand +from ckan.model import repo +from ckanext.harvest.model import HarvestSource, HarvestingJob, HarvestedDocument class Harvester(CkanCommand): '''Harvests remotely mastered metadata @@ -106,7 +108,6 @@ class Harvester(CkanCommand): import pylons pylons.translator._push_object(_get_translator(pylons.config.get('lang'))) - from ckan.model import HarvestingJob from ckan.controllers.harvesting import HarvestingJobController from ckanext.csw.validation import Validator @@ -138,18 +139,15 @@ class Harvester(CkanCommand): print "There are no new harvesting jobs." def remove_harvesting_job(self, job_id): - from ckan import model try: - job = model.HarvestingJob.get(job_id) + job = HarvestingJob.get(job_id) job.delete() - model.repo.commit_and_remove() + repo.commit_and_remove() print "Removed job: %s" % job_id - except model.HarvestingObjectNotFound: + except: print "No such job" def register_harvesting_job(self, source_id, user_ref): - from ckan.model import HarvestSource - from ckan.model import HarvestingJob if re.match('(http|file)://', source_id): source_url = unicode(source_id) source_id = None @@ -176,7 +174,6 @@ class Harvester(CkanCommand): self.print_there_are("harvesting job", jobs, condition=status) def register_harvest_source(self, url, user_ref, publisher_ref): - from ckan.model import HarvestSource existing = self.get_harvest_sources(url=url) if existing: print "Error, there is already a harvesting source for that URL" @@ -191,19 +188,18 @@ class Harvester(CkanCommand): self.print_there_are("harvest source", sources) def remove_harvest_source(self, url): - from ckan import model - model.repo.new_revision() - sources = model.HarvestSource.filter(url=url) + repo.new_revision() + sources = HarvestSource.filter(url=url) if sources.count() == 0: print "No such source" else: source = sources[0] - jobs = model.HarvestingJob.filter(source=source) + jobs = HarvestingJob.filter(source=source) print "Removing %d jobs" % jobs.count() for job in jobs: job.delete() source.delete() - model.repo.commit_and_remove() + repo.commit_and_remove() print "Removed harvest source: %s" % url def list_harvest_sources(self): @@ -217,21 +213,17 @@ class Harvester(CkanCommand): self.print_there_are(what="harvesting job", sequence=jobs) def get_harvest_sources(self, **kwds): - from ckan.model import HarvestSource return HarvestSource.filter(**kwds).all() def get_harvesting_jobs(self, **kwds): - from ckan.model import HarvestingJob return HarvestingJob.filter(**kwds).all() def create_harvest_source(self, **kwds): - from ckan.model import HarvestSource source = HarvestSource(**kwds) source.save() return source def create_harvesting_job(self, **kwds): - from ckan.model import HarvestingJob job = HarvestingJob(**kwds) job.save() return job diff --git a/ckanext/harvest/model/__init__.py b/ckanext/harvest/model/__init__.py new file mode 100644 index 0000000..c4e8929 --- /dev/null +++ b/ckanext/harvest/model/__init__.py @@ -0,0 +1,791 @@ +import logging +import datetime +from lxml import etree + +from ckan.model.meta import * +from ckan.model.types import make_uuid +from ckan.model.types import JsonType +from ckan.model.core import * +from ckan.model.domain_object import DomainObject +from ckan.model.package import Package + +from sqlalchemy.orm import backref, relation +log = logging.getLogger(__name__) + +__all__ = [ + 'HarvestSource', 'harvest_source_table', + 'HarvestingJob', 'harvesting_job_table', + 'HarvestedDocument', 'harvested_document_table', +] + +class HarvesterError(Exception): + pass + +class HarvesterUrlError(HarvesterError): + pass + +class ValidationError(HarvesterError): + pass + +class HarvestDomainObject(DomainObject): + """Convenience methods for searching objects + """ + key_attr = 'id' + + @classmethod + def get(self, key, default=Exception, attr=None): + """Finds a single entity in the register.""" + if attr == None: + attr = self.key_attr + kwds = {attr: key} + o = self.filter(**kwds).first() + if o: + return o + if default != Exception: + return default + else: + raise Exception("%s not found: %s" % (self.__name__, key)) + + @classmethod + def filter(self, **kwds): + query = Session.query(self).autoflush(False) + return query.filter_by(**kwds) + + +class HarvestSource(HarvestDomainObject): + """A source is essentially a URL plus some other metadata. The + URL it points to should contain a manifest of resources that can + be turned into packges; or an index page containing links to such + manifests. + """ + pass + +class HarvestingJob(HarvestDomainObject): + + def __init__(self, **p): + if 'report' in p.keys(): + raise Exception( + "You cannot set the 'report' key in the constructor" + ) + HarvestDomainObject.__init__(self, **p) + self.report = {'added': [], 'errors': []} + + def save(self): + # Why is this necessary? If I don't add it the report field isn't saved + if self.report is not None: + _report = self.report + self.report = str(dict(_report)) + HarvestDomainObject.save(self) + self.report = _report + HarvestDomainObject.save(self) + +class MappedXmlObject(object): + elements = [] + + +class MappedXmlDocument(MappedXmlObject): + def __init__(self, content): + self.content = content + + def read_values(self): + values = {} + tree = self.get_content_tree() + for element in self.elements: + values[element.name] = element.read_value(tree) + self.infer_values(values) + return values + + def get_content_tree(self): + parser = etree.XMLParser(remove_blank_text=True) + if type(self.content) == unicode: + content = self.content.encode('utf8') + else: + content = self.content + return etree.fromstring(content, parser=parser) + + def infer_values(self, values): + pass + + +class MappedXmlElement(MappedXmlObject): + namespaces = {} + + def __init__(self, name, search_paths=[], multiplicity="*", elements=[]): + self.name = name + self.search_paths = search_paths + self.multiplicity = multiplicity + self.elements = elements or self.elements + + def read_value(self, tree): + values = [] + for xpath in self.get_search_paths(): + elements = self.get_elements(tree, xpath) + values = self.get_values(elements) + if values: + break + return self.fix_multiplicity(values) + + def get_search_paths(self): + if type(self.search_paths) != type([]): + search_paths = [self.search_paths] + else: + search_paths = self.search_paths + return search_paths + + def get_elements(self, tree, xpath): + return tree.xpath(xpath, namespaces=self.namespaces) + + def get_values(self, elements): + values = [] + if len(elements) == 0: + pass + else: + for element in elements: + value = self.get_value(element) + values.append(value) + return values + + def get_value(self, element): + if self.elements: + value = {} + for child in self.elements: + value[child.name] = child.read_value(element) + return value + elif type(element) == etree._ElementStringResult: + value = str(element) + elif type(element) == etree._ElementUnicodeResult: + value = unicode(element) + else: + value = self.element_tostring(element) + return value + + def element_tostring(self, element): + return etree.tostring(element, pretty_print=False) + + def fix_multiplicity(self, values): + if self.multiplicity == "0": + if values: + raise HarvesterError( + "Values found for element '%s': %s" % (self.name, values)) + else: + return "" + elif self.multiplicity == "1": + if values: + return values[0] + else: + raise HarvesterError( + "Value not found for element '%s'" % self.name) + elif self.multiplicity == "*": + return values + elif self.multiplicity == "0..1": + if values: + return values[0] + else: + return "" + elif self.multiplicity == "1..*": + return values + else: + raise HarvesterError( + "Can't fix element values for multiplicity '%s'." % \ + self.multiplicity) + + +class GeminiElement(MappedXmlElement): + + namespaces = { + "gts": "http://www.isotc211.org/2005/gts", + "gml": "http://www.opengis.net/gml/3.2", + "gmx": "http://www.isotc211.org/2005/gmx", + "gsr": "http://www.isotc211.org/2005/gsr", + "gss": "http://www.isotc211.org/2005/gss", + "gco": "http://www.isotc211.org/2005/gco", + "gmd": "http://www.isotc211.org/2005/gmd", + "srv": "http://www.isotc211.org/2005/srv", + "xlink": "http://www.w3.org/1999/xlink", + "xsi": "http://www.w3.org/2001/XMLSchema-instance", + } + + +class GeminiResponsibleParty(GeminiElement): + + elements = [ + GeminiElement( + name="organisation-name", + search_paths=[ + "gmd:organisationName/gco:CharacterString/text()", + ], + multiplicity="0..1", + ), + GeminiElement( + name="position-name", + search_paths=[ + "gmd:positionName/gco:CharacterString/text()", + ], + multiplicity="0..1", + ), + GeminiElement( + name="contact-info", + search_paths=[ + "gmd:contactInfo/gmd:CI_Contact", + ], + multiplicity="0..1", + elements = [ + GeminiElement( + name="email", + search_paths=[ + "gmd:address/gmd:CI_Address/gmd:electronicMailAddress/gco:CharacterString/text()", + ], + multiplicity="0..1", + ), + ] + ), + GeminiElement( + name="role", + search_paths=[ + "gmd:role/gmd:CI_RoleCode/@codeListValue", + ], + multiplicity="0..1", + ), + ] + + +class GeminiResourceLocator(GeminiElement): + + elements = [ + GeminiElement( + name="url", + search_paths=[ + "gmd:linkage/gmd:URL/text()", + ], + multiplicity="1", + ), + GeminiElement( + name="function", + search_paths=[ + "gmd:function/gmd:CI_OnLineFunctionCode/@codeListValue", + ], + multiplicity="0..1", + ), + ] + + +class GeminiDataFormat(GeminiElement): + + elements = [ + GeminiElement( + name="name", + search_paths=[ + "gmd:name/gco:CharacterString/text()", + ], + multiplicity="0..1", + ), + GeminiElement( + name="version", + search_paths=[ + "gmd:version/gco:CharacterString/text()", + ], + multiplicity="0..1", + ), + ] + + +class GeminiReferenceDate(GeminiElement): + + elements = [ + GeminiElement( + name="type", + search_paths=[ + "gmd:dateType/gmd:CI_DateTypeCode/@codeListValue", + "gmd:dateType/gmd:CI_DateTypeCode/text()", + ], + multiplicity="1", + ), + GeminiElement( + name="value", + search_paths=[ + "gmd:date/gco:Date/text()", + "gmd:date/gco:DateTime/text()", + ], + multiplicity="1", + ), + ] + + +class GeminiDocument(MappedXmlDocument): + + # Attribute specifications from "XPaths for GEMINI" by Peter Parslow. + + elements = [ + GeminiElement( + name="guid", + search_paths="gmd:fileIdentifier/gco:CharacterString/text()", + multiplicity="0..1", + ), + GeminiElement( + name="metadata-language", + search_paths=[ + "gmd:language/gmd:LanguageCode/@codeListValue", + "gmd:language/gmd:LanguageCode/text()", + ], + multiplicity="1", + ), + GeminiElement( + name="resource-type", + search_paths=[ + "gmd:hierarchyLevel/gmd:MD_ScopeCode/@codeListValue", + "gmd:hierarchyLevel/gmd:MD_ScopeCode/text()", + ], + multiplicity="1", + ), + GeminiResponsibleParty( + name="metadata-point-of-contact", + search_paths=[ + "gmd:identificationInfo/gmd:MD_DataIdentification/gmd:pointOfContact/gmd:CI_ResponsibleParty", + "gmd:identificationInfo/srv:SV_ServiceIdentification/gmd:pointOfContact/gmd:CI_ResponsibleParty", + ], + multiplicity="1..*", + ), + GeminiElement( + name="metadata-date", + search_paths=[ + "gmd:dateStamp/gco:Date/text()", + "gmd:dateStamp/gco:DateTime/text()", + ], + multiplicity="1", + ), + GeminiElement( + name="spatial-reference-system", + search_paths=[ + "gmd:referenceSystemInfo/gmd:MD_ReferenceSystem", + ], + multiplicity="0..1", + ), + GeminiElement( + name="title", + search_paths=[ + "gmd:identificationInfo/gmd:MD_DataIdentification/gmd:citation/gmd:CI_Citation/gmd:title/gco:CharacterString/text()", + "gmd:identificationInfo/srv:SV_ServiceIdentification/gmd:citation/gmd:CI_Citation/gmd:title/gco:CharacterString/text()", + ], + multiplicity="1", + ), + GeminiElement( + name="alternative-title", + search_paths=[ + "gmd:identificationInfo/gmd:MD_DataIdentification/gmd:citation/gmd:CI_Citation/gmd:alternativeTitle/gco:CharacterString/text()", + "gmd:identificationInfo/srv:SV_ServiceIdentification/gmd:citation/gmd:CI_Citation/gmd:alternativeTitle/gco:CharacterString/text()", + ], + multiplicity="*", + ), + GeminiReferenceDate( + name="dataset-reference-date", + search_paths=[ + "gmd:identificationInfo/gmd:MD_DataIdentification/gmd:citation/gmd:CI_Citation/gmd:date/gmd:CI_Date", + "gmd:identificationInfo/srv:SV_ServiceIdentification/gmd:citation/gmd:CI_Citation/gmd:date/gmd:CI_Date", + ], + multiplicity="*", + ), + ## Todo: Suggestion from PP not to bother pulling this into the package. + #GeminiElement( + # name="unique-resource-identifier", + # search_paths=[ + # "gmd:identificationInfo/gmd:MD_DataIdentification/gmd:citation/gmd:CI_Citation/gmd:identifier/gmd:RS_Identifier", + # "gmd:identificationInfo/srv:SV_ServiceIdentification/gmd:citation/gmd:CI_Citation/gmd:identifier/gmd:RS_Identifier", + # ], + # multiplicity="1", + #), + GeminiElement( + name="abstract", + search_paths=[ + "gmd:identificationInfo/gmd:MD_DataIdentification/gmd:abstract/gco:CharacterString/text()", + "gmd:identificationInfo/srv:SV_ServiceIdentification/gmd:abstract/gco:CharacterString/text()", + ], + multiplicity="1", + ), + GeminiResponsibleParty( + name="responsible-organisation", + search_paths=[ + "gmd:identificationInfo/gmd:MD_DataIdentification/gmd:pointOfContact/gmd:CI_ResponsibleParty", + "gmd:identificationInfo/srv:SV_ServiceIdentification/gmd:pointOfContact/gmd:CI_ResponsibleParty", + ], + multiplicity="1..*", + ), + GeminiElement( + name="frequency-of-update", + search_paths=[ + "gmd:identificationInfo/gmd:MD_DataIdentification/gmd:resourceMaintenance/gmd:MD_MaintenanceInformation/gmd:maintenanceAndUpdateFrequency/gmd:MD_MaintenanceFrequencyCode/@codeListValue", + "gmd:identificationInfo/srv:SV_ServiceIdentification/gmd:resourceMaintenance/gmd:MD_MaintenanceInformation/gmd:maintenanceAndUpdateFrequency/gmd:MD_MaintenanceFrequencyCode/@codeListValue", + + "gmd:identificationInfo/gmd:MD_DataIdentification/gmd:resourceMaintenance/gmd:MD_MaintenanceInformation/gmd:maintenanceAndUpdateFrequency/gmd:MD_MaintenanceFrequencyCode/text()", + "gmd:identificationInfo/srv:SV_ServiceIdentification/gmd:resourceMaintenance/gmd:MD_MaintenanceInformation/gmd:maintenanceAndUpdateFrequency/gmd:MD_MaintenanceFrequencyCode/text()", + ], + multiplicity="0..1", + ), + GeminiElement( + name="keyword-inspire-theme", + search_paths=[ + "gmd:identificationInfo/gmd:MD_DataIdentification/gmd:descriptiveKeywords/gmd:MD_Keywords/gmd:keyword/gco:CharacterString/text()", + "gmd:identificationInfo/srv:SV_ServiceIdentification/gmd:descriptiveKeywords/gmd:MD_Keywords/gmd:keyword/gco:CharacterString/text()", + ], + multiplicity="*", + ), + GeminiElement( + name="keyword-controlled-other", + search_paths=[ + "gmd:identificationInfo/gmd:MD_DataIdentification/gmd:descriptiveKeywords/gmd:MD_Keywords/gmd:keyword/gco:CharacterString/text()", + "gmd:identificationInfo/srv:SV_ServiceIdentification/gmd:descriptiveKeywords/gmd:MD_Keywords/gmd:keyword/gco:CharacterString/text()", + "gmd:identificationInfo/srv:SV_ServiceIdentification/srv:keywords/gmd:MD_Keywords/gmd:keyword/gco:CharacterString/text()", + ], + multiplicity="*", + ), + GeminiElement( + name="keyword-free-text", + search_paths=[ + ], + multiplicity="*", + ), + GeminiElement( + name="limitations-on-public-access", + search_paths=[ + "gmd:identificationInfo/gmd:MD_DataIdentification/gmd:resourceConstraints/gmd:MD_LegalConstraints/gmd:otherConstraints/gco:CharacterString/text()", + "gmd:identificationInfo/srv:SV_ServiceIdentification/gmd:resourceConstraints/gmd:MD_LegalConstraints/gmd:otherConstraints/gco:CharacterString/text()", + ], + multiplicity="1..*", + ), + GeminiElement( + name="use-constraints", + search_paths=[ + "gmd:identificationInfo/gmd:MD_DataIdentification/gmd:resourceConstraints/gmd:MD_Constraints/gmd:useLimitation/gco:CharacterString/text()", + "gmd:identificationInfo/srv:SV_ServiceIdentification/gmd:resourceConstraints/gmd:MD_Constraints/gmd:useLimitation/gco:CharacterString/text()", + ], + multiplicity="*", + ), + GeminiElement( + name="spatial-data-service-type", + search_paths=[ + "gmd:identificationInfo/srv:SV_ServiceIdentification/srv:serviceType/gco:LocalName", + ], + multiplicity="0..1", + ), + GeminiElement( + name="spatial-resolution", + search_paths=[ + "gmd:identificationInfo/gmd:MD_DataIdentification/gmd:spatialResolution/gmd:MD_Resolution/gmd:distance/gco:Distance", + "gmd:identificationInfo/srv:SV_ServiceIdentification/gmd:spatialResolution/gmd:MD_Resolution/gmd:distance/gco:Distance", + ], + multiplicity="0..1", + ), + #GeminiElement( + # name="spatial-resolution-units", + # search_paths=[ + # "gmd:identificationInfo/gmd:MD_DataIdentification/gmd:spatialResolution/gmd:MD_Resolution/gmd:distance/gco:Distance/@uom", + # "gmd:identificationInfo/srv:SV_ServiceIdentification/gmd:spatialResolution/gmd:MD_Resolution/gmd:distance/gco:Distance/@uom", + # ], + # multiplicity="0..1", + #), + GeminiElement( + name="equivalent-scale", + search_paths=[ + "gmd:identificationInfo/gmd:MD_DataIdentification/gmd:spatialResolution/gmd:MD_Resolution/gmd:equivalentScale/gmd:MD_RepresentativeFraction/gmd:denominator/gco:Integer/text()", + "gmd:identificationInfo/srv:SV_ServiceIdentification/gmd:spatialResolution/gmd:MD_Resolution/gmd:equivalentScale/gmd:MD_RepresentativeFraction/gmd:denominator/gco:Integer/text()", + ], + multiplicity="*", + ), + GeminiElement( + name="dataset-language", + search_paths=[ + "gmd:identificationInfo/gmd:MD_DataIdentification/gmd:language/gmd:LanguageCode/@codeListValue", + "gmd:identificationInfo/srv:SV_ServiceIdentification/gmd:language/gmd:LanguageCode/@codeListValue", + "gmd:identificationInfo/gmd:MD_DataIdentification/gmd:language/gmd:LanguageCode/text()", + "gmd:identificationInfo/srv:SV_ServiceIdentification/gmd:language/gmd:LanguageCode/text()", + ], + multiplicity="*", + ), + GeminiElement( + name="topic-category", + search_paths=[ + "gmd:identificationInfo/gmd:MD_DataIdentification/gmd:topicCategory/gmd:MD_TopicCategoryCode/text()", + "gmd:identificationInfo/srv:SV_ServiceIdentification/gmd:topicCategory/gmd:MD_TopicCategoryCode/text()", + ], + multiplicity="*", + ), + GeminiElement( + name="extent-controlled", + search_paths=[ + ], + multiplicity="*", + ), + GeminiElement( + name="extent-free-text", + search_paths=[ + "gmd:identificationInfo/gmd:MD_DataIdentification/gmd:extent/gmd:EX_Extent/gmd:geographicElement/gmd:EX_GeographicDescription/gmd:geographicIdentifier/gmd:MD_Identifier/gmd:code/gco:CharacterString/text()", + "gmd:identificationInfo/srv:SV_ServiceIdentification/srv:extent/gmd:EX_Extent/gmd:geographicElement/gmd:EX_GeographicDescription/gmd:geographicIdentifier/gmd:MD_Identifier/gmd:code/gco:CharacterString/text()", + ], + multiplicity="*", + ), + GeminiElement( + name="bbox-west-long", + search_paths=[ + "gmd:identificationInfo/gmd:MD_DataIdentification/gmd:extent/gmd:EX_Extent/gmd:geographicElement/gmd:EX_GeographicBoundingBox/gmd:westBoundLongitude/gco:Decimal/text()", + "gmd:identificationInfo/srv:SV_ServiceIdentification/srv:extent/gmd:EX_Extent/gmd:geographicElement/gmd:EX_GeographicBoundingBox/gmd:westBoundLongitude/gco:Decimal/text()", + ], + multiplicity="1", + ), + GeminiElement( + name="bbox-east-long", + search_paths=[ + "gmd:identificationInfo/gmd:MD_DataIdentification/gmd:extent/gmd:EX_Extent/gmd:geographicElement/gmd:EX_GeographicBoundingBox/gmd:eastBoundLongitude/gco:Decimal/text()", + "gmd:identificationInfo/srv:SV_ServiceIdentification/srv:extent/gmd:EX_Extent/gmd:geographicElement/gmd:EX_GeographicBoundingBox/gmd:eastBoundLongitude/gco:Decimal/text()", + ], + multiplicity="1", + ), + GeminiElement( + name="bbox-north-lat", + search_paths=[ + "gmd:identificationInfo/gmd:MD_DataIdentification/gmd:extent/gmd:EX_Extent/gmd:geographicElement/gmd:EX_GeographicBoundingBox/gmd:northBoundLatitude/gco:Decimal/text()", + "gmd:identificationInfo/srv:SV_ServiceIdentification/srv:extent/gmd:EX_Extent/gmd:geographicElement/gmd:EX_GeographicBoundingBox/gmd:northBoundLatitude/gco:Decimal/text()", + ], + multiplicity="1", + ), + GeminiElement( + name="bbox-south-lat", + search_paths=[ + "gmd:identificationInfo/gmd:MD_DataIdentification/gmd:extent/gmd:EX_Extent/gmd:geographicElement/gmd:EX_GeographicBoundingBox/gmd:southBoundLatitude/gco:Decimal/text()", + "gmd:identificationInfo/srv:SV_ServiceIdentification/srv:extent/gmd:EX_Extent/gmd:geographicElement/gmd:EX_GeographicBoundingBox/gmd:southBoundLatitude/gco:Decimal/text()", + ], + multiplicity="1", + ), + GeminiElement( + name="temporal-extent-begin", + search_paths=[ + "gmd:identificationInfo/gmd:MD_DataIdentification/gmd:extent/gmd:EX_Extent/gmd:temporalElement/gmd:EX_TemporalExtent/gmd:extent/gml:TimePeriod/gml:beginPosition/text()", + "gmd:identificationInfo/srv:SV_ServiceIdentification/srv:extent/gmd:EX_Extent/gmd:temporalElement/gmd:EX_TemporalExtent/gmd:extent/gml:TimePeriod/gml:beginPosition/text()", + ], + multiplicity="0..1", + ), + GeminiElement( + name="temporal-extent-end", + search_paths=[ + "gmd:identificationInfo/gmd:MD_DataIdentification/gmd:extent/gmd:EX_Extent/gmd:temporalElement/gmd:EX_TemporalExtent/gmd:extent/gml:TimePeriod/gml:endPosition/text()", + "gmd:identificationInfo/srv:SV_ServiceIdentification/srv:extent/gmd:EX_Extent/gmd:temporalElement/gmd:EX_TemporalExtent/gmd:extent/gml:TimePeriod/gml:endPosition/text()", + ], + multiplicity="0..1", + ), + GeminiElement( + name="vertical-extent", + search_paths=[ + "gmd:identificationInfo/gmd:MD_DataIdentification/gmd:extent/gmd:EX_Extent/gmd:verticalElement/gmd:EX_VerticalExtent", + "gmd:identificationInfo/srv:SV_ServiceIdentification/srv:extent/gmd:EX_Extent/gmd:verticalElement/gmd:EX_VerticalExtent", + ], + multiplicity="0..1", + ), + GeminiElement( + name="coupled-resource", + search_paths=[ + "gmd:identificationInfo/srv:SV_ServiceIdentification/srv:operatesOn/@xlink:href", + ], + multiplicity="*", + ), + GeminiElement( + name="additional-information-source", + search_paths=[ + "gmd:identificationInfo/gmd:MD_DataIdentification/gmd:supplementalInformation/gco:CharacterString/text()", + ], + multiplicity="0..1", + ), + GeminiDataFormat( + name="data-format", + search_paths=[ + "gmd:distributionInfo/gmd:MD_Distribution/gmd:distributionFormat/gmd:MD_Format", + ], + multiplicity="*", + ), + GeminiResourceLocator( + name="resource-locator", + search_paths=[ + "gmd:distributionInfo/gmd:MD_Distribution/gmd:transferOptions/gmd:MD_DigitalTransferOptions/gmd:onLine/gmd:CI_OnlineResource", + ], + multiplicity="*", + ), + GeminiElement( + name="conformity-specification", + search_paths=[ + "gmd:dataQualityInfo/gmd:DQ_DataQuality/gmd:report/gmd:DQ_DomainConsistency/gmd:result/gmd:DQ_ConformanceResult/gmd:specification", + ], + multiplicity="0..1", + ), + GeminiElement( + name="conformity-pass", + search_paths=[ + "gmd:dataQualityInfo/gmd:DQ_DataQuality/gmd:report/gmd:DQ_DomainConsistency/gmd:result/gmd:DQ_ConformanceResult/gmd:pass/gco:Boolean/text()", + ], + multiplicity="0..1", + ), + GeminiElement( + name="conformity-explanation", + search_paths=[ + "gmd:dataQualityInfo/gmd:DQ_DataQuality/gmd:report/gmd:DQ_DomainConsistency/gmd:result/gmd:DQ_ConformanceResult/gmd:explanation/gco:CharacterString/text()", + ], + multiplicity="0..1", + ), + GeminiElement( + name="lineage", + search_paths=[ + "gmd:dataQualityInfo/gmd:DQ_DataQuality/gmd:lineage/gmd:LI_Lineage/gmd:statement/gco:CharacterString/text()", + ], + multiplicity="0..1", + ) + ] + + def infer_values(self, values): + # Todo: Infer name. + self.infer_date_released(values) + self.infer_date_updated(values) + self.infer_url(values) + # Todo: Infer resources. + self.infer_tags(values) + self.infer_publisher(values) + self.infer_contact(values) + self.infer_contact_email(values) + return values + + def infer_date_released(self, values): + value = '' + for date in values['dataset-reference-date']: + if date['type'] == 'publication': + value = date['value'] + break + values['date-released'] = value + + def infer_date_updated(self, values): + value = '' + # Todo: Use last of several multiple revision dates. + for date in values['dataset-reference-date']: + if date['type'] == 'revision': + value = date['value'] + break + values['date-updated'] = value + + def infer_url(self, values): + value = '' + for locator in values['resource-locator']: + if locator['function'] == 'information': + value = locator['url'] + break + values['url'] = value + + def infer_tags(self, values): + tags = [] + for key in ['keyword-inspire-theme', 'keyword-controlled-other', 'keyword-free-text']: + for item in values[key]: + if item not in tags: + tags.append(item) + values['tags'] = tags + + def infer_publisher(self, values): + value = '' + for responsible_party in values['responsible-organisation']: + if responsible_party['role'] == 'publisher': + value = responsible_party['organisation-name'] + if value: + break + values['publisher'] = value + + def infer_contact(self, values): + value = '' + for responsible_party in values['responsible-organisation']: + value = responsible_party['organisation-name'] + if value: + break + values['contact'] = value + + def infer_contact_email(self, values): + value = '' + for responsible_party in values['responsible-organisation']: + if isinstance(responsible_party, dict) and \ + isinstance(responsible_party.get('contact-info'), dict) and \ + responsible_party['contact-info'].has_key('email'): + value = responsible_party['contact-info']['email'] + if value: + break + values['contact-email'] = value + + +class HarvestedDocument(HarvestDomainObject, + vdm.sqlalchemy.RevisionedObjectMixin, + vdm.sqlalchemy.StatefulObjectMixin, + ): + + def read_values(self): + if "gmd:MD_Metadata" in self.content: + gemini_document = GeminiDocument(self.content) + else: + raise HarvesterError, "Can't identify type of document content: %s" % self.content + return gemini_document.read_values() + +harvest_source_table = Table('harvest_source', metadata, + Column('id', types.UnicodeText, primary_key=True, default=make_uuid), + Column('url', types.UnicodeText, unique=True, nullable=False), + Column('description', types.UnicodeText, default=u''), + Column('user_ref', types.UnicodeText, default=u''), + Column('publisher_ref', types.UnicodeText, default=u''), + Column('created', DateTime, default=datetime.datetime.utcnow), +) +harvesting_job_table = Table('harvesting_job', metadata, + Column('id', types.UnicodeText, primary_key=True, default=make_uuid), + Column('status', types.UnicodeText, default=u'New', nullable=False), + Column('created', DateTime, default=datetime.datetime.utcnow), + Column('user_ref', types.UnicodeText, nullable=False), + Column('report', JsonType), + Column('source_id', types.UnicodeText, ForeignKey('harvest_source.id')), +) +harvested_document_table = Table('harvested_document', metadata, + Column('id', types.UnicodeText, primary_key=True, default=make_uuid), + Column('guid', types.UnicodeText, default=''), + Column('created', DateTime, default=datetime.datetime.utcnow), + Column('content', types.UnicodeText, nullable=False), + Column('source_id', types.UnicodeText, ForeignKey('harvest_source.id')), + Column('package_id', types.UnicodeText, ForeignKey('package.id')), +) + +vdm.sqlalchemy.make_table_stateful(harvested_document_table) +harvested_document_revision_table = vdm.sqlalchemy.make_revisioned_table(harvested_document_table) + +mapper( + HarvestedDocument, + harvested_document_table, + properties={ + 'package':relation( + Package, + # Using the plural but there should only ever be one + backref='documents', + ), + }, + extension=[ + vdm.sqlalchemy.Revisioner( + harvested_document_revision_table + ), + ] +) +mapper( + HarvestingJob, + harvesting_job_table, +) +mapper( + HarvestSource, + harvest_source_table, + properties={ + 'documents': relation( + HarvestedDocument, + backref='source', + ), + 'jobs': relation( + HarvestingJob, + backref=u'source', + order_by=harvesting_job_table.c.created, + ), + }, +) + +vdm.sqlalchemy.modify_base_object_mapper(HarvestedDocument, Revision, State) +HarvestedDocumentRevision = vdm.sqlalchemy.create_object_version( + mapper, HarvestedDocument, harvested_document_revision_table) From 2c8eb94344bf2405c39096d71339ed8b7a0b644f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adri=C3=A0=20Mercader?= Date: Mon, 14 Mar 2011 17:04:41 +0000 Subject: [PATCH 2/2] #1030 Move harvesting controller to ckanext-harvest --- ckanext/harvest/commands/harvester.py | 2 +- ckanext/harvest/controllers/harvesting.py | 429 ++++++++++++++++++++++ 2 files changed, 430 insertions(+), 1 deletion(-) create mode 100644 ckanext/harvest/controllers/harvesting.py diff --git a/ckanext/harvest/commands/harvester.py b/ckanext/harvest/commands/harvester.py index 4d7b7e2..3d2199e 100644 --- a/ckanext/harvest/commands/harvester.py +++ b/ckanext/harvest/commands/harvester.py @@ -108,7 +108,7 @@ class Harvester(CkanCommand): import pylons pylons.translator._push_object(_get_translator(pylons.config.get('lang'))) - from ckan.controllers.harvesting import HarvestingJobController + from ckanext.harvest.controllers.harvesting import HarvestingJobController from ckanext.csw.validation import Validator jobs = HarvestingJob.filter(status=u"New").all() diff --git a/ckanext/harvest/controllers/harvesting.py b/ckanext/harvest/controllers/harvesting.py new file mode 100644 index 0000000..4c0975d --- /dev/null +++ b/ckanext/harvest/controllers/harvesting.py @@ -0,0 +1,429 @@ +import urllib2 +from lxml import etree + +from pylons.i18n import _ + +from ckan.lib.base import * +from ckan.lib.helpers import literal +from ckan.lib.cache import proxy_cache +from ckan.lib.package_saver import PackageSaver, ValidationException +from ckan.lib.package_saver import WritePackageFromBoundFieldset +from ckan.lib.base import BaseController +from ckan.plugins import PluginImplementations, IPackageController +from ckanext.harvest.model import HarvesterError, HarvesterUrlError, ValidationError +from ckanext.harvest.model import GeminiDocument +from ckanext.harvest.model import HarvestedDocument + +import ckan.forms +from ckan.forms import GetPackageFieldset +from ckan.forms import GetEditFieldsetPackageData +import ckan.model as model +import ckan.authz +import ckan.rating +import ckan.misc +from ckan.lib.munge import munge_title_to_name + +log = __import__("logging").getLogger(__name__) + +def gen_new_name(title): + name = munge_title_to_name(title).replace('_', '-') + while '--' in name: + name = name.replace('--', '-') + like_q = u"%s%%" % name + pkg_query = model.Session.query(model.Package).filter(model.Package.name.ilike(like_q)).limit(100) + taken = [pkg.name for pkg in pkg_query] + if name not in taken: + return name + else: + counter = 1 + while counter < 101: + if name+str(counter) not in taken: + return name+str(counter) + return None + +class HarvestingJobController(object): + """\ + This is not a controller in the Pylons sense, just an object for managing + harvesting. + """ + def __init__(self, job, validator=None): + self.job = job + self.validator = validator + + def harvest_documents(self): + try: + content = self.get_content(self.job.source.url) + except HarvesterUrlError, exception: + msg = "Error harvesting source: %s" % exception + self.job.report['errors'].append(msg) + else: + # @@@ This is very ugly. Essentially for remote (CSW) services + # we purposely cause an error to detect what they are. + # Likely a much better idea just to have a type in the + # source table + source_type = self.detect_source_type(content) + if source_type not in ['doc', 'waf', 'csw']: + if source_type == None: + self.job.report['errors'].append( + "Unable to detect source type from content", + ) + else: + self.job.report['errors'].append( + "Source type '%s' not supported" % source_type + ) + else: + # @@@ We want a model where the harvesting returns + # documents, then each document is parsed and errors + # are associated with the document itself, and the + # documents are serialised afterwards I think. + # Here everything is done in one go. + if source_type == 'doc': + self.harvest_gemini_document(content) + elif source_type == 'csw': + self.harvest_csw_documents(url=self.job.source.url) + elif source_type == 'waf': + self.harvest_waf_documents(content) + # Set the status based on the outcome + if not self.job.report.get('errors', []): + self.job.status = u"Success" + elif self.job.report.get('added', []) and self.job.report.get('errors', []): + self.job.status = u"Partial Success" + elif not self.job.report.get('added', []) and not self.job.report.get('errors', []): + self.job.status = u"No Change" + elif not self.job.report.get('added', []) and self.job.report.get('errors', []): + self.job.status = u"Failed" + self.job.save() + return self.job + + def write_package_from_gemini_string(self, content): + """Create or update a Package based on some content that has + come from a URL. + + Also store the raw content as a HarvestedDocument (with + references to its source and its package) + """ + # Look for previously harvested document matching Gemini GUID + harvested_doc = None + package = None + gemini_document = GeminiDocument(content) + gemini_values = gemini_document.read_values() + gemini_guid = gemini_values['guid'] + harvested_documents = HarvestedDocument.filter(guid=gemini_guid).all() + if len(harvested_documents) > 1: + # A programming error; should never happen + raise Exception( + "More than one harvested document GUID %s" % gemini_guid + ) + elif len(harvested_documents) == 1: + # we've previously harvested this (i.e. it's an update) + harvested_doc = harvested_documents[0] + if harvested_doc.source is None: + # The source has been deleted, we can re-use it + log.info('This document existed from another source which was deleted, using your document instead') + harvested_doc.source = self.job.source + package = harvested_doc.package + harvested_doc.save() + package.save() + return None + if harvested_doc.source.id != self.job.source.id: + # A 'user' error: there are two or more sources + # pointing to the same harvested document + if self.job.source.id is None: + raise Exception('You cannot have an unsaved job source') + raise HarvesterError( + literal("Another source %s (publisher %s, user %s) is using metadata GUID %s" % ( + harvested_doc.source.url, + harvested_doc.source.publisher_ref, + harvested_doc.source.user_ref, + gemini_guid, + )) + ) + if harvested_doc.content == content: + log.info("Document with GUID %s unchanged, skipping..." % (gemini_guid)) + return None + else: + log.info("Harvested document with GUID %s has changed, re-creating..." % (gemini_guid)) + log.info("Updating package for %s" % gemini_guid) + package = harvested_doc.package + else: + log.info("No package with GEMINI guid %s found, let's create one" % gemini_guid) + extras = { + 'published_by': int(self.job.source.publisher_ref or 0), + 'INSPIRE': 'True', + } + # Just add some of the metadata as extras, not the whole lot + for name in [ + # Essentials + 'bbox-east-long', + 'bbox-north-lat', + 'bbox-south-lat', + 'bbox-west-long', + 'guid', + # Usefuls + 'spatial-reference-system', + 'dataset-reference-date', + 'resource-type', + 'metadata-language', # Language + 'metadata-date', # Released + ]: + extras[name] = gemini_values[name] + extras['constraint'] = '; '.join(gemini_values.get("use-constraints", '')+gemini_values.get("limitations-on-public-access")) + if gemini_values.has_key('temporal-extent-begin'): + #gemini_values['temporal-extent-begin'].sort() + extras['temporal_coverage-from'] = gemini_values['temporal-extent-begin'] + if gemini_values.has_key('temporal-extent-end'): + #gemini_values['temporal-extent-end'].sort() + extras['temporal_coverage-to'] = gemini_values['temporal-extent-end'] + package_data = { + 'title': gemini_values['title'], + 'notes': gemini_values['abstract'], + 'extras': extras, + 'tags': gemini_values['tags'], + } + if package is None or package.title != gemini_values['title']: + name = gen_new_name(gemini_values['title']) + if not name: + name = gen_new_name(str(gemini_guid)) + if not name: + raise Exception('Could not generate a unique name from the title or the GUID. Please choose a more unique title.') + package_data['name'] = name + resource_locator = gemini_values.get('resource-locator', []) and gemini_values['resource-locator'][0].get('url') or '' + if resource_locator: + package_data['resources'] = [ + { + 'url': resource_locator, + 'description': 'Resource locator', + 'format': 'Unverified', + }, + # These are generated in Drupal now + #{ + # 'url': '%s/api/2/rest/harvesteddocument/%s/xml/%s.xml'%( + # config.get('ckan.api_url', '/').rstrip('/'), + # gemini_guid, + # gemini_guid, + # ), + # 'description': 'Source GEMINI 2 document', + # 'format': 'XML', + #}, + #{ + # 'url': '%s/api/rest/harvesteddocument/%s/html'%( + # config.get('ckan.api_url', '/').rstrip('/'), + # gemini_guid, + # ), + # 'description': 'Formatted GEMINI 2 document', + # 'format': 'HTML', + #}, + ] + if package == None: + # Create new package from data. + package = self._create_package_from_data(package_data) + log.info("Created new package ID %s with GEMINI guid %s", package.id, gemini_guid) + harvested_doc = HarvestedDocument( + content=content, + guid=gemini_guid, + package=package, + source=self.job.source, + ) + harvested_doc.save() + if not harvested_doc.source_id: + raise Exception('Failed to set the source for document %r'%harvested_doc.id) + assert gemini_guid == package.documents[0].guid + return package + else: + package = self._create_package_from_data(package_data, package = package) + log.info("Updated existing package ID %s with existing GEMINI guid %s", package.id, gemini_guid) + harvested_doc.content = content + harvested_doc.source = self.job.source + harvested_doc.save() + if not harvested_doc.source_id: + raise Exception('Failed to set the source for document %r'%harvested_doc.id) + assert gemini_guid == package.documents[0].guid + return package + + def get_content(self, url): + try: + http_response = urllib2.urlopen(url) + return http_response.read() #decode_response(http_response) + except Exception, inst: + msg = "Unable to get content for URL: %s: %r" % (url, inst) + raise HarvesterUrlError(msg) + + def detect_source_type(self, content): + if "urn:ogc:def:crs:EPSG::27700', + 'temporal_coverage-from': '1977-03-10T11:45:30', + 'temporal_coverage-to': '2005-01-15T09:10:00'}, + 'name': 'council-owned-litter-bins', + 'notes': 'Location of Council owned litter bins within Borough.', + 'resources': [{'description': 'Resource locator', + 'format': 'Unverified', + 'url': 'http://www.barrowbc.gov.uk'}], + 'tags': ['Utility and governmental services'], + 'title': 'Council Owned Litter Bins'} + ''' + + if not package: + package = model.Package() + + rev = model.repo.new_revision() + + relationship_attr = ['extras', 'resources', 'tags'] + + package_properties = {} + for key, value in package_data.iteritems(): + if key not in relationship_attr: + setattr(package, key, value) + + tags = package_data.get('tags', []) + + for tag in tags: + package.add_tag_by_name(tag, autoflush=False) + + for resource_dict in package_data.get("resources", []): + resource = model.Resource(**resource_dict) + package.resources[:] = [] + package.resources.append(resource) + + for key, value in package_data.get("extras", {}).iteritems(): + extra = model.PackageExtra(key=key, value=value) + package._extras[key] = extra + + model.Session.add(package) + model.Session.flush() + + model.setup_default_user_roles(package, []) + + rev.message = _(u'Harvester: Created package %s') \ + % str(package.id) + + model.Session.add(rev) + model.Session.commit() + + return package +