enrichment steps #38

Merged
claudio.atzori merged 334 commits from miriam.baglioni/dnet-hadoop:master into enrichment_wfs 2020-08-11 16:40:26 +02:00
12 changed files with 771 additions and 4 deletions
Showing only changes of commit abc45f2708 - Show all commits

View File

@ -0,0 +1,20 @@
package eu.dnetlib.dhp.actionmanager.project;
import java.util.LinkedList;
public class CollectorPluginErrorLogList extends LinkedList<String> {
private static final long serialVersionUID = -6925786561303289704L;
@Override
public String toString() {
String log = new String();
int index = 0;
for (String errorMessage : this) {
log += String.format("Retry #%s: %s / ", index++, errorMessage);
}
return log;
}
}

View File

@ -0,0 +1,20 @@
package eu.dnetlib.dhp.actionmanager.project;
public class CollectorServiceException extends Exception {
private static final long serialVersionUID = 7523999812098059764L;
public CollectorServiceException(String string) {
super(string);
}
public CollectorServiceException(String string, Throwable exception) {
super(string, exception);
}
public CollectorServiceException(Throwable exception) {
super(exception);
}
}

View File

@ -1,5 +1,5 @@
package eu.dnetlib.dhp.actionset.h2020programme; package eu.dnetlib.dhp.actionmanager.project;
import java.io.*; import java.io.*;
import java.net.URL; import java.net.URL;
@ -24,7 +24,7 @@ public class GetFile {
.toString( .toString(
GetFile.class GetFile.class
.getResourceAsStream( .getResourceAsStream(
"/eu/dnetlib/dhp/actionset/h2020programme/parameters.json"))); "/eu/dnetlib/dhp/actionmanager/project/parameters.json")));
Configuration conf = new Configuration(); Configuration conf = new Configuration();

View File

