diff --git a/dhp-build/dhp-build-assembly-resources/pom.xml b/dhp-build/dhp-build-assembly-resources/pom.xml
index 8bae191d3..012ff89a3 100644
--- a/dhp-build/dhp-build-assembly-resources/pom.xml
+++ b/dhp-build/dhp-build-assembly-resources/pom.xml
@@ -6,7 +6,7 @@
eu.dnetlib.dhp
dhp-build
- 1.2.2-SNAPSHOT
+ 1.2.4-SNAPSHOT
dhp-build-assembly-resources
diff --git a/dhp-build/dhp-build-properties-maven-plugin/pom.xml b/dhp-build/dhp-build-properties-maven-plugin/pom.xml
index ad8cd57b4..256017e2c 100644
--- a/dhp-build/dhp-build-properties-maven-plugin/pom.xml
+++ b/dhp-build/dhp-build-properties-maven-plugin/pom.xml
@@ -6,7 +6,7 @@
eu.dnetlib.dhp
dhp-build
- 1.2.2-SNAPSHOT
+ 1.2.4-SNAPSHOT
dhp-build-properties-maven-plugin
diff --git a/dhp-build/dhp-code-style/pom.xml b/dhp-build/dhp-code-style/pom.xml
index 08f5de9ee..e60e8076e 100644
--- a/dhp-build/dhp-code-style/pom.xml
+++ b/dhp-build/dhp-code-style/pom.xml
@@ -5,7 +5,7 @@
eu.dnetlib.dhp
dhp-code-style
- 1.2.2-SNAPSHOT
+ 1.2.4-SNAPSHOT
jar
diff --git a/dhp-build/pom.xml b/dhp-build/pom.xml
index 369e25b24..12b999b9c 100644
--- a/dhp-build/pom.xml
+++ b/dhp-build/pom.xml
@@ -4,7 +4,7 @@
eu.dnetlib.dhp
dhp
- 1.2.2-SNAPSHOT
+ 1.2.4-SNAPSHOT
dhp-build
pom
diff --git a/dhp-common/pom.xml b/dhp-common/pom.xml
index 60e66f45a..0819a8bd2 100644
--- a/dhp-common/pom.xml
+++ b/dhp-common/pom.xml
@@ -5,7 +5,7 @@
eu.dnetlib.dhp
dhp
- 1.2.2-SNAPSHOT
+ 1.2.4-SNAPSHOT
../
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/FunctionalInterfaceSupport.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/FunctionalInterfaceSupport.java
index e793e3f29..c6c9d8044 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/FunctionalInterfaceSupport.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/FunctionalInterfaceSupport.java
@@ -2,6 +2,7 @@
package eu.dnetlib.dhp.common;
import java.io.Serializable;
+import java.util.function.Consumer;
import java.util.function.Supplier;
/** Provides serializable and throwing extensions to standard functional interfaces. */
@@ -10,6 +11,16 @@ public class FunctionalInterfaceSupport {
private FunctionalInterfaceSupport() {
}
+ /**
+ * Serializable consumer of any kind of objects. To be used withing spark processing pipelines when supplying
+ * functions externally.
+ *
+ * @param
+ */
+ @FunctionalInterface
+ public interface SerializableConsumer extends Consumer, Serializable {
+ }
+
/**
* Serializable supplier of any kind of objects. To be used withing spark processing pipelines when supplying
* functions externally.
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/PacePerson.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/PacePerson.java
index 1909ddcca..6e02ca614 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/PacePerson.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/PacePerson.java
@@ -16,6 +16,12 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
+/**
+ * PacePerson tries to derive information from the fullname string of an author. Such informations are Names, Surnames
+ * an Fullname split into terms. It provides also an additional field for the original data. The calculation of the
+ * names and the surnames is not always possible. When it is impossible to assert which are the names and the surnames,
+ * the lists are empty.
+ */
public class PacePerson {
private static final String UTF8 = "UTF-8";
@@ -26,10 +32,19 @@ public class PacePerson {
private static Set particles = null;
+ /**
+ * Capitalizes a string
+ *
+ * @param s the string to capitalize
+ * @return the input string with capital letter
+ */
public static final String capitalize(final String s) {
return WordUtils.capitalize(s.toLowerCase(), ' ', '-');
}
+ /**
+ * Adds a dot to a string with length equals to 1
+ */
public static final String dotAbbreviations(final String s) {
return s.length() == 1 ? s + "." : s;
}
@@ -46,6 +61,12 @@ public class PacePerson {
return h;
}
+ /**
+ * The constructor of the class. It fills the fields of the class basing on the input fullname.
+ *
+ * @param s the input string (fullname of the author)
+ * @param aggressive set the string normalization type
+ */
public PacePerson(String s, final boolean aggressive) {
original = s;
s = Normalizer.normalize(s, Normalizer.Form.NFD);
@@ -64,6 +85,7 @@ public class PacePerson {
// s = s.replaceAll("[\\W&&[^,-]]", "");
}
+ // if the string contains a comma, it can derive surname and name by splitting on it
if (s.contains(",")) {
final String[] arr = s.split(",");
if (arr.length == 1) {
@@ -74,21 +96,23 @@ public class PacePerson {
fullname.addAll(surname);
fullname.addAll(name);
}
- } else {
+ } else { // otherwise, it should rely on CAPS terms and short terms
fullname = splitTerms(s);
int lastInitialPosition = fullname.size();
boolean hasSurnameInUpperCase = false;
+ // computes lastInitialPosition and hasSurnameInUpperCase
for (int i = 0; i < fullname.size(); i++) {
final String term = fullname.get(i);
if (term.length() == 1) {
- lastInitialPosition = i;
+ lastInitialPosition = i; // first word in the name longer than 1 (to avoid name with dots)
} else if (term.equals(term.toUpperCase())) {
- hasSurnameInUpperCase = true;
+ hasSurnameInUpperCase = true; // if one of the words is CAPS
}
}
+ // manages particular cases of fullnames
if (lastInitialPosition < fullname.size() - 1) { // Case: Michele G. Artini
name = fullname.subList(0, lastInitialPosition + 1);
surname = fullname.subList(lastInitialPosition + 1, fullname.size());
diff --git a/dhp-common/src/test/java/eu/dnetlib/dhp/common/PacePersonTest.java b/dhp-common/src/test/java/eu/dnetlib/dhp/common/PacePersonTest.java
new file mode 100644
index 000000000..5ebd7213e
--- /dev/null
+++ b/dhp-common/src/test/java/eu/dnetlib/dhp/common/PacePersonTest.java
@@ -0,0 +1,27 @@
+
+package eu.dnetlib.dhp.common;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.junit.jupiter.api.Test;
+
+public class PacePersonTest {
+
+ @Test
+ public void pacePersonTest1() {
+
+ PacePerson p = new PacePerson("Artini, Michele", false);
+ assertEquals("Artini", p.getSurnameString());
+ assertEquals("Michele", p.getNameString());
+ assertEquals("Artini, Michele", p.getNormalisedFullname());
+ }
+
+ @Test
+ public void pacePersonTest2() {
+ PacePerson p = new PacePerson("Michele G. Artini", false);
+ assertEquals("Artini, Michele G.", p.getNormalisedFullname());
+ assertEquals("Michele G", p.getNameString());
+ assertEquals("Artini", p.getSurnameString());
+ }
+
+}
diff --git a/dhp-schemas/pom.xml b/dhp-schemas/pom.xml
index 5e864cf94..2e5652b43 100644
--- a/dhp-schemas/pom.xml
+++ b/dhp-schemas/pom.xml
@@ -5,7 +5,7 @@
eu.dnetlib.dhp
dhp
- 1.2.2-SNAPSHOT
+ 1.2.4-SNAPSHOT
../
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java
index e32dd10fa..c5905e45b 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelConstants.java
@@ -1,6 +1,10 @@
package eu.dnetlib.dhp.schema.common;
+import java.security.Key;
+
+import eu.dnetlib.dhp.schema.oaf.DataInfo;
+import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
public class ModelConstants {
@@ -14,6 +18,7 @@ public class ModelConstants {
public static final String DNET_DATA_CITE_RESOURCE = "dnet:dataCite_resource";
public static final String DNET_PROVENANCE_ACTIONS = "dnet:provenanceActions";
public static final String DNET_COUNTRY_TYPE = "dnet:countries";
+ public static final String DNET_REVIEW_LEVELS = "dnet:review_levels";
public static final String SYSIMPORT_CROSSWALK_REPOSITORY = "sysimport:crosswalk:repository";
public static final String SYSIMPORT_CROSSWALK_ENTITYREGISTRY = "sysimport:crosswalk:entityregistry";
@@ -25,6 +30,10 @@ public class ModelConstants {
public static final String ORP_RESULTTYPE_CLASSID = "other";
public static final String RESULT_RESULT = "resultResult";
+ /**
+ * @deprecated Use {@link ModelConstants#RELATIONSHIP} instead.
+ */
+ @Deprecated
public static final String PUBLICATION_DATASET = "publicationDataset";
public static final String IS_RELATED_TO = "isRelatedTo";
public static final String SUPPLEMENT = "supplement";
@@ -34,6 +43,12 @@ public class ModelConstants {
public static final String IS_PART_OF = "IsPartOf";
public static final String HAS_PARTS = "HasParts";
public static final String RELATIONSHIP = "relationship";
+ public static final String CITATION = "citation";
+ public static final String CITES = "cites";
+ public static final String IS_CITED_BY = "IsCitedBy";
+ public static final String REVIEW = "review";
+ public static final String REVIEWS = "reviews";
+ public static final String IS_REVIEWED_BY = "IsReviewedBy";
public static final String RESULT_PROJECT = "resultProject";
public static final String OUTCOME = "outcome";
@@ -84,6 +99,9 @@ public class ModelConstants {
SYSIMPORT_CROSSWALK_ENTITYREGISTRY, SYSIMPORT_CROSSWALK_ENTITYREGISTRY,
DNET_PROVENANCE_ACTIONS, DNET_PROVENANCE_ACTIONS);
+ public static final KeyValue UNKNOWN_REPOSITORY = keyValue(
+ "10|openaire____::55045bd2a65019fd8e6741a755395c8c", "Unknown Repository");
+
private static Qualifier qualifier(
final String classid,
final String classname,
@@ -96,4 +114,12 @@ public class ModelConstants {
q.setSchemename(schemename);
return q;
}
+
+ private static KeyValue keyValue(String key, String value) {
+ KeyValue kv = new KeyValue();
+ kv.setKey(key);
+ kv.setValue(value);
+ kv.setDataInfo(new DataInfo());
+ return kv;
+ }
}
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelSupport.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelSupport.java
index 9ee7c2deb..7d8be81ac 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelSupport.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelSupport.java
@@ -58,6 +58,18 @@ public class ModelSupport {
oafTypes.put("relation", Relation.class);
}
+ public static final Map idPrefixMap = Maps.newHashMap();
+
+ static {
+ idPrefixMap.put(Datasource.class, "10");
+ idPrefixMap.put(Organization.class, "20");
+ idPrefixMap.put(Project.class, "40");
+ idPrefixMap.put(Dataset.class, "50");
+ idPrefixMap.put(OtherResearchProduct.class, "50");
+ idPrefixMap.put(Software.class, "50");
+ idPrefixMap.put(Publication.class, "50");
+ }
+
public static final Map entityIdPrefix = Maps.newHashMap();
static {
@@ -289,6 +301,10 @@ public class ModelSupport {
private ModelSupport() {
}
+ public static String getIdPrefix(Class clazz) {
+ return idPrefixMap.get(clazz);
+ }
+
/**
* Checks subclass-superclass relationship.
*
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Dataset.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Dataset.java
index 07ddbb00e..b5587c6b7 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Dataset.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Dataset.java
@@ -10,6 +10,7 @@ public class Dataset extends Result implements Serializable {
private Field storagedate;
+ // candidate for removal
private Field device;
private Field size;
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Instance.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Instance.java
index 2b7d3846c..29d495261 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Instance.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Instance.java
@@ -31,7 +31,7 @@ public class Instance implements Serializable {
// typed results
private Field processingchargecurrency;
- private Field refereed; // peer-review status
+ private Qualifier refereed; // peer-review status
public Field getLicense() {
return license;
@@ -113,11 +113,11 @@ public class Instance implements Serializable {
this.processingchargecurrency = processingchargecurrency;
}
- public Field getRefereed() {
+ public Qualifier getRefereed() {
return refereed;
}
- public void setRefereed(Field refereed) {
+ public void setRefereed(Qualifier refereed) {
this.refereed = refereed;
}
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Measure.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Measure.java
new file mode 100644
index 000000000..c0c14d10d
--- /dev/null
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Measure.java
@@ -0,0 +1,59 @@
+
+package eu.dnetlib.dhp.schema.oaf;
+
+import java.util.List;
+
+import com.google.common.base.Objects;
+
+/**
+ * Represent a measure, must be further described by a system available resource providing name and descriptions.
+ */
+public class Measure {
+
+ /**
+ * Unique measure identifier.
+ */
+ private String id;
+
+ /**
+ * List of units associated with this measure. KeyValue provides a pair to store the laber (key) and the value, plus
+ * common provenance information.
+ */
+ private List unit;
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public List getUnit() {
+ return unit;
+ }
+
+ public void setUnit(List unit) {
+ this.unit = unit;
+ }
+
+ public void mergeFrom(Measure m) {
+ // TODO
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ Measure measure = (Measure) o;
+ return Objects.equal(id, measure.id) &&
+ Objects.equal(unit, measure.unit);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(id, unit);
+ }
+}
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Programme.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Programme.java
new file mode 100644
index 000000000..00dc32fbc
--- /dev/null
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Programme.java
@@ -0,0 +1,38 @@
+
+package eu.dnetlib.dhp.schema.oaf;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class Programme implements Serializable {
+ private String code;
+ private String description;
+
+ public String getCode() {
+ return code;
+ }
+
+ public void setCode(String code) {
+ this.code = code;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public void setDescription(String description) {
+ this.description = description;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ Programme programme = (Programme) o;
+ return Objects.equals(code, programme.code);
+ }
+
+}
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Project.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Project.java
index 924c08cc9..1fcfb305e 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Project.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Project.java
@@ -58,6 +58,8 @@ public class Project extends OafEntity implements Serializable {
private Float fundedamount;
+ private List programme;
+
public Field getWebsiteurl() {
return websiteurl;
}
@@ -266,6 +268,14 @@ public class Project extends OafEntity implements Serializable {
this.fundedamount = fundedamount;
}
+ public List getProgramme() {
+ return programme;
+ }
+
+ public void setProgramme(List programme) {
+ this.programme = programme;
+ }
+
@Override
public void mergeFrom(OafEntity e) {
super.mergeFrom(e);
@@ -320,6 +330,9 @@ public class Project extends OafEntity implements Serializable {
fundedamount = p.getFundedamount() != null && compareTrust(this, e) < 0
? p.getFundedamount()
: fundedamount;
+
+ programme = mergeLists(programme, p.getProgramme());
+
mergeOAFDataInfo(e);
}
}
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Relation.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Relation.java
index c928992aa..17a50d7ac 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Relation.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Relation.java
@@ -41,6 +41,16 @@ public class Relation extends Oaf {
*/
private String target;
+ /**
+ * Was this relationship authoritatively validated?
+ */
+ private Boolean validated;
+
+ /**
+ * When was this relationship authoritatively validated.
+ */
+ private String validationDate;
+
/**
* List of relation specific properties. Values include 'similarityLevel', indicating the similarity score between a
* pair of publications.
@@ -95,6 +105,22 @@ public class Relation extends Oaf {
this.properties = properties;
}
+ public Boolean getValidated() {
+ return validated;
+ }
+
+ public void setValidated(Boolean validated) {
+ this.validated = validated;
+ }
+
+ public String getValidationDate() {
+ return validationDate;
+ }
+
+ public void setValidationDate(String validationDate) {
+ this.validationDate = validationDate;
+ }
+
public void mergeFrom(final Relation r) {
checkArgument(Objects.equals(getSource(), r.getSource()), "source ids must be equal");
@@ -137,4 +163,5 @@ public class Relation extends Oaf {
public int hashCode() {
return Objects.hash(relType, subRelType, relClass, source, target, collectedfrom);
}
+
}
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Result.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Result.java
index 213a585a8..fdd42ab7d 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Result.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Result.java
@@ -2,11 +2,15 @@
package eu.dnetlib.dhp.schema.oaf;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
+import java.util.stream.Collectors;
public class Result extends OafEntity implements Serializable {
+ private List measures;
+
private List author;
// resulttype allows subclassing results into publications | datasets | software
@@ -51,6 +55,14 @@ public class Result extends OafEntity implements Serializable {
private List instance;
+ public List getMeasures() {
+ return measures;
+ }
+
+ public void setMeasures(List measures) {
+ this.measures = measures;
+ }
+
public List getAuthor() {
return author;
}
@@ -229,6 +241,8 @@ public class Result extends OafEntity implements Serializable {
Result r = (Result) e;
+ // TODO consider merging also Measures
+
instance = mergeLists(instance, r.getInstance());
if (r.getBestaccessright() != null && compareTrust(this, r) < 0)
@@ -244,25 +258,33 @@ public class Result extends OafEntity implements Serializable {
subject = mergeLists(subject, r.getSubject());
- //merge title lists: main title with higher trust and distinct between the others
+ // merge title lists: main title with higher trust and distinct between the others
StructuredProperty baseMainTitle = null;
- if(title != null) {
+ if (title != null) {
baseMainTitle = getMainTitle(title);
- title.remove(baseMainTitle);
+ if (baseMainTitle != null) {
+ final StructuredProperty p = baseMainTitle;
+ title = title.stream().filter(t -> t != p).collect(Collectors.toList());
+ }
}
StructuredProperty newMainTitle = null;
- if(r.getTitle() != null) {
+ if (r.getTitle() != null) {
newMainTitle = getMainTitle(r.getTitle());
- r.getTitle().remove(newMainTitle);
+ if (newMainTitle != null) {
+ final StructuredProperty p = newMainTitle;
+ r.setTitle(r.getTitle().stream().filter(t -> t != p).collect(Collectors.toList()));
+ }
}
- if (newMainTitle != null && compareTrust(this, r) < 0 )
+ if (newMainTitle != null && compareTrust(this, r) < 0) {
baseMainTitle = newMainTitle;
+ }
title = mergeLists(title, r.getTitle());
- if (title != null && baseMainTitle != null)
+ if (title != null && baseMainTitle != null) {
title.add(baseMainTitle);
+ }
relevantdate = mergeLists(relevantdate, r.getRelevantdate());
@@ -314,8 +336,9 @@ public class Result extends OafEntity implements Serializable {
}
private StructuredProperty getMainTitle(List titles) {
- //need to check if the list of titles contains more than 1 main title? (in that case, we should chose which main title select in the list)
- for (StructuredProperty title: titles) {
+ // need to check if the list of titles contains more than 1 main title? (in that case, we should chose which
+ // main title select in the list)
+ for (StructuredProperty title : titles) {
if (title.getQualifier() != null && title.getQualifier().getClassid() != null)
if (title.getQualifier().getClassid().equals("main title"))
return title;
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Software.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Software.java
index 40332bf53..d25b5c9ce 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Software.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Software.java
@@ -10,8 +10,10 @@ public class Software extends Result implements Serializable {
private List> documentationUrl;
+ // candidate for removal
private List license;
+ // candidate for removal
private Field codeRepositoryUrl;
private Qualifier programmingLanguage;
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/DLIRelation.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/DLIRelation.java
index d2d2089c0..ca85fa14f 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/DLIRelation.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/scholexplorer/DLIRelation.java
@@ -1,11 +1,25 @@
package eu.dnetlib.dhp.schema.scholexplorer;
+import java.util.List;
+
+import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class DLIRelation extends Relation {
+
private String dateOfCollection;
+ private List collectedFrom;
+
+ public List getCollectedFrom() {
+ return collectedFrom;
+ }
+
+ public void setCollectedFrom(List collectedFrom) {
+ this.collectedFrom = collectedFrom;
+ }
+
public String getDateOfCollection() {
return dateOfCollection;
}
diff --git a/dhp-schemas/src/test/java/eu/dnetlib/dhp/schema/oaf/MeasureTest.java b/dhp-schemas/src/test/java/eu/dnetlib/dhp/schema/oaf/MeasureTest.java
new file mode 100644
index 000000000..26b4407c9
--- /dev/null
+++ b/dhp-schemas/src/test/java/eu/dnetlib/dhp/schema/oaf/MeasureTest.java
@@ -0,0 +1,57 @@
+
+package eu.dnetlib.dhp.schema.oaf;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+
+public class MeasureTest {
+
+ public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+ .setSerializationInclusion(JsonInclude.Include.NON_NULL);
+
+ @Test
+ public void testMeasureSerialization() throws IOException {
+
+ Measure popularity = new Measure();
+ popularity.setId("popularity");
+ popularity
+ .setUnit(
+ Lists
+ .newArrayList(
+ unit("score", "0.5")));
+
+ Measure influence = new Measure();
+ influence.setId("influence");
+ influence
+ .setUnit(
+ Lists
+ .newArrayList(
+ unit("score", "0.3")));
+
+ List m = Lists.newArrayList(popularity, influence);
+
+ String s = OBJECT_MAPPER.writeValueAsString(m);
+ System.out.println(s);
+
+ List mm = OBJECT_MAPPER.readValue(s, new TypeReference>() {
+ });
+
+ Assertions.assertNotNull(mm);
+ }
+
+ private KeyValue unit(String key, String value) {
+ KeyValue unit = new KeyValue();
+ unit.setKey(key);
+ unit.setValue(value);
+ return unit;
+ }
+
+}
diff --git a/dhp-workflows/dhp-actionmanager/pom.xml b/dhp-workflows/dhp-actionmanager/pom.xml
index ec6247102..0b4d25700 100644
--- a/dhp-workflows/dhp-actionmanager/pom.xml
+++ b/dhp-workflows/dhp-actionmanager/pom.xml
@@ -4,7 +4,7 @@
eu.dnetlib.dhp
dhp-workflows
- 1.2.2-SNAPSHOT
+ 1.2.4-SNAPSHOT
dhp-actionmanager
diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/ProtoConverter.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/ProtoConverter.java
index e55c0eb7b..8ea877aec 100644
--- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/ProtoConverter.java
+++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/migration/ProtoConverter.java
@@ -96,12 +96,21 @@ public class ProtoConverter implements Serializable {
.stream()
.distinct()
.collect(Collectors.toCollection(ArrayList::new)) : null);
- i.setRefereed(mapStringField(ri.getRefereed()));
+ i.setRefereed(mapRefereed(ri.getRefereed()));
i.setProcessingchargeamount(mapStringField(ri.getProcessingchargeamount()));
i.setProcessingchargecurrency(mapStringField(ri.getProcessingchargecurrency()));
return i;
}
+ private static Qualifier mapRefereed(FieldTypeProtos.StringField refereed) {
+ Qualifier q = new Qualifier();
+ q.setClassid(refereed.getValue());
+ q.setSchemename(refereed.getValue());
+ q.setSchemeid("dnet:review_levels");
+ q.setSchemename("dnet:review_levels");
+ return q;
+ }
+
private static List convertExternalRefs(OafProtos.Oaf oaf) {
ResultProtos.Result r = oaf.getEntity().getResult();
if (r.getExternalReferenceCount() > 0) {
diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java
index 17bfc4af3..5fa9e6723 100644
--- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java
+++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java
@@ -4,6 +4,7 @@ package eu.dnetlib.dhp.actionmanager.promote;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.schema.common.ModelSupport.isSubClass;
+import java.io.IOException;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiFunction;
@@ -20,6 +21,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.FunctionalInterfaceSupport.SerializableSupplier;
@@ -134,24 +136,39 @@ public class PromoteActionPayloadForGraphTableJob {
.map(
(MapFunction) value -> OBJECT_MAPPER.readValue(value, rowClazz),
Encoders.bean(rowClazz));
-
- /*
- * return spark .read() .parquet(path) .as(Encoders.bean(rowClazz));
- */
}
private static Dataset readActionPayload(
SparkSession spark, String path, Class actionPayloadClazz) {
logger.info("Reading action payload from path: {}", path);
+
return spark
.read()
.parquet(path)
+ .map((MapFunction) value -> extractPayload(value), Encoders.STRING())
.map(
- (MapFunction) value -> OBJECT_MAPPER
- .readValue(value. getAs("payload"), actionPayloadClazz),
+ (MapFunction) value -> decodePayload(actionPayloadClazz, value),
Encoders.bean(actionPayloadClazz));
}
+ private static String extractPayload(Row value) {
+ try {
+ return value. getAs("payload");
+ } catch (IllegalArgumentException | ClassCastException e) {
+ logger.error("cannot extract payload from action: {}", value.toString());
+ throw e;
+ }
+ }
+
+ private static A decodePayload(Class actionPayloadClazz, String payload) throws IOException {
+ try {
+ return OBJECT_MAPPER.readValue(payload, actionPayloadClazz);
+ } catch (UnrecognizedPropertyException e) {
+ logger.error("error decoding payload: {}", payload);
+ throw e;
+ }
+ }
+
private static Dataset promoteActionPayloadForGraphTable(
Dataset rowDS,
Dataset actionPayloadDS,
diff --git a/dhp-workflows/dhp-aggregation/pom.xml b/dhp-workflows/dhp-aggregation/pom.xml
index 9f082df70..a1bc1c483 100644
--- a/dhp-workflows/dhp-aggregation/pom.xml
+++ b/dhp-workflows/dhp-aggregation/pom.xml
@@ -4,7 +4,7 @@
eu.dnetlib.dhp
dhp-workflows
- 1.2.2-SNAPSHOT
+ 1.2.4-SNAPSHOT
dhp-aggregation
@@ -38,48 +38,6 @@
${project.version}
-
- eu.dnetlib
- dnet-actionmanager-common
-
-
- eu.dnetlib
- dnet-openaireplus-mapping-utils
-
-
- saxonica
- saxon
-
-
- saxonica
- saxon-dom
-
-
- jgrapht
- jgrapht
-
-
- net.sf.ehcache
- ehcache
-
-
- org.springframework
- spring-test
-
-
- org.apache.*
- *
-
-
- apache
- *
-
-
-
-
- eu.dnetlib
- dnet-openaire-data-protos
-
net.sf.saxon
@@ -100,11 +58,15 @@
jaxen
+
- org.apache.hadoop
- hadoop-distcp
+ org.apache.commons
+ commons-csv
+ 1.8
+
+
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProgramme.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProgramme.java
new file mode 100644
index 000000000..c6dab13a0
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProgramme.java
@@ -0,0 +1,123 @@
+
+package eu.dnetlib.dhp.actionmanager.project;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.util.HashMap;
+import java.util.Optional;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.actionmanager.project.csvutils.CSVProgramme;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
+import scala.Tuple2;
+
+public class PrepareProgramme {
+
+ private static final Logger log = LoggerFactory.getLogger(PrepareProgramme.class);
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ public static void main(String[] args) throws Exception {
+
+ String jsonConfiguration = IOUtils
+ .toString(
+ PrepareProgramme.class
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/actionmanager/project/prepare_programme_parameters.json"));
+
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+
+ parser.parseArgument(args);
+
+ Boolean isSparkSessionManaged = Optional
+ .ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+ final String programmePath = parser.get("programmePath");
+ log.info("programmePath {}: ", programmePath);
+
+ final String outputPath = parser.get("outputPath");
+ log.info("outputPath {}: ", outputPath);
+
+ SparkConf conf = new SparkConf();
+
+ runWithSparkSession(
+ conf,
+ isSparkSessionManaged,
+ spark -> {
+ removeOutputDir(spark, outputPath);
+ exec(spark, programmePath, outputPath);
+ });
+ }
+
+ private static void removeOutputDir(SparkSession spark, String path) {
+ HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
+ }
+
+ private static void exec(SparkSession spark, String programmePath, String outputPath) {
+ Dataset programme = readPath(spark, programmePath, CSVProgramme.class);
+
+ programme
+ .toJavaRDD()
+ .filter(p -> !p.getCode().contains("FP7"))
+ .mapToPair(csvProgramme -> new Tuple2<>(csvProgramme.getCode(), csvProgramme))
+ .reduceByKey((a, b) -> {
+ if (StringUtils.isEmpty(a.getShortTitle())) {
+ if (StringUtils.isEmpty(b.getShortTitle())) {
+ if (StringUtils.isEmpty(a.getTitle())) {
+ if (StringUtils.isNotEmpty(b.getTitle())) {
+ a.setShortTitle(b.getTitle());
+ a.setLanguage(b.getLanguage());
+ }
+ } else {// notIsEmpty a.getTitle
+ if (StringUtils.isEmpty(b.getTitle())) {
+ a.setShortTitle(a.getTitle());
+ } else {
+ if (b.getLanguage().equalsIgnoreCase("en")) {
+ a.setShortTitle(b.getTitle());
+ a.setLanguage(b.getLanguage());
+ } else {
+ a.setShortTitle(a.getTitle());
+ }
+ }
+ }
+ } else {// not isEmpty b.getShortTitle
+ a.setShortTitle(b.getShortTitle());
+ // a.setLanguage(b.getLanguage());
+ }
+ }
+ return a;
+
+ })
+ .map(p -> {
+ CSVProgramme csvProgramme = p._2();
+ if (StringUtils.isEmpty(csvProgramme.getShortTitle())) {
+ csvProgramme.setShortTitle(csvProgramme.getTitle());
+ }
+ return OBJECT_MAPPER.writeValueAsString(csvProgramme);
+ })
+ .saveAsTextFile(outputPath);
+
+ }
+
+ public static Dataset readPath(
+ SparkSession spark, String inputPath, Class clazz) {
+ return spark
+ .read()
+ .textFile(inputPath)
+ .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
+ }
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProjects.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProjects.java
new file mode 100644
index 000000000..78aed1a69
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProjects.java
@@ -0,0 +1,120 @@
+
+package eu.dnetlib.dhp.actionmanager.project;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.util.*;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.actionmanager.project.csvutils.CSVProgramme;
+import eu.dnetlib.dhp.actionmanager.project.csvutils.CSVProject;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
+import scala.Tuple2;
+
+public class PrepareProjects {
+
+ private static final Logger log = LoggerFactory.getLogger(PrepareProgramme.class);
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final HashMap programmeMap = new HashMap<>();
+
+ public static void main(String[] args) throws Exception {
+
+ String jsonConfiguration = IOUtils
+ .toString(
+ PrepareProjects.class
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/actionmanager/project/prepare_project_parameters.json"));
+
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+
+ parser.parseArgument(args);
+
+ Boolean isSparkSessionManaged = Optional
+ .ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+ final String projectPath = parser.get("projectPath");
+ log.info("projectPath {}: ", projectPath);
+
+ final String outputPath = parser.get("outputPath");
+ log.info("outputPath {}: ", outputPath);
+
+ final String dbProjectPath = parser.get("dbProjectPath");
+ log.info("dbProjectPath {}: ", dbProjectPath);
+
+ SparkConf conf = new SparkConf();
+
+ runWithSparkSession(
+ conf,
+ isSparkSessionManaged,
+ spark -> {
+ removeOutputDir(spark, outputPath);
+ exec(spark, projectPath, dbProjectPath, outputPath);
+ });
+ }
+
+ private static void removeOutputDir(SparkSession spark, String path) {
+ HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
+ }
+
+ private static void exec(SparkSession spark, String projectPath, String dbProjectPath, String outputPath) {
+ Dataset project = readPath(spark, projectPath, CSVProject.class);
+ Dataset dbProjects = readPath(spark, dbProjectPath, ProjectSubset.class);
+
+ dbProjects
+ .joinWith(project, dbProjects.col("code").equalTo(project.col("id")), "left")
+ .flatMap(getTuple2CSVProjectFlatMapFunction(), Encoders.bean(CSVProject.class))
+ .filter(Objects::nonNull)
+ .write()
+ .mode(SaveMode.Overwrite)
+ .option("compression", "gzip")
+ .json(outputPath);
+
+ }
+
+ private static FlatMapFunction, CSVProject> getTuple2CSVProjectFlatMapFunction() {
+ return (FlatMapFunction, CSVProject>) value -> {
+ Optional csvProject = Optional.ofNullable(value._2());
+ List csvProjectList = new ArrayList<>();
+ if (csvProject.isPresent()) {
+
+ String[] programme = csvProject.get().getProgramme().split(";");
+ Arrays
+ .stream(programme)
+ .forEach(p -> {
+ CSVProject proj = new CSVProject();
+ proj.setProgramme(p);
+ proj.setId(csvProject.get().getId());
+ csvProjectList.add(proj);
+ });
+ }
+ return csvProjectList.iterator();
+ };
+ }
+
+ public static Dataset readPath(
+ SparkSession spark, String inputPath, Class clazz) {
+ return spark
+ .read()
+ .textFile(inputPath)
+ .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
+ }
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/ProjectSubset.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/ProjectSubset.java
new file mode 100644
index 000000000..2fccbc516
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/ProjectSubset.java
@@ -0,0 +1,17 @@
+
+package eu.dnetlib.dhp.actionmanager.project;
+
+import java.io.Serializable;
+
+public class ProjectSubset implements Serializable {
+
+ private String code;
+
+ public String getCode() {
+ return code;
+ }
+
+ public void setCode(String code) {
+ this.code = code;
+ }
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/ReadProjectsFromDB.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/ReadProjectsFromDB.java
new file mode 100644
index 000000000..2d541d2f9
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/ReadProjectsFromDB.java
@@ -0,0 +1,115 @@
+
+package eu.dnetlib.dhp.actionmanager.project;
+
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.sql.ResultSet;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.DbClient;
+
+public class ReadProjectsFromDB implements Closeable {
+
+ private final DbClient dbClient;
+ private static final Log log = LogFactory.getLog(ReadProjectsFromDB.class);
+ private final Configuration conf;
+ private final BufferedWriter writer;
+ private final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ private final static String query = "SELECT code " +
+ "from projects where id like 'corda__h2020%' ";
+
+ public static void main(final String[] args) throws Exception {
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils
+ .toString(
+ ReadProjectsFromDB.class
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/actionmanager/project/read_projects_db.json")));
+
+ parser.parseArgument(args);
+
+ final String dbUrl = parser.get("postgresUrl");
+ final String dbUser = parser.get("postgresUser");
+ final String dbPassword = parser.get("postgresPassword");
+ final String hdfsPath = parser.get("hdfsPath");
+ final String hdfsNameNode = parser.get("hdfsNameNode");
+
+ try (final ReadProjectsFromDB rbl = new ReadProjectsFromDB(hdfsPath, hdfsNameNode, dbUrl, dbUser,
+ dbPassword)) {
+
+ log.info("Processing projects...");
+ rbl.execute(query, rbl::processProjectsEntry);
+
+ }
+ }
+
+ public void execute(final String sql, final Function> producer) throws Exception {
+
+ final Consumer consumer = rs -> producer.apply(rs).forEach(r -> writeProject(r));
+
+ dbClient.processResults(sql, consumer);
+ }
+
+ public List processProjectsEntry(ResultSet rs) {
+ try {
+ ProjectSubset p = new ProjectSubset();
+ p.setCode(rs.getString("code"));
+
+ return Arrays.asList(p);
+
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected void writeProject(final ProjectSubset r) {
+ try {
+ writer.write(OBJECT_MAPPER.writeValueAsString(r));
+ writer.newLine();
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public ReadProjectsFromDB(
+ final String hdfsPath, String hdfsNameNode, final String dbUrl, final String dbUser, final String dbPassword)
+ throws Exception {
+
+ this.dbClient = new DbClient(dbUrl, dbUser, dbPassword);
+ this.conf = new Configuration();
+ this.conf.set("fs.defaultFS", hdfsNameNode);
+ FileSystem fileSystem = FileSystem.get(this.conf);
+ Path hdfsWritePath = new Path(hdfsPath);
+ FSDataOutputStream fsDataOutputStream = null;
+ if (fileSystem.exists(hdfsWritePath)) {
+ fileSystem.delete(hdfsWritePath, false);
+ }
+ fsDataOutputStream = fileSystem.create(hdfsWritePath);
+
+ this.writer = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8));
+ }
+
+ @Override
+ public void close() throws IOException {
+ dbClient.close();
+ writer.close();
+ }
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java
new file mode 100644
index 000000000..1023e2d19
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java
@@ -0,0 +1,161 @@
+
+package eu.dnetlib.dhp.actionmanager.project;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Consumer;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.MapGroupsFunction;
+import org.apache.spark.rdd.SequenceFileRDDFunctions;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.actionmanager.project.csvutils.CSVProgramme;
+import eu.dnetlib.dhp.actionmanager.project.csvutils.CSVProject;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.schema.action.AtomicAction;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.Programme;
+import eu.dnetlib.dhp.schema.oaf.Project;
+import eu.dnetlib.dhp.utils.DHPUtils;
+import scala.Function1;
+import scala.Tuple2;
+import scala.runtime.BoxedUnit;
+
+public class SparkAtomicActionJob {
+ private static final Logger log = LoggerFactory.getLogger(SparkAtomicActionJob.class);
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final HashMap programmeMap = new HashMap<>();
+
+ public static void main(String[] args) throws Exception {
+
+ String jsonConfiguration = IOUtils
+ .toString(
+ SparkAtomicActionJob.class
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/actionmanager/project/action_set_parameters.json"));
+
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
+
+ parser.parseArgument(args);
+
+ Boolean isSparkSessionManaged = Optional
+ .ofNullable(parser.get("isSparkSessionManaged"))
+ .map(Boolean::valueOf)
+ .orElse(Boolean.TRUE);
+
+ log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
+
+ String projectPath = parser.get("projectPath");
+ log.info("projectPath: {}", projectPath);
+
+ final String outputPath = parser.get("outputPath");
+ log.info("outputPath {}: ", outputPath);
+
+ final String programmePath = parser.get("programmePath");
+ log.info("programmePath {}: ", programmePath);
+
+ SparkConf conf = new SparkConf();
+
+ runWithSparkSession(
+ conf,
+ isSparkSessionManaged,
+ spark -> {
+ removeOutputDir(spark, outputPath);
+ getAtomicActions(
+ spark,
+ projectPath,
+ programmePath,
+ outputPath);
+ });
+ }
+
+ private static void removeOutputDir(SparkSession spark, String path) {
+ HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
+ }
+
+ private static void getAtomicActions(SparkSession spark, String projectPatH,
+ String programmePath,
+ String outputPath) {
+
+ Dataset project = readPath(spark, projectPatH, CSVProject.class);
+ Dataset programme = readPath(spark, programmePath, CSVProgramme.class);
+
+ project
+ .joinWith(programme, project.col("programme").equalTo(programme.col("code")), "left")
+ .map(c -> {
+ CSVProject csvProject = c._1();
+ Optional csvProgramme = Optional.ofNullable(c._2());
+ if (csvProgramme.isPresent()) {
+ Project p = new Project();
+ p
+ .setId(
+ createOpenaireId(
+ ModelSupport.entityIdPrefix.get("project"),
+ "corda__h2020", csvProject.getId()));
+ Programme pm = new Programme();
+ pm.setCode(csvProject.getProgramme());
+ pm.setDescription(csvProgramme.get().getShortTitle());
+ p.setProgramme(Arrays.asList(pm));
+ return p;
+ }
+
+ return null;
+ }, Encoders.bean(Project.class))
+ .filter(Objects::nonNull)
+ .groupByKey(
+ (MapFunction) p -> p.getId(),
+ Encoders.STRING())
+ .mapGroups((MapGroupsFunction) (s, it) -> {
+ Project first = it.next();
+ it.forEachRemaining(p -> {
+ first.mergeFrom(p);
+ });
+ return first;
+ }, Encoders.bean(Project.class))
+ .toJavaRDD()
+ .map(p -> new AtomicAction(Project.class, p))
+ .mapToPair(
+ aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
+ new Text(OBJECT_MAPPER.writeValueAsString(aa))))
+ .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
+
+ }
+
+ public static Dataset readPath(
+ SparkSession spark, String inputPath, Class clazz) {
+ return spark
+ .read()
+ .textFile(inputPath)
+ .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
+ }
+
+ public static String createOpenaireId(
+ final String prefix, final String nsPrefix, final String id) {
+
+ return String.format("%s|%s::%s", prefix, nsPrefix, DHPUtils.md5(id));
+
+ }
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/CSVParser.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/CSVParser.java
new file mode 100644
index 000000000..ef29a6b6a
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/CSVParser.java
@@ -0,0 +1,37 @@
+
+package eu.dnetlib.dhp.actionmanager.project.csvutils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVRecord;
+import org.apache.commons.lang.reflect.FieldUtils;
+
+public class CSVParser {
+
+ public List parse(String csvFile, String classForName)
+ throws ClassNotFoundException, IOException, IllegalAccessException, InstantiationException {
+ final CSVFormat format = CSVFormat.EXCEL
+ .withHeader()
+ .withDelimiter(';')
+ .withQuote('"')
+ .withTrim();
+ List ret = new ArrayList<>();
+ final org.apache.commons.csv.CSVParser parser = org.apache.commons.csv.CSVParser.parse(csvFile, format);
+ final Set headers = parser.getHeaderMap().keySet();
+ Class> clazz = Class.forName(classForName);
+ for (CSVRecord csvRecord : parser.getRecords()) {
+ final Object cc = clazz.newInstance();
+ for (String header : headers) {
+ FieldUtils.writeField(cc, header, csvRecord.get(header), true);
+
+ }
+ ret.add((R) cc);
+ }
+
+ return ret;
+ }
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/CSVProgramme.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/CSVProgramme.java
new file mode 100644
index 000000000..a9069e510
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/CSVProgramme.java
@@ -0,0 +1,52 @@
+
+package eu.dnetlib.dhp.actionmanager.project.csvutils;
+
+import java.io.Serializable;
+
+public class CSVProgramme implements Serializable {
+ private String rcn;
+ private String code;
+ private String title;
+ private String shortTitle;
+ private String language;
+
+ public String getRcn() {
+ return rcn;
+ }
+
+ public void setRcn(String rcn) {
+ this.rcn = rcn;
+ }
+
+ public String getCode() {
+ return code;
+ }
+
+ public void setCode(String code) {
+ this.code = code;
+ }
+
+ public String getTitle() {
+ return title;
+ }
+
+ public void setTitle(String title) {
+ this.title = title;
+ }
+
+ public String getShortTitle() {
+ return shortTitle;
+ }
+
+ public void setShortTitle(String shortTitle) {
+ this.shortTitle = shortTitle;
+ }
+
+ public String getLanguage() {
+ return language;
+ }
+
+ public void setLanguage(String language) {
+ this.language = language;
+ }
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/CSVProject.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/CSVProject.java
new file mode 100644
index 000000000..ff18c6260
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/CSVProject.java
@@ -0,0 +1,197 @@
+
+package eu.dnetlib.dhp.actionmanager.project.csvutils;
+
+import java.io.Serializable;
+
+public class CSVProject implements Serializable {
+ private String rcn;
+ private String id;
+ private String acronym;
+ private String status;
+ private String programme;
+ private String topics;
+ private String frameworkProgramme;
+ private String title;
+ private String startDate;
+ private String endDate;
+ private String projectUrl;
+ private String objective;
+ private String totalCost;
+ private String ecMaxContribution;
+ private String call;
+ private String fundingScheme;
+ private String coordinator;
+ private String coordinatorCountry;
+ private String participants;
+ private String participantCountries;
+ private String subjects;
+
+ public String getRcn() {
+ return rcn;
+ }
+
+ public void setRcn(String rcn) {
+ this.rcn = rcn;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getAcronym() {
+ return acronym;
+ }
+
+ public void setAcronym(String acronym) {
+ this.acronym = acronym;
+ }
+
+ public String getStatus() {
+ return status;
+ }
+
+ public void setStatus(String status) {
+ this.status = status;
+ }
+
+ public String getProgramme() {
+ return programme;
+ }
+
+ public void setProgramme(String programme) {
+ this.programme = programme;
+ }
+
+ public String getTopics() {
+ return topics;
+ }
+
+ public void setTopics(String topics) {
+ this.topics = topics;
+ }
+
+ public String getFrameworkProgramme() {
+ return frameworkProgramme;
+ }
+
+ public void setFrameworkProgramme(String frameworkProgramme) {
+ this.frameworkProgramme = frameworkProgramme;
+ }
+
+ public String getTitle() {
+ return title;
+ }
+
+ public void setTitle(String title) {
+ this.title = title;
+ }
+
+ public String getStartDate() {
+ return startDate;
+ }
+
+ public void setStartDate(String startDate) {
+ this.startDate = startDate;
+ }
+
+ public String getEndDate() {
+ return endDate;
+ }
+
+ public void setEndDate(String endDate) {
+ this.endDate = endDate;
+ }
+
+ public String getProjectUrl() {
+ return projectUrl;
+ }
+
+ public void setProjectUrl(String projectUrl) {
+ this.projectUrl = projectUrl;
+ }
+
+ public String getObjective() {
+ return objective;
+ }
+
+ public void setObjective(String objective) {
+ this.objective = objective;
+ }
+
+ public String getTotalCost() {
+ return totalCost;
+ }
+
+ public void setTotalCost(String totalCost) {
+ this.totalCost = totalCost;
+ }
+
+ public String getEcMaxContribution() {
+ return ecMaxContribution;
+ }
+
+ public void setEcMaxContribution(String ecMaxContribution) {
+ this.ecMaxContribution = ecMaxContribution;
+ }
+
+ public String getCall() {
+ return call;
+ }
+
+ public void setCall(String call) {
+ this.call = call;
+ }
+
+ public String getFundingScheme() {
+ return fundingScheme;
+ }
+
+ public void setFundingScheme(String fundingScheme) {
+ this.fundingScheme = fundingScheme;
+ }
+
+ public String getCoordinator() {
+ return coordinator;
+ }
+
+ public void setCoordinator(String coordinator) {
+ this.coordinator = coordinator;
+ }
+
+ public String getCoordinatorCountry() {
+ return coordinatorCountry;
+ }
+
+ public void setCoordinatorCountry(String coordinatorCountry) {
+ this.coordinatorCountry = coordinatorCountry;
+ }
+
+ public String getParticipants() {
+ return participants;
+ }
+
+ public void setParticipants(String participants) {
+ this.participants = participants;
+ }
+
+ public String getParticipantCountries() {
+ return participantCountries;
+ }
+
+ public void setParticipantCountries(String participantCountries) {
+ this.participantCountries = participantCountries;
+ }
+
+ public String getSubjects() {
+ return subjects;
+ }
+
+ public void setSubjects(String subjects) {
+ this.subjects = subjects;
+ }
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/ReadCSV.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/ReadCSV.java
new file mode 100644
index 000000000..2b72b229a
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/ReadCSV.java
@@ -0,0 +1,97 @@
+
+package eu.dnetlib.dhp.actionmanager.project.csvutils;
+
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.actionmanager.project.httpconnector.HttpConnector;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+
+public class ReadCSV implements Closeable {
+ private static final Log log = LogFactory.getLog(ReadCSV.class);
+ private final Configuration conf;
+ private final BufferedWriter writer;
+ private final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private String csvFile;
+
+ public static void main(final String[] args) throws Exception {
+ final ArgumentApplicationParser parser = new ArgumentApplicationParser(
+ IOUtils
+ .toString(
+ ReadCSV.class
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/actionmanager/project/parameters.json")));
+
+ parser.parseArgument(args);
+
+ final String fileURL = parser.get("fileURL");
+ final String hdfsPath = parser.get("hdfsPath");
+ final String hdfsNameNode = parser.get("hdfsNameNode");
+ final String classForName = parser.get("classForName");
+
+ try (final ReadCSV readCSV = new ReadCSV(hdfsPath, hdfsNameNode, fileURL)) {
+
+ log.info("Getting CSV file...");
+ readCSV.execute(classForName);
+
+ }
+ }
+
+ public void execute(final String classForName) throws Exception {
+ CSVParser csvParser = new CSVParser();
+ csvParser
+ .parse(csvFile, classForName)
+ .stream()
+ .forEach(p -> write(p));
+
+ }
+
+ @Override
+ public void close() throws IOException {
+ writer.close();
+ }
+
+ public ReadCSV(
+ final String hdfsPath,
+ final String hdfsNameNode,
+ final String fileURL)
+ throws Exception {
+ this.conf = new Configuration();
+ this.conf.set("fs.defaultFS", hdfsNameNode);
+ HttpConnector httpConnector = new HttpConnector();
+ FileSystem fileSystem = FileSystem.get(this.conf);
+ Path hdfsWritePath = new Path(hdfsPath);
+ FSDataOutputStream fsDataOutputStream = null;
+ if (fileSystem.exists(hdfsWritePath)) {
+ fileSystem.delete(hdfsWritePath, false);
+ }
+ fsDataOutputStream = fileSystem.create(hdfsWritePath);
+
+ this.writer = new BufferedWriter(new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8));
+ this.csvFile = httpConnector.getInputSource(fileURL);
+ ;
+ }
+
+ protected void write(final Object p) {
+ try {
+ writer.write(OBJECT_MAPPER.writeValueAsString(p));
+ writer.newLine();
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/CollectorPluginErrorLogList.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/CollectorPluginErrorLogList.java
new file mode 100644
index 000000000..9d3f88265
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/CollectorPluginErrorLogList.java
@@ -0,0 +1,20 @@
+
+package eu.dnetlib.dhp.actionmanager.project.httpconnector;
+
+import java.util.LinkedList;
+
+public class CollectorPluginErrorLogList extends LinkedList {
+
+ private static final long serialVersionUID = -6925786561303289704L;
+
+ @Override
+ public String toString() {
+ String log = new String();
+ int index = 0;
+ for (String errorMessage : this) {
+ log += String.format("Retry #%s: %s / ", index++, errorMessage);
+ }
+ return log;
+ }
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/CollectorServiceException.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/CollectorServiceException.java
new file mode 100644
index 000000000..9167d97b4
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/CollectorServiceException.java
@@ -0,0 +1,20 @@
+
+package eu.dnetlib.dhp.actionmanager.project.httpconnector;
+
+public class CollectorServiceException extends Exception {
+
+ private static final long serialVersionUID = 7523999812098059764L;
+
+ public CollectorServiceException(String string) {
+ super(string);
+ }
+
+ public CollectorServiceException(String string, Throwable exception) {
+ super(string, exception);
+ }
+
+ public CollectorServiceException(Throwable exception) {
+ super(exception);
+ }
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnector.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnector.java
new file mode 100644
index 000000000..e20518b55
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/httpconnector/HttpConnector.java
@@ -0,0 +1,240 @@
+
+package eu.dnetlib.dhp.actionmanager.project.httpconnector;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.*;
+import java.security.GeneralSecurityException;
+import java.security.cert.X509Certificate;
+import java.util.List;
+import java.util.Map;
+
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @author jochen, michele, andrea
+ */
+public class HttpConnector {
+
+ private static final Log log = LogFactory.getLog(HttpConnector.class);
+
+ private int maxNumberOfRetry = 6;
+ private int defaultDelay = 120; // seconds
+ private int readTimeOut = 120; // seconds
+
+ private String responseType = null;
+
+ private String userAgent = "Mozilla/5.0 (compatible; OAI; +http://www.openaire.eu)";
+
+ public HttpConnector() {
+ CookieHandler.setDefault(new CookieManager(null, CookiePolicy.ACCEPT_ALL));
+ }
+
+ /**
+ * Given the URL returns the content via HTTP GET
+ *
+ * @param requestUrl the URL
+ * @return the content of the downloaded resource
+ * @throws CollectorServiceException when retrying more than maxNumberOfRetry times
+ */
+ public String getInputSource(final String requestUrl) throws CollectorServiceException {
+ return attemptDownlaodAsString(requestUrl, 1, new CollectorPluginErrorLogList());
+ }
+
+ /**
+ * Given the URL returns the content as a stream via HTTP GET
+ *
+ * @param requestUrl the URL
+ * @return the content of the downloaded resource as InputStream
+ * @throws CollectorServiceException when retrying more than maxNumberOfRetry times
+ */
+ public InputStream getInputSourceAsStream(final String requestUrl) throws CollectorServiceException {
+ return attemptDownload(requestUrl, 1, new CollectorPluginErrorLogList());
+ }
+
+ private String attemptDownlaodAsString(final String requestUrl, final int retryNumber,
+ final CollectorPluginErrorLogList errorList)
+ throws CollectorServiceException {
+ try {
+ InputStream s = attemptDownload(requestUrl, 1, new CollectorPluginErrorLogList());
+ try {
+ return IOUtils.toString(s);
+ } catch (IOException e) {
+ log.error("error while retrieving from http-connection occured: " + requestUrl, e);
+ Thread.sleep(defaultDelay * 1000);
+ errorList.add(e.getMessage());
+ return attemptDownlaodAsString(requestUrl, retryNumber + 1, errorList);
+ } finally {
+ IOUtils.closeQuietly(s);
+ }
+ } catch (InterruptedException e) {
+ throw new CollectorServiceException(e);
+ }
+ }
+
+ private InputStream attemptDownload(final String requestUrl, final int retryNumber,
+ final CollectorPluginErrorLogList errorList)
+ throws CollectorServiceException {
+
+ if (retryNumber > maxNumberOfRetry) {
+ throw new CollectorServiceException("Max number of retries exceeded. Cause: \n " + errorList);
+ }
+
+ log.debug("Downloading " + requestUrl + " - try: " + retryNumber);
+ try {
+ InputStream input = null;
+
+ try {
+ final HttpURLConnection urlConn = (HttpURLConnection) new URL(requestUrl).openConnection();
+ urlConn.setInstanceFollowRedirects(false);
+ urlConn.setReadTimeout(readTimeOut * 1000);
+ urlConn.addRequestProperty("User-Agent", userAgent);
+
+ if (log.isDebugEnabled()) {
+ logHeaderFields(urlConn);
+ }
+
+ int retryAfter = obtainRetryAfter(urlConn.getHeaderFields());
+ if (retryAfter > 0 && urlConn.getResponseCode() == HttpURLConnection.HTTP_UNAVAILABLE) {
+ log.warn("waiting and repeating request after " + retryAfter + " sec.");
+ Thread.sleep(retryAfter * 1000);
+ errorList.add("503 Service Unavailable");
+ urlConn.disconnect();
+ return attemptDownload(requestUrl, retryNumber + 1, errorList);
+ } else if ((urlConn.getResponseCode() == HttpURLConnection.HTTP_MOVED_PERM)
+ || (urlConn.getResponseCode() == HttpURLConnection.HTTP_MOVED_TEMP)) {
+ final String newUrl = obtainNewLocation(urlConn.getHeaderFields());
+ log.debug("The requested url has been moved to " + newUrl);
+ errorList
+ .add(
+ String
+ .format(
+ "%s %s. Moved to: %s", urlConn.getResponseCode(), urlConn.getResponseMessage(),
+ newUrl));
+ urlConn.disconnect();
+ return attemptDownload(newUrl, retryNumber + 1, errorList);
+ } else if (urlConn.getResponseCode() != HttpURLConnection.HTTP_OK) {
+ log
+ .error(
+ String
+ .format("HTTP error: %s %s", urlConn.getResponseCode(), urlConn.getResponseMessage()));
+ Thread.sleep(defaultDelay * 1000);
+ errorList.add(String.format("%s %s", urlConn.getResponseCode(), urlConn.getResponseMessage()));
+ urlConn.disconnect();
+ return attemptDownload(requestUrl, retryNumber + 1, errorList);
+ } else {
+ input = urlConn.getInputStream();
+ responseType = urlConn.getContentType();
+ return input;
+ }
+ } catch (IOException e) {
+ log.error("error while retrieving from http-connection occured: " + requestUrl, e);
+ Thread.sleep(defaultDelay * 1000);
+ errorList.add(e.getMessage());
+ return attemptDownload(requestUrl, retryNumber + 1, errorList);
+ }
+ } catch (InterruptedException e) {
+ throw new CollectorServiceException(e);
+ }
+ }
+
+ private void logHeaderFields(final HttpURLConnection urlConn) throws IOException {
+ log.debug("StatusCode: " + urlConn.getResponseMessage());
+
+ for (Map.Entry> e : urlConn.getHeaderFields().entrySet()) {
+ if (e.getKey() != null) {
+ for (String v : e.getValue()) {
+ log.debug(" key: " + e.getKey() + " - value: " + v);
+ }
+ }
+ }
+ }
+
+ private int obtainRetryAfter(final Map> headerMap) {
+ for (String key : headerMap.keySet()) {
+ if ((key != null) && key.toLowerCase().equals("retry-after") && (headerMap.get(key).size() > 0)
+ && NumberUtils.isCreatable(headerMap.get(key).get(0))) {
+ return Integer
+ .parseInt(headerMap.get(key).get(0)) + 10;
+ }
+ }
+ return -1;
+ }
+
+ private String obtainNewLocation(final Map> headerMap) throws CollectorServiceException {
+ for (String key : headerMap.keySet()) {
+ if ((key != null) && key.toLowerCase().equals("location") && (headerMap.get(key).size() > 0)) {
+ return headerMap.get(key).get(0);
+ }
+ }
+ throw new CollectorServiceException("The requested url has been MOVED, but 'location' param is MISSING");
+ }
+
+ /**
+ * register for https scheme; this is a workaround and not intended for the use in trusted environments
+ */
+ public void initTrustManager() {
+ final X509TrustManager tm = new X509TrustManager() {
+
+ @Override
+ public void checkClientTrusted(final X509Certificate[] xcs, final String string) {
+ }
+
+ @Override
+ public void checkServerTrusted(final X509Certificate[] xcs, final String string) {
+ }
+
+ @Override
+ public X509Certificate[] getAcceptedIssuers() {
+ return null;
+ }
+ };
+ try {
+ final SSLContext ctx = SSLContext.getInstance("TLS");
+ ctx.init(null, new TrustManager[] {
+ tm
+ }, null);
+ HttpsURLConnection.setDefaultSSLSocketFactory(ctx.getSocketFactory());
+ } catch (GeneralSecurityException e) {
+ log.fatal(e);
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public int getMaxNumberOfRetry() {
+ return maxNumberOfRetry;
+ }
+
+ public void setMaxNumberOfRetry(final int maxNumberOfRetry) {
+ this.maxNumberOfRetry = maxNumberOfRetry;
+ }
+
+ public int getDefaultDelay() {
+ return defaultDelay;
+ }
+
+ public void setDefaultDelay(final int defaultDelay) {
+ this.defaultDelay = defaultDelay;
+ }
+
+ public int getReadTimeOut() {
+ return readTimeOut;
+ }
+
+ public void setReadTimeOut(final int readTimeOut) {
+ this.readTimeOut = readTimeOut;
+ }
+
+ public String getResponseType() {
+ return responseType;
+ }
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/action_set_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/action_set_parameters.json
new file mode 100644
index 000000000..a0856e10e
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/action_set_parameters.json
@@ -0,0 +1,26 @@
+[
+ {
+ "paramName": "issm",
+ "paramLongName": "isSparkSessionManaged",
+ "paramDescription": "when true will stop SparkSession after job execution",
+ "paramRequired": false
+},
+{
+"paramName": "pjp",
+"paramLongName": "projectPath",
+"paramDescription": "the URL from where to get the projects file",
+"paramRequired": true
+},
+{
+"paramName": "pp",
+"paramLongName": "programmePath",
+"paramDescription": "the URL from where to get the programme file",
+"paramRequired": true
+},
+{
+"paramName": "o",
+"paramLongName": "outputPath",
+"paramDescription": "the path of the new ActionSet",
+"paramRequired": true
+}
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/oozie_app/config-default.xml
new file mode 100644
index 000000000..fe82ae194
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/oozie_app/config-default.xml
@@ -0,0 +1,54 @@
+
+
+ jobTracker
+ yarnRM
+
+
+ nameNode
+ hdfs://nameservice1
+
+
+ oozie.use.system.libpath
+ true
+
+
+ oozie.action.sharelib.for.spark
+ spark2
+
+
+ hive_metastore_uris
+ thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083
+
+
+ spark2YarnHistoryServerAddress
+ http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089
+
+
+ spark2ExtraListeners
+ com.cloudera.spark.lineage.NavigatorAppListener
+
+
+ spark2SqlQueryExecutionListeners
+ com.cloudera.spark.lineage.NavigatorQueryListener
+
+
+ sparkExecutorNumber
+ 4
+
+
+ spark2EventLogDir
+ /user/spark/spark2ApplicationHistory
+
+
+ sparkDriverMemory
+ 15G
+
+
+ sparkExecutorMemory
+ 6G
+
+
+ sparkExecutorCores
+ 1
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/oozie_app/workflow.xml
new file mode 100644
index 000000000..1e3445675
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/oozie_app/workflow.xml
@@ -0,0 +1,146 @@
+
+
+
+ projectFileURL
+ the url where to get the projects file
+
+
+
+ programmeFileURL
+ the url where to get the programme file
+
+
+
+ outputPath
+ path where to store the action set
+
+
+
+
+
+ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ eu.dnetlib.dhp.actionmanager.project.csvutils.ReadCSV
+ --hdfsNameNode${nameNode}
+ --fileURL${projectFileURL}
+ --hdfsPath${workingDir}/projects
+ --classForNameeu.dnetlib.dhp.actionmanager.project.csvutils.CSVProject
+
+
+
+
+
+
+
+ eu.dnetlib.dhp.actionmanager.project.csvutils.ReadCSV
+ --hdfsNameNode${nameNode}
+ --fileURL${programmeFileURL}
+ --hdfsPath${workingDir}/programme
+ --classForNameeu.dnetlib.dhp.actionmanager.project.csvutils.CSVProgramme
+
+
+
+
+
+
+
+ eu.dnetlib.dhp.actionmanager.project.ReadProjectsFromDB
+ --hdfsPath${workingDir}/dbProjects
+ --hdfsNameNode${nameNode}
+ --postgresUrl${postgresURL}
+ --postgresUser${postgresUser}
+ --postgresPassword${postgresPassword}
+
+
+
+
+
+
+
+ yarn
+ cluster
+ PrepareProgramme
+ eu.dnetlib.dhp.actionmanager.project.PrepareProgramme
+ dhp-aggregation-${projectVersion}.jar
+
+ --executor-cores=${sparkExecutorCores}
+ --executor-memory=${sparkExecutorMemory}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=3840
+
+ --programmePath${workingDir}/programme
+ --outputPath${workingDir}/preparedProgramme
+
+
+
+
+
+
+
+ yarn
+ cluster
+ PrepareProjects
+ eu.dnetlib.dhp.actionmanager.project.PrepareProjects
+ dhp-aggregation-${projectVersion}.jar
+
+ --executor-cores=${sparkExecutorCores}
+ --executor-memory=${sparkExecutorMemory}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=3840
+
+ --projectPath${workingDir}/projects
+ --outputPath${workingDir}/preparedProjects
+ --dbProjectPath${workingDir}/dbProjects
+
+
+
+
+
+
+
+ yarn
+ cluster
+ ProjectProgrammeAS
+ eu.dnetlib.dhp.actionmanager.project.SparkAtomicActionJob
+ dhp-aggregation-${projectVersion}.jar
+
+ --executor-cores=${sparkExecutorCores}
+ --executor-memory=${sparkExecutorMemory}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.shuffle.partitions=3840
+
+ --projectPath${workingDir}/preparedProjects
+ --programmePath${workingDir}/preparedProgramme
+ --outputPath${outputPath}
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/parameters.json
new file mode 100644
index 000000000..dd3de70f6
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/parameters.json
@@ -0,0 +1,29 @@
+[
+
+ {
+ "paramName": "fu",
+ "paramLongName" : "fileURL",
+ "paramDescription" : "the url of the file to download",
+ "paramRequired" : true
+ },
+ {
+ "paramName": "hp",
+ "paramLongName" : "hdfsPath",
+ "paramDescription" : "where to save the file",
+ "paramRequired" : true
+ },
+ {
+ "paramName": "hnn",
+ "paramLongName" : "hdfsNameNode",
+ "paramDescription" : "the name node",
+ "paramRequired" : true
+ },
+ {
+ "paramName": "cfn",
+ "paramLongName" : "classForName",
+ "paramDescription" : "the name of the class to deserialize the csv to",
+ "paramRequired" : true
+}
+
+
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/prepare_programme_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/prepare_programme_parameters.json
new file mode 100644
index 000000000..54083e108
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/prepare_programme_parameters.json
@@ -0,0 +1,20 @@
+[
+ {
+ "paramName": "issm",
+ "paramLongName": "isSparkSessionManaged",
+ "paramDescription": "when true will stop SparkSession after job execution",
+ "paramRequired": false
+},
+{
+"paramName": "pp",
+"paramLongName": "programmePath",
+"paramDescription": "the URL from where to get the programme file",
+"paramRequired": true
+},
+{
+"paramName": "o",
+"paramLongName": "outputPath",
+"paramDescription": "the path of the new ActionSet",
+"paramRequired": true
+}
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/prepare_project_parameters.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/prepare_project_parameters.json
new file mode 100644
index 000000000..49f9c7306
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/prepare_project_parameters.json
@@ -0,0 +1,26 @@
+[
+ {
+ "paramName": "issm",
+ "paramLongName": "isSparkSessionManaged",
+ "paramDescription": "when true will stop SparkSession after job execution",
+ "paramRequired": false
+},
+{
+"paramName": "pjp",
+"paramLongName": "projectPath",
+"paramDescription": "the URL from where to get the programme file",
+"paramRequired": true
+},
+{
+"paramName": "o",
+"paramLongName": "outputPath",
+"paramDescription": "the path of the new ActionSet",
+"paramRequired": true
+},
+ {
+ "paramName": "dbp",
+ "paramLongName": "dbProjectPath",
+ "paramDescription": "the path of the project code read from db",
+ "paramRequired": true
+ }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/read_projects_db.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/read_projects_db.json
new file mode 100644
index 000000000..9a2eadaa7
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/read_projects_db.json
@@ -0,0 +1,32 @@
+[
+ {
+ "paramName": "p",
+ "paramLongName": "hdfsPath",
+ "paramDescription": "the path where storing the sequential file",
+ "paramRequired": true
+ },
+ {
+ "paramName": "nn",
+ "paramLongName": "hdfsNameNode",
+ "paramDescription": "the name node on hdfs",
+ "paramRequired": true
+ },
+ {
+ "paramName": "pgurl",
+ "paramLongName": "postgresUrl",
+ "paramDescription": "postgres url, example: jdbc:postgresql://localhost:5432/testdb",
+ "paramRequired": true
+ },
+ {
+ "paramName": "pguser",
+ "paramLongName": "postgresUser",
+ "paramDescription": "postgres user",
+ "paramRequired": false
+ },
+ {
+ "paramName": "pgpasswd",
+ "paramLongName": "postgresPassword",
+ "paramDescription": "postgres password",
+ "paramRequired": false
+ }
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/CSVParserTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/CSVParserTest.java
new file mode 100644
index 000000000..17fdd4511
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/CSVParserTest.java
@@ -0,0 +1,41 @@
+
+package eu.dnetlib.dhp.actionmanager.project;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+
+import org.apache.commons.io.IOUtils;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import eu.dnetlib.dhp.actionmanager.project.csvutils.CSVParser;
+
+public class CSVParserTest {
+
+ private static Path workingDir;
+
+ @BeforeAll
+ public static void beforeAll() throws IOException {
+ workingDir = Files.createTempDirectory(CSVParserTest.class.getSimpleName());
+
+ }
+
+ @Test
+ public void readProgrammeTest() throws Exception {
+
+ String programmecsv = IOUtils
+ .toString(
+ getClass()
+ .getClassLoader()
+ .getResourceAsStream("eu/dnetlib/dhp/actionmanager/project/programme.csv"));
+
+ CSVParser csvParser = new CSVParser();
+
+ List