This commit is contained in:
Michele Artini 2019-04-01 10:40:59 +02:00
commit 1a1ec7da8e
4 changed files with 159 additions and 36 deletions

View File

@ -3,6 +3,7 @@ package eu.dnetlib.dhp.collection;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord; import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.model.mdstore.Provenance; import eu.dnetlib.dhp.model.mdstore.Provenance;
import org.apache.commons.cli.*;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
@ -19,6 +20,7 @@ import org.dom4j.Node;
import org.dom4j.io.SAXReader; import org.dom4j.io.SAXReader;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.Objects; import java.util.Objects;
public class GenerateNativeStoreSparkJob { public class GenerateNativeStoreSparkJob {
@ -30,7 +32,7 @@ public class GenerateNativeStoreSparkJob {
totalItems.add(1); totalItems.add(1);
try { try {
SAXReader reader = new SAXReader(); SAXReader reader = new SAXReader();
Document document = reader.read(new ByteArrayInputStream(input.getBytes("UTF-8"))); Document document = reader.read(new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)));
Node node = document.selectSingleNode(xpath); Node node = document.selectSingleNode(xpath);
final String originalIdentifier = node.getText(); final String originalIdentifier = node.getText();
if (StringUtils.isBlank(originalIdentifier)) { if (StringUtils.isBlank(originalIdentifier)) {
@ -49,22 +51,64 @@ public class GenerateNativeStoreSparkJob {
} }
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
if (args == null || args.length != 6) Options options = new Options();
//TODO Create a DHPWFException options.addOption(Option.builder("e")
throw new Exception("unexpected number of parameters "); .longOpt("encoding")
.required(true)
.desc("the encoding type should be xml or json")
.hasArg() // This option has an argument.
.build());
options.addOption(Option.builder("d")
.longOpt("dateOfCollection")
.required(true)
.desc("the date of collection")
.hasArg() // This option has an argument.
.build());
final String encoding = args[0]; options.addOption(Option.builder("p")
final long dateOfCollection = Long.valueOf(args[1]); .longOpt("provenance")
final String jsonProvenance = args[2]; .required(true)
.desc("the json Provenance information")
.hasArg() // This option has an argument.
.build());
options.addOption(Option.builder("x")
.longOpt("xpath")
.required(true)
.desc("xpath of the identifier")
.hasArg() // This option has an argument.
.build());
options.addOption(Option.builder("i")
.longOpt("input")
.required(true)
.desc("input path of the sequence file")
.hasArg() // This option has an argument.
.build());
options.addOption(Option.builder("o")
.longOpt("output")
.required(true)
.desc("output path of the mdstore")
.hasArg()
.build());
CommandLineParser parser = new DefaultParser();
CommandLine cmd = parser.parse( options, args);
final String encoding = cmd.getOptionValue("e");
final long dateOfCollection = new Long(cmd.getOptionValue("d"));
final String jsonProvenance = cmd.getOptionValue("p");
final ObjectMapper jsonMapper = new ObjectMapper(); final ObjectMapper jsonMapper = new ObjectMapper();
final Provenance provenance = jsonMapper.readValue(jsonProvenance, Provenance.class); final Provenance provenance = jsonMapper.readValue(jsonProvenance, Provenance.class);
final String xpath = args[3]; final String xpath = cmd.getOptionValue("x");
final String inputPath = args[4]; final String inputPath = cmd.getOptionValue("i");
final String outputPath = args[5]; final String outputPath = cmd.getOptionValue("o");
final SparkSession spark = SparkSession final SparkSession spark = SparkSession
.builder() .builder()
@ -85,21 +129,11 @@ public class GenerateNativeStoreSparkJob {
final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class); final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
final Dataset<MetadataRecord> mdstore = spark.createDataset(mappeRDD.rdd(), encoder); final Dataset<MetadataRecord> mdstore = spark.createDataset(mappeRDD.rdd(), encoder);
final LongAccumulator mdStoreRecords = sc.sc().longAccumulator("MDStoreRecords"); final LongAccumulator mdStoreRecords = sc.sc().longAccumulator("MDStoreRecords");
mdStoreRecords.add(mdstore.count()); mdStoreRecords.add(mdstore.count());
System.out.println("totalItems.value() = " + totalItems.value()); System.out.println("totalItems.value() = " + totalItems.value());
System.out.println("invalidRecords = " + invalidRecords.value()); System.out.println("invalidRecords = " + invalidRecords.value());
System.out.println("mdstoreRecords.value() = " + mdStoreRecords.value()); System.out.println("mdstoreRecords.value() = " + mdStoreRecords.value());
mdstore.write().format("parquet").save(outputPath); mdstore.write().format("parquet").save(outputPath);
} }
} }

View File

