From abc45f2708e7497cd16d1cc09041737be6d07f55 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Mon, 18 May 2020 13:04:06 +0200 Subject: [PATCH] added dnet-45 HttpConnector and related Classes, produced the POJO for projects and programme --- .../project/CollectorPluginErrorLogList.java | 20 ++ .../project/CollectorServiceException.java | 20 ++ .../project}/GetFile.java | 4 +- .../actionmanager/project/HttpConnector.java | 240 ++++++++++++++++++ .../project/PrepareProjects.java | 139 ++++++++++ .../dhp/actionmanager/project/Programme.java | 52 ++++ .../dhp/actionmanager/project/Project.java | 196 ++++++++++++++ .../project/SparkAtomicActionJob.java | 74 ++++++ .../project/action_set_parameters.json | 26 ++ .../project}/oozie_app/workflow.xml | 4 +- .../project}/parameters.json | 0 .../h2020programme/action_set_parameters.json | 0 12 files changed, 771 insertions(+), 4 deletions(-) create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/CollectorPluginErrorLogList.java create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/CollectorServiceException.java rename dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/{actionset/h2020programme => actionmanager/project}/GetFile.java (92%) create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/HttpConnector.java create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProjects.java create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/Programme.java create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/Project.java create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java create mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/action_set_parameters.json rename dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/{actionset/h2020programme => actionmanager/project}/oozie_app/workflow.xml (95%) rename dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/{actionset/h2020programme => actionmanager/project}/parameters.json (100%) delete mode 100644 dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionset/h2020programme/action_set_parameters.json diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/CollectorPluginErrorLogList.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/CollectorPluginErrorLogList.java new file mode 100644 index 000000000..bc00e4604 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/CollectorPluginErrorLogList.java @@ -0,0 +1,20 @@ + +package eu.dnetlib.dhp.actionmanager.project; + +import java.util.LinkedList; + +public class CollectorPluginErrorLogList extends LinkedList { + + private static final long serialVersionUID = -6925786561303289704L; + + @Override + public String toString() { + String log = new String(); + int index = 0; + for (String errorMessage : this) { + log += String.format("Retry #%s: %s / ", index++, errorMessage); + } + return log; + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/CollectorServiceException.java 
b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/CollectorServiceException.java new file mode 100644 index 000000000..a417de50d --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/CollectorServiceException.java @@ -0,0 +1,20 @@ + +package eu.dnetlib.dhp.actionmanager.project; + +public class CollectorServiceException extends Exception { + + private static final long serialVersionUID = 7523999812098059764L; + + public CollectorServiceException(String string) { + super(string); + } + + public CollectorServiceException(String string, Throwable exception) { + super(string, exception); + } + + public CollectorServiceException(Throwable exception) { + super(exception); + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionset/h2020programme/GetFile.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/GetFile.java similarity index 92% rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionset/h2020programme/GetFile.java rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/GetFile.java index 2fed1a0e3..bbf59a20f 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionset/h2020programme/GetFile.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/GetFile.java @@ -1,5 +1,5 @@ -package eu.dnetlib.dhp.actionset.h2020programme; +package eu.dnetlib.dhp.actionmanager.project; import java.io.*; import java.net.URL; @@ -24,7 +24,7 @@ public class GetFile { .toString( GetFile.class .getResourceAsStream( - "/eu/dnetlib/dhp/actionset/h2020programme/parameters.json"))); + "/eu/dnetlib/dhp/actionmanager/project/parameters.json"))); Configuration conf = new Configuration(); diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/HttpConnector.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/HttpConnector.java new file mode 100644 index 000000000..63f67f145 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/HttpConnector.java @@ -0,0 +1,240 @@ + +package eu.dnetlib.dhp.actionmanager.project; + +import java.io.IOException; +import java.io.InputStream; +import java.net.*; +import java.security.GeneralSecurityException; +import java.security.cert.X509Certificate; +import java.util.List; +import java.util.Map; + +import javax.net.ssl.HttpsURLConnection; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; +import javax.net.ssl.X509TrustManager; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.math.NumberUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * @author jochen, michele, andrea + */ +public class HttpConnector { + + private static final Log log = LogFactory.getLog(HttpConnector.class); + + private int maxNumberOfRetry = 6; + private int defaultDelay = 120; // seconds + private int readTimeOut = 120; // seconds + + private String responseType = null; + + private String userAgent = "Mozilla/5.0 (compatible; OAI; +http://www.openaire.eu)"; + + public HttpConnector() { + CookieHandler.setDefault(new CookieManager(null, CookiePolicy.ACCEPT_ALL)); + } + + /** + * Given the URL returns the content via HTTP GET + * + * @param requestUrl the URL + * @return the content of the downloaded resource + * @throws CollectorServiceException 
when retrying more than maxNumberOfRetry times + */ + public String getInputSource(final String requestUrl) throws CollectorServiceException { + return attemptDownloadAsString(requestUrl, 1, new CollectorPluginErrorLogList()); + } + + /** + * Given the URL returns the content as a stream via HTTP GET + * + * @param requestUrl the URL + * @return the content of the downloaded resource as InputStream + * @throws CollectorServiceException when retrying more than maxNumberOfRetry times + */ + public InputStream getInputSourceAsStream(final String requestUrl) throws CollectorServiceException { + return attemptDownload(requestUrl, 1, new CollectorPluginErrorLogList()); + } + + private String attemptDownloadAsString(final String requestUrl, final int retryNumber, + final CollectorPluginErrorLogList errorList) + throws CollectorServiceException { + try { + InputStream s = attemptDownload(requestUrl, retryNumber, errorList); + try { + return IOUtils.toString(s); + } catch (IOException e) { + log.error("error while retrieving from http-connection occurred: " + requestUrl, e); + Thread.sleep(defaultDelay * 1000); + errorList.add(e.getMessage()); + return attemptDownloadAsString(requestUrl, retryNumber + 1, errorList); + } finally { + IOUtils.closeQuietly(s); + } + } catch (InterruptedException e) { + throw new CollectorServiceException(e); + } + } + + private InputStream attemptDownload(final String requestUrl, final int retryNumber, + final CollectorPluginErrorLogList errorList) + throws CollectorServiceException { + + if (retryNumber > maxNumberOfRetry) { + throw new CollectorServiceException("Max number of retries exceeded. Cause: \n " + errorList); + } + + log.debug("Downloading " + requestUrl + " - try: " + retryNumber); + try { + InputStream input = null; + + try { + final HttpURLConnection urlConn = (HttpURLConnection) new URL(requestUrl).openConnection(); + urlConn.setInstanceFollowRedirects(false); + urlConn.setReadTimeout(readTimeOut * 1000); + urlConn.addRequestProperty("User-Agent", userAgent); + + if (log.isDebugEnabled()) { + logHeaderFields(urlConn); + } + + int retryAfter = obtainRetryAfter(urlConn.getHeaderFields()); + if (retryAfter > 0 && urlConn.getResponseCode() == HttpURLConnection.HTTP_UNAVAILABLE) { + log.warn("waiting and repeating request after " + retryAfter + " sec."); + Thread.sleep(retryAfter * 1000); + errorList.add("503 Service Unavailable"); + urlConn.disconnect(); + return attemptDownload(requestUrl, retryNumber + 1, errorList); + } else if ((urlConn.getResponseCode() == HttpURLConnection.HTTP_MOVED_PERM) + || (urlConn.getResponseCode() == HttpURLConnection.HTTP_MOVED_TEMP)) { + final String newUrl = obtainNewLocation(urlConn.getHeaderFields()); + log.debug("The requested url has been moved to " + newUrl); + errorList + .add( + String + .format( + "%s %s. 
Moved to: %s", urlConn.getResponseCode(), urlConn.getResponseMessage(), + newUrl)); + urlConn.disconnect(); + return attemptDownload(newUrl, retryNumber + 1, errorList); + } else if (urlConn.getResponseCode() != HttpURLConnection.HTTP_OK) { + log + .error( + String + .format("HTTP error: %s %s", urlConn.getResponseCode(), urlConn.getResponseMessage())); + Thread.sleep(defaultDelay * 1000); + errorList.add(String.format("%s %s", urlConn.getResponseCode(), urlConn.getResponseMessage())); + urlConn.disconnect(); + return attemptDownload(requestUrl, retryNumber + 1, errorList); + } else { + input = urlConn.getInputStream(); + responseType = urlConn.getContentType(); + return input; + } + } catch (IOException e) { + log.error("error while retrieving from http-connection occured: " + requestUrl, e); + Thread.sleep(defaultDelay * 1000); + errorList.add(e.getMessage()); + return attemptDownload(requestUrl, retryNumber + 1, errorList); + } + } catch (InterruptedException e) { + throw new CollectorServiceException(e); + } + } + + private void logHeaderFields(final HttpURLConnection urlConn) throws IOException { + log.debug("StatusCode: " + urlConn.getResponseMessage()); + + for (Map.Entry> e : urlConn.getHeaderFields().entrySet()) { + if (e.getKey() != null) { + for (String v : e.getValue()) { + log.debug(" key: " + e.getKey() + " - value: " + v); + } + } + } + } + + private int obtainRetryAfter(final Map> headerMap) { + for (String key : headerMap.keySet()) { + if ((key != null) && key.toLowerCase().equals("retry-after") && (headerMap.get(key).size() > 0) + && NumberUtils.isCreatable(headerMap.get(key).get(0))) { + return Integer + .parseInt(headerMap.get(key).get(0)) + 10; + } + } + return -1; + } + + private String obtainNewLocation(final Map> headerMap) throws CollectorServiceException { + for (String key : headerMap.keySet()) { + if ((key != null) && key.toLowerCase().equals("location") && (headerMap.get(key).size() > 0)) { + return headerMap.get(key).get(0); + } + } + throw new CollectorServiceException("The requested url has been MOVED, but 'location' param is MISSING"); + } + + /** + * register for https scheme; this is a workaround and not intended for the use in trusted environments + */ + public void initTrustManager() { + final X509TrustManager tm = new X509TrustManager() { + + @Override + public void checkClientTrusted(final X509Certificate[] xcs, final String string) { + } + + @Override + public void checkServerTrusted(final X509Certificate[] xcs, final String string) { + } + + @Override + public X509Certificate[] getAcceptedIssuers() { + return null; + } + }; + try { + final SSLContext ctx = SSLContext.getInstance("TLS"); + ctx.init(null, new TrustManager[] { + tm + }, null); + HttpsURLConnection.setDefaultSSLSocketFactory(ctx.getSocketFactory()); + } catch (GeneralSecurityException e) { + log.fatal(e); + throw new IllegalStateException(e); + } + } + + public int getMaxNumberOfRetry() { + return maxNumberOfRetry; + } + + public void setMaxNumberOfRetry(final int maxNumberOfRetry) { + this.maxNumberOfRetry = maxNumberOfRetry; + } + + public int getDefaultDelay() { + return defaultDelay; + } + + public void setDefaultDelay(final int defaultDelay) { + this.defaultDelay = defaultDelay; + } + + public int getReadTimeOut() { + return readTimeOut; + } + + public void setReadTimeOut(final int readTimeOut) { + this.readTimeOut = readTimeOut; + } + + public String getResponseType() { + return responseType; + } + +} diff --git 
a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProjects.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProjects.java new file mode 100644 index 000000000..8955edeb4 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProjects.java @@ -0,0 +1,139 @@ + +package eu.dnetlib.dhp.actionmanager.project; + +import java.io.BufferedWriter; +import java.io.Closeable; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.sql.ResultSet; +import java.util.Arrays; +import java.util.List; +import java.util.Set; + +import org.apache.commons.csv.CSVParser; +import org.apache.commons.csv.CSVFormat; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.common.RelationInverse; +import eu.dnetlib.dhp.schema.oaf.Relation; + +public class PrepareProjects implements Closeable { + private static final Log log = LogFactory.getLog(PrepareProjects.class); + private final Configuration conf; + private final BufferedWriter writer; + private final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private final HttpConnector httpConnector; + + public static void main(final String[] args) throws Exception { + final ArgumentApplicationParser parser = new ArgumentApplicationParser( + IOUtils + .toString( + PrepareProjects.class + .getResourceAsStream( + "/eu/dnetlib/dhp/actionmanager/project/parameters.json"))); + + parser.parseArgument(args); + + final String fileURL = parser.get("fileURL"); + final String hdfsPath = parser.get("hdfsPath"); + final String hdfsNameNode = parser.get("hdfsNameNode"); + + try (final PrepareProjects prepareProjects = new PrepareProjects(hdfsPath, hdfsNameNode)) { + + log.info("Getting projects..."); + prepareProjects.execute(fileURL); + + } + } + + public void execute(final String fileURL) throws Exception { + + String projects = httpConnector.getInputSource(fileURL); + final CSVFormat format = CSVFormat.EXCEL + .withHeader() + .withDelimiter(';') + .withQuote('"') + .withTrim(); + final CSVParser parser = CSVParser.parse(projects, format); + final Set headers = parser.getHeaderMap().keySet(); + } + + public List processBlacklistEntry(ResultSet rs) { + try { + Relation direct = new Relation(); + Relation inverse = new Relation(); + + String source_prefix = ModelSupport.entityIdPrefix.get(rs.getString("source_type")); + String target_prefix = ModelSupport.entityIdPrefix.get(rs.getString("target_type")); + + String source_direct = source_prefix + "|" + rs.getString("source"); + direct.setSource(source_direct); + inverse.setTarget(source_direct); + + String target_direct = target_prefix + "|" + rs.getString("target"); + direct.setTarget(target_direct); + inverse.setSource(target_direct); + + String encoding = rs.getString("relationship"); + RelationInverse ri = ModelSupport.relationInverseMap.get(encoding); + direct.setRelClass(ri.getRelation()); + inverse.setRelClass(ri.getInverse()); + direct.setRelType(ri.getRelType()); + 
inverse.setRelType(ri.getRelType()); + direct.setSubRelType(ri.getSubReltype()); + inverse.setSubRelType(ri.getSubReltype()); + + return Arrays.asList(direct, inverse); + + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() throws IOException { + writer.close(); + } + + public PrepareProjects( + final String hdfsPath, String hdfsNameNode) + throws Exception { + + this.conf = new Configuration(); + this.conf.set("fs.defaultFS", hdfsNameNode); + this.httpConnector = new HttpConnector(); + FileSystem fileSystem = FileSystem.get(this.conf); + Path hdfsWritePath = new Path(hdfsPath); + FSDataOutputStream fsDataOutputStream = null; + if (fileSystem.exists(hdfsWritePath)) { + fsDataOutputStream = fileSystem.append(hdfsWritePath); + } else { + fsDataOutputStream = fileSystem.create(hdfsWritePath); + } + + this.writer = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8)); + + } + + protected void writeRelation(final Relation r) { + try { + writer.write(OBJECT_MAPPER.writeValueAsString(r)); + writer.newLine(); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/Programme.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/Programme.java new file mode 100644 index 000000000..20877b1a1 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/Programme.java @@ -0,0 +1,52 @@ + +package eu.dnetlib.dhp.actionmanager.project; + +import java.io.Serializable; + +public class Programme implements Serializable { + private String rcn; + private String code; + private String title; + private String shortTitle; + private String language; + + public String getRcn() { + return rcn; + } + + public void setRcn(String rcn) { + this.rcn = rcn; + } + + public String getCode() { + return code; + } + + public void setCode(String code) { + this.code = code; + } + + public String getTitle() { + return title; + } + + public void setTitle(String title) { + this.title = title; + } + + public String getShortTitle() { + return shortTitle; + } + + public void setShortTitle(String shortTitle) { + this.shortTitle = shortTitle; + } + + public String getLanguage() { + return language; + } + + public void setLanguage(String language) { + this.language = language; + } +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/Project.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/Project.java new file mode 100644 index 000000000..abee7f861 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/Project.java @@ -0,0 +1,196 @@ + +package eu.dnetlib.dhp.actionmanager.project; + +import java.io.Serializable; + +public class Project implements Serializable { + private String rcn; + private String id; + private String acronym; + private String status; + private String programme; + private String topics; + private String frameworkProgramme; + private String title; + private String startDate; + private String endDate; + private String projectUrl; + private String objective; + private String totalCost; + private String ecMaxContribution; + private String call; + private String fundingScheme; + private String coordinator; + private String coordinatorCountry; + private String participants; + private String participantCountries; + private 
String subjects; + + public String getRcn() { + return rcn; + } + + public void setRcn(String rcn) { + this.rcn = rcn; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getAcronym() { + return acronym; + } + + public void setAcronym(String acronym) { + this.acronym = acronym; + } + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + + public String getProgramme() { + return programme; + } + + public void setProgramme(String programme) { + this.programme = programme; + } + + public String getTopics() { + return topics; + } + + public void setTopics(String topics) { + this.topics = topics; + } + + public String getFrameworkProgramme() { + return frameworkProgramme; + } + + public void setFrameworkProgramme(String frameworkProgramme) { + this.frameworkProgramme = frameworkProgramme; + } + + public String getTitle() { + return title; + } + + public void setTitle(String title) { + this.title = title; + } + + public String getStartDate() { + return startDate; + } + + public void setStartDate(String startDate) { + this.startDate = startDate; + } + + public String getEndDate() { + return endDate; + } + + public void setEndDate(String endDate) { + this.endDate = endDate; + } + + public String getProjectUrl() { + return projectUrl; + } + + public void setProjectUrl(String projectUrl) { + this.projectUrl = projectUrl; + } + + public String getObjective() { + return objective; + } + + public void setObjective(String objective) { + this.objective = objective; + } + + public String getTotalCost() { + return totalCost; + } + + public void setTotalCost(String totalCost) { + this.totalCost = totalCost; + } + + public String getEcMaxContribution() { + return ecMaxContribution; + } + + public void setEcMaxContribution(String ecMaxContribution) { + this.ecMaxContribution = ecMaxContribution; + } + + public String getCall() { + return call; + } + + public void setCall(String call) { + this.call = call; + } + + public String getFundingScheme() { + return fundingScheme; + } + + public void setFundingScheme(String fundingScheme) { + this.fundingScheme = fundingScheme; + } + + public String getCoordinator() { + return coordinator; + } + + public void setCoordinator(String coordinator) { + this.coordinator = coordinator; + } + + public String getCoordinatorCountry() { + return coordinatorCountry; + } + + public void setCoordinatorCountry(String coordinatorCountry) { + this.coordinatorCountry = coordinatorCountry; + } + + public String getParticipants() { + return participants; + } + + public void setParticipants(String participants) { + this.participants = participants; + } + + public String getParticipantCountries() { + return participantCountries; + } + + public void setParticipantCountries(String participantCountries) { + this.participantCountries = participantCountries; + } + + public String getSubjects() { + return subjects; + } + + public void setSubjects(String subjects) { + this.subjects = subjects; + } +} diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java new file mode 100644 index 000000000..b8703378e --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java @@ -0,0 +1,74 @@ + +package 
eu.dnetlib.dhp.actionmanager.project; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + +import java.util.Optional; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; + +public class SparkAtomicActionJob { + private static final Logger log = LoggerFactory.getLogger(SparkAtomicActionJob.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + public static void main(String[] args) throws Exception { + + String jsonConfiguration = IOUtils + .toString( + SparkAtomicActionJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/actionmanager/project/action_set_parameters.json")); + + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + + parser.parseArgument(args); + + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + + String projectPath = parser.get("projectPath"); + log.info("projectPath: {}", projectPath); + + final String outputPath = parser.get("outputPath"); + log.info("outputPath {}: ", outputPath); + + final String programmePath = parser.get("programmePath"); + log.info("programmePath {}: ", programmePath); + + SparkConf conf = new SparkConf(); + + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + removeOutputDir(spark, outputPath); + getAtomicActions( + spark, + projectPath, + programmePath, + outputPath); + }); + } + + private static void removeOutputDir(SparkSession spark, String path) { + HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); + } + + private static void getAtomicActions(SparkSession spark, String projectPatj, String programmePath, + String outputPath) { + + } +} diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/action_set_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/action_set_parameters.json new file mode 100644 index 000000000..ca9ae9e97 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/action_set_parameters.json @@ -0,0 +1,26 @@ +[ + { + "paramName": "issm", + "paramLongName": "isSparkSessionManaged", + "paramDescription": "when true will stop SparkSession after job execution", + "paramRequired": false +}, +{ +"paramName": "pjfu", +"paramLongName": "projectsFileURL", +"paramDescription": "the URL from where to get the projects file", +"paramRequired": true +}, +{ +"paramName": "pfu", +"paramLongName": "programmeFileURL", +"paramDescription": "the URL from where to get the programme file", +"paramRequired": true +}, +{ +"paramName": "o", +"paramLongName": "outputPath", +"paramDescription": "the path of the new ActionSet", +"paramRequired": true +} +] \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionset/h2020programme/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/oozie_app/workflow.xml similarity index 95% rename from 
dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionset/h2020programme/oozie_app/workflow.xml rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/oozie_app/workflow.xml index 9b200c2a9..5bfa2e7c4 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionset/h2020programme/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/oozie_app/workflow.xml @@ -33,7 +33,7 @@ eu.dnetlib.dhp.actionset.h2020programme.GetFile --hdfsNameNode${nameNode} - --fileUrl${projectFileURL} + --fileURL${projectFileURL} --hdfsPath${workingDir}/projects.csv @@ -44,7 +44,7 @@ eu.dnetlib.dhp.actionset.h2020programme.GetFile --hdfsNameNode${nameNode} - --fileUrl${programmeFileURL} + --fileURL${programmeFileURL} --hdfsPath${workingDir}/programme.csv diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionset/h2020programme/parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/parameters.json similarity index 100% rename from dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionset/h2020programme/parameters.json rename to dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/parameters.json diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionset/h2020programme/action_set_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionset/h2020programme/action_set_parameters.json deleted file mode 100644 index e69de29bb..000000000
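Note on intended usage: the classes added above are wired together by the Oozie workflow, but the CSV-to-POJO mapping is not implemented yet in this patch (PrepareProjects.execute only reads the header row). The sketch below is illustrative only: it shows how the new HttpConnector and the Programme POJO could be combined with the same commons-csv format used in PrepareProjects. The class name ProgrammeCsvSketch and the column names ("rcn", "code", "title", "shortTitle", "language") are assumptions inferred from the POJO fields, not taken from this patch.

package eu.dnetlib.dhp.actionmanager.project;

import java.util.ArrayList;
import java.util.List;

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;

public class ProgrammeCsvSketch {

	public static List<Programme> download(final String fileURL) throws Exception {
		// fetch the CSV content with the retrying HTTP client introduced in this patch
		final HttpConnector connector = new HttpConnector();
		final String csv = connector.getInputSource(fileURL);

		// parse it with the same format used in PrepareProjects.execute()
		final CSVFormat format = CSVFormat.EXCEL
			.withHeader()
			.withDelimiter(';')
			.withQuote('"')
			.withTrim();

		final List<Programme> programmes = new ArrayList<>();
		try (CSVParser parser = CSVParser.parse(csv, format)) {
			for (CSVRecord record : parser) {
				// map each row onto the POJO (column names are assumed, not defined by the patch)
				final Programme p = new Programme();
				p.setRcn(record.get("rcn"));
				p.setCode(record.get("code"));
				p.setTitle(record.get("title"));
				p.setShortTitle(record.get("shortTitle"));
				p.setLanguage(record.get("language"));
				programmes.add(p);
			}
		}
		return programmes;
	}
}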