From 53ec9bccca15465e383ab796d340725eb4998573 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Tue, 16 Apr 2019 12:28:01 +0200 Subject: [PATCH] changed the implemetation of RabitMQ Comunication --- .../DnetCollectorWorkerApplication.java | 17 +- .../dhp-mdstore-manager-app/pom.xml | 161 +++++++++--------- .../src/main/resources/application.properties | 4 +- .../eu/dnetlib/message/MessageManager.java | 38 ++++- .../transformation/TransformSparkJobNode.java | 7 + 5 files changed, 137 insertions(+), 90 deletions(-) diff --git a/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplication.java b/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplication.java index c853207d3..8f4f46513 100644 --- a/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplication.java +++ b/dhp-applications/dhp-collector-worker/src/main/java/eu/dnetlib/collector/worker/DnetCollectorWorkerApplication.java @@ -192,16 +192,18 @@ public class DnetCollectorWorkerApplication implements CommandLineRunner { log.info("Created path "+hdfswritepath.toString()); + final Map ongoingMap = new HashMap<>(); + final Map reportMap = new HashMap<>(); + final AtomicInteger counter = new AtomicInteger(0); try(SequenceFile.Writer writer = SequenceFile.createWriter(conf, SequenceFile.Writer.file(hdfswritepath), SequenceFile.Writer.keyClass(IntWritable.class), SequenceFile.Writer.valueClass(Text.class))) { - final AtomicInteger counter = new AtomicInteger(0); + final IntWritable key = new IntWritable(counter.get()); final Text value = new Text(); - final Map ongoingMap = new HashMap<>(); - final Map reportMap = new HashMap<>(); + plugin.collect(api).forEach(content -> { @@ -223,12 +225,13 @@ public class DnetCollectorWorkerApplication implements CommandLineRunner { } }); - ongoingMap.put("ongoing", ""+counter.get()); - manager.sendMessage(new Message(workflowId,"Collection", MessageType.ONGOING, ongoingMap ), rabbitOngoingQueue, true, false); - reportMap.put("collected", ""+counter.get()); - manager.sendMessage(new Message(workflowId,"Collection", MessageType.REPORT, reportMap ), rabbitReportQueue, true, false); } + ongoingMap.put("ongoing", ""+counter.get()); + manager.sendMessage(new Message(workflowId,"Collection", MessageType.ONGOING, ongoingMap ), rabbitOngoingQueue, true, false); + reportMap.put("collected", ""+counter.get()); + manager.sendMessage(new Message(workflowId,"Collection", MessageType.REPORT, reportMap ), rabbitReportQueue, true, false); + manager.close(); } } diff --git a/dhp-applications/dhp-mdstore-manager-app/pom.xml b/dhp-applications/dhp-mdstore-manager-app/pom.xml index 645535c0a..c90b4224d 100644 --- a/dhp-applications/dhp-mdstore-manager-app/pom.xml +++ b/dhp-applications/dhp-mdstore-manager-app/pom.xml @@ -1,91 +1,98 @@ - 4.0.0 - dhp-mdstore-manager-app - 1.1.0-SNAPSHOT + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + 4.0.0 + dhp-mdstore-manager-app + 1.1.0-SNAPSHOT - - - org.springframework.boot - spring-boot-starter-parent - 2.1.3.RELEASE - ../ - + + + org.springframework.boot + spring-boot-starter-parent + 2.1.3.RELEASE + ../ + - + + + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-data-jpa + + + org.springframework.boot + spring-boot-starter-json + + + org.postgresql + postgresql + + + com.vladmihalcea + hibernate-types-52 + 2.3.5 + + + commons-io + commons-io + 2.6 + + + + eu.dnetlib.dhp + dhp-common + 1.0.0-SNAPSHOT + + + + io.springfox + springfox-swagger2 + 2.9.2 + + + io.springfox + springfox-swagger-ui + 2.9.2 + - - - - org.springframework.boot - spring-boot-starter-web - - - org.springframework.boot - spring-boot-starter-data-jpa - - - org.springframework.boot - spring-boot-starter-json - - - org.postgresql - postgresql - - - com.vladmihalcea - hibernate-types-52 - 2.3.5 - - - commons-io - commons-io - 2.6 - - - - - eu.dnetlib.dhp - dhp-common - 1.0.0-SNAPSHOT - - - - - io.springfox - springfox-swagger2 - 2.9.2 - - - - io.springfox - springfox-swagger-ui - 2.9.2 - - - - - junit - junit - test - - + + + junit + junit + test + + + + + + org.springframework.boot + spring-boot-maven-plugin + + true + + + + diff --git a/dhp-applications/dhp-mdstore-manager-app/src/main/resources/application.properties b/dhp-applications/dhp-mdstore-manager-app/src/main/resources/application.properties index 792c39865..46f38ca64 100644 --- a/dhp-applications/dhp-mdstore-manager-app/src/main/resources/application.properties +++ b/dhp-applications/dhp-mdstore-manager-app/src/main/resources/application.properties @@ -2,8 +2,8 @@ spring.main.banner-mode = console logging.level.root = INFO spring.datasource.url=jdbc:postgresql://localhost:5432/mdstoremanager -spring.datasource.username= -spring.datasource.password= +spring.datasource.username=dnet +spring.datasource.password=dnetPwd spring.jpa.properties.hibernate.dialect = org.hibernate.dialect.PostgreSQLDialect diff --git a/dhp-common/src/main/java/eu/dnetlib/message/MessageManager.java b/dhp-common/src/main/java/eu/dnetlib/message/MessageManager.java index 2b5e90838..e3d90f7e0 100644 --- a/dhp-common/src/main/java/eu/dnetlib/message/MessageManager.java +++ b/dhp-common/src/main/java/eu/dnetlib/message/MessageManager.java @@ -19,6 +19,9 @@ public class MessageManager { private final String password; + private Connection connection; + + private Map channels = new HashMap<>(); private boolean durable; @@ -59,9 +62,36 @@ public class MessageManager { channel.queueDeclare(queueName, durable, false, this.autodelete, args); return channel; } - public boolean sendMessage(final Message message, String queueName) throws Exception { - try (Connection connection = createConnection(); Channel channel = createChannel(connection, queueName, this.durable, this.autodelete)) { + private Channel getOrCreateChannel(final String queueName, boolean durable, boolean autodelete) throws Exception { + if (channels.containsKey(queueName)) { + return channels.get(queueName); + } + + if (this.connection == null) { + this.connection = createConnection(); + } + channels.put(queueName, createChannel(this.connection, queueName, durable, autodelete)); + return channels.get(queueName); + } + + + + public void close() throws IOException { + channels.values().forEach(ch-> { + try { + ch.close(); + } catch (Exception e) { + //TODO LOG + } + }); + + this.connection.close(); + } + + public boolean sendMessage(final Message message, String queueName) throws Exception { + try { + Channel channel = getOrCreateChannel(queueName, this.durable, this.autodelete); channel.basicPublish("", queueName,null, message.toString().getBytes()); return true; } catch (Throwable e) { @@ -70,8 +100,8 @@ public class MessageManager { } public boolean sendMessage(final Message message, String queueName, boolean durable_var, boolean autodelete_var) throws Exception { - try (Connection connection = createConnection(); Channel channel = createChannel(connection, queueName, durable_var, autodelete_var)) { - + try { + Channel channel = getOrCreateChannel(queueName, durable_var, autodelete_var); channel.basicPublish("", queueName,null, message.toString().getBytes()); return true; } catch (Throwable e) { diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java index ebca61b56..54dcb3d3c 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java @@ -140,12 +140,19 @@ public class TransformSparkJobNode { if (rabbitHost != null) { + + System.out.println("SEND FINAL REPORT"); + final Map reportMap = new HashMap<>(); reportMap.put("inputItem" , ""+ totalItems.value()); reportMap.put("invalidRecords", "" + errorItems.value()); reportMap.put("mdStoreSize", "" + transformedItems.value()); final MessageManager manager = new MessageManager(rabbitHost, rabbitUser, rabbitPassword, false, false, null); + + + System.out.println(new Message(workflowId, "Transform", MessageType.REPORT, reportMap)); manager.sendMessage(new Message(workflowId, "Transform", MessageType.REPORT, reportMap), rabbitReportQueue, true, false); + manager.close(); } }