diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/message/Message.java b/dhp-common/src/main/java/eu/dnetlib/dhp/message/Message.java index ed2a3c9b3..0cbb6c859 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/message/Message.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/message/Message.java @@ -3,31 +3,36 @@ package eu.dnetlib.dhp.message; import java.io.Serializable; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; public class Message implements Serializable { + private static final long serialVersionUID = 401753881204524893L; + public static String CURRENT_PARAM = "current"; public static String TOTAL_PARAM = "total"; - /** - * - */ - private static final long serialVersionUID = 401753881204524893L; + private MessageType messageType; private String workflowId; private Map body; - public Message() { - body = new HashMap<>(); + public Message(final MessageType messageType, final String workflowId) { + this(messageType, workflowId, new LinkedHashMap<>()); } - public Message(final String workflowId, final Map body) { + public Message(final MessageType messageType, final String workflowId, final Map body) { + this.messageType = messageType; this.workflowId = workflowId; this.body = body; } + public MessageType getMessageType() { + return messageType; + } + public String getWorkflowId() { return workflowId; } @@ -46,6 +51,7 @@ public class Message implements Serializable { @Override public String toString() { - return String.format("Message [workflowId=%s, body=%s]", workflowId, body); + return String.format("Message [type=%s, workflowId=%s, body=%s]", messageType, workflowId, body); } + } diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/message/MessageSender.java b/dhp-common/src/main/java/eu/dnetlib/dhp/message/MessageSender.java index 16bb0c97e..0c6eacf99 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/message/MessageSender.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/message/MessageSender.java @@ -1,6 +1,7 @@ package eu.dnetlib.dhp.message; +import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -45,13 +46,15 @@ public class MessageSender { } public void sendMessage(final Long current, final Long total) { - sendMessage(createMessage(current, total)); + sendMessage(createOngoingMessage(current, total)); } - private Message createMessage(final Long current, final Long total) { + public void sendReport(final Map report) { + sendMessage(new Message(MessageType.REPORT, workflowId, report)); + } - final Message m = new Message(); - m.setWorkflowId(workflowId); + private Message createOngoingMessage(final Long current, final Long total) { + final Message m = new Message(MessageType.ONGOING, workflowId); m.getBody().put(Message.CURRENT_PARAM, current.toString()); if (total != null) { m.getBody().put(Message.TOTAL_PARAM, total.toString()); diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/message/MessageType.java b/dhp-common/src/main/java/eu/dnetlib/dhp/message/MessageType.java new file mode 100644 index 000000000..30f152c96 --- /dev/null +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/message/MessageType.java @@ -0,0 +1,20 @@ + +package eu.dnetlib.dhp.message; + +import java.util.Optional; + +import org.apache.commons.lang3.StringUtils; + +public enum MessageType { + + ONGOING, REPORT; + + public MessageType from(String value) { + return Optional + .ofNullable(value) + .map(StringUtils::upperCase) + .map(MessageType::valueOf) + .orElseThrow(() -> new IllegalArgumentException("unknown message type: " + value)); + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorPluginReport.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorPluginReport.java index a7204523a..a10572d06 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorPluginReport.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorPluginReport.java @@ -1,57 +1,39 @@ package eu.dnetlib.dhp.collection; -import static eu.dnetlib.dhp.utils.DHPUtils.*; - import java.io.Closeable; import java.io.IOException; import java.util.LinkedHashMap; import java.util.Objects; -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.annotation.JsonIgnore; +import eu.dnetlib.dhp.message.MessageSender; public class CollectorPluginReport extends LinkedHashMap implements Closeable { private static final Logger log = LoggerFactory.getLogger(CollectorPluginReport.class); - @JsonIgnore - private Path path; - - @JsonIgnore - private FSDataOutputStream fos; - - public static String SUCCESS = "success"; + private MessageSender messageSender; public CollectorPluginReport() { } - public CollectorPluginReport(FileSystem fs, Path path) throws IOException { - this.path = path; - this.fos = fs.create(path); + public CollectorPluginReport(MessageSender messageSender) throws IOException { + this.messageSender = messageSender; } - public Boolean isSuccess() { - return containsKey(SUCCESS) && Boolean.valueOf(get(SUCCESS)); - } - - public void setSuccess(Boolean success) { - put(SUCCESS, String.valueOf(success)); + public void ongoing(Long current, Long total) { + messageSender.sendMessage(current, total); } @Override public void close() throws IOException { - final String data = MAPPER.writeValueAsString(this); - if (Objects.nonNull(fos)) { - log.info("writing report {} to {}", data, path.toString()); - IOUtils.write(data, fos); - populateOOZIEEnv(this); + if (Objects.nonNull(messageSender)) { + log.info("closing report: "); + this.forEach((k, v) -> log.info("{} - {}", k, v)); + messageSender.sendReport(this); } } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java index ace725bfd..04e0f70c4 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorker.java @@ -38,20 +38,16 @@ public class CollectorWorker { private final CollectorPluginReport report; - private final MessageSender messageSender; - public CollectorWorker( final ApiDescriptor api, final FileSystem fileSystem, final MDStoreVersion mdStoreVersion, final HttpClientParams clientParams, - final MessageSender messageSender, final CollectorPluginReport report) { this.api = api; this.fileSystem = fileSystem; this.mdStoreVersion = mdStoreVersion; this.clientParams = clientParams; - this.messageSender = messageSender; this.report = report; } @@ -78,23 +74,21 @@ public class CollectorWorker { content -> { key.set(counter.getAndIncrement()); if (counter.get() % 500 == 0) - messageSender.sendMessage(counter.longValue(), null); + report.ongoing(counter.longValue(), null); value.set(content); try { writer.append(key, value); } catch (Throwable e) { report.put(e.getClass().getName(), e.getMessage()); log.warn("setting report to failed"); - report.setSuccess(false); throw new RuntimeException(e); } }); } catch (Throwable e) { report.put(e.getClass().getName(), e.getMessage()); log.warn("setting report to failed"); - report.setSuccess(false); } finally { - messageSender.sendMessage(counter.longValue(), counter.longValue()); + report.ongoing(counter.longValue(), counter.longValue()); } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorkerApplication.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorkerApplication.java index 0eea0837c..15f3f20b5 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorkerApplication.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorkerApplication.java @@ -77,22 +77,15 @@ public class CollectorWorkerApplication { } protected void run(String mdStoreVersion, HttpClientParams clientParams, ApiDescriptor api, - String dnetMessageManagerURL, String workflowId) throws IOException { + String dnetMessageManagerURL, String workflowId) + throws IOException, CollectorException, UnknownCollectorPluginException { final MessageSender ms = new MessageSender(dnetMessageManagerURL, workflowId); final MDStoreVersion currentVersion = MAPPER.readValue(mdStoreVersion, MDStoreVersion.class); - final String reportPath = currentVersion.getHdfsPath() + REPORT_FILE_NAME; - log.info("report path is {}", reportPath); - - try (CollectorPluginReport report = new CollectorPluginReport(fileSystem, new Path(reportPath))) { - final CollectorWorker worker = new CollectorWorker(api, fileSystem, currentVersion, clientParams, ms, - report); - worker.collect(); - report.setSuccess(true); - } catch (Throwable e) { - log.info("got exception {}, ignoring", e.getMessage()); + try (CollectorPluginReport report = new CollectorPluginReport(ms)) { + new CollectorWorker(api, fileSystem, currentVersion, clientParams, report).collect(); } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorkerReporter.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorkerReporter.java deleted file mode 100644 index d8cf3ec02..000000000 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/CollectorWorkerReporter.java +++ /dev/null @@ -1,62 +0,0 @@ - -package eu.dnetlib.dhp.collection; - -import static eu.dnetlib.dhp.common.Constants.REPORT_FILE_NAME; -import static eu.dnetlib.dhp.utils.DHPUtils.*; - -import java.io.IOException; -import java.util.Objects; - -import org.apache.commons.cli.ParseException; -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.conf.Configuration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; - -/** - * CollectorWorkerReporter - */ -public class CollectorWorkerReporter { - - private static final Logger log = LoggerFactory.getLogger(CollectorWorkerReporter.class); - - /** - * @param args - */ - public static void main(final String[] args) throws IOException, ParseException, CollectorException { - - final ArgumentApplicationParser argumentParser = new ArgumentApplicationParser( - IOUtils - .toString( - CollectorWorker.class - .getResourceAsStream( - "/eu/dnetlib/dhp/collection/collector_reporter_input_parameter.json"))); - argumentParser.parseArgument(args); - - final String nameNode = argumentParser.get("namenode"); - log.info("nameNode is {}", nameNode); - - final String mdStoreVersion = argumentParser.get("mdStoreVersion"); - log.info("mdStoreVersion is {}", mdStoreVersion); - - final MDStoreVersion currentVersion = MAPPER.readValue(mdStoreVersion, MDStoreVersion.class); - - final String reportPath = currentVersion.getHdfsPath() + REPORT_FILE_NAME; - log.info("report path is {}", reportPath); - - final Configuration conf = getHadoopConfiguration(nameNode); - CollectorPluginReport report = readHdfsFileAs(conf, reportPath, CollectorPluginReport.class); - if (Objects.isNull(report)) { - throw new CollectorException("collection report is NULL"); - } - log.info("report success: {}, size: {}", report.isSuccess(), report.size()); - report.forEach((k, v) -> log.info("{} - {}", k, v)); - if (!report.isSuccess()) { - throw new CollectorException("collection report indicates a failure"); - } - } - -} diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collector_reporter_input_parameter.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collector_reporter_input_parameter.json deleted file mode 100644 index ef65cc389..000000000 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collector_reporter_input_parameter.json +++ /dev/null @@ -1,14 +0,0 @@ -[ - { - "paramName": "n", - "paramLongName": "namenode", - "paramDescription": "the Name Node URI", - "paramRequired": true - }, - { - "paramName": "mv", - "paramLongName": "mdStoreVersion", - "paramDescription": "the MDStore Version bean", - "paramRequired": true - } -] diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/oozie_app/workflow.xml index 1bab59659..0678eed11 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/oozie_app/workflow.xml @@ -110,18 +110,6 @@ --retryDelay${retryDelay} --connectTimeOut${connectTimeOut} --readTimeOut${readTimeOut} - - - - - - - - - eu.dnetlib.dhp.collection.CollectorWorkerReporter - ${collection_java_xmx} - --mdStoreVersion${wf:actionData('StartTransaction')['mdStoreVersion']} - --namenode${nameNode} diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/utils/CollectorPluginReportTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/utils/CollectorPluginReportTest.java deleted file mode 100644 index fd90a1b84..000000000 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/utils/CollectorPluginReportTest.java +++ /dev/null @@ -1,30 +0,0 @@ - -package eu.dnetlib.dhp.collector.worker.utils; - -import static eu.dnetlib.dhp.utils.DHPUtils.*; - -import java.io.IOException; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import eu.dnetlib.dhp.collection.CollectorPluginReport; - -public class CollectorPluginReportTest { - - @Test - public void testSerialize() throws IOException { - CollectorPluginReport r1 = new CollectorPluginReport(); - r1.put("a", "b"); - r1.setSuccess(true); - - String s = MAPPER.writeValueAsString(r1); - - Assertions.assertNotNull(s); - - CollectorPluginReport r2 = MAPPER.readValue(s, CollectorPluginReport.class); - - Assertions.assertTrue(r2.isSuccess(), "should be true"); - } - -}