From cda210a2ca993a20507108eb94970d89e05da3af Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Mon, 25 Jan 2021 14:17:42 +0100 Subject: [PATCH 1/3] changed documentation since it didn't reflect the current status --- dhp-workflows/dhp-aggregation/README.md | 31 +++++++------------------ 1 file changed, 9 insertions(+), 22 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/README.md b/dhp-workflows/dhp-aggregation/README.md index 02583b443..e46fdeb16 100644 --- a/dhp-workflows/dhp-aggregation/README.md +++ b/dhp-workflows/dhp-aggregation/README.md @@ -2,28 +2,15 @@ Description of the Module -------------------------- This module defines a **collector worker application** that runs on Hadoop. -It is responsible for harvesting metadata using different plugins. +It is responsible for harvesting metadata using different collector plugins and for transforming it into the common metadata model. -The collector worker uses a message queue to inform the progress -of the harvesting action (using a message queue for sending **ONGOING** messages) furthermore, -It gives, at the end of the job, some information about the status -of the collection i.e Number of records collected(using a message queue for sending **REPORT** messages). - -To work the collection worker need some parameter like: - -* **hdfsPath**: the path where storing the sequential file -* **apidescriptor**: the JSON encoding of the API Descriptor -* **namenode**: the Name Node URI -* **userHDFS**: the user wich create the hdfs seq file -* **rabbitUser**: the user to connect with RabbitMq for messaging -* **rabbitPassWord**: the password to connect with RabbitMq for messaging -* **rabbitHost**: the host of the RabbitMq server -* **rabbitOngoingQueue**: the name of the ongoing queue -* **rabbitReportQueue**: the name of the report queue -* **workflowId**: the identifier of the dnet Workflow - -##Plugins +## Collector Plugins * OAI Plugin -## Usage -TODO \ No newline at end of file +## Transformation Plugins +TODO + + +## Usage +TODO + -- 2.17.1 From ffb092b8d3edf5e3f451c1499de073b6ca341efc Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Mon, 25 Jan 2021 15:05:37 +0100 Subject: [PATCH 2/3] removed duplicate code HttpConnector.java --- .../CollectorPluginErrorLogList.java | 20 -- .../CollectorServiceException.java | 20 -- .../project/httpconnector/HttpConnector.java | 240 ------------------ .../actionmanager/project/utils/ReadCSV.java | 2 +- .../project/utils/ReadExcel.java | 3 +- .../GenerateNativeStoreSparkJob.java | 198 ++++++++------- .../project/EXCELParserTest.java | 7 +- .../httpconnector/HttpConnectorTest.java | 6 +- .../eu/dnetlib/dhp/transform/ext_simple.xsl | 4 +- 9 files changed, 113 insertions(+), 387 deletions(-) delete mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/CollectorPluginErrorLogList.java delete mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/CollectorServiceException.java delete mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnector.java diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/CollectorPluginErrorLogList.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/CollectorPluginErrorLogList.java deleted file mode 100644 index 9d3f88265..000000000 --- 
a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/CollectorPluginErrorLogList.java +++ /dev/null @@ -1,20 +0,0 @@ - -package eu.dnetlib.dhp.actionmanager.project.httpconnector; - -import java.util.LinkedList; - -public class CollectorPluginErrorLogList extends LinkedList { - - private static final long serialVersionUID = -6925786561303289704L; - - @Override - public String toString() { - String log = new String(); - int index = 0; - for (String errorMessage : this) { - log += String.format("Retry #%s: %s / ", index++, errorMessage); - } - return log; - } - -} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/CollectorServiceException.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/CollectorServiceException.java deleted file mode 100644 index 9167d97b4..000000000 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/CollectorServiceException.java +++ /dev/null @@ -1,20 +0,0 @@ - -package eu.dnetlib.dhp.actionmanager.project.httpconnector; - -public class CollectorServiceException extends Exception { - - private static final long serialVersionUID = 7523999812098059764L; - - public CollectorServiceException(String string) { - super(string); - } - - public CollectorServiceException(String string, Throwable exception) { - super(string, exception); - } - - public CollectorServiceException(Throwable exception) { - super(exception); - } - -} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnector.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnector.java deleted file mode 100644 index e20518b55..000000000 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnector.java +++ /dev/null @@ -1,240 +0,0 @@ - -package eu.dnetlib.dhp.actionmanager.project.httpconnector; - -import java.io.IOException; -import java.io.InputStream; -import java.net.*; -import java.security.GeneralSecurityException; -import java.security.cert.X509Certificate; -import java.util.List; -import java.util.Map; - -import javax.net.ssl.HttpsURLConnection; -import javax.net.ssl.SSLContext; -import javax.net.ssl.TrustManager; -import javax.net.ssl.X509TrustManager; - -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.math.NumberUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -/** - * @author jochen, michele, andrea - */ -public class HttpConnector { - - private static final Log log = LogFactory.getLog(HttpConnector.class); - - private int maxNumberOfRetry = 6; - private int defaultDelay = 120; // seconds - private int readTimeOut = 120; // seconds - - private String responseType = null; - - private String userAgent = "Mozilla/5.0 (compatible; OAI; +http://www.openaire.eu)"; - - public HttpConnector() { - CookieHandler.setDefault(new CookieManager(null, CookiePolicy.ACCEPT_ALL)); - } - - /** - * Given the URL returns the content via HTTP GET - * - * @param requestUrl the URL - * @return the content of the downloaded resource - * @throws CollectorServiceException when retrying more than maxNumberOfRetry times - */ - public String getInputSource(final String requestUrl) throws CollectorServiceException { - return attemptDownlaodAsString(requestUrl, 1, new CollectorPluginErrorLogList()); - 
} - - /** - * Given the URL returns the content as a stream via HTTP GET - * - * @param requestUrl the URL - * @return the content of the downloaded resource as InputStream - * @throws CollectorServiceException when retrying more than maxNumberOfRetry times - */ - public InputStream getInputSourceAsStream(final String requestUrl) throws CollectorServiceException { - return attemptDownload(requestUrl, 1, new CollectorPluginErrorLogList()); - } - - private String attemptDownlaodAsString(final String requestUrl, final int retryNumber, - final CollectorPluginErrorLogList errorList) - throws CollectorServiceException { - try { - InputStream s = attemptDownload(requestUrl, 1, new CollectorPluginErrorLogList()); - try { - return IOUtils.toString(s); - } catch (IOException e) { - log.error("error while retrieving from http-connection occured: " + requestUrl, e); - Thread.sleep(defaultDelay * 1000); - errorList.add(e.getMessage()); - return attemptDownlaodAsString(requestUrl, retryNumber + 1, errorList); - } finally { - IOUtils.closeQuietly(s); - } - } catch (InterruptedException e) { - throw new CollectorServiceException(e); - } - } - - private InputStream attemptDownload(final String requestUrl, final int retryNumber, - final CollectorPluginErrorLogList errorList) - throws CollectorServiceException { - - if (retryNumber > maxNumberOfRetry) { - throw new CollectorServiceException("Max number of retries exceeded. Cause: \n " + errorList); - } - - log.debug("Downloading " + requestUrl + " - try: " + retryNumber); - try { - InputStream input = null; - - try { - final HttpURLConnection urlConn = (HttpURLConnection) new URL(requestUrl).openConnection(); - urlConn.setInstanceFollowRedirects(false); - urlConn.setReadTimeout(readTimeOut * 1000); - urlConn.addRequestProperty("User-Agent", userAgent); - - if (log.isDebugEnabled()) { - logHeaderFields(urlConn); - } - - int retryAfter = obtainRetryAfter(urlConn.getHeaderFields()); - if (retryAfter > 0 && urlConn.getResponseCode() == HttpURLConnection.HTTP_UNAVAILABLE) { - log.warn("waiting and repeating request after " + retryAfter + " sec."); - Thread.sleep(retryAfter * 1000); - errorList.add("503 Service Unavailable"); - urlConn.disconnect(); - return attemptDownload(requestUrl, retryNumber + 1, errorList); - } else if ((urlConn.getResponseCode() == HttpURLConnection.HTTP_MOVED_PERM) - || (urlConn.getResponseCode() == HttpURLConnection.HTTP_MOVED_TEMP)) { - final String newUrl = obtainNewLocation(urlConn.getHeaderFields()); - log.debug("The requested url has been moved to " + newUrl); - errorList - .add( - String - .format( - "%s %s. 
Moved to: %s", urlConn.getResponseCode(), urlConn.getResponseMessage(), - newUrl)); - urlConn.disconnect(); - return attemptDownload(newUrl, retryNumber + 1, errorList); - } else if (urlConn.getResponseCode() != HttpURLConnection.HTTP_OK) { - log - .error( - String - .format("HTTP error: %s %s", urlConn.getResponseCode(), urlConn.getResponseMessage())); - Thread.sleep(defaultDelay * 1000); - errorList.add(String.format("%s %s", urlConn.getResponseCode(), urlConn.getResponseMessage())); - urlConn.disconnect(); - return attemptDownload(requestUrl, retryNumber + 1, errorList); - } else { - input = urlConn.getInputStream(); - responseType = urlConn.getContentType(); - return input; - } - } catch (IOException e) { - log.error("error while retrieving from http-connection occured: " + requestUrl, e); - Thread.sleep(defaultDelay * 1000); - errorList.add(e.getMessage()); - return attemptDownload(requestUrl, retryNumber + 1, errorList); - } - } catch (InterruptedException e) { - throw new CollectorServiceException(e); - } - } - - private void logHeaderFields(final HttpURLConnection urlConn) throws IOException { - log.debug("StatusCode: " + urlConn.getResponseMessage()); - - for (Map.Entry> e : urlConn.getHeaderFields().entrySet()) { - if (e.getKey() != null) { - for (String v : e.getValue()) { - log.debug(" key: " + e.getKey() + " - value: " + v); - } - } - } - } - - private int obtainRetryAfter(final Map> headerMap) { - for (String key : headerMap.keySet()) { - if ((key != null) && key.toLowerCase().equals("retry-after") && (headerMap.get(key).size() > 0) - && NumberUtils.isCreatable(headerMap.get(key).get(0))) { - return Integer - .parseInt(headerMap.get(key).get(0)) + 10; - } - } - return -1; - } - - private String obtainNewLocation(final Map> headerMap) throws CollectorServiceException { - for (String key : headerMap.keySet()) { - if ((key != null) && key.toLowerCase().equals("location") && (headerMap.get(key).size() > 0)) { - return headerMap.get(key).get(0); - } - } - throw new CollectorServiceException("The requested url has been MOVED, but 'location' param is MISSING"); - } - - /** - * register for https scheme; this is a workaround and not intended for the use in trusted environments - */ - public void initTrustManager() { - final X509TrustManager tm = new X509TrustManager() { - - @Override - public void checkClientTrusted(final X509Certificate[] xcs, final String string) { - } - - @Override - public void checkServerTrusted(final X509Certificate[] xcs, final String string) { - } - - @Override - public X509Certificate[] getAcceptedIssuers() { - return null; - } - }; - try { - final SSLContext ctx = SSLContext.getInstance("TLS"); - ctx.init(null, new TrustManager[] { - tm - }, null); - HttpsURLConnection.setDefaultSSLSocketFactory(ctx.getSocketFactory()); - } catch (GeneralSecurityException e) { - log.fatal(e); - throw new IllegalStateException(e); - } - } - - public int getMaxNumberOfRetry() { - return maxNumberOfRetry; - } - - public void setMaxNumberOfRetry(final int maxNumberOfRetry) { - this.maxNumberOfRetry = maxNumberOfRetry; - } - - public int getDefaultDelay() { - return defaultDelay; - } - - public void setDefaultDelay(final int defaultDelay) { - this.defaultDelay = defaultDelay; - } - - public int getReadTimeOut() { - return readTimeOut; - } - - public void setReadTimeOut(final int readTimeOut) { - this.readTimeOut = readTimeOut; - } - - public String getResponseType() { - return responseType; - } - -} diff --git 
a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadCSV.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadCSV.java index 9dac34a15..dc6f46771 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadCSV.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadCSV.java @@ -17,7 +17,7 @@ import org.apache.hadoop.fs.Path; import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.actionmanager.project.httpconnector.HttpConnector; +import eu.dnetlib.dhp.collection.worker.utils.HttpConnector; import eu.dnetlib.dhp.application.ArgumentApplicationParser; /** diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadExcel.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadExcel.java index 23b58f2a0..e665bc704 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadExcel.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadExcel.java @@ -4,6 +4,7 @@ package eu.dnetlib.dhp.actionmanager.project.utils; import java.io.*; import java.nio.charset.StandardCharsets; +import eu.dnetlib.dhp.collection.worker.utils.HttpConnector; import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -14,7 +15,7 @@ import org.apache.hadoop.fs.Path; import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.actionmanager.project.httpconnector.HttpConnector; + import eu.dnetlib.dhp.application.ArgumentApplicationParser; /** diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java index 861ae5201..c0bd4c940 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java @@ -43,6 +43,106 @@ public class GenerateNativeStoreSparkJob { private static final Logger log = LoggerFactory.getLogger(GenerateNativeStoreSparkJob.class); + public static void main(String[] args) throws Exception { + + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + GenerateNativeStoreSparkJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/collection/collection_input_parameters.json"))); + parser.parseArgument(args); + final ObjectMapper jsonMapper = new ObjectMapper(); + final Provenance provenance = jsonMapper.readValue(parser.get("provenance"), Provenance.class); + final long dateOfCollection = new Long(parser.get("dateOfCollection")); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + final Map ongoingMap = new HashMap<>(); + final Map reportMap = new HashMap<>(); + + final boolean test = parser.get("isTest") == null ? 
false : Boolean.valueOf(parser.get("isTest")); + + SparkConf conf = new SparkConf(); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + final JavaPairRDD inputRDD = sc + .sequenceFile(parser.get("input"), IntWritable.class, Text.class); + + final LongAccumulator totalItems = sc.sc().longAccumulator("TotalItems"); + final LongAccumulator invalidRecords = sc.sc().longAccumulator("InvalidRecords"); + + final MessageManager manager = new MessageManager( + parser.get("rabbitHost"), + parser.get("rabbitUser"), + parser.get("rabbitPassword"), + false, + false, + null); + + final JavaRDD mappeRDD = inputRDD + .map( + item -> parseRecord( + item._2().toString(), + parser.get("xpath"), + parser.get("encoding"), + provenance, + dateOfCollection, + totalItems, + invalidRecords)) + .filter(Objects::nonNull) + .distinct(); + + ongoingMap.put("ongoing", "0"); + if (!test) { + manager + .sendMessage( + new Message( + parser.get("workflowId"), "DataFrameCreation", MessageType.ONGOING, ongoingMap), + parser.get("rabbitOngoingQueue"), + true, + false); + } + + final Encoder encoder = Encoders.bean(MetadataRecord.class); + final Dataset mdstore = spark.createDataset(mappeRDD.rdd(), encoder); + final LongAccumulator mdStoreRecords = sc.sc().longAccumulator("MDStoreRecords"); + mdStoreRecords.add(mdstore.count()); + ongoingMap.put("ongoing", "" + totalItems.value()); + if (!test) { + manager + .sendMessage( + new Message( + parser.get("workflowId"), "DataFrameCreation", MessageType.ONGOING, ongoingMap), + parser.get("rabbitOngoingQueue"), + true, + false); + } + mdstore.write().format("parquet").save(parser.get("output")); + reportMap.put("inputItem", "" + totalItems.value()); + reportMap.put("invalidRecords", "" + invalidRecords.value()); + reportMap.put("mdStoreSize", "" + mdStoreRecords.value()); + if (!test) { + manager + .sendMessage( + new Message(parser.get("workflowId"), "Collection", MessageType.REPORT, reportMap), + parser.get("rabbitReportQueue"), + true, + false); + manager.close(); + } + }); + + } + public static MetadataRecord parseRecord( final String input, final String xpath, @@ -73,103 +173,5 @@ public class GenerateNativeStoreSparkJob { } } - public static void main(String[] args) throws Exception { - final ArgumentApplicationParser parser = new ArgumentApplicationParser( - IOUtils - .toString( - GenerateNativeStoreSparkJob.class - .getResourceAsStream( - "/eu/dnetlib/dhp/collection/collection_input_parameters.json"))); - parser.parseArgument(args); - final ObjectMapper jsonMapper = new ObjectMapper(); - final Provenance provenance = jsonMapper.readValue(parser.get("provenance"), Provenance.class); - final long dateOfCollection = new Long(parser.get("dateOfCollection")); - - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - - final Map ongoingMap = new HashMap<>(); - final Map reportMap = new HashMap<>(); - - final boolean test = parser.get("isTest") == null ? 
false : Boolean.valueOf(parser.get("isTest")); - - SparkConf conf = new SparkConf(); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - - final JavaPairRDD inputRDD = sc - .sequenceFile(parser.get("input"), IntWritable.class, Text.class); - - final LongAccumulator totalItems = sc.sc().longAccumulator("TotalItems"); - final LongAccumulator invalidRecords = sc.sc().longAccumulator("InvalidRecords"); - - final MessageManager manager = new MessageManager( - parser.get("rabbitHost"), - parser.get("rabbitUser"), - parser.get("rabbitPassword"), - false, - false, - null); - - final JavaRDD mappeRDD = inputRDD - .map( - item -> parseRecord( - item._2().toString(), - parser.get("xpath"), - parser.get("encoding"), - provenance, - dateOfCollection, - totalItems, - invalidRecords)) - .filter(Objects::nonNull) - .distinct(); - - ongoingMap.put("ongoing", "0"); - if (!test) { - manager - .sendMessage( - new Message( - parser.get("workflowId"), "DataFrameCreation", MessageType.ONGOING, ongoingMap), - parser.get("rabbitOngoingQueue"), - true, - false); - } - - final Encoder encoder = Encoders.bean(MetadataRecord.class); - final Dataset mdstore = spark.createDataset(mappeRDD.rdd(), encoder); - final LongAccumulator mdStoreRecords = sc.sc().longAccumulator("MDStoreRecords"); - mdStoreRecords.add(mdstore.count()); - ongoingMap.put("ongoing", "" + totalItems.value()); - if (!test) { - manager - .sendMessage( - new Message( - parser.get("workflowId"), "DataFrameCreation", MessageType.ONGOING, ongoingMap), - parser.get("rabbitOngoingQueue"), - true, - false); - } - mdstore.write().format("parquet").save(parser.get("output")); - reportMap.put("inputItem", "" + totalItems.value()); - reportMap.put("invalidRecords", "" + invalidRecords.value()); - reportMap.put("mdStoreSize", "" + mdStoreRecords.value()); - if (!test) { - manager - .sendMessage( - new Message(parser.get("workflowId"), "Collection", MessageType.REPORT, reportMap), - parser.get("rabbitReportQueue"), - true, - false); - manager.close(); - } - }); - - } } diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/EXCELParserTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/EXCELParserTest.java index 59b536cd5..c1142ad9c 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/EXCELParserTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/EXCELParserTest.java @@ -6,14 +6,15 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.List; +import eu.dnetlib.dhp.collection.worker.DnetCollectorException; +import eu.dnetlib.dhp.collection.worker.utils.HttpConnector; import org.apache.poi.openxml4j.exceptions.InvalidFormatException; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; -import eu.dnetlib.dhp.actionmanager.project.httpconnector.CollectorServiceException; -import eu.dnetlib.dhp.actionmanager.project.httpconnector.HttpConnector; + import eu.dnetlib.dhp.actionmanager.project.utils.EXCELParser; @Disabled @@ -30,7 +31,7 @@ public class EXCELParserTest { } @Test - public void test1() throws CollectorServiceException, IOException, InvalidFormatException, ClassNotFoundException, + public void test1() throws DnetCollectorException, IOException, InvalidFormatException, 
ClassNotFoundException, IllegalAccessException, InstantiationException { EXCELParser excelParser = new EXCELParser(); diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnectorTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnectorTest.java index 90b3919ed..3b9d1c3ab 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnectorTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnectorTest.java @@ -1,6 +1,8 @@ package eu.dnetlib.dhp.actionmanager.project.httpconnector; +import eu.dnetlib.dhp.collection.worker.DnetCollectorException; +import eu.dnetlib.dhp.collection.worker.utils.HttpConnector; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.http.conn.ssl.SSLConnectionSocketFactory; @@ -29,12 +31,12 @@ public class HttpConnectorTest { @Test - public void testGetInputSource() throws CollectorServiceException { + public void testGetInputSource() throws DnetCollectorException { System.out.println(connector.getInputSource(URL)); } @Test - public void testGoodServers() throws CollectorServiceException { + public void testGoodServers() throws DnetCollectorException { System.out.println(connector.getInputSource(URL_GOODSNI_SERVER)); } diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/ext_simple.xsl b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/ext_simple.xsl index cef50aa95..f22db961b 100644 --- a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/ext_simple.xsl +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/ext_simple.xsl @@ -1,7 +1,7 @@ @@ -9,7 +9,7 @@ - + -- 2.17.1 From a54848a59c2b4971569f2036ef2851f87c578701 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Mon, 25 Jan 2021 15:43:04 +0100 Subject: [PATCH 3/3] Moved Vocabulary classes to common module --- .../java/eu/dnetlib/dhp/common/vocabulary}/Vocabulary.java | 2 +- .../eu/dnetlib/dhp/common/vocabulary}/VocabularyGroup.java | 2 +- .../eu/dnetlib/dhp/common/vocabulary}/VocabularyTerm.java | 2 +- .../eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java | 2 +- .../java/eu/dnetlib/dhp/oa/graph/clean/CleaningRuleMap.java | 2 +- .../dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java | 2 +- .../dhp/oa/graph/raw/GenerateEntitiesApplication.java | 5 +---- .../dhp/oa/graph/raw/MigrateDbEntitiesApplication.java | 2 +- .../java/eu/dnetlib/dhp/oa/graph/raw/OafToOafMapper.java | 2 +- .../java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java | 2 +- .../eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java | 2 +- .../dhp/oa/graph/raw/GenerateEntitiesApplicationTest.java | 2 +- .../test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java | 2 +- .../dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java | 2 +- 14 files changed, 14 insertions(+), 17 deletions(-) rename {dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common => dhp-common/src/main/java/eu/dnetlib/dhp/common/vocabulary}/Vocabulary.java (98%) rename {dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common => dhp-common/src/main/java/eu/dnetlib/dhp/common/vocabulary}/VocabularyGroup.java (99%) rename {dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common => 
dhp-common/src/main/java/eu/dnetlib/dhp/common/vocabulary}/VocabularyTerm.java (88%) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/Vocabulary.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/vocabulary/Vocabulary.java similarity index 98% rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/Vocabulary.java rename to dhp-common/src/main/java/eu/dnetlib/dhp/common/vocabulary/Vocabulary.java index bfc4fd6f1..1e333e93f 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/Vocabulary.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/vocabulary/Vocabulary.java @@ -1,5 +1,5 @@ -package eu.dnetlib.dhp.oa.graph.raw.common; +package eu.dnetlib.dhp.common.vocabulary; import java.io.Serializable; import java.util.HashMap; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/VocabularyGroup.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/vocabulary/VocabularyGroup.java similarity index 99% rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/VocabularyGroup.java rename to dhp-common/src/main/java/eu/dnetlib/dhp/common/vocabulary/VocabularyGroup.java index 32452bdc5..fac55189b 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/VocabularyGroup.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/vocabulary/VocabularyGroup.java @@ -1,5 +1,5 @@ -package eu.dnetlib.dhp.oa.graph.raw.common; +package eu.dnetlib.dhp.common.vocabulary; import java.io.Serializable; import java.util.*; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/VocabularyTerm.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/vocabulary/VocabularyTerm.java similarity index 88% rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/VocabularyTerm.java rename to dhp-common/src/main/java/eu/dnetlib/dhp/common/vocabulary/VocabularyTerm.java index 1aa1b8253..52eb7ca23 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/VocabularyTerm.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/vocabulary/VocabularyTerm.java @@ -1,5 +1,5 @@ -package eu.dnetlib.dhp.oa.graph.raw.common; +package eu.dnetlib.dhp.common.vocabulary; import java.io.Serializable; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java index 8231dd77e..31fe116e3 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java @@ -21,7 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; +import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.schema.oaf.Oaf; import eu.dnetlib.dhp.schema.oaf.OafEntity; import eu.dnetlib.dhp.utils.ISLookupClientFactory; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningRuleMap.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningRuleMap.java index d2d4e118f..e0cb354d4 100644 --- 
a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningRuleMap.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleaningRuleMap.java @@ -7,7 +7,7 @@ import java.util.HashMap; import org.apache.commons.lang3.StringUtils; import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableConsumer; -import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; +import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.Country; import eu.dnetlib.dhp.schema.oaf.Qualifier; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java index cccf15398..079984a81 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/AbstractMdRecordToOafMapper.java @@ -38,7 +38,7 @@ import org.dom4j.DocumentFactory; import org.dom4j.DocumentHelper; import org.dom4j.Node; -import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; +import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.schema.common.LicenseComparator; import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.Author; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java index cfd190670..a2db4a506 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java @@ -3,7 +3,6 @@ package eu.dnetlib.dhp.oa.graph.raw; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; -import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.Objects; @@ -12,8 +11,6 @@ import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.spark.SparkConf; @@ -27,7 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; +import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.dhp.utils.ISLookupClientFactory; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java index 3adbd244c..db1a2ef57 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplication.java @@ -57,7 +57,7 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser; import 
eu.dnetlib.dhp.common.DbClient; import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication; import eu.dnetlib.dhp.oa.graph.raw.common.VerifyNsPrefixPredicate; -import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; +import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.schema.oaf.Context; import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.Dataset; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OafToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OafToOafMapper.java index e62bc0790..4a8b24cf2 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OafToOafMapper.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OafToOafMapper.java @@ -18,7 +18,7 @@ import org.dom4j.Node; import com.google.common.collect.Lists; import eu.dnetlib.dhp.common.PacePerson; -import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; +import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.schema.oaf.Author; import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.Field; diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java index 6d2e28ba8..bc47e778f 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/OdfToOafMapper.java @@ -19,7 +19,7 @@ import org.dom4j.Node; import com.google.common.collect.Lists; import eu.dnetlib.dhp.common.PacePerson; -import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; +import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.schema.oaf.Author; import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.Field; diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java index cb34b0cb3..9e161da6e 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/clean/CleaningFunctionTest.java @@ -18,7 +18,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; +import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.schema.oaf.Publication; import eu.dnetlib.dhp.schema.oaf.Qualifier; import eu.dnetlib.dhp.schema.oaf.Result; diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplicationTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplicationTest.java index 705f1dddb..8293faac4 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplicationTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplicationTest.java @@ -16,7 +16,7 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import eu.dnetlib.dhp.oa.graph.clean.CleaningFunctionTest; -import 
eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; +import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.*; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java index 2a62e25b2..5c8e4e4c6 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java @@ -22,7 +22,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.oa.graph.clean.CleaningFunctionTest; -import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; +import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.oaf.Author; import eu.dnetlib.dhp.schema.oaf.Dataset; diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java index 0d1ec1ad1..b38da4569 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MigrateDbEntitiesApplicationTest.java @@ -27,7 +27,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup; +import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup; import eu.dnetlib.dhp.schema.oaf.*; @ExtendWith(MockitoExtension.class) -- 2.17.1
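
For reference, a minimal sketch of how a caller uses the surviving connector after PATCH 2/3, inferred from the updated HttpConnectorTest: getInputSource and DnetCollectorException are taken from the test diff, while the class name, the no-arg constructor (mirroring the deleted duplicate), and the URL are assumptions.

```java
import eu.dnetlib.dhp.collection.worker.DnetCollectorException;
import eu.dnetlib.dhp.collection.worker.utils.HttpConnector;

// Hypothetical caller: exercises the same getInputSource(String) call as the
// updated tests; DnetCollectorException replaces the removed CollectorServiceException.
public class HttpConnectorSketch {

	public static void main(final String[] args) throws DnetCollectorException {
		// assumes the worker variant keeps the no-arg constructor seen in the deleted duplicate
		final HttpConnector connector = new HttpConnector();
		// placeholder URL; the tests point at real OAI endpoints instead
		System.out.println(connector.getInputSource("http://www.example.com/oai?verb=Identify"));
	}
}
```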
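A small follow-up the moved main method in GenerateNativeStoreSparkJob could pick up: the isTest flag is still parsed with a null-checking ternary, while isSparkSessionManaged just above it already uses the Optional idiom. A behaviour-preserving sketch (java.util.Optional is already available in the class):

```java
// drop-in replacement for:
// final boolean test = parser.get("isTest") == null ? false : Boolean.valueOf(parser.get("isTest"));
final boolean test = Optional
	.ofNullable(parser.get("isTest"))
	.map(Boolean::valueOf)
	.orElse(Boolean.FALSE);
```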
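And for PATCH 3/3, a sketch of what a caller looks like after the package move; only the import changes. The loadVocsFromIS factory and the ISLookUpService lookup are assumptions about the API, since the diff itself only touches package names.

```java
// was: import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;

// Hypothetical loader: the factory method name is assumed, not shown in the diff
public class VocabularyMoveSketch {

	public static VocabularyGroup load(final String isLookupUrl) throws ISLookUpException {
		final ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl);
		return VocabularyGroup.loadVocsFromIS(isLookUp);
	}
}
```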