forked from D-Net/dnet-hadoop
Compare commits
3 Commits
master
...
transforma
Author | SHA1 | Date |
---|---|---|
Claudio Atzori | a2095dc725 | |
Claudio Atzori | f0a9c370b6 | |
Claudio Atzori | 6c805b0784 |
|
@ -77,11 +77,11 @@ public class TransformSparkJobNode {
|
|||
.ofNullable(parser.get("recordsPerTask"))
|
||||
.map(Integer::valueOf)
|
||||
.orElse(RECORDS_PER_TASK);
|
||||
log.info("recordsPerTask: {}", rpt);
|
||||
|
||||
final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl);
|
||||
|
||||
final VocabularyGroup vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService);
|
||||
|
||||
log.info("Retrieved {} vocabularies", vocabularies.vocabularyNames().size());
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
|
@ -110,9 +110,6 @@ public class TransformSparkJobNode {
|
|||
final String workflowId = args.get("workflowId");
|
||||
log.info("workflowId is {}", workflowId);
|
||||
|
||||
MapFunction<MetadataRecord, MetadataRecord> x = TransformationFactory
|
||||
.getTransformationPlugin(args, ct, isLookUpService);
|
||||
|
||||
final Dataset<MetadataRecord> inputMDStore = spark
|
||||
.read()
|
||||
.format("parquet")
|
||||
|
@ -123,30 +120,24 @@ public class TransformSparkJobNode {
|
|||
|
||||
final MessageSender messageSender = new MessageSender(dnetMessageManagerURL, workflowId);
|
||||
try (AggregatorReport report = new AggregatorReport(messageSender)) {
|
||||
try {
|
||||
JavaRDD<MetadataRecord> mdstore = inputMDStore
|
||||
.javaRDD()
|
||||
.repartition(getRepartitionNumber(totalInput, rpt))
|
||||
.map((Function<MetadataRecord, MetadataRecord>) x::call)
|
||||
.filter((Function<MetadataRecord, Boolean>) Objects::nonNull);
|
||||
saveDataset(spark.createDataset(mdstore.rdd(), encoder), outputBasePath + MDSTORE_DATA_PATH);
|
||||
final MapFunction<MetadataRecord, MetadataRecord> tr = TransformationFactory
|
||||
.getTransformationPlugin(args, ct, report, isLookUpService);
|
||||
|
||||
log.info("Transformed item {}", ct.getProcessedItems().count());
|
||||
log.info("Total item {}", ct.getTotalItems().count());
|
||||
log.info("Transformation Error item {}", ct.getErrorItems().count());
|
||||
JavaRDD<MetadataRecord> mdstore = inputMDStore
|
||||
.javaRDD()
|
||||
.repartition(getRepartitionNumber(totalInput, rpt))
|
||||
.map((Function<MetadataRecord, MetadataRecord>) tr::call)
|
||||
.filter((Function<MetadataRecord, Boolean>) Objects::nonNull);
|
||||
saveDataset(spark.createDataset(mdstore.rdd(), encoder), outputBasePath + MDSTORE_DATA_PATH);
|
||||
|
||||
final long mdStoreSize = spark.read().load(outputBasePath + MDSTORE_DATA_PATH).count();
|
||||
writeHdfsFile(
|
||||
spark.sparkContext().hadoopConfiguration(),
|
||||
"" + mdStoreSize, outputBasePath + MDSTORE_SIZE_PATH);
|
||||
} catch (Throwable e) {
|
||||
log.error("error during record transformation", e);
|
||||
report.put(TransformSparkJobNode.class.getSimpleName(), e.getMessage());
|
||||
report.put(CONTENT_TOTALITEMS, ct.getTotalItems().value().toString());
|
||||
report.put(CONTENT_INVALIDRECORDS, ct.getErrorItems().value().toString());
|
||||
report.put(CONTENT_TRANSFORMEDRECORDS, ct.getProcessedItems().value().toString());
|
||||
throw e;
|
||||
}
|
||||
log.info("Transformed item {}", ct.getProcessedItems().count());
|
||||
log.info("Total item {}", ct.getTotalItems().count());
|
||||
log.info("Transformation Error item {}", ct.getErrorItems().count());
|
||||
|
||||
final long mdStoreSize = spark.read().load(outputBasePath + MDSTORE_DATA_PATH).count();
|
||||
writeHdfsFile(
|
||||
spark.sparkContext().hadoopConfiguration(),
|
||||
"" + mdStoreSize, outputBasePath + MDSTORE_SIZE_PATH);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -10,6 +10,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
|
||||
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
|
||||
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
|
||||
import eu.dnetlib.dhp.schema.mdstore.MetadataRecord;
|
||||
import eu.dnetlib.dhp.transformation.xslt.XSLTTransformationFunction;
|
||||
|
@ -27,7 +28,8 @@ public class TransformationFactory {
|
|||
}
|
||||
|
||||
public static MapFunction<MetadataRecord, MetadataRecord> getTransformationPlugin(
|
||||
final Map<String, String> jobArgument, final AggregationCounter counters, final ISLookUpService isLookupService)
|
||||
final Map<String, String> jobArgument, final AggregationCounter counters, final AggregatorReport report,
|
||||
final ISLookUpService isLookupService)
|
||||
throws DnetTransformationException {
|
||||
|
||||
try {
|
||||
|
@ -45,7 +47,7 @@ public class TransformationFactory {
|
|||
transformationRuleId, isLookupService);
|
||||
|
||||
final long dateOfTransformation = Long.parseLong(jobArgument.get("dateOfTransformation"));
|
||||
return new XSLTTransformationFunction(counters, transformationRule, dateOfTransformation,
|
||||
return new XSLTTransformationFunction(counters, report, transformationRule, dateOfTransformation,
|
||||
vocabularies);
|
||||
|
||||
}
|
||||
|
|
|
@ -1,16 +1,22 @@
|
|||
|
||||
package eu.dnetlib.dhp.transformation.xslt;
|
||||
|
||||
import static eu.dnetlib.dhp.common.Constants.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.io.StringWriter;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
import javax.xml.transform.stream.StreamSource;
|
||||
|
||||
import org.apache.avro.test.specialtypes.value;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
|
||||
import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
|
||||
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
|
||||
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
|
||||
import eu.dnetlib.dhp.schema.mdstore.MetadataRecord;
|
||||
import net.sf.saxon.s9api.*;
|
||||
|
@ -25,9 +31,9 @@ public class XSLTTransformationFunction implements MapFunction<MetadataRecord, M
|
|||
|
||||
private final AggregationCounter aggregationCounter;
|
||||
|
||||
private final String transformationRule;
|
||||
private final AggregatorReport report;
|
||||
|
||||
private final Cleaner cleanFunction;
|
||||
private final String transformationRule;
|
||||
|
||||
private final long dateOfTransformation;
|
||||
|
||||
|
@ -35,55 +41,72 @@ public class XSLTTransformationFunction implements MapFunction<MetadataRecord, M
|
|||
|
||||
public XSLTTransformationFunction(
|
||||
final AggregationCounter aggregationCounter,
|
||||
final AggregatorReport report,
|
||||
final String transformationRule,
|
||||
long dateOfTransformation,
|
||||
final VocabularyGroup vocabularies) {
|
||||
this.aggregationCounter = aggregationCounter;
|
||||
this.report = report;
|
||||
this.transformationRule = transformationRule;
|
||||
this.vocabularies = vocabularies;
|
||||
this.dateOfTransformation = dateOfTransformation;
|
||||
cleanFunction = new Cleaner(vocabularies);
|
||||
}
|
||||
|
||||
@Override
|
||||
public MetadataRecord call(MetadataRecord value) {
|
||||
aggregationCounter.getTotalItems().add(1);
|
||||
|
||||
final Processor xsltProcessor = new Processor(false);
|
||||
xsltProcessor.registerExtensionFunction(new Cleaner(vocabularies));
|
||||
xsltProcessor.registerExtensionFunction(new DateCleaner());
|
||||
xsltProcessor.registerExtensionFunction(new PersonCleaner());
|
||||
|
||||
final StringWriter output = new StringWriter();
|
||||
final Serializer out = xsltProcessor.newSerializer(output);
|
||||
out.setOutputProperty(Serializer.Property.METHOD, "xml");
|
||||
out.setOutputProperty(Serializer.Property.INDENT, "yes");
|
||||
|
||||
XsltTransformer transformer;
|
||||
try {
|
||||
Processor processor = new Processor(false);
|
||||
transformer = xsltProcessor
|
||||
.newXsltCompiler()
|
||||
.compile(new StreamSource(IOUtils.toInputStream(transformationRule, StandardCharsets.UTF_8)))
|
||||
.load();
|
||||
} catch (SaxonApiException e) {
|
||||
report.put(e.getClass().getName(), e.getMessage());
|
||||
try {
|
||||
report.close();
|
||||
} catch (IOException ex) {
|
||||
throw new IllegalArgumentException("error compiling the XSLT", e);
|
||||
}
|
||||
throw new IllegalArgumentException("error compiling the XSLT", e);
|
||||
}
|
||||
|
||||
processor.registerExtensionFunction(cleanFunction);
|
||||
processor.registerExtensionFunction(new DateCleaner());
|
||||
processor.registerExtensionFunction(new PersonCleaner());
|
||||
transformer
|
||||
.setParameter(new QName(DATASOURCE_ID_PARAM), new XdmAtomicValue(value.getProvenance().getDatasourceId()));
|
||||
transformer
|
||||
.setParameter(
|
||||
new QName(DATASOURCE_NAME_PARAM), new XdmAtomicValue(value.getProvenance().getDatasourceName()));
|
||||
|
||||
final XsltCompiler comp = processor.newXsltCompiler();
|
||||
QName datasourceIDParam = new QName(DATASOURCE_ID_PARAM);
|
||||
comp.setParameter(datasourceIDParam, new XdmAtomicValue(value.getProvenance().getDatasourceId()));
|
||||
QName datasourceNameParam = new QName(DATASOURCE_NAME_PARAM);
|
||||
comp.setParameter(datasourceNameParam, new XdmAtomicValue(value.getProvenance().getDatasourceName()));
|
||||
XsltExecutable xslt = comp
|
||||
.compile(new StreamSource(IOUtils.toInputStream(transformationRule, StandardCharsets.UTF_8)));
|
||||
XdmNode source = processor
|
||||
try {
|
||||
final XdmNode source = xsltProcessor
|
||||
.newDocumentBuilder()
|
||||
.build(new StreamSource(IOUtils.toInputStream(value.getBody(), StandardCharsets.UTF_8)));
|
||||
XsltTransformer trans = xslt.load();
|
||||
trans.setInitialContextNode(source);
|
||||
final StringWriter output = new StringWriter();
|
||||
Serializer out = processor.newSerializer(output);
|
||||
out.setOutputProperty(Serializer.Property.METHOD, "xml");
|
||||
out.setOutputProperty(Serializer.Property.INDENT, "yes");
|
||||
|
||||
trans.setDestination(out);
|
||||
trans.transform();
|
||||
final String xml = output.toString();
|
||||
value.setBody(xml);
|
||||
value.setDateOfTransformation(dateOfTransformation);
|
||||
aggregationCounter.getProcessedItems().add(1);
|
||||
return value;
|
||||
} catch (Throwable e) {
|
||||
transformer.setInitialContextNode(source);
|
||||
transformer.setDestination(out);
|
||||
transformer.transform();
|
||||
} catch (SaxonApiException e) {
|
||||
report.put(e.getClass().getName(), e.getMessage());
|
||||
aggregationCounter.getErrorItems().add(1);
|
||||
return null;
|
||||
// throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
final String xml = output.toString();
|
||||
value.setBody(xml);
|
||||
value.setDateOfTransformation(dateOfTransformation);
|
||||
aggregationCounter.getProcessedItems().add(1);
|
||||
return value;
|
||||
}
|
||||
|
||||
public AggregationCounter getAggregationCounter() {
|
||||
|
@ -94,10 +117,6 @@ public class XSLTTransformationFunction implements MapFunction<MetadataRecord, M
|
|||
return transformationRule;
|
||||
}
|
||||
|
||||
public Cleaner getCleanFunction() {
|
||||
return cleanFunction;
|
||||
}
|
||||
|
||||
public long getDateOfTransformation() {
|
||||
return dateOfTransformation;
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ import static eu.dnetlib.dhp.common.Constants.MDSTORE_DATA_PATH;
|
|||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.StringReader;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
@ -19,6 +20,8 @@ import org.apache.spark.sql.Encoder;
|
|||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.apache.spark.util.LongAccumulator;
|
||||
import org.dom4j.Document;
|
||||
import org.dom4j.io.SAXReader;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.DisplayName;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
@ -28,6 +31,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
|
|||
|
||||
import eu.dnetlib.dhp.aggregation.AbstractVocabularyTest;
|
||||
import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
|
||||
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
|
||||
import eu.dnetlib.dhp.schema.mdstore.MetadataRecord;
|
||||
import eu.dnetlib.dhp.schema.mdstore.Provenance;
|
||||
import eu.dnetlib.dhp.transformation.xslt.DateCleaner;
|
||||
|
@ -83,9 +87,11 @@ class TransformationJobTest extends AbstractVocabularyTest {
|
|||
@DisplayName("Test Transform Inst.&Them.v4 record XML with zenodo_tr")
|
||||
void testTransformITGv4Zenodo() throws Exception {
|
||||
|
||||
final String dsName = "Zenodo";
|
||||
final String dsId = "opendoar___::1234";
|
||||
// We Set the input Record getting the XML from the classpath
|
||||
final MetadataRecord mr = new MetadataRecord();
|
||||
mr.setProvenance(new Provenance("DSID", "DSNAME", "PREFIX"));
|
||||
mr.setProvenance(new Provenance(dsId, dsName, "PREFIX"));
|
||||
mr.setBody(IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/input_itgv4.xml")));
|
||||
// We Load the XSLT transformation Rule from the classpath
|
||||
final XSLTTransformationFunction tr = loadTransformationRule("/eu/dnetlib/dhp/transform/zenodo_tr.xslt");
|
||||
|
@ -94,7 +100,12 @@ class TransformationJobTest extends AbstractVocabularyTest {
|
|||
|
||||
// Print the record
|
||||
System.out.println(result.getBody());
|
||||
// TODO Create significant Assert
|
||||
|
||||
Document record = new SAXReader().read(new StringReader(result.getBody()));
|
||||
assertEquals(dsName, record.valueOf("//*[local-name() = 'metadata']/*[local-name() = 'collectedFrom']/@name"));
|
||||
assertEquals(dsId, record.valueOf("//*[local-name() = 'metadata']/*[local-name() = 'collectedFrom']/@id"));
|
||||
|
||||
// TODO Create more significant Asserts
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -282,7 +293,9 @@ class TransformationJobTest extends AbstractVocabularyTest {
|
|||
private XSLTTransformationFunction loadTransformationRule(final String path) throws Exception {
|
||||
final String trValue = IOUtils.toString(this.getClass().getResourceAsStream(path));
|
||||
final LongAccumulator la = new LongAccumulator();
|
||||
return new XSLTTransformationFunction(new AggregationCounter(la, la, la), trValue, 0, vocabularies);
|
||||
final AggregationCounter counter = new AggregationCounter(la, la, la);
|
||||
final AggregatorReport report = new AggregatorReport();
|
||||
return new XSLTTransformationFunction(counter, report, trValue, 0, vocabularies);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue