forked from D-Net/dnet-hadoop
Compare commits
3 Commits
master
...
transforma
Author | SHA1 | Date |
---|---|---|
Claudio Atzori | a2095dc725 | |
Claudio Atzori | f0a9c370b6 | |
Claudio Atzori | 6c805b0784 |
|
@ -77,11 +77,11 @@ public class TransformSparkJobNode {
|
||||||
.ofNullable(parser.get("recordsPerTask"))
|
.ofNullable(parser.get("recordsPerTask"))
|
||||||
.map(Integer::valueOf)
|
.map(Integer::valueOf)
|
||||||
.orElse(RECORDS_PER_TASK);
|
.orElse(RECORDS_PER_TASK);
|
||||||
|
log.info("recordsPerTask: {}", rpt);
|
||||||
|
|
||||||
final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl);
|
final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl);
|
||||||
|
|
||||||
final VocabularyGroup vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService);
|
final VocabularyGroup vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService);
|
||||||
|
|
||||||
log.info("Retrieved {} vocabularies", vocabularies.vocabularyNames().size());
|
log.info("Retrieved {} vocabularies", vocabularies.vocabularyNames().size());
|
||||||
|
|
||||||
SparkConf conf = new SparkConf();
|
SparkConf conf = new SparkConf();
|
||||||
|
@ -110,9 +110,6 @@ public class TransformSparkJobNode {
|
||||||
final String workflowId = args.get("workflowId");
|
final String workflowId = args.get("workflowId");
|
||||||
log.info("workflowId is {}", workflowId);
|
log.info("workflowId is {}", workflowId);
|
||||||
|
|
||||||
MapFunction<MetadataRecord, MetadataRecord> x = TransformationFactory
|
|
||||||
.getTransformationPlugin(args, ct, isLookUpService);
|
|
||||||
|
|
||||||
final Dataset<MetadataRecord> inputMDStore = spark
|
final Dataset<MetadataRecord> inputMDStore = spark
|
||||||
.read()
|
.read()
|
||||||
.format("parquet")
|
.format("parquet")
|
||||||
|
@ -123,30 +120,24 @@ public class TransformSparkJobNode {
|
||||||
|
|
||||||
final MessageSender messageSender = new MessageSender(dnetMessageManagerURL, workflowId);
|
final MessageSender messageSender = new MessageSender(dnetMessageManagerURL, workflowId);
|
||||||
try (AggregatorReport report = new AggregatorReport(messageSender)) {
|
try (AggregatorReport report = new AggregatorReport(messageSender)) {
|
||||||
try {
|
final MapFunction<MetadataRecord, MetadataRecord> tr = TransformationFactory
|
||||||
JavaRDD<MetadataRecord> mdstore = inputMDStore
|
.getTransformationPlugin(args, ct, report, isLookUpService);
|
||||||
.javaRDD()
|
|
||||||
.repartition(getRepartitionNumber(totalInput, rpt))
|
|
||||||
.map((Function<MetadataRecord, MetadataRecord>) x::call)
|
|
||||||
.filter((Function<MetadataRecord, Boolean>) Objects::nonNull);
|
|
||||||
saveDataset(spark.createDataset(mdstore.rdd(), encoder), outputBasePath + MDSTORE_DATA_PATH);
|
|
||||||
|
|
||||||
log.info("Transformed item {}", ct.getProcessedItems().count());
|
JavaRDD<MetadataRecord> mdstore = inputMDStore
|
||||||
log.info("Total item {}", ct.getTotalItems().count());
|
.javaRDD()
|
||||||
log.info("Transformation Error item {}", ct.getErrorItems().count());
|
.repartition(getRepartitionNumber(totalInput, rpt))
|
||||||
|
.map((Function<MetadataRecord, MetadataRecord>) tr::call)
|
||||||
|
.filter((Function<MetadataRecord, Boolean>) Objects::nonNull);
|
||||||
|
saveDataset(spark.createDataset(mdstore.rdd(), encoder), outputBasePath + MDSTORE_DATA_PATH);
|
||||||
|
|
||||||
final long mdStoreSize = spark.read().load(outputBasePath + MDSTORE_DATA_PATH).count();
|
log.info("Transformed item {}", ct.getProcessedItems().count());
|
||||||
writeHdfsFile(
|
log.info("Total item {}", ct.getTotalItems().count());
|
||||||
spark.sparkContext().hadoopConfiguration(),
|
log.info("Transformation Error item {}", ct.getErrorItems().count());
|
||||||
"" + mdStoreSize, outputBasePath + MDSTORE_SIZE_PATH);
|
|
||||||
} catch (Throwable e) {
|
final long mdStoreSize = spark.read().load(outputBasePath + MDSTORE_DATA_PATH).count();
|
||||||
log.error("error during record transformation", e);
|
writeHdfsFile(
|
||||||
report.put(TransformSparkJobNode.class.getSimpleName(), e.getMessage());
|
spark.sparkContext().hadoopConfiguration(),
|
||||||
report.put(CONTENT_TOTALITEMS, ct.getTotalItems().value().toString());
|
"" + mdStoreSize, outputBasePath + MDSTORE_SIZE_PATH);
|
||||||
report.put(CONTENT_INVALIDRECORDS, ct.getErrorItems().value().toString());
|
|
||||||
report.put(CONTENT_TRANSFORMEDRECORDS, ct.getProcessedItems().value().toString());
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -10,6 +10,7 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
|
import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
|
||||||
|
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
|
||||||
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
|
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
|
||||||
import eu.dnetlib.dhp.schema.mdstore.MetadataRecord;
|
import eu.dnetlib.dhp.schema.mdstore.MetadataRecord;
|
||||||
import eu.dnetlib.dhp.transformation.xslt.XSLTTransformationFunction;
|
import eu.dnetlib.dhp.transformation.xslt.XSLTTransformationFunction;
|
||||||
|
@ -27,7 +28,8 @@ public class TransformationFactory {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static MapFunction<MetadataRecord, MetadataRecord> getTransformationPlugin(
|
public static MapFunction<MetadataRecord, MetadataRecord> getTransformationPlugin(
|
||||||
final Map<String, String> jobArgument, final AggregationCounter counters, final ISLookUpService isLookupService)
|
final Map<String, String> jobArgument, final AggregationCounter counters, final AggregatorReport report,
|
||||||
|
final ISLookUpService isLookupService)
|
||||||
throws DnetTransformationException {
|
throws DnetTransformationException {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -45,7 +47,7 @@ public class TransformationFactory {
|
||||||
transformationRuleId, isLookupService);
|
transformationRuleId, isLookupService);
|
||||||
|
|
||||||
final long dateOfTransformation = Long.parseLong(jobArgument.get("dateOfTransformation"));
|
final long dateOfTransformation = Long.parseLong(jobArgument.get("dateOfTransformation"));
|
||||||
return new XSLTTransformationFunction(counters, transformationRule, dateOfTransformation,
|
return new XSLTTransformationFunction(counters, report, transformationRule, dateOfTransformation,
|
||||||
vocabularies);
|
vocabularies);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,16 +1,22 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.transformation.xslt;
|
package eu.dnetlib.dhp.transformation.xslt;
|
||||||
|
|
||||||
|
import static eu.dnetlib.dhp.common.Constants.*;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.io.StringWriter;
|
import java.io.StringWriter;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
|
|
||||||
import javax.xml.transform.stream.StreamSource;
|
import javax.xml.transform.stream.StreamSource;
|
||||||
|
|
||||||
|
import org.apache.avro.test.specialtypes.value;
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
|
||||||
import org.apache.spark.api.java.function.MapFunction;
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
|
import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
|
||||||
|
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
|
||||||
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
|
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
|
||||||
import eu.dnetlib.dhp.schema.mdstore.MetadataRecord;
|
import eu.dnetlib.dhp.schema.mdstore.MetadataRecord;
|
||||||
import net.sf.saxon.s9api.*;
|
import net.sf.saxon.s9api.*;
|
||||||
|
@ -25,9 +31,9 @@ public class XSLTTransformationFunction implements MapFunction<MetadataRecord, M
|
||||||
|
|
||||||
private final AggregationCounter aggregationCounter;
|
private final AggregationCounter aggregationCounter;
|
||||||
|
|
||||||
private final String transformationRule;
|
private final AggregatorReport report;
|
||||||
|
|
||||||
private final Cleaner cleanFunction;
|
private final String transformationRule;
|
||||||
|
|
||||||
private final long dateOfTransformation;
|
private final long dateOfTransformation;
|
||||||
|
|
||||||
|
@ -35,55 +41,72 @@ public class XSLTTransformationFunction implements MapFunction<MetadataRecord, M
|
||||||
|
|
||||||
public XSLTTransformationFunction(
|
public XSLTTransformationFunction(
|
||||||
final AggregationCounter aggregationCounter,
|
final AggregationCounter aggregationCounter,
|
||||||
|
final AggregatorReport report,
|
||||||
final String transformationRule,
|
final String transformationRule,
|
||||||
long dateOfTransformation,
|
long dateOfTransformation,
|
||||||
final VocabularyGroup vocabularies) {
|
final VocabularyGroup vocabularies) {
|
||||||
this.aggregationCounter = aggregationCounter;
|
this.aggregationCounter = aggregationCounter;
|
||||||
|
this.report = report;
|
||||||
this.transformationRule = transformationRule;
|
this.transformationRule = transformationRule;
|
||||||
this.vocabularies = vocabularies;
|
this.vocabularies = vocabularies;
|
||||||
this.dateOfTransformation = dateOfTransformation;
|
this.dateOfTransformation = dateOfTransformation;
|
||||||
cleanFunction = new Cleaner(vocabularies);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public MetadataRecord call(MetadataRecord value) {
|
public MetadataRecord call(MetadataRecord value) {
|
||||||
aggregationCounter.getTotalItems().add(1);
|
aggregationCounter.getTotalItems().add(1);
|
||||||
|
|
||||||
|
final Processor xsltProcessor = new Processor(false);
|
||||||
|
xsltProcessor.registerExtensionFunction(new Cleaner(vocabularies));
|
||||||
|
xsltProcessor.registerExtensionFunction(new DateCleaner());
|
||||||
|
xsltProcessor.registerExtensionFunction(new PersonCleaner());
|
||||||
|
|
||||||
|
final StringWriter output = new StringWriter();
|
||||||
|
final Serializer out = xsltProcessor.newSerializer(output);
|
||||||
|
out.setOutputProperty(Serializer.Property.METHOD, "xml");
|
||||||
|
out.setOutputProperty(Serializer.Property.INDENT, "yes");
|
||||||
|
|
||||||
|
XsltTransformer transformer;
|
||||||
try {
|
try {
|
||||||
Processor processor = new Processor(false);
|
transformer = xsltProcessor
|
||||||
|
.newXsltCompiler()
|
||||||
|
.compile(new StreamSource(IOUtils.toInputStream(transformationRule, StandardCharsets.UTF_8)))
|
||||||
|
.load();
|
||||||
|
} catch (SaxonApiException e) {
|
||||||
|
report.put(e.getClass().getName(), e.getMessage());
|
||||||
|
try {
|
||||||
|
report.close();
|
||||||
|
} catch (IOException ex) {
|
||||||
|
throw new IllegalArgumentException("error compiling the XSLT", e);
|
||||||
|
}
|
||||||
|
throw new IllegalArgumentException("error compiling the XSLT", e);
|
||||||
|
}
|
||||||
|
|
||||||
processor.registerExtensionFunction(cleanFunction);
|
transformer
|
||||||
processor.registerExtensionFunction(new DateCleaner());
|
.setParameter(new QName(DATASOURCE_ID_PARAM), new XdmAtomicValue(value.getProvenance().getDatasourceId()));
|
||||||
processor.registerExtensionFunction(new PersonCleaner());
|
transformer
|
||||||
|
.setParameter(
|
||||||
|
new QName(DATASOURCE_NAME_PARAM), new XdmAtomicValue(value.getProvenance().getDatasourceName()));
|
||||||
|
|
||||||
final XsltCompiler comp = processor.newXsltCompiler();
|
try {
|
||||||
QName datasourceIDParam = new QName(DATASOURCE_ID_PARAM);
|
final XdmNode source = xsltProcessor
|
||||||
comp.setParameter(datasourceIDParam, new XdmAtomicValue(value.getProvenance().getDatasourceId()));
|
|
||||||
QName datasourceNameParam = new QName(DATASOURCE_NAME_PARAM);
|
|
||||||
comp.setParameter(datasourceNameParam, new XdmAtomicValue(value.getProvenance().getDatasourceName()));
|
|
||||||
XsltExecutable xslt = comp
|
|
||||||
.compile(new StreamSource(IOUtils.toInputStream(transformationRule, StandardCharsets.UTF_8)));
|
|
||||||
XdmNode source = processor
|
|
||||||
.newDocumentBuilder()
|
.newDocumentBuilder()
|
||||||
.build(new StreamSource(IOUtils.toInputStream(value.getBody(), StandardCharsets.UTF_8)));
|
.build(new StreamSource(IOUtils.toInputStream(value.getBody(), StandardCharsets.UTF_8)));
|
||||||
XsltTransformer trans = xslt.load();
|
|
||||||
trans.setInitialContextNode(source);
|
|
||||||
final StringWriter output = new StringWriter();
|
|
||||||
Serializer out = processor.newSerializer(output);
|
|
||||||
out.setOutputProperty(Serializer.Property.METHOD, "xml");
|
|
||||||
out.setOutputProperty(Serializer.Property.INDENT, "yes");
|
|
||||||
|
|
||||||
trans.setDestination(out);
|
transformer.setInitialContextNode(source);
|
||||||
trans.transform();
|
transformer.setDestination(out);
|
||||||
final String xml = output.toString();
|
transformer.transform();
|
||||||
value.setBody(xml);
|
} catch (SaxonApiException e) {
|
||||||
value.setDateOfTransformation(dateOfTransformation);
|
report.put(e.getClass().getName(), e.getMessage());
|
||||||
aggregationCounter.getProcessedItems().add(1);
|
|
||||||
return value;
|
|
||||||
} catch (Throwable e) {
|
|
||||||
aggregationCounter.getErrorItems().add(1);
|
aggregationCounter.getErrorItems().add(1);
|
||||||
return null;
|
return null;
|
||||||
// throw new RuntimeException(e);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final String xml = output.toString();
|
||||||
|
value.setBody(xml);
|
||||||
|
value.setDateOfTransformation(dateOfTransformation);
|
||||||
|
aggregationCounter.getProcessedItems().add(1);
|
||||||
|
return value;
|
||||||
}
|
}
|
||||||
|
|
||||||
public AggregationCounter getAggregationCounter() {
|
public AggregationCounter getAggregationCounter() {
|
||||||
|
@ -94,10 +117,6 @@ public class XSLTTransformationFunction implements MapFunction<MetadataRecord, M
|
||||||
return transformationRule;
|
return transformationRule;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Cleaner getCleanFunction() {
|
|
||||||
return cleanFunction;
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getDateOfTransformation() {
|
public long getDateOfTransformation() {
|
||||||
return dateOfTransformation;
|
return dateOfTransformation;
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,6 +5,7 @@ import static eu.dnetlib.dhp.common.Constants.MDSTORE_DATA_PATH;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.StringReader;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
@ -19,6 +20,8 @@ import org.apache.spark.sql.Encoder;
|
||||||
import org.apache.spark.sql.Encoders;
|
import org.apache.spark.sql.Encoders;
|
||||||
import org.apache.spark.sql.SparkSession;
|
import org.apache.spark.sql.SparkSession;
|
||||||
import org.apache.spark.util.LongAccumulator;
|
import org.apache.spark.util.LongAccumulator;
|
||||||
|
import org.dom4j.Document;
|
||||||
|
import org.dom4j.io.SAXReader;
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.DisplayName;
|
import org.junit.jupiter.api.DisplayName;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
@ -28,6 +31,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.aggregation.AbstractVocabularyTest;
|
import eu.dnetlib.dhp.aggregation.AbstractVocabularyTest;
|
||||||
import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
|
import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
|
||||||
|
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
|
||||||
import eu.dnetlib.dhp.schema.mdstore.MetadataRecord;
|
import eu.dnetlib.dhp.schema.mdstore.MetadataRecord;
|
||||||
import eu.dnetlib.dhp.schema.mdstore.Provenance;
|
import eu.dnetlib.dhp.schema.mdstore.Provenance;
|
||||||
import eu.dnetlib.dhp.transformation.xslt.DateCleaner;
|
import eu.dnetlib.dhp.transformation.xslt.DateCleaner;
|
||||||
|
@ -83,9 +87,11 @@ class TransformationJobTest extends AbstractVocabularyTest {
|
||||||
@DisplayName("Test Transform Inst.&Them.v4 record XML with zenodo_tr")
|
@DisplayName("Test Transform Inst.&Them.v4 record XML with zenodo_tr")
|
||||||
void testTransformITGv4Zenodo() throws Exception {
|
void testTransformITGv4Zenodo() throws Exception {
|
||||||
|
|
||||||
|
final String dsName = "Zenodo";
|
||||||
|
final String dsId = "opendoar___::1234";
|
||||||
// We Set the input Record getting the XML from the classpath
|
// We Set the input Record getting the XML from the classpath
|
||||||
final MetadataRecord mr = new MetadataRecord();
|
final MetadataRecord mr = new MetadataRecord();
|
||||||
mr.setProvenance(new Provenance("DSID", "DSNAME", "PREFIX"));
|
mr.setProvenance(new Provenance(dsId, dsName, "PREFIX"));
|
||||||
mr.setBody(IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/input_itgv4.xml")));
|
mr.setBody(IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/input_itgv4.xml")));
|
||||||
// We Load the XSLT transformation Rule from the classpath
|
// We Load the XSLT transformation Rule from the classpath
|
||||||
final XSLTTransformationFunction tr = loadTransformationRule("/eu/dnetlib/dhp/transform/zenodo_tr.xslt");
|
final XSLTTransformationFunction tr = loadTransformationRule("/eu/dnetlib/dhp/transform/zenodo_tr.xslt");
|
||||||
|
@ -94,7 +100,12 @@ class TransformationJobTest extends AbstractVocabularyTest {
|
||||||
|
|
||||||
// Print the record
|
// Print the record
|
||||||
System.out.println(result.getBody());
|
System.out.println(result.getBody());
|
||||||
// TODO Create significant Assert
|
|
||||||
|
Document record = new SAXReader().read(new StringReader(result.getBody()));
|
||||||
|
assertEquals(dsName, record.valueOf("//*[local-name() = 'metadata']/*[local-name() = 'collectedFrom']/@name"));
|
||||||
|
assertEquals(dsId, record.valueOf("//*[local-name() = 'metadata']/*[local-name() = 'collectedFrom']/@id"));
|
||||||
|
|
||||||
|
// TODO Create more significant Asserts
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -282,7 +293,9 @@ class TransformationJobTest extends AbstractVocabularyTest {
|
||||||
private XSLTTransformationFunction loadTransformationRule(final String path) throws Exception {
|
private XSLTTransformationFunction loadTransformationRule(final String path) throws Exception {
|
||||||
final String trValue = IOUtils.toString(this.getClass().getResourceAsStream(path));
|
final String trValue = IOUtils.toString(this.getClass().getResourceAsStream(path));
|
||||||
final LongAccumulator la = new LongAccumulator();
|
final LongAccumulator la = new LongAccumulator();
|
||||||
return new XSLTTransformationFunction(new AggregationCounter(la, la, la), trValue, 0, vocabularies);
|
final AggregationCounter counter = new AggregationCounter(la, la, la);
|
||||||
|
final AggregatorReport report = new AggregatorReport();
|
||||||
|
return new XSLTTransformationFunction(counter, report, trValue, 0, vocabularies);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue