diff --git a/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorker.java b/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorker.java index ba876c6cf..39f2872d4 100644 --- a/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorker.java +++ b/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorker.java @@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.collector.worker.model.ApiDescriptor; import eu.dnetlib.collector.worker.plugins.CollectorPlugin; import eu.dnetlib.collector.worker.utils.CollectorPluginFactory; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.message.Message; import eu.dnetlib.message.MessageManager; import eu.dnetlib.message.MessageType; @@ -29,12 +30,12 @@ public class DnetCollectorWorker { private final CollectorPluginFactory collectorPluginFactory; - private final DnetCollectorWorkerArgumentParser argumentParser; + private final ArgumentApplicationParser argumentParser; private final MessageManager manager; - public DnetCollectorWorker(final CollectorPluginFactory collectorPluginFactory, final DnetCollectorWorkerArgumentParser argumentParser, final MessageManager manager) throws DnetCollectorException { + public DnetCollectorWorker(final CollectorPluginFactory collectorPluginFactory, final ArgumentApplicationParser argumentParser, final MessageManager manager) throws DnetCollectorException { this.collectorPluginFactory = collectorPluginFactory; this.argumentParser = argumentParser; this.manager = manager; @@ -44,11 +45,11 @@ public class DnetCollectorWorker { public void collect() throws DnetCollectorException { try { final ObjectMapper jsonMapper = new ObjectMapper(); - final ApiDescriptor api = jsonMapper.readValue(argumentParser.getJson(), ApiDescriptor.class); + final ApiDescriptor api = jsonMapper.readValue(argumentParser.get("apidescriptor"), ApiDescriptor.class); final CollectorPlugin plugin = collectorPluginFactory.getPluginByProtocol(api.getProtocol()); - final String hdfsuri = argumentParser.getNameNode(); + final String hdfsuri = argumentParser.get("namenode"); // ====== Init HDFS File System Object Configuration conf = new Configuration(); @@ -58,11 +59,11 @@ public class DnetCollectorWorker { conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); - System.setProperty("HADOOP_USER_NAME", argumentParser.getUser()); + System.setProperty("HADOOP_USER_NAME", argumentParser.get("userHDFS")); System.setProperty("hadoop.home.dir", "/"); //Get the filesystem - HDFS FileSystem fs = FileSystem.get(URI.create(hdfsuri), conf); - Path hdfswritepath = new Path(argumentParser.getHdfsPath()); + Path hdfswritepath = new Path(argumentParser.get("hdfsPath")); log.info("Created path " + hdfswritepath.toString()); @@ -81,7 +82,7 @@ public class DnetCollectorWorker { if (counter.get() % 10 == 0) { try { ongoingMap.put("ongoing", "" + counter.get()); - log.debug("Sending message: "+ manager.sendMessage(new Message(argumentParser.getWorkflowId(), "Collection", MessageType.ONGOING, ongoingMap), argumentParser.getRabbitOngoingQueue(), true, false)); + log.debug("Sending message: "+ manager.sendMessage(new Message(argumentParser.get("workflowId"), "Collection", MessageType.ONGOING, ongoingMap), argumentParser.get("rabbitOngoingQueue"), true, false)); } catch (Exception e) { log.error("Error on sending message ", e); } @@ -95,9 +96,9 @@ public class DnetCollectorWorker { }); } ongoingMap.put("ongoing", "" + counter.get()); - manager.sendMessage(new Message(argumentParser.getWorkflowId(), "Collection", MessageType.ONGOING, ongoingMap), argumentParser.getRabbitOngoingQueue(), true, false); + manager.sendMessage(new Message(argumentParser.get("workflowId"), "Collection", MessageType.ONGOING, ongoingMap), argumentParser.get("rabbitOngoingQueue"), true, false); reportMap.put("collected", "" + counter.get()); - manager.sendMessage(new Message(argumentParser.getWorkflowId(), "Collection", MessageType.REPORT, reportMap), argumentParser.getRabbitOngoingQueue(), true, false); + manager.sendMessage(new Message(argumentParser.get("workflowId"), "Collection", MessageType.REPORT, reportMap), argumentParser.get("rabbitOngoingQueue"), true, false); manager.close(); } catch (Throwable e) { throw new DnetCollectorException("Error on collecting ",e); diff --git a/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplication.java b/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplication.java index 68c119fa8..fdd7ceb54 100644 --- a/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplication.java +++ b/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplication.java @@ -1,7 +1,9 @@ package eu.dnetlib.collector.worker; import eu.dnetlib.collector.worker.utils.CollectorPluginFactory; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.message.MessageManager; +import org.apache.commons.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -21,7 +23,7 @@ public class DnetCollectorWorkerApplication { private static CollectorPluginFactory collectorPluginFactory = new CollectorPluginFactory(); - private static DnetCollectorWorkerArgumentParser argumentParser = new DnetCollectorWorkerArgumentParser(); + private static ArgumentApplicationParser argumentParser; /** @@ -29,10 +31,11 @@ public class DnetCollectorWorkerApplication { */ public static void main(final String[] args) throws Exception { + argumentParser= new ArgumentApplicationParser(IOUtils.toString(DnetCollectorWorker.class.getResourceAsStream("/eu/dnetlib/collector/worker/collector_parameter.json"))); argumentParser.parseArgument(args); - log.info("hdfsPath =" + argumentParser.getHdfsPath()); - log.info("json = " + argumentParser.getJson()); - final MessageManager manager = new MessageManager(argumentParser.getRabbitHost(), argumentParser.getRabbitUser(), argumentParser.getRabbitPassword(), false, false, null); + log.info("hdfsPath =" + argumentParser.get("hdfsPath")); + log.info("json = " + argumentParser.get("apidescriptor")); + final MessageManager manager = new MessageManager(argumentParser.get("rabbitHost"), argumentParser.get("rabbitUser"), argumentParser.get("rabbitPassword"), false, false, null); final DnetCollectorWorker worker = new DnetCollectorWorker(collectorPluginFactory, argumentParser, manager); worker.collect(); diff --git a/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorkerArgumentParser.java b/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorkerArgumentParser.java deleted file mode 100644 index 9f34f898b..000000000 --- a/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorkerArgumentParser.java +++ /dev/null @@ -1,193 +0,0 @@ -package eu.dnetlib.collector.worker; - -import org.apache.commons.cli.*; - -public class DnetCollectorWorkerArgumentParser { - - private final Options options; - private String hdfsPath ; - private String json; - private String nameNode; - private String user; - private String rabbitUser; - private String rabbitPassword; - private String rabbitHost; - private String rabbitOngoingQueue; - private String rabbitReportQueue; - private String workflowId; - - public DnetCollectorWorkerArgumentParser(){ - options = new Options(); - options.addOption(Option.builder("p") - .longOpt("hdfsPath") - .required(true) - .desc("the path where storing the sequential file") - .hasArg() // This option has an argument. - .build()); - options.addOption(Option.builder("a") - .longOpt("apidescriptor") - .required(true) - .desc("the Json enconding of the API Descriptor") - .hasArg() // This option has an argument. - .build()); - - options.addOption(Option.builder("n") - .longOpt("namenode") - .required(true) - .desc("the Name Node URI") - .hasArg() // This option has an argument. - .build()); - - options.addOption(Option.builder("u") - .longOpt("userHDFS") - .required(true) - .desc("the user wich create the hdfs seq file") - .hasArg() // This option has an argument. - .build()); - - options.addOption(Option.builder("ru") - .longOpt("rabbitUser") - .required(true) - .desc("the user to connect with RabbitMq for messaging") - .hasArg() // This option has an argument. - .build()); - - options.addOption(Option.builder("rp") - .longOpt("rabbitPassWord") - .required(true) - .desc("the password to connect with RabbitMq for messaging") - .hasArg() // This option has an argument. - .build()); - - options.addOption(Option.builder("rh") - .longOpt("rabbitHost") - .required(true) - .desc("the host of the RabbitMq server") - .hasArg() // This option has an argument. - .build()); - - options.addOption(Option.builder("ro") - .longOpt("rabbitOngoingQueue") - .required(true) - .desc("the name of the ongoing queue") - .hasArg() // This option has an argument. - .build()); - - options.addOption(Option.builder("rr") - .longOpt("rabbitReportQueue") - .required(true) - .desc("the name of the report queue") - .hasArg() // This option has an argument. - .build()); - options.addOption(Option.builder("w") - .longOpt("workflowId") - .required(true) - .desc("the identifier of the dnet Workflow") - .hasArg() // This option has an argument. - .build()); - } - - public void parseArgument(final String[] args) throws DnetCollectorException { - try { - CommandLineParser parser = new DefaultParser(); - CommandLine cmd = parser.parse(options, args); - hdfsPath = cmd.getOptionValue("p"); - json = cmd.getOptionValue("a"); - nameNode = cmd.getOptionValue("n"); - user = cmd.getOptionValue("u"); - rabbitUser = cmd.getOptionValue("ru"); - rabbitPassword = cmd.getOptionValue("rp"); - rabbitHost = cmd.getOptionValue("rh"); - rabbitOngoingQueue = cmd.getOptionValue("ro"); - rabbitReportQueue = cmd.getOptionValue("rr"); - workflowId = cmd.getOptionValue("w"); - } catch (Throwable e){ - throw new DnetCollectorException("Error during parsing arguments ",e); - } - - } - - public Options getOptions() { - return options; - } - - public String getHdfsPath() { - return hdfsPath; - } - - public void setHdfsPath(String hdfsPath) { - this.hdfsPath = hdfsPath; - } - - public String getJson() { - return json; - } - - public void setJson(String json) { - this.json = json; - } - - public String getNameNode() { - return nameNode; - } - - public void setNameNode(String nameNode) { - this.nameNode = nameNode; - } - - public String getUser() { - return user; - } - - public void setUser(String user) { - this.user = user; - } - - public String getRabbitUser() { - return rabbitUser; - } - - public void setRabbitUser(String rabbitUser) { - this.rabbitUser = rabbitUser; - } - - public String getRabbitPassword() { - return rabbitPassword; - } - - public void setRabbitPassword(String rabbitPassword) { - this.rabbitPassword = rabbitPassword; - } - - public String getRabbitHost() { - return rabbitHost; - } - - public void setRabbitHost(String rabbitHost) { - this.rabbitHost = rabbitHost; - } - - public String getRabbitOngoingQueue() { - return rabbitOngoingQueue; - } - - public void setRabbitOngoingQueue(String rabbitOngoingQueue) { - this.rabbitOngoingQueue = rabbitOngoingQueue; - } - - public String getRabbitReportQueue() { - return rabbitReportQueue; - } - - public void setRabbitReportQueue(String rabbitReportQueue) { - this.rabbitReportQueue = rabbitReportQueue; - } - - public String getWorkflowId() { - return workflowId; - } - - public void setWorkflowId(String workflowId) { - this.workflowId = workflowId; - } -} diff --git a/dhp-applications/dhp-collector-worker/src/main/resources/eu/dnetlib/collector/worker/collector_parameter.json b/dhp-applications/dhp-collector-worker/src/main/resources/eu/dnetlib/collector/worker/collector_parameter.json new file mode 100644 index 000000000..c247d15e4 --- /dev/null +++ b/dhp-applications/dhp-collector-worker/src/main/resources/eu/dnetlib/collector/worker/collector_parameter.json @@ -0,0 +1,12 @@ +[ + {"paramName":"p", "paramLongName":"hdfsPath", "paramDescription": "the path where storing the sequential file", "paramRequired": true}, + {"paramName":"a", "paramLongName":"apidescriptor", "paramDescription": "the JSON encoding of the API Descriptor", "paramRequired": true}, + {"paramName":"n", "paramLongName":"namenode", "paramDescription": "the Name Node URI", "paramRequired": true}, + {"paramName":"u", "paramLongName":"userHDFS", "paramDescription": "the user wich create the hdfs seq file", "paramRequired": true}, + {"paramName":"ru", "paramLongName":"rabbitUser", "paramDescription": "the user to connect with RabbitMq for messaging", "paramRequired": true}, + {"paramName":"rp", "paramLongName":"rabbitPassword", "paramDescription": "the password to connect with RabbitMq for messaging", "paramRequired": true}, + {"paramName":"rh", "paramLongName":"rabbitHost", "paramDescription": "the host of the RabbitMq server", "paramRequired": true}, + {"paramName":"ro", "paramLongName":"rabbitOngoingQueue", "paramDescription": "the name of the ongoing queue", "paramRequired": true}, + {"paramName":"rr", "paramLongName":"rabbitReportQueue", "paramDescription": "the name of the report queue", "paramRequired": true}, + {"paramName":"w", "paramLongName":"workflowId", "paramDescription": "the identifier of the dnet Workflow", "paramRequired": true} +] \ No newline at end of file diff --git a/dhp-applications/dhp-collector-worker/src/test/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplicationTests.java b/dhp-applications/dhp-collector-worker/src/test/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplicationTests.java index e0b7f0025..370c5166d 100644 --- a/dhp-applications/dhp-collector-worker/src/test/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplicationTests.java +++ b/dhp-applications/dhp-collector-worker/src/test/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplicationTests.java @@ -3,6 +3,7 @@ package eu.dnetlib.collector.worker; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.collector.worker.model.ApiDescriptor; import eu.dnetlib.collector.worker.utils.CollectorPluginFactory; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.message.Message; import eu.dnetlib.message.MessageManager; import org.junit.After; @@ -18,7 +19,7 @@ import static org.mockito.Mockito.*; public class DnetCollectorWorkerApplicationTests { - private DnetCollectorWorkerArgumentParser argumentParser = mock(DnetCollectorWorkerArgumentParser.class); + private ArgumentApplicationParser argumentParser = mock(ArgumentApplicationParser.class); private MessageManager messageManager = mock(MessageManager.class); private DnetCollectorWorker worker; @@ -26,12 +27,12 @@ public class DnetCollectorWorkerApplicationTests { public void setup() throws Exception { ObjectMapper mapper = new ObjectMapper(); final String apiJson = mapper.writeValueAsString(getApi()); - when(argumentParser.getJson()).thenReturn(apiJson); - when(argumentParser.getNameNode()).thenReturn("file://tmp/test.seq"); - when(argumentParser.getHdfsPath()).thenReturn("/tmp/file.seq"); - when(argumentParser.getUser()).thenReturn("sandro"); - when(argumentParser.getWorkflowId()).thenReturn("sandro"); - when(argumentParser.getRabbitOngoingQueue()).thenReturn("sandro"); + when(argumentParser.get("apidescriptor")).thenReturn(apiJson); + when(argumentParser.get("namenode")).thenReturn("file://tmp/test.seq"); + when(argumentParser.get("hdfsPath")).thenReturn("/tmp/file.seq"); + when(argumentParser.get("userHDFS")).thenReturn("sandro"); + when(argumentParser.get("workflowId")).thenReturn("sandro"); + when(argumentParser.get("rabbitOngoingQueue")).thenReturn("sandro"); when(messageManager.sendMessage(any(Message.class), anyString(), anyBoolean(),anyBoolean())).thenAnswer(a -> { System.out.println("sent message: "+a.getArguments()[0]); diff --git a/dhp-common/pom.xml b/dhp-common/pom.xml index cac0f08c2..42acacb8a 100644 --- a/dhp-common/pom.xml +++ b/dhp-common/pom.xml @@ -15,6 +15,11 @@ jar + + commons-cli + commons-cli + 1.4 + org.apache.commons commons-lang3 @@ -44,6 +49,12 @@ amqp-client 5.6.0 + + commons-io + commons-io + 2.4 + test + diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/application/ArgumentApplicationParser.java b/dhp-common/src/main/java/eu/dnetlib/dhp/application/ArgumentApplicationParser.java new file mode 100644 index 000000000..5c8e1f627 --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/application/ArgumentApplicationParser.java @@ -0,0 +1,52 @@ +package eu.dnetlib.dhp.application; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.cli.*; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +public class ArgumentApplicationParser { + + private final Options options = new Options(); + private final Map objectMap = new HashMap<>(); + + public ArgumentApplicationParser(final String json_configuration) throws Exception { + final ObjectMapper mapper = new ObjectMapper(); + final OptionsParameter[] configuration = mapper.readValue(json_configuration, OptionsParameter[].class); + createOptionMap(configuration); + } + + public ArgumentApplicationParser(final OptionsParameter[] configuration) { + createOptionMap(configuration); + } + + private void createOptionMap(final OptionsParameter[] configuration) { + Arrays.stream(configuration).map(conf -> Option.builder(conf.getParamName()) + .longOpt(conf.getParamLongName()) + .required(conf.isParamRequired()) + .desc(conf.getParamDescription()) + .hasArg() // This option has an argument. + .build()).forEach(options::addOption); + +// HelpFormatter formatter = new HelpFormatter(); +// formatter.printHelp("myapp", null, options, null, true); + + + } + + public void parseArgument(final String[] args) throws Exception { + CommandLineParser parser = new DefaultParser(); + CommandLine cmd = parser.parse(options, args); + Arrays.stream(cmd.getOptions()).forEach(it -> objectMap.put(it.getLongOpt(), it.getValue())); + } + + public String get(final String key) { + return objectMap.get(key); + } + + public Map getObjectMap() { + return objectMap; + } +} diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/application/OptionsParameter.java b/dhp-common/src/main/java/eu/dnetlib/dhp/application/OptionsParameter.java new file mode 100644 index 000000000..92079fce7 --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/application/OptionsParameter.java @@ -0,0 +1,29 @@ +package eu.dnetlib.dhp.application; + + +public class OptionsParameter { + + private String paramName; + private String paramLongName; + private String paramDescription; + private boolean paramRequired; + + public OptionsParameter() { + } + + public String getParamName() { + return paramName; + } + + public String getParamLongName() { + return paramLongName; + } + + public String getParamDescription() { + return paramDescription; + } + + public boolean isParamRequired() { + return paramRequired; + } +} diff --git a/dhp-common/src/test/java/eu/dnetlib/dhp/application/ArgumentApplicationParserTest.java b/dhp-common/src/test/java/eu/dnetlib/dhp/application/ArgumentApplicationParserTest.java new file mode 100644 index 000000000..2033919b9 --- /dev/null +++ b/dhp-common/src/test/java/eu/dnetlib/dhp/application/ArgumentApplicationParserTest.java @@ -0,0 +1,51 @@ +package eu.dnetlib.dhp.application; + +import org.apache.commons.io.IOUtils; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class ArgumentApplicationParserTest { + + + @Test + public void testParseParameter() throws Exception { + final String jsonConfiguration = IOUtils.toString(this.getClass().getResourceAsStream("/eu/dnetlib/application/parameters.json")); + assertNotNull(jsonConfiguration); + ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(new String[]{"-p", "value0", + "-a", "value1", + "-n", "value2", + "-u", "value3", + "-ru", "value4", + "-rp", "value5", + "-rh", "value6", + "-ro", "value7", + "-rr", "value8", + "-w", "value9", + }); + assertNotNull(parser.get("hdfsPath")); + assertNotNull(parser.get("apidescriptor")); + assertNotNull(parser.get("namenode")); + assertNotNull(parser.get("userHDFS")); + assertNotNull(parser.get("rabbitUser")); + assertNotNull(parser.get("rabbitPassWord")); + assertNotNull(parser.get("rabbitHost")); + assertNotNull(parser.get("rabbitOngoingQueue")); + assertNotNull(parser.get("rabbitReportQueue")); + assertNotNull(parser.get("workflowId")); + assertEquals("value0", parser.get("hdfsPath")); + assertEquals("value1", parser.get("apidescriptor")); + assertEquals("value2", parser.get("namenode")); + assertEquals("value3", parser.get("userHDFS")); + assertEquals("value4", parser.get("rabbitUser")); + assertEquals("value5", parser.get("rabbitPassWord")); + assertEquals("value6", parser.get("rabbitHost")); + assertEquals("value7", parser.get("rabbitOngoingQueue")); + assertEquals("value8", parser.get("rabbitReportQueue")); + assertEquals("value9", parser.get("workflowId")); + } + + +} diff --git a/dhp-common/src/test/resources/eu/dnetlib/application/parameters.json b/dhp-common/src/test/resources/eu/dnetlib/application/parameters.json new file mode 100644 index 000000000..60c2d391a --- /dev/null +++ b/dhp-common/src/test/resources/eu/dnetlib/application/parameters.json @@ -0,0 +1,12 @@ +[ + {"paramName":"p", "paramLongName":"hdfsPath", "paramDescription": "the path where storing the sequential file", "paramRequired": true}, + {"paramName":"a", "paramLongName":"apidescriptor", "paramDescription": "the JSON encoding of the API Descriptor", "paramRequired": true}, + {"paramName":"n", "paramLongName":"namenode", "paramDescription": "the Name Node URI", "paramRequired": true}, + {"paramName":"u", "paramLongName":"userHDFS", "paramDescription": "the user wich create the hdfs seq file", "paramRequired": true}, + {"paramName":"ru", "paramLongName":"rabbitUser", "paramDescription": "the user to connect with RabbitMq for messaging", "paramRequired": true}, + {"paramName":"rp", "paramLongName":"rabbitPassWord", "paramDescription": "the password to connect with RabbitMq for messaging", "paramRequired": true}, + {"paramName":"rh", "paramLongName":"rabbitHost", "paramDescription": "the host of the RabbitMq server", "paramRequired": true}, + {"paramName":"ro", "paramLongName":"rabbitOngoingQueue", "paramDescription": "the name of the ongoing queue", "paramRequired": true}, + {"paramName":"rr", "paramLongName":"rabbitReportQueue", "paramDescription": "the name of the report queue", "paramRequired": true}, + {"paramName":"w", "paramLongName":"workflowId", "paramDescription": "the identifier of the dnet Workflow", "paramRequired": true} +] \ No newline at end of file