diff --git a/dhp-build/dhp-build-assembly-resources/pom.xml b/dhp-build/dhp-build-assembly-resources/pom.xml
index 327c33d6f2..8bae191d38 100644
--- a/dhp-build/dhp-build-assembly-resources/pom.xml
+++ b/dhp-build/dhp-build-assembly-resources/pom.xml
@@ -6,7 +6,7 @@
eu.dnetlib.dhp
dhp-build
- 1.2.1-SNAPSHOT
+ 1.2.2-SNAPSHOT
dhp-build-assembly-resources
diff --git a/dhp-build/dhp-build-properties-maven-plugin/pom.xml b/dhp-build/dhp-build-properties-maven-plugin/pom.xml
index 873046e080..ad8cd57b4a 100644
--- a/dhp-build/dhp-build-properties-maven-plugin/pom.xml
+++ b/dhp-build/dhp-build-properties-maven-plugin/pom.xml
@@ -6,7 +6,7 @@
eu.dnetlib.dhp
dhp-build
- 1.2.1-SNAPSHOT
+ 1.2.2-SNAPSHOT
dhp-build-properties-maven-plugin
diff --git a/dhp-build/dhp-code-style/pom.xml b/dhp-build/dhp-code-style/pom.xml
index 8099a72e4e..08f5de9ee8 100644
--- a/dhp-build/dhp-code-style/pom.xml
+++ b/dhp-build/dhp-code-style/pom.xml
@@ -5,7 +5,7 @@
eu.dnetlib.dhp
dhp-code-style
- 1.2.1-SNAPSHOT
+ 1.2.2-SNAPSHOT
jar
diff --git a/dhp-build/pom.xml b/dhp-build/pom.xml
index a700a29187..369e25b24b 100644
--- a/dhp-build/pom.xml
+++ b/dhp-build/pom.xml
@@ -4,7 +4,7 @@
eu.dnetlib.dhp
dhp
- 1.2.1-SNAPSHOT
+ 1.2.2-SNAPSHOT
dhp-build
pom
diff --git a/dhp-common/pom.xml b/dhp-common/pom.xml
index c7cb11b085..60e66f45a2 100644
--- a/dhp-common/pom.xml
+++ b/dhp-common/pom.xml
@@ -5,7 +5,7 @@
eu.dnetlib.dhp
dhp
- 1.2.1-SNAPSHOT
+ 1.2.2-SNAPSHOT
../
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/PacePerson.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/PacePerson.java
similarity index 99%
rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/PacePerson.java
rename to dhp-common/src/main/java/eu/dnetlib/dhp/common/PacePerson.java
index 6e474f2f38..1909ddcca6 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/PacePerson.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/PacePerson.java
@@ -1,5 +1,5 @@
-package eu.dnetlib.dhp.oa.graph.raw.common;
+package eu.dnetlib.dhp.common;
import java.nio.charset.StandardCharsets;
import java.text.Normalizer;
diff --git a/dhp-schemas/pom.xml b/dhp-schemas/pom.xml
index fe5d0c431a..5e864cf940 100644
--- a/dhp-schemas/pom.xml
+++ b/dhp-schemas/pom.xml
@@ -5,7 +5,7 @@
eu.dnetlib.dhp
dhp
- 1.2.1-SNAPSHOT
+ 1.2.2-SNAPSHOT
../
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/LicenseComparator.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/LicenseComparator.java
new file mode 100644
index 0000000000..db523ad1ac
--- /dev/null
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/LicenseComparator.java
@@ -0,0 +1,69 @@
+
+package eu.dnetlib.dhp.schema.common;
+
+import java.util.Comparator;
+
+import eu.dnetlib.dhp.schema.oaf.Qualifier;
+
+public class LicenseComparator implements Comparator {
+
+ @Override
+ public int compare(Qualifier left, Qualifier right) {
+
+ if (left == null && right == null)
+ return 0;
+ if (left == null)
+ return 1;
+ if (right == null)
+ return -1;
+
+ String lClass = left.getClassid();
+ String rClass = right.getClassid();
+
+ if (lClass.equals(rClass))
+ return 0;
+
+ if (lClass.equals("OPEN SOURCE"))
+ return -1;
+ if (rClass.equals("OPEN SOURCE"))
+ return 1;
+
+ if (lClass.equals("OPEN"))
+ return -1;
+ if (rClass.equals("OPEN"))
+ return 1;
+
+ if (lClass.equals("6MONTHS"))
+ return -1;
+ if (rClass.equals("6MONTHS"))
+ return 1;
+
+ if (lClass.equals("12MONTHS"))
+ return -1;
+ if (rClass.equals("12MONTHS"))
+ return 1;
+
+ if (lClass.equals("EMBARGO"))
+ return -1;
+ if (rClass.equals("EMBARGO"))
+ return 1;
+
+ if (lClass.equals("RESTRICTED"))
+ return -1;
+ if (rClass.equals("RESTRICTED"))
+ return 1;
+
+ if (lClass.equals("CLOSED"))
+ return -1;
+ if (rClass.equals("CLOSED"))
+ return 1;
+
+ if (lClass.equals("UNKNOWN"))
+ return -1;
+ if (rClass.equals("UNKNOWN"))
+ return 1;
+
+ // Else (but unlikely), lexicographical ordering will do.
+ return lClass.compareTo(rClass);
+ }
+}
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/DataInfo.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/DataInfo.java
index cc77e1ea0c..9d572ee30a 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/DataInfo.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/DataInfo.java
@@ -8,7 +8,7 @@ public class DataInfo implements Serializable {
private Boolean invisible = false;
private Boolean inferred;
- private Boolean deletedbyinference;
+ private Boolean deletedbyinference = false;
private String trust;
private String inferenceprovenance;
private Qualifier provenanceaction;
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Field.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Field.java
index 1a85c6842d..8358bc4b37 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Field.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Field.java
@@ -2,6 +2,7 @@
package eu.dnetlib.dhp.schema.oaf;
import java.io.Serializable;
+import java.util.Objects;
public class Field implements Serializable {
@@ -39,6 +40,6 @@ public class Field implements Serializable {
if (getClass() != obj.getClass())
return false;
Field other = (Field) obj;
- return getValue().equals(other.getValue());
+ return Objects.equals(getValue(), other.getValue());
}
}
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OafEntity.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OafEntity.java
index 09742748d7..2823ee49dd 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OafEntity.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OafEntity.java
@@ -106,6 +106,7 @@ public abstract class OafEntity extends Oaf implements Serializable {
.stream(lists)
.filter(Objects::nonNull)
.flatMap(List::stream)
+ .filter(Objects::nonNull)
.distinct()
.collect(Collectors.toList());
}
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Programme.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Programme.java
new file mode 100644
index 0000000000..c5259d07e5
--- /dev/null
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Programme.java
@@ -0,0 +1,39 @@
+
+package eu.dnetlib.dhp.schema.oaf;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class Programme implements Serializable {
+ private String code;
+ private String description;
+
+ public String getCode() {
+ return code;
+ }
+
+ public void setCode(String code) {
+ this.code = code;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public void setDescription(String description) {
+ this.description = description;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ Programme programme = (Programme) o;
+ return Objects.equals(code, programme.code);
+ }
+
+
+}
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Project.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Project.java
index 924c08cc9c..1fcfb305e7 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Project.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Project.java
@@ -58,6 +58,8 @@ public class Project extends OafEntity implements Serializable {
private Float fundedamount;
+ private List programme;
+
public Field getWebsiteurl() {
return websiteurl;
}
@@ -266,6 +268,14 @@ public class Project extends OafEntity implements Serializable {
this.fundedamount = fundedamount;
}
+ public List getProgramme() {
+ return programme;
+ }
+
+ public void setProgramme(List programme) {
+ this.programme = programme;
+ }
+
@Override
public void mergeFrom(OafEntity e) {
super.mergeFrom(e);
@@ -320,6 +330,9 @@ public class Project extends OafEntity implements Serializable {
fundedamount = p.getFundedamount() != null && compareTrust(this, e) < 0
? p.getFundedamount()
: fundedamount;
+
+ programme = mergeLists(programme, p.getProgramme());
+
mergeOAFDataInfo(e);
}
}
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Result.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Result.java
index 11316f36e4..9d723f9984 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Result.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Result.java
@@ -256,7 +256,25 @@ public class Result extends OafEntity implements Serializable {
subject = mergeLists(subject, r.getSubject());
+ // merge title lists: main title with higher trust and distinct between the others
+ StructuredProperty baseMainTitle = null;
+ if (title != null) {
+ baseMainTitle = getMainTitle(title);
+ title.remove(baseMainTitle);
+ }
+
+ StructuredProperty newMainTitle = null;
+ if (r.getTitle() != null) {
+ newMainTitle = getMainTitle(r.getTitle());
+ r.getTitle().remove(newMainTitle);
+ }
+
+ if (newMainTitle != null && compareTrust(this, r) < 0)
+ baseMainTitle = newMainTitle;
+
title = mergeLists(title, r.getTitle());
+ if (title != null && baseMainTitle != null)
+ title.add(baseMainTitle);
relevantdate = mergeLists(relevantdate, r.getRelevantdate());
@@ -306,4 +324,15 @@ public class Result extends OafEntity implements Serializable {
}
return a.size() > b.size() ? a : b;
}
+
+ private StructuredProperty getMainTitle(List titles) {
+ // need to check if the list of titles contains more than 1 main title? (in that case, we should chose which
+ // main title select in the list)
+ for (StructuredProperty title : titles) {
+ if (title.getQualifier() != null && title.getQualifier().getClassid() != null)
+ if (title.getQualifier().getClassid().equals("main title"))
+ return title;
+ }
+ return null;
+ }
}
diff --git a/dhp-workflows/dhp-actionmanager/pom.xml b/dhp-workflows/dhp-actionmanager/pom.xml
index 22a81f7da8..ec6247102e 100644
--- a/dhp-workflows/dhp-actionmanager/pom.xml
+++ b/dhp-workflows/dhp-actionmanager/pom.xml
@@ -4,7 +4,7 @@
eu.dnetlib.dhp
dhp-workflows
- 1.2.1-SNAPSHOT
+ 1.2.2-SNAPSHOT
dhp-actionmanager
diff --git a/dhp-workflows/dhp-aggregation/pom.xml b/dhp-workflows/dhp-aggregation/pom.xml
index 1c5465c14a..282ca476d5 100644
--- a/dhp-workflows/dhp-aggregation/pom.xml
+++ b/dhp-workflows/dhp-aggregation/pom.xml
@@ -4,7 +4,7 @@
eu.dnetlib.dhp
dhp-workflows
- 1.2.1-SNAPSHOT
+ 1.2.2-SNAPSHOT
dhp-aggregation
@@ -38,48 +38,6 @@
${project.version}
-
- eu.dnetlib
- dnet-actionmanager-common
-
-
- eu.dnetlib
- dnet-openaireplus-mapping-utils
-
-
- saxonica
- saxon
-
-
- saxonica
- saxon-dom
-
-
- jgrapht
- jgrapht
-
-
- net.sf.ehcache
- ehcache
-
-
- org.springframework
- spring-test
-
-
- org.apache.*
- *
-
-
- apache
- *
-
-
-
-
- eu.dnetlib
- dnet-openaire-data-protos
-
net.sf.saxon
@@ -100,11 +58,15 @@
jaxen
+
- org.apache.hadoop
- hadoop-distcp
+ org.apache.commons
+ commons-csv
+ 1.8
+
+
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProgramme.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProgramme.java
new file mode 100644
index 0000000000..c6dab13a06
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProgramme.java
@@ -0,0 +1,123 @@
+
+package eu.dnetlib.dhp.actionmanager.project;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.util.HashMap;
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.actionmanager.project.csvutils.CSVProgramme;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
+import scala.Tuple2;
+
+public class PrepareProgramme {
+
+ private static final Logger log = LoggerFactory.getLogger(PrepareProgramme.class);
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ public static void main(String[] args) throws Exception {
+
+ String jsonConfiguration = IOUtils
+ .toString(
+ PrepareProgramme.class
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/actionmanager/project/prepare_programme_parameters.json"));
+
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+
+ parser.parseArgument(args);
+
+ Boolean isSparkSessionManaged = Optional
+ .ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+ final String programmePath = parser.get("programmePath");
+ log.info("programmePath {}: ", programmePath);
+
+ final String outputPath = parser.get("outputPath");
+ log.info("outputPath {}: ", outputPath);
+
+ SparkConf conf = new SparkConf();
+
+ runWithSparkSession(
+ conf,
+ isSparkSessionManaged,
+ spark -> {
+ removeOutputDir(spark, outputPath);
+ exec(spark, programmePath, outputPath);
+ });
+ }
+
+ private static void removeOutputDir(SparkSession spark, String path) {
+ HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
+ }
+
+ private static void exec(SparkSession spark, String programmePath, String outputPath) {
+ Dataset programme = readPath(spark, programmePath, CSVProgramme.class);
+
+ programme
+ .toJavaRDD()
+ .filter(p -> !p.getCode().contains("FP7"))
+ .mapToPair(csvProgramme -> new Tuple2<>(csvProgramme.getCode(), csvProgramme))
+ .reduceByKey((a, b) -> {
+ if (StringUtils.isEmpty(a.getShortTitle())) {
+ if (StringUtils.isEmpty(b.getShortTitle())) {
+ if (StringUtils.isEmpty(a.getTitle())) {
+ if (StringUtils.isNotEmpty(b.getTitle())) {
+ a.setShortTitle(b.getTitle());
+ a.setLanguage(b.getLanguage());
+ }
+ } else {// notIsEmpty a.getTitle
+ if (StringUtils.isEmpty(b.getTitle())) {
+ a.setShortTitle(a.getTitle());
+ } else {
+ if (b.getLanguage().equalsIgnoreCase("en")) {
+ a.setShortTitle(b.getTitle());
+ a.setLanguage(b.getLanguage());
+ } else {
+ a.setShortTitle(a.getTitle());
+ }
+ }
+ }
+ } else {// not isEmpty b.getShortTitle
+ a.setShortTitle(b.getShortTitle());
+ // a.setLanguage(b.getLanguage());
+ }
+ }
+ return a;
+
+ })
+ .map(p -> {
+ CSVProgramme csvProgramme = p._2();
+ if (StringUtils.isEmpty(csvProgramme.getShortTitle())) {
+ csvProgramme.setShortTitle(csvProgramme.getTitle());
+ }
+ return OBJECT_MAPPER.writeValueAsString(csvProgramme);
+ })
+ .saveAsTextFile(outputPath);
+
+ }
+
+ public static Dataset readPath(
+ SparkSession spark, String inputPath, Class clazz) {
+ return spark
+ .read()
+ .textFile(inputPath)
+ .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
+ }
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProjects.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProjects.java
new file mode 100644
index 0000000000..3d8226f4dc
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProjects.java
@@ -0,0 +1,137 @@
+
+package eu.dnetlib.dhp.actionmanager.project;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.util.*;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.actionmanager.project.csvutils.CSVProgramme;
+import eu.dnetlib.dhp.actionmanager.project.csvutils.CSVProject;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
+import scala.Tuple2;
+
+public class PrepareProjects {
+
+ private static final Logger log = LoggerFactory.getLogger(PrepareProgramme.class);
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final HashMap programmeMap = new HashMap<>();
+
+ public static void main(String[] args) throws Exception {
+
+ String jsonConfiguration = IOUtils
+ .toString(
+ PrepareProjects.class
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/actionmanager/project/prepare_project_parameters.json"));
+
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+
+ parser.parseArgument(args);
+
+ Boolean isSparkSessionManaged = Optional
+ .ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+ final String projectPath = parser.get("projectPath");
+ log.info("projectPath {}: ", projectPath);
+
+ final String outputPath = parser.get("outputPath");
+ log.info("outputPath {}: ", outputPath);
+
+ final String dbProjectPath = parser.get("dbProjectPath");
+ log.info("dbProjectPath {}: ", dbProjectPath);
+
+ SparkConf conf = new SparkConf();
+
+ runWithSparkSession(
+ conf,
+ isSparkSessionManaged,
+ spark -> {
+ removeOutputDir(spark, outputPath);
+ exec(spark, projectPath, dbProjectPath, outputPath);
+ });
+ }
+
+ private static void removeOutputDir(SparkSession spark, String path) {
+ HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
+ }
+
+ private static void exec(SparkSession spark, String progjectPath, String dbProjectPath, String outputPath) {
+ Dataset project = readPath(spark, progjectPath, CSVProject.class);
+ Dataset dbProjects = readPath(spark, dbProjectPath, ProjectSubset.class);
+
+ dbProjects.joinWith(project, dbProjects.col("code").equalTo(project.col("id")), "left")
+ .flatMap((FlatMapFunction, CSVProject>) value -> {
+ Optional csvProject = Optional.ofNullable(value._2());
+ if(! csvProject.isPresent()){
+ return null;
+ }
+ List csvProjectList = new ArrayList<>();
+ String[] programme = csvProject.get().getProgramme().split(";");
+ Arrays
+ .stream(programme)
+ .forEach(p -> {
+ CSVProject proj = new CSVProject();
+ proj.setProgramme(p);
+ proj.setId(csvProject.get().getId());
+ csvProjectList.add(proj);
+ });
+
+ return csvProjectList.iterator();
+ }, Encoders.bean(CSVProject.class))
+ .write()
+ .mode(SaveMode.Overwrite)
+ .option("compression", "gzip")
+ .json(outputPath);
+//
+// .map(value -> {
+// Optional csvProject = Optional.ofNullable(value._2());
+// }, Encoders.bean(CSVProject.class))
+// .filter(Objects::nonNull)
+// .toJavaRDD()
+// .flatMap(p -> {
+// List csvProjectList = new ArrayList<>();
+// String[] programme = p.getProgramme().split(";");
+// Arrays
+// .stream(programme)
+// .forEach(value -> {
+// CSVProject csvProject = new CSVProject();
+// csvProject.setProgramme(value);
+// csvProject.setId(p.getId());
+// csvProjectList.add(csvProject);
+// });
+//
+// return csvProjectList.iterator();
+// })
+// .map(p -> OBJECT_MAPPER.writeValueAsString(p))
+// .saveAsTextFile(outputPath);
+
+ }
+
+ public static Dataset readPath(
+ SparkSession spark, String inputPath, Class clazz) {
+ return spark
+ .read()
+ .textFile(inputPath)
+ .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
+ }
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/ProjectSubset.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/ProjectSubset.java
new file mode 100644
index 0000000000..cfbb62f219
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/ProjectSubset.java
@@ -0,0 +1,16 @@
+package eu.dnetlib.dhp.actionmanager.project;
+
+import java.io.Serializable;
+
+public class ProjectSubset implements Serializable {
+
+ private String code;
+
+ public String getCode() {
+ return code;
+ }
+
+ public void setCode(String code) {
+ this.code = code;
+ }
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/ReadProjectsFromDB.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/ReadProjectsFromDB.java
new file mode 100644
index 0000000000..0015dc60f1
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/ReadProjectsFromDB.java
@@ -0,0 +1,113 @@
+package eu.dnetlib.dhp.actionmanager.project;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.DbClient;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.sql.ResultSet;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+public class ReadProjectsFromDB implements Closeable {
+
+ private final DbClient dbClient;
+ private static final Log log = LogFactory.getLog(ReadProjectsFromDB.class);
+ private final Configuration conf;
+ private final BufferedWriter writer;
+ private final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ private final static String query = "SELECT code " +
+ "from projects where id like 'corda__h2020%' " ;
+
+ public static void main(final String[] args) throws Exception {
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils
+ .toString(
+ ReadProjectsFromDB.class
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/actionmanager/project/read_projects_db.json")));
+
+ parser.parseArgument(args);
+
+ final String dbUrl = parser.get("postgresUrl");
+ final String dbUser = parser.get("postgresUser");
+ final String dbPassword = parser.get("postgresPassword");
+ final String hdfsPath = parser.get("hdfsPath") ;
+ final String hdfsNameNode = parser.get("hdfsNameNode");
+
+ try (final ReadProjectsFromDB rbl = new ReadProjectsFromDB(hdfsPath, hdfsNameNode, dbUrl, dbUser,
+ dbPassword)) {
+
+ log.info("Processing blacklist...");
+ rbl.execute(query, rbl::processProjectsEntry);
+
+ }
+ }
+ public void execute(final String sql, final Function> producer) throws Exception {
+
+ final Consumer consumer = rs -> producer.apply(rs).forEach(r -> writeProject(r));
+
+ dbClient.processResults(sql, consumer);
+ }
+
+ public List processProjectsEntry(ResultSet rs) {
+ try {
+ ProjectSubset p = new ProjectSubset();
+ p.setCode(rs.getString("code"));
+
+ return Arrays.asList(p);
+
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected void writeProject(final ProjectSubset r) {
+ try {
+ writer.write(OBJECT_MAPPER.writeValueAsString(r));
+ writer.newLine();
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public ReadProjectsFromDB(
+ final String hdfsPath, String hdfsNameNode, final String dbUrl, final String dbUser, final String dbPassword)
+ throws Exception {
+
+ this.dbClient = new DbClient(dbUrl, dbUser, dbPassword);
+ this.conf = new Configuration();
+ this.conf.set("fs.defaultFS", hdfsNameNode);
+ FileSystem fileSystem = FileSystem.get(this.conf);
+ Path hdfsWritePath = new Path(hdfsPath);
+ FSDataOutputStream fsDataOutputStream = null;
+ if (fileSystem.exists(hdfsWritePath)) {
+ fileSystem.delete(hdfsWritePath, false);
+ }
+ fsDataOutputStream = fileSystem.create(hdfsWritePath);
+
+
+ this.writer = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8));
+ }
+
+ @Override
+ public void close() throws IOException {
+ dbClient.close();
+ writer.close();
+ }
+}
+
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java
new file mode 100644
index 0000000000..1023e2d19d
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java
@@ -0,0 +1,161 @@
+
+package eu.dnetlib.dhp.actionmanager.project;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.MapGroupsFunction;
+import org.apache.spark.rdd.SequenceFileRDDFunctions;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.actionmanager.project.csvutils.CSVProgramme;
+import eu.dnetlib.dhp.actionmanager.project.csvutils.CSVProject;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.schema.action.AtomicAction;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.Programme;
+import eu.dnetlib.dhp.schema.oaf.Project;
+import eu.dnetlib.dhp.utils.DHPUtils;
+import scala.Function1;
+import scala.Tuple2;
+import scala.runtime.BoxedUnit;
+
+public class SparkAtomicActionJob {
+ private static final Logger log = LoggerFactory.getLogger(SparkAtomicActionJob.class);
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final HashMap programmeMap = new HashMap<>();
+
+ public static void main(String[] args) throws Exception {
+
+ String jsonConfiguration = IOUtils
+ .toString(
+ SparkAtomicActionJob.class
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/actionmanager/project/action_set_parameters.json"));
+
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+
+ parser.parseArgument(args);
+
+ Boolean isSparkSessionManaged = Optional
+ .ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+ String projectPath = parser.get("projectPath");
+ log.info("projectPath: {}", projectPath);
+
+ final String outputPath = parser.get("outputPath");
+ log.info("outputPath {}: ", outputPath);
+
+ final String programmePath = parser.get("programmePath");
+ log.info("programmePath {}: ", programmePath);
+
+ SparkConf conf = new SparkConf();
+
+ runWithSparkSession(
+ conf,
+ isSparkSessionManaged,
+ spark -> {
+ removeOutputDir(spark, outputPath);
+ getAtomicActions(
+ spark,
+ projectPath,
+ programmePath,
+ outputPath);
+ });
+ }
+
+ private static void removeOutputDir(SparkSession spark, String path) {
+ HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
+ }
+
+ private static void getAtomicActions(SparkSession spark, String projectPatH,
+ String programmePath,
+ String outputPath) {
+
+ Dataset project = readPath(spark, projectPatH, CSVProject.class);
+ Dataset programme = readPath(spark, programmePath, CSVProgramme.class);
+
+ project
+ .joinWith(programme, project.col("programme").equalTo(programme.col("code")), "left")
+ .map(c -> {
+ CSVProject csvProject = c._1();
+ Optional csvProgramme = Optional.ofNullable(c._2());
+ if (csvProgramme.isPresent()) {
+ Project p = new Project();
+ p
+ .setId(
+ createOpenaireId(
+ ModelSupport.entityIdPrefix.get("project"),
+ "corda__h2020", csvProject.getId()));
+ Programme pm = new Programme();
+ pm.setCode(csvProject.getProgramme());
+ pm.setDescription(csvProgramme.get().getShortTitle());
+ p.setProgramme(Arrays.asList(pm));
+ return p;
+ }
+
+ return null;
+ }, Encoders.bean(Project.class))
+ .filter(Objects::nonNull)
+ .groupByKey(
+ (MapFunction) p -> p.getId(),
+ Encoders.STRING())
+ .mapGroups((MapGroupsFunction) (s, it) -> {
+ Project first = it.next();
+ it.forEachRemaining(p -> {
+ first.mergeFrom(p);
+ });
+ return first;
+ }, Encoders.bean(Project.class))
+ .toJavaRDD()
+ .map(p -> new AtomicAction(Project.class, p))
+ .mapToPair(
+ aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
+ new Text(OBJECT_MAPPER.writeValueAsString(aa))))
+ .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
+
+ }
+
+ public static Dataset readPath(
+ SparkSession spark, String inputPath, Class clazz) {
+ return spark
+ .read()
+ .textFile(inputPath)
+ .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
+ }
+
+ public static String createOpenaireId(
+ final String prefix, final String nsPrefix, final String id) {
+
+ return String.format("%s|%s::%s", prefix, nsPrefix, DHPUtils.md5(id));
+
+ }
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/CSVParser.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/CSVParser.java
new file mode 100644
index 0000000000..ef29a6b6a6
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/CSVParser.java
@@ -0,0 +1,37 @@
+
+package eu.dnetlib.dhp.actionmanager.project.csvutils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVRecord;
+import org.apache.commons.lang.reflect.FieldUtils;
+
+public class CSVParser {
+
+ public List parse(String csvFile, String classForName)
+ throws ClassNotFoundException, IOException, IllegalAccessException, InstantiationException {
+ final CSVFormat format = CSVFormat.EXCEL
+ .withHeader()
+ .withDelimiter(';')
+ .withQuote('"')
+ .withTrim();
+ List ret = new ArrayList<>();
+ final org.apache.commons.csv.CSVParser parser = org.apache.commons.csv.CSVParser.parse(csvFile, format);
+ final Set headers = parser.getHeaderMap().keySet();
+ Class> clazz = Class.forName(classForName);
+ for (CSVRecord csvRecord : parser.getRecords()) {
+ final Object cc = clazz.newInstance();
+ for (String header : headers) {
+ FieldUtils.writeField(cc, header, csvRecord.get(header), true);
+
+ }
+ ret.add((R) cc);
+ }
+
+ return ret;
+ }
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/CSVProgramme.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/CSVProgramme.java
new file mode 100644
index 0000000000..a9069e5104
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/CSVProgramme.java
@@ -0,0 +1,52 @@
+
+package eu.dnetlib.dhp.actionmanager.project.csvutils;
+
+import java.io.Serializable;
+
+public class CSVProgramme implements Serializable {
+ private String rcn;
+ private String code;
+ private String title;
+ private String shortTitle;
+ private String language;
+
+ public String getRcn() {
+ return rcn;
+ }
+
+ public void setRcn(String rcn) {
+ this.rcn = rcn;
+ }
+
+ public String getCode() {
+ return code;
+ }
+
+ public void setCode(String code) {
+ this.code = code;
+ }
+
+ public String getTitle() {
+ return title;
+ }
+
+ public void setTitle(String title) {
+ this.title = title;
+ }
+
+ public String getShortTitle() {
+ return shortTitle;
+ }
+
+ public void setShortTitle(String shortTitle) {
+ this.shortTitle = shortTitle;
+ }
+
+ public String getLanguage() {
+ return language;
+ }
+
+ public void setLanguage(String language) {
+ this.language = language;
+ }
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/CSVProject.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/CSVProject.java
new file mode 100644
index 0000000000..ff18c6260e
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/CSVProject.java
@@ -0,0 +1,197 @@
+
+package eu.dnetlib.dhp.actionmanager.project.csvutils;
+
+import java.io.Serializable;
+
+public class CSVProject implements Serializable {
+ private String rcn;
+ private String id;
+ private String acronym;
+ private String status;
+ private String programme;
+ private String topics;
+ private String frameworkProgramme;
+ private String title;
+ private String startDate;
+ private String endDate;
+ private String projectUrl;
+ private String objective;
+ private String totalCost;
+ private String ecMaxContribution;
+ private String call;
+ private String fundingScheme;
+ private String coordinator;
+ private String coordinatorCountry;
+ private String participants;
+ private String participantCountries;
+ private String subjects;
+
+ public String getRcn() {
+ return rcn;
+ }
+
+ public void setRcn(String rcn) {
+ this.rcn = rcn;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getAcronym() {
+ return acronym;
+ }
+
+ public void setAcronym(String acronym) {
+ this.acronym = acronym;
+ }
+
+ public String getStatus() {
+ return status;
+ }
+
+ public void setStatus(String status) {
+ this.status = status;
+ }
+
+ public String getProgramme() {
+ return programme;
+ }
+
+ public void setProgramme(String programme) {
+ this.programme = programme;
+ }
+
+ public String getTopics() {
+ return topics;
+ }
+
+ public void setTopics(String topics) {
+ this.topics = topics;
+ }
+
+ public String getFrameworkProgramme() {
+ return frameworkProgramme;
+ }
+
+ public void setFrameworkProgramme(String frameworkProgramme) {
+ this.frameworkProgramme = frameworkProgramme;
+ }
+
+ public String getTitle() {
+ return title;
+ }
+
+ public void setTitle(String title) {
+ this.title = title;
+ }
+
+ public String getStartDate() {
+ return startDate;
+ }
+
+ public void setStartDate(String startDate) {
+ this.startDate = startDate;
+ }
+
+ public String getEndDate() {
+ return endDate;
+ }
+
+ public void setEndDate(String endDate) {
+ this.endDate = endDate;
+ }
+
+ public String getProjectUrl() {
+ return projectUrl;
+ }
+
+ public void setProjectUrl(String projectUrl) {
+ this.projectUrl = projectUrl;
+ }
+
+ public String getObjective() {
+ return objective;
+ }
+
+ public void setObjective(String objective) {
+ this.objective = objective;
+ }
+
+ public String getTotalCost() {
+ return totalCost;
+ }
+
+ public void setTotalCost(String totalCost) {
+ this.totalCost = totalCost;
+ }
+
+ public String getEcMaxContribution() {
+ return ecMaxContribution;
+ }
+
+ public void setEcMaxContribution(String ecMaxContribution) {
+ this.ecMaxContribution = ecMaxContribution;
+ }
+
+ public String getCall() {
+ return call;
+ }
+
+ public void setCall(String call) {
+ this.call = call;
+ }
+
+ public String getFundingScheme() {
+ return fundingScheme;
+ }
+
+ public void setFundingScheme(String fundingScheme) {
+ this.fundingScheme = fundingScheme;
+ }
+
+ public String getCoordinator() {
+ return coordinator;
+ }
+
+ public void setCoordinator(String coordinator) {
+ this.coordinator = coordinator;
+ }
+
+ public String getCoordinatorCountry() {
+ return coordinatorCountry;
+ }
+
+ public void setCoordinatorCountry(String coordinatorCountry) {
+ this.coordinatorCountry = coordinatorCountry;
+ }
+
+ public String getParticipants() {
+ return participants;
+ }
+
+ public void setParticipants(String participants) {
+ this.participants = participants;
+ }
+
+ public String getParticipantCountries() {
+ return participantCountries;
+ }
+
+ public void setParticipantCountries(String participantCountries) {
+ this.participantCountries = participantCountries;
+ }
+
+ public String getSubjects() {
+ return subjects;
+ }
+
+ public void setSubjects(String subjects) {
+ this.subjects = subjects;
+ }
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/ReadCSV.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/ReadCSV.java
new file mode 100644
index 0000000000..2b72b229ad
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/ReadCSV.java
@@ -0,0 +1,97 @@
+
+package eu.dnetlib.dhp.actionmanager.project.csvutils;
+
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.actionmanager.project.httpconnector.HttpConnector;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+
+public class ReadCSV implements Closeable {
+ private static final Log log = LogFactory.getLog(ReadCSV.class);
+ private final Configuration conf;
+ private final BufferedWriter writer;
+ private final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private String csvFile;
+
+ public static void main(final String[] args) throws Exception {
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils
+ .toString(
+ ReadCSV.class
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/actionmanager/project/parameters.json")));
+
+ parser.parseArgument(args);
+
+ final String fileURL = parser.get("fileURL");
+ final String hdfsPath = parser.get("hdfsPath");
+ final String hdfsNameNode = parser.get("hdfsNameNode");
+ final String classForName = parser.get("classForName");
+
+ try (final ReadCSV readCSV = new ReadCSV(hdfsPath, hdfsNameNode, fileURL)) {
+
+ log.info("Getting CSV file...");
+ readCSV.execute(classForName);
+
+ }
+ }
+
+ public void execute(final String classForName) throws Exception {
+ CSVParser csvParser = new CSVParser();
+ csvParser
+ .parse(csvFile, classForName)
+ .stream()
+ .forEach(p -> write(p));
+
+ }
+
+ @Override
+ public void close() throws IOException {
+ writer.close();
+ }
+
+ public ReadCSV(
+ final String hdfsPath,
+ final String hdfsNameNode,
+ final String fileURL)
+ throws Exception {
+ this.conf = new Configuration();
+ this.conf.set("fs.defaultFS", hdfsNameNode);
+ HttpConnector httpConnector = new HttpConnector();
+ FileSystem fileSystem = FileSystem.get(this.conf);
+ Path hdfsWritePath = new Path(hdfsPath);
+ FSDataOutputStream fsDataOutputStream = null;
+ if (fileSystem.exists(hdfsWritePath)) {
+ fileSystem.delete(hdfsWritePath, false);
+ }
+ fsDataOutputStream = fileSystem.create(hdfsWritePath);
+
+ this.writer = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8));
+ this.csvFile = httpConnector.getInputSource(fileURL);
+ ;
+ }
+
+ protected void write(final Object p) {
+ try {
+ writer.write(OBJECT_MAPPER.writeValueAsString(p));
+ writer.newLine();
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/CollectorPluginErrorLogList.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/CollectorPluginErrorLogList.java
new file mode 100644
index 0000000000..9d3f882651
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/CollectorPluginErrorLogList.java
@@ -0,0 +1,20 @@
+
+package eu.dnetlib.dhp.actionmanager.project.httpconnector;
+
+import java.util.LinkedList;
+
+public class CollectorPluginErrorLogList extends LinkedList {
+
+ private static final long serialVersionUID = -6925786561303289704L;
+
+ @Override
+ public String toString() {
+ String log = new String();
+ int index = 0;
+ for (String errorMessage : this) {
+ log += String.format("Retry #%s: %s / ", index++, errorMessage);
+ }
+ return log;
+ }
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/CollectorServiceException.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/CollectorServiceException.java
new file mode 100644
index 0000000000..9167d97b4c
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/CollectorServiceException.java
@@ -0,0 +1,20 @@
+
+package eu.dnetlib.dhp.actionmanager.project.httpconnector;
+
+public class CollectorServiceException extends Exception {
+
+ private static final long serialVersionUID = 7523999812098059764L;
+
+ public CollectorServiceException(String string) {
+ super(string);
+ }
+
+ public CollectorServiceException(String string, Throwable exception) {
+ super(string, exception);
+ }
+
+ public CollectorServiceException(Throwable exception) {
+ super(exception);
+ }
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnector.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnector.java
new file mode 100644
index 0000000000..e20518b559
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnector.java
@@ -0,0 +1,240 @@
+
+package eu.dnetlib.dhp.actionmanager.project.httpconnector;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.*;
+import java.security.GeneralSecurityException;
+import java.security.cert.X509Certificate;
+import java.util.List;
+import java.util.Map;
+
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @author jochen, michele, andrea
+ */
+public class HttpConnector {
+
+ private static final Log log = LogFactory.getLog(HttpConnector.class);
+
+ private int maxNumberOfRetry = 6;
+ private int defaultDelay = 120; // seconds
+ private int readTimeOut = 120; // seconds
+
+ private String responseType = null;
+
+ private String userAgent = "Mozilla/5.0 (compatible; OAI; +http://www.openaire.eu)";
+
+ public HttpConnector() {
+ CookieHandler.setDefault(new CookieManager(null, CookiePolicy.ACCEPT_ALL));
+ }
+
+ /**
+ * Given the URL returns the content via HTTP GET
+ *
+ * @param requestUrl the URL
+ * @return the content of the downloaded resource
+ * @throws CollectorServiceException when retrying more than maxNumberOfRetry times
+ */
+ public String getInputSource(final String requestUrl) throws CollectorServiceException {
+ return attemptDownlaodAsString(requestUrl, 1, new CollectorPluginErrorLogList());
+ }
+
+ /**
+ * Given the URL returns the content as a stream via HTTP GET
+ *
+ * @param requestUrl the URL
+ * @return the content of the downloaded resource as InputStream
+ * @throws CollectorServiceException when retrying more than maxNumberOfRetry times
+ */
+ public InputStream getInputSourceAsStream(final String requestUrl) throws CollectorServiceException {
+ return attemptDownload(requestUrl, 1, new CollectorPluginErrorLogList());
+ }
+
+ private String attemptDownlaodAsString(final String requestUrl, final int retryNumber,
+ final CollectorPluginErrorLogList errorList)
+ throws CollectorServiceException {
+ try {
+ InputStream s = attemptDownload(requestUrl, 1, new CollectorPluginErrorLogList());
+ try {
+ return IOUtils.toString(s);
+ } catch (IOException e) {
+ log.error("error while retrieving from http-connection occured: " + requestUrl, e);
+ Thread.sleep(defaultDelay * 1000);
+ errorList.add(e.getMessage());
+ return attemptDownlaodAsString(requestUrl, retryNumber + 1, errorList);
+ } finally {
+ IOUtils.closeQuietly(s);
+ }
+ } catch (InterruptedException e) {
+ throw new CollectorServiceException(e);
+ }
+ }
+
+ private InputStream attemptDownload(final String requestUrl, final int retryNumber,
+ final CollectorPluginErrorLogList errorList)
+ throws CollectorServiceException {
+
+ if (retryNumber > maxNumberOfRetry) {
+ throw new CollectorServiceException("Max number of retries exceeded. Cause: \n " + errorList);
+ }
+
+ log.debug("Downloading " + requestUrl + " - try: " + retryNumber);
+ try {
+ InputStream input = null;
+
+ try {
+ final HttpURLConnection urlConn = (HttpURLConnection) new URL(requestUrl).openConnection();
+ urlConn.setInstanceFollowRedirects(false);
+ urlConn.setReadTimeout(readTimeOut * 1000);
+ urlConn.addRequestProperty("User-Agent", userAgent);
+
+ if (log.isDebugEnabled()) {
+ logHeaderFields(urlConn);
+ }
+
+ int retryAfter = obtainRetryAfter(urlConn.getHeaderFields());
+ if (retryAfter > 0 && urlConn.getResponseCode() == HttpURLConnection.HTTP_UNAVAILABLE) {
+ log.warn("waiting and repeating request after " + retryAfter + " sec.");
+ Thread.sleep(retryAfter * 1000);
+ errorList.add("503 Service Unavailable");
+ urlConn.disconnect();
+ return attemptDownload(requestUrl, retryNumber + 1, errorList);
+ } else if ((urlConn.getResponseCode() == HttpURLConnection.HTTP_MOVED_PERM)
+ || (urlConn.getResponseCode() == HttpURLConnection.HTTP_MOVED_TEMP)) {
+ final String newUrl = obtainNewLocation(urlConn.getHeaderFields());
+ log.debug("The requested url has been moved to " + newUrl);
+ errorList
+ .add(
+ String
+ .format(
+ "%s %s. Moved to: %s", urlConn.getResponseCode(), urlConn.getResponseMessage(),
+ newUrl));
+ urlConn.disconnect();
+ return attemptDownload(newUrl, retryNumber + 1, errorList);
+ } else if (urlConn.getResponseCode() != HttpURLConnection.HTTP_OK) {
+ log
+ .error(
+ String
+ .format("HTTP error: %s %s", urlConn.getResponseCode(), urlConn.getResponseMessage()));
+ Thread.sleep(defaultDelay * 1000);
+ errorList.add(String.format("%s %s", urlConn.getResponseCode(), urlConn.getResponseMessage()));
+ urlConn.disconnect();
+ return attemptDownload(requestUrl, retryNumber + 1, errorList);
+ } else {
+ input = urlConn.getInputStream();
+ responseType = urlConn.getContentType();
+ return input;
+ }
+ } catch (IOException e) {
+ log.error("error while retrieving from http-connection occured: " + requestUrl, e);
+ Thread.sleep(defaultDelay * 1000);
+ errorList.add(e.getMessage());
+ return attemptDownload(requestUrl, retryNumber + 1, errorList);
+ }
+ } catch (InterruptedException e) {
+ throw new CollectorServiceException(e);
+ }
+ }
+
+ private void logHeaderFields(final HttpURLConnection urlConn) throws IOException {
+ log.debug("StatusCode: " + urlConn.getResponseMessage());
+
+ for (Map.Entry> e : urlConn.getHeaderFields().entrySet()) {
+ if (e.getKey() != null) {
+ for (String v : e.getValue()) {
+ log.debug(" key: " + e.getKey() + " - value: " + v);
+ }
+ }
+ }
+ }
+
+ private int obtainRetryAfter(final Map> headerMap) {
+ for (String key : headerMap.keySet()) {
+ if ((key != null) && key.toLowerCase().equals("retry-after") && (headerMap.get(key).size() > 0)
+ && NumberUtils.isCreatable(headerMap.get(key).get(0))) {
+ return Integer
+ .parseInt(headerMap.get(key).get(0)) + 10;
+ }
+ }
+ return -1;
+ }
+
+ private String obtainNewLocation(final Map> headerMap) throws CollectorServiceException {
+ for (String key : headerMap.keySet()) {
+ if ((key != null) && key.toLowerCase().equals("location") && (headerMap.get(key).size() > 0)) {
+ return headerMap.get(key).get(0);
+ }
+ }
+ throw new CollectorServiceException("The requested url has been MOVED, but 'location' param is MISSING");
+ }
+
+ /**
+ * register for https scheme; this is a workaround and not intended for the use in trusted environments
+ */
+ public void initTrustManager() {
+ final X509TrustManager tm = new X509TrustManager() {
+
+ @Override
+ public void checkClientTrusted(final X509Certificate[] xcs, final String string) {
+ }
+
+ @Override
+ public void checkServerTrusted(final X509Certificate[] xcs, final String string) {
+ }
+
+ @Override
+ public X509Certificate[] getAcceptedIssuers() {
+ return null;
+ }
+ };
+ try {
+ final SSLContext ctx = SSLContext.getInstance("TLS");
+ ctx.init(null, new TrustManager[] {
+ tm
+ }, null);
+ HttpsURLConnection.setDefaultSSLSocketFactory(ctx.getSocketFactory());
+ } catch (GeneralSecurityException e) {
+ log.fatal(e);
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public int getMaxNumberOfRetry() {
+ return maxNumberOfRetry;
+ }
+
+ public void setMaxNumberOfRetry(final int maxNumberOfRetry) {
+ this.maxNumberOfRetry = maxNumberOfRetry;
+ }
+
+ public int getDefaultDelay() {
+ return defaultDelay;
+ }
+
+ public void setDefaultDelay(final int defaultDelay) {
+ this.defaultDelay = defaultDelay;
+ }
+
+ public int getReadTimeOut() {
+ return readTimeOut;
+ }
+
+ public void setReadTimeOut(final int readTimeOut) {
+ this.readTimeOut = readTimeOut;
+ }
+
+ public String getResponseType() {
+ return responseType;
+ }
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/action_set_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/action_set_parameters.json
new file mode 100644
index 0000000000..a0856e10ef
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/action_set_parameters.json
@@ -0,0 +1,26 @@
+[
+ {
+ "paramName": "issm",
+ "paramLongName": "isSparkSessionManaged",
+ "paramDescription": "when true will stop SparkSession after job execution",
+ "paramRequired": false
+},
+{
+"paramName": "pjp",
+"paramLongName": "projectPath",
+"paramDescription": "the URL from where to get the projects file",
+"paramRequired": true
+},
+{
+"paramName": "pp",
+"paramLongName": "programmePath",
+"paramDescription": "the URL from where to get the programme file",
+"paramRequired": true
+},
+{
+"paramName": "o",
+"paramLongName": "outputPath",
+"paramDescription": "the path of the new ActionSet",
+"paramRequired": true
+}
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/oozie_app/config-default.xml
new file mode 100644
index 0000000000..fe82ae1940
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/oozie_app/config-default.xml
@@ -0,0 +1,54 @@
+
+
+ jobTracker
+ yarnRM
+
+
+ nameNode
+ hdfs://nameservice1
+
+
+ oozie.use.system.libpath
+ true
+
+
+ oozie.action.sharelib.for.spark
+ spark2
+
+
+ hive_metastore_uris
+ thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083
+
+
+ spark2YarnHistoryServerAddress
+ http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089
+
+
+ spark2ExtraListeners
+ com.cloudera.spark.lineage.NavigatorAppListener
+
+
+ spark2SqlQueryExecutionListeners
+ com.cloudera.spark.lineage.NavigatorQueryListener
+
+
+ sparkExecutorNumber
+ 4
+
+
+ spark2EventLogDir
+ /user/spark/spark2ApplicationHistory
+
+
+ sparkDriverMemory
+ 15G
+
+
+ sparkExecutorMemory
+ 6G
+
+
+ sparkExecutorCores
+ 1
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/oozie_app/workflow.xml
new file mode 100644
index 0000000000..ca0a73b97f
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/oozie_app/workflow.xml
@@ -0,0 +1,145 @@
+
+
+
+ projectFileURL
+ the url where to get the projects file
+
+
+
+ programmeFileURL
+ the url where to get the programme file
+
+
+
+ outputPath
+ path where to store the action set
+
+
+
+
+
+ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ eu.dnetlib.dhp.actionmanager.project.csvutils.ReadCSV
+ --hdfsNameNode${nameNode}
+ --fileURL${projectFileURL}
+ --hdfsPath${workingDir}/projects
+ --classForNameeu.dnetlib.dhp.actionmanager.project.csvutils.CSVProject
+
+
+
+
+
+
+
+ eu.dnetlib.dhp.actionmanager.project.csvutils.ReadCSV
+ --hdfsNameNode${nameNode}
+ --fileURL${programmeFileURL}
+ --hdfsPath${workingDir}/programme
+ --classForNameeu.dnetlib.dhp.actionmanager.project.csvutils.CSVProgramme
+
+
+
+
+
+
+
+ eu.dnetlib.dhp.actionmanager.project.ReadProjectsFromDB
+ --hdfsPath${workingDir}/dbProjects
+ --hdfsNameNode${nameNode}
+ --postgresUrl${postgresURL}
+ --postgresUser${postgresUser}
+ --postgresPassword${postgresPassword}
+
+
+
+
+
+
+
+ yarn
+ cluster
+ PrepareProgramme
+ eu.dnetlib.dhp.actionmanager.project.PrepareProgramme
+ dhp-aggregation-${projectVersion}.jar
+
+ --executor-cores=${sparkExecutorCores}
+ --executor-memory=${sparkExecutorMemory}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=3840
+
+ --programmePath${workingDir}/programme
+ --outputPath${workingDir}/preparedProgramme
+
+
+
+
+
+
+
+ yarn
+ cluster
+ PrepareProgramme
+ eu.dnetlib.dhp.actionmanager.project.PrepareProjects
+ dhp-aggregation-${projectVersion}.jar
+
+ --executor-cores=${sparkExecutorCores}
+ --executor-memory=${sparkExecutorMemory}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=3840
+
+ --projectPath${workingDir}/projects
+ --outputPath${workingDir}/preparedProjects
+
+
+
+
+
+
+
+ yarn
+ cluster
+ ProjectProgrammeAS
+ eu.dnetlib.dhp.actionmanager.project.SparkAtomicActionJob
+ dhp-aggregation-${projectVersion}.jar
+
+ --executor-cores=${sparkExecutorCores}
+ --executor-memory=${sparkExecutorMemory}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=3840
+
+ --projectPath${workingDir}/preparedProjects
+ --programmePath${workingDir}/preparedProgramme
+ --outputPath${outputPath}
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/parameters.json
new file mode 100644
index 0000000000..dd3de70f6e
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/parameters.json
@@ -0,0 +1,29 @@
+[
+
+ {
+ "paramName": "fu",
+ "paramLongName" : "fileURL",
+ "paramDescription" : "the url of the file to download",
+ "paramRequired" : true
+ },
+ {
+ "paramName": "hp",
+ "paramLongName" : "hdfsPath",
+ "paramDescription" : "where to save the file",
+ "paramRequired" : true
+ },
+ {
+ "paramName": "hnn",
+ "paramLongName" : "hdfsNameNode",
+ "paramDescription" : "the name node",
+ "paramRequired" : true
+ },
+ {
+ "paramName": "cfn",
+ "paramLongName" : "classForName",
+ "paramDescription" : "the name of the class to deserialize the csv to",
+ "paramRequired" : true
+}
+
+
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/prepare_programme_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/prepare_programme_parameters.json
new file mode 100644
index 0000000000..54083e1089
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/prepare_programme_parameters.json
@@ -0,0 +1,20 @@
+[
+ {
+ "paramName": "issm",
+ "paramLongName": "isSparkSessionManaged",
+ "paramDescription": "when true will stop SparkSession after job execution",
+ "paramRequired": false
+},
+{
+"paramName": "pp",
+"paramLongName": "programmePath",
+"paramDescription": "the URL from where to get the programme file",
+"paramRequired": true
+},
+{
+"paramName": "o",
+"paramLongName": "outputPath",
+"paramDescription": "the path of the new ActionSet",
+"paramRequired": true
+}
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/prepare_project_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/prepare_project_parameters.json
new file mode 100644
index 0000000000..49f9c73062
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/prepare_project_parameters.json
@@ -0,0 +1,26 @@
+[
+ {
+ "paramName": "issm",
+ "paramLongName": "isSparkSessionManaged",
+ "paramDescription": "when true will stop SparkSession after job execution",
+ "paramRequired": false
+},
+{
+"paramName": "pjp",
+"paramLongName": "projectPath",
+"paramDescription": "the URL from where to get the programme file",
+"paramRequired": true
+},
+{
+"paramName": "o",
+"paramLongName": "outputPath",
+"paramDescription": "the path of the new ActionSet",
+"paramRequired": true
+},
+ {
+ "paramName": "dbp",
+ "paramLongName": "dbProjectPath",
+ "paramDescription": "the path of the project code read from db",
+ "paramRequired": true
+ }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/read_projects_db.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/read_projects_db.json
new file mode 100644
index 0000000000..9a2eadaa7d
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/read_projects_db.json
@@ -0,0 +1,32 @@
+[
+ {
+ "paramName": "p",
+ "paramLongName": "hdfsPath",
+ "paramDescription": "the path where storing the sequential file",
+ "paramRequired": true
+ },
+ {
+ "paramName": "nn",
+ "paramLongName": "hdfsNameNode",
+ "paramDescription": "the name node on hdfs",
+ "paramRequired": true
+ },
+ {
+ "paramName": "pgurl",
+ "paramLongName": "postgresUrl",
+ "paramDescription": "postgres url, example: jdbc:postgresql://localhost:5432/testdb",
+ "paramRequired": true
+ },
+ {
+ "paramName": "pguser",
+ "paramLongName": "postgresUser",
+ "paramDescription": "postgres user",
+ "paramRequired": false
+ },
+ {
+ "paramName": "pgpasswd",
+ "paramLongName": "postgresPassword",
+ "paramDescription": "postgres password",
+ "paramRequired": false
+ }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/CSVParserTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/CSVParserTest.java
new file mode 100644
index 0000000000..17fdd45111
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/CSVParserTest.java
@@ -0,0 +1,41 @@
+
+package eu.dnetlib.dhp.actionmanager.project;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+
+import org.apache.commons.io.IOUtils;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import eu.dnetlib.dhp.actionmanager.project.csvutils.CSVParser;
+
+public class CSVParserTest {
+
+ private static Path workingDir;
+
+ @BeforeAll
+ public static void beforeAll() throws IOException {
+ workingDir = Files.createTempDirectory(CSVParserTest.class.getSimpleName());
+
+ }
+
+ @Test
+ public void readProgrammeTest() throws Exception {
+
+ String programmecsv = IOUtils
+ .toString(
+ getClass()
+ .getClassLoader()
+ .getResourceAsStream("eu/dnetlib/dhp/actionmanager/project/programme.csv"));
+
+ CSVParser csvParser = new CSVParser();
+
+ List