diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/CollectorPluginErrorLogList.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/CollectorPluginErrorLogList.java
deleted file mode 100644
index 9d3f882651..0000000000
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/CollectorPluginErrorLogList.java
+++ /dev/null
@@ -1,20 +0,0 @@
-
-package eu.dnetlib.dhp.actionmanager.project.httpconnector;
-
-import java.util.LinkedList;
-
-public class CollectorPluginErrorLogList extends LinkedList<String> {
-
-    private static final long serialVersionUID = -6925786561303289704L;
-
-    @Override
-    public String toString() {
-        String log = new String();
-        int index = 0;
-        for (String errorMessage : this) {
-            log += String.format("Retry #%s: %s / ", index++, errorMessage);
-        }
-        return log;
-    }
-
-}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/CollectorServiceException.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/CollectorServiceException.java
deleted file mode 100644
index 9167d97b4c..0000000000
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/CollectorServiceException.java
+++ /dev/null
@@ -1,20 +0,0 @@
-
-package eu.dnetlib.dhp.actionmanager.project.httpconnector;
-
-public class CollectorServiceException extends Exception {
-
-    private static final long serialVersionUID = 7523999812098059764L;
-
-    public CollectorServiceException(String string) {
-        super(string);
-    }
-
-    public CollectorServiceException(String string, Throwable exception) {
-        super(string, exception);
-    }
-
-    public CollectorServiceException(Throwable exception) {
-        super(exception);
-    }
-
-}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnector.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnector.java
deleted file mode 100644
index e20518b559..0000000000
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnector.java
+++ /dev/null
@@ -1,240 +0,0 @@
-
-package eu.dnetlib.dhp.actionmanager.project.httpconnector;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.*;
-import java.security.GeneralSecurityException;
-import java.security.cert.X509Certificate;
-import java.util.List;
-import java.util.Map;
-
-import javax.net.ssl.HttpsURLConnection;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManager;
-import javax.net.ssl.X509TrustManager;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.math.NumberUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-/**
- * @author jochen, michele, andrea
- */
-public class HttpConnector {
-
-    private static final Log log = LogFactory.getLog(HttpConnector.class);
-
-    private int maxNumberOfRetry = 6;
-    private int defaultDelay = 120; // seconds
-    private int readTimeOut = 120; // seconds
-
-    private String responseType = null;
-
-    private String userAgent = "Mozilla/5.0 (compatible; OAI; +http://www.openaire.eu)";
-
-    public HttpConnector() {
-        CookieHandler.setDefault(new CookieManager(null, CookiePolicy.ACCEPT_ALL));
-    }
-
-    /**
-     * Given the URL returns the content via HTTP GET
-     *
-     * @param requestUrl the URL
-     * @return the content of the downloaded resource
-     * @throws CollectorServiceException when retrying more than maxNumberOfRetry times
-     */
-    public String getInputSource(final String requestUrl) throws CollectorServiceException {
-        return attemptDownlaodAsString(requestUrl, 1, new CollectorPluginErrorLogList());
-    }
-
-    /**
-     * Given the URL returns the content as a stream via HTTP GET
-     *
-     * @param requestUrl the URL
-     * @return the content of the downloaded resource as InputStream
-     * @throws CollectorServiceException when retrying more than maxNumberOfRetry times
-     */
-    public InputStream getInputSourceAsStream(final String requestUrl) throws CollectorServiceException {
-        return attemptDownload(requestUrl, 1, new CollectorPluginErrorLogList());
-    }
-
-    private String attemptDownlaodAsString(final String requestUrl, final int retryNumber,
-        final CollectorPluginErrorLogList errorList)
-        throws CollectorServiceException {
-        try {
-            InputStream s = attemptDownload(requestUrl, 1, new CollectorPluginErrorLogList());
-            try {
-                return IOUtils.toString(s);
-            } catch (IOException e) {
-                log.error("error while retrieving from http-connection occured: " + requestUrl, e);
-                Thread.sleep(defaultDelay * 1000);
-                errorList.add(e.getMessage());
-                return attemptDownlaodAsString(requestUrl, retryNumber + 1, errorList);
-            } finally {
-                IOUtils.closeQuietly(s);
-            }
-        } catch (InterruptedException e) {
-            throw new CollectorServiceException(e);
-        }
-    }
-
-    private InputStream attemptDownload(final String requestUrl, final int retryNumber,
-        final CollectorPluginErrorLogList errorList)
-        throws CollectorServiceException {
-
-        if (retryNumber > maxNumberOfRetry) {
-            throw new CollectorServiceException("Max number of retries exceeded. Cause: \n " + errorList);
-        }
-
-        log.debug("Downloading " + requestUrl + " - try: " + retryNumber);
-        try {
-            InputStream input = null;
-
-            try {
-                final HttpURLConnection urlConn = (HttpURLConnection) new URL(requestUrl).openConnection();
-                urlConn.setInstanceFollowRedirects(false);
-                urlConn.setReadTimeout(readTimeOut * 1000);
-                urlConn.addRequestProperty("User-Agent", userAgent);
-
-                if (log.isDebugEnabled()) {
-                    logHeaderFields(urlConn);
-                }
-
-                int retryAfter = obtainRetryAfter(urlConn.getHeaderFields());
-                if (retryAfter > 0 && urlConn.getResponseCode() == HttpURLConnection.HTTP_UNAVAILABLE) {
-                    log.warn("waiting and repeating request after " + retryAfter + " sec.");
-                    Thread.sleep(retryAfter * 1000);
-                    errorList.add("503 Service Unavailable");
-                    urlConn.disconnect();
-                    return attemptDownload(requestUrl, retryNumber + 1, errorList);
-                } else if ((urlConn.getResponseCode() == HttpURLConnection.HTTP_MOVED_PERM)
-                    || (urlConn.getResponseCode() == HttpURLConnection.HTTP_MOVED_TEMP)) {
-                    final String newUrl = obtainNewLocation(urlConn.getHeaderFields());
-                    log.debug("The requested url has been moved to " + newUrl);
-                    errorList
-                        .add(
-                            String
-                                .format(
-                                    "%s %s. Moved to: %s", urlConn.getResponseCode(), urlConn.getResponseMessage(),
-                                    newUrl));
-                    urlConn.disconnect();
-                    return attemptDownload(newUrl, retryNumber + 1, errorList);
-                } else if (urlConn.getResponseCode() != HttpURLConnection.HTTP_OK) {
-                    log
-                        .error(
-                            String
-                                .format("HTTP error: %s %s", urlConn.getResponseCode(), urlConn.getResponseMessage()));
-                    Thread.sleep(defaultDelay * 1000);
-                    errorList.add(String.format("%s %s", urlConn.getResponseCode(), urlConn.getResponseMessage()));
-                    urlConn.disconnect();
-                    return attemptDownload(requestUrl, retryNumber + 1, errorList);
-                } else {
-                    input = urlConn.getInputStream();
-                    responseType = urlConn.getContentType();
-                    return input;
-                }
-            } catch (IOException e) {
-                log.error("error while retrieving from http-connection occured: " + requestUrl, e);
-                Thread.sleep(defaultDelay * 1000);
-                errorList.add(e.getMessage());
-                return attemptDownload(requestUrl, retryNumber + 1, errorList);
-            }
-        } catch (InterruptedException e) {
-            throw new CollectorServiceException(e);
-        }
-    }
-
-    private void logHeaderFields(final HttpURLConnection urlConn) throws IOException {
-        log.debug("StatusCode: " + urlConn.getResponseMessage());
-
-        for (Map.Entry<String, List<String>> e : urlConn.getHeaderFields().entrySet()) {
-            if (e.getKey() != null) {
-                for (String v : e.getValue()) {
-                    log.debug(" key: " + e.getKey() + " - value: " + v);
-                }
-            }
-        }
-    }
-
-    private int obtainRetryAfter(final Map<String, List<String>> headerMap) {
-        for (String key : headerMap.keySet()) {
-            if ((key != null) && key.toLowerCase().equals("retry-after") && (headerMap.get(key).size() > 0)
-                && NumberUtils.isCreatable(headerMap.get(key).get(0))) {
-                return Integer
-                    .parseInt(headerMap.get(key).get(0)) + 10;
-            }
-        }
-        return -1;
-    }
-
-    private String obtainNewLocation(final Map<String, List<String>> headerMap) throws CollectorServiceException {
-        for (String key : headerMap.keySet()) {
-            if ((key != null) && key.toLowerCase().equals("location") && (headerMap.get(key).size() > 0)) {
-                return headerMap.get(key).get(0);
-            }
-        }
-        throw new CollectorServiceException("The requested url has been MOVED, but 'location' param is MISSING");
-    }
-
-    /**
-     * register for https scheme; this is a workaround and not intended for the use in trusted environments
-     */
-    public void initTrustManager() {
-        final X509TrustManager tm = new X509TrustManager() {
-
-            @Override
-            public void checkClientTrusted(final X509Certificate[] xcs, final String string) {
-            }
-
-            @Override
-            public void checkServerTrusted(final X509Certificate[] xcs, final String string) {
-            }
-
-            @Override
-            public X509Certificate[] getAcceptedIssuers() {
-                return null;
-            }
-        };
-        try {
-            final SSLContext ctx = SSLContext.getInstance("TLS");
-            ctx.init(null, new TrustManager[] {
-                tm
-            }, null);
-            HttpsURLConnection.setDefaultSSLSocketFactory(ctx.getSocketFactory());
-        } catch (GeneralSecurityException e) {
-            log.fatal(e);
-            throw new IllegalStateException(e);
-        }
-    }
-
-    public int getMaxNumberOfRetry() {
-        return maxNumberOfRetry;
-    }
-
-    public void setMaxNumberOfRetry(final int maxNumberOfRetry) {
-        this.maxNumberOfRetry = maxNumberOfRetry;
-    }
-
-    public int getDefaultDelay() {
-        return defaultDelay;
-    }
-
-    public void setDefaultDelay(final int defaultDelay) {
-        this.defaultDelay = defaultDelay;
-    }
-
-    public int getReadTimeOut() {
-        return readTimeOut;
-    }
-
-    public void setReadTimeOut(final int readTimeOut) {
-        this.readTimeOut = readTimeOut;
-    }
-
-    public String getResponseType() {
-        return responseType;
-    }
-
-}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadCSV.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadCSV.java
index 9dac34a152..dc6f46771c 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadCSV.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadCSV.java
@@ -17,7 +17,7 @@ import org.apache.hadoop.fs.Path;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
-import eu.dnetlib.dhp.actionmanager.project.httpconnector.HttpConnector;
+import eu.dnetlib.dhp.collection.worker.utils.HttpConnector;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 
 /**
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadExcel.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadExcel.java
index 23b58f2a01..e665bc704c 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadExcel.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/ReadExcel.java
@@ -4,6 +4,7 @@ package eu.dnetlib.dhp.actionmanager.project.utils;
 
 import java.io.*;
 import java.nio.charset.StandardCharsets;
+import eu.dnetlib.dhp.collection.worker.utils.HttpConnector;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -14,7 +15,7 @@ import org.apache.hadoop.fs.Path;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
-import eu.dnetlib.dhp.actionmanager.project.httpconnector.HttpConnector;
+
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 
 /**
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java
index 861ae52012..c0bd4c9409 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.java
@@ -43,6 +43,106 @@ public class GenerateNativeStoreSparkJob {
 
     private static final Logger log = LoggerFactory.getLogger(GenerateNativeStoreSparkJob.class);
 
+    public static void main(String[] args) throws Exception {
+
+        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+            IOUtils
+                .toString(
+                    GenerateNativeStoreSparkJob.class
+                        .getResourceAsStream(
+                            "/eu/dnetlib/dhp/collection/collection_input_parameters.json")));
+        parser.parseArgument(args);
+        final ObjectMapper jsonMapper = new ObjectMapper();
+        final Provenance provenance = jsonMapper.readValue(parser.get("provenance"), Provenance.class);
+        final long dateOfCollection = new Long(parser.get("dateOfCollection"));
+
+        Boolean isSparkSessionManaged = Optional
+            .ofNullable(parser.get("isSparkSessionManaged"))
+            .map(Boolean::valueOf)
+            .orElse(Boolean.TRUE);
+        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+        final Map<String, String> ongoingMap = new HashMap<>();
+        final Map<String, String> reportMap = new HashMap<>();
+
+        final boolean test = parser.get("isTest") == null ? false : Boolean.valueOf(parser.get("isTest"));
+
+        SparkConf conf = new SparkConf();
+        runWithSparkSession(
+            conf,
+            isSparkSessionManaged,
+            spark -> {
+                final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+                final JavaPairRDD<IntWritable, Text> inputRDD = sc
+                    .sequenceFile(parser.get("input"), IntWritable.class, Text.class);
+
+                final LongAccumulator totalItems = sc.sc().longAccumulator("TotalItems");
+                final LongAccumulator invalidRecords = sc.sc().longAccumulator("InvalidRecords");
+
+                final MessageManager manager = new MessageManager(
+                    parser.get("rabbitHost"),
+                    parser.get("rabbitUser"),
+                    parser.get("rabbitPassword"),
+                    false,
+                    false,
+                    null);
+
+                final JavaRDD<MetadataRecord> mappeRDD = inputRDD
+                    .map(
+                        item -> parseRecord(
+                            item._2().toString(),
+                            parser.get("xpath"),
+                            parser.get("encoding"),
+                            provenance,
+                            dateOfCollection,
+                            totalItems,
+                            invalidRecords))
+                    .filter(Objects::nonNull)
+                    .distinct();
+
+                ongoingMap.put("ongoing", "0");
+                if (!test) {
+                    manager
+                        .sendMessage(
+                            new Message(
+                                parser.get("workflowId"), "DataFrameCreation", MessageType.ONGOING, ongoingMap),
+                            parser.get("rabbitOngoingQueue"),
+                            true,
+                            false);
+                }
+
+                final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
+                final Dataset<MetadataRecord> mdstore = spark.createDataset(mappeRDD.rdd(), encoder);
+                final LongAccumulator mdStoreRecords = sc.sc().longAccumulator("MDStoreRecords");
+                mdStoreRecords.add(mdstore.count());
+                ongoingMap.put("ongoing", "" + totalItems.value());
+                if (!test) {
+                    manager
+                        .sendMessage(
+                            new Message(
+                                parser.get("workflowId"), "DataFrameCreation", MessageType.ONGOING, ongoingMap),
+                            parser.get("rabbitOngoingQueue"),
+                            true,
+                            false);
+                }
+                mdstore.write().format("parquet").save(parser.get("output"));
+                reportMap.put("inputItem", "" + totalItems.value());
+                reportMap.put("invalidRecords", "" + invalidRecords.value());
+                reportMap.put("mdStoreSize", "" + mdStoreRecords.value());
+                if (!test) {
+                    manager
+                        .sendMessage(
+                            new Message(parser.get("workflowId"), "Collection", MessageType.REPORT, reportMap),
+                            parser.get("rabbitReportQueue"),
+                            true,
+                            false);
+                    manager.close();
+                }
+            });
+
+    }
+
     public static MetadataRecord parseRecord(
         final String input,
         final String xpath,
@@ -73,103 +173,5 @@ public class GenerateNativeStoreSparkJob {
         }
     }
 
-    public static void main(String[] args) throws Exception {
-
-        final ArgumentApplicationParser parser = new ArgumentApplicationParser(
-            IOUtils
-                .toString(
-                    GenerateNativeStoreSparkJob.class
-                        .getResourceAsStream(
-                            "/eu/dnetlib/dhp/collection/collection_input_parameters.json")));
-        parser.parseArgument(args);
-        final ObjectMapper jsonMapper = new ObjectMapper();
-        final Provenance provenance = jsonMapper.readValue(parser.get("provenance"), Provenance.class);
-        final long dateOfCollection = new Long(parser.get("dateOfCollection"));
-
-        Boolean isSparkSessionManaged = Optional
-            .ofNullable(parser.get("isSparkSessionManaged"))
-            .map(Boolean::valueOf)
-            .orElse(Boolean.TRUE);
-        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
-
-        final Map<String, String> ongoingMap = new HashMap<>();
-        final Map<String, String> reportMap = new HashMap<>();
-
-        final boolean test = parser.get("isTest") == null ? false : Boolean.valueOf(parser.get("isTest"));
-
-        SparkConf conf = new SparkConf();
-        runWithSparkSession(
-            conf,
-            isSparkSessionManaged,
-            spark -> {
-                final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
-
-                final JavaPairRDD<IntWritable, Text> inputRDD = sc
-                    .sequenceFile(parser.get("input"), IntWritable.class, Text.class);
-
-                final LongAccumulator totalItems = sc.sc().longAccumulator("TotalItems");
-                final LongAccumulator invalidRecords = sc.sc().longAccumulator("InvalidRecords");
-
-                final MessageManager manager = new MessageManager(
-                    parser.get("rabbitHost"),
-                    parser.get("rabbitUser"),
-                    parser.get("rabbitPassword"),
-                    false,
-                    false,
-                    null);
-
-                final JavaRDD<MetadataRecord> mappeRDD = inputRDD
-                    .map(
-                        item -> parseRecord(
-                            item._2().toString(),
-                            parser.get("xpath"),
-                            parser.get("encoding"),
-                            provenance,
-                            dateOfCollection,
-                            totalItems,
-                            invalidRecords))
-                    .filter(Objects::nonNull)
-                    .distinct();
-
-                ongoingMap.put("ongoing", "0");
-                if (!test) {
-                    manager
-                        .sendMessage(
-                            new Message(
-                                parser.get("workflowId"), "DataFrameCreation", MessageType.ONGOING, ongoingMap),
-                            parser.get("rabbitOngoingQueue"),
-                            true,
-                            false);
-                }
-
-                final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
-                final Dataset<MetadataRecord> mdstore = spark.createDataset(mappeRDD.rdd(), encoder);
-                final LongAccumulator mdStoreRecords = sc.sc().longAccumulator("MDStoreRecords");
-                mdStoreRecords.add(mdstore.count());
-                ongoingMap.put("ongoing", "" + totalItems.value());
-                if (!test) {
-                    manager
-                        .sendMessage(
-                            new Message(
-                                parser.get("workflowId"), "DataFrameCreation", MessageType.ONGOING, ongoingMap),
-                            parser.get("rabbitOngoingQueue"),
-                            true,
-                            false);
-                }
-                mdstore.write().format("parquet").save(parser.get("output"));
-                reportMap.put("inputItem", "" + totalItems.value());
-                reportMap.put("invalidRecords", "" + invalidRecords.value());
-                reportMap.put("mdStoreSize", "" + mdStoreRecords.value());
-                if (!test) {
-                    manager
-                        .sendMessage(
-                            new Message(parser.get("workflowId"), "Collection", MessageType.REPORT, reportMap),
-                            parser.get("rabbitReportQueue"),
-                            true,
-                            false);
-                    manager.close();
-                }
-            });
-
-    }
 }
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/EXCELParserTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/EXCELParserTest.java
index 59b536cd54..c1142ad9cf 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/EXCELParserTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/EXCELParserTest.java
@@ -6,14 +6,15 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.List;
 
+import eu.dnetlib.dhp.collection.worker.DnetCollectorException;
+import eu.dnetlib.dhp.collection.worker.utils.HttpConnector;
 import org.apache.poi.openxml4j.exceptions.InvalidFormatException;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
-import eu.dnetlib.dhp.actionmanager.project.httpconnector.CollectorServiceException;
-import eu.dnetlib.dhp.actionmanager.project.httpconnector.HttpConnector;
+
 import eu.dnetlib.dhp.actionmanager.project.utils.EXCELParser;
 
 @Disabled
@@ -30,7 +31,7 @@ public class EXCELParserTest {
     }
 
     @Test
-    public void test1() throws CollectorServiceException, IOException, InvalidFormatException, ClassNotFoundException,
+    public void test1() throws DnetCollectorException, IOException, InvalidFormatException, ClassNotFoundException,
         IllegalAccessException, InstantiationException {
 
         EXCELParser excelParser = new EXCELParser();
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnectorTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnectorTest.java
index 90b3919ed9..3b9d1c3ab3 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnectorTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnectorTest.java
@@ -1,6 +1,8 @@
 
 package eu.dnetlib.dhp.actionmanager.project.httpconnector;
 
+import eu.dnetlib.dhp.collection.worker.DnetCollectorException;
+import eu.dnetlib.dhp.collection.worker.utils.HttpConnector;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
@@ -29,12 +31,12 @@ public class HttpConnectorTest {
 
     @Test
-    public void testGetInputSource() throws CollectorServiceException {
+    public void testGetInputSource() throws DnetCollectorException {
         System.out.println(connector.getInputSource(URL));
     }
 
     @Test
-    public void testGoodServers() throws CollectorServiceException {
+    public void testGoodServers() throws DnetCollectorException {
         System.out.println(connector.getInputSource(URL_GOODSNI_SERVER));
     }
 
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/ext_simple.xsl b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/ext_simple.xsl
index cef50aa952..f22db961bf 100644
--- a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/ext_simple.xsl
+++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/transform/ext_simple.xsl
@@ -1,7 +1,7 @@
@@ -9,7 +9,7 @@
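
For reviewers, a minimal caller sketch of the shared connector this patch switches to. It assumes the relocated eu.dnetlib.dhp.collection.worker.utils.HttpConnector keeps the public surface of the deleted class (getInputSource, getResponseType, and the retry/timeout setters) while throwing DnetCollectorException, as the updated tests suggest; the class name and URL below are illustrative placeholders, not part of this patch.

import eu.dnetlib.dhp.collection.worker.DnetCollectorException;
import eu.dnetlib.dhp.collection.worker.utils.HttpConnector;

public class HttpConnectorUsageSketch {

    public static void main(String[] args) {
        final HttpConnector connector = new HttpConnector();

        // The deleted class defaulted to 6 retries, a 120 s delay and a 120 s read
        // timeout; assumed unchanged after the move.
        connector.setMaxNumberOfRetry(3);
        connector.setReadTimeOut(60);

        try {
            // Placeholder URL. GET the resource as a String; 503 back-off, 3xx
            // redirects and retries are handled inside the connector.
            final String body = connector.getInputSource("https://www.openaire.eu");
            System.out.println("Content-Type: " + connector.getResponseType());
            System.out.println(body);
        } catch (DnetCollectorException e) {
            // Raised once maxNumberOfRetry is exceeded; the message carries the
            // accumulated retry log.
            e.printStackTrace();
        }
    }
}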