diff --git a/dhp-common/pom.xml b/dhp-common/pom.xml
index acac3594fa..b1494f649a 100644
--- a/dhp-common/pom.xml
+++ b/dhp-common/pom.xml
@@ -21,6 +21,10 @@
org.apache.hadoop
hadoop-common
+
+ commons-validator
+ commons-validator
+
org.apache.spark
spark-core_2.11
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java
index 15fff07c02..da253c681a 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java
@@ -7,11 +7,13 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.validator.GenericValidator;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
public class GraphCleaningFunctions extends CleaningFunctions {
@@ -115,7 +117,13 @@ public class GraphCleaningFunctions extends CleaningFunctions {
o.setCountry(ModelConstants.UNKNOWN_COUNTRY);
}
} else if (value instanceof Relation) {
- // nothing to clean here
+ Relation r = (Relation) value;
+
+ if (!isValidDate(r.getValidationDate())) {
+ r.setValidationDate(null);
+ r.setValidated(false);
+ }
+
} else if (value instanceof Result) {
Result r = (Result) value;
@@ -292,6 +300,12 @@ public class GraphCleaningFunctions extends CleaningFunctions {
return value;
}
+ protected static boolean isValidDate(String date) {
+ return Stream
+ .of(ModelSupport.DATE_TIME_FORMATS)
+ .anyMatch(format -> GenericValidator.isDate(date, format, false));
+ }
+
// HELPERS
private static boolean isValidAuthorName(Author a) {
diff --git a/dhp-common/src/test/java/eu/dnetlib/dhp/schema/oaf/utils/OafMapperUtilsTest.java b/dhp-common/src/test/java/eu/dnetlib/dhp/schema/oaf/utils/OafMapperUtilsTest.java
index 7256d6489f..e8135f2019 100644
--- a/dhp-common/src/test/java/eu/dnetlib/dhp/schema/oaf/utils/OafMapperUtilsTest.java
+++ b/dhp-common/src/test/java/eu/dnetlib/dhp/schema/oaf/utils/OafMapperUtilsTest.java
@@ -4,6 +4,7 @@ package eu.dnetlib.dhp.schema.oaf.utils;
import static org.junit.jupiter.api.Assertions.*;
import java.io.IOException;
+import java.time.format.DateTimeParseException;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;
@@ -15,16 +16,23 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.common.ModelConstants;
-import eu.dnetlib.dhp.schema.oaf.Dataset;
-import eu.dnetlib.dhp.schema.oaf.KeyValue;
-import eu.dnetlib.dhp.schema.oaf.Publication;
-import eu.dnetlib.dhp.schema.oaf.Result;
+import eu.dnetlib.dhp.schema.oaf.*;
public class OafMapperUtilsTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ @Test
+ public void testDateValidation() {
+
+ assertTrue(GraphCleaningFunctions.isValidDate("2016-05-07T12:41:19.202Z"));
+ assertTrue(GraphCleaningFunctions.isValidDate("2020-09-10 11:08:52"));
+ assertTrue(GraphCleaningFunctions.isValidDate("2016-04-05"));
+ assertFalse(GraphCleaningFunctions.isValidDate("2016 April 05"));
+
+ }
+
@Test
public void testMergePubs() throws IOException {
Publication p1 = read("publication_1.json", Publication.class);
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ImportDatacite.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ImportDatacite.scala
index d6101ba7a9..931ac06f64 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ImportDatacite.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/ImportDatacite.scala
@@ -56,6 +56,7 @@ object ImportDatacite {
val hdfsTargetPath = new Path(targetPath)
log.info(s"hdfsTargetPath is $hdfsTargetPath")
+ val bs = if (parser.get("blocksize") == null) 100 else parser.get("blocksize").toInt
val spkipImport = parser.get("skipImport")
log.info(s"skipImport is $spkipImport")
@@ -110,7 +111,7 @@ object ImportDatacite {
println(s"last Timestamp is $ts")
- val cnt = if ("true".equalsIgnoreCase(spkipImport)) 1 else writeSequenceFile(hdfsTargetPath, ts, conf)
+ val cnt = if ("true".equalsIgnoreCase(spkipImport)) 1 else writeSequenceFile(hdfsTargetPath, ts, conf, bs)
println(s"Imported from Datacite API $cnt documents")
@@ -137,7 +138,7 @@ object ImportDatacite {
}
}
- private def writeSequenceFile(hdfsTargetPath: Path, timestamp: Long, conf: Configuration): Long = {
+ private def writeSequenceFile(hdfsTargetPath: Path, timestamp: Long, conf: Configuration, bs:Int): Long = {
var from:Long = timestamp * 1000
val delta:Long = 50000000L
var client: DataciteAPIImporter = null
@@ -148,7 +149,7 @@ object ImportDatacite {
try {
var start: Long = System.currentTimeMillis
while (from < now) {
- client = new DataciteAPIImporter(from, 100, from + delta)
+ client = new DataciteAPIImporter(from, bs, from + delta)
var end: Long = 0
val key: IntWritable = new IntWritable(i)
val value: Text = new Text
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProgramme.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProgramme.java
index e5a79300e5..760e5131db 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProgramme.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProgramme.java
@@ -143,7 +143,6 @@ public class PrepareProgramme {
JavaRDD h2020Programmes = programme
.toJavaRDD()
- .filter(p -> p.getFrameworkProgramme().trim().equalsIgnoreCase("H2020"))
.mapToPair(csvProgramme -> new Tuple2<>(csvProgramme.getCode(), csvProgramme))
.reduceByKey((a, b) -> {
if (!a.getLanguage().equals("en")) {
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProjects.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProjects.java
index 3ef98e0215..cecd537ba0 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProjects.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProjects.java
@@ -18,7 +18,6 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
-import eu.dnetlib.dhp.actionmanager.project.utils.CSVProgramme;
import eu.dnetlib.dhp.actionmanager.project.utils.CSVProject;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
@@ -32,7 +31,6 @@ public class PrepareProjects {
private static final Logger log = LoggerFactory.getLogger(PrepareProgramme.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
- private static final HashMap programmeMap = new HashMap<>();
public static void main(String[] args) throws Exception {
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java
index a583b7bfa2..fdc12c6629 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java
@@ -120,7 +120,6 @@ public class SparkAtomicActionJob {
.map((MapFunction, Project>) c -> {
CSVProject csvProject = c._1();
- Optional ocsvProgramme = Optional.ofNullable(c._2());
return Optional
.ofNullable(c._2())
@@ -135,9 +134,9 @@ public class SparkAtomicActionJob {
H2020Programme pm = new H2020Programme();
H2020Classification h2020classification = new H2020Classification();
pm.setCode(csvProject.getProgramme());
- h2020classification.setClassification(ocsvProgramme.get().getClassification());
+ h2020classification.setClassification(csvProgramme.getClassification());
h2020classification.setH2020Programme(pm);
- setLevelsandProgramme(h2020classification, ocsvProgramme.get().getClassification_short());
+ setLevelsandProgramme(h2020classification, csvProgramme.getClassification_short());
// setProgramme(h2020classification, ocsvProgramme.get().getClassification());
pp.setH2020classification(Arrays.asList(h2020classification));
@@ -145,10 +144,11 @@ public class SparkAtomicActionJob {
})
.orElse(null);
- }, Encoders.bean(Project.class));
+ }, Encoders.bean(Project.class))
+ .filter(Objects::nonNull);
aaproject
- .joinWith(topic, aaproject.col("h2020topiccode").equalTo(topic.col("code")))
+ .joinWith(topic, aaproject.col("h2020topiccode").equalTo(topic.col("code")), "left")
.map((MapFunction, Project>) p -> {
Optional op = Optional.ofNullable(p._2());
Project rp = p._1();
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/CSVProgramme.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/CSVProgramme.java
index f991a4297b..d486f01049 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/CSVProgramme.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/CSVProgramme.java
@@ -7,14 +7,7 @@ import java.io.Serializable;
* The model for the programme csv file
*/
public class CSVProgramme implements Serializable {
- private String parentProgramme;
- private String frameworkProgramme;
- private String startDate;
- private String endDate;
- private String objective;
- private String subjects;
- private String legalBasis;
- private String call;
+
private String rcn;
private String code;
@@ -80,67 +73,5 @@ public class CSVProgramme implements Serializable {
this.language = language;
}
- public String getParentProgramme() {
- return parentProgramme;
- }
-
- public void setParentProgramme(String parentProgramme) {
- this.parentProgramme = parentProgramme;
- }
-
- public String getFrameworkProgramme() {
- return frameworkProgramme;
- }
-
- public void setFrameworkProgramme(String frameworkProgramme) {
- this.frameworkProgramme = frameworkProgramme;
- }
-
- public String getStartDate() {
- return startDate;
- }
-
- public void setStartDate(String startDate) {
- this.startDate = startDate;
- }
-
- public String getEndDate() {
- return endDate;
- }
-
- public void setEndDate(String endDate) {
- this.endDate = endDate;
- }
-
- public String getObjective() {
- return objective;
- }
-
- public void setObjective(String objective) {
- this.objective = objective;
- }
-
- public String getSubjects() {
- return subjects;
- }
-
- public void setSubjects(String subjects) {
- this.subjects = subjects;
- }
-
- public String getLegalBasis() {
- return legalBasis;
- }
-
- public void setLegalBasis(String legalBasis) {
- this.legalBasis = legalBasis;
- }
-
- public String getCall() {
- return call;
- }
-
- public void setCall(String call) {
- this.call = call;
- }
+//
}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/EXCELParser.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/EXCELParser.java
index 1a6ebb9e86..5f5b61d8b2 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/EXCELParser.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/utils/EXCELParser.java
@@ -26,7 +26,6 @@ public class EXCELParser {
throws ClassNotFoundException, IOException, IllegalAccessException, InstantiationException,
InvalidFormatException {
- // OPCPackage pkg = OPCPackage.open(httpConnector.getInputSourceAsStream(URL));
OPCPackage pkg = OPCPackage.open(file);
XSSFWorkbook wb = new XSSFWorkbook(pkg);
@@ -58,7 +57,6 @@ public class EXCELParser {
for (int i = 0; i < headers.size(); i++) {
Cell cell = row.getCell(i);
- String value = dataFormatter.formatCellValue(cell);
FieldUtils.writeField(cc, headers.get(i), dataFormatter.formatCellValue(cell), true);
}
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/import_from_api.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/import_from_api.json
index 69fb039ba8..a37ae4bba0 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/import_from_api.json
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/import_from_api.json
@@ -18,6 +18,12 @@
"paramDescription": "avoid to downlaod new items but apply the previous update",
"paramRequired": false
},
+ {
+ "paramName": "bs",
+ "paramLongName": "blocksize",
+ "paramDescription": "define the requests block size",
+ "paramRequired": false
+ },
{
"paramName": "n",
"paramLongName": "namenode",
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/oozie_app/workflow.xml
index 8ce5818851..e4f2715fb3 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/project/oozie_app/workflow.xml
@@ -1,4 +1,4 @@
-
+
projectFileURL
@@ -18,6 +18,10 @@
outputPath
path where to store the action set
+
+ sheetName
+ the name of the sheet to read
+
@@ -31,10 +35,23 @@
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
eu.dnetlib.dhp.actionmanager.project.utils.ReadCSV
@@ -43,7 +60,7 @@
--hdfsPath${workingDir}/projects
--classForNameeu.dnetlib.dhp.actionmanager.project.utils.CSVProject
-
+
@@ -55,7 +72,7 @@
--hdfsPath${workingDir}/programme
--classForNameeu.dnetlib.dhp.actionmanager.project.utils.CSVProgramme
-
+
@@ -68,7 +85,7 @@
--sheetName${sheetName}
--classForNameeu.dnetlib.dhp.actionmanager.project.utils.EXCELTopic
-
+
@@ -81,7 +98,7 @@
--postgresUser${postgresUser}
--postgresPassword${postgresPassword}
-
+
@@ -105,10 +122,15 @@
--programmePath${workingDir}/programme
--outputPath${workingDir}/preparedProgramme
-
+
+
+
+
+
+
yarn
@@ -130,7 +152,7 @@
--outputPath${workingDir}/preparedProjects
--dbProjectPath${workingDir}/dbProjects
-
+
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/EXCELParserTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/EXCELParserTest.java
index 1601d9b3ee..b7155bc3a4 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/EXCELParserTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/EXCELParserTest.java
@@ -20,8 +20,8 @@ import eu.dnetlib.dhp.collection.HttpConnector2;
public class EXCELParserTest {
private static Path workingDir;
- private final HttpConnector2 httpConnector = new HttpConnector2();
- private static final String URL = "http://cordis.europa.eu/data/reference/cordisref-H2020topics.xlsx";
+ private HttpConnector2 httpConnector = new HttpConnector2();
+ private static final String URL = "https://cordis.europa.eu/data/reference/cordisref-h2020topics.xlsx";
@BeforeAll
public static void beforeAll() throws IOException {
@@ -35,11 +35,12 @@ public class EXCELParserTest {
EXCELParser excelParser = new EXCELParser();
- final String classForName = "eu.dnetlib.dhp.actionmanager.project.utils.ExcelTopic";
- final String sheetName = "Topics";
- List
+
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index b4acd6e68e..5b96816d96 100644
--- a/pom.xml
+++ b/pom.xml
@@ -200,6 +200,12 @@
${dhp.commons.lang.version}
+
+ commons-validator
+ commons-validator
+ 1.7
+
+
com.google.guava
guava
@@ -730,7 +736,7 @@
3.3.3
3.4.2
[2.12,3.0)
- [2.4.7]
+ [2.5.11]
[4.0.3]
[6.0.5]
[3.1.6]