From 98b9498b5745d1129ed665c2f22a83db595c478a Mon Sep 17 00:00:00 2001
From: Sandro La Bruzzo
Date: Thu, 28 Jan 2021 09:51:17 +0100
Subject: [PATCH] Removed the old, largely unused messaging system from the
 collection and transformation workflows; refactored the related code

---
 .../actionmanager/project/utils/ReadCSV.java  |   2 +-
 .../project/utils/ReadExcel.java              |   3 +-
 .../common/AggregationCounter.java            |  60 ++++----
 .../GenerateNativeStoreSparkJob.java          | 133 ++++++----------
 .../collection/plugin/CollectorPlugin.java    |   4 +-
 .../plugin/oai/OaiCollectorPlugin.java        |  12 +-
 .../collection/plugin/oai/OaiIterator.java    |  20 +--
 ...Exception.java => CollectorException.java} |  12 +-
 .../collection/worker/CollectorWorker.java    |  93 ++++++++++++
 .../worker/CollectorWorkerApplication.java    |  55 +++++++
 .../worker/DnetCollectorWorker.java           | 139 ------------------
 .../DnetCollectorWorkerApplication.java       |  49 ------
 .../worker/utils/CollectorPluginFactory.java  |   8 +-
 .../worker/utils/HttpConnector.java           |  24 +--
 .../DnetTransformationException.java          |  39 ++---
 .../transformation/TransformSparkJobNode.java |  33 ++---
 .../transformation/TransformationFactory.java |  87 ++++++-----
 .../dhp/transformation/xslt/Cleaner.java      |   3 +-
 .../xslt/XSLTTransformationFunction.java      | 102 ++++++-------
 .../collection_input_parameters.json          |  36 -----
 .../dhp/collection/collector_parameter.json   |   6 +
 .../collection/oozie_app/config-default.xml   |  18 +++
 .../dhp/collection/oozie_app/workflow.xml     |  77 ++++------
 .../collector/worker/collector_parameter.json |  12 --
 .../oozie_app/config-default.xml              |  18 +++
 .../dhp/transformation/oozie_app/workflow.xml |  69 ++++-----
 .../transformation_input_parameters.json      |  22 +--
 .../project/EXCELParserTest.java              |   7 +-
 .../httpconnector/HttpConnectorTest.java      |   9 +-
 .../DnetCollectorWorkerApplicationTests.java  |  49 ++----
 .../transformation/TransformationJobTest.java |  86 +++++++----
 .../dhp/oa/graph/clean/CleaningFunctions.java |  24 +--
 .../raw/MigrateDbEntitiesApplication.java     |   2 +-
 .../raw/GenerateEntitiesApplicationTest.java  |   2 +-
 .../dnetlib/dhp/oa/graph/raw/MappersTest.java |   2 +-
 35 files changed, 597 insertions(+), 720 deletions(-)
 rename dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/{DnetCollectorException.java => CollectorException.java} (56%)
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java
 delete mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/DnetCollectorWorker.java
 delete mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/DnetCollectorWorkerApplication.java
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collector_parameter.json
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/oozie_app/config-default.xml
 delete mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collector/worker/collector_parameter.json
 create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/oozie_app/config-default.xml

diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadCSV.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadCSV.java
index dc6f46771..ca1c10611 100644
---
a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadCSV.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadCSV.java @@ -17,8 +17,8 @@ import org.apache.hadoop.fs.Path; import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.collection.worker.utils.HttpConnector; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.collection.worker.utils.HttpConnector; /** * Applies the parsing of a csv file and writes the Serialization of it in hdfs diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadExcel.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadExcel.java index e665bc704..585a408f3 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadExcel.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadExcel.java @@ -4,7 +4,6 @@ package eu.dnetlib.dhp.actionmanager.project.utils; import java.io.*; import java.nio.charset.StandardCharsets; -import eu.dnetlib.dhp.collection.worker.utils.HttpConnector; import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -15,8 +14,8 @@ import org.apache.hadoop.fs.Path; import com.fasterxml.jackson.databind.ObjectMapper; - import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.collection.worker.utils.HttpConnector; /** * Applies the parsing of an excel file and writes the Serialization of it in hdfs diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregationCounter.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregationCounter.java index 1ac2cb54b..bf2fd22cb 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregationCounter.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/aggregation/common/AggregationCounter.java @@ -1,45 +1,45 @@ -package eu.dnetlib.dhp.aggregation.common; -import org.apache.spark.util.LongAccumulator; +package eu.dnetlib.dhp.aggregation.common; import java.io.Serializable; +import org.apache.spark.util.LongAccumulator; public class AggregationCounter implements Serializable { - private LongAccumulator totalItems; - private LongAccumulator errorItems; - private LongAccumulator processedItems; + private LongAccumulator totalItems; + private LongAccumulator errorItems; + private LongAccumulator processedItems; - public AggregationCounter() { - } + public AggregationCounter() { + } - public AggregationCounter(LongAccumulator totalItems, LongAccumulator errorItems, LongAccumulator processedItems) { - this.totalItems = totalItems; - this.errorItems = errorItems; - this.processedItems = processedItems; - } + public AggregationCounter(LongAccumulator totalItems, LongAccumulator errorItems, LongAccumulator processedItems) { + this.totalItems = totalItems; + this.errorItems = errorItems; + this.processedItems = processedItems; + } - public LongAccumulator getTotalItems() { - return totalItems; - } + public LongAccumulator getTotalItems() { + return totalItems; + } - public void setTotalItems(LongAccumulator totalItems) { - this.totalItems = totalItems; - } + public void setTotalItems(LongAccumulator totalItems) { + this.totalItems = totalItems; + } - public LongAccumulator 
getErrorItems() { - return errorItems; - } + public LongAccumulator getErrorItems() { + return errorItems; + } - public void setErrorItems(LongAccumulator errorItems) { - this.errorItems = errorItems; - } + public void setErrorItems(LongAccumulator errorItems) { + this.errorItems = errorItems; + } - public LongAccumulator getProcessedItems() { - return processedItems; - } + public LongAccumulator getProcessedItems() { + return processedItems; + } - public void setProcessedItems(LongAccumulator processedItems) { - this.processedItems = processedItems; - } + public void setProcessedItems(LongAccumulator processedItems) { + this.processedItems = processedItems; + } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java index c0bd4c940..c9c29b4ea 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java @@ -5,12 +5,9 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.io.ByteArrayInputStream; import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.Map; import java.util.Objects; import java.util.Optional; -import org.apache.commons.cli.*; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.io.IntWritable; @@ -22,7 +19,6 @@ import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoder; import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SparkSession; import org.apache.spark.util.LongAccumulator; import org.dom4j.Document; import org.dom4j.Node; @@ -35,9 +31,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.model.mdstore.MetadataRecord; import eu.dnetlib.dhp.model.mdstore.Provenance; -import eu.dnetlib.message.Message; import eu.dnetlib.message.MessageManager; -import eu.dnetlib.message.MessageType; public class GenerateNativeStoreSparkJob { @@ -46,100 +40,62 @@ public class GenerateNativeStoreSparkJob { public static void main(String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - GenerateNativeStoreSparkJob.class - .getResourceAsStream( - "/eu/dnetlib/dhp/collection/collection_input_parameters.json"))); + IOUtils + .toString( + GenerateNativeStoreSparkJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/collection/collection_input_parameters.json"))); parser.parseArgument(args); final ObjectMapper jsonMapper = new ObjectMapper(); - final Provenance provenance = jsonMapper.readValue(parser.get("provenance"), Provenance.class); - final long dateOfCollection = new Long(parser.get("dateOfCollection")); + final String provenanceArgument = parser.get("provenance"); + log.info("Provenance is {}", provenanceArgument); + final Provenance provenance = jsonMapper.readValue(provenanceArgument, Provenance.class); + final String dateOfCollectionArgs = parser.get("dateOfCollection"); + log.info("dateOfCollection is {}", dateOfCollectionArgs); + final long dateOfCollection = new Long(dateOfCollectionArgs); + final String sequenceFileInputPath = parser.get("input"); + log.info("sequenceFileInputPath is {}", 
sequenceFileInputPath);

		Boolean isSparkSessionManaged = Optional
-			.ofNullable(parser.get("isSparkSessionManaged"))
-			.map(Boolean::valueOf)
-			.orElse(Boolean.TRUE);
+			.ofNullable(parser.get("isSparkSessionManaged"))
+			.map(Boolean::valueOf)
+			.orElse(Boolean.TRUE);
		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

-		final Map<String, String> ongoingMap = new HashMap<>();
-		final Map<String, String> reportMap = new HashMap<>();
-
-		final boolean test = parser.get("isTest") == null ? false : Boolean.valueOf(parser.get("isTest"));
-
		SparkConf conf = new SparkConf();
		runWithSparkSession(
-			conf,
-			isSparkSessionManaged,
-			spark -> {
-				final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+			conf,
+			isSparkSessionManaged,
+			spark -> {
+				final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

-				final JavaPairRDD<IntWritable, Text> inputRDD = sc
-					.sequenceFile(parser.get("input"), IntWritable.class, Text.class);
+				final JavaPairRDD<IntWritable, Text> inputRDD = sc
+					.sequenceFile(sequenceFileInputPath, IntWritable.class, Text.class);

-				final LongAccumulator totalItems = sc.sc().longAccumulator("TotalItems");
-				final LongAccumulator invalidRecords = sc.sc().longAccumulator("InvalidRecords");
+				final LongAccumulator totalItems = sc.sc().longAccumulator("TotalItems");
+				final LongAccumulator invalidRecords = sc.sc().longAccumulator("InvalidRecords");

-				final MessageManager manager = new MessageManager(
-					parser.get("rabbitHost"),
-					parser.get("rabbitUser"),
-					parser.get("rabbitPassword"),
-					false,
-					false,
-					null);
+				final JavaRDD<MetadataRecord> nativeStore = inputRDD
+					.map(
+						item -> parseRecord(
+							item._2().toString(),
+							parser.get("xpath"),
+							parser.get("encoding"),
+							provenance,
+							dateOfCollection,
+							totalItems,
+							invalidRecords))
+					.filter(Objects::nonNull)
+					.distinct();

-				final JavaRDD<MetadataRecord> mappeRDD = inputRDD
-					.map(
-						item -> parseRecord(
-							item._2().toString(),
-							parser.get("xpath"),
-							parser.get("encoding"),
-							provenance,
-							dateOfCollection,
-							totalItems,
-							invalidRecords))
-					.filter(Objects::nonNull)
-					.distinct();
+				final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
+				final Dataset<MetadataRecord> mdstore = spark.createDataset(nativeStore.rdd(), encoder);
+				final LongAccumulator mdStoreRecords = sc.sc().longAccumulator("MDStoreRecords");
+				mdStoreRecords.add(mdstore.count());

-				ongoingMap.put("ongoing", "0");
-				if (!test) {
-					manager
-						.sendMessage(
-							new Message(
-								parser.get("workflowId"), "DataFrameCreation", MessageType.ONGOING, ongoingMap),
-							parser.get("rabbitOngoingQueue"),
-							true,
-							false);
-				}
+				mdstore.write().format("parquet").save(parser.get("output"));

-				final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
-				final Dataset<MetadataRecord> mdstore = spark.createDataset(mappeRDD.rdd(), encoder);
-				final LongAccumulator mdStoreRecords = sc.sc().longAccumulator("MDStoreRecords");
-				mdStoreRecords.add(mdstore.count());
-				ongoingMap.put("ongoing", "" + totalItems.value());
-				if (!test) {
-					manager
-						.sendMessage(
-							new Message(
-								parser.get("workflowId"), "DataFrameCreation", MessageType.ONGOING, ongoingMap),
-							parser.get("rabbitOngoingQueue"),
-							true,
-							false);
-				}
-				mdstore.write().format("parquet").save(parser.get("output"));
-				reportMap.put("inputItem", "" + totalItems.value());
-				reportMap.put("invalidRecords", "" + invalidRecords.value());
-				reportMap.put("mdStoreSize", "" + mdStoreRecords.value());
-				if (!test) {
-					manager
-						.sendMessage(
-							new Message(parser.get("workflowId"), "Collection", MessageType.REPORT, reportMap),
-							parser.get("rabbitReportQueue"),
-							true,
-							false);
-					manager.close();
-				}
-			});
+ }); } @@ -166,12 +122,9 @@ public class GenerateNativeStoreSparkJob { } return new MetadataRecord(originalIdentifier, encoding, provenance, input, dateOfCollection); } catch (Throwable e) { - if (invalidRecords != null) - invalidRecords.add(1); - e.printStackTrace(); + invalidRecords.add(1); return null; } } - } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java index 4a0c70c45..7146e610e 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/CollectorPlugin.java @@ -4,9 +4,9 @@ package eu.dnetlib.dhp.collection.plugin; import java.util.stream.Stream; import eu.dnetlib.collector.worker.model.ApiDescriptor; -import eu.dnetlib.dhp.collection.worker.DnetCollectorException; +import eu.dnetlib.dhp.collection.worker.CollectorException; public interface CollectorPlugin { - Stream collect(ApiDescriptor api) throws DnetCollectorException; + Stream collect(ApiDescriptor api) throws CollectorException; } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiCollectorPlugin.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiCollectorPlugin.java index 7f71f401d..c4c52271a 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiCollectorPlugin.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiCollectorPlugin.java @@ -15,7 +15,7 @@ import com.google.common.collect.Lists; import eu.dnetlib.collector.worker.model.ApiDescriptor; import eu.dnetlib.dhp.collection.plugin.CollectorPlugin; -import eu.dnetlib.dhp.collection.worker.DnetCollectorException; +import eu.dnetlib.dhp.collection.worker.CollectorException; public class OaiCollectorPlugin implements CollectorPlugin { @@ -27,7 +27,7 @@ public class OaiCollectorPlugin implements CollectorPlugin { private OaiIteratorFactory oaiIteratorFactory; @Override - public Stream collect(final ApiDescriptor api) throws DnetCollectorException { + public Stream collect(final ApiDescriptor api) throws CollectorException { final String baseUrl = api.getBaseUrl(); final String mdFormat = api.getParams().get(FORMAT_PARAM); final String setParam = api.getParams().get(OAI_SET_PARAM); @@ -46,19 +46,19 @@ public class OaiCollectorPlugin implements CollectorPlugin { } if (baseUrl == null || baseUrl.isEmpty()) { - throw new DnetCollectorException("Param 'baseurl' is null or empty"); + throw new CollectorException("Param 'baseurl' is null or empty"); } if (mdFormat == null || mdFormat.isEmpty()) { - throw new DnetCollectorException("Param 'mdFormat' is null or empty"); + throw new CollectorException("Param 'mdFormat' is null or empty"); } if (fromDate != null && !fromDate.matches("\\d{4}-\\d{2}-\\d{2}")) { - throw new DnetCollectorException("Invalid date (YYYY-MM-DD): " + fromDate); + throw new CollectorException("Invalid date (YYYY-MM-DD): " + fromDate); } if (untilDate != null && !untilDate.matches("\\d{4}-\\d{2}-\\d{2}")) { - throw new DnetCollectorException("Invalid date (YYYY-MM-DD): " + untilDate); + throw new CollectorException("Invalid date (YYYY-MM-DD): " + untilDate); } final Iterator> iters = sets diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIterator.java 
b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIterator.java index d61f13fb5..e54bae67d 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIterator.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/plugin/oai/OaiIterator.java @@ -16,7 +16,7 @@ import org.dom4j.DocumentException; import org.dom4j.Node; import org.dom4j.io.SAXReader; -import eu.dnetlib.dhp.collection.worker.DnetCollectorException; +import eu.dnetlib.dhp.collection.worker.CollectorException; import eu.dnetlib.dhp.collection.worker.utils.HttpConnector; import eu.dnetlib.dhp.collection.worker.utils.XmlCleaner; @@ -58,7 +58,7 @@ public class OaiIterator implements Iterator { this.started = true; try { this.token = firstPage(); - } catch (final DnetCollectorException e) { + } catch (final CollectorException e) { throw new RuntimeException(e); } } @@ -80,7 +80,7 @@ public class OaiIterator implements Iterator { while (queue.isEmpty() && token != null && !token.isEmpty()) { try { token = otherPages(token); - } catch (final DnetCollectorException e) { + } catch (final CollectorException e) { throw new RuntimeException(e); } } @@ -92,7 +92,7 @@ public class OaiIterator implements Iterator { public void remove() { } - private String firstPage() throws DnetCollectorException { + private String firstPage() throws CollectorException { try { String url = baseUrl + "?verb=ListRecords&metadataPrefix=" + URLEncoder.encode(mdFormat, "UTF-8"); if (set != null && !set.isEmpty()) { @@ -108,7 +108,7 @@ public class OaiIterator implements Iterator { return downloadPage(url); } catch (final UnsupportedEncodingException e) { - throw new DnetCollectorException(e); + throw new CollectorException(e); } } @@ -126,18 +126,18 @@ public class OaiIterator implements Iterator { return result.trim(); } - private String otherPages(final String resumptionToken) throws DnetCollectorException { + private String otherPages(final String resumptionToken) throws CollectorException { try { return downloadPage( baseUrl + "?verb=ListRecords&resumptionToken=" + URLEncoder.encode(resumptionToken, "UTF-8")); } catch (final UnsupportedEncodingException e) { - throw new DnetCollectorException(e); + throw new CollectorException(e); } } - private String downloadPage(final String url) throws DnetCollectorException { + private String downloadPage(final String url) throws CollectorException { final String xml = httpConnector.getInputSource(url); Document doc; @@ -151,7 +151,7 @@ public class OaiIterator implements Iterator { } catch (final DocumentException e1) { final String resumptionToken = extractResumptionToken(xml); if (resumptionToken == null) { - throw new DnetCollectorException("Error parsing cleaned document:" + cleaned, e1); + throw new CollectorException("Error parsing cleaned document:" + cleaned, e1); } return resumptionToken; } @@ -164,7 +164,7 @@ public class OaiIterator implements Iterator { log.warn("noRecordsMatch for oai call: " + url); return null; } else { - throw new DnetCollectorException(code + " - " + errorNode.getText()); + throw new CollectorException(code + " - " + errorNode.getText()); } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/DnetCollectorException.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorException.java similarity index 56% rename from 
dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/DnetCollectorException.java rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorException.java index f40962c21..71d225f13 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/DnetCollectorException.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorException.java @@ -1,16 +1,16 @@ package eu.dnetlib.dhp.collection.worker; -public class DnetCollectorException extends Exception { +public class CollectorException extends Exception { /** */ private static final long serialVersionUID = -290723075076039757L; - public DnetCollectorException() { + public CollectorException() { super(); } - public DnetCollectorException( + public CollectorException( final String message, final Throwable cause, final boolean enableSuppression, @@ -18,15 +18,15 @@ public class DnetCollectorException extends Exception { super(message, cause, enableSuppression, writableStackTrace); } - public DnetCollectorException(final String message, final Throwable cause) { + public CollectorException(final String message, final Throwable cause) { super(message, cause); } - public DnetCollectorException(final String message) { + public CollectorException(final String message) { super(message); } - public DnetCollectorException(final Throwable cause) { + public CollectorException(final Throwable cause) { super(cause); } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java new file mode 100644 index 000000000..380db641a --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorker.java @@ -0,0 +1,93 @@ + +package eu.dnetlib.dhp.collection.worker; + +import java.io.IOException; +import java.net.URI; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.collector.worker.model.ApiDescriptor; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.collection.plugin.CollectorPlugin; +import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory; + +public class CollectorWorker { + + private static final Logger log = LoggerFactory.getLogger(CollectorWorker.class); + + private final CollectorPluginFactory collectorPluginFactory; + + private final ApiDescriptor api; + + private final String hdfsuri; + + private final String hdfsPath; + + public CollectorWorker( + final CollectorPluginFactory collectorPluginFactory, + final ApiDescriptor api, + final String hdfsuri, + final String hdfsPath) { + this.collectorPluginFactory = collectorPluginFactory; + this.api = api; + this.hdfsuri = hdfsuri; + this.hdfsPath = hdfsPath; + + } + + public void collect() throws CollectorException { + try { + final CollectorPlugin plugin = collectorPluginFactory.getPluginByProtocol(api.getProtocol()); + + // ====== Init HDFS File System Object + Configuration conf = new Configuration(); + // Set FileSystem URI + conf.set("fs.defaultFS", hdfsuri); + // 
Set the FileSystem implementations explicitly: the shaded assembly may not carry the ServiceLoader bindings
+			conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
+			conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
+
+			System.setProperty("hadoop.home.dir", "/");
+			// Get the filesystem - HDFS
+			FileSystem.get(URI.create(hdfsuri), conf);
+			Path hdfswritepath = new Path(hdfsPath);
+
+			log.info("Created path " + hdfswritepath.toString());
+
+			final AtomicInteger counter = new AtomicInteger(0);
+			try (SequenceFile.Writer writer = SequenceFile
+				.createWriter(
+					conf,
+					SequenceFile.Writer.file(hdfswritepath),
+					SequenceFile.Writer.keyClass(IntWritable.class),
+					SequenceFile.Writer.valueClass(Text.class))) {
+				final IntWritable key = new IntWritable(counter.get());
+				final Text value = new Text();
+				plugin
+					.collect(api)
+					.forEach(
+						content -> {
+							key.set(counter.getAndIncrement());
+							value.set(content);
+							try {
+								writer.append(key, value);
+							} catch (IOException e) {
+								throw new RuntimeException(e);
+							}
+						});
+			}
+		} catch (Throwable e) {
+			throw new CollectorException("Error on collecting ", e);
+		}
+	}
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java
new file mode 100644
index 000000000..5e8d0f9c2
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/CollectorWorkerApplication.java
@@ -0,0 +1,55 @@
+
+package eu.dnetlib.dhp.collection.worker;
+
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.collector.worker.model.ApiDescriptor;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
+
+/**
+ * CollectorWorkerApplication is the main class responsible for starting the DNet collection into HDFS. This module
+ * runs on the Hadoop cluster and takes as input the parameters that select the right collector plugin to use and the
+ * HDFS path where the collected data will be stored.
+ *
+ * @author Sandro La Bruzzo
+ */
+public class CollectorWorkerApplication {
+
+	private static final Logger log = LoggerFactory.getLogger(CollectorWorkerApplication.class);
+
+	private static final CollectorPluginFactory collectorPluginFactory = new CollectorPluginFactory();
+
+	/**
+	 * @param args
+	 */
+	public static void main(final String[] args) throws Exception {
+
+		final ArgumentApplicationParser argumentParser = new ArgumentApplicationParser(
+			IOUtils
+				.toString(
+					CollectorWorker.class
+						.getResourceAsStream(
+							"/eu/dnetlib/dhp/collection/collector_parameter.json")));
+		argumentParser.parseArgument(args);
+
+		final String hdfsuri = argumentParser.get("namenode");
+
+		log.info("hdfsURI is {}", hdfsuri);
+		final String hdfsPath = argumentParser.get("hdfsPath");
+		log.info("hdfsPath is {}", hdfsPath);
+		final String apiDescriptor = argumentParser.get("apidescriptor");
+		log.info("apiDescriptor is {}", apiDescriptor);
+
+		final ObjectMapper jsonMapper = new ObjectMapper();
+
+		final ApiDescriptor api = jsonMapper.readValue(apiDescriptor, ApiDescriptor.class);
+
+		final CollectorWorker worker = new CollectorWorker(collectorPluginFactory, api, hdfsuri, hdfsPath);
+		worker.collect();
+	}
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/DnetCollectorWorker.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/DnetCollectorWorker.java
deleted file mode 100644
index e686ad518..000000000
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/DnetCollectorWorker.java
+++ /dev/null
@@ -1,139 +0,0 @@
-
-package eu.dnetlib.dhp.collection.worker;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import eu.dnetlib.collector.worker.model.ApiDescriptor;
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
-import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
-import eu.dnetlib.message.Message;
-import eu.dnetlib.message.MessageManager;
-import eu.dnetlib.message.MessageType;
-
-public class DnetCollectorWorker {
-
-	private static final Logger log = LoggerFactory.getLogger(DnetCollectorWorker.class);
-
-	private final CollectorPluginFactory collectorPluginFactory;
-
-	private final ArgumentApplicationParser argumentParser;
-
-	private final MessageManager manager;
-
-	public DnetCollectorWorker(
-		final CollectorPluginFactory collectorPluginFactory,
-		final ArgumentApplicationParser argumentParser,
-		final MessageManager manager)
-		throws DnetCollectorException {
-		this.collectorPluginFactory = collectorPluginFactory;
-		this.argumentParser = argumentParser;
-		this.manager = manager;
-	}
-
-	public void collect() throws DnetCollectorException {
-		try {
-			final ObjectMapper jsonMapper = new ObjectMapper();
-			final ApiDescriptor api =
jsonMapper.readValue(argumentParser.get("apidescriptor"), ApiDescriptor.class); - - final CollectorPlugin plugin = collectorPluginFactory.getPluginByProtocol(api.getProtocol()); - - final String hdfsuri = argumentParser.get("namenode"); - - // ====== Init HDFS File System Object - Configuration conf = new Configuration(); - // Set FileSystem URI - conf.set("fs.defaultFS", hdfsuri); - // Because of Maven - conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); - conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); - - System.setProperty("HADOOP_USER_NAME", argumentParser.get("userHDFS")); - System.setProperty("hadoop.home.dir", "/"); - // Get the filesystem - HDFS - FileSystem.get(URI.create(hdfsuri), conf); - Path hdfswritepath = new Path(argumentParser.get("hdfsPath")); - - log.info("Created path " + hdfswritepath.toString()); - - final Map ongoingMap = new HashMap<>(); - final Map reportMap = new HashMap<>(); - final AtomicInteger counter = new AtomicInteger(0); - try (SequenceFile.Writer writer = SequenceFile - .createWriter( - conf, - SequenceFile.Writer.file(hdfswritepath), - SequenceFile.Writer.keyClass(IntWritable.class), - SequenceFile.Writer.valueClass(Text.class))) { - final IntWritable key = new IntWritable(counter.get()); - final Text value = new Text(); - plugin - .collect(api) - .forEach( - content -> { - key.set(counter.getAndIncrement()); - value.set(content); - if (counter.get() % 10 == 0) { - try { - ongoingMap.put("ongoing", "" + counter.get()); - log - .debug( - "Sending message: " - + manager - .sendMessage( - new Message( - argumentParser.get("workflowId"), - "Collection", - MessageType.ONGOING, - ongoingMap), - argumentParser.get("rabbitOngoingQueue"), - true, - false)); - } catch (Exception e) { - log.error("Error on sending message ", e); - } - } - try { - writer.append(key, value); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); - } - ongoingMap.put("ongoing", "" + counter.get()); - manager - .sendMessage( - new Message( - argumentParser.get("workflowId"), "Collection", MessageType.ONGOING, ongoingMap), - argumentParser.get("rabbitOngoingQueue"), - true, - false); - reportMap.put("collected", "" + counter.get()); - manager - .sendMessage( - new Message( - argumentParser.get("workflowId"), "Collection", MessageType.REPORT, reportMap), - argumentParser.get("rabbitOngoingQueue"), - true, - false); - manager.close(); - } catch (Throwable e) { - throw new DnetCollectorException("Error on collecting ", e); - } - } -} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/DnetCollectorWorkerApplication.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/DnetCollectorWorkerApplication.java deleted file mode 100644 index da30e8793..000000000 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/DnetCollectorWorkerApplication.java +++ /dev/null @@ -1,49 +0,0 @@ - -package eu.dnetlib.dhp.collection.worker; - -import org.apache.commons.io.IOUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory; -import eu.dnetlib.message.MessageManager; - -/** - * DnetCollectortWorkerApplication is the main class responsible to start the Dnet Collection into HDFS. 
This module
- * will be executed on the hadoop cluster and taking in input some parameters that tells it which is the right collector
- * plugin to use and where store the data into HDFS path
- *
- * @author Sandro La Bruzzo
- */
-public class DnetCollectorWorkerApplication {
-
-	private static final Logger log = LoggerFactory.getLogger(DnetCollectorWorkerApplication.class);
-
-	private static final CollectorPluginFactory collectorPluginFactory = new CollectorPluginFactory();
-
-	private static ArgumentApplicationParser argumentParser;
-
-	/** @param args */
-	public static void main(final String[] args) throws Exception {
-
-		argumentParser = new ArgumentApplicationParser(
-			IOUtils
-				.toString(
-					DnetCollectorWorker.class
-						.getResourceAsStream(
-							"/eu/dnetlib/collector/worker/collector_parameter.json")));
-		argumentParser.parseArgument(args);
-		log.info("hdfsPath =" + argumentParser.get("hdfsPath"));
-		log.info("json = " + argumentParser.get("apidescriptor"));
-		final MessageManager manager = new MessageManager(
-			argumentParser.get("rabbitHost"),
-			argumentParser.get("rabbitUser"),
-			argumentParser.get("rabbitPassword"),
-			false,
-			false,
-			null);
-		final DnetCollectorWorker worker = new DnetCollectorWorker(collectorPluginFactory, argumentParser, manager);
-		worker.collect();
-	}
-}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/CollectorPluginFactory.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/CollectorPluginFactory.java
index 7a0028e79..6b070b191 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/CollectorPluginFactory.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/CollectorPluginFactory.java
@@ -3,18 +3,18 @@ package eu.dnetlib.dhp.collection.worker.utils;

 import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
 import eu.dnetlib.dhp.collection.plugin.oai.OaiCollectorPlugin;
-import eu.dnetlib.dhp.collection.worker.DnetCollectorException;
+import eu.dnetlib.dhp.collection.worker.CollectorException;

 public class CollectorPluginFactory {

-	public CollectorPlugin getPluginByProtocol(final String protocol) throws DnetCollectorException {
+	public CollectorPlugin getPluginByProtocol(final String protocol) throws CollectorException {
 		if (protocol == null)
-			throw new DnetCollectorException("protocol cannot be null");
+			throw new CollectorException("protocol cannot be null");
 		switch (protocol.toLowerCase().trim()) {
 			case "oai":
 				return new OaiCollectorPlugin();
 			default:
-				throw new DnetCollectorException("UNknown protocol");
+				throw new CollectorException("Unknown protocol");
 		}
 	}
 }
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/HttpConnector.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/HttpConnector.java
index 5d6108fad..ff3c18aba 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/HttpConnector.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/worker/utils/HttpConnector.java
@@ -19,7 +19,7 @@ import org.apache.commons.lang.math.NumberUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;

-import eu.dnetlib.dhp.collection.worker.DnetCollectorException;
+import eu.dnetlib.dhp.collection.worker.CollectorException;

 public class HttpConnector {

@@ -42,9 +42,9 @@
	 *
	 * @param
requestUrl the URL * @return the content of the downloaded resource - * @throws DnetCollectorException when retrying more than maxNumberOfRetry times + * @throws CollectorException when retrying more than maxNumberOfRetry times */ - public String getInputSource(final String requestUrl) throws DnetCollectorException { + public String getInputSource(final String requestUrl) throws CollectorException { return attemptDownlaodAsString(requestUrl, 1, new CollectorPluginErrorLogList()); } @@ -53,15 +53,15 @@ public class HttpConnector { * * @param requestUrl the URL * @return the content of the downloaded resource as InputStream - * @throws DnetCollectorException when retrying more than maxNumberOfRetry times + * @throws CollectorException when retrying more than maxNumberOfRetry times */ - public InputStream getInputSourceAsStream(final String requestUrl) throws DnetCollectorException { + public InputStream getInputSourceAsStream(final String requestUrl) throws CollectorException { return attemptDownload(requestUrl, 1, new CollectorPluginErrorLogList()); } private String attemptDownlaodAsString( final String requestUrl, final int retryNumber, final CollectorPluginErrorLogList errorList) - throws DnetCollectorException { + throws CollectorException { try { final InputStream s = attemptDownload(requestUrl, 1, new CollectorPluginErrorLogList()); try { @@ -75,16 +75,16 @@ public class HttpConnector { IOUtils.closeQuietly(s); } } catch (final InterruptedException e) { - throw new DnetCollectorException(e); + throw new CollectorException(e); } } private InputStream attemptDownload( final String requestUrl, final int retryNumber, final CollectorPluginErrorLogList errorList) - throws DnetCollectorException { + throws CollectorException { if (retryNumber > maxNumberOfRetry) { - throw new DnetCollectorException("Max number of retries exceeded. Cause: \n " + errorList); + throw new CollectorException("Max number of retries exceeded. 
Cause: \n " + errorList); } log.debug("Downloading " + requestUrl + " - try: " + retryNumber); @@ -144,7 +144,7 @@ public class HttpConnector { return attemptDownload(requestUrl, retryNumber + 1, errorList); } } catch (final InterruptedException e) { - throw new DnetCollectorException(e); + throw new CollectorException(e); } } @@ -173,13 +173,13 @@ public class HttpConnector { } private String obtainNewLocation(final Map> headerMap) - throws DnetCollectorException { + throws CollectorException { for (final String key : headerMap.keySet()) { if (key != null && key.toLowerCase().equals("location") && headerMap.get(key).size() > 0) { return headerMap.get(key).get(0); } } - throw new DnetCollectorException( + throw new CollectorException( "The requested url has been MOVED, but 'location' param is MISSING"); } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/DnetTransformationException.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/DnetTransformationException.java index 2c932e40b..45bd844e2 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/DnetTransformationException.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/DnetTransformationException.java @@ -1,28 +1,29 @@ + package eu.dnetlib.dhp.transformation; public class DnetTransformationException extends Exception { - public DnetTransformationException() { - super(); - } + public DnetTransformationException() { + super(); + } - public DnetTransformationException( - final String message, - final Throwable cause, - final boolean enableSuppression, - final boolean writableStackTrace) { - super(message, cause, enableSuppression, writableStackTrace); - } + public DnetTransformationException( + final String message, + final Throwable cause, + final boolean enableSuppression, + final boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } - public DnetTransformationException(final String message, final Throwable cause) { - super(message, cause); - } + public DnetTransformationException(final String message, final Throwable cause) { + super(message, cause); + } - public DnetTransformationException(final String message) { - super(message); - } + public DnetTransformationException(final String message) { + super(message); + } - public DnetTransformationException(final Throwable cause) { - super(cause); - } + public DnetTransformationException(final Throwable cause) { + super(cause); + } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java index 6e07e5173..c6ed5a1e3 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformSparkJobNode.java @@ -9,11 +9,6 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; -import eu.dnetlib.dhp.aggregation.common.AggregationCounter; -import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; -import eu.dnetlib.dhp.transformation.xslt.XSLTTransformationFunction; -import eu.dnetlib.dhp.utils.ISLookupClientFactory; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; @@ -30,10 +25,15 @@ 
import org.dom4j.io.SAXReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import eu.dnetlib.dhp.aggregation.common.AggregationCounter; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.model.mdstore.MetadataRecord; import eu.dnetlib.dhp.transformation.vocabulary.VocabularyHelper; +import eu.dnetlib.dhp.transformation.xslt.XSLTTransformationFunction; import eu.dnetlib.dhp.utils.DHPUtils; +import eu.dnetlib.dhp.utils.ISLookupClientFactory; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import eu.dnetlib.message.Message; import eu.dnetlib.message.MessageManager; import eu.dnetlib.message.MessageType; @@ -59,10 +59,9 @@ public class TransformSparkJobNode { .orElse(Boolean.TRUE); log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - final String inputPath = parser.get("input"); - final String outputPath = parser.get("output"); + final String inputPath = parser.get("mdstoreInputPath"); + final String outputPath = parser.get("mdstoreOutputPath"); // TODO this variable will be used after implementing Messaging with DNet Aggregator - final String workflowId = parser.get("workflowId"); final String isLookupUrl = parser.get("isLookupUrl"); log.info(String.format("isLookupUrl: %s", isLookupUrl)); @@ -76,24 +75,22 @@ public class TransformSparkJobNode { spark -> transformRecords(parser.getObjectMap(), isLookupService, spark, inputPath, outputPath)); } - - public static void transformRecords(final Mapargs, final ISLookUpService isLookUpService, final SparkSession spark, final String inputPath, final String outputPath) throws DnetTransformationException { + public static void transformRecords(final Map args, final ISLookUpService isLookUpService, + final SparkSession spark, final String inputPath, final String outputPath) throws DnetTransformationException { final LongAccumulator totalItems = spark.sparkContext().longAccumulator("TotalItems"); final LongAccumulator errorItems = spark.sparkContext().longAccumulator("errorItems"); final LongAccumulator transformedItems = spark.sparkContext().longAccumulator("transformedItems"); - final AggregationCounter ct = new AggregationCounter(totalItems, errorItems,transformedItems ); + final AggregationCounter ct = new AggregationCounter(totalItems, errorItems, transformedItems); final Encoder encoder = Encoders.bean(MetadataRecord.class); final Dataset mdstoreInput = spark.read().format("parquet").load(inputPath).as(encoder); - final MapFunction XSLTTransformationFunction = TransformationFactory.getTransformationPlugin(args,ct, isLookUpService); + final MapFunction XSLTTransformationFunction = TransformationFactory + .getTransformationPlugin(args, ct, isLookUpService); mdstoreInput.map(XSLTTransformationFunction, encoder).write().save(outputPath); - log.info("Transformed item "+ ct.getProcessedItems().count()); - log.info("Total item "+ ct.getTotalItems().count()); - log.info("Transformation Error item "+ ct.getErrorItems().count()); + log.info("Transformed item " + ct.getProcessedItems().count()); + log.info("Total item " + ct.getTotalItems().count()); + log.info("Transformation Error item " + ct.getErrorItems().count()); } - - - } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformationFactory.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformationFactory.java index 0296458a5..58292139a 100644 --- 
a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformationFactory.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/TransformationFactory.java @@ -1,62 +1,69 @@ + package eu.dnetlib.dhp.transformation; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; +import org.apache.spark.api.java.function.MapFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import eu.dnetlib.dhp.aggregation.common.AggregationCounter; import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.model.mdstore.MetadataRecord; import eu.dnetlib.dhp.transformation.xslt.XSLTTransformationFunction; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; -import org.apache.commons.lang3.StringUtils; -import org.apache.spark.api.java.function.MapFunction; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; -import java.util.Map; public class TransformationFactory { - private static final Logger log = LoggerFactory.getLogger(TransformationFactory.class); - public static final String TRULE_XQUERY = "for $x in collection('/db/DRIVER/TransformationRuleDSResources/TransformationRuleDSResourceType') where $x//TITLE = \"%s\" return $x//CODE/text()"; + private static final Logger log = LoggerFactory.getLogger(TransformationFactory.class); + public static final String TRULE_XQUERY = "for $x in collection('/db/DRIVER/TransformationRuleDSResources/TransformationRuleDSResourceType') where $x//TITLE = \"%s\" return $x//CODE/text()"; + public static MapFunction getTransformationPlugin( + final Map jobArgument, final AggregationCounter counters, final ISLookUpService isLookupService) + throws DnetTransformationException { - public static MapFunction getTransformationPlugin(final Map jobArgument, final AggregationCounter counters, final ISLookUpService isLookupService) throws DnetTransformationException { + try { + final String transformationPlugin = jobArgument.get("transformationPlugin"); - try { - final String transformationPlugin = jobArgument.get("transformationPlugin"); + log.info("Transformation plugin required " + transformationPlugin); + switch (transformationPlugin) { + case "XSLT_TRANSFORM": { + final String transformationRuleName = jobArgument.get("transformationRuleTitle"); + if (StringUtils.isBlank(transformationRuleName)) + throw new DnetTransformationException("Missing Parameter transformationRule"); + final VocabularyGroup vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService); - log.info("Transformation plugin required "+transformationPlugin); - switch (transformationPlugin) { - case "XSLT_TRANSFORM": { - final String transformationRuleName = jobArgument.get("transformationRule"); - if (StringUtils.isBlank(transformationRuleName)) - throw new DnetTransformationException("Missing Parameter transformationRule"); - final VocabularyGroup vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService); + final String transformationRule = queryTransformationRuleFromIS( + transformationRuleName, isLookupService); - final String transformationRule = queryTransformationRuleFromIS(transformationRuleName, isLookupService); + final long dateOfTransformation = new Long(jobArgument.get("dateOfTransformation")); + return new XSLTTransformationFunction(counters, transformationRule, dateOfTransformation, + vocabularies); - final long dateOfTransformation = new Long(jobArgument.get("dateOfTransformation")); - return new 
XSLTTransformationFunction(counters,transformationRule,dateOfTransformation,vocabularies);
+				}
+				default:
+					throw new DnetTransformationException(
+						"transformation plugin does not exist for " + transformationPlugin);
-            }
-            default:
-                throw new DnetTransformationException("transformation plugin does not exists for " + transformationPlugin);
+			}
-        }
+		} catch (Throwable e) {
+			throw new DnetTransformationException(e);
+		}
+	}
-        } catch (Throwable e) {
-            throw new DnetTransformationException(e);
-        }
-    }
-
-    private static String queryTransformationRuleFromIS(final String transformationRuleName, final ISLookUpService isLookUpService) throws Exception {
-        final String query = String.format(TRULE_XQUERY, transformationRuleName);
-        log.info("asking query to IS: "+ query);
-        List<String> result = isLookUpService.quickSearchProfile(query);
-
-        if (result==null || result.isEmpty())
-            throw new DnetTransformationException("Unable to find transformation rule with name: "+ transformationRuleName);
-        return result.get(0);
-    }
+	private static String queryTransformationRuleFromIS(final String transformationRuleName,
+		final ISLookUpService isLookUpService) throws Exception {
+		final String query = String.format(TRULE_XQUERY, transformationRuleName);
+		log.info("asking query to IS: " + query);
+		List<String> result = isLookUpService.quickSearchProfile(query);
+		if (result == null || result.isEmpty())
+			throw new DnetTransformationException(
+				"Unable to find transformation rule with name: " + transformationRuleName);
+		return result.get(0);
+	}
 }
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/xslt/Cleaner.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/xslt/Cleaner.java
index 2c6d776af..7b0fdd484 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/xslt/Cleaner.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/xslt/Cleaner.java
@@ -1,7 +1,6 @@

 package eu.dnetlib.dhp.transformation.xslt;

-
 import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
 import eu.dnetlib.dhp.schema.oaf.Qualifier;
 import net.sf.saxon.s9api.*;
@@ -40,6 +39,6 @@ public class Cleaner implements ExtensionFunction, Serializable {
 		Qualifier cleanedValue = vocabularies.getSynonymAsQualifier(vocabularyName, currentValue);
 		return new XdmAtomicValue(
-			cleanedValue != null ? cleanedValue.getClassid() : currentValue);
+			cleanedValue != null ?
cleanedValue.getClassid() : currentValue); } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/xslt/XSLTTransformationFunction.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/xslt/XSLTTransformationFunction.java index c02b83345..d8707cd76 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/xslt/XSLTTransformationFunction.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/transformation/xslt/XSLTTransformationFunction.java @@ -1,66 +1,68 @@ package eu.dnetlib.dhp.transformation.xslt; +import java.io.ByteArrayInputStream; +import java.io.StringWriter; + +import javax.xml.transform.stream.StreamSource; + +import org.apache.spark.api.java.function.MapFunction; + import eu.dnetlib.dhp.aggregation.common.AggregationCounter; import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.model.mdstore.MetadataRecord; import net.sf.saxon.s9api.*; -import org.apache.spark.api.java.function.MapFunction; - -import javax.xml.transform.stream.StreamSource; -import java.io.ByteArrayInputStream; -import java.io.StringWriter; public class XSLTTransformationFunction implements MapFunction { - private final AggregationCounter aggregationCounter; + private final AggregationCounter aggregationCounter; - private final String transformationRule; + private final String transformationRule; - private final Cleaner cleanFunction; + private final Cleaner cleanFunction; - private final long dateOfTransformation; + private final long dateOfTransformation; - public XSLTTransformationFunction( - final AggregationCounter aggregationCounter, - final String transformationRule, - long dateOfTransformation, - final VocabularyGroup vocabularies) - throws Exception { - this.aggregationCounter = aggregationCounter; - this.transformationRule = transformationRule; - this.dateOfTransformation = dateOfTransformation; - cleanFunction = new Cleaner(vocabularies); - } + public XSLTTransformationFunction( + final AggregationCounter aggregationCounter, + final String transformationRule, + long dateOfTransformation, + final VocabularyGroup vocabularies) + throws Exception { + this.aggregationCounter = aggregationCounter; + this.transformationRule = transformationRule; + this.dateOfTransformation = dateOfTransformation; + cleanFunction = new Cleaner(vocabularies); + } - @Override - public MetadataRecord call(MetadataRecord value) { - aggregationCounter.getTotalItems().add(1); - try { - Processor processor = new Processor(false); - processor.registerExtensionFunction(cleanFunction); - final XsltCompiler comp = processor.newXsltCompiler(); - XsltExecutable xslt = comp - .compile(new StreamSource(new ByteArrayInputStream(transformationRule.getBytes()))); - XdmNode source = processor - .newDocumentBuilder() - .build(new StreamSource(new ByteArrayInputStream(value.getBody().getBytes()))); - XsltTransformer trans = xslt.load(); - trans.setInitialContextNode(source); - final StringWriter output = new StringWriter(); - Serializer out = processor.newSerializer(output); - out.setOutputProperty(Serializer.Property.METHOD, "xml"); - out.setOutputProperty(Serializer.Property.INDENT, "yes"); - trans.setDestination(out); - trans.transform(); - final String xml = output.toString(); - value.setBody(xml); - value.setDateOfTransformation(dateOfTransformation); - aggregationCounter.getProcessedItems().add(1); - return value; - } catch (Throwable e) { - aggregationCounter.getErrorItems().add(1); - return null; - } - } + 
@Override + public MetadataRecord call(MetadataRecord value) { + aggregationCounter.getTotalItems().add(1); + try { + Processor processor = new Processor(false); + processor.registerExtensionFunction(cleanFunction); + final XsltCompiler comp = processor.newXsltCompiler(); + XsltExecutable xslt = comp + .compile(new StreamSource(new ByteArrayInputStream(transformationRule.getBytes()))); + XdmNode source = processor + .newDocumentBuilder() + .build(new StreamSource(new ByteArrayInputStream(value.getBody().getBytes()))); + XsltTransformer trans = xslt.load(); + trans.setInitialContextNode(source); + final StringWriter output = new StringWriter(); + Serializer out = processor.newSerializer(output); + out.setOutputProperty(Serializer.Property.METHOD, "xml"); + out.setOutputProperty(Serializer.Property.INDENT, "yes"); + trans.setDestination(out); + trans.transform(); + final String xml = output.toString(); + value.setBody(xml); + value.setDateOfTransformation(dateOfTransformation); + aggregationCounter.getProcessedItems().add(1); + return value; + } catch (Throwable e) { + aggregationCounter.getErrorItems().add(1); + return null; + } + } } diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collection_input_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collection_input_parameters.json index 4a6aec5ee..7f5113930 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collection_input_parameters.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collection_input_parameters.json @@ -41,46 +41,10 @@ "paramDescription": "the path of the result DataFrame on HDFS", "paramRequired": true }, - { - "paramName": "ru", - "paramLongName": "rabbitUser", - "paramDescription": "the user to connect with RabbitMq for messaging", - "paramRequired": true - }, - { - "paramName": "rp", - "paramLongName": "rabbitPassword", - "paramDescription": "the password to connect with RabbitMq for messaging", - "paramRequired": true - }, - { - "paramName": "rh", - "paramLongName": "rabbitHost", - "paramDescription": "the host of the RabbitMq server", - "paramRequired": true - }, - { - "paramName": "ro", - "paramLongName": "rabbitOngoingQueue", - "paramDescription": "the name of the ongoing queue", - "paramRequired": true - }, - { - "paramName": "rr", - "paramLongName": "rabbitReportQueue", - "paramDescription": "the name of the report queue", - "paramRequired": true - }, { "paramName": "w", "paramLongName": "workflowId", "paramDescription": "the identifier of the dnet Workflow", - "paramRequired": true - }, - { - "paramName": "t", - "paramLongName": "isTest", - "paramDescription": "the name of the report queue", "paramRequired": false } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collector_parameter.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collector_parameter.json new file mode 100644 index 000000000..901664e0d --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/collector_parameter.json @@ -0,0 +1,6 @@ +[ + {"paramName":"p", "paramLongName":"hdfsPath", "paramDescription": "the path where storing the sequential file", "paramRequired": true}, + {"paramName":"a", "paramLongName":"apidescriptor", "paramDescription": "the JSON encoding of the API Descriptor", "paramRequired": true}, + {"paramName":"n", "paramLongName":"namenode", 
"paramDescription": "the Name Node URI", "paramRequired": true}, + {"paramName":"w", "paramLongName":"workflowId", "paramDescription": "the identifier of the dnet Workflow", "paramRequired": false} +] \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/oozie_app/config-default.xml new file mode 100644 index 000000000..2e0ed9aee --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/oozie_app/config-default.xml @@ -0,0 +1,18 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/oozie_app/workflow.xml index 3e7f68401..38cd83da7 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/oozie_app/workflow.xml @@ -1,10 +1,5 @@ - - sequenceFilePath - the path to store the sequence file of the native metadata collected - - mdStorePath the path of the native mdstore @@ -39,72 +34,52 @@ The identifier of the workflow + + ${jobTracker} + ${nameNode} + - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - - - - - - + - ${jobTracker} - ${nameNode} - eu.dnetlib.dhp.collection.worker.DnetCollectorWorker - -p${sequenceFilePath} - -a${apiDescription} - -n${nameNode} - -rh${rmq_host} - -ru${rmq_user} - -rp${rmq_pwd} - -rr${rmq_report} - -ro${rmq_ongoing} - -usandro.labruzzo - -w${workflowId} + eu.dnetlib.dhp.collection.worker.CollectorWorkerApplication + --hdfsPath${workingDir}/sequenceFile_${mdstoreVersion} + --apidescriptor${apiDescription} + --namenode${nameNode} + - ${jobTracker} - ${nameNode} yarn cluster - GenerateNativeStoreSparkJob + Generate Native MetadataStore eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob - dhp-aggregations-1.0.0-SNAPSHOT.jar - --num-executors 50 --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2" - --encoding ${metadataEncoding} - --dateOfCollection ${timestamp} - --provenance ${dataSourceInfo} + dhp-aggregation-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --encoding${metadataEncoding} + --dateOfCollection${timestamp} + --provenance${dataSourceInfo} --xpath${identifierPath} - --input${sequenceFilePath} + --input${workingDir}/sequenceFile --output${mdStorePath} - -rh${rmq_host} - -ru${rmq_user} - -rp${rmq_pwd} - -rr${rmq_report} - -ro${rmq_ongoing} -w${workflowId} - - - - - - - - diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collector/worker/collector_parameter.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collector/worker/collector_parameter.json deleted file mode 100644 index c247d15e4..000000000 --- 
a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collector/worker/collector_parameter.json +++ /dev/null @@ -1,12 +0,0 @@ -[ - {"paramName":"p", "paramLongName":"hdfsPath", "paramDescription": "the path where storing the sequential file", "paramRequired": true}, - {"paramName":"a", "paramLongName":"apidescriptor", "paramDescription": "the JSON encoding of the API Descriptor", "paramRequired": true}, - {"paramName":"n", "paramLongName":"namenode", "paramDescription": "the Name Node URI", "paramRequired": true}, - {"paramName":"u", "paramLongName":"userHDFS", "paramDescription": "the user wich create the hdfs seq file", "paramRequired": true}, - {"paramName":"ru", "paramLongName":"rabbitUser", "paramDescription": "the user to connect with RabbitMq for messaging", "paramRequired": true}, - {"paramName":"rp", "paramLongName":"rabbitPassword", "paramDescription": "the password to connect with RabbitMq for messaging", "paramRequired": true}, - {"paramName":"rh", "paramLongName":"rabbitHost", "paramDescription": "the host of the RabbitMq server", "paramRequired": true}, - {"paramName":"ro", "paramLongName":"rabbitOngoingQueue", "paramDescription": "the name of the ongoing queue", "paramRequired": true}, - {"paramName":"rr", "paramLongName":"rabbitReportQueue", "paramDescription": "the name of the report queue", "paramRequired": true}, - {"paramName":"w", "paramLongName":"workflowId", "paramDescription": "the identifier of the dnet Workflow", "paramRequired": true} -] \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/oozie_app/config-default.xml new file mode 100644 index 000000000..2e0ed9aee --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/oozie_app/config-default.xml @@ -0,0 +1,18 @@ + + + jobTracker + yarnRM + + + nameNode + hdfs://nameservice1 + + + oozie.use.system.libpath + true + + + oozie.action.sharelib.for.spark + spark2 + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/oozie_app/workflow.xml index 4b1e3d84b..b36bc3766 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/oozie_app/workflow.xml @@ -2,7 +2,7 @@ mdstoreInputPath - the path of the input MDStore + the path of the native MDStore @@ -11,66 +11,57 @@ - transformationRule + transformationRuleTitle The transformation Rule to apply - timestamp - The timestamp of the collection date + transformationPlugin + The transformation Plugin - workflowId - The identifier of the workflow + dateOfTransformation + The timestamp of the transformation date + + - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] - - - - - - - - + - ${jobTracker} - ${nameNode} yarn cluster - MDBuilder + Transform MetadataStore eu.dnetlib.dhp.transformation.TransformSparkJobNode - dhp-aggregations-1.0.0-SNAPSHOT.jar - --num-executors 50 --conf spark.yarn.jars="hdfs://hadoop-rm1.garr-pa1.d4science.org:8020/user/oozie/share/lib/lib_20180405103059/spark2" - --dateOfCollection ${timestamp} - -mt yarn - --input${mdstoreInputPath} - --output${mdstoreOutputPath} - -w${workflowId} - 
-tr${transformationRule} - -ru${rmq_user} - -rp${rmq_pwd} - -rh${rmq_host} - -ro${rmq_ongoing} - -rr${rmq_report} + dhp-aggregations-${projectVersion}.jar + + --executor-memory=${sparkExecutorMemory} + --executor-cores=${sparkExecutorCores} + --driver-memory=${sparkDriverMemory} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + + --mdstoreInputPath${mdstoreInputPath} + --mdstoreOutputPath${mdstoreOutputPath} + --dateOfTransformation${dateOfTransformation} + --transformationPlugin${transformationPlugin} + --transformationRuleTitle${transformationRuleTitle} + + - - - - - - - - + + \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/transformation_input_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/transformation_input_parameters.json index fd2a96ea0..cbd2f25ab 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/transformation_input_parameters.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/transformation/transformation_input_parameters.json @@ -13,28 +13,32 @@ }, { "paramName": "i", - "paramLongName": "input", + "paramLongName": "mdstoreInputPath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true }, { "paramName": "o", - "paramLongName": "output", + "paramLongName": "mdstoreOutputPath", "paramDescription": "the path of the result DataFrame on HDFS", "paramRequired": true }, - { - "paramName": "w", - "paramLongName": "workflowId", - "paramDescription": "the identifier of the dnet Workflow", - "paramRequired": true - }, { "paramName": "tr", - "paramLongName": "transformationRule", + "paramLongName": "transformationRuleTitle", "paramDescription": "the transformation Rule to apply to the input MDStore", "paramRequired": true }, + + { + "paramName": "i", + "paramLongName": "isLookupUrl", + "paramDescription": "the Information System Service LookUp URL", + "paramRequired": true + }, + + + { "paramName": "tp", "paramLongName": "transformationPlugin", diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/EXCELParserTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/EXCELParserTest.java index c1142ad9c..5c37e9ec3 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/EXCELParserTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/EXCELParserTest.java @@ -6,16 +6,15 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.List; -import eu.dnetlib.dhp.collection.worker.DnetCollectorException; -import eu.dnetlib.dhp.collection.worker.utils.HttpConnector; import org.apache.poi.openxml4j.exceptions.InvalidFormatException; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; - import eu.dnetlib.dhp.actionmanager.project.utils.EXCELParser; +import eu.dnetlib.dhp.collection.worker.CollectorException; +import eu.dnetlib.dhp.collection.worker.utils.HttpConnector; @Disabled public class EXCELParserTest { @@ -31,7 +30,7 @@ public class EXCELParserTest { } @Test - public void test1() throws 
+	public void test1() throws CollectorException, IOException, InvalidFormatException, ClassNotFoundException,
 		IllegalAccessException, InstantiationException {
 
 		EXCELParser excelParser = new EXCELParser();
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnectorTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnectorTest.java
index 3b9d1c3ab..f5ef280a0 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnectorTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnectorTest.java
@@ -1,8 +1,6 @@
 
 package eu.dnetlib.dhp.actionmanager.project.httpconnector;
 
-import eu.dnetlib.dhp.collection.worker.DnetCollectorException;
-import eu.dnetlib.dhp.collection.worker.utils.HttpConnector;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
@@ -11,6 +9,9 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
+import eu.dnetlib.dhp.collection.worker.CollectorException;
+import eu.dnetlib.dhp.collection.worker.utils.HttpConnector;
+
 @Disabled
 public class HttpConnectorTest {
 
@@ -31,12 +32,12 @@ public class HttpConnectorTest {
 
 	@Test
-	public void testGetInputSource() throws DnetCollectorException {
+	public void testGetInputSource() throws CollectorException {
 		System.out.println(connector.getInputSource(URL));
 	}
 
 	@Test
-	public void testGoodServers() throws DnetCollectorException {
+	public void testGoodServers() throws CollectorException {
 		System.out.println(connector.getInputSource(URL_GOODSNI_SERVER));
 	}
 
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/DnetCollectorWorkerApplicationTests.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/DnetCollectorWorkerApplicationTests.java
index c745219fe..fc19f2064 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/DnetCollectorWorkerApplicationTests.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/collector/worker/DnetCollectorWorkerApplicationTests.java
@@ -5,17 +5,19 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.mockito.Mockito.*;
 
 import java.io.File;
+import java.nio.file.Path;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import eu.dnetlib.collector.worker.model.ApiDescriptor;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.collection.worker.DnetCollectorWorker;
+import eu.dnetlib.dhp.collection.worker.CollectorWorker;
 import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
 import eu.dnetlib.message.Message;
 import eu.dnetlib.message.MessageManager;
@@ -23,43 +25,6 @@ import eu.dnetlib.message.MessageManager;
 @Disabled
 public class DnetCollectorWorkerApplicationTests {
 
-	private final ArgumentApplicationParser argumentParser = mock(ArgumentApplicationParser.class);
-	private final MessageManager messageManager = mock(MessageManager.class);
-
-	private DnetCollectorWorker worker;
-
-	@BeforeEach
-	public void setup() throws Exception {
-		ObjectMapper mapper = new ObjectMapper();
-		final String apiJson = mapper.writeValueAsString(getApi());
-		when(argumentParser.get("apidescriptor")).thenReturn(apiJson);
-		when(argumentParser.get("namenode")).thenReturn("file://tmp/test.seq");
-		when(argumentParser.get("hdfsPath")).thenReturn("/tmp/file.seq");
-		when(argumentParser.get("userHDFS")).thenReturn("sandro");
-		when(argumentParser.get("workflowId")).thenReturn("sandro");
-		when(argumentParser.get("rabbitOngoingQueue")).thenReturn("sandro");
-
-		when(messageManager.sendMessage(any(Message.class), anyString(), anyBoolean(), anyBoolean()))
-			.thenAnswer(
-				a -> {
-					System.out.println("sent message: " + a.getArguments()[0]);
-					return true;
-				});
-		when(messageManager.sendMessage(any(Message.class), anyString()))
-			.thenAnswer(
-				a -> {
-					System.out.println("Called");
-					return true;
-				});
-		worker = new DnetCollectorWorker(new CollectorPluginFactory(), argumentParser, messageManager);
-	}
-
-	@AfterEach
-	public void dropDown() {
-		File f = new File("/tmp/file.seq");
-		f.delete();
-	}
-
 	@Test
 	public void testFindPlugin() throws Exception {
 		final CollectorPluginFactory collectorPluginEnumerator = new CollectorPluginFactory();
@@ -79,8 +44,14 @@ public class DnetCollectorWorkerApplicationTests {
 	}
 
 	@Test
-	public void testFeeding() throws Exception {
+	public void testFeeding(@TempDir Path testDir) throws Exception {
+
+		System.out.println(testDir.toString());
+
+		CollectorWorker worker = new CollectorWorker(new CollectorPluginFactory(), getApi(),
+			"file://" + testDir.toString() + "/file.seq", testDir.toString() + "/file.seq");
 		worker.collect();
+
+		// TODO create ASSERT HERE
 	}
 
 	private ApiDescriptor getApi() {
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java
index 5479e0b57..6a80e01e2 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java
@@ -1,6 +1,7 @@
 
 package eu.dnetlib.dhp.transformation;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.mockito.Mockito.lenient;
 
@@ -14,13 +15,13 @@ import java.util.stream.Stream;
 
 import javax.xml.transform.stream.StreamSource;
 
-import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
-import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
-import eu.dnetlib.dhp.transformation.xslt.XSLTTransformationFunction;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.util.LongAccumulator;
 import org.dom4j.Document;
@@ -31,8 +32,14 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
+
+import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
 import eu.dnetlib.dhp.collection.CollectionJobTest;
+import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
 import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
+import eu.dnetlib.dhp.transformation.xslt.XSLTTransformationFunction;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 
 @ExtendWith(MockitoExtension.class)
 public class TransformationJobTest {
@@ -49,8 +56,8 @@ public class TransformationJobTest {
 
 		lenient().when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARIES_XQUERY)).thenReturn(vocs());
 		lenient()
-			.when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARY_SYNONYMS_XQUERY))
-			.thenReturn(synonyms());
+			.when(isLookUpService.quickSearchProfile(VocabularyGroup.VOCABULARY_SYNONYMS_XQUERY))
+			.thenReturn(synonyms());
 
 		vocabularies = VocabularyGroup.loadVocsFromIS(isLookUpService);
 	}
@@ -67,7 +74,6 @@ public class TransformationJobTest {
 		spark.stop();
 	}
 
-
 	@Test
 	@DisplayName("Test Transform Single XML using XSLTTransformator")
@@ -76,19 +82,15 @@ public void testTransformSaxonHE() throws Exception {
 
 		final MetadataRecord mr = new MetadataRecord();
 		mr.setBody(IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/transform/input.xml")));
-
-		// We Load the XSLT trasformation Rule from the classpath
+		// We Load the XSLT transformation Rule from the classpath
 		XSLTTransformationFunction tr = loadTransformationRule("/eu/dnetlib/dhp/transform/ext_simple.xsl");
 
-		//Print the record
+		// Print the record
 		System.out.println(tr.call(mr).getBody());
 
-		//TODO Create significant Assert
+		// TODO Create significant Assert
 	}
 
-
-
-
 	@DisplayName("Test TransformSparkJobNode.main")
 	@Test
 	public void transformTest(@TempDir Path testDir) throws Exception {
@@ -96,24 +98,44 @@ public void transformTest(@TempDir Path testDir) throws Exception {
 		final String mdstore_input = this.getClass().getResource("/eu/dnetlib/dhp/transform/mdstorenative").getFile();
 		final String mdstore_output = testDir.toString() + "/version";
 
-		mockupTrasformationRule("simpleTRule","/eu/dnetlib/dhp/transform/ext_simple.xsl");
+		mockupTrasformationRule("simpleTRule", "/eu/dnetlib/dhp/transform/ext_simple.xsl");
 
 		// final String arguments = "-issm true -i %s -o %s -d 1 -w 1 -tp XSLT_TRANSFORM -tr simpleTRule";
 
-		final Map<String, String> parameters = Stream.of(new String[][] {
-			{ "dateOfTransformation", "1234" },
-			{ "transformationPlugin", "XSLT_TRANSFORM" },
-			{ "transformationRule", "simpleTRule" },
+		final Map<String, String> parameters = Stream.of(new String[][] {
+			{
+				"dateOfTransformation", "1234"
+			},
+			{
+				"transformationPlugin", "XSLT_TRANSFORM"
+			},
+			{
+				"transformationRuleTitle", "simpleTRule"
+			},
 		}).collect(Collectors.toMap(data -> data[0], data -> data[1]));
 
-		TransformSparkJobNode.transformRecords(parameters,isLookUpService,spark,mdstore_input, mdstore_output);
-
-
-
+		TransformSparkJobNode.transformRecords(parameters, isLookUpService, spark, mdstore_input, mdstore_output);
 
 		// TODO introduce useful assertions
+
+		final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
+		final Dataset<MetadataRecord> mOutput = spark.read().format("parquet").load(mdstore_output).as(encoder);
+
+		final Long total = mOutput.count();
+
+		final long recordTs = mOutput
+			.filter((FilterFunction<MetadataRecord>) p -> p.getDateOfTransformation() == 1234)
+			.count();
+
+		final long recordNotEmpty = mOutput
+			.filter((FilterFunction<MetadataRecord>) p -> !StringUtils.isBlank(p.getBody()))
+			.count();
+
+		assertEquals(total, recordTs);
+
+		assertEquals(total, recordNotEmpty);
+
 	}
 
 	@Test
@@ -128,27 +150,27 @@ public class TransformationJobTest {
 		Files.deleteIfExists(tempDirWithPrefix);
 	}
 
-	private void mockupTrasformationRule(final String trule, final String path)throws Exception {
+	private void mockupTrasformationRule(final String trule, final String path) throws Exception {
 		final String trValue = IOUtils.toString(this.getClass().getResourceAsStream(path));
 
-		lenient().when(isLookUpService.quickSearchProfile(String.format(TransformationFactory.TRULE_XQUERY,trule)))
-			.thenReturn(Collections.singletonList(trValue));
+		lenient()
+			.when(isLookUpService.quickSearchProfile(String.format(TransformationFactory.TRULE_XQUERY, trule)))
+			.thenReturn(Collections.singletonList(trValue));
 	}
 
 	private XSLTTransformationFunction loadTransformationRule(final String path) throws Exception {
 		final String trValue = IOUtils.toString(this.getClass().getResourceAsStream(path));
 		final LongAccumulator la = new LongAccumulator();
-		return new XSLTTransformationFunction(new AggregationCounter(la,la,la),trValue, 0,vocabularies);
+		return new XSLTTransformationFunction(new AggregationCounter(la, la, la), trValue, 0, vocabularies);
 	}
 
 	private List<String> vocs() throws IOException {
 		return IOUtils
-			.readLines(TransformationJobTest.class.getResourceAsStream("/eu/dnetlib/dhp/transform/terms.txt"));
+			.readLines(TransformationJobTest.class.getResourceAsStream("/eu/dnetlib/dhp/transform/terms.txt"));
 	}
 
 	private List<String> synonyms() throws IOException {
 		return IOUtils
-			.readLines(TransformationJobTest.class.getResourceAsStream("/eu/dnetlib/dhp/transform/synonyms.txt"));
+			.readLines(TransformationJobTest.class.getResourceAsStream("/eu/dnetlib/dhp/transform/synonyms.txt"));
 	}
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctions.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctions.java
index 42ce7f90b..ac483f10b 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctions.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctions.java
@@ -59,17 +59,19 @@ public class CleaningFunctions {
 			}
 		}
 		if (Objects.nonNull(r.getAuthor())) {
-			r.getAuthor()
-				.stream()
-				.filter(Objects::nonNull)
-				.forEach(a -> {
-					if (Objects.nonNull(a.getPid())) {
-						a.getPid()
-							.stream()
-							.filter(Objects::nonNull)
-							.forEach(p -> fixVocabName(p.getQualifier(), ModelConstants.DNET_PID_TYPES));
-					}
-				});
+			r
+				.getAuthor()
+				.stream()
+				.filter(Objects::nonNull)
+				.forEach(a -> {
+					if (Objects.nonNull(a.getPid())) {
+						a
+							.getPid()
+							.stream()
+							.filter(Objects::nonNull)
+							.forEach(p -> fixVocabName(p.getQualifier(), ModelConstants.DNET_PID_TYPES));
+					}
+				});
 		}
 
 		if (value instanceof Publication) {
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java
index db1a2ef57..7ff06e428 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java
@@ -55,9 +55,9 @@ import org.slf4j.LoggerFactory;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.common.DbClient;
+import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
 import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
 import eu.dnetlib.dhp.oa.graph.raw.common.VerifyNsPrefixPredicate;
-import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
 import eu.dnetlib.dhp.schema.oaf.Context;
 import eu.dnetlib.dhp.schema.oaf.DataInfo;
 import eu.dnetlib.dhp.schema.oaf.Dataset;
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplicationTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplicationTest.java
index 8293faac4..83303ae8e 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplicationTest.java
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplicationTest.java
@@ -15,8 +15,8 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
-import eu.dnetlib.dhp.oa.graph.clean.CleaningFunctionTest;
 import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
+import eu.dnetlib.dhp.oa.graph.clean.CleaningFunctionTest;
 import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.oaf.*;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java
index 5c8e4e4c6..e54fe28aa 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java
@@ -21,8 +21,8 @@ import org.mockito.junit.jupiter.MockitoExtension;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
-import eu.dnetlib.dhp.oa.graph.clean.CleaningFunctionTest;
 import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
+import eu.dnetlib.dhp.oa.graph.clean.CleaningFunctionTest;
 import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.oaf.Author;
 import eu.dnetlib.dhp.schema.oaf.Dataset;
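
For reference, a minimal sketch of how the refactored XSLTTransformationFunction can be
exercised on a single record once the RabbitMQ messaging is gone, mirroring what
TransformationJobTest does above. The class XsltSketch and the variables xslt,
vocabularies and recordXml are illustrative placeholders, not names from this patch:

    import org.apache.spark.util.LongAccumulator;

    import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
    import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
    import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
    import eu.dnetlib.dhp.transformation.xslt.XSLTTransformationFunction;

    public class XsltSketch {

        public static MetadataRecord transformOne(String xslt, VocabularyGroup vocabularies, String recordXml)
            throws Exception {
            // Plain accumulators are enough outside a live Spark job; the function only calls add(1) on them,
            // exactly as loadTransformationRule does in TransformationJobTest
            final LongAccumulator la = new LongAccumulator();
            final AggregationCounter counter = new AggregationCounter(la, la, la);

            // 0L stands in for the dateOfTransformation timestamp that TransformSparkJobNode would pass
            final XSLTTransformationFunction f = new XSLTTransformationFunction(counter, xslt, 0L, vocabularies);

            final MetadataRecord mr = new MetadataRecord();
            mr.setBody(recordXml);

            // call() returns null on failure and increments the error counter instead of throwing
            return f.call(mr);
        }
    }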