forked from D-Net/dnet-hadoop
Merge pull request 'aggregation_on_hadoop' (#90) from sandro.labruzzo/dnet-hadoop:aggregation_on_hadoop into hadoop_aggregator
Wonderful code... You're the Best Sandro
This commit is contained in: commit 150a617bd1
@@ -1,5 +1,5 @@
-package eu.dnetlib.dhp.oa.graph.raw.common;
+package eu.dnetlib.dhp.common.vocabulary;

 import java.io.Serializable;
 import java.util.HashMap;
@@ -1,5 +1,5 @@
-package eu.dnetlib.dhp.oa.graph.raw.common;
+package eu.dnetlib.dhp.common.vocabulary;

 import java.io.Serializable;
 import java.util.*;
@@ -1,5 +1,5 @@
-package eu.dnetlib.dhp.oa.graph.raw.common;
+package eu.dnetlib.dhp.common.vocabulary;

 import java.io.Serializable;
@@ -2,28 +2,15 @@ Description of the Module
 --------------------------
 This module defines a **collector worker application** that runs on Hadoop.

-It is responsible for harvesting metadata using different plugins.
+It is responsible for harvesting metadata using different collector plugins and transformation into the common metadata model.

-The collector worker uses a message queue to inform the progress
-of the harvesting action (using a message queue for sending **ONGOING** messages) furthermore,
-It gives, at the end of the job, some information about the status
-of the collection i.e Number of records collected(using a message queue for sending **REPORT** messages).
-
-To work the collection worker need some parameter like:
-
-* **hdfsPath**: the path where storing the sequential file
-* **apidescriptor**: the JSON encoding of the API Descriptor
-* **namenode**: the Name Node URI
-* **userHDFS**: the user wich create the hdfs seq file
-* **rabbitUser**: the user to connect with RabbitMq for messaging
-* **rabbitPassWord**: the password to connect with RabbitMq for messaging
-* **rabbitHost**: the host of the RabbitMq server
-* **rabbitOngoingQueue**: the name of the ongoing queue
-* **rabbitReportQueue**: the name of the report queue
-* **workflowId**: the identifier of the dnet Workflow
-
-##Plugins
+# Collector Plugins
 * OAI Plugin

-## Usage
-TODO
+# Transformation Plugins
+TODO
+
+# Usage
+TODO
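For orientation, a minimal sketch of how the worker parameters listed in the removed README text above are typically consumed in this codebase, using the same `ArgumentApplicationParser` pattern that appears later in this diff in `GenerateNativeStoreSparkJob`. The class name and the JSON resource path are illustrative only, not part of this pull request.

```java
import org.apache.commons.io.IOUtils;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;

public class CollectorWorkerArgsSketch {

	public static void main(String[] args) throws Exception {
		// hypothetical parameter descriptor resource; the real worker ships its own JSON
		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
			IOUtils
				.toString(
					CollectorWorkerArgsSketch.class
						.getResourceAsStream("/eu/dnetlib/dhp/collection/collector_parameters.json")));
		parser.parseArgument(args);

		// parameter names as documented in the README above
		final String hdfsPath = parser.get("hdfsPath"); // where the sequence file is written
		final String apiDescriptor = parser.get("apidescriptor"); // JSON encoding of the API Descriptor
		final String nameNode = parser.get("namenode"); // HDFS Name Node URI
		final String workflowId = parser.get("workflowId"); // identifier of the dnet workflow

		System.out.println("Collecting into " + hdfsPath + " on " + nameNode + " for workflow " + workflowId);
	}
}
```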
@@ -1,20 +0,0 @@
-
-package eu.dnetlib.dhp.actionmanager.project.httpconnector;
-
-import java.util.LinkedList;
-
-public class CollectorPluginErrorLogList extends LinkedList<String> {
-
-	private static final long serialVersionUID = -6925786561303289704L;
-
-	@Override
-	public String toString() {
-		String log = new String();
-		int index = 0;
-		for (String errorMessage : this) {
-			log += String.format("Retry #%s: %s / ", index++, errorMessage);
-		}
-		return log;
-	}
-
-}
@@ -1,20 +0,0 @@
-
-package eu.dnetlib.dhp.actionmanager.project.httpconnector;
-
-public class CollectorServiceException extends Exception {
-
-	private static final long serialVersionUID = 7523999812098059764L;
-
-	public CollectorServiceException(String string) {
-		super(string);
-	}
-
-	public CollectorServiceException(String string, Throwable exception) {
-		super(string, exception);
-	}
-
-	public CollectorServiceException(Throwable exception) {
-		super(exception);
-	}
-
-}
@@ -1,240 +0,0 @@
-
-package eu.dnetlib.dhp.actionmanager.project.httpconnector;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.*;
-import java.security.GeneralSecurityException;
-import java.security.cert.X509Certificate;
-import java.util.List;
-import java.util.Map;
-
-import javax.net.ssl.HttpsURLConnection;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManager;
-import javax.net.ssl.X509TrustManager;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.math.NumberUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-/**
- * @author jochen, michele, andrea
- */
-public class HttpConnector {
-
-	private static final Log log = LogFactory.getLog(HttpConnector.class);
-
-	private int maxNumberOfRetry = 6;
-	private int defaultDelay = 120; // seconds
-	private int readTimeOut = 120; // seconds
-
-	private String responseType = null;
-
-	private String userAgent = "Mozilla/5.0 (compatible; OAI; +http://www.openaire.eu)";
-
-	public HttpConnector() {
-		CookieHandler.setDefault(new CookieManager(null, CookiePolicy.ACCEPT_ALL));
-	}
-
-	/**
-	 * Given the URL returns the content via HTTP GET
-	 *
-	 * @param requestUrl the URL
-	 * @return the content of the downloaded resource
-	 * @throws CollectorServiceException when retrying more than maxNumberOfRetry times
-	 */
-	public String getInputSource(final String requestUrl) throws CollectorServiceException {
-		return attemptDownlaodAsString(requestUrl, 1, new CollectorPluginErrorLogList());
-	}
-
-	/**
-	 * Given the URL returns the content as a stream via HTTP GET
-	 *
-	 * @param requestUrl the URL
-	 * @return the content of the downloaded resource as InputStream
-	 * @throws CollectorServiceException when retrying more than maxNumberOfRetry times
-	 */
-	public InputStream getInputSourceAsStream(final String requestUrl) throws CollectorServiceException {
-		return attemptDownload(requestUrl, 1, new CollectorPluginErrorLogList());
-	}
-
-	private String attemptDownlaodAsString(final String requestUrl, final int retryNumber,
-		final CollectorPluginErrorLogList errorList)
-		throws CollectorServiceException {
-		try {
-			InputStream s = attemptDownload(requestUrl, 1, new CollectorPluginErrorLogList());
-			try {
-				return IOUtils.toString(s);
-			} catch (IOException e) {
-				log.error("error while retrieving from http-connection occured: " + requestUrl, e);
-				Thread.sleep(defaultDelay * 1000);
-				errorList.add(e.getMessage());
-				return attemptDownlaodAsString(requestUrl, retryNumber + 1, errorList);
-			} finally {
-				IOUtils.closeQuietly(s);
-			}
-		} catch (InterruptedException e) {
-			throw new CollectorServiceException(e);
-		}
-	}
-
-	private InputStream attemptDownload(final String requestUrl, final int retryNumber,
-		final CollectorPluginErrorLogList errorList)
-		throws CollectorServiceException {
-
-		if (retryNumber > maxNumberOfRetry) {
-			throw new CollectorServiceException("Max number of retries exceeded. Cause: \n " + errorList);
-		}
-
-		log.debug("Downloading " + requestUrl + " - try: " + retryNumber);
-		try {
-			InputStream input = null;
-
-			try {
-				final HttpURLConnection urlConn = (HttpURLConnection) new URL(requestUrl).openConnection();
-				urlConn.setInstanceFollowRedirects(false);
-				urlConn.setReadTimeout(readTimeOut * 1000);
-				urlConn.addRequestProperty("User-Agent", userAgent);
-
-				if (log.isDebugEnabled()) {
-					logHeaderFields(urlConn);
-				}
-
-				int retryAfter = obtainRetryAfter(urlConn.getHeaderFields());
-				if (retryAfter > 0 && urlConn.getResponseCode() == HttpURLConnection.HTTP_UNAVAILABLE) {
-					log.warn("waiting and repeating request after " + retryAfter + " sec.");
-					Thread.sleep(retryAfter * 1000);
-					errorList.add("503 Service Unavailable");
-					urlConn.disconnect();
-					return attemptDownload(requestUrl, retryNumber + 1, errorList);
-				} else if ((urlConn.getResponseCode() == HttpURLConnection.HTTP_MOVED_PERM)
-					|| (urlConn.getResponseCode() == HttpURLConnection.HTTP_MOVED_TEMP)) {
-					final String newUrl = obtainNewLocation(urlConn.getHeaderFields());
-					log.debug("The requested url has been moved to " + newUrl);
-					errorList
-						.add(
-							String
-								.format(
-									"%s %s. Moved to: %s", urlConn.getResponseCode(), urlConn.getResponseMessage(),
-									newUrl));
-					urlConn.disconnect();
-					return attemptDownload(newUrl, retryNumber + 1, errorList);
-				} else if (urlConn.getResponseCode() != HttpURLConnection.HTTP_OK) {
-					log
-						.error(
-							String
-								.format("HTTP error: %s %s", urlConn.getResponseCode(), urlConn.getResponseMessage()));
-					Thread.sleep(defaultDelay * 1000);
-					errorList.add(String.format("%s %s", urlConn.getResponseCode(), urlConn.getResponseMessage()));
-					urlConn.disconnect();
-					return attemptDownload(requestUrl, retryNumber + 1, errorList);
-				} else {
-					input = urlConn.getInputStream();
-					responseType = urlConn.getContentType();
-					return input;
-				}
-			} catch (IOException e) {
-				log.error("error while retrieving from http-connection occured: " + requestUrl, e);
-				Thread.sleep(defaultDelay * 1000);
-				errorList.add(e.getMessage());
-				return attemptDownload(requestUrl, retryNumber + 1, errorList);
-			}
-		} catch (InterruptedException e) {
-			throw new CollectorServiceException(e);
-		}
-	}
-
-	private void logHeaderFields(final HttpURLConnection urlConn) throws IOException {
-		log.debug("StatusCode: " + urlConn.getResponseMessage());
-
-		for (Map.Entry<String, List<String>> e : urlConn.getHeaderFields().entrySet()) {
-			if (e.getKey() != null) {
-				for (String v : e.getValue()) {
-					log.debug("  key: " + e.getKey() + " - value: " + v);
-				}
-			}
-		}
-	}
-
-	private int obtainRetryAfter(final Map<String, List<String>> headerMap) {
-		for (String key : headerMap.keySet()) {
-			if ((key != null) && key.toLowerCase().equals("retry-after") && (headerMap.get(key).size() > 0)
-				&& NumberUtils.isCreatable(headerMap.get(key).get(0))) {
-				return Integer
-					.parseInt(headerMap.get(key).get(0)) + 10;
-			}
-		}
-		return -1;
-	}
-
-	private String obtainNewLocation(final Map<String, List<String>> headerMap) throws CollectorServiceException {
-		for (String key : headerMap.keySet()) {
-			if ((key != null) && key.toLowerCase().equals("location") && (headerMap.get(key).size() > 0)) {
-				return headerMap.get(key).get(0);
-			}
-		}
-		throw new CollectorServiceException("The requested url has been MOVED, but 'location' param is MISSING");
-	}
-
-	/**
-	 * register for https scheme; this is a workaround and not intended for the use in trusted environments
-	 */
-	public void initTrustManager() {
-		final X509TrustManager tm = new X509TrustManager() {
-
-			@Override
-			public void checkClientTrusted(final X509Certificate[] xcs, final String string) {
-			}
-
-			@Override
-			public void checkServerTrusted(final X509Certificate[] xcs, final String string) {
-			}
-
-			@Override
-			public X509Certificate[] getAcceptedIssuers() {
-				return null;
-			}
-		};
-		try {
-			final SSLContext ctx = SSLContext.getInstance("TLS");
-			ctx.init(null, new TrustManager[] {
-				tm
-			}, null);
-			HttpsURLConnection.setDefaultSSLSocketFactory(ctx.getSocketFactory());
-		} catch (GeneralSecurityException e) {
-			log.fatal(e);
-			throw new IllegalStateException(e);
-		}
-	}
-
-	public int getMaxNumberOfRetry() {
-		return maxNumberOfRetry;
-	}
-
-	public void setMaxNumberOfRetry(final int maxNumberOfRetry) {
-		this.maxNumberOfRetry = maxNumberOfRetry;
-	}
-
-	public int getDefaultDelay() {
-		return defaultDelay;
-	}
-
-	public void setDefaultDelay(final int defaultDelay) {
-		this.defaultDelay = defaultDelay;
-	}
-
-	public int getReadTimeOut() {
-		return readTimeOut;
-	}
-
-	public void setReadTimeOut(final int readTimeOut) {
-		this.readTimeOut = readTimeOut;
-	}
-
-	public String getResponseType() {
-		return responseType;
-	}
-
-}
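The HttpConnector removed above was driven by callers roughly as in the following sketch, based only on the public methods visible in the deleted file (getInputSource, the retry/delay setters, initTrustManager, getResponseType). The target URL and the wrapper class name are hypothetical and only for illustration.

```java
import eu.dnetlib.dhp.actionmanager.project.httpconnector.CollectorServiceException;
import eu.dnetlib.dhp.actionmanager.project.httpconnector.HttpConnector;

public class HttpConnectorUsageSketch {

	public static void main(String[] args) {
		final HttpConnector connector = new HttpConnector();
		connector.setMaxNumberOfRetry(3); // default is 6
		connector.setDefaultDelay(30); // seconds to wait before a retry (default 120)
		connector.initTrustManager(); // only for the https workaround described in the class Javadoc

		try {
			// hypothetical URL; getInputSource retries up to maxNumberOfRetry times before giving up
			final String body = connector.getInputSource("https://example.org/oai?verb=Identify");
			System.out.println("Content type: " + connector.getResponseType());
			System.out.println(body);
		} catch (final CollectorServiceException e) {
			// thrown once the retry budget is exhausted; the message carries the accumulated error log
			e.printStackTrace();
		}
	}
}
```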
@@ -17,7 +17,7 @@ import org.apache.hadoop.fs.Path;

 import com.fasterxml.jackson.databind.ObjectMapper;

-import eu.dnetlib.dhp.actionmanager.project.httpconnector.HttpConnector;
+import eu.dnetlib.dhp.collection.worker.utils.HttpConnector;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;

 /**
@@ -4,6 +4,7 @@ package eu.dnetlib.dhp.actionmanager.project.utils;
 import java.io.*;
 import java.nio.charset.StandardCharsets;

+import eu.dnetlib.dhp.collection.worker.utils.HttpConnector;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -14,7 +15,7 @@ import org.apache.hadoop.fs.Path;

 import com.fasterxml.jackson.databind.ObjectMapper;

-import eu.dnetlib.dhp.actionmanager.project.httpconnector.HttpConnector;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;

 /**
@@ -43,6 +43,106 @@ public class GenerateNativeStoreSparkJob {

 	private static final Logger log = LoggerFactory.getLogger(GenerateNativeStoreSparkJob.class);

+	public static void main(String[] args) throws Exception {
+
+		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+			IOUtils
+				.toString(
+					GenerateNativeStoreSparkJob.class
+						.getResourceAsStream(
+							"/eu/dnetlib/dhp/collection/collection_input_parameters.json")));
+		parser.parseArgument(args);
+		final ObjectMapper jsonMapper = new ObjectMapper();
+		final Provenance provenance = jsonMapper.readValue(parser.get("provenance"), Provenance.class);
+		final long dateOfCollection = new Long(parser.get("dateOfCollection"));
+
+		Boolean isSparkSessionManaged = Optional
+			.ofNullable(parser.get("isSparkSessionManaged"))
+			.map(Boolean::valueOf)
+			.orElse(Boolean.TRUE);
+		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+		final Map<String, String> ongoingMap = new HashMap<>();
+		final Map<String, String> reportMap = new HashMap<>();
+
+		final boolean test = parser.get("isTest") == null ? false : Boolean.valueOf(parser.get("isTest"));
+
+		SparkConf conf = new SparkConf();
+		runWithSparkSession(
+			conf,
+			isSparkSessionManaged,
+			spark -> {
+				final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+				final JavaPairRDD<IntWritable, Text> inputRDD = sc
+					.sequenceFile(parser.get("input"), IntWritable.class, Text.class);
+
+				final LongAccumulator totalItems = sc.sc().longAccumulator("TotalItems");
+				final LongAccumulator invalidRecords = sc.sc().longAccumulator("InvalidRecords");
+
+				final MessageManager manager = new MessageManager(
+					parser.get("rabbitHost"),
+					parser.get("rabbitUser"),
+					parser.get("rabbitPassword"),
+					false,
+					false,
+					null);
+
+				final JavaRDD<MetadataRecord> mappeRDD = inputRDD
+					.map(
+						item -> parseRecord(
+							item._2().toString(),
+							parser.get("xpath"),
+							parser.get("encoding"),
+							provenance,
+							dateOfCollection,
+							totalItems,
+							invalidRecords))
+					.filter(Objects::nonNull)
+					.distinct();
+
+				ongoingMap.put("ongoing", "0");
+				if (!test) {
+					manager
+						.sendMessage(
+							new Message(
+								parser.get("workflowId"), "DataFrameCreation", MessageType.ONGOING, ongoingMap),
+							parser.get("rabbitOngoingQueue"),
+							true,
+							false);
+				}
+
+				final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
+				final Dataset<MetadataRecord> mdstore = spark.createDataset(mappeRDD.rdd(), encoder);
+				final LongAccumulator mdStoreRecords = sc.sc().longAccumulator("MDStoreRecords");
+				mdStoreRecords.add(mdstore.count());
+				ongoingMap.put("ongoing", "" + totalItems.value());
+				if (!test) {
+					manager
+						.sendMessage(
+							new Message(
+								parser.get("workflowId"), "DataFrameCreation", MessageType.ONGOING, ongoingMap),
+							parser.get("rabbitOngoingQueue"),
+							true,
+							false);
+				}
+				mdstore.write().format("parquet").save(parser.get("output"));
+				reportMap.put("inputItem", "" + totalItems.value());
+				reportMap.put("invalidRecords", "" + invalidRecords.value());
+				reportMap.put("mdStoreSize", "" + mdStoreRecords.value());
+				if (!test) {
+					manager
+						.sendMessage(
+							new Message(parser.get("workflowId"), "Collection", MessageType.REPORT, reportMap),
+							parser.get("rabbitReportQueue"),
+							true,
+							false);
+					manager.close();
+				}
+			});
+
+	}
+
 	public static MetadataRecord parseRecord(
 		final String input,
 		final String xpath,
@@ -73,103 +173,5 @@ public class GenerateNativeStoreSparkJob {
 		}
 	}

-	public static void main(String[] args) throws Exception {
-
-		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
-			IOUtils
-				.toString(
-					GenerateNativeStoreSparkJob.class
-						.getResourceAsStream(
-							"/eu/dnetlib/dhp/collection/collection_input_parameters.json")));
-		parser.parseArgument(args);
-		final ObjectMapper jsonMapper = new ObjectMapper();
-		final Provenance provenance = jsonMapper.readValue(parser.get("provenance"), Provenance.class);
-		final long dateOfCollection = new Long(parser.get("dateOfCollection"));
-
-		Boolean isSparkSessionManaged = Optional
-			.ofNullable(parser.get("isSparkSessionManaged"))
-			.map(Boolean::valueOf)
-			.orElse(Boolean.TRUE);
-		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
-
-		final Map<String, String> ongoingMap = new HashMap<>();
-		final Map<String, String> reportMap = new HashMap<>();
-
-		final boolean test = parser.get("isTest") == null ? false : Boolean.valueOf(parser.get("isTest"));
-
-		SparkConf conf = new SparkConf();
-		runWithSparkSession(
-			conf,
-			isSparkSessionManaged,
-			spark -> {
-				final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
-
-				final JavaPairRDD<IntWritable, Text> inputRDD = sc
-					.sequenceFile(parser.get("input"), IntWritable.class, Text.class);
-
-				final LongAccumulator totalItems = sc.sc().longAccumulator("TotalItems");
-				final LongAccumulator invalidRecords = sc.sc().longAccumulator("InvalidRecords");
-
-				final MessageManager manager = new MessageManager(
-					parser.get("rabbitHost"),
-					parser.get("rabbitUser"),
-					parser.get("rabbitPassword"),
-					false,
-					false,
-					null);
-
-				final JavaRDD<MetadataRecord> mappeRDD = inputRDD
-					.map(
-						item -> parseRecord(
-							item._2().toString(),
-							parser.get("xpath"),
-							parser.get("encoding"),
-							provenance,
-							dateOfCollection,
-							totalItems,
-							invalidRecords))
-					.filter(Objects::nonNull)
-					.distinct();
-
-				ongoingMap.put("ongoing", "0");
-				if (!test) {
-					manager
-						.sendMessage(
-							new Message(
-								parser.get("workflowId"), "DataFrameCreation", MessageType.ONGOING, ongoingMap),
-							parser.get("rabbitOngoingQueue"),
-							true,
-							false);
-				}
-
-				final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
-				final Dataset<MetadataRecord> mdstore = spark.createDataset(mappeRDD.rdd(), encoder);
-				final LongAccumulator mdStoreRecords = sc.sc().longAccumulator("MDStoreRecords");
-				mdStoreRecords.add(mdstore.count());
-				ongoingMap.put("ongoing", "" + totalItems.value());
-				if (!test) {
-					manager
-						.sendMessage(
-							new Message(
-								parser.get("workflowId"), "DataFrameCreation", MessageType.ONGOING, ongoingMap),
-							parser.get("rabbitOngoingQueue"),
-							true,
-							false);
-				}
-				mdstore.write().format("parquet").save(parser.get("output"));
-				reportMap.put("inputItem", "" + totalItems.value());
-				reportMap.put("invalidRecords", "" + invalidRecords.value());
-				reportMap.put("mdStoreSize", "" + mdStoreRecords.value());
-				if (!test) {
-					manager
-						.sendMessage(
-							new Message(parser.get("workflowId"), "Collection", MessageType.REPORT, reportMap),
-							parser.get("rabbitReportQueue"),
-							true,
-							false);
-					manager.close();
-				}
-			});
-
-	}
 }
@@ -6,14 +6,15 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.List;

+import eu.dnetlib.dhp.collection.worker.DnetCollectorException;
+import eu.dnetlib.dhp.collection.worker.utils.HttpConnector;
 import org.apache.poi.openxml4j.exceptions.InvalidFormatException;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;

-import eu.dnetlib.dhp.actionmanager.project.httpconnector.CollectorServiceException;
-import eu.dnetlib.dhp.actionmanager.project.httpconnector.HttpConnector;
 import eu.dnetlib.dhp.actionmanager.project.utils.EXCELParser;

 @Disabled
@@ -30,7 +31,7 @@ public class EXCELParserTest {
 	}

 	@Test
-	public void test1() throws CollectorServiceException, IOException, InvalidFormatException, ClassNotFoundException,
+	public void test1() throws DnetCollectorException, IOException, InvalidFormatException, ClassNotFoundException,
 		IllegalAccessException, InstantiationException {

 		EXCELParser excelParser = new EXCELParser();
@@ -1,6 +1,8 @@

 package eu.dnetlib.dhp.actionmanager.project.httpconnector;

+import eu.dnetlib.dhp.collection.worker.DnetCollectorException;
+import eu.dnetlib.dhp.collection.worker.utils.HttpConnector;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
@@ -29,12 +31,12 @@ public class HttpConnectorTest {

 	@Test
-	public void testGetInputSource() throws CollectorServiceException {
+	public void testGetInputSource() throws DnetCollectorException {
 		System.out.println(connector.getInputSource(URL));
 	}

 	@Test
-	public void testGoodServers() throws CollectorServiceException {
+	public void testGoodServers() throws DnetCollectorException {
 		System.out.println(connector.getInputSource(URL_GOODSNI_SERVER));
 	}

@@ -1,7 +1,7 @@
 <xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
                 xmlns:oai="http://www.openarchives.org/OAI/2.0/"
                 xmlns:oaf="http://namespace.openaire.eu/oaf"
-                xmlns:eg="http://eu/dnetlib/trasform/extension"
+                xmlns:dnetFunction="http://eu/dnetlib/trasform/extension"
                 version="2.0"
                 exclude-result-prefixes="xsl">
     <xsl:template match="/">
@@ -9,7 +9,7 @@
             <xsl:copy-of select="//oai:header"/>
             <metadata>
                 <xsl:for-each select="//*[local-name()='subject']">
-                    <subject><xsl:value-of select="eg:clean(.,'dnet:languages')"/></subject>
+                    <subject><xsl:value-of select="dnetFunction:clean(.,'dnet:languages')"/></subject>
                 </xsl:for-each>
             </metadata>
             <oaf:about>
@@ -21,7 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;

 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.common.HdfsSupport;
-import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
+import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
 import eu.dnetlib.dhp.schema.oaf.Oaf;
 import eu.dnetlib.dhp.schema.oaf.OafEntity;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
@@ -7,7 +7,7 @@ import java.util.HashMap;
 import org.apache.commons.lang3.StringUtils;

 import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableConsumer;
-import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
+import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
 import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.oaf.Country;
 import eu.dnetlib.dhp.schema.oaf.Qualifier;
@@ -38,7 +38,7 @@ import org.dom4j.DocumentFactory;
 import org.dom4j.DocumentHelper;
 import org.dom4j.Node;

-import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
+import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
 import eu.dnetlib.dhp.schema.common.LicenseComparator;
 import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.oaf.Author;
@@ -3,7 +3,6 @@ package eu.dnetlib.dhp.oa.graph.raw;

 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

-import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
@@ -12,8 +11,6 @@ import java.util.stream.Collectors;

 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.spark.SparkConf;
@@ -27,7 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;

 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.common.HdfsSupport;
-import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
+import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
 import eu.dnetlib.dhp.schema.oaf.*;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
@@ -57,7 +57,7 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.common.DbClient;
 import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
 import eu.dnetlib.dhp.oa.graph.raw.common.VerifyNsPrefixPredicate;
-import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
+import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
 import eu.dnetlib.dhp.schema.oaf.Context;
 import eu.dnetlib.dhp.schema.oaf.DataInfo;
 import eu.dnetlib.dhp.schema.oaf.Dataset;
@@ -18,7 +18,7 @@ import org.dom4j.Node;
 import com.google.common.collect.Lists;

 import eu.dnetlib.dhp.common.PacePerson;
-import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
+import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
 import eu.dnetlib.dhp.schema.oaf.Author;
 import eu.dnetlib.dhp.schema.oaf.DataInfo;
 import eu.dnetlib.dhp.schema.oaf.Field;
@@ -19,7 +19,7 @@ import org.dom4j.Node;
 import com.google.common.collect.Lists;

 import eu.dnetlib.dhp.common.PacePerson;
-import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
+import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
 import eu.dnetlib.dhp.schema.oaf.Author;
 import eu.dnetlib.dhp.schema.oaf.DataInfo;
 import eu.dnetlib.dhp.schema.oaf.Field;
@@ -18,7 +18,7 @@ import org.mockito.junit.jupiter.MockitoExtension;

 import com.fasterxml.jackson.databind.ObjectMapper;

-import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
+import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
 import eu.dnetlib.dhp.schema.oaf.Publication;
 import eu.dnetlib.dhp.schema.oaf.Qualifier;
 import eu.dnetlib.dhp.schema.oaf.Result;
@@ -16,7 +16,7 @@ import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;

 import eu.dnetlib.dhp.oa.graph.clean.CleaningFunctionTest;
-import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
+import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
 import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.oaf.*;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
@@ -22,7 +22,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
 import com.fasterxml.jackson.databind.ObjectMapper;

 import eu.dnetlib.dhp.oa.graph.clean.CleaningFunctionTest;
-import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
+import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
 import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.oaf.Author;
 import eu.dnetlib.dhp.schema.oaf.Dataset;
@@ -27,7 +27,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;

-import eu.dnetlib.dhp.oa.graph.raw.common.VocabularyGroup;
+import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
 import eu.dnetlib.dhp.schema.oaf.*;

 @ExtendWith(MockitoExtension.class)