Created a generic Argument parser to be used in all modules

This commit is contained in:
Sandro La Bruzzo 2019-10-03 12:22:44 +02:00
parent b259cd0bd8
commit a423a6ebfd
10 changed files with 192 additions and 213 deletions

View File

@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.collector.worker.model.ApiDescriptor; import eu.dnetlib.collector.worker.model.ApiDescriptor;
import eu.dnetlib.collector.worker.plugins.CollectorPlugin; import eu.dnetlib.collector.worker.plugins.CollectorPlugin;
import eu.dnetlib.collector.worker.utils.CollectorPluginFactory; import eu.dnetlib.collector.worker.utils.CollectorPluginFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.message.Message; import eu.dnetlib.message.Message;
import eu.dnetlib.message.MessageManager; import eu.dnetlib.message.MessageManager;
import eu.dnetlib.message.MessageType; import eu.dnetlib.message.MessageType;
@ -29,12 +30,12 @@ public class DnetCollectorWorker {
private final CollectorPluginFactory collectorPluginFactory; private final CollectorPluginFactory collectorPluginFactory;
private final DnetCollectorWorkerArgumentParser argumentParser; private final ArgumentApplicationParser argumentParser;
private final MessageManager manager; private final MessageManager manager;
public DnetCollectorWorker(final CollectorPluginFactory collectorPluginFactory, final DnetCollectorWorkerArgumentParser argumentParser, final MessageManager manager) throws DnetCollectorException { public DnetCollectorWorker(final CollectorPluginFactory collectorPluginFactory, final ArgumentApplicationParser argumentParser, final MessageManager manager) throws DnetCollectorException {
this.collectorPluginFactory = collectorPluginFactory; this.collectorPluginFactory = collectorPluginFactory;
this.argumentParser = argumentParser; this.argumentParser = argumentParser;
this.manager = manager; this.manager = manager;
@ -44,11 +45,11 @@ public class DnetCollectorWorker {
public void collect() throws DnetCollectorException { public void collect() throws DnetCollectorException {
try { try {
final ObjectMapper jsonMapper = new ObjectMapper(); final ObjectMapper jsonMapper = new ObjectMapper();
final ApiDescriptor api = jsonMapper.readValue(argumentParser.getJson(), ApiDescriptor.class); final ApiDescriptor api = jsonMapper.readValue(argumentParser.get("apidescriptor"), ApiDescriptor.class);
final CollectorPlugin plugin = collectorPluginFactory.getPluginByProtocol(api.getProtocol()); final CollectorPlugin plugin = collectorPluginFactory.getPluginByProtocol(api.getProtocol());
final String hdfsuri = argumentParser.getNameNode(); final String hdfsuri = argumentParser.get("namenode");
// ====== Init HDFS File System Object // ====== Init HDFS File System Object
Configuration conf = new Configuration(); Configuration conf = new Configuration();
@ -58,11 +59,11 @@ public class DnetCollectorWorker {
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
System.setProperty("HADOOP_USER_NAME", argumentParser.getUser()); System.setProperty("HADOOP_USER_NAME", argumentParser.get("userHDFS"));
System.setProperty("hadoop.home.dir", "/"); System.setProperty("hadoop.home.dir", "/");
//Get the filesystem - HDFS //Get the filesystem - HDFS
FileSystem fs = FileSystem.get(URI.create(hdfsuri), conf); FileSystem fs = FileSystem.get(URI.create(hdfsuri), conf);
Path hdfswritepath = new Path(argumentParser.getHdfsPath()); Path hdfswritepath = new Path(argumentParser.get("hdfsPath"));
log.info("Created path " + hdfswritepath.toString()); log.info("Created path " + hdfswritepath.toString());
@ -81,7 +82,7 @@ public class DnetCollectorWorker {
if (counter.get() % 10 == 0) { if (counter.get() % 10 == 0) {
try { try {
ongoingMap.put("ongoing", "" + counter.get()); ongoingMap.put("ongoing", "" + counter.get());
log.debug("Sending message: "+ manager.sendMessage(new Message(argumentParser.getWorkflowId(), "Collection", MessageType.ONGOING, ongoingMap), argumentParser.getRabbitOngoingQueue(), true, false)); log.debug("Sending message: "+ manager.sendMessage(new Message(argumentParser.get("workflowId"), "Collection", MessageType.ONGOING, ongoingMap), argumentParser.get("rabbitOngoingQueue"), true, false));
} catch (Exception e) { } catch (Exception e) {
log.error("Error on sending message ", e); log.error("Error on sending message ", e);
} }
@ -95,9 +96,9 @@ public class DnetCollectorWorker {
}); });
} }
ongoingMap.put("ongoing", "" + counter.get()); ongoingMap.put("ongoing", "" + counter.get());
manager.sendMessage(new Message(argumentParser.getWorkflowId(), "Collection", MessageType.ONGOING, ongoingMap), argumentParser.getRabbitOngoingQueue(), true, false); manager.sendMessage(new Message(argumentParser.get("workflowId"), "Collection", MessageType.ONGOING, ongoingMap), argumentParser.get("rabbitOngoingQueue"), true, false);
reportMap.put("collected", "" + counter.get()); reportMap.put("collected", "" + counter.get());
manager.sendMessage(new Message(argumentParser.getWorkflowId(), "Collection", MessageType.REPORT, reportMap), argumentParser.getRabbitOngoingQueue(), true, false); manager.sendMessage(new Message(argumentParser.get("workflowId"), "Collection", MessageType.REPORT, reportMap), argumentParser.get("rabbitOngoingQueue"), true, false);
manager.close(); manager.close();
} catch (Throwable e) { } catch (Throwable e) {
throw new DnetCollectorException("Error on collecting ",e); throw new DnetCollectorException("Error on collecting ",e);

View File

@ -1,7 +1,9 @@
package eu.dnetlib.collector.worker; package eu.dnetlib.collector.worker;
import eu.dnetlib.collector.worker.utils.CollectorPluginFactory; import eu.dnetlib.collector.worker.utils.CollectorPluginFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.message.MessageManager; import eu.dnetlib.message.MessageManager;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -21,7 +23,7 @@ public class DnetCollectorWorkerApplication {
private static CollectorPluginFactory collectorPluginFactory = new CollectorPluginFactory(); private static CollectorPluginFactory collectorPluginFactory = new CollectorPluginFactory();
private static DnetCollectorWorkerArgumentParser argumentParser = new DnetCollectorWorkerArgumentParser(); private static ArgumentApplicationParser argumentParser;
/** /**
@ -29,10 +31,11 @@ public class DnetCollectorWorkerApplication {
*/ */
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
argumentParser= new ArgumentApplicationParser(IOUtils.toString(DnetCollectorWorker.class.getResourceAsStream("/eu/dnetlib/collector/worker/collector_parameter.json")));
argumentParser.parseArgument(args); argumentParser.parseArgument(args);
log.info("hdfsPath =" + argumentParser.getHdfsPath()); log.info("hdfsPath =" + argumentParser.get("hdfsPath"));
log.info("json = " + argumentParser.getJson()); log.info("json = " + argumentParser.get("apidescriptor"));
final MessageManager manager = new MessageManager(argumentParser.getRabbitHost(), argumentParser.getRabbitUser(), argumentParser.getRabbitPassword(), false, false, null); final MessageManager manager = new MessageManager(argumentParser.get("rabbitHost"), argumentParser.get("rabbitUser"), argumentParser.get("rabbitPassword"), false, false, null);
final DnetCollectorWorker worker = new DnetCollectorWorker(collectorPluginFactory, argumentParser, manager); final DnetCollectorWorker worker = new DnetCollectorWorker(collectorPluginFactory, argumentParser, manager);
worker.collect(); worker.collect();

View File

@ -1,193 +0,0 @@
package eu.dnetlib.collector.worker;
import org.apache.commons.cli.*;
public class DnetCollectorWorkerArgumentParser {
private final Options options;
private String hdfsPath ;
private String json;
private String nameNode;
private String user;
private String rabbitUser;
private String rabbitPassword;
private String rabbitHost;
private String rabbitOngoingQueue;
private String rabbitReportQueue;
private String workflowId;
public DnetCollectorWorkerArgumentParser(){
options = new Options();
options.addOption(Option.builder("p")
.longOpt("hdfsPath")
.required(true)
.desc("the path where storing the sequential file")
.hasArg() // This option has an argument.
.build());
options.addOption(Option.builder("a")
.longOpt("apidescriptor")
.required(true)
.desc("the Json enconding of the API Descriptor")
.hasArg() // This option has an argument.
.build());
options.addOption(Option.builder("n")
.longOpt("namenode")
.required(true)
.desc("the Name Node URI")
.hasArg() // This option has an argument.
.build());
options.addOption(Option.builder("u")
.longOpt("userHDFS")
.required(true)
.desc("the user wich create the hdfs seq file")
.hasArg() // This option has an argument.
.build());
options.addOption(Option.builder("ru")
.longOpt("rabbitUser")
.required(true)
.desc("the user to connect with RabbitMq for messaging")
.hasArg() // This option has an argument.
.build());
options.addOption(Option.builder("rp")
.longOpt("rabbitPassWord")
.required(true)
.desc("the password to connect with RabbitMq for messaging")
.hasArg() // This option has an argument.
.build());
options.addOption(Option.builder("rh")
.longOpt("rabbitHost")
.required(true)
.desc("the host of the RabbitMq server")
.hasArg() // This option has an argument.
.build());
options.addOption(Option.builder("ro")
.longOpt("rabbitOngoingQueue")
.required(true)
.desc("the name of the ongoing queue")
.hasArg() // This option has an argument.
.build());
options.addOption(Option.builder("rr")
.longOpt("rabbitReportQueue")
.required(true)
.desc("the name of the report queue")
.hasArg() // This option has an argument.
.build());
options.addOption(Option.builder("w")
.longOpt("workflowId")
.required(true)
.desc("the identifier of the dnet Workflow")
.hasArg() // This option has an argument.
.build());
}
public void parseArgument(final String[] args) throws DnetCollectorException {
try {
CommandLineParser parser = new DefaultParser();
CommandLine cmd = parser.parse(options, args);
hdfsPath = cmd.getOptionValue("p");
json = cmd.getOptionValue("a");
nameNode = cmd.getOptionValue("n");
user = cmd.getOptionValue("u");
rabbitUser = cmd.getOptionValue("ru");
rabbitPassword = cmd.getOptionValue("rp");
rabbitHost = cmd.getOptionValue("rh");
rabbitOngoingQueue = cmd.getOptionValue("ro");
rabbitReportQueue = cmd.getOptionValue("rr");
workflowId = cmd.getOptionValue("w");
} catch (Throwable e){
throw new DnetCollectorException("Error during parsing arguments ",e);
}
}
public Options getOptions() {
return options;
}
public String getHdfsPath() {
return hdfsPath;
}
public void setHdfsPath(String hdfsPath) {
this.hdfsPath = hdfsPath;
}
public String getJson() {
return json;
}
public void setJson(String json) {
this.json = json;
}
public String getNameNode() {
return nameNode;
}
public void setNameNode(String nameNode) {
this.nameNode = nameNode;
}
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
public String getRabbitUser() {
return rabbitUser;
}
public void setRabbitUser(String rabbitUser) {
this.rabbitUser = rabbitUser;
}
public String getRabbitPassword() {
return rabbitPassword;
}
public void setRabbitPassword(String rabbitPassword) {
this.rabbitPassword = rabbitPassword;
}
public String getRabbitHost() {
return rabbitHost;
}
public void setRabbitHost(String rabbitHost) {
this.rabbitHost = rabbitHost;
}
public String getRabbitOngoingQueue() {
return rabbitOngoingQueue;
}
public void setRabbitOngoingQueue(String rabbitOngoingQueue) {
this.rabbitOngoingQueue = rabbitOngoingQueue;
}
public String getRabbitReportQueue() {
return rabbitReportQueue;
}
public void setRabbitReportQueue(String rabbitReportQueue) {
this.rabbitReportQueue = rabbitReportQueue;
}
public String getWorkflowId() {
return workflowId;
}
public void setWorkflowId(String workflowId) {
this.workflowId = workflowId;
}
}

View File

@ -0,0 +1,12 @@
[
{"paramName":"p", "paramLongName":"hdfsPath", "paramDescription": "the path where storing the sequential file", "paramRequired": true},
{"paramName":"a", "paramLongName":"apidescriptor", "paramDescription": "the JSON encoding of the API Descriptor", "paramRequired": true},
{"paramName":"n", "paramLongName":"namenode", "paramDescription": "the Name Node URI", "paramRequired": true},
{"paramName":"u", "paramLongName":"userHDFS", "paramDescription": "the user wich create the hdfs seq file", "paramRequired": true},
{"paramName":"ru", "paramLongName":"rabbitUser", "paramDescription": "the user to connect with RabbitMq for messaging", "paramRequired": true},
{"paramName":"rp", "paramLongName":"rabbitPassword", "paramDescription": "the password to connect with RabbitMq for messaging", "paramRequired": true},
{"paramName":"rh", "paramLongName":"rabbitHost", "paramDescription": "the host of the RabbitMq server", "paramRequired": true},
{"paramName":"ro", "paramLongName":"rabbitOngoingQueue", "paramDescription": "the name of the ongoing queue", "paramRequired": true},
{"paramName":"rr", "paramLongName":"rabbitReportQueue", "paramDescription": "the name of the report queue", "paramRequired": true},
{"paramName":"w", "paramLongName":"workflowId", "paramDescription": "the identifier of the dnet Workflow", "paramRequired": true}
]

View File

@ -3,6 +3,7 @@ package eu.dnetlib.collector.worker;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.collector.worker.model.ApiDescriptor; import eu.dnetlib.collector.worker.model.ApiDescriptor;
import eu.dnetlib.collector.worker.utils.CollectorPluginFactory; import eu.dnetlib.collector.worker.utils.CollectorPluginFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.message.Message; import eu.dnetlib.message.Message;
import eu.dnetlib.message.MessageManager; import eu.dnetlib.message.MessageManager;
import org.junit.After; import org.junit.After;
@ -18,7 +19,7 @@ import static org.mockito.Mockito.*;
public class DnetCollectorWorkerApplicationTests { public class DnetCollectorWorkerApplicationTests {
private DnetCollectorWorkerArgumentParser argumentParser = mock(DnetCollectorWorkerArgumentParser.class); private ArgumentApplicationParser argumentParser = mock(ArgumentApplicationParser.class);
private MessageManager messageManager = mock(MessageManager.class); private MessageManager messageManager = mock(MessageManager.class);
private DnetCollectorWorker worker; private DnetCollectorWorker worker;
@ -26,12 +27,12 @@ public class DnetCollectorWorkerApplicationTests {
public void setup() throws Exception { public void setup() throws Exception {
ObjectMapper mapper = new ObjectMapper(); ObjectMapper mapper = new ObjectMapper();
final String apiJson = mapper.writeValueAsString(getApi()); final String apiJson = mapper.writeValueAsString(getApi());
when(argumentParser.getJson()).thenReturn(apiJson); when(argumentParser.get("apidescriptor")).thenReturn(apiJson);
when(argumentParser.getNameNode()).thenReturn("file://tmp/test.seq"); when(argumentParser.get("namenode")).thenReturn("file://tmp/test.seq");
when(argumentParser.getHdfsPath()).thenReturn("/tmp/file.seq"); when(argumentParser.get("hdfsPath")).thenReturn("/tmp/file.seq");
when(argumentParser.getUser()).thenReturn("sandro"); when(argumentParser.get("userHDFS")).thenReturn("sandro");
when(argumentParser.getWorkflowId()).thenReturn("sandro"); when(argumentParser.get("workflowId")).thenReturn("sandro");
when(argumentParser.getRabbitOngoingQueue()).thenReturn("sandro"); when(argumentParser.get("rabbitOngoingQueue")).thenReturn("sandro");
when(messageManager.sendMessage(any(Message.class), anyString(), anyBoolean(),anyBoolean())).thenAnswer(a -> { when(messageManager.sendMessage(any(Message.class), anyString(), anyBoolean(),anyBoolean())).thenAnswer(a -> {
System.out.println("sent message: "+a.getArguments()[0]); System.out.println("sent message: "+a.getArguments()[0]);

View File

@ -15,6 +15,11 @@
<packaging>jar</packaging> <packaging>jar</packaging>
<dependencies> <dependencies>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.4</version>
</dependency>
<dependency> <dependency>
<groupId>org.apache.commons</groupId> <groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId> <artifactId>commons-lang3</artifactId>
@ -44,6 +49,12 @@
<artifactId>amqp-client</artifactId> <artifactId>amqp-client</artifactId>
<version>5.6.0</version> <version>5.6.0</version>
</dependency> </dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.4</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -0,0 +1,52 @@
package eu.dnetlib.dhp.application;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.cli.*;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
public class ArgumentApplicationParser {
private final Options options = new Options();
private final Map<String, String> objectMap = new HashMap<>();
public ArgumentApplicationParser(final String json_configuration) throws Exception {
final ObjectMapper mapper = new ObjectMapper();
final OptionsParameter[] configuration = mapper.readValue(json_configuration, OptionsParameter[].class);
createOptionMap(configuration);
}
public ArgumentApplicationParser(final OptionsParameter[] configuration) {
createOptionMap(configuration);
}
private void createOptionMap(final OptionsParameter[] configuration) {
Arrays.stream(configuration).map(conf -> Option.builder(conf.getParamName())
.longOpt(conf.getParamLongName())
.required(conf.isParamRequired())
.desc(conf.getParamDescription())
.hasArg() // This option has an argument.
.build()).forEach(options::addOption);
// HelpFormatter formatter = new HelpFormatter();
// formatter.printHelp("myapp", null, options, null, true);
}
public void parseArgument(final String[] args) throws Exception {
CommandLineParser parser = new DefaultParser();
CommandLine cmd = parser.parse(options, args);
Arrays.stream(cmd.getOptions()).forEach(it -> objectMap.put(it.getLongOpt(), it.getValue()));
}
public String get(final String key) {
return objectMap.get(key);
}
public Map<String, String> getObjectMap() {
return objectMap;
}
}

View File

@ -0,0 +1,29 @@
package eu.dnetlib.dhp.application;
public class OptionsParameter {
private String paramName;
private String paramLongName;
private String paramDescription;
private boolean paramRequired;
public OptionsParameter() {
}
public String getParamName() {
return paramName;
}
public String getParamLongName() {
return paramLongName;
}
public String getParamDescription() {
return paramDescription;
}
public boolean isParamRequired() {
return paramRequired;
}
}

View File

@ -0,0 +1,51 @@
package eu.dnetlib.dhp.application;
import org.apache.commons.io.IOUtils;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class ArgumentApplicationParserTest {
@Test
public void testParseParameter() throws Exception {
final String jsonConfiguration = IOUtils.toString(this.getClass().getResourceAsStream("/eu/dnetlib/application/parameters.json"));
assertNotNull(jsonConfiguration);
ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(new String[]{"-p", "value0",
"-a", "value1",
"-n", "value2",
"-u", "value3",
"-ru", "value4",
"-rp", "value5",
"-rh", "value6",
"-ro", "value7",
"-rr", "value8",
"-w", "value9",
});
assertNotNull(parser.get("hdfsPath"));
assertNotNull(parser.get("apidescriptor"));
assertNotNull(parser.get("namenode"));
assertNotNull(parser.get("userHDFS"));
assertNotNull(parser.get("rabbitUser"));
assertNotNull(parser.get("rabbitPassWord"));
assertNotNull(parser.get("rabbitHost"));
assertNotNull(parser.get("rabbitOngoingQueue"));
assertNotNull(parser.get("rabbitReportQueue"));
assertNotNull(parser.get("workflowId"));
assertEquals("value0", parser.get("hdfsPath"));
assertEquals("value1", parser.get("apidescriptor"));
assertEquals("value2", parser.get("namenode"));
assertEquals("value3", parser.get("userHDFS"));
assertEquals("value4", parser.get("rabbitUser"));
assertEquals("value5", parser.get("rabbitPassWord"));
assertEquals("value6", parser.get("rabbitHost"));
assertEquals("value7", parser.get("rabbitOngoingQueue"));
assertEquals("value8", parser.get("rabbitReportQueue"));
assertEquals("value9", parser.get("workflowId"));
}
}

View File

@ -0,0 +1,12 @@
[
{"paramName":"p", "paramLongName":"hdfsPath", "paramDescription": "the path where storing the sequential file", "paramRequired": true},
{"paramName":"a", "paramLongName":"apidescriptor", "paramDescription": "the JSON encoding of the API Descriptor", "paramRequired": true},
{"paramName":"n", "paramLongName":"namenode", "paramDescription": "the Name Node URI", "paramRequired": true},
{"paramName":"u", "paramLongName":"userHDFS", "paramDescription": "the user wich create the hdfs seq file", "paramRequired": true},
{"paramName":"ru", "paramLongName":"rabbitUser", "paramDescription": "the user to connect with RabbitMq for messaging", "paramRequired": true},
{"paramName":"rp", "paramLongName":"rabbitPassWord", "paramDescription": "the password to connect with RabbitMq for messaging", "paramRequired": true},
{"paramName":"rh", "paramLongName":"rabbitHost", "paramDescription": "the host of the RabbitMq server", "paramRequired": true},
{"paramName":"ro", "paramLongName":"rabbitOngoingQueue", "paramDescription": "the name of the ongoing queue", "paramRequired": true},
{"paramName":"rr", "paramLongName":"rabbitReportQueue", "paramDescription": "the name of the report queue", "paramRequired": true},
{"paramName":"w", "paramLongName":"workflowId", "paramDescription": "the identifier of the dnet Workflow", "paramRequired": true}
]