@ -1,32 +1,66 @@
<workflow-app name="Oozie_Java_Wf" xmlns="uri:oozie:workflow:0.5"> <workflow-app name="Oozie_Java_Wf" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sequenceFilePath</name>
<description>the path to store the sequence file of the native metadata collected</description>
</property>
<property>
<name>mdStorePath</name>
<description>the path of the native mdstore</description>
</property>
<property>
<name>apiDescription</name>
<description>A json encoding of the API Description class</description>
</property>
<property>
<name>dataSourceInfo</name>
<description>A json encoding of the Datasource Info</description>
</property>
<property>
<name>identifierPath</name>
<description>An xpath to retrieve the metadata idnentifier for the generation of DNet Identifier </description>
</property>
<property>
<name>metadataEncoding</name>
<description> The type of the metadata XML/JSON</description>
</property>
<property>
<name>timestamp</name>
<description>The timestamp of the collection date</description>
</property>
</parameters>
<start to="DeleteMDStoresNative"/> <start to="DeleteMDStoresNative"/>
<kill name="Kill"> <kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill> </kill>
<action name="DeleteMDStoresNative"> <action name="DeleteMDStoresNative">
<fs> <fs>
<delete path='/user/sandro.labruzzo/mdstores/oai_1'/> <delete path='${sequenceFilePath}'/>
<delete path='${mdStorePath}'/>
</fs> </fs>
<ok to="CollectionWorker"/> <ok to="CollectionWorker"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="CollectionWorker"> <action name="CollectionWorker">
<shell xmlns="uri:oozie:shell-action:0.1"> <shell xmlns="uri:oozie:shell-action:0.1">
<job-tracker>${jobTracker}</job-tracker> <job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node> <name-node>${nameNode}</name-node>
<exec>lib/dhp-collector-worker-1.0.0.jar</exec> <exec>lib/dhp-collector-worker-1.0.0.jar</exec>
<argument>/user/sandro.labruzzo/mdstores/oai_1</argument> <argument>${sequenceFilePath}</argument>
<argument>{&quot;id&quot;:&quot;oai&quot;,&quot;baseUrl&quot;:&quot;http://www.revista.vocesdelaeducacion.com.mx/index.php/index/oai&quot;,&quot;protocol&quot;:&quot;oai&quot;,&quot;params&quot;:{&quot;format&quot;:&quot;oai_dc&quot;}}</argument> <argument>${apiDescription}</argument>
<argument>${nameNode}</argument> <argument>${nameNode}</argument>
<capture-output/> <capture-output/>
</shell> </shell>
<ok to="mdBuilder"/> <ok to="mdBuilder"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="mdBuilder"> <action name="mdBuilder">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker> <job-tracker>${jobTracker}</job-tracker>
@ -37,17 +71,15 @@
<class>eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob</class> <class>eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob</class>
<jar>dhp-aggregations-1.0.0-SNAPSHOT.jar</jar> <jar>dhp-aggregations-1.0.0-SNAPSHOT.jar</jar>
<spark-opts>--num-executors 50 --conf spark.yarn.jars=&quot;hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2&quot;</spark-opts> <spark-opts>--num-executors 50 --conf spark.yarn.jars=&quot;hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2&quot;</spark-opts>
<arg>XML</arg> <arg>--encoding</arg> <arg>${metadataEncoding}</arg>
<arg>1000</arg> <arg>--dateOfCollection</arg> <arg>${timestamp}</arg>
<arg>{"datasourceId":"pippo","datasourceName":"puppa","nsPrefix":"ns_prefix"}</arg> <arg>--provenance</arg> <arg> ${dataSourceInfo}</arg>
<arg>./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']</arg> <arg>--xpath</arg><arg>${identifierPath}</arg>
<arg>/user/sandro.labruzzo/mdstores/oai_1</arg> <arg>--input</arg><arg>${sequenceFilePath}</arg>
<arg>/user/sandro.labruzzo/mdstores/mdstore_1</arg> <arg>--output</arg><arg>${mdStorePath}</arg>
</spark> </spark>
<ok to="End"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<end name="End"/> <end name="End"/>
</workflow-app> </workflow-app>

View File

@ -0,0 +1,51 @@
package eu.dnetlib.dhp.collection;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.model.mdstore.Provenance;
import org.apache.commons.io.IOUtils;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
public class CollectionJobTest {
@Test
public void test () throws Exception {
Provenance provenance = new Provenance("pippo", "puppa", "ns_prefix");
// GenerateNativeStoreSparkJob.main(new String[] {"XML", ""+System.currentTimeMillis(), new ObjectMapper().writeValueAsString(provenance), "./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']","/home/sandro/Downloads/mdstore_oai","/home/sandro/Downloads/mdstore_result"});
System.out.println(new ObjectMapper().writeValueAsString(provenance));
}
@Test
public void testGenerationMetadataRecord() throws Exception {
final String xml = IOUtils.toString(this.getClass().getResourceAsStream("./record.xml"));
MetadataRecord record = GenerateNativeStoreSparkJob.parseRecord(xml, "./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']", "XML", new Provenance("foo", "bar", "ns_prefix"), System.currentTimeMillis(), null,null);
System.out.println(record.getId());
System.out.println(record.getOriginalId());
}
@Test
public void TestEquals () throws IOException {
final String xml = IOUtils.toString(this.getClass().getResourceAsStream("./record.xml"));
MetadataRecord record = GenerateNativeStoreSparkJob.parseRecord(xml, "./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']", "XML", new Provenance("foo", "bar", "ns_prefix"), System.currentTimeMillis(), null,null);
MetadataRecord record1 = GenerateNativeStoreSparkJob.parseRecord(xml, "./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']", "XML", new Provenance("foo", "bar", "ns_prefix"), System.currentTimeMillis(), null,null);
record.setBody("ciao");
record1.setBody("mondo");
Assert.assertTrue(record.equals(record1));
}
}

View File

@ -130,6 +130,12 @@
<version>1.9</version> <version>1.9</version>
</dependency> </dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.4</version>
</dependency>
</dependencies> </dependencies>