spatial-d4science/ckanext/spatial/validation/validation.py

358 lines
13 KiB
Python
Raw Normal View History

import os
from pkg_resources import resource_stream
2012-10-22 20:36:03 +02:00
from ckanext.spatial.model import GeminiDocument
from lxml import etree
log = __import__("logging").getLogger(__name__)
class BaseValidator(object):
'''Base class for a validator.'''
name = None
title = None
@classmethod
def is_valid(cls, xml):
'''
Runs the validation on the supplied XML etree.
Returns a tuple, the first value is a boolean indicating
whether the validation passed or not. The second is a list of tuples,
each containing the error message and the error line.
Returns tuple:
(is_valid, [(error_message_string, error_line_number)])
'''
raise NotImplementedError
class XsdValidator(BaseValidator):
'''Base class for validators that use an XSD schema.'''
@classmethod
def _is_valid(cls, xml, xsd_filepath, xsd_name):
'''Returns whether or not an XML file is valid according to
an XSD. Returns a tuple, the first value is a boolean indicating
whether the validation passed or not. The second is a list of tuples,
each containing the error message and the error line.
Params:
xml - etree of the XML to be validated
xsd_filepath - full path to the XSD file
xsd_name - string describing the XSD
Returns:
(is_valid, [(error_message_string, error_line_number)])
'''
xsd = etree.parse(xsd_filepath)
schema = etree.XMLSchema(xsd)
# With libxml2 versions before 2.9, this fails with this error:
# gmx_schema = etree.XMLSchema(gmx_xsd)
# File "xmlschema.pxi", line 103, in lxml.etree.XMLSchema.__init__ (src/lxml/lxml.etree.c:116069)
# XMLSchemaParseError: local list type: A type, derived by list or union, must have the simple ur-type definition as base type, not '{http://www.opengis.net/gml/3.2}doubleList'., line 118
try:
schema.assertValid(xml)
except etree.DocumentInvalid:
log.info('Validation errors found using schema {0}'.format(xsd_name))
errors = []
for error in schema.error_log:
errors.append((error.message, error.line))
errors.insert
return False, errors
return True, []
class ISO19139Schema(XsdValidator):
name = 'iso19139'
title = 'ISO19139 XSD Schema'
@classmethod
def is_valid(cls, xml):
xsd_path = 'xml/iso19139'
gmx_xsd_filepath = os.path.join(os.path.dirname(__file__),
xsd_path, 'gmx/gmx.xsd')
xsd_name = 'Dataset schema (gmx.xsd)'
is_valid, errors = cls._is_valid(xml, gmx_xsd_filepath, xsd_name)
if not is_valid:
#TODO: not sure if we need this one, keeping for backwards compatibility
errors.insert(0, ('{0} Validation Error'.format(xsd_name), None))
return is_valid, errors
class ISO19139EdenSchema(XsdValidator):
name = 'iso19139eden'
title = 'ISO19139 XSD Schema (EDEN 2009-03-16)'
@classmethod
def is_valid(cls, xml):
xsd_path = 'xml/iso19139eden'
metadata_type = cls.get_record_type(xml)
if metadata_type in ('dataset', 'series'):
gmx_xsd_filepath = os.path.join(os.path.dirname(__file__),
xsd_path, 'gmx/gmx.xsd')
xsd_name = 'Dataset schema (gmx.xsd)'
is_valid, errors = cls._is_valid(xml, gmx_xsd_filepath, xsd_name)
if not is_valid:
#TODO: not sure if we need this one, keeping for backwards compatibility
errors.insert(0, ('{0} Validation Error'.format(xsd_name), None))
elif metadata_type == 'service':
gmx_and_srv_xsd_filepath = os.path.join(os.path.dirname(__file__),
xsd_path, 'gmx_and_srv.xsd')
xsd_name = 'Service schemas (gmx.xsd & srv.xsd)'
is_valid, errors = cls._is_valid(xml, gmx_and_srv_xsd_filepath, xsd_name)
if not is_valid:
#TODO: not sure if we need this one, keeping for backwards compatibility
errors.insert(0, ('{0} Validation Error'.format(xsd_name), None))
else:
is_valid = False
errors = [('Metadata type not recognised "%s" - cannot choose an ISO19139 validator.' %
metadata_type, None)]
if is_valid:
return True, []
return False, errors
@classmethod
def get_record_type(cls, xml):
'''
For a given ISO19139 record, returns the "type"
e.g. "dataset", "series", "service"
xml - etree of the ISO19139 XML record
'''
gemini = GeminiDocument(xml_tree=xml)
return gemini.read_value('resource-type')
class ISO19139NGDCSchema(XsdValidator):
'''
XSD based validation for ISO 19139 documents.
Uses XSD schema from the NOAA National Geophysical Data Center:
http://ngdc.noaa.gov/metadata/published/xsd/
'''
name = 'iso19139ngdc'
title = 'ISO19139 XSD Schema (NGDC)'
@classmethod
def is_valid(cls, xml):
xsd_path = 'xml/iso19139ngdc'
xsd_filepath = os.path.join(os.path.dirname(__file__),
xsd_path, 'schema.xsd')
return cls._is_valid(xml, xsd_filepath, 'NGDC Schema (schema.xsd)')
2012-10-29 15:34:29 +01:00
class FGDCSchema(XsdValidator):
'''
XSD based validation for FGDC metadata documents.
Uses XSD schema from the Federal Geographic Data Comittee:
http://www.fgdc.gov/schemas/metadata/
'''
name = 'fgdc'
title = 'FGDC XSD Schema'
@classmethod
def is_valid(cls, xml):
xsd_path = 'xml/fgdc'
xsd_filepath = os.path.join(os.path.dirname(__file__),
xsd_path, 'fgdc-std-001-1998.xsd')
return cls._is_valid(xml, xsd_filepath, 'FGDC Schema (fgdc-std-001-1998.xsd)')
2012-10-29 15:34:29 +01:00
class SchematronValidator(BaseValidator):
'''Base class for a validator that uses Schematron.'''
has_init = False
@classmethod
def get_schematrons(cls):
'''Subclasses should override this method to implement
their validation.'''
raise NotImplementedError
@classmethod
def is_valid(cls, xml):
'''Returns whether or not an XML file is valid according to
a schematron. Returns a tuple, the first value is a boolean indicating
whether the validation passed or not. The second is a list of tuples,
each containing the error message and the error line (which defaults to
None on the schematron validation case).
Params:
xml - etree of the XML to be validated
Returns:
(is_valid, [(error_message_string, error_line_number)])
'''
if not hasattr(cls, 'schematrons'):
log.info('Compiling schematron "%s"', cls.title)
cls.schematrons = cls.get_schematrons()
for schematron in cls.schematrons:
result = schematron(xml)
errors = []
for element in result.findall("{http://purl.oclc.org/dsdl/svrl}failed-assert"):
errors.append(element)
if len(errors) > 0:
messages_already_reported = set()
error_details = []
for error in errors:
message, details = cls.extract_error_details(error)
if not message in messages_already_reported:
#TODO: perhaps can extract the source line from the error location
error_details.append((details,None))
messages_already_reported.add(message)
return False, error_details
return True, []
@classmethod
def extract_error_details(cls, failed_assert_element):
'''Given the XML Element describing a schematron test failure,
this method extracts the strings describing the failure and returns
them.
Returns:
(error_message, fuller_error_details)
'''
assert_ = failed_assert_element.get('test')
location = failed_assert_element.get('location')
message_element = failed_assert_element.find("{http://purl.oclc.org/dsdl/svrl}text")
message = message_element.text.strip()
#TODO: Do we really need such detail on the error messages?
return message, 'Error Message: "%s" Error Location: "%s" Error Assert: "%s"' % (message, location, assert_)
@classmethod
def schematron(cls, schema):
transforms = [
2012-10-22 20:37:54 +02:00
"validation/xml/schematron/iso_dsdl_include.xsl",
"validation/xml/schematron/iso_abstract_expand.xsl",
"validation/xml/schematron/iso_svrl_for_xslt1.xsl",
]
if isinstance(schema, file):
compiled = etree.parse(schema)
else:
compiled = schema
for filename in transforms:
2012-10-22 20:37:54 +02:00
with resource_stream("ckanext.spatial", filename) as stream:
xform_xml = etree.parse(stream)
xform = etree.XSLT(xform_xml)
compiled = xform(compiled)
return etree.XSLT(compiled)
class ConstraintsSchematron(SchematronValidator):
name = 'constraints'
title = 'ISO19139 Table A.1 Constraints Schematron (Medin 1.3)'
@classmethod
def get_schematrons(cls):
2012-10-22 20:37:54 +02:00
with resource_stream("ckanext.spatial",
"validation/xml/medin/ISOTS19139A1Constraints_v1.3.sch") as schema:
return [cls.schematron(schema)]
class ConstraintsSchematron14(SchematronValidator):
name = 'constraints-1.4'
title = 'ISO19139 Table A.1 Constraints Schematron (Medin/Parslow 1.4)'
@classmethod
def get_schematrons(cls):
with resource_stream("ckanext.spatial",
"validation/xml/medin/ISOTS19139A1Constraints_v1.4.sch") as schema:
return [cls.schematron(schema)]
class Gemini2Schematron(SchematronValidator):
name = 'gemini2'
title = 'GEMINI 2.1 Schematron 1.2'
@classmethod
def get_schematrons(cls):
2012-10-22 20:37:54 +02:00
with resource_stream("ckanext.spatial",
"validation/xml/gemini2/gemini2-schematron-20110906-v1.2.sch") as schema:
return [cls.schematron(schema)]
class Gemini2Schematron13(SchematronValidator):
name = 'gemini2-1.3'
title = 'GEMINI 2.1 Schematron 1.3a'
@classmethod
def get_schematrons(cls):
with resource_stream("ckanext.spatial",
"validation/xml/gemini2/Gemini2_R1r3a.sch") as schema:
return [cls.schematron(schema)]
all_validators = (ISO19139Schema,
ISO19139EdenSchema,
ISO19139NGDCSchema,
2012-10-29 15:34:29 +01:00
FGDCSchema,
ConstraintsSchematron,
ConstraintsSchematron14,
Gemini2Schematron,
Gemini2Schematron13)
class Validators(object):
'''
Validates XML against one or more profiles (i.e. validators).
'''
def __init__(self, profiles=["iso19139", "constraints", "gemini2"]):
self.profiles = profiles
self.validators = {} # name: class
for validator_class in all_validators:
self.validators[validator_class.name] = validator_class
def add_validator(self, validator_class):
self.validators[validator_class.name] = validator_class
def isvalid(self, xml):
'''For backward compatibility'''
return self.is_valid(xml)
def is_valid(self, xml):
'''Returns whether or not an XML file is valid.
Returns a tuple, the first value is a boolean indicating
whether the validation passed or not. The second is the name of the profile
that failed and the third is a list of tuples,
each containing the error message and the error line if present.
Params:
xml - etree of the XML to be validated
Returns:
(is_valid, failed_profile_name, [(error_message_string, error_line_number)])
'''
log.debug('Starting validation against profile(s) %s' % ','.join(self.profiles))
for name in self.profiles:
validator = self.validators[name]
is_valid, error_message_list = validator.is_valid(xml)
if not is_valid:
#error_message_list.insert(0, 'Validating against "%s" profile failed' % validator.title)
log.info('Validating against "%s" profile failed' % validator.title)
log.debug('%r', error_message_list)
return False, validator.name, error_message_list
log.debug('Validated against "%s"', validator.title)
log.info('Validation passed')
return True, None, []
if __name__ == '__main__':
from sys import argv
import logging
from pprint import pprint
logging.basicConfig()
if len(argv) == 3:
profiles = argv[2].split(',')
else:
profiles = ["iso19139", "constraints", "gemini2"]
v = Validators(profiles)
result = v.is_valid(etree.parse(open(argv[1])))
pprint(result)