From 4b8c7c279d36f1f735de97ce1bdb98f491c8d027 Mon Sep 17 00:00:00 2001 From: "sandro.labruzzo" Date: Mon, 7 Oct 2019 17:02:53 +0200 Subject: [PATCH] Added documentation on a class, and reused ArgumetApplicationParser on dhp-aggregation --- .../dhp/model/mdstore/MetadataRecord.java | 22 ++- .../GenerateNativeStoreSparkJob.java | 126 ++---------------- .../collection_input_parameters.json | 14 ++ 3 files changed, 48 insertions(+), 114 deletions(-) create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collection_input_parameters.json diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/model/mdstore/MetadataRecord.java b/dhp-common/src/main/java/eu/dnetlib/dhp/model/mdstore/MetadataRecord.java index 3be1e13862..42859061c5 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/model/mdstore/MetadataRecord.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/model/mdstore/MetadataRecord.java @@ -7,21 +7,41 @@ import java.io.Serializable; /** - * This class models a record inside the new MetadataStore + * This class models a record inside the new Metadata store collection on HDFS * * */ public class MetadataRecord implements Serializable { + /** + * The D-Net Identifier associated to the record + */ private String id; + /** + * The original Identifier of the record + */ private String originalId; + + /** + * The encoding of the record, should be JSON or XML + */ private String encoding; + /** + * The information about the provenance of the record see @{@link Provenance} + * for the model of this information + */ private Provenance provenance; + /** + * The content of the metadata + */ private String body; + /** + * the date when the record has been stored + */ private long dateOfCollection; diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java index 5973153cd7..958b271a65 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java @@ -1,12 +1,14 @@ package eu.dnetlib.dhp.collection; import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.model.mdstore.MetadataRecord; import eu.dnetlib.dhp.model.mdstore.Provenance; import eu.dnetlib.message.Message; import eu.dnetlib.message.MessageManager; import eu.dnetlib.message.MessageType; import org.apache.commons.cli.*; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; @@ -57,26 +59,11 @@ public class GenerateNativeStoreSparkJob { public static void main(String[] args) throws Exception { - Options options = generateApplicationArguments(); - - - CommandLineParser parser = new DefaultParser(); - CommandLine cmd = parser.parse( options, args); - - final String encoding = cmd.getOptionValue("e"); - final long dateOfCollection = new Long(cmd.getOptionValue("d")); - final String jsonProvenance = cmd.getOptionValue("p"); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(GenerateNativeStoreSparkJob.class.getResourceAsStream("/eu/dnetlib/dhp/collection/collection_input_parameters.json"))); + parser.parseArgument(args); final ObjectMapper jsonMapper = new ObjectMapper(); - final Provenance provenance = jsonMapper.readValue(jsonProvenance, Provenance.class); - final String xpath = cmd.getOptionValue("x"); - final String inputPath = cmd.getOptionValue("i"); - final String outputPath = cmd.getOptionValue("o"); - final String rabbitUser = cmd.getOptionValue("ru"); - final String rabbitPassword = cmd.getOptionValue("rp"); - final String rabbitHost = cmd.getOptionValue("rh"); - final String rabbitOngoingQueue = cmd.getOptionValue("ro"); - final String rabbitReportQueue = cmd.getOptionValue("rr"); - final String workflowId = cmd.getOptionValue("w"); + final Provenance provenance = jsonMapper.readValue(parser.get("provenance"), Provenance.class); + final long dateOfCollection = new Long(parser.get("dateOfCollection")); final SparkSession spark = SparkSession .builder() @@ -89,118 +76,31 @@ public class GenerateNativeStoreSparkJob { final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - final JavaPairRDD inputRDD = sc.sequenceFile(inputPath, IntWritable.class, Text.class); + final JavaPairRDD inputRDD = sc.sequenceFile(parser.get("input"), IntWritable.class, Text.class); final LongAccumulator totalItems = sc.sc().longAccumulator("TotalItems"); final LongAccumulator invalidRecords = sc.sc().longAccumulator("InvalidRecords"); - final MessageManager manager = new MessageManager(rabbitHost, rabbitUser, rabbitPassword, false, false, null); + final MessageManager manager = new MessageManager(parser.get("rabbitHost"), parser.get("rabbitUser"), parser.get("rabbitPassword"), false, false, null); - final JavaRDD mappeRDD = inputRDD.map(item -> parseRecord(item._2().toString(), xpath, encoding, provenance, dateOfCollection, totalItems, invalidRecords)) + final JavaRDD mappeRDD = inputRDD.map(item -> parseRecord(item._2().toString(), parser.get("xpath"), parser.get("encoding"),provenance, dateOfCollection, totalItems, invalidRecords)) .filter(Objects::nonNull).distinct(); ongoingMap.put("ongoing", "0"); - manager.sendMessage(new Message(workflowId,"DataFrameCreation", MessageType.ONGOING, ongoingMap ), rabbitOngoingQueue, true, false); + manager.sendMessage(new Message(parser.get("workflowId"),"DataFrameCreation", MessageType.ONGOING, ongoingMap ), parser.get("rabbitOngoingQueue"), true, false); final Encoder encoder = Encoders.bean(MetadataRecord.class); final Dataset mdstore = spark.createDataset(mappeRDD.rdd(), encoder); final LongAccumulator mdStoreRecords = sc.sc().longAccumulator("MDStoreRecords"); mdStoreRecords.add(mdstore.count()); ongoingMap.put("ongoing", ""+ totalItems.value()); - manager.sendMessage(new Message(workflowId,"DataFrameCreation", MessageType.ONGOING, ongoingMap ), rabbitOngoingQueue, true, false); + manager.sendMessage(new Message(parser.get("workflowId"),"DataFrameCreation", MessageType.ONGOING, ongoingMap ), parser.get("rabbitOngoingQueue"), true, false); - mdstore.write().format("parquet").save(outputPath); + mdstore.write().format("parquet").save(parser.get("output")); reportMap.put("inputItem" , ""+ totalItems.value()); reportMap.put("invalidRecords", "" + invalidRecords.value()); reportMap.put("mdStoreSize", "" + mdStoreRecords.value()); - manager.sendMessage(new Message(workflowId,"Collection", MessageType.REPORT, reportMap ), rabbitReportQueue, true, false); - } - - private static Options generateApplicationArguments() { - Options options = new Options(); - options.addOption(Option.builder("e") - .longOpt("encoding") - .required(true) - .desc("the encoding type should be xml or json") - .hasArg() // This option has an argument. - .build()); - options.addOption(Option.builder("d") - .longOpt("dateOfCollection") - .required(true) - .desc("the date of collection") - .hasArg() // This option has an argument. - .build()); - - options.addOption(Option.builder("p") - .longOpt("provenance") - .required(true) - .desc("the json Provenance information") - .hasArg() // This option has an argument. - .build()); - - options.addOption(Option.builder("x") - .longOpt("xpath") - .required(true) - .desc("xpath of the identifier") - .hasArg() // This option has an argument. - .build()); - - options.addOption(Option.builder("i") - .longOpt("input") - .required(true) - .desc("input path of the sequence file") - .hasArg() // This option has an argument. - .build()); - options.addOption(Option.builder("o") - .longOpt("output") - .required(true) - .desc("output path of the mdstore") - .hasArg() - .build()); - - options.addOption(Option.builder("ru") - .longOpt("rabbitUser") - .required(true) - .desc("the user to connect with RabbitMq for messaging") - .hasArg() // This option has an argument. - .build()); - - options.addOption(Option.builder("rp") - .longOpt("rabbitPassWord") - .required(true) - .desc("the password to connect with RabbitMq for messaging") - .hasArg() // This option has an argument. - .build()); - - options.addOption(Option.builder("rh") - .longOpt("rabbitHost") - .required(true) - .desc("the host of the RabbitMq server") - .hasArg() // This option has an argument. - .build()); - - options.addOption(Option.builder("ro") - .longOpt("rabbitOngoingQueue") - .required(true) - .desc("the name of the ongoing queue") - .hasArg() // This option has an argument. - .build()); - - options.addOption(Option.builder("rr") - .longOpt("rabbitReportQueue") - .required(true) - .desc("the name of the report queue") - .hasArg() // This option has an argument. - .build()); - - - options.addOption(Option.builder("w") - .longOpt("workflowId") - .required(true) - .desc("the identifier of the dnet Workflow") - .hasArg() // This option has an argument. - .build()); - return options; + manager.sendMessage(new Message(parser.get("workflowId"),"Collection", MessageType.REPORT, reportMap ), parser.get("rabbitReportQueue"), true, false); } } diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collection_input_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collection_input_parameters.json new file mode 100644 index 0000000000..ed8d04315b --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collection_input_parameters.json @@ -0,0 +1,14 @@ +[ + {"paramName":"e", "paramLongName":"encoding", "paramDescription": "the encoding of the input record should be JSON or XML", "paramRequired": true}, + {"paramName":"d", "paramLongName":"dateOfCollection", "paramDescription": "the date when the record has been stored", "paramRequired": true}, + {"paramName":"p", "paramLongName":"provenance", "paramDescription": "the infos about the provenance of the collected records", "paramRequired": true}, + {"paramName":"x", "paramLongName":"xpath", "paramDescription": "the xpath to identify the record ifentifier", "paramRequired": true}, + {"paramName":"i", "paramLongName":"input", "paramDescription": "the path of the sequencial file to read", "paramRequired": true}, + {"paramName":"o", "paramLongName":"output", "paramDescription": "the path of the result DataFrame on HDFS", "paramRequired": true}, + {"paramName":"ru", "paramLongName":"rabbitUser", "paramDescription": "the user to connect with RabbitMq for messaging", "paramRequired": true}, + {"paramName":"rp", "paramLongName":"rabbitPassword", "paramDescription": "the password to connect with RabbitMq for messaging", "paramRequired": true}, + {"paramName":"rh", "paramLongName":"rabbitHost", "paramDescription": "the host of the RabbitMq server", "paramRequired": true}, + {"paramName":"ro", "paramLongName":"rabbitOngoingQueue", "paramDescription": "the name of the ongoing queue", "paramRequired": true}, + {"paramName":"rr", "paramLongName":"rabbitReportQueue", "paramDescription": "the name of the report queue", "paramRequired": true}, + {"paramName":"w", "paramLongName":"workflowId", "paramDescription": "the identifier of the dnet Workflow", "paramRequired": true} +] \ No newline at end of file