[metadata transformation] separate XSLT creation errors from XSLT application errors, and let the former propagate
This commit is contained in:
parent
f3f7604e6c
commit
6c805b0784
|
@ -110,9 +110,6 @@ public class TransformSparkJobNode {
|
||||||
final String workflowId = args.get("workflowId");
|
final String workflowId = args.get("workflowId");
|
||||||
log.info("workflowId is {}", workflowId);
|
log.info("workflowId is {}", workflowId);
|
||||||
|
|
||||||
MapFunction<MetadataRecord, MetadataRecord> x = TransformationFactory
|
|
||||||
.getTransformationPlugin(args, ct, isLookUpService);
|
|
||||||
|
|
||||||
final Dataset<MetadataRecord> inputMDStore = spark
|
final Dataset<MetadataRecord> inputMDStore = spark
|
||||||
.read()
|
.read()
|
||||||
.format("parquet")
|
.format("parquet")
|
||||||
|
@ -124,10 +121,13 @@ public class TransformSparkJobNode {
|
||||||
final MessageSender messageSender = new MessageSender(dnetMessageManagerURL, workflowId);
|
final MessageSender messageSender = new MessageSender(dnetMessageManagerURL, workflowId);
|
||||||
try (AggregatorReport report = new AggregatorReport(messageSender)) {
|
try (AggregatorReport report = new AggregatorReport(messageSender)) {
|
||||||
try {
|
try {
|
||||||
|
final MapFunction<MetadataRecord, MetadataRecord> tr = TransformationFactory
|
||||||
|
.getTransformationPlugin(args, ct, report, isLookUpService);
|
||||||
|
|
||||||
JavaRDD<MetadataRecord> mdstore = inputMDStore
|
JavaRDD<MetadataRecord> mdstore = inputMDStore
|
||||||
.javaRDD()
|
.javaRDD()
|
||||||
.repartition(getRepartitionNumber(totalInput, rpt))
|
.repartition(getRepartitionNumber(totalInput, rpt))
|
||||||
.map((Function<MetadataRecord, MetadataRecord>) x::call)
|
.map((Function<MetadataRecord, MetadataRecord>) tr::call)
|
||||||
.filter((Function<MetadataRecord, Boolean>) Objects::nonNull);
|
.filter((Function<MetadataRecord, Boolean>) Objects::nonNull);
|
||||||
saveDataset(spark.createDataset(mdstore.rdd(), encoder), outputBasePath + MDSTORE_DATA_PATH);
|
saveDataset(spark.createDataset(mdstore.rdd(), encoder), outputBasePath + MDSTORE_DATA_PATH);
|
||||||
|
|
||||||
|
@ -141,7 +141,7 @@ public class TransformSparkJobNode {
|
||||||
"" + mdStoreSize, outputBasePath + MDSTORE_SIZE_PATH);
|
"" + mdStoreSize, outputBasePath + MDSTORE_SIZE_PATH);
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
log.error("error during record transformation", e);
|
log.error("error during record transformation", e);
|
||||||
report.put(TransformSparkJobNode.class.getSimpleName(), e.getMessage());
|
report.put(e.getClass().getName(), e.getMessage());
|
||||||
report.put(CONTENT_TOTALITEMS, ct.getTotalItems().value().toString());
|
report.put(CONTENT_TOTALITEMS, ct.getTotalItems().value().toString());
|
||||||
report.put(CONTENT_INVALIDRECORDS, ct.getErrorItems().value().toString());
|
report.put(CONTENT_INVALIDRECORDS, ct.getErrorItems().value().toString());
|
||||||
report.put(CONTENT_TRANSFORMEDRECORDS, ct.getProcessedItems().value().toString());
|
report.put(CONTENT_TRANSFORMEDRECORDS, ct.getProcessedItems().value().toString());
|
||||||
|
|
|
@ -10,6 +10,7 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
|
import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
|
||||||
|
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
|
||||||
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
|
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
|
||||||
import eu.dnetlib.dhp.schema.mdstore.MetadataRecord;
|
import eu.dnetlib.dhp.schema.mdstore.MetadataRecord;
|
||||||
import eu.dnetlib.dhp.transformation.xslt.XSLTTransformationFunction;
|
import eu.dnetlib.dhp.transformation.xslt.XSLTTransformationFunction;
|
||||||
|
@ -27,7 +28,8 @@ public class TransformationFactory {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static MapFunction<MetadataRecord, MetadataRecord> getTransformationPlugin(
|
public static MapFunction<MetadataRecord, MetadataRecord> getTransformationPlugin(
|
||||||
final Map<String, String> jobArgument, final AggregationCounter counters, final ISLookUpService isLookupService)
|
final Map<String, String> jobArgument, final AggregationCounter counters, final AggregatorReport report,
|
||||||
|
final ISLookUpService isLookupService)
|
||||||
throws DnetTransformationException {
|
throws DnetTransformationException {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -45,7 +47,7 @@ public class TransformationFactory {
|
||||||
transformationRuleId, isLookupService);
|
transformationRuleId, isLookupService);
|
||||||
|
|
||||||
final long dateOfTransformation = Long.parseLong(jobArgument.get("dateOfTransformation"));
|
final long dateOfTransformation = Long.parseLong(jobArgument.get("dateOfTransformation"));
|
||||||
return new XSLTTransformationFunction(counters, transformationRule, dateOfTransformation,
|
return new XSLTTransformationFunction(counters, report, transformationRule, dateOfTransformation,
|
||||||
vocabularies);
|
vocabularies);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,10 +7,13 @@ import java.nio.charset.StandardCharsets;
|
||||||
|
|
||||||
import javax.xml.transform.stream.StreamSource;
|
import javax.xml.transform.stream.StreamSource;
|
||||||
|
|
||||||
|
import org.apache.avro.test.specialtypes.value;
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
|
||||||
import org.apache.spark.api.java.function.MapFunction;
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
|
import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
|
||||||
|
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
|
||||||
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
|
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
|
||||||
import eu.dnetlib.dhp.schema.mdstore.MetadataRecord;
|
import eu.dnetlib.dhp.schema.mdstore.MetadataRecord;
|
||||||
import net.sf.saxon.s9api.*;
|
import net.sf.saxon.s9api.*;
|
||||||
|
@ -25,9 +28,9 @@ public class XSLTTransformationFunction implements MapFunction<MetadataRecord, M
|
||||||
|
|
||||||
private final AggregationCounter aggregationCounter;
|
private final AggregationCounter aggregationCounter;
|
||||||
|
|
||||||
private final String transformationRule;
|
private final AggregatorReport report;
|
||||||
|
|
||||||
private final Cleaner cleanFunction;
|
private final String transformationRule;
|
||||||
|
|
||||||
private final long dateOfTransformation;
|
private final long dateOfTransformation;
|
||||||
|
|
||||||
|
@ -35,55 +38,66 @@ public class XSLTTransformationFunction implements MapFunction<MetadataRecord, M
|
||||||
|
|
||||||
public XSLTTransformationFunction(
|
public XSLTTransformationFunction(
|
||||||
final AggregationCounter aggregationCounter,
|
final AggregationCounter aggregationCounter,
|
||||||
|
final AggregatorReport report,
|
||||||
final String transformationRule,
|
final String transformationRule,
|
||||||
long dateOfTransformation,
|
long dateOfTransformation,
|
||||||
final VocabularyGroup vocabularies) {
|
final VocabularyGroup vocabularies) {
|
||||||
this.aggregationCounter = aggregationCounter;
|
this.aggregationCounter = aggregationCounter;
|
||||||
|
this.report = report;
|
||||||
this.transformationRule = transformationRule;
|
this.transformationRule = transformationRule;
|
||||||
this.vocabularies = vocabularies;
|
this.vocabularies = vocabularies;
|
||||||
this.dateOfTransformation = dateOfTransformation;
|
this.dateOfTransformation = dateOfTransformation;
|
||||||
cleanFunction = new Cleaner(vocabularies);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public MetadataRecord call(MetadataRecord value) {
|
public MetadataRecord call(MetadataRecord value) {
|
||||||
aggregationCounter.getTotalItems().add(1);
|
aggregationCounter.getTotalItems().add(1);
|
||||||
try {
|
|
||||||
Processor processor = new Processor(false);
|
|
||||||
|
|
||||||
processor.registerExtensionFunction(cleanFunction);
|
final Processor xsltProcessor = new Processor(false);
|
||||||
processor.registerExtensionFunction(new DateCleaner());
|
xsltProcessor.registerExtensionFunction(new Cleaner(vocabularies));
|
||||||
processor.registerExtensionFunction(new PersonCleaner());
|
xsltProcessor.registerExtensionFunction(new DateCleaner());
|
||||||
|
xsltProcessor.registerExtensionFunction(new PersonCleaner());
|
||||||
|
|
||||||
final XsltCompiler comp = processor.newXsltCompiler();
|
|
||||||
QName datasourceIDParam = new QName(DATASOURCE_ID_PARAM);
|
|
||||||
comp.setParameter(datasourceIDParam, new XdmAtomicValue(value.getProvenance().getDatasourceId()));
|
|
||||||
QName datasourceNameParam = new QName(DATASOURCE_NAME_PARAM);
|
|
||||||
comp.setParameter(datasourceNameParam, new XdmAtomicValue(value.getProvenance().getDatasourceName()));
|
|
||||||
XsltExecutable xslt = comp
|
|
||||||
.compile(new StreamSource(IOUtils.toInputStream(transformationRule, StandardCharsets.UTF_8)));
|
|
||||||
XdmNode source = processor
|
|
||||||
.newDocumentBuilder()
|
|
||||||
.build(new StreamSource(IOUtils.toInputStream(value.getBody(), StandardCharsets.UTF_8)));
|
|
||||||
XsltTransformer trans = xslt.load();
|
|
||||||
trans.setInitialContextNode(source);
|
|
||||||
final StringWriter output = new StringWriter();
|
final StringWriter output = new StringWriter();
|
||||||
Serializer out = processor.newSerializer(output);
|
final Serializer out = xsltProcessor.newSerializer(output);
|
||||||
out.setOutputProperty(Serializer.Property.METHOD, "xml");
|
out.setOutputProperty(Serializer.Property.METHOD, "xml");
|
||||||
out.setOutputProperty(Serializer.Property.INDENT, "yes");
|
out.setOutputProperty(Serializer.Property.INDENT, "yes");
|
||||||
|
|
||||||
trans.setDestination(out);
|
XsltTransformer transformer;
|
||||||
trans.transform();
|
try {
|
||||||
|
transformer = xsltProcessor
|
||||||
|
.newXsltCompiler()
|
||||||
|
.compile(new StreamSource(IOUtils.toInputStream(transformationRule, StandardCharsets.UTF_8)))
|
||||||
|
.load();
|
||||||
|
} catch (SaxonApiException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
transformer
|
||||||
|
.setParameter(new QName(DATASOURCE_ID_PARAM), new XdmAtomicValue(value.getProvenance().getDatasourceId()));
|
||||||
|
transformer
|
||||||
|
.setParameter(
|
||||||
|
new QName(DATASOURCE_NAME_PARAM), new XdmAtomicValue(value.getProvenance().getDatasourceName()));
|
||||||
|
|
||||||
|
try {
|
||||||
|
final XdmNode source = xsltProcessor
|
||||||
|
.newDocumentBuilder()
|
||||||
|
.build(new StreamSource(IOUtils.toInputStream(value.getBody(), StandardCharsets.UTF_8)));
|
||||||
|
|
||||||
|
transformer.setInitialContextNode(source);
|
||||||
|
transformer.setDestination(out);
|
||||||
|
transformer.transform();
|
||||||
|
} catch (SaxonApiException e) {
|
||||||
|
report.put(e.getClass().getName(), e.getMessage());
|
||||||
|
aggregationCounter.getErrorItems().add(1);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
final String xml = output.toString();
|
final String xml = output.toString();
|
||||||
value.setBody(xml);
|
value.setBody(xml);
|
||||||
value.setDateOfTransformation(dateOfTransformation);
|
value.setDateOfTransformation(dateOfTransformation);
|
||||||
aggregationCounter.getProcessedItems().add(1);
|
aggregationCounter.getProcessedItems().add(1);
|
||||||
return value;
|
return value;
|
||||||
} catch (Throwable e) {
|
|
||||||
aggregationCounter.getErrorItems().add(1);
|
|
||||||
return null;
|
|
||||||
// throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public AggregationCounter getAggregationCounter() {
|
public AggregationCounter getAggregationCounter() {
|
||||||
|
@ -94,10 +108,6 @@ public class XSLTTransformationFunction implements MapFunction<MetadataRecord, M
|
||||||
return transformationRule;
|
return transformationRule;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Cleaner getCleanFunction() {
|
|
||||||
return cleanFunction;
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getDateOfTransformation() {
|
public long getDateOfTransformation() {
|
||||||
return dateOfTransformation;
|
return dateOfTransformation;
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,6 +5,7 @@ import static eu.dnetlib.dhp.common.Constants.MDSTORE_DATA_PATH;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.StringReader;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
@ -19,6 +20,8 @@ import org.apache.spark.sql.Encoder;
|
||||||
import org.apache.spark.sql.Encoders;
|
import org.apache.spark.sql.Encoders;
|
||||||
import org.apache.spark.sql.SparkSession;
|
import org.apache.spark.sql.SparkSession;
|
||||||
import org.apache.spark.util.LongAccumulator;
|
import org.apache.spark.util.LongAccumulator;
|
||||||
|
import org.dom4j.Document;
|
||||||
|
import org.dom4j.io.SAXReader;
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.DisplayName;
|
import org.junit.jupiter.api.DisplayName;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
@ -28,6 +31,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.aggregation.AbstractVocabularyTest;
|
import eu.dnetlib.dhp.aggregation.AbstractVocabularyTest;
|
||||||
import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
|
import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
|
||||||
|
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
|
||||||
import eu.dnetlib.dhp.schema.mdstore.MetadataRecord;
|
import eu.dnetlib.dhp.schema.mdstore.MetadataRecord;
|
||||||
import eu.dnetlib.dhp.schema.mdstore.Provenance;
|
import eu.dnetlib.dhp.schema.mdstore.Provenance;
|
||||||
import eu.dnetlib.dhp.transformation.xslt.DateCleaner;
|
import eu.dnetlib.dhp.transformation.xslt.DateCleaner;
|
||||||
|
@ -83,9 +87,11 @@ class TransformationJobTest extends AbstractVocabularyTest {
|
||||||
@DisplayName("Test Transform Inst.&Them.v4 record XML with zenodo_tr")
|
@DisplayName("Test Transform Inst.&Them.v4 record XML with zenodo_tr")
|
||||||
void testTransformITGv4Zenodo() throws Exception {
|
void testTransformITGv4Zenodo() throws Exception {
|
||||||
|
|
||||||
|
final String dsName = "Zenodo";
|
||||||
|
final String dsId = "opendoar___::1234";
|
||||||
// We Set the input Record getting the XML from the classpath
|
// We Set the input Record getting the XML from the classpath
|
||||||
final MetadataRecord mr = new MetadataRecord();
|
final MetadataRecord mr = new MetadataRecord();
|
||||||
mr.setProvenance(new Provenance("DSID", "DSNAME", "PREFIX"));
|
mr.setProvenance(new Provenance(dsId, dsName, "PREFIX"));
|
||||||
mr.setBody(IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/input_itgv4.xml")));
|
mr.setBody(IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/input_itgv4.xml")));
|
||||||
// We Load the XSLT transformation Rule from the classpath
|
// We Load the XSLT transformation Rule from the classpath
|
||||||
final XSLTTransformationFunction tr = loadTransformationRule("/eu/dnetlib/dhp/transform/zenodo_tr.xslt");
|
final XSLTTransformationFunction tr = loadTransformationRule("/eu/dnetlib/dhp/transform/zenodo_tr.xslt");
|
||||||
|
@ -94,7 +100,12 @@ class TransformationJobTest extends AbstractVocabularyTest {
|
||||||
|
|
||||||
// Print the record
|
// Print the record
|
||||||
System.out.println(result.getBody());
|
System.out.println(result.getBody());
|
||||||
// TODO Create significant Assert
|
|
||||||
|
Document record = new SAXReader().read(new StringReader(result.getBody()));
|
||||||
|
assertEquals(dsName, record.valueOf("//*[local-name() = 'metadata']/*[local-name() = 'collectedFrom']/@name"));
|
||||||
|
assertEquals(dsId, record.valueOf("//*[local-name() = 'metadata']/*[local-name() = 'collectedFrom']/@id"));
|
||||||
|
|
||||||
|
// TODO Create more significant Asserts
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -282,7 +293,9 @@ class TransformationJobTest extends AbstractVocabularyTest {
|
||||||
private XSLTTransformationFunction loadTransformationRule(final String path) throws Exception {
|
private XSLTTransformationFunction loadTransformationRule(final String path) throws Exception {
|
||||||
final String trValue = IOUtils.toString(this.getClass().getResourceAsStream(path));
|
final String trValue = IOUtils.toString(this.getClass().getResourceAsStream(path));
|
||||||
final LongAccumulator la = new LongAccumulator();
|
final LongAccumulator la = new LongAccumulator();
|
||||||
return new XSLTTransformationFunction(new AggregationCounter(la, la, la), trValue, 0, vocabularies);
|
final AggregationCounter counter = new AggregationCounter(la, la, la);
|
||||||
|
final AggregatorReport report = new AggregatorReport();
|
||||||
|
return new XSLTTransformationFunction(counter, report, trValue, 0, vocabularies);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue