From bbb87d0e3df1d66ad9f74b812cd9fc310c9f0793 Mon Sep 17 00:00:00 2001
From: "sandro.labruzzo"
Date: Thu, 10 Oct 2019 11:33:51 +0200
Subject: [PATCH] implemented saxonHE on transformation spark job
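
The transformation Spark job now runs XSLT through the Saxon-HE s9api instead
of the javax.xml TransformerFactory. A custom extension function (Cleaner) is
registered on the Saxon Processor so transformation rules can call it from
XPath, the job arguments are described in
transformation_input_parameters.json, and a new isTest flag lets tests skip
the RabbitMQ report.

For illustration only (this snippet is not part of the diff): a stylesheet
invokes the extension by binding a prefix to the namespace declared in
Cleaner.getName() and calling it like any XPath function; the eg prefix and
the element names below are assumptions, not code from this patch:

    <xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
                    xmlns:eg="http://eu/dnetlib/trasform/extension"
                    version="2.0">
        <!-- eg:clean(...) resolves to Cleaner.call(), which prefixes its argument with "cleaned" -->
        <xsl:template match="/">
            <out><xsl:value-of select="eg:clean(string(//value))"/></out>
        </xsl:template>
    </xsl:stylesheet>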
---
 .../collector/worker/DnetCollectorWorker.java |   2 +-
 .../ArgumentApplicationParser.java            |   3 +-
 dhp-workflows/dhp-aggregation/pom.xml         |   6 +
 .../dhp/transformation/TransformFunction.java |  31 +++--
 .../transformation/TransformSparkJobNode.java | 128 ++++-------------
 .../dhp/transformation/functions/Cleaner.java |  30 ++++
 .../transformation_input_parameters.json      |  16 +++
 .../transformation/TransformationJobTest.java |  33 ++++-
 .../eu/dnetlib/dhp/transform/ext_simple.xsl   |  23 ++++
 .../resources/eu/dnetlib/dhp/transform/tr.xml |  10 +-
 10 files changed, 156 insertions(+), 126 deletions(-)
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/functions/Cleaner.java
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/transformation_input_parameters.json
 create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/ext_simple.xsl

diff --git a/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorker.java b/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorker.java
index 39f2872d4..3d458d8fc 100644
--- a/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorker.java
+++ b/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorker.java
@@ -62,7 +62,7 @@ public class DnetCollectorWorker {
         System.setProperty("HADOOP_USER_NAME", argumentParser.get("userHDFS"));
         System.setProperty("hadoop.home.dir", "/");
         //Get the filesystem - HDFS
-        FileSystem fs = FileSystem.get(URI.create(hdfsuri), conf);
+        FileSystem.get(URI.create(hdfsuri), conf);
         Path hdfswritepath = new Path(argumentParser.get("hdfsPath"));
         log.info("Created path " + hdfswritepath.toString());
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/application/ArgumentApplicationParser.java b/dhp-common/src/main/java/eu/dnetlib/dhp/application/ArgumentApplicationParser.java
index 5c8e1f627..fc83a8d69 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/application/ArgumentApplicationParser.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/application/ArgumentApplicationParser.java
@@ -3,11 +3,12 @@ package eu.dnetlib.dhp.application;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.cli.*;
 
+import java.io.Serializable;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
-public class ArgumentApplicationParser {
+public class ArgumentApplicationParser implements Serializable {
 
     private final Options options = new Options();
     private final Map<String, String> objectMap = new HashMap<>();
diff --git a/dhp-workflows/dhp-aggregation/pom.xml b/dhp-workflows/dhp-aggregation/pom.xml
index 21ddaea99..8402f7350 100644
--- a/dhp-workflows/dhp-aggregation/pom.xml
+++ b/dhp-workflows/dhp-aggregation/pom.xml
@@ -27,6 +27,12 @@
             <version>1.0.0-SNAPSHOT</version>
         </dependency>
 
+        <dependency>
+            <groupId>net.sf.saxon</groupId>
+            <artifactId>Saxon-HE</artifactId>
+            <version>9.5.1-5</version>
+        </dependency>
+
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformFunction.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformFunction.java
index b6d247e13..c186058a3 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformFunction.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformFunction.java
@@ -1,13 +1,11 @@
 package eu.dnetlib.dhp.transformation;
 
 import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
+import eu.dnetlib.dhp.transformation.functions.Cleaner;
+import net.sf.saxon.s9api.*;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.util.LongAccumulator;
 
-import javax.xml.transform.OutputKeys;
-import javax.xml.transform.Transformer;
-import javax.xml.transform.TransformerFactory;
-import javax.xml.transform.stream.StreamResult;
 import javax.xml.transform.stream.StreamSource;
 import java.io.ByteArrayInputStream;
 import java.io.StringWriter;
@@ -20,10 +18,11 @@ public class TransformFunction implements MapFunction<MetadataRecord, MetadataRecord> {
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java
         final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
         final Dataset<MetadataRecord> mdstoreInput = spark.read().format("parquet").load(inputPath).as(encoder);
-        final LongAccumulator totalItems = spark.sparkContext().longAccumulator("TotalItems");
         final LongAccumulator errorItems = spark.sparkContext().longAccumulator("errorItems");
         final LongAccumulator transformedItems = spark.sparkContext().longAccumulator("transformedItems");
-        final TransformFunction transformFunction = new TransformFunction(totalItems, errorItems, transformedItems, trasformationRule, dateOfCollection) ;
         mdstoreInput.map(transformFunction, encoder).write().format("parquet").save(outputPath);
-
-        if (rabbitHost != null) {
-            System.out.println("SEND FINAL REPORT");
-            final Map<String, String> reportMap = new HashMap<>();
         reportMap.put("inputItem" , ""+ totalItems.value());
         reportMap.put("invalidRecords", "" + errorItems.value());
         reportMap.put("mdStoreSize", "" + transformedItems.value());
-        final MessageManager manager = new MessageManager(rabbitHost, rabbitUser, rabbitPassword, false, false, null);
-
-        System.out.println(new Message(workflowId, "Transform", MessageType.REPORT, reportMap));
-        manager.sendMessage(new Message(workflowId, "Transform", MessageType.REPORT, reportMap), rabbitReportQueue, true, false);
-        manager.close();
+        if (!test) {
+            final MessageManager manager = new MessageManager(rabbitHost, rabbitUser, rabbitPassword, false, false, null);
+            manager.sendMessage(new Message(workflowId, "Transform", MessageType.REPORT, reportMap), rabbitReportQueue, true, false);
+            manager.close();
+        }
     }
-    }
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/functions/Cleaner.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/functions/Cleaner.java
new file mode 100644
index 000000000..a0d4be94c
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/functions/Cleaner.java
@@ -0,0 +1,30 @@
+package eu.dnetlib.dhp.transformation.functions;
+
+import net.sf.saxon.s9api.*;
+
+public class Cleaner implements ExtensionFunction {
+
+    @Override
+    public QName getName() {
+        return new QName("http://eu/dnetlib/trasform/extension", "clean");
+    }
+
+    @Override
+    public SequenceType getResultType() {
+        return SequenceType.makeSequenceType(ItemType.STRING, OccurrenceIndicator.ONE);
+    }
+
+    @Override
+    public SequenceType[] getArgumentTypes() {
+        return new SequenceType[]
+                {
+                        SequenceType.makeSequenceType(ItemType.STRING, OccurrenceIndicator.ONE)
+                };
+    }
+
+    @Override
+    public XdmValue call(XdmValue[] xdmValues) throws SaxonApiException {
+        final String currentValue = xdmValues[0].itemAt(0).getStringValue();
+        return new XdmAtomicValue("cleaned"+currentValue);
+    }
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/transformation_input_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/transformation_input_parameters.json
new file mode 100644
index 000000000..3af21f53f
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/transformation_input_parameters.json
@@ -0,0 +1,16 @@
+[
+  {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
+  {"paramName":"d", "paramLongName":"dateOfCollection", "paramDescription": "the date when the record has been stored", "paramRequired": true},
+  {"paramName":"i", "paramLongName":"input", "paramDescription": "the path of the sequential file to read", "paramRequired": true},
+  {"paramName":"o", "paramLongName":"output", "paramDescription": "the path of the result DataFrame on HDFS", "paramRequired": true},
+  {"paramName":"w", "paramLongName":"workflowId", "paramDescription": "the identifier of the dnet Workflow", "paramRequired": true},
+  {"paramName":"tr", "paramLongName":"transformationRule", "paramDescription": "the transformation Rule to apply to the input MDStore", "paramRequired": true},
+  {"paramName":"ru", "paramLongName":"rabbitUser", "paramDescription": "the user to connect with RabbitMq for messaging", "paramRequired": true},
+  {"paramName":"rp", "paramLongName":"rabbitPassword", "paramDescription": "the password to connect with RabbitMq for messaging", "paramRequired": true},
+  {"paramName":"rh", "paramLongName":"rabbitHost", "paramDescription": "the host of the RabbitMq server", "paramRequired": true},
+  {"paramName":"ro", "paramLongName":"rabbitOngoingQueue", "paramDescription": "the name of the ongoing queue", "paramRequired": true},
+  {"paramName":"rr", "paramLongName":"rabbitReportQueue", "paramDescription": "the name of the report queue", "paramRequired": true},
+  {"paramName":"t", "paramLongName":"isTest", "paramDescription": "true if the job runs in test mode and skips the RabbitMq report", "paramRequired": false}
+
+
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java
index 547b53187..0dad1743b 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java
@@ -1,7 +1,9 @@
 package eu.dnetlib.dhp.transformation;
 
 import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
+import eu.dnetlib.dhp.transformation.functions.Cleaner;
 import eu.dnetlib.dhp.utils.DHPUtils;
+import net.sf.saxon.s9api.*;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.util.LongAccumulator;
 import org.dom4j.Document;
@@ -14,21 +16,41 @@ import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnit;
 import org.mockito.junit.MockitoRule;
 
+import javax.xml.transform.stream.StreamSource;
 import java.io.File;
+import java.io.StringWriter;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Comparator;
 
 public class TransformationJobTest {
-
-
     @Mock
     LongAccumulator accumulator;
 
     @Rule
     public MockitoRule mockitoRule = MockitoJUnit.rule();
 
+    @Test
+    public void testTransformSaxonHE() throws Exception {
+        Cleaner cleanFunction = new Cleaner();
+        Processor proc = new Processor(false);
+        proc.registerExtensionFunction(cleanFunction);
+        final XsltCompiler comp = proc.newXsltCompiler();
+        XsltExecutable exp = comp.compile(new StreamSource(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/ext_simple.xsl")));
+        XdmNode source = proc.newDocumentBuilder().build(new StreamSource(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/input.xml")));
+        XsltTransformer trans = exp.load();
+        trans.setInitialContextNode(source);
+        final StringWriter output = new StringWriter();
+        Serializer out = proc.newSerializer(output);
+        out.setOutputProperty(Serializer.Property.METHOD, "xml");
+        out.setOutputProperty(Serializer.Property.INDENT, "yes");
+        trans.setDestination(out);
+        trans.transform();
+        System.out.println(output.toString());
+    }
+
+
     @Test
     public void transformTest() throws Exception {
         final String mdstore_input = this.getClass().getResource("/eu/dnetlib/dhp/transform/mdstore").getFile();
@@ -39,7 +61,7 @@ public class TransformationJobTest {
         final String xslt = DHPUtils.compressString(IOUtils.toString(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/tr.xml")));
         System.out.println(xslt);
-        TransformSparkJobNode.main(new String[]{"-mt","local", "-i", mdstore_input, "-o", mdstore_output,"-d","1", "-w","1","-tr", xslt});
+        TransformSparkJobNode.main(new String[]{"-mt","local", "-i", mdstore_input, "-o", mdstore_output,"-d","1", "-w","1","-tr", xslt, "-t", "true", "-ru","", "-rp","", "-rh","", "-ro","", "-rr",""});
 
         Files.walk(tempDirWithPrefix)
                 .sorted(Comparator.reverseOrder())
@@ -64,9 +86,6 @@
 
     @Test
     public void testTransformFunction() throws Exception {
-
-        final String xmlTr = IOUtils.toString(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/tr.xml"));
-
         SAXReader reader = new SAXReader();
         Document document = reader.read(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/tr.xml"));
         Node node = document.selectSingleNode("//CODE/*[local-name()='stylesheet']");
@@ -79,6 +98,8 @@
         final MetadataRecord result = tf.call(record);
 
         Assert.assertNotNull(result.getBody());
+
+        System.out.println(result.getBody());
     }
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/ext_simple.xsl b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/ext_simple.xsl
new file mode 100644
index 000000000..90818e526
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/ext_simple.xsl
@@ -0,0 +1,23 @@
+
+
+
+
+
+
+
+
+
+
+                    incomplete
+                    collected
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/tr.xml b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/tr.xml
index 9e9c23b78..219b03ca3 100644
--- a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/tr.xml
+++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/tr.xml
@@ -15,12 +15,18 @@
-
+
+
+
+
+
                     incomplete
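
Note: the XML bodies of ext_simple.xsl and tr.xml survive in this copy of the
patch only as text nodes ("incomplete", "collected"); the markup itself is not
recoverable. A minimal stylesheet in the same spirit, which testTransformSaxonHE
above could compile against the registered Cleaner extension, might look like
the sketch below; the element names and match patterns are assumptions, not the
original file:

    <xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
                    xmlns:eg="http://eu/dnetlib/trasform/extension"
                    version="2.0">
        <xsl:template match="/">
            <record>
                <!-- route every leaf value through the clean() extension function -->
                <xsl:for-each select="//*[not(*)]">
                    <field><xsl:value-of select="eg:clean(string(.))"/></field>
                </xsl:for-each>
                <status>incomplete</status>
                <provenance>collected</provenance>
            </record>
        </xsl:template>
    </xsl:stylesheet>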