@ -0,0 +1,240 @@
package eu.dnetlib.dhp.actionmanager.project;
import java.io.IOException;
import java.io.InputStream;
import java.net.*;
import java.security.GeneralSecurityException;
import java.security.cert.X509Certificate;
import java.util.List;
import java.util.Map;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* @author jochen, michele, andrea
*/
public class HttpConnector {
private static final Log log = LogFactory.getLog(HttpConnector.class);
private int maxNumberOfRetry = 6;
private int defaultDelay = 120; // seconds
private int readTimeOut = 120; // seconds
private String responseType = null;
private String userAgent = "Mozilla/5.0 (compatible; OAI; +http://www.openaire.eu)";
public HttpConnector() {
CookieHandler.setDefault(new CookieManager(null, CookiePolicy.ACCEPT_ALL));
}
/**
* Given the URL returns the content via HTTP GET
*
* @param requestUrl the URL
* @return the content of the downloaded resource
* @throws CollectorServiceException when retrying more than maxNumberOfRetry times
*/
public String getInputSource(final String requestUrl) throws CollectorServiceException {
return attemptDownlaodAsString(requestUrl, 1, new CollectorPluginErrorLogList());
}
/**
* Given the URL returns the content as a stream via HTTP GET
*
* @param requestUrl the URL
* @return the content of the downloaded resource as InputStream
* @throws CollectorServiceException when retrying more than maxNumberOfRetry times
*/
public InputStream getInputSourceAsStream(final String requestUrl) throws CollectorServiceException {
return attemptDownload(requestUrl, 1, new CollectorPluginErrorLogList());
}
private String attemptDownlaodAsString(final String requestUrl, final int retryNumber,
final CollectorPluginErrorLogList errorList)
throws CollectorServiceException {
try {
InputStream s = attemptDownload(requestUrl, 1, new CollectorPluginErrorLogList());
try {
return IOUtils.toString(s);
} catch (IOException e) {
log.error("error while retrieving from http-connection occured: " + requestUrl, e);
Thread.sleep(defaultDelay * 1000);
errorList.add(e.getMessage());
return attemptDownlaodAsString(requestUrl, retryNumber + 1, errorList);
} finally {
IOUtils.closeQuietly(s);
}
} catch (InterruptedException e) {
throw new CollectorServiceException(e);
}
}
private InputStream attemptDownload(final String requestUrl, final int retryNumber,
final CollectorPluginErrorLogList errorList)
throws CollectorServiceException {
if (retryNumber > maxNumberOfRetry) {
throw new CollectorServiceException("Max number of retries exceeded. Cause: \n " + errorList);
}
log.debug("Downloading " + requestUrl + " - try: " + retryNumber);
try {
InputStream input = null;
try {
final HttpURLConnection urlConn = (HttpURLConnection) new URL(requestUrl).openConnection();
urlConn.setInstanceFollowRedirects(false);
urlConn.setReadTimeout(readTimeOut * 1000);
urlConn.addRequestProperty("User-Agent", userAgent);
if (log.isDebugEnabled()) {
logHeaderFields(urlConn);
}
int retryAfter = obtainRetryAfter(urlConn.getHeaderFields());
if (retryAfter > 0 && urlConn.getResponseCode() == HttpURLConnection.HTTP_UNAVAILABLE) {
log.warn("waiting and repeating request after " + retryAfter + " sec.");
Thread.sleep(retryAfter * 1000);
errorList.add("503 Service Unavailable");
urlConn.disconnect();
return attemptDownload(requestUrl, retryNumber + 1, errorList);
} else if ((urlConn.getResponseCode() == HttpURLConnection.HTTP_MOVED_PERM)
|| (urlConn.getResponseCode() == HttpURLConnection.HTTP_MOVED_TEMP)) {
final String newUrl = obtainNewLocation(urlConn.getHeaderFields());
log.debug("The requested url has been moved to " + newUrl);
errorList
.add(
String
.format(
"%s %s. Moved to: %s", urlConn.getResponseCode(), urlConn.getResponseMessage(),
newUrl));
urlConn.disconnect();
return attemptDownload(newUrl, retryNumber + 1, errorList);
} else if (urlConn.getResponseCode() != HttpURLConnection.HTTP_OK) {
log
.error(
String
.format("HTTP error: %s %s", urlConn.getResponseCode(), urlConn.getResponseMessage()));
Thread.sleep(defaultDelay * 1000);
errorList.add(String.format("%s %s", urlConn.getResponseCode(), urlConn.getResponseMessage()));
urlConn.disconnect();
return attemptDownload(requestUrl, retryNumber + 1, errorList);
} else {
input = urlConn.getInputStream();
responseType = urlConn.getContentType();
return input;
}
} catch (IOException e) {
log.error("error while retrieving from http-connection occured: " + requestUrl, e);
Thread.sleep(defaultDelay * 1000);
errorList.add(e.getMessage());
return attemptDownload(requestUrl, retryNumber + 1, errorList);
}
} catch (InterruptedException e) {
throw new CollectorServiceException(e);
}
}
private void logHeaderFields(final HttpURLConnection urlConn) throws IOException {
log.debug("StatusCode: " + urlConn.getResponseMessage());
for (Map.Entry<String, List<String>> e : urlConn.getHeaderFields().entrySet()) {
if (e.getKey() != null) {
for (String v : e.getValue()) {
log.debug(" key: " + e.getKey() + " - value: " + v);
}
}
}
}
private int obtainRetryAfter(final Map<String, List<String>> headerMap) {
for (String key : headerMap.keySet()) {
if ((key != null) && key.toLowerCase().equals("retry-after") && (headerMap.get(key).size() > 0)
&& NumberUtils.isCreatable(headerMap.get(key).get(0))) {
return Integer
.parseInt(headerMap.get(key).get(0)) + 10;
}
}
return -1;
}
private String obtainNewLocation(final Map<String, List<String>> headerMap) throws CollectorServiceException {
for (String key : headerMap.keySet()) {
if ((key != null) && key.toLowerCase().equals("location") && (headerMap.get(key).size() > 0)) {
return headerMap.get(key).get(0);
}
}
throw new CollectorServiceException("The requested url has been MOVED, but 'location' param is MISSING");
}
/**
* register for https scheme; this is a workaround and not intended for the use in trusted environments
*/
public void initTrustManager() {
final X509TrustManager tm = new X509TrustManager() {
@Override
public void checkClientTrusted(final X509Certificate[] xcs, final String string) {
}
@Override
public void checkServerTrusted(final X509Certificate[] xcs, final String string) {
}
@Override
public X509Certificate[] getAcceptedIssuers() {
return null;
}
};
try {
final SSLContext ctx = SSLContext.getInstance("TLS");
ctx.init(null, new TrustManager[] {
tm
}, null);
HttpsURLConnection.setDefaultSSLSocketFactory(ctx.getSocketFactory());
} catch (GeneralSecurityException e) {
log.fatal(e);
throw new IllegalStateException(e);
}
}
public int getMaxNumberOfRetry() {
return maxNumberOfRetry;
}
public void setMaxNumberOfRetry(final int maxNumberOfRetry) {
this.maxNumberOfRetry = maxNumberOfRetry;
}
public int getDefaultDelay() {
return defaultDelay;
}
public void setDefaultDelay(final int defaultDelay) {
this.defaultDelay = defaultDelay;
}
public int getReadTimeOut() {
return readTimeOut;
}
public void setReadTimeOut(final int readTimeOut) {
this.readTimeOut = readTimeOut;
}
public String getResponseType() {
return responseType;
}
}

