Compare commits

...

3 Commits

4 changed files with 90 additions and 65 deletions

TransformSparkJobNode.java

@@ -77,11 +77,11 @@ public class TransformSparkJobNode {
.ofNullable(parser.get("recordsPerTask"))
.map(Integer::valueOf)
.orElse(RECORDS_PER_TASK);
log.info("recordsPerTask: {}", rpt);
final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl);
final VocabularyGroup vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService);
log.info("Retrieved {} vocabularies", vocabularies.vocabularyNames().size());
SparkConf conf = new SparkConf();
@@ -110,9 +110,6 @@ public class TransformSparkJobNode {
final String workflowId = args.get("workflowId");
log.info("workflowId is {}", workflowId);
MapFunction<MetadataRecord, MetadataRecord> x = TransformationFactory
.getTransformationPlugin(args, ct, isLookUpService);
final Dataset<MetadataRecord> inputMDStore = spark
.read()
.format("parquet")
@@ -123,30 +120,24 @@ public class TransformSparkJobNode {
final MessageSender messageSender = new MessageSender(dnetMessageManagerURL, workflowId);
try (AggregatorReport report = new AggregatorReport(messageSender)) {
try {
JavaRDD<MetadataRecord> mdstore = inputMDStore
.javaRDD()
.repartition(getRepartitionNumber(totalInput, rpt))
.map((Function<MetadataRecord, MetadataRecord>) x::call)
.filter((Function<MetadataRecord, Boolean>) Objects::nonNull);
saveDataset(spark.createDataset(mdstore.rdd(), encoder), outputBasePath + MDSTORE_DATA_PATH);
final MapFunction<MetadataRecord, MetadataRecord> tr = TransformationFactory
.getTransformationPlugin(args, ct, report, isLookUpService);
log.info("Transformed item {}", ct.getProcessedItems().count());
log.info("Total item {}", ct.getTotalItems().count());
log.info("Transformation Error item {}", ct.getErrorItems().count());
JavaRDD<MetadataRecord> mdstore = inputMDStore
.javaRDD()
.repartition(getRepartitionNumber(totalInput, rpt))
.map((Function<MetadataRecord, MetadataRecord>) tr::call)
.filter((Function<MetadataRecord, Boolean>) Objects::nonNull);
saveDataset(spark.createDataset(mdstore.rdd(), encoder), outputBasePath + MDSTORE_DATA_PATH);
final long mdStoreSize = spark.read().load(outputBasePath + MDSTORE_DATA_PATH).count();
writeHdfsFile(
spark.sparkContext().hadoopConfiguration(),
"" + mdStoreSize, outputBasePath + MDSTORE_SIZE_PATH);
} catch (Throwable e) {
log.error("error during record transformation", e);
report.put(TransformSparkJobNode.class.getSimpleName(), e.getMessage());
report.put(CONTENT_TOTALITEMS, ct.getTotalItems().value().toString());
report.put(CONTENT_INVALIDRECORDS, ct.getErrorItems().value().toString());
report.put(CONTENT_TRANSFORMEDRECORDS, ct.getProcessedItems().value().toString());
throw e;
}
log.info("Transformed item {}", ct.getProcessedItems().count());
log.info("Total item {}", ct.getTotalItems().count());
log.info("Transformation Error item {}", ct.getErrorItems().count());
final long mdStoreSize = spark.read().load(outputBasePath + MDSTORE_DATA_PATH).count();
writeHdfsFile(
spark.sparkContext().hadoopConfiguration(),
"" + mdStoreSize, outputBasePath + MDSTORE_SIZE_PATH);
}
}
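
Note: the net effect of these hunks is that the transformation function is now created inside the try-with-resources block that owns the AggregatorReport, so a failure during record transformation is recorded (with total, invalid and transformed counts) and forwarded through the MessageSender before the exception propagates; the counter logging and the mdstore size bookkeeping move after the inner try/catch. A self-contained sketch of the pattern, with a hypothetical stand-in Report class in place of the real AggregatorReport:

import java.io.Closeable;
import java.util.LinkedHashMap;

// Stand-in for AggregatorReport (hypothetical, illustration only): the real
// class ships its content through the MessageSender when close() is called.
class Report extends LinkedHashMap<String, String> implements Closeable {
    @Override
    public void close() {
        System.out.println("sending report: " + this);
    }
}

public class ReportOnFailureSketch {
    public static void main(String[] args) {
        try (Report report = new Report()) {
            try {
                throw new IllegalStateException("error during record transformation");
            } catch (RuntimeException e) {
                report.put("TransformSparkJobNode", e.getMessage());
                throw e; // rethrown as in the hunk above; close() still sends the report first
            }
        }
    }
}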

TransformationFactory.java

@@ -10,6 +10,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.schema.mdstore.MetadataRecord;
import eu.dnetlib.dhp.transformation.xslt.XSLTTransformationFunction;
@@ -27,7 +28,8 @@ public class TransformationFactory {
}
public static MapFunction<MetadataRecord, MetadataRecord> getTransformationPlugin(
final Map<String, String> jobArgument, final AggregationCounter counters, final ISLookUpService isLookupService)
final Map<String, String> jobArgument, final AggregationCounter counters, final AggregatorReport report,
final ISLookUpService isLookupService)
throws DnetTransformationException {
try {
@@ -45,7 +47,7 @@
transformationRuleId, isLookupService);
final long dateOfTransformation = Long.parseLong(jobArgument.get("dateOfTransformation"));
return new XSLTTransformationFunction(counters, transformationRule, dateOfTransformation,
return new XSLTTransformationFunction(counters, report, transformationRule, dateOfTransformation,
vocabularies);
}
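
Note: getTransformationPlugin now takes the AggregatorReport and hands it to the XSLTTransformationFunction it builds, so the returned function can record failures instead of only throwing. A minimal sketch of this factory shape, using hypothetical stand-in types (a String-to-String function in place of the MetadataRecord transformation):

import java.util.function.Function;

public class FactorySketch {
    interface Report { void put(String key, String value); }

    // The report is injected once, at construction time, and captured by the
    // function the factory returns (mirroring the constructor change above).
    static Function<String, String> getTransformationPlugin(final Report report) {
        return body -> {
            try {
                return body.toUpperCase(); // stand-in for the XSLT transformation
            } catch (RuntimeException e) {
                report.put(e.getClass().getName(), e.getMessage());
                return null; // the record is dropped, the error is reported
            }
        };
    }

    public static void main(String[] args) {
        Function<String, String> tr = getTransformationPlugin((k, v) -> System.out.println(k + "=" + v));
        System.out.println(tr.apply("record")); // RECORD
    }
}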

XSLTTransformationFunction.java

@@ -1,16 +1,22 @@
package eu.dnetlib.dhp.transformation.xslt;
import static eu.dnetlib.dhp.common.Constants.*;
import java.io.IOException;
import java.io.Serializable;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import javax.xml.transform.stream.StreamSource;
import org.apache.avro.test.specialtypes.value;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.spark.api.java.function.MapFunction;
import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.schema.mdstore.MetadataRecord;
import net.sf.saxon.s9api.*;
@@ -25,9 +31,9 @@ public class XSLTTransformationFunction implements MapFunction<MetadataRecord, MetadataRecord>
private final AggregationCounter aggregationCounter;
private final String transformationRule;
private final AggregatorReport report;
private final Cleaner cleanFunction;
private final String transformationRule;
private final long dateOfTransformation;
@@ -35,55 +41,72 @@ public class XSLTTransformationFunction implements MapFunction<MetadataRecord, MetadataRecord>
public XSLTTransformationFunction(
final AggregationCounter aggregationCounter,
final AggregatorReport report,
final String transformationRule,
long dateOfTransformation,
final VocabularyGroup vocabularies) {
this.aggregationCounter = aggregationCounter;
this.report = report;
this.transformationRule = transformationRule;
this.vocabularies = vocabularies;
this.dateOfTransformation = dateOfTransformation;
cleanFunction = new Cleaner(vocabularies);
}
@Override
public MetadataRecord call(MetadataRecord value) {
aggregationCounter.getTotalItems().add(1);
final Processor xsltProcessor = new Processor(false);
xsltProcessor.registerExtensionFunction(new Cleaner(vocabularies));
xsltProcessor.registerExtensionFunction(new DateCleaner());
xsltProcessor.registerExtensionFunction(new PersonCleaner());
final StringWriter output = new StringWriter();
final Serializer out = xsltProcessor.newSerializer(output);
out.setOutputProperty(Serializer.Property.METHOD, "xml");
out.setOutputProperty(Serializer.Property.INDENT, "yes");
XsltTransformer transformer;
try {
Processor processor = new Processor(false);
transformer = xsltProcessor
.newXsltCompiler()
.compile(new StreamSource(IOUtils.toInputStream(transformationRule, StandardCharsets.UTF_8)))
.load();
} catch (SaxonApiException e) {
report.put(e.getClass().getName(), e.getMessage());
try {
report.close();
} catch (IOException ex) {
throw new IllegalArgumentException("error compiling the XSLT", e);
}
throw new IllegalArgumentException("error compiling the XSLT", e);
}
processor.registerExtensionFunction(cleanFunction);
processor.registerExtensionFunction(new DateCleaner());
processor.registerExtensionFunction(new PersonCleaner());
transformer
.setParameter(new QName(DATASOURCE_ID_PARAM), new XdmAtomicValue(value.getProvenance().getDatasourceId()));
transformer
.setParameter(
new QName(DATASOURCE_NAME_PARAM), new XdmAtomicValue(value.getProvenance().getDatasourceName()));
final XsltCompiler comp = processor.newXsltCompiler();
QName datasourceIDParam = new QName(DATASOURCE_ID_PARAM);
comp.setParameter(datasourceIDParam, new XdmAtomicValue(value.getProvenance().getDatasourceId()));
QName datasourceNameParam = new QName(DATASOURCE_NAME_PARAM);
comp.setParameter(datasourceNameParam, new XdmAtomicValue(value.getProvenance().getDatasourceName()));
XsltExecutable xslt = comp
.compile(new StreamSource(IOUtils.toInputStream(transformationRule, StandardCharsets.UTF_8)));
XdmNode source = processor
try {
final XdmNode source = xsltProcessor
.newDocumentBuilder()
.build(new StreamSource(IOUtils.toInputStream(value.getBody(), StandardCharsets.UTF_8)));
XsltTransformer trans = xslt.load();
trans.setInitialContextNode(source);
final StringWriter output = new StringWriter();
Serializer out = processor.newSerializer(output);
out.setOutputProperty(Serializer.Property.METHOD, "xml");
out.setOutputProperty(Serializer.Property.INDENT, "yes");
trans.setDestination(out);
trans.transform();
final String xml = output.toString();
value.setBody(xml);
value.setDateOfTransformation(dateOfTransformation);
aggregationCounter.getProcessedItems().add(1);
return value;
} catch (Throwable e) {
transformer.setInitialContextNode(source);
transformer.setDestination(out);
transformer.transform();
} catch (SaxonApiException e) {
report.put(e.getClass().getName(), e.getMessage());
aggregationCounter.getErrorItems().add(1);
return null;
// throw new RuntimeException(e);
}
final String xml = output.toString();
value.setBody(xml);
value.setDateOfTransformation(dateOfTransformation);
aggregationCounter.getProcessedItems().add(1);
return value;
}
public AggregationCounter getAggregationCounter() {
@@ -94,10 +117,6 @@ public class XSLTTransformationFunction implements MapFunction<MetadataRecord, MetadataRecord>
return transformationRule;
}
public Cleaner getCleanFunction() {
return cleanFunction;
}
public long getDateOfTransformation() {
return dateOfTransformation;
}
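
Note: call() now splits error handling in two phases. A stylesheet that fails to compile is terminal: the failure is put into the report, the report is closed (so the message is actually sent) and an IllegalArgumentException aborts the job. A record that fails to transform is not: the failure is put into the report, the error counter is incremented and null is returned, to be filtered out downstream by Objects::nonNull. A self-contained sketch of the two-phase Saxon s9api flow, with a trivial stylesheet and record standing in for the real ones:

import java.io.StringReader;
import java.io.StringWriter;
import javax.xml.transform.stream.StreamSource;
import net.sf.saxon.s9api.*;

public class SaxonFlowSketch {
    public static void main(String[] args) throws SaxonApiException {
        final String xslt = "<xsl:stylesheet version='2.0' xmlns:xsl='http://www.w3.org/1999/XSL/Transform'>"
            + "<xsl:template match='/'><out><xsl:value-of select='/in'/></out></xsl:template>"
            + "</xsl:stylesheet>";

        final Processor processor = new Processor(false);

        // Phase 1: compile the rule once; a broken stylesheet fails here,
        // which in the patch above is reported and then rethrown as fatal.
        final XsltTransformer transformer = processor
            .newXsltCompiler()
            .compile(new StreamSource(new StringReader(xslt)))
            .load();

        // Phase 2: parse and transform one record; a broken record fails
        // here, which in the patch above is reported, counted and skipped.
        final XdmNode source = processor
            .newDocumentBuilder()
            .build(new StreamSource(new StringReader("<in>hello</in>")));
        final StringWriter output = new StringWriter();
        final Serializer out = processor.newSerializer(output);
        out.setOutputProperty(Serializer.Property.METHOD, "xml");
        transformer.setInitialContextNode(source);
        transformer.setDestination(out);
        transformer.transform();

        System.out.println(output); // the transformed record: ...<out>hello</out>
    }
}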

TransformationJobTest.java

@@ -5,6 +5,7 @@ import static eu.dnetlib.dhp.common.Constants.MDSTORE_DATA_PATH;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.io.IOException;
import java.io.StringReader;
import java.nio.file.Path;
import java.util.Map;
import java.util.stream.Collectors;
@@ -19,6 +20,8 @@ import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;
import org.dom4j.Document;
import org.dom4j.io.SAXReader;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
@@ -28,6 +31,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
import eu.dnetlib.dhp.aggregation.AbstractVocabularyTest;
import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
import eu.dnetlib.dhp.schema.mdstore.MetadataRecord;
import eu.dnetlib.dhp.schema.mdstore.Provenance;
import eu.dnetlib.dhp.transformation.xslt.DateCleaner;
@@ -83,9 +87,11 @@ class TransformationJobTest extends AbstractVocabularyTest {
@DisplayName("Test Transform Inst.&Them.v4 record XML with zenodo_tr")
void testTransformITGv4Zenodo() throws Exception {
final String dsName = "Zenodo";
final String dsId = "opendoar___::1234";
// We Set the input Record getting the XML from the classpath
final MetadataRecord mr = new MetadataRecord();
mr.setProvenance(new Provenance("DSID", "DSNAME", "PREFIX"));
mr.setProvenance(new Provenance(dsId, dsName, "PREFIX"));
mr.setBody(IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/input_itgv4.xml")));
// We Load the XSLT transformation Rule from the classpath
final XSLTTransformationFunction tr = loadTransformationRule("/eu/dnetlib/dhp/transform/zenodo_tr.xslt");
@@ -94,7 +100,12 @@ class TransformationJobTest extends AbstractVocabularyTest {
// Print the record
System.out.println(result.getBody());
// TODO Create significant Assert
Document record = new SAXReader().read(new StringReader(result.getBody()));
assertEquals(dsName, record.valueOf("//*[local-name() = 'metadata']/*[local-name() = 'collectedFrom']/@name"));
assertEquals(dsId, record.valueOf("//*[local-name() = 'metadata']/*[local-name() = 'collectedFrom']/@id"));
// TODO Create more significant Asserts
}
@Test
@@ -282,7 +293,9 @@ class TransformationJobTest extends AbstractVocabularyTest {
private XSLTTransformationFunction loadTransformationRule(final String path) throws Exception {
final String trValue = IOUtils.toString(this.getClass().getResourceAsStream(path));
final LongAccumulator la = new LongAccumulator();
return new XSLTTransformationFunction(new AggregationCounter(la, la, la), trValue, 0, vocabularies);
final AggregationCounter counter = new AggregationCounter(la, la, la);
final AggregatorReport report = new AggregatorReport();
return new XSLTTransformationFunction(counter, report, trValue, 0, vocabularies);
}
}
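
Note: the test now routes a realistic datasource id and name through Provenance and asserts that they surface in the collectedFrom element of the transformed record, using local-name() in the XPath to stay namespace-agnostic. A self-contained sketch of that assertion pattern (the sample XML and namespace are illustrative, not the real transformed record; dom4j needs jaxen on the classpath for XPath evaluation):

import java.io.StringReader;
import org.dom4j.Document;
import org.dom4j.io.SAXReader;

public class XPathSketch {
    public static void main(String[] args) throws Exception {
        final String xml = "<record xmlns='http://example.org/ns'>"
            + "<metadata><collectedFrom name='Zenodo' id='opendoar___::1234'/></metadata>"
            + "</record>";
        final Document record = new SAXReader().read(new StringReader(xml));
        // local-name() matches elements regardless of their namespace prefix
        System.out.println(record.valueOf("//*[local-name() = 'collectedFrom']/@name")); // Zenodo
        System.out.println(record.valueOf("//*[local-name() = 'collectedFrom']/@id"));   // opendoar___::1234
    }
}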