implemented command line

This commit is contained in:
Sandro La Bruzzo 2019-03-25 15:18:31 +01:00
parent 859957d0fd
commit 12c65eab4c
2 changed files with 102 additions and 36 deletions

View File

@ -3,6 +3,7 @@ package eu.dnetlib.dhp.collection;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.model.mdstore.Provenance;
import org.apache.commons.cli.*;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
@ -19,6 +20,7 @@ import org.dom4j.Node;
import org.dom4j.io.SAXReader;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
public class GenerateNativeStoreSparkJob {
@ -30,7 +32,7 @@ public class GenerateNativeStoreSparkJob {
totalItems.add(1);
try {
SAXReader reader = new SAXReader();
Document document = reader.read(new ByteArrayInputStream(input.getBytes("UTF-8")));
Document document = reader.read(new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)));
Node node = document.selectSingleNode(xpath);
final String originalIdentifier = node.getText();
if (StringUtils.isBlank(originalIdentifier)) {
@ -49,22 +51,64 @@ public class GenerateNativeStoreSparkJob {
}
public static void main(String[] args) throws Exception {
if (args == null || args.length != 6)
//TODO Create a DHPWFException
throw new Exception("unexpected number of parameters ");
Options options = new Options();
options.addOption(Option.builder("e")
.longOpt("encoding")
.required(true)
.desc("the encoding type should be xml or json")
.hasArg() // This option has an argument.
.build());
options.addOption(Option.builder("d")
.longOpt("dateOfCollection")
.required(true)
.desc("the date of collection")
.hasArg() // This option has an argument.
.build());
final String encoding = args[0];
final long dateOfCollection = Long.valueOf(args[1]);
final String jsonProvenance = args[2];
options.addOption(Option.builder("p")
.longOpt("provenance")
.required(true)
.desc("the json Provenance information")
.hasArg() // This option has an argument.
.build());
options.addOption(Option.builder("x")
.longOpt("xpath")
.required(true)
.desc("xpath of the identifier")
.hasArg() // This option has an argument.
.build());
options.addOption(Option.builder("i")
.longOpt("input")
.required(true)
.desc("input path of the sequence file")
.hasArg() // This option has an argument.
.build());
options.addOption(Option.builder("o")
.longOpt("output")
.required(true)
.desc("output path of the mdstore")
.hasArg()
.build());
CommandLineParser parser = new DefaultParser();
CommandLine cmd = parser.parse( options, args);
final String encoding = cmd.getOptionValue("e");
final long dateOfCollection = new Long(cmd.getOptionValue("d"));
final String jsonProvenance = cmd.getOptionValue("p");
final ObjectMapper jsonMapper = new ObjectMapper();
final Provenance provenance = jsonMapper.readValue(jsonProvenance, Provenance.class);
final String xpath = args[3];
final String inputPath = args[4];
final String outputPath = args[5];
final String xpath = cmd.getOptionValue("x");
final String inputPath = cmd.getOptionValue("i");
final String outputPath = cmd.getOptionValue("o");
final SparkSession spark = SparkSession
.builder()
@ -85,21 +129,11 @@ public class GenerateNativeStoreSparkJob {
final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
final Dataset<MetadataRecord> mdstore = spark.createDataset(mappeRDD.rdd(), encoder);
final LongAccumulator mdStoreRecords = sc.sc().longAccumulator("MDStoreRecords");
mdStoreRecords.add(mdstore.count());
System.out.println("totalItems.value() = " + totalItems.value());
System.out.println("invalidRecords = " + invalidRecords.value());
System.out.println("mdstoreRecords.value() = " + mdStoreRecords.value());
mdstore.write().format("parquet").save(outputPath);
}
}

View File

@ -1,32 +1,66 @@
<workflow-app name="Oozie_Java_Wf" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sequenceFilePath</name>
<description>the path to store the sequence file of the native metadata collected</description>
</property>
<property>
<name>mdStorePath</name>
<description>the path of the native mdstore</description>
</property>
<property>
<name>apiDescription</name>
<description>A json encoding of the API Description class</description>
</property>
<property>
<name>dataSourceInfo</name>
<description>A json encoding of the Datasource Info</description>
</property>
<property>
<name>identifierPath</name>
<description>An xpath to retrieve the metadata idnentifier for the generation of DNet Identifier </description>
</property>
<property>
<name>metadataEncoding</name>
<description> The type of the metadata XML/JSON</description>
</property>
<property>
<name>timestamp</name>
<description>The timestamp of the collection date</description>
</property>
</parameters>
<start to="DeleteMDStoresNative"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="DeleteMDStoresNative">
<fs>
<delete path='/user/sandro.labruzzo/mdstores/oai_1'/>
<delete path='${sequenceFilePath}'/>
<delete path='${mdStorePath}'/>
</fs>
<ok to="CollectionWorker"/>
<error to="Kill"/>
</action>
<action name="CollectionWorker">
<shell xmlns="uri:oozie:shell-action:0.1">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<exec>lib/dhp-collector-worker-1.0.0.jar</exec>
<argument>/user/sandro.labruzzo/mdstores/oai_1</argument>
<argument>{&quot;id&quot;:&quot;oai&quot;,&quot;baseUrl&quot;:&quot;http://www.revista.vocesdelaeducacion.com.mx/index.php/index/oai&quot;,&quot;protocol&quot;:&quot;oai&quot;,&quot;params&quot;:{&quot;format&quot;:&quot;oai_dc&quot;}}</argument>
<argument>${sequenceFilePath}</argument>
<argument>${apiDescription}</argument>
<argument>${nameNode}</argument>
<capture-output/>
</shell>
<ok to="mdBuilder"/>
<error to="Kill"/>
</action>
<action name="mdBuilder">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
@ -37,17 +71,15 @@
<class>eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob</class>
<jar>dhp-aggregations-1.0.0-SNAPSHOT.jar</jar>
<spark-opts>--num-executors 50 --conf spark.yarn.jars=&quot;hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2&quot;</spark-opts>
<arg>XML</arg>
<arg>1000</arg>
<arg>{"datasourceId":"pippo","datasourceName":"puppa","nsPrefix":"ns_prefix"}</arg>
<arg>./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']</arg>
<arg>/user/sandro.labruzzo/mdstores/oai_1</arg>
<arg>/user/sandro.labruzzo/mdstores/mdstore_1</arg>
<arg>--encoding</arg> <arg>${metadataEncoding}</arg>
<arg>--dateOfCollection</arg> <arg>${timestamp}</arg>
<arg>--provenance</arg> <arg> ${dataSourceInfo}</arg>
<arg>--xpath</arg><arg>${identifierPath}</arg>
<arg>--input</arg><arg>${sequenceFilePath}</arg>
<arg>--output</arg><arg>${mdStorePath}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>