View File

@ -0,0 +1,139 @@
package eu.dnetlib.dhp.actionmanager.project;
import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.sql.ResultSet;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.common.RelationInverse;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class PrepareProjects implements Closeable {
private static final Log log = LogFactory.getLog(PrepareProjects.class);
private final Configuration conf;
private final BufferedWriter writer;
private final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final HttpConnector httpConnector;
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
IOUtils
.toString(
PrepareProjects.class
.getResourceAsStream(
"/eu/dnetlib/dhp/actionmanager/project/parameters.json")));
parser.parseArgument(args);
final String fileURL = parser.get("fileURL");
final String hdfsPath = parser.get("hdfsPath");
final String hdfsNameNode = parser.get("hdfsNameNode");
try (final PrepareProjects prepareProjects = new PrepareProjects(hdfsPath, hdfsNameNode)) {
log.info("Getting projects...");
prepareProjects.execute(fileURL);
}
}
public void execute(final String fileURL) throws Exception {
String projects = httpConnector.getInputSource(fileURL);
final CSVFormat format = CSVFormat.EXCEL
.withHeader()
.withDelimiter(';')
.withQuote('"')
.withTrim();
final CSVParser parser = CSVParser.parse(projects, format);
final Set<String> headers = parser.getHeaderMap().keySet();
}
public List<Relation> processBlacklistEntry(ResultSet rs) {
try {
Relation direct = new Relation();
Relation inverse = new Relation();
String source_prefix = ModelSupport.entityIdPrefix.get(rs.getString("source_type"));
String target_prefix = ModelSupport.entityIdPrefix.get(rs.getString("target_type"));
String source_direct = source_prefix + "|" + rs.getString("source");
direct.setSource(source_direct);
inverse.setTarget(source_direct);
String target_direct = target_prefix + "|" + rs.getString("target");
direct.setTarget(target_direct);
inverse.setSource(target_direct);
String encoding = rs.getString("relationship");
RelationInverse ri = ModelSupport.relationInverseMap.get(encoding);
direct.setRelClass(ri.getRelation());
inverse.setRelClass(ri.getInverse());
direct.setRelType(ri.getRelType());
inverse.setRelType(ri.getRelType());
direct.setSubRelType(ri.getSubReltype());
inverse.setSubRelType(ri.getSubReltype());
return Arrays.asList(direct, inverse);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void close() throws IOException {
writer.close();
}
public PrepareProjects(
final String hdfsPath, String hdfsNameNode)
throws Exception {
this.conf = new Configuration();
this.conf.set("fs.defaultFS", hdfsNameNode);
this.httpConnector = new HttpConnector();
FileSystem fileSystem = FileSystem.get(this.conf);
Path hdfsWritePath = new Path(hdfsPath);
FSDataOutputStream fsDataOutputStream = null;
if (fileSystem.exists(hdfsWritePath)) {
fsDataOutputStream = fileSystem.append(hdfsWritePath);
} else {
fsDataOutputStream = fileSystem.create(hdfsWritePath);
}
this.writer = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8));
}
protected void writeRelation(final Relation r) {
try {
writer.write(OBJECT_MAPPER.writeValueAsString(r));
writer.newLine();
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,52 @@
package eu.dnetlib.dhp.actionmanager.project;
import java.io.Serializable;
public class Programme implements Serializable {
private String rcn;
private String code;
private String title;
private String shortTitle;
private String language;
public String getRcn() {
return rcn;
}
public void setRcn(String rcn) {
this.rcn = rcn;
}
public String getCode() {
return code;
}
public void setCode(String code) {
this.code = code;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public String getShortTitle() {
return shortTitle;
}
public void setShortTitle(String shortTitle) {
this.shortTitle = shortTitle;
}
public String getLanguage() {
return language;
}
public void setLanguage(String language) {
this.language = language;
}
}

View File

@ -0,0 +1,196 @@
package eu.dnetlib.dhp.actionmanager.project;
import java.io.Serializable;
public class Project implements Serializable {
private String rcn;
private String id;
private String acronym;
private String status;
private String programme;
private String topics;
private String frameworkProgramme;
private String title;
private String startDate;
private String endDate;
private String projectUrl;
private String objective;
private String totalCost;
private String ecMaxContribution;
private String call;
private String fundingScheme;
private String coordinator;
private String coordinatorCountry;
private String participants;
private String participantCountries;
private String subjects;
public String getRcn() {
return rcn;
}
public void setRcn(String rcn) {
this.rcn = rcn;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getAcronym() {
return acronym;
}
public void setAcronym(String acronym) {
this.acronym = acronym;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public String getProgramme() {
return programme;
}
public void setProgramme(String programme) {
this.programme = programme;
}
public String getTopics() {
return topics;
}
public void setTopics(String topics) {
this.topics = topics;
}
public String getFrameworkProgramme() {
return frameworkProgramme;
}
public void setFrameworkProgramme(String frameworkProgramme) {
this.frameworkProgramme = frameworkProgramme;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public String getStartDate() {
return startDate;
}
public void setStartDate(String startDate) {
this.startDate = startDate;
}
public String getEndDate() {
return endDate;
}
public void setEndDate(String endDate) {
this.endDate = endDate;
}
public String getProjectUrl() {
return projectUrl;
}
public void setProjectUrl(String projectUrl) {
this.projectUrl = projectUrl;
}
public String getObjective() {
return objective;
}
public void setObjective(String objective) {
this.objective = objective;
}
public String getTotalCost() {
return totalCost;
}
public void setTotalCost(String totalCost) {
this.totalCost = totalCost;
}
public String getEcMaxContribution() {
return ecMaxContribution;
}
public void setEcMaxContribution(String ecMaxContribution) {
this.ecMaxContribution = ecMaxContribution;
}
public String getCall() {
return call;
}
public void setCall(String call) {
this.call = call;
}
public String getFundingScheme() {
return fundingScheme;
}
public void setFundingScheme(String fundingScheme) {
this.fundingScheme = fundingScheme;
}
public String getCoordinator() {
return coordinator;
}
public void setCoordinator(String coordinator) {
this.coordinator = coordinator;
}
public String getCoordinatorCountry() {
return coordinatorCountry;
}
public void setCoordinatorCountry(String coordinatorCountry) {
this.coordinatorCountry = coordinatorCountry;
}
public String getParticipants() {
return participants;
}
public void setParticipants(String participants) {
this.participants = participants;
}
public String getParticipantCountries() {
return participantCountries;
}
public void setParticipantCountries(String participantCountries) {
this.participantCountries = participantCountries;
}
public String getSubjects() {
return subjects;
}
public void setSubjects(String subjects) {
this.subjects = subjects;
}
}

View File

@ -0,0 +1,74 @@
package eu.dnetlib.dhp.actionmanager.project;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
public class SparkAtomicActionJob {
private static final Logger log = LoggerFactory.getLogger(SparkAtomicActionJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
SparkAtomicActionJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/actionmanager/project/action_set_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String projectPath = parser.get("projectPath");
log.info("projectPath: {}", projectPath);
final String outputPath = parser.get("outputPath");
log.info("outputPath {}: ", outputPath);
final String programmePath = parser.get("programmePath");
log.info("programmePath {}: ", programmePath);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
getAtomicActions(
spark,
projectPath,
programmePath,
outputPath);
});
}
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
private static void getAtomicActions(SparkSession spark, String projectPatj, String programmePath,
String outputPath) {
}
}

View File

@ -0,0 +1,26 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "pjfu",
"paramLongName": "projectsFileURL",
"paramDescription": "the URL from where to get the projects file",
"paramRequired": true
},
{
"paramName": "pfu",
"paramLongName": "programmeFileURL",
"paramDescription": "the URL from where to get the programme file",
"paramRequired": true
},
{
"paramName": "o",
"paramLongName": "outputPath",
"paramDescription": "the path of the new ActionSet",
"paramRequired": true
}
]

View File

@ -33,7 +33,7 @@
<java> <java>
<main-class>eu.dnetlib.dhp.actionset.h2020programme.GetFile</main-class> <main-class>eu.dnetlib.dhp.actionset.h2020programme.GetFile</main-class>
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg> <arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
<arg>--fileUrl</arg><arg>${projectFileURL}</arg> <arg>--fileURL</arg><arg>${projectFileURL}</arg>
<arg>--hdfsPath</arg><arg>${workingDir}/projects.csv</arg> <arg>--hdfsPath</arg><arg>${workingDir}/projects.csv</arg>
</java> </java>
<ok to="shell_get_programme_file"/> <ok to="shell_get_programme_file"/>
@ -44,7 +44,7 @@
<java> <java>
<main-class>eu.dnetlib.dhp.actionset.h2020programme.GetFile</main-class> <main-class>eu.dnetlib.dhp.actionset.h2020programme.GetFile</main-class>
<arg>--hdfsNameNode</arg><arg>${nameNode}</arg> <arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
<arg>--fileUrl</arg><arg>${programmeFileURL}</arg> <arg>--fileURL</arg><arg>${programmeFileURL}</arg>
<arg>--hdfsPath</arg><arg>${workingDir}/programme.csv</arg> <arg>--hdfsPath</arg><arg>${workingDir}/programme.csv</arg>
</java> </java>
<ok to="End"/> <ok to="End"/>