implemented saxonHE on transformation spark job

This commit is contained in:
Sandro La Bruzzo 2019-10-10 11:33:51 +02:00
parent 4b8c7c279d
commit bbb87d0e3d
10 changed files with 156 additions and 126 deletions

View File

@ -62,7 +62,7 @@ public class DnetCollectorWorker {
System.setProperty("HADOOP_USER_NAME", argumentParser.get("userHDFS")); System.setProperty("HADOOP_USER_NAME", argumentParser.get("userHDFS"));
System.setProperty("hadoop.home.dir", "/"); System.setProperty("hadoop.home.dir", "/");
//Get the filesystem - HDFS //Get the filesystem - HDFS
FileSystem fs = FileSystem.get(URI.create(hdfsuri), conf); FileSystem.get(URI.create(hdfsuri), conf);
Path hdfswritepath = new Path(argumentParser.get("hdfsPath")); Path hdfswritepath = new Path(argumentParser.get("hdfsPath"));
log.info("Created path " + hdfswritepath.toString()); log.info("Created path " + hdfswritepath.toString());

View File

@ -3,11 +3,12 @@ package eu.dnetlib.dhp.application;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.cli.*; import org.apache.commons.cli.*;
import java.io.Serializable;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
public class ArgumentApplicationParser { public class ArgumentApplicationParser implements Serializable {
private final Options options = new Options(); private final Options options = new Options();
private final Map<String, String> objectMap = new HashMap<>(); private final Map<String, String> objectMap = new HashMap<>();

View File

@ -27,6 +27,12 @@
<version>1.0.0-SNAPSHOT</version> <version>1.0.0-SNAPSHOT</version>
</dependency> </dependency>
<dependency>
<groupId>net.sf.saxon</groupId>
<artifactId>Saxon-HE</artifactId>
<version>9.5.1-5</version>
</dependency>
<dependency> <dependency>

View File

@ -1,13 +1,11 @@
package eu.dnetlib.dhp.transformation; package eu.dnetlib.dhp.transformation;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord; import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.transformation.functions.Cleaner;
import net.sf.saxon.s9api.*;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.util.LongAccumulator; import org.apache.spark.util.LongAccumulator;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource; import javax.xml.transform.stream.StreamSource;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.StringWriter; import java.io.StringWriter;
@ -20,10 +18,11 @@ public class TransformFunction implements MapFunction<MetadataRecord, MetadataRe
private final LongAccumulator transformedItems; private final LongAccumulator transformedItems;
private final String trasformationRule; private final String trasformationRule;
private final long dateOfTransformation; private final long dateOfTransformation;
public TransformFunction(LongAccumulator totalItems, LongAccumulator errorItems, LongAccumulator transformedItems, final String trasformationRule, long dateOfTransformation) { public TransformFunction(LongAccumulator totalItems, LongAccumulator errorItems, LongAccumulator transformedItems, final String trasformationRule, long dateOfTransformation) throws Exception {
this.totalItems= totalItems; this.totalItems= totalItems;
this.errorItems = errorItems; this.errorItems = errorItems;
this.transformedItems = transformedItems; this.transformedItems = transformedItems;
@ -35,13 +34,20 @@ public class TransformFunction implements MapFunction<MetadataRecord, MetadataRe
public MetadataRecord call(MetadataRecord value) { public MetadataRecord call(MetadataRecord value) {
totalItems.add(1); totalItems.add(1);
try { try {
final TransformerFactory factory = TransformerFactory.newInstance(); final Cleaner cleanFunction = new Cleaner();
factory.newTransformer(); Processor processor = new Processor(false);
final StreamSource xsltSource = new StreamSource(new ByteArrayInputStream(trasformationRule.getBytes())); processor.registerExtensionFunction(cleanFunction);
final Transformer transformer = factory.newTransformer(xsltSource); final XsltCompiler comp = processor.newXsltCompiler();
transformer.setOutputProperty(OutputKeys.INDENT, "yes"); XsltExecutable xslt = comp.compile(new StreamSource(new ByteArrayInputStream(trasformationRule.getBytes())));
XdmNode source = processor.newDocumentBuilder().build(new StreamSource(new ByteArrayInputStream(value.getBody().getBytes())));
XsltTransformer trans = xslt.load();
trans.setInitialContextNode(source);
final StringWriter output = new StringWriter(); final StringWriter output = new StringWriter();
transformer.transform(new StreamSource(new ByteArrayInputStream(value.getBody().getBytes())), new StreamResult(output)); Serializer out = processor.newSerializer(output);
out.setOutputProperty(Serializer.Property.METHOD,"xml");
out.setOutputProperty(Serializer.Property.INDENT, "yes");
trans.setDestination(out);
trans.transform();
final String xml = output.toString(); final String xml = output.toString();
value.setBody(xml); value.setBody(xml);
value.setDateOfCollection(dateOfTransformation); value.setDateOfCollection(dateOfTransformation);
@ -52,4 +58,7 @@ public class TransformFunction implements MapFunction<MetadataRecord, MetadataRe
return null; return null;
} }
} }
} }

View File

@ -1,11 +1,14 @@
package eu.dnetlib.dhp.transformation; package eu.dnetlib.dhp.transformation;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord; import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.utils.DHPUtils; import eu.dnetlib.dhp.utils.DHPUtils;
import eu.dnetlib.message.Message; import eu.dnetlib.message.Message;
import eu.dnetlib.message.MessageManager; import eu.dnetlib.message.MessageManager;
import eu.dnetlib.message.MessageType; import eu.dnetlib.message.MessageType;
import org.apache.commons.cli.*; import org.apache.commons.cli.*;
import org.apache.commons.io.IOUtils;
import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Encoders;
@ -17,8 +20,10 @@ import org.dom4j.Node;
import org.dom4j.io.SAXReader; import org.dom4j.io.SAXReader;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Objects;
public class TransformSparkJobNode { public class TransformSparkJobNode {
@ -26,101 +31,21 @@ public class TransformSparkJobNode {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
for (int i = 0; i < args.length; i++) { final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(TransformSparkJobNode.class.getResourceAsStream("/eu/dnetlib/dhp/transformation/transformation_input_parameters.json")));
System.out.println(args[i]);
}
Options options = new Options(); parser.parseArgument(args);
options.addOption(Option.builder("mt") final String inputPath = parser.get("input");
.longOpt("master") final String outputPath = parser.get("output");
.required(true) final String workflowId = parser.get("workflowId");
.desc("should be local or yarn") final String trasformationRule = extractXSLTFromTR(Objects.requireNonNull(DHPUtils.decompressString(parser.get("transformationRule"))));
.hasArg() // This option has an argument. final String master = parser.get("master");
.build()); final String rabbitUser = parser.get("rabbitUser");
final String rabbitPassword = parser.get("rabbitPassword");
options.addOption(Option.builder("d") final String rabbitHost = parser.get("rabbitHost");
.longOpt("dateOfCollection") final String rabbitReportQueue = parser.get("rabbitReportQueue");
.required(true) final long dateOfCollection = new Long(parser.get("dateOfCollection"));
.desc("the date of collection") final boolean test = parser.get("isTest") == null?false: Boolean.valueOf(parser.get("isTest"));
.hasArg() // This option has an argument.
.build());
options.addOption(Option.builder("i")
.longOpt("input")
.required(true)
.desc("input path of the sequence file")
.hasArg() // This option has an argument.
.build());
options.addOption(Option.builder("o")
.longOpt("output")
.required(true)
.desc("output path of the mdstore")
.hasArg()
.build());
options.addOption(Option.builder("w")
.longOpt("workflowId")
.required(true)
.desc("the identifier of the dnet Workflow")
.hasArg() // This option has an argument.
.build());
options.addOption(Option.builder("tr")
.longOpt("transformationRule")
.required(true)
.desc("the transformation Rule to apply to the input MDStore")
.hasArg() // This option has an argument.
.build());
options.addOption(Option.builder("ru")
.longOpt("rabbitUser")
.required(false)
.desc("the user to connect with RabbitMq for messaging")
.hasArg() // This option has an argument.
.build());
options.addOption(Option.builder("rp")
.longOpt("rabbitPassWord")
.required(false)
.desc("the password to connect with RabbitMq for messaging")
.hasArg() // This option has an argument.
.build());
options.addOption(Option.builder("rh")
.longOpt("rabbitHost")
.required(false)
.desc("the host of the RabbitMq server")
.hasArg() // This option has an argument.
.build());
options.addOption(Option.builder("ro")
.longOpt("rabbitOngoingQueue")
.required(false)
.desc("the name of the ongoing queue")
.hasArg() // This option has an argument.
.build());
options.addOption(Option.builder("rr")
.longOpt("rabbitReportQueue")
.required(false)
.desc("the name of the report queue")
.hasArg() // This option has an argument.
.build());
final CommandLineParser parser = new DefaultParser();
final CommandLine cmd = parser.parse( options, args);
final String inputPath = cmd.getOptionValue("i");
final String outputPath = cmd.getOptionValue("o");
final String workflowId = cmd.getOptionValue("w");
final String trasformationRule = extractXSLTFromTR(DHPUtils.decompressString(cmd.getOptionValue("tr")));
final String master = cmd.getOptionValue("mt");
final String rabbitUser = cmd.getOptionValue("ru");
final String rabbitPassword = cmd.getOptionValue("rp");
final String rabbitHost = cmd.getOptionValue("rh");
final String rabbitReportQueue = cmd.getOptionValue("rr");
final long dateOfCollection = new Long(cmd.getOptionValue("d"));
final SparkSession spark = SparkSession final SparkSession spark = SparkSession
.builder() .builder()
@ -130,31 +55,24 @@ public class TransformSparkJobNode {
final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class); final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
final Dataset<MetadataRecord> mdstoreInput = spark.read().format("parquet").load(inputPath).as(encoder); final Dataset<MetadataRecord> mdstoreInput = spark.read().format("parquet").load(inputPath).as(encoder);
final LongAccumulator totalItems = spark.sparkContext().longAccumulator("TotalItems"); final LongAccumulator totalItems = spark.sparkContext().longAccumulator("TotalItems");
final LongAccumulator errorItems = spark.sparkContext().longAccumulator("errorItems"); final LongAccumulator errorItems = spark.sparkContext().longAccumulator("errorItems");
final LongAccumulator transformedItems = spark.sparkContext().longAccumulator("transformedItems"); final LongAccumulator transformedItems = spark.sparkContext().longAccumulator("transformedItems");
final TransformFunction transformFunction = new TransformFunction(totalItems, errorItems, transformedItems, trasformationRule, dateOfCollection) ; final TransformFunction transformFunction = new TransformFunction(totalItems, errorItems, transformedItems, trasformationRule, dateOfCollection) ;
mdstoreInput.map(transformFunction, encoder).write().format("parquet").save(outputPath); mdstoreInput.map(transformFunction, encoder).write().format("parquet").save(outputPath);
if (rabbitHost != null) { if (rabbitHost != null) {
System.out.println("SEND FINAL REPORT"); System.out.println("SEND FINAL REPORT");
final Map<String, String> reportMap = new HashMap<>(); final Map<String, String> reportMap = new HashMap<>();
reportMap.put("inputItem" , ""+ totalItems.value()); reportMap.put("inputItem" , ""+ totalItems.value());
reportMap.put("invalidRecords", "" + errorItems.value()); reportMap.put("invalidRecords", "" + errorItems.value());
reportMap.put("mdStoreSize", "" + transformedItems.value()); reportMap.put("mdStoreSize", "" + transformedItems.value());
final MessageManager manager = new MessageManager(rabbitHost, rabbitUser, rabbitPassword, false, false, null);
System.out.println(new Message(workflowId, "Transform", MessageType.REPORT, reportMap)); System.out.println(new Message(workflowId, "Transform", MessageType.REPORT, reportMap));
manager.sendMessage(new Message(workflowId, "Transform", MessageType.REPORT, reportMap), rabbitReportQueue, true, false); if (!test) {
manager.close(); final MessageManager manager = new MessageManager(rabbitHost, rabbitUser, rabbitPassword, false, false, null);
manager.sendMessage(new Message(workflowId, "Transform", MessageType.REPORT, reportMap), rabbitReportQueue, true, false);
manager.close();
}
} }
} }

View File

@ -0,0 +1,30 @@
package eu.dnetlib.dhp.transformation.functions;
import net.sf.saxon.s9api.*;
/**
 * Saxon extension function made available to stylesheets under the local name
 * {@code clean} in the {@code http://eu/dnetlib/trasform/extension} namespace.
 *
 * <p>It accepts exactly one string and returns exactly one string: the
 * argument with the literal text {@code cleaned} prepended.
 */
public class Cleaner implements ExtensionFunction {

    private static final String EXTENSION_NAMESPACE = "http://eu/dnetlib/trasform/extension";
    private static final String FUNCTION_LOCAL_NAME = "clean";
    private static final String RESULT_PREFIX = "cleaned";

    /**
     * @return the qualified name a stylesheet must use to invoke this function
     */
    @Override
    public QName getName() {
        return new QName(EXTENSION_NAMESPACE, FUNCTION_LOCAL_NAME);
    }

    /**
     * @return the declared result type: a single string
     */
    @Override
    public SequenceType getResultType() {
        return exactlyOneString();
    }

    /**
     * @return the declared argument types: one argument, a single string
     */
    @Override
    public SequenceType[] getArgumentTypes() {
        final SequenceType[] signature = new SequenceType[1];
        signature[0] = exactlyOneString();
        return signature;
    }

    /**
     * Evaluates the function on the supplied arguments.
     *
     * @param xdmValues the call arguments; only the first item of the first argument is read
     * @return an atomic string value made of {@code cleaned} followed by the argument's string value
     * @throws SaxonApiException declared by the {@link ExtensionFunction} contract
     */
    @Override
    public XdmValue call(XdmValue[] xdmValues) throws SaxonApiException {
        final XdmValue firstArgument = xdmValues[0];
        final String text = firstArgument.itemAt(0).getStringValue();
        return new XdmAtomicValue(RESULT_PREFIX + text);
    }

    /** Builds the "exactly one xs:string" sequence type shared by the argument and the result. */
    private static SequenceType exactlyOneString() {
        return SequenceType.makeSequenceType(ItemType.STRING, OccurrenceIndicator.ONE);
    }
}

View File

@ -0,0 +1,16 @@
[
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"d", "paramLongName":"dateOfCollection", "paramDescription": "the date when the record has been stored", "paramRequired": true},
{"paramName":"i", "paramLongName":"input", "paramDescription": "the path of the sequential file to read", "paramRequired": true},
{"paramName":"o", "paramLongName":"output", "paramDescription": "the path of the result DataFrame on HDFS", "paramRequired": true},
{"paramName":"w", "paramLongName":"workflowId", "paramDescription": "the identifier of the dnet Workflow", "paramRequired": true},
{"paramName":"tr", "paramLongName":"transformationRule","paramDescription": "the transformation Rule to apply to the input MDStore", "paramRequired": true},
{"paramName":"ru", "paramLongName":"rabbitUser", "paramDescription": "the user to connect with RabbitMq for messaging", "paramRequired": true},
{"paramName":"rp", "paramLongName":"rabbitPassword", "paramDescription": "the password to connect with RabbitMq for messaging", "paramRequired": true},
{"paramName":"rh", "paramLongName":"rabbitHost", "paramDescription": "the host of the RabbitMq server", "paramRequired": true},
{"paramName":"ro", "paramLongName":"rabbitOngoingQueue", "paramDescription": "the name of the ongoing queue", "paramRequired": true},
{"paramName":"rr", "paramLongName":"rabbitReportQueue", "paramDescription": "the name of the report queue", "paramRequired": true},
{"paramName":"t", "paramLongName":"isTest", "paramDescription": "when true the job runs in test mode and the final report message is not sent to RabbitMq", "paramRequired": false}
]

View File

@ -1,7 +1,9 @@
package eu.dnetlib.dhp.transformation; package eu.dnetlib.dhp.transformation;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord; import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.transformation.functions.Cleaner;
import eu.dnetlib.dhp.utils.DHPUtils; import eu.dnetlib.dhp.utils.DHPUtils;
import net.sf.saxon.s9api.*;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.util.LongAccumulator; import org.apache.spark.util.LongAccumulator;
import org.dom4j.Document; import org.dom4j.Document;
@ -14,21 +16,41 @@ import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule; import org.mockito.junit.MockitoRule;
import javax.xml.transform.stream.StreamSource;
import java.io.File; import java.io.File;
import java.io.StringWriter;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.Comparator; import java.util.Comparator;
public class TransformationJobTest { public class TransformationJobTest {
@Mock @Mock
LongAccumulator accumulator; LongAccumulator accumulator;
@Rule @Rule
public MockitoRule mockitoRule = MockitoJUnit.rule(); public MockitoRule mockitoRule = MockitoJUnit.rule();
/**
 * Smoke test for the Saxon s9api pipeline: registers the {@link Cleaner}
 * extension function, compiles the {@code ext_simple.xsl} stylesheet from the
 * test resources, applies it to {@code input.xml} and prints the serialized
 * result. The test makes no assertion on the output; it only fails if one of
 * the steps throws.
 */
@Test
public void testTransformSaxonHE() throws Exception {
    final Processor saxon = new Processor(false);
    saxon.registerExtensionFunction(new Cleaner());

    // Compile the stylesheet that calls the extension function.
    final StreamSource stylesheet = new StreamSource(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/ext_simple.xsl"));
    final XsltExecutable executable = saxon.newXsltCompiler().compile(stylesheet);

    // Parse the record the stylesheet is applied to.
    final StreamSource inputRecord = new StreamSource(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/input.xml"));
    final XdmNode inputDocument = saxon.newDocumentBuilder().build(inputRecord);

    // Serialize the result as indented XML into an in-memory buffer.
    final StringWriter buffer = new StringWriter();
    final Serializer serializer = saxon.newSerializer(buffer);
    serializer.setOutputProperty(Serializer.Property.METHOD, "xml");
    serializer.setOutputProperty(Serializer.Property.INDENT, "yes");

    final XsltTransformer transformer = executable.load();
    transformer.setInitialContextNode(inputDocument);
    transformer.setDestination(serializer);
    transformer.transform();

    System.out.println(buffer.toString());
}
@Test @Test
public void transformTest() throws Exception { public void transformTest() throws Exception {
final String mdstore_input = this.getClass().getResource("/eu/dnetlib/dhp/transform/mdstore").getFile(); final String mdstore_input = this.getClass().getResource("/eu/dnetlib/dhp/transform/mdstore").getFile();
@ -39,7 +61,7 @@ public class TransformationJobTest {
final String xslt = DHPUtils.compressString(IOUtils.toString(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/tr.xml"))); final String xslt = DHPUtils.compressString(IOUtils.toString(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/tr.xml")));
System.out.println(xslt); System.out.println(xslt);
TransformSparkJobNode.main(new String[]{"-mt","local", "-i", mdstore_input, "-o", mdstore_output,"-d","1", "-w","1","-tr", xslt}); TransformSparkJobNode.main(new String[]{"-mt","local", "-i", mdstore_input, "-o", mdstore_output,"-d","1", "-w","1","-tr", xslt, "-t", "true", "-ru","", "-rp","", "-rh","", "-ro","", "-rr",""});
Files.walk(tempDirWithPrefix) Files.walk(tempDirWithPrefix)
.sorted(Comparator.reverseOrder()) .sorted(Comparator.reverseOrder())
@ -64,9 +86,6 @@ public class TransformationJobTest {
@Test @Test
public void testTransformFunction() throws Exception { public void testTransformFunction() throws Exception {
final String xmlTr = IOUtils.toString(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/tr.xml"));
SAXReader reader = new SAXReader(); SAXReader reader = new SAXReader();
Document document = reader.read(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/tr.xml")); Document document = reader.read(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/tr.xml"));
Node node = document.selectSingleNode("//CODE/*[local-name()='stylesheet']"); Node node = document.selectSingleNode("//CODE/*[local-name()='stylesheet']");
@ -79,6 +98,8 @@ public class TransformationJobTest {
final MetadataRecord result = tf.call(record); final MetadataRecord result = tf.call(record);
Assert.assertNotNull(result.getBody()); Assert.assertNotNull(result.getBody());
System.out.println(result.getBody());
} }

View File

@ -0,0 +1,23 @@
<!--
  Test stylesheet (XSLT 2.0) that rebuilds an OAI record around the cleaned
  subject values of the input document.

  The "eg" prefix is bound to http://eu/dnetlib/trasform/extension; the
  function eg:clean is not defined in this stylesheet, so the processor that
  compiles it must have an extension function with that name registered
  (presumably the Cleaner class, which declares the same namespace and local
  name; confirm against the caller).
-->
<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
xmlns:oai="http://www.openarchives.org/OAI/2.0/"
xmlns:oaf="http://namespace.openaire.eu/oaf"
xmlns:eg="http://eu/dnetlib/trasform/extension"
version="2.0"
exclude-result-prefixes="xsl">
<!--
  Single template, applied to the document root. It emits one oai:record that
  contains: a copy of every oai:header found anywhere in the input, a
  "metadata" element (no namespace) with one "subject" per input element whose
  local name is "subject", whatever its namespace, and a constant oaf:about
  block.
-->
<xsl:template match="/">
<oai:record>
<xsl:copy-of select="//oai:header"/>
<metadata>
<xsl:for-each select="//*[local-name()='subject']">
<subject><xsl:value-of select="eg:clean(.)"/></subject>
</xsl:for-each>
</metadata>
<oaf:about>
<oaf:datainfo>
<oaf:TestValue>incomplete</oaf:TestValue>
<oaf:provisionMode>collected</oaf:provisionMode>
</oaf:datainfo>
</oaf:about>
</oai:record>
</xsl:template>
</xsl:stylesheet>

View File

@ -15,12 +15,18 @@
<CODE> <CODE>
<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" <xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
xmlns:oai="http://www.openarchives.org/OAI/2.0/" xmlns:oai="http://www.openarchives.org/OAI/2.0/"
xmlns:oaf="http://namespace.openaire.eu/oaf" version="2.0" xmlns:oaf="http://namespace.openaire.eu/oaf"
xmlns:eg="http://eu/dnetlib/trasform/extension"
version="2.0"
exclude-result-prefixes="xsl"> exclude-result-prefixes="xsl">
<xsl:template match="/"> <xsl:template match="/">
<oai:record> <oai:record>
<xsl:copy-of select="//oai:header"/> <xsl:copy-of select="//oai:header"/>
<xsl:copy-of select="//oai:metadata"/> <metadata>
<xsl:for-each select="//*[local-name()='subject']">
<subject><xsl:value-of select="eg:clean(.)"/></subject>
</xsl:for-each>
</metadata>
<oaf:about> <oaf:about>
<oaf:datainfo> <oaf:datainfo>
<oaf:TestValue>incomplete</oaf:TestValue> <oaf:TestValue>incomplete</oaf:TestValue>