diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java index 5edf89a050..d49846b235 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java @@ -3,6 +3,7 @@ package eu.dnetlib.dhp.collection; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.model.mdstore.MetadataRecord; import eu.dnetlib.dhp.model.mdstore.Provenance; +import org.apache.commons.cli.*; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; @@ -19,6 +20,7 @@ import org.dom4j.Node; import org.dom4j.io.SAXReader; import java.io.ByteArrayInputStream; +import java.nio.charset.StandardCharsets; import java.util.Objects; public class GenerateNativeStoreSparkJob { @@ -30,7 +32,7 @@ public class GenerateNativeStoreSparkJob { totalItems.add(1); try { SAXReader reader = new SAXReader(); - Document document = reader.read(new ByteArrayInputStream(input.getBytes("UTF-8"))); + Document document = reader.read(new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8))); Node node = document.selectSingleNode(xpath); final String originalIdentifier = node.getText(); if (StringUtils.isBlank(originalIdentifier)) { @@ -49,22 +51,64 @@ public class GenerateNativeStoreSparkJob { } + + + public static void main(String[] args) throws Exception { - if (args == null || args.length != 6) - //TODO Create a DHPWFException - throw new Exception("unexpected number of parameters "); + Options options = new Options(); + options.addOption(Option.builder("e") + .longOpt("encoding") + .required(true) + .desc("the encoding type should be xml or json") + .hasArg() // This option has an argument. + .build()); + options.addOption(Option.builder("d") + .longOpt("dateOfCollection") + .required(true) + .desc("the date of collection") + .hasArg() // This option has an argument. + .build()); - final String encoding = args[0]; - final long dateOfCollection = Long.valueOf(args[1]); - final String jsonProvenance = args[2]; + options.addOption(Option.builder("p") + .longOpt("provenance") + .required(true) + .desc("the json Provenance information") + .hasArg() // This option has an argument. + .build()); + + options.addOption(Option.builder("x") + .longOpt("xpath") + .required(true) + .desc("xpath of the identifier") + .hasArg() // This option has an argument. + .build()); + + options.addOption(Option.builder("i") + .longOpt("input") + .required(true) + .desc("input path of the sequence file") + .hasArg() // This option has an argument. + .build()); + options.addOption(Option.builder("o") + .longOpt("output") + .required(true) + .desc("output path of the mdstore") + .hasArg() + .build()); + + + CommandLineParser parser = new DefaultParser(); + CommandLine cmd = parser.parse( options, args); + + final String encoding = cmd.getOptionValue("e"); + final long dateOfCollection = new Long(cmd.getOptionValue("d")); + final String jsonProvenance = cmd.getOptionValue("p"); final ObjectMapper jsonMapper = new ObjectMapper(); final Provenance provenance = jsonMapper.readValue(jsonProvenance, Provenance.class); - final String xpath = args[3]; - final String inputPath = args[4]; - final String outputPath = args[5]; - - + final String xpath = cmd.getOptionValue("x"); + final String inputPath = cmd.getOptionValue("i"); + final String outputPath = cmd.getOptionValue("o"); final SparkSession spark = SparkSession .builder() @@ -85,21 +129,11 @@ public class GenerateNativeStoreSparkJob { final Encoder encoder = Encoders.bean(MetadataRecord.class); final Dataset mdstore = spark.createDataset(mappeRDD.rdd(), encoder); - - final LongAccumulator mdStoreRecords = sc.sc().longAccumulator("MDStoreRecords"); mdStoreRecords.add(mdstore.count()); System.out.println("totalItems.value() = " + totalItems.value()); System.out.println("invalidRecords = " + invalidRecords.value()); System.out.println("mdstoreRecords.value() = " + mdStoreRecords.value()); - mdstore.write().format("parquet").save(outputPath); - - - - - - - } } diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/oozie/workflows/oozie_collection_workflows.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/oozie/workflows/oozie_collection_workflows.xml index c14e280bb1..0deda423ba 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/oozie/workflows/oozie_collection_workflows.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/oozie/workflows/oozie_collection_workflows.xml @@ -1,32 +1,66 @@ + + + sequenceFilePath + the path to store the sequence file of the native metadata collected + + + + mdStorePath + the path of the native mdstore + + + + apiDescription + A json encoding of the API Description class + + + + dataSourceInfo + A json encoding of the Datasource Info + + + identifierPath + An xpath to retrieve the metadata idnentifier for the generation of DNet Identifier + + + + metadataEncoding + The type of the metadata XML/JSON + + + + timestamp + The timestamp of the collection date + + + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - + + - ${jobTracker} ${nameNode} lib/dhp-collector-worker-1.0.0.jar - /user/sandro.labruzzo/mdstores/oai_1 - {"id":"oai","baseUrl":"http://www.revista.vocesdelaeducacion.com.mx/index.php/index/oai","protocol":"oai","params":{"format":"oai_dc"}} + ${sequenceFilePath} + ${apiDescription} ${nameNode} - ${jobTracker} @@ -37,17 +71,15 @@ eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob dhp-aggregations-1.0.0-SNAPSHOT.jar --num-executors 50 --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2" - XML - 1000 - {"datasourceId":"pippo","datasourceName":"puppa","nsPrefix":"ns_prefix"} - ./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier'] - /user/sandro.labruzzo/mdstores/oai_1 - /user/sandro.labruzzo/mdstores/mdstore_1 + --encoding ${metadataEncoding} + --dateOfCollection ${timestamp} + --provenance ${dataSourceInfo} + --xpath${identifierPath} + --input${sequenceFilePath} + --output${mdStorePath} - - \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/CollectionJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/CollectionJobTest.java new file mode 100644 index 0000000000..61bb472cee --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collection/CollectionJobTest.java @@ -0,0 +1,51 @@ +package eu.dnetlib.dhp.collection; + +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.model.mdstore.MetadataRecord; +import eu.dnetlib.dhp.model.mdstore.Provenance; +import org.apache.commons.io.IOUtils; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +public class CollectionJobTest { + + + @Test + public void test () throws Exception { + Provenance provenance = new Provenance("pippo", "puppa", "ns_prefix"); + + +// GenerateNativeStoreSparkJob.main(new String[] {"XML", ""+System.currentTimeMillis(), new ObjectMapper().writeValueAsString(provenance), "./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']","/home/sandro/Downloads/mdstore_oai","/home/sandro/Downloads/mdstore_result"}); + System.out.println(new ObjectMapper().writeValueAsString(provenance)); + } + + + @Test + public void testGenerationMetadataRecord() throws Exception { + + final String xml = IOUtils.toString(this.getClass().getResourceAsStream("./record.xml")); + + MetadataRecord record = GenerateNativeStoreSparkJob.parseRecord(xml, "./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']", "XML", new Provenance("foo", "bar", "ns_prefix"), System.currentTimeMillis(), null,null); + + System.out.println(record.getId()); + System.out.println(record.getOriginalId()); + + + } + + + @Test + public void TestEquals () throws IOException { + + final String xml = IOUtils.toString(this.getClass().getResourceAsStream("./record.xml")); + MetadataRecord record = GenerateNativeStoreSparkJob.parseRecord(xml, "./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']", "XML", new Provenance("foo", "bar", "ns_prefix"), System.currentTimeMillis(), null,null); + MetadataRecord record1 = GenerateNativeStoreSparkJob.parseRecord(xml, "./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']", "XML", new Provenance("foo", "bar", "ns_prefix"), System.currentTimeMillis(), null,null); + record.setBody("ciao"); + record1.setBody("mondo"); + Assert.assertTrue(record.equals(record1)); + + } + +} diff --git a/pom.xml b/pom.xml index 1cf33cef0c..fde394c834 100644 --- a/pom.xml +++ b/pom.xml @@ -130,6 +130,12 @@ 1.9 + + commons-cli + commons-cli + 1.4 + +