diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java
index 0b602b774..7a8e55a6e 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/AuthorMerger.java
@@ -67,6 +67,7 @@ public class AuthorMerger {
a -> a
.getPid()
.stream()
+ .filter(Objects::nonNull)
.map(p -> new Tuple2<>(pidToComparableString(p), a)))
.collect(Collectors.toMap(Tuple2::_1, Tuple2::_2, (x1, x2) -> x1));
@@ -78,6 +79,7 @@ public class AuthorMerger {
a -> a
.getPid()
.stream()
+ .filter(Objects::nonNull)
.filter(p -> !basePidAuthorMap.containsKey(pidToComparableString(p)))
.map(p -> new Tuple2<>(p, a)))
.collect(Collectors.toList());
@@ -150,7 +152,7 @@ public class AuthorMerger {
}
private static boolean hasPid(Author a) {
- if (a == null || a.getPid() == null || a.getPid().size() == 0)
+ if (a == null || a.getPid() == null || a.getPid().isEmpty())
return false;
return a.getPid().stream().anyMatch(p -> p != null && StringUtils.isNotBlank(p.getValue()));
}
@@ -159,7 +161,10 @@ public class AuthorMerger {
if (StringUtils.isNotBlank(author.getSurname())) {
return new Person(author.getSurname() + ", " + author.getName(), false);
} else {
- return new Person(author.getFullname(), false);
+ if (StringUtils.isNotBlank(author.getFullname()))
+ return new Person(author.getFullname(), false);
+ else
+ return new Person("", false);
}
}
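
For illustration (not part of the patch): the AuthorMerger hunks above make the pid handling null-safe — null pid entries are skipped before the pid-to-author lookup map is built, and author2Person falls back to an empty Person when both surname and fullname are blank. Below is a minimal, self-contained Scala sketch of the same defensive pattern, using hypothetical Pid/Author case classes and an assumed key format rather than the project's eu.dnetlib.dhp.schema.oaf model.

    // Hypothetical model classes, only for this sketch.
    case class Pid(qualifier: String, value: String)
    case class Author(fullname: String, pid: List[Pid])

    object AuthorPidSketch {
      // Assumed comparable-string format; the real pidToComparableString may differ.
      def pidToComparableString(p: Pid): String =
        s"${Option(p.qualifier).getOrElse("").toLowerCase}::${Option(p.value).getOrElse("").toLowerCase}"

      // Skip null pid lists and null pid entries before keying the map, as the patch does.
      def basePidAuthorMap(authors: List[Author]): Map[String, Author] =
        authors
          .filter(a => a.pid != null)
          .flatMap(a => a.pid.filter(_ != null).map(p => pidToComparableString(p) -> a))
          .toMap // Scala's toMap keeps the last duplicate key; the Java stream version keeps the first
    }
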
diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java
index a75cc52e6..e5181b111 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/schema/oaf/utils/GraphCleaningFunctions.java
@@ -98,7 +98,7 @@ public class GraphCleaningFunctions extends CleaningFunctions {
Result r = (Result) value;
- if (Objects.nonNull(r.getTitle()) && r.getTitle().isEmpty()) {
+ if (Objects.isNull(r.getTitle()) || r.getTitle().isEmpty()) {
return false;
}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala
index 0cdf0accb..cfdd98d30 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTransformation.scala
@@ -367,7 +367,7 @@ object DataciteToOAFTransformation {
result.setDateofcollection(ISO8601FORMAT.format(d))
- result.setDateoftransformation(ISO8601FORMAT.format(ts))
+ result.setDateoftransformation(ISO8601FORMAT.format(d))
result.setDataInfo(dataInfo)
val creators = (json \\ "creators").extractOrElse[List[CreatorType]](List())
@@ -532,11 +532,11 @@ object DataciteToOAFTransformation {
JField("awardUri", JString(awardUri)) <- fundingReferences
} yield awardUri
+ result.setId(IdentifierFactory.createIdentifier(result))
var relations: List[Relation] = awardUris.flatMap(a => get_projectRelation(a, result.getId)).filter(r => r != null)
-
fix_figshare(result)
- result.setId(IdentifierFactory.createIdentifier(result))
+
if (result.getId == null)
return List()
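
The two hunks above render dateoftransformation from the same parsed Date as dateofcollection, and move the setId call before the award-URI relations are derived, so get_projectRelation sees the final OpenAIRE identifier rather than a null or stale id. A hedged sketch of the ordering point only, with stand-in types (ResultStub, ProjectRel and makeIdentifier are illustrative, not the project API):

    case class ResultStub(var id: String, doi: String)
    case class ProjectRel(source: String, target: String)

    object IdOrderingSketch {
      // Stand-in for IdentifierFactory.createIdentifier.
      def makeIdentifier(r: ResultStub): String = s"50|doi_________::${r.doi}"

      def toProjectRelations(r: ResultStub, awardUris: List[String]): List[ProjectRel] = {
        r.id = makeIdentifier(r)                    // assign the identifier first ...
        awardUris.map(uri => ProjectRel(r.id, uri)) // ... so the relations point at the final id
      }
    }
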
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/oozie_app/workflow.xml
index 036178b37..021704f54 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/datacite/oozie_app/workflow.xml
@@ -16,7 +16,7 @@
-
+
Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTest.scala b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTest.scala
index 0d10c41dc..a795a910d 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTest.scala
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/datacite/DataciteToOAFTest.scala
@@ -3,13 +3,14 @@ package eu.dnetlib.dhp.actionmanager.datacite
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.SerializationFeature
-
import eu.dnetlib.dhp.aggregation.AbstractVocabularyTest
import eu.dnetlib.dhp.schema.oaf.Oaf
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.api.{BeforeEach, Test}
import org.mockito.junit.jupiter.MockitoExtension
+import java.text.SimpleDateFormat
+import java.util.Locale
import scala.io.Source
@ExtendWith(Array(classOf[MockitoExtension]))
@@ -22,6 +23,18 @@ class DataciteToOAFTest extends AbstractVocabularyTest{
super.setUpVocabulary()
}
+
+ @Test
+ def testDateMapping:Unit = {
+ val inputDate = "2021-07-14T11:52:54+0000"
+ val ISO8601FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ", Locale.US)
+ val dt = ISO8601FORMAT.parse(inputDate)
+ println(dt.getTime)
+
+
+ }
+
+
@Test
def testMapping() :Unit = {
val record =Source.fromInputStream(getClass.getResourceAsStream("record.json")).mkString
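
The new testDateMapping only checks that SimpleDateFormat with the yyyy-MM-dd'T'HH:mm:ssZ pattern accepts the DataCite-style offset +0000. For reference, a minimal sketch (not part of the patch) of the same parse with java.time, which avoids SimpleDateFormat's thread-safety pitfalls:

    import java.time.OffsetDateTime
    import java.time.format.DateTimeFormatter

    object DateParseSketch {
      def main(args: Array[String]): Unit = {
        val inputDate = "2021-07-14T11:52:54+0000"
        // 'Z' in java.time patterns accepts an RFC 822 style offset such as +0000
        val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ")
        val epochMillis = OffsetDateTime.parse(inputDate, fmt).toInstant.toEpochMilli
        println(epochMillis) // same value as ISO8601FORMAT.parse(inputDate).getTime
      }
    }
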
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/datacite/record.json b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/datacite/record.json
index 3ae10be73..f5aa65940 100644
--- a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/datacite/record.json
+++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/datacite/record.json
@@ -1 +1 @@
-{"id":"10.7282/t3-sjyd-1r46","type":"dois","attributes":{"doi":"10.7282/t3-sjyd-1r46","identifiers":[],"creators":[{"name":"Huang, Xu","nameType":"Personal","givenName":"Xu","familyName":"Huang","affiliation":[],"nameIdentifiers":[]},{"name":"Guo, Zhixiong","nameType":"Personal","givenName":"Zhixiong","familyName":"Guo","nameIdentifiers":[{"schemeUri":"https://orcid.org","nameIdentifier":"https://orcid.org/0000-0003-0481-2738","nameIdentifierScheme":"ORCID"}],"affiliation":[]}],"titles":[{"title":"High thermal conductance across c-BN/diamond interface"}],"publisher":"Rutgers University","container":{},"publicationYear":2099,"subjects":[{"subject":"Diamond"},{"subject":"Cubic boron nitride"},{"subject":"Thermal conductivity"},{"subject":"Interface"},{"subject":"Phonon"},{"subject":"Thermal conductance"}],"contributors":[],"dates":[{"date":"2099-12-31","dateType":"Accepted"},{"date":"2099","dateType":"Issued"}],"language":"en","types":{"ris":"RPRT","bibtex":"article","citeproc":"article-journal","schemaOrg":"ScholarlyArticle","resourceType":"Accepted manuscript","resourceTypeGeneral":"Text"},"relatedIdentifiers":[],"sizes":[],"formats":["application/pdf"],"version":null,"rightsList":[{"rights":"Embargo"}],"descriptions":[{"description":"High thermal conductivity electronic components with low interfacial thermal resistance are of technological importance and fundamental interest of research. Diamond, a superhard material with ultrahigh thermal conductivity at room temperature, is desirable for microelectronics thermal management. Cubic polymorph of boron nitride (c-BN) is a promising material due to wide bandgap and diamond like structure and properties. To understand the nature in thermal transport of diamond, c-BN and the most commonly used silicon (Si) semiconductor, ab initio phonon Boltzmann transport equations are employed to investigate lattice vibrational properties of these three materials. At 300 K, the predicted thermal conductivity of Si, diamond and c-BN reached 142, 2112, and 736 W/(m��K), respectively. What's more, heat transport phenomena across the interfaces of Si/diamond, c-BN/diamond and Si/c-BN are unfolded. In comparison, the interfacial thermal conductance of c-BN/diamond is ten-fold of Si/diamond; besides, the thermal conductance across Si/c-BN interface is 20.2% larger than that of Si/diamond at 300 K and 18.9% larger at 340 K. These findings provide us new vision and potential solution to heat dissipation of high-local-power density devices, shedding light on future thermal management of c-BN and diamond related electronics.","descriptionType":"Abstract"}],"geoLocations":[],"fundingReferences":[],"url":"https://scholarship.libraries.rutgers.edu/discovery/fulldisplay/alma991031549917804646/01RUT_INST:ResearchRepository","contentUrl":null,"metadataVersion":1,"schemaVersion":"http://datacite.org/schema/kernel-4","source":"mds","isActive":true,"state":"findable","reason":null,"viewCount":0,"downloadCount":0,"referenceCount":0,"citationCount":0,"partCount":0,"partOfCount":0,"versionCount":0,"versionOfCount":0,"created":"2020-06-30T21:12:19Z","registered":"2020-07-02T16:45:07Z","published":null,"updated":"2021-01-14T18:24:19Z"},"relationships":{"client":{"data":{"id":"rutgers.lib","type":"clients"}}}}
\ No newline at end of file
+{"id":"10.5517/ccdc.csd.cc25rpzm","type":"dois","attributes":{"doi":"10.5517/ccdc.csd.cc25rpzm","prefix":"10.5517","suffix":"ccdc.csd.cc25rpzm","identifiers":[{"identifier":"2018781","identifierType":"CCDC"}],"alternateIdentifiers":[{"alternateIdentifierType":"CCDC","alternateIdentifier":"2018781"}],"creators":[{"name":"Ling, Irene","affiliation":[],"nameIdentifiers":[]},{"name":"Sobolev, Alexandre N.","affiliation":[],"nameIdentifiers":[]},{"name":"Raston, Colin L.","affiliation":[],"nameIdentifiers":[]}],"titles":[{"title":"CCDC 2018781: Experimental Crystal Structure Determination"}],"publisher":"Cambridge Crystallographic Data Centre","container":{},"publicationYear":2021,"subjects":[{"subject":"Crystal Structure"},{"subject":"Experimental 3D Coordinates"},{"subject":"Crystal System"},{"subject":"Space Group"},{"subject":"Cell Parameters"},{"subject":"Crystallography"},{"subject":"bis[penta-aqua-copper(ii)] bis(mu-5,11,17,23-tetra-sulfonato-25,26,27,28-tetrahydroxycalix(4)arene)-dodeca-aqua-tri-copper(ii) bis(nitrate) heptahydrate"}],"contributors":[],"dates":[],"language":"en","types":{"ris":"DATA","bibtex":"misc","citeproc":"dataset","schemaOrg":"Dataset","resourceTypeGeneral":"Dataset"},"relatedIdentifiers":[{"relationType":"IsSupplementTo","relatedIdentifier":"10.1080/00958972.2020.1849642","relatedIdentifierType":"DOI"}],"sizes":[],"formats":["CIF"],"version":null,"rightsList":[],"descriptions":[{"description":"Related Article: Irene Ling, Alexandre N. Sobolev, Colin L. Raston|2021|J.Coord.Chem.|74|40|doi:10.1080/00958972.2020.1849642","descriptionType":"Other"}],"geoLocations":[],"fundingReferences":[],"xml":"PD94bWwgdmVyc2lvbj0iMS4wIiBlbmNvZGluZz0iVVRGLTgiPz4KPHJlc291cmNlIHhtbG5zOnhzaT0iaHR0cDovL3d3dy53My5vcmcvMjAwMS9YTUxTY2hlbWEtaW5zdGFuY2UiIHhtbG5zPSJodHRwOi8vZGF0YWNpdGUub3JnL3NjaGVtYS9rZXJuZWwtNCIgeHNpOnNjaGVtYUxvY2F0aW9uPSJodHRwOi8vZGF0YWNpdGUub3JnL3NjaGVtYS9rZXJuZWwtNCBodHRwOi8vc2NoZW1hLmRhdGFjaXRlLm9yZy9tZXRhL2tlcm5lbC00L21ldGFkYXRhLnhzZCI+CiAgPGlkZW50aWZpZXIgaWRlbnRpZmllclR5cGU9IkRPSSI+MTAuNTUxNy9DQ0RDLkNTRC5DQzI1UlBaTTwvaWRlbnRpZmllcj4KICA8Y3JlYXRvcnM+CiAgICA8Y3JlYXRvcj4KICAgICAgPGNyZWF0b3JOYW1lPkxpbmcsIElyZW5lPC9jcmVhdG9yTmFtZT4KICAgIDwvY3JlYXRvcj4KICAgIDxjcmVhdG9yPgogICAgICA8Y3JlYXRvck5hbWU+U29ib2xldiwgQWxleGFuZHJlIE4uPC9jcmVhdG9yTmFtZT4KICAgIDwvY3JlYXRvcj4KICAgIDxjcmVhdG9yPgogICAgICA8Y3JlYXRvck5hbWU+UmFzdG9uLCBDb2xpbiBMLjwvY3JlYXRvck5hbWU+CiAgICA8L2NyZWF0b3I+CiAgPC9jcmVhdG9ycz4KICA8dGl0bGVzPgogICAgPHRpdGxlPkNDREMgMjAxODc4MTogRXhwZXJpbWVudGFsIENyeXN0YWwgU3RydWN0dXJlIERldGVybWluYXRpb248L3RpdGxlPgogIDwvdGl0bGVzPgogIDxwdWJsaXNoZXI+Q2FtYnJpZGdlIENyeXN0YWxsb2dyYXBoaWMgRGF0YSBDZW50cmU8L3B1Ymxpc2hlcj4KICA8cHVibGljYXRpb25ZZWFyPjIwMjE8L3B1YmxpY2F0aW9uWWVhcj4KICA8cmVzb3VyY2VUeXBlIHJlc291cmNlVHlwZUdlbmVyYWw9IkRhdGFzZXQiLz4KICA8c3ViamVjdHM+CiAgICA8c3ViamVjdD5DcnlzdGFsIFN0cnVjdHVyZTwvc3ViamVjdD4KICAgIDxzdWJqZWN0PkV4cGVyaW1lbnRhbCAzRCBDb29yZGluYXRlczwvc3ViamVjdD4KICAgIDxzdWJqZWN0PkNyeXN0YWwgU3lzdGVtPC9zdWJqZWN0PgogICAgPHN1YmplY3Q+U3BhY2UgR3JvdXA8L3N1YmplY3Q+CiAgICA8c3ViamVjdD5DZWxsIFBhcmFtZXRlcnM8L3N1YmplY3Q+CiAgICA8c3ViamVjdD5DcnlzdGFsbG9ncmFwaHk8L3N1YmplY3Q+CiAgICA8c3ViamVjdD5iaXNbcGVudGEtYXF1YS1jb3BwZXIoaWkpXSBiaXMobXUtNSwxMSwxNywyMy10ZXRyYS1zdWxmb25hdG8tMjUsMjYsMjcsMjgtdGV0cmFoeWRyb3h5Y2FsaXgoNClhcmVuZSktZG9kZWNhLWFxdWEtdHJpLWNvcHBlcihpaSkgYmlzKG5pdHJhdGUpIGhlcHRhaHlkcmF0ZTwvc3ViamVjdD4KICA8L3N1YmplY3RzPgogIDxsYW5ndWFnZT5lbmc8L2xhbmd1YWdlPgogIDxhbHRlcm5hdGVJZGVudGlmaWVycz4KICAgIDxhbHRlcm5hdGVJZGVudGlmaWVyIGFsdGVybmF0ZUlkZW50aWZpZXJUeXBlPSJDQ0RDIj4yMDE4NzgxPC9hbHRlcm5hdGVJZGVudGl
maWVyPgogIDwvYWx0ZXJuYXRlSWRlbnRpZmllcnM+CiAgPHJlbGF0ZWRJZGVudGlmaWVycz4KICAgIDxyZWxhdGVkSWRlbnRpZmllciByZWxhdGVkSWRlbnRpZmllclR5cGU9IkRPSSIgcmVsYXRpb25UeXBlPSJJc1N1cHBsZW1lbnRUbyI+MTAuMTA4MC8wMDk1ODk3Mi4yMDIwLjE4NDk2NDI8L3JlbGF0ZWRJZGVudGlmaWVyPgogIDwvcmVsYXRlZElkZW50aWZpZXJzPgogIDxzaXplcy8+CiAgPGZvcm1hdHM+CiAgICA8Zm9ybWF0PkNJRjwvZm9ybWF0PgogIDwvZm9ybWF0cz4KICA8dmVyc2lvbi8+CiAgPGRlc2NyaXB0aW9ucz4KICAgIDxkZXNjcmlwdGlvbiBkZXNjcmlwdGlvblR5cGU9Ik90aGVyIj5SZWxhdGVkIEFydGljbGU6IElyZW5lIExpbmcsICBBbGV4YW5kcmUgTi4gU29ib2xldiwgIENvbGluIEwuIFJhc3RvbnwyMDIxfEouQ29vcmQuQ2hlbS58NzR8NDB8ZG9pOjEwLjEwODAvMDA5NTg5NzIuMjAyMC4xODQ5NjQyPC9kZXNjcmlwdGlvbj4KICA8L2Rlc2NyaXB0aW9ucz4KPC9yZXNvdXJjZT4K","url":"http://www.ccdc.cam.ac.uk/services/structure_request?id=doi:10.5517/ccdc.csd.cc25rpzm&sid=DataCite","contentUrl":null,"metadataVersion":3,"schemaVersion":"http://datacite.org/schema/kernel-4","source":"api","isActive":true,"state":"findable","reason":null,"viewCount":0,"viewsOverTime":[],"downloadCount":0,"downloadsOverTime":[],"referenceCount":0,"citationCount":0,"citationsOverTime":[],"partCount":0,"partOfCount":0,"versionCount":0,"versionOfCount":0,"created":"2021-03-09T13:25:35.000Z","registered":"2021-03-09T13:25:36.000Z","published":"2021","updated":"2021-03-31T21:49:56.000Z"},"relationships":{"client":{"data":{"id":"ccdc.csd","type":"clients"}},"provider":{"data":{"id":"ccdc","type":"providers"}},"media":{"data":{"id":"10.5517/ccdc.csd.cc25rpzm","type":"media"}},"references":{"data":[]},"citations":{"data":[]},"parts":{"data":[]},"partOf":{"data":[]},"versions":{"data":[]},"versionOf":{"data":[]}}}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/GroupEntitiesSparkJob.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/GroupEntitiesSparkJob.java
index 58009bfcf..3f27b9442 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/GroupEntitiesSparkJob.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/GroupEntitiesSparkJob.java
@@ -38,7 +38,8 @@ import scala.Tuple2;
/**
* Groups the graph content by entity identifier to ensure ID uniqueness
*/
-public class GroupEntitiesSparkJob {
+public class GroupEntitiesSparkJob {
private static final Logger log = LoggerFactory.getLogger(GroupEntitiesSparkJob.class);
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java
index 03709c8fe..fdef7f77d 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkUpdateEntity.java
@@ -23,6 +23,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
@@ -77,48 +78,54 @@ public class SparkUpdateEntity extends AbstractSparkAction {
(type, clazz) -> {
final String outputPath = dedupGraphPath + "/" + type;
removeOutputDir(spark, outputPath);
+ final String ip = DedupUtility.createEntityPath(graphBasePath, type.toString());
+ if (HdfsSupport.exists(ip, sc.hadoopConfiguration())) {
+ JavaRDD sourceEntity = sc
+ .textFile(DedupUtility.createEntityPath(graphBasePath, type.toString()));
- JavaRDD sourceEntity = sc
- .textFile(DedupUtility.createEntityPath(graphBasePath, type.toString()));
+ if (mergeRelExists(workingPath, type.toString())) {
- if (mergeRelExists(workingPath, type.toString())) {
+ final String mergeRelPath = DedupUtility
+ .createMergeRelPath(workingPath, "*", type.toString());
+ final String dedupRecordPath = DedupUtility
+ .createDedupRecordPath(workingPath, "*", type.toString());
- final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, "*", type.toString());
- final String dedupRecordPath = DedupUtility
- .createDedupRecordPath(workingPath, "*", type.toString());
+ final Dataset rel = spark
+ .read()
+ .load(mergeRelPath)
+ .as(Encoders.bean(Relation.class));
- final Dataset rel = spark.read().load(mergeRelPath).as(Encoders.bean(Relation.class));
+ final JavaPairRDD mergedIds = rel
+ .where("relClass == 'merges'")
+ .where("source != target")
+ .select(rel.col("target"))
+ .distinct()
+ .toJavaRDD()
+ .mapToPair(
+ (PairFunction) r -> new Tuple2<>(r.getString(0), "d"));
- final JavaPairRDD mergedIds = rel
- .where("relClass == 'merges'")
- .where("source != target")
- .select(rel.col("target"))
- .distinct()
- .toJavaRDD()
- .mapToPair(
- (PairFunction) r -> new Tuple2<>(r.getString(0), "d"));
+ JavaPairRDD entitiesWithId = sourceEntity
+ .mapToPair(
+ (PairFunction) s -> new Tuple2<>(
+ MapDocumentUtil.getJPathString(IDJSONPATH, s), s));
+ if (type == EntityType.organization) // exclude root records from organizations
+ entitiesWithId = excludeRootOrgs(entitiesWithId, rel);
- JavaPairRDD entitiesWithId = sourceEntity
- .mapToPair(
- (PairFunction) s -> new Tuple2<>(
- MapDocumentUtil.getJPathString(IDJSONPATH, s), s));
- if (type == EntityType.organization) // exclude root records from organizations
- entitiesWithId = excludeRootOrgs(entitiesWithId, rel);
+ JavaRDD map = entitiesWithId
+ .leftOuterJoin(mergedIds)
+ .map(k -> {
+ if (k._2()._2().isPresent()) {
+ return updateDeletedByInference(k._2()._1(), clazz);
+ }
+ return k._2()._1();
+ });
- JavaRDD map = entitiesWithId
- .leftOuterJoin(mergedIds)
- .map(k -> {
- if (k._2()._2().isPresent()) {
- return updateDeletedByInference(k._2()._1(), clazz);
- }
- return k._2()._1();
- });
+ sourceEntity = map.union(sc.textFile(dedupRecordPath));
- sourceEntity = map.union(sc.textFile(dedupRecordPath));
+ }
+ sourceEntity.saveAsTextFile(outputPath, GzipCodec.class);
}
-
- sourceEntity.saveAsTextFile(outputPath, GzipCodec.class);
});
}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertDatasetToJsonRDD.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertDatasetToJsonRDD.scala
new file mode 100644
index 000000000..3ee0c7dd6
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertDatasetToJsonRDD.scala
@@ -0,0 +1,42 @@
+package eu.dnetlib.dhp.sx.graph
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import eu.dnetlib.dhp.schema.oaf.{Oaf, OtherResearchProduct, Publication, Result, Software, Dataset => OafDataset}
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.io.compress.GzipCodec
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
+import org.slf4j.{Logger, LoggerFactory}
+
+object SparkConvertDatasetToJsonRDD {
+
+
+ def main(args: Array[String]): Unit = {
+ val log: Logger = LoggerFactory.getLogger(getClass)
+ val conf: SparkConf = new SparkConf()
+ val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/convert_dataset_json_params.json")))
+ parser.parseArgument(args)
+ val spark: SparkSession =
+ SparkSession
+ .builder()
+ .config(conf)
+ .appName(getClass.getSimpleName)
+ .master(parser.get("master")).getOrCreate()
+
+ val sourcePath = parser.get("sourcePath")
+ log.info(s"sourcePath -> $sourcePath")
+ val targetPath = parser.get("targetPath")
+ log.info(s"targetPath -> $targetPath")
+
+ val resultObject = List("publication","dataset","software", "otherResearchProduct")
+ val mapper = new ObjectMapper()
+ implicit val oafEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])
+
+
+ resultObject.foreach{item =>
+ spark.read.load(s"$sourcePath/$item").as[Result].map(r=> mapper.writeValueAsString(r))(Encoders.STRING).rdd.saveAsTextFile(s"$targetPath/${item.toLowerCase}", classOf[GzipCodec])
+ }
+ }
+
+}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala
new file mode 100644
index 000000000..cb41d6134
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkConvertRDDtoDataset.scala
@@ -0,0 +1,67 @@
+package eu.dnetlib.dhp.sx.graph
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import eu.dnetlib.dhp.schema.oaf.{OtherResearchProduct, Publication, Relation, Result, Software, Dataset => OafDataset}
+import org.apache.commons.io.IOUtils
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
+import org.slf4j.{Logger, LoggerFactory}
+object SparkConvertRDDtoDataset {
+
+ def main(args: Array[String]): Unit = {
+
+
+ val log: Logger = LoggerFactory.getLogger(getClass)
+ val conf: SparkConf = new SparkConf()
+ val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/convert_dataset_json_params.json")))
+ parser.parseArgument(args)
+ val spark: SparkSession =
+ SparkSession
+ .builder()
+ .config(conf)
+ .appName(getClass.getSimpleName)
+ .master(parser.get("master")).getOrCreate()
+
+ val sourcePath = parser.get("sourcePath")
+ log.info(s"sourcePath -> $sourcePath")
+ val t = parser.get("targetPath")
+ log.info(s"targetPath -> $t")
+
+ val entityPath = s"$t/entities"
+ val relPath = s"$t/relation"
+ val mapper = new ObjectMapper()
+ implicit val datasetEncoder: Encoder[OafDataset] = Encoders.kryo(classOf[OafDataset])
+ implicit val publicationEncoder: Encoder[Publication] = Encoders.kryo(classOf[Publication])
+ implicit val relationEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation])
+ implicit val orpEncoder: Encoder[OtherResearchProduct] = Encoders.kryo(classOf[OtherResearchProduct])
+ implicit val softwareEncoder: Encoder[Software] = Encoders.kryo(classOf[Software])
+
+
+ log.info("Converting dataset")
+ val rddDataset =spark.sparkContext.textFile(s"$sourcePath/dataset").map(s => mapper.readValue(s, classOf[OafDataset]))
+ spark.createDataset(rddDataset).as[OafDataset].write.mode(SaveMode.Overwrite).save(s"$entityPath/dataset")
+
+
+ log.info("Converting publication")
+ val rddPublication =spark.sparkContext.textFile(s"$sourcePath/publication").map(s => mapper.readValue(s, classOf[Publication]))
+ spark.createDataset(rddPublication).as[Publication].write.mode(SaveMode.Overwrite).save(s"$entityPath/publication")
+
+ log.info("Converting software")
+ val rddSoftware =spark.sparkContext.textFile(s"$sourcePath/software").map(s => mapper.readValue(s, classOf[Software]))
+ spark.createDataset(rddSoftware).as[Software].write.mode(SaveMode.Overwrite).save(s"$entityPath/software")
+
+ log.info("Converting otherresearchproduct")
+ val rddOtherResearchProduct =spark.sparkContext.textFile(s"$sourcePath/otherresearchproduct").map(s => mapper.readValue(s, classOf[OtherResearchProduct]))
+ spark.createDataset(rddOtherResearchProduct).as[OtherResearchProduct].write.mode(SaveMode.Overwrite).save(s"$entityPath/otherresearchproduct")
+
+
+ log.info("Converting Relation")
+
+
+ val rddRelation =spark.sparkContext.textFile(s"$sourcePath/relation").map(s => mapper.readValue(s, classOf[Relation]))
+ spark.createDataset(rddRelation).as[Relation].write.mode(SaveMode.Overwrite).save(s"$relPath")
+
+
+ }
+}
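
SparkConvertRDDtoDataset and the new SparkConvertDatasetToJsonRDD above are the two halves of a round trip between gzipped JSON text files and Kryo-encoded Spark Datasets. A self-contained local sketch of that round trip with a toy Record class instead of the Oaf model (jackson-module-scala is assumed on the classpath for case-class (de)serialization; the real jobs bind Java model classes and do not need it):

    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.module.scala.DefaultScalaModule
    import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}

    case class Record(id: String, title: String)

    object RoundTripSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("roundtrip").getOrCreate()
        val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
        implicit val recEncoder: Encoder[Record] = Encoders.kryo(classOf[Record])

        // JSON lines -> Kryo-encoded Dataset (what SparkConvertRDDtoDataset does per entity type)
        val rdd = spark.sparkContext.textFile("/tmp/records_json")
          .map(s => mapper.readValue(s, classOf[Record]))
        spark.createDataset(rdd).write.mode(SaveMode.Overwrite).save("/tmp/records_ds")

        // Kryo-encoded Dataset -> JSON lines (what SparkConvertDatasetToJsonRDD does)
        spark.read.load("/tmp/records_ds").as[Record]
          .map(r => mapper.writeValueAsString(r))(Encoders.STRING)
          .rdd.saveAsTextFile("/tmp/records_json_again")

        spark.stop()
      }
    }
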
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala
index a37dd2132..350b00c5e 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph.scala
@@ -70,7 +70,7 @@ object SparkCreateInputGraph {
resultObject.foreach { r =>
log.info(s"Make ${r._1} unique")
- makeDatasetUnique(s"$targetPath/extracted/${r._1}",s"$targetPath/dedup/${r._1}",spark, r._2)
+ makeDatasetUnique(s"$targetPath/extracted/${r._1}",s"$targetPath/preprocess/${r._1}",spark, r._2)
}
}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateScholix.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateScholix.scala
index ba483bfb2..0a7fc18fb 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateScholix.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateScholix.scala
@@ -42,6 +42,7 @@ object SparkCreateScholix {
val relationDS: Dataset[(String, Relation)] = spark.read.load(relationPath).as[Relation]
+ .filter(r => (r.getDataInfo== null || r.getDataInfo.getDeletedbyinference == false) && !r.getRelClass.toLowerCase.contains("merge"))
.map(r => (r.getSource, r))(Encoders.tuple(Encoders.STRING, relEncoder))
val summaryDS: Dataset[(String, ScholixSummary)] = spark.read.load(summaryPath).as[ScholixSummary]
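
The filter added to SparkCreateScholix keeps only relations that are not flagged as deleted by inference and are not dedup merge edges before the Scholix join. A small sketch of the same predicate with the null handling made explicit (simplified: the patch reads the flag from the relation's DataInfo and treats a missing DataInfo as "not deleted"):

    object RelationFilterSketch {
      def keepRelation(deletedByInference: java.lang.Boolean, relClass: String): Boolean =
        // a null flag counts as "not deleted"; merges/isMergedIn edges are dropped
        !java.lang.Boolean.TRUE.equals(deletedByInference) &&
          relClass != null && !relClass.toLowerCase.contains("merge")
    }
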
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateSummaryObject.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateSummaryObject.scala
index a66da3e6d..0970375f5 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateSummaryObject.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateSummaryObject.scala
@@ -1,7 +1,7 @@
package eu.dnetlib.dhp.sx.graph
import eu.dnetlib.dhp.application.ArgumentApplicationParser
-import eu.dnetlib.dhp.schema.oaf.Result
+import eu.dnetlib.dhp.schema.oaf.{Oaf, Result}
import eu.dnetlib.dhp.schema.sx.summary.ScholixSummary
import eu.dnetlib.dhp.sx.graph.scholix.ScholixUtils
import org.apache.commons.io.IOUtils
@@ -29,11 +29,12 @@ object SparkCreateSummaryObject {
log.info(s"targetPath -> $targetPath")
implicit val resultEncoder:Encoder[Result] = Encoders.kryo[Result]
+ implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf]
implicit val summaryEncoder:Encoder[ScholixSummary] = Encoders.kryo[ScholixSummary]
- val ds:Dataset[Result] = spark.read.load(s"$sourcePath/*").as[Result]
+ val ds:Dataset[Result] = spark.read.load(s"$sourcePath/*").as[Result].filter(r=>r.getDataInfo== null || r.getDataInfo.getDeletedbyinference== false)
ds.repartition(6000).map(r => ScholixUtils.resultToSummary(r)).filter(s => s!= null).write.mode(SaveMode.Overwrite).save(targetPath)
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala
index 6ee575e2a..b2fddec20 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala
@@ -1,10 +1,17 @@
package eu.dnetlib.dhp.sx.graph
+import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{Relation, Result}
import org.apache.commons.io.IOUtils
+import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.SparkConf
+import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
+import org.json4s
+import org.json4s.DefaultFormats
+import org.json4s.JsonAST.{JField, JObject, JString}
+import org.json4s.jackson.JsonMethods.parse
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._
@@ -25,60 +32,109 @@ object SparkResolveRelation {
val relationPath = parser.get("relationPath")
log.info(s"sourcePath -> $relationPath")
val entityPath = parser.get("entityPath")
- log.info(s"targetPath -> $entityPath")
+ log.info(s"entityPath -> $entityPath")
val workingPath = parser.get("workingPath")
log.info(s"workingPath -> $workingPath")
-
- implicit val oafEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])
implicit val relEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation])
import spark.implicits._
- val entities:Dataset[Result] = spark.read.load(s"$entityPath/*").as[Result]
- entities.flatMap(e => e.getPid.asScala
- .map(p =>
- convertPidToDNETIdentifier(p.getValue, p.getQualifier.getClassid))
- .filter(s => s!= null)
- .map(s => (s,e.getId))
- ).groupByKey(_._1)
- .reduceGroups((x,y) => if (x._2.startsWith("50|doi") || x._2.startsWith("50|pmid")) x else y)
- .map(s =>s._2)
- .write
- .mode(SaveMode.Overwrite)
- .save(s"$workingPath/resolvedPid")
- val rPid:Dataset[(String,String)] = spark.read.load(s"$workingPath/resolvedPid").as[(String,String)]
+ extractPidResolvedTableFromJsonRDD(spark, entityPath, workingPath)
+
+    val mapper = new ObjectMapper()
+
+ val rPid:Dataset[(String,String)] = spark.read.load(s"$workingPath/relationResolvedPid").as[(String,String)]
val relationDs:Dataset[(String,Relation)] = spark.read.load(relationPath).as[Relation].map(r => (r.getSource.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder))
- relationDs.joinWith(rPid, relationDs("_1").equalTo(rPid("_1")), "left").map{
+ relationDs.joinWith(rPid, relationDs("_1").equalTo(rPid("_2")), "left").map{
m =>
val sourceResolved = m._2
val currentRelation = m._1._2
- if (sourceResolved!=null && sourceResolved._2.nonEmpty)
- currentRelation.setSource(sourceResolved._2)
+ if (sourceResolved!=null && sourceResolved._1!=null && sourceResolved._1.nonEmpty)
+ currentRelation.setSource(sourceResolved._1)
currentRelation
}.write
.mode(SaveMode.Overwrite)
- .save(s"$workingPath/resolvedSource")
+ .save(s"$workingPath/relationResolvedSource")
- val relationSourceResolved:Dataset[(String,Relation)] = spark.read.load(s"$workingPath/resolvedSource").as[Relation].map(r => (r.getTarget.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder))
- relationSourceResolved.joinWith(rPid, relationSourceResolved("_1").equalTo(rPid("_1")), "left").map{
+ val relationSourceResolved:Dataset[(String,Relation)] = spark.read.load(s"$workingPath/relationResolvedSource").as[Relation].map(r => (r.getTarget.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder))
+ relationSourceResolved.joinWith(rPid, relationSourceResolved("_1").equalTo(rPid("_2")), "left").map{
m =>
val targetResolved = m._2
val currentRelation = m._1._2
- if (targetResolved!=null && targetResolved._2.nonEmpty)
- currentRelation.setTarget(targetResolved._2)
+ if (targetResolved!=null && targetResolved._1.nonEmpty)
+ currentRelation.setTarget(targetResolved._1)
currentRelation
}.filter(r => r.getSource.startsWith("50")&& r.getTarget.startsWith("50"))
.write
.mode(SaveMode.Overwrite)
- .save(s"$workingPath/resolvedRelation")
+ .save(s"$workingPath/relation_resolved")
+
+ spark.read.load(s"$workingPath/relation_resolved").as[Relation]
+      .map(r => mapper.writeValueAsString(r))
+ .rdd.saveAsTextFile(s"$workingPath/relation", classOf[GzipCodec])
+
}
+ private def extractPidsFromRecord(input:String):(String,List[(String,String)]) = {
+ implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
+ lazy val json: json4s.JValue = parse(input)
+ val id:String = (json \ "id").extract[String]
+ val result: List[(String,String)] = for {
+ JObject(pids) <- json \ "pid"
+ JField("value", JString(pidValue)) <- pids
+ JField("qualifier", JObject(qualifier)) <- pids
+ JField("classname", JString(pidType)) <- qualifier
+ } yield (pidValue, pidType)
+ (id,result)
+ }
+ private def extractPidResolvedTableFromJsonRDD(spark: SparkSession, entityPath: String, workingPath: String) = {
+ import spark.implicits._
+
+ val d: RDD[(String,String)] = spark.sparkContext.textFile(s"$entityPath/*")
+ .map(i => extractPidsFromRecord(i))
+ .filter(s => s != null && s._1!= null && s._2!=null && s._2.nonEmpty)
+ .flatMap{ p =>
+ p._2.map(pid =>
+ (p._1, convertPidToDNETIdentifier(pid._1, pid._2))
+ )
+ }.filter(r =>r._1 != null || r._2 != null)
+
+ spark.createDataset(d)
+ .groupByKey(_._2)
+ .reduceGroups((x, y) => if (x._1.startsWith("50|doi") || x._1.startsWith("50|pmid")) x else y)
+ .map(s => s._2)
+ .write
+ .mode(SaveMode.Overwrite)
+ .save(s"$workingPath/relationResolvedPid")
+ }
+
+
+ /*
+   This method should be used once we finally convert everything into Kryo datasets
+   instead of using RDDs of JSON
+ */
+ private def extractPidResolvedTableFromKryo(spark: SparkSession, entityPath: String, workingPath: String) = {
+ import spark.implicits._
+ implicit val oafEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])
+ val entities: Dataset[Result] = spark.read.load(s"$entityPath/*").as[Result]
+ entities.flatMap(e => e.getPid.asScala
+ .map(p =>
+ convertPidToDNETIdentifier(p.getValue, p.getQualifier.getClassid))
+ .filter(s => s != null)
+ .map(s => (s, e.getId))
+ ).groupByKey(_._1)
+ .reduceGroups((x, y) => if (x._2.startsWith("50|doi") || x._2.startsWith("50|pmid")) x else y)
+ .map(s => s._2)
+ .write
+ .mode(SaveMode.Overwrite)
+ .save(s"$workingPath/relationResolvedPid")
+ }
def convertPidToDNETIdentifier(pid:String, pidType: String):String = {
if (pid==null || pid.isEmpty || pidType== null || pidType.isEmpty)
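
SparkResolveRelation now builds the resolved-pid table straight from the JSON records: extractPidsFromRecord collects (value, classname) pairs from each record's pid array with a json4s for-comprehension. A self-contained sketch of that extraction on a toy record (same json4s idiom as the code above; toy JSON only):

    import org.json4s.JsonAST.{JField, JObject, JString}
    import org.json4s.jackson.JsonMethods.parse

    object PidExtractionSketch {
      def main(args: Array[String]): Unit = {
        // Toy record, not a real graph entity.
        val input =
          """{"id":"50|doi_________::abc",
            | "pid":[{"value":"10.1000/xyz","qualifier":{"classid":"doi","classname":"doi"}}]}""".stripMargin

        val json = parse(input)
        val pids: List[(String, String)] = for {
          JObject(pid)                        <- json \ "pid"
          JField("value", JString(value))     <- pid
          JField("qualifier", JObject(q))     <- pid
          JField("classname", JString(ctype)) <- q
        } yield (value, ctype)

        println(pids) // List((10.1000/xyz,doi))
      }
    }
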
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_dataset_json_params.json b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_dataset_json_params.json
new file mode 100644
index 000000000..8bfdde5b0
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/convert_dataset_json_params.json
@@ -0,0 +1,5 @@
+[
+ {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
+ {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the source Path", "paramRequired": true},
+ {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the raw graph", "paramRequired": true}
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/extractEntities/oozie_app/config-default.xml
similarity index 100%
rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/config-default.xml
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/extractEntities/oozie_app/config-default.xml
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/extractEntities/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/extractEntities/oozie_app/workflow.xml
new file mode 100644
index 000000000..685976ce6
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/extractEntities/oozie_app/workflow.xml
@@ -0,0 +1,85 @@
+
+
+
+ sourcePath
+ the working dir base path
+
+
+ targetPath
+ the graph Raw base path
+
+
+
+
+
+
+ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+ yarn
+ cluster
+ Extract entities in raw graph
+ eu.dnetlib.dhp.sx.graph.SparkCreateInputGraph
+ dhp-graph-mapper-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.shuffle.partitions=2000
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ --masteryarn
+ --sourcePath${sourcePath}
+ --targetPath${targetPath}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ yarn
+ cluster
+ Generate Input Graph for deduplication
+ eu.dnetlib.dhp.sx.graph.SparkConvertDatasetToJsonRDD
+ dhp-graph-mapper-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.shuffle.partitions=3000
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ --masteryarn
+ --sourcePath${targetPath}/preprocess
+ --targetPath${targetPath}/dedup
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step2/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/config-default.xml
similarity index 100%
rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step2/oozie_app/config-default.xml
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/config-default.xml
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml
similarity index 78%
rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml
index 3ea4e9d30..d8eb1fc80 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step1/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/finalGraph/oozie_app/workflow.xml
@@ -1,4 +1,4 @@
-
+
sourcePath
@@ -6,48 +6,22 @@
targetPath
- the graph Raw base path
+ the final graph path
-
+
Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
-
+
yarn
cluster
- Extract entities in raw graph
- eu.dnetlib.dhp.sx.graph.SparkCreateInputGraph
- dhp-graph-mapper-${projectVersion}.jar
-
- --executor-memory=${sparkExecutorMemory}
- --executor-cores=${sparkExecutorCores}
- --driver-memory=${sparkDriverMemory}
- --conf spark.extraListeners=${spark2ExtraListeners}
- --conf spark.sql.shuffle.partitions=2000
- --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
- --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
- --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-
- --masteryarn
- --sourcePath${sourcePath}
- --targetPath${targetPath}
-
-
-
-
-
-
-
-
- yarn
- cluster
- Resolve Relations in raw graph
- eu.dnetlib.dhp.sx.graph.SparkResolveRelation
+ Import JSONRDD to Dataset kryo
+ eu.dnetlib.dhp.sx.graph.SparkConvertRDDtoDataset
dhp-graph-mapper-${projectVersion}.jar
--executor-memory=${sparkExecutorMemory}
@@ -60,9 +34,8 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--masteryarn
- --relationPath${targetPath}/extracted/relation
- --workingPath${targetPath}/resolved/
- --entityPath${targetPath}/dedup
+ --sourcePath${sourcePath}
+ --targetPath${targetPath}
@@ -87,7 +60,7 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--masteryarn
- --sourcePath${targetPath}/dedup
+ --sourcePath${targetPath}/entities
--targetPath${targetPath}/provision/summaries
@@ -114,7 +87,7 @@
--masteryarn
--summaryPath${targetPath}/provision/summaries
--targetPath${targetPath}/provision/scholix
- --relationPath${targetPath}/resolved/resolvedRelation
+ --relationPath${targetPath}/relation
@@ -182,9 +155,5 @@
-
-
-
-
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step3/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/config-default.xml
similarity index 100%
rename from dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step3/oozie_app/config-default.xml
rename to dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/config-default.xml
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/workflow.xml
new file mode 100644
index 000000000..7683ff94c
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/resolverelation/oozie_app/workflow.xml
@@ -0,0 +1,62 @@
+
+
+
+ entityPath
+ the path of deduplicate Entities
+
+
+ relationPath
+ the path of relation unresolved
+
+
+ targetPath
+ the path of relation unresolved
+
+
+
+
+
+
+
+ Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ yarn
+ cluster
+ Resolve Relations in raw graph
+ eu.dnetlib.dhp.sx.graph.SparkResolveRelation
+ dhp-graph-mapper-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.shuffle.partitions=3000
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+
+ --masteryarn
+ --relationPath${relationPath}
+ --workingPath${targetPath}
+ --entityPath${entityPath}
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step2/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step2/oozie_app/workflow.xml
deleted file mode 100644
index 9d06c42d6..000000000
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step2/oozie_app/workflow.xml
+++ /dev/null
@@ -1,120 +0,0 @@
-
-
-
- workingPath
- the working path
-
-
- sparkDriverMemory
- memory for driver process
-
-
- sparkExecutorMemory
- memory for individual executor
-
-
-
-
-
-
- Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
-
-
-
-
- ${jobTracker}
- ${nameNode}
- yarn-cluster
- cluster
- Extract DLI Entities (Publication)
- eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities
- dhp-graph-mapper-${projectVersion}.jar
-
- --executor-memory ${sparkExecutorMemory}
- --executor-cores=${sparkExecutorCores}
- --driver-memory=${sparkDriverMemory}
- --conf spark.sql.shuffle.partitions=5000
- ${sparkExtraOPT}
-
- -mt yarn-cluster
- --workingPath${workingPath}
- -epublication
-
-
-
-
-
-
-
- ${jobTracker}
- ${nameNode}
- yarn-cluster
- cluster
- Extract DLI Entities (Dataset)
- eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities
- dhp-graph-mapper-${projectVersion}.jar
-
- --executor-memory ${sparkExecutorMemory}
- --executor-cores=${sparkExecutorCores}
- --driver-memory=${sparkDriverMemory}
- --conf spark.sql.shuffle.partitions=5000
- ${sparkExtraOPT}
-
- -mt yarn-cluster
- --workingPath${workingPath}
- -edataset
-
-
-
-
-
-
-
- ${jobTracker}
- ${nameNode}
- yarn-cluster
- cluster
- Extract DLI Entities (Unknown)
- eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities
- dhp-graph-mapper-${projectVersion}.jar
-
- --executor-memory ${sparkExecutorMemory}
- --executor-cores=${sparkExecutorCores}
- --driver-memory=${sparkDriverMemory}
- --conf spark.sql.shuffle.partitions=5000
- ${sparkExtraOPT}
-
- -mt yarn-cluster
- --workingPath${workingPath}
- -eunknown
-
-
-
-
-
-
-
- ${jobTracker}
- ${nameNode}
- yarn-cluster
- cluster
- Extract DLI Entities (Relation)
- eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities
- dhp-graph-mapper-${projectVersion}.jar
-
- --executor-memory ${sparkExecutorMemory}
- --executor-cores=${sparkExecutorCores}
- --driver-memory=${sparkDriverMemory}
- --conf spark.sql.shuffle.partitions=5000
- ${sparkExtraOPT}
-
- -mt yarn-cluster
- --workingPath${workingPath}
- -erelation
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step3/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step3/oozie_app/workflow.xml
deleted file mode 100644
index 4d54b2afb..000000000
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/sx/graph/step3/oozie_app/workflow.xml
+++ /dev/null
@@ -1,61 +0,0 @@
-
-
-
- sourcePath
- the source path
-
-
- targetPath
- the source path
-
-
- sparkDriverMemory
- memory for driver process
-
-
- sparkExecutorMemory
- memory for individual executor
-
-
- entity
- the entity to be merged
-
-
-
-
-
-
- Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- ${jobTracker}
- ${nameNode}
- yarn-cluster
- cluster
- Merge ${entity}
- eu.dnetlib.dhp.sx.graph.SparkScholexplorerCreateRawGraphJob
- dhp-graph-mapper-${projectVersion}.jar
- --executor-memory ${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} ${sparkExtraOPT}
- -mt yarn-cluster
- --sourcePath${sourcePath}/${entity}
- --targetPath${targetPath}/${entity}
- --entity${entity}
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java
index 5b229a625..c41a6c68c 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/raw/MappersTest.java
@@ -1,17 +1,13 @@
package eu.dnetlib.dhp.oa.graph.raw;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.lenient;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Optional;
-
+import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
+import eu.dnetlib.dhp.oa.graph.clean.GraphCleaningFunctionsTest;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.schema.oaf.utils.PidType;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.BeforeEach;
@@ -20,22 +16,12 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
-import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
-import eu.dnetlib.dhp.oa.graph.clean.GraphCleaningFunctionsTest;
-import eu.dnetlib.dhp.schema.common.ModelConstants;
-import eu.dnetlib.dhp.schema.oaf.Author;
-import eu.dnetlib.dhp.schema.oaf.Dataset;
-import eu.dnetlib.dhp.schema.oaf.Field;
-import eu.dnetlib.dhp.schema.oaf.Instance;
-import eu.dnetlib.dhp.schema.oaf.Oaf;
-import eu.dnetlib.dhp.schema.oaf.Publication;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-import eu.dnetlib.dhp.schema.oaf.Software;
-import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
-import eu.dnetlib.dhp.schema.oaf.utils.PidType;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.lenient;
@ExtendWith(MockitoExtension.class)
public class MappersTest {
@@ -74,7 +60,7 @@ public class MappersTest {
assertValidId(p.getId());
- assertEquals(1, p.getOriginalId().size());
+ assertEquals(2, p.getOriginalId().size());
assertTrue(p.getOriginalId().contains("10.3897/oneeco.2.e13718"));
assertValidId(p.getCollectedfrom().get(0).getKey());
@@ -261,8 +247,8 @@ public class MappersTest {
final Relation r2 = (Relation) list.get(2);
assertValidId(d.getId());
- assertEquals(1, d.getOriginalId().size());
- assertTrue(d.getOriginalId().contains("oai:zenodo.org:3234526"));
+ assertEquals(2, d.getOriginalId().size());
+ assertTrue(d.getOriginalId().stream().anyMatch(oid -> oid.equals("oai:zenodo.org:3234526")));
assertValidId(d.getCollectedfrom().get(0).getKey());
assertTrue(StringUtils.isNotBlank(d.getTitle().get(0).getValue()));
assertTrue(d.getAuthor().size() > 0);
@@ -351,8 +337,11 @@ public class MappersTest {
final Publication p = (Publication) list.get(0);
assertValidId(p.getId());
- assertTrue(p.getOriginalId().size() == 1);
- assertEquals("oai:pub.uni-bielefeld.de:2949739", p.getOriginalId().get(0));
+ assertEquals(2, p.getOriginalId().size());
+
+ assertTrue(p.getOriginalId().stream().anyMatch(oid -> oid.equals("oai:pub.uni-bielefeld.de:2949739")));
+ //assertEquals("oai:pub.uni-bielefeld.de:2949739", p.getOriginalId().get(0));
+
assertValidId(p.getCollectedfrom().get(0).getKey());
assertTrue(p.getAuthor().size() > 0);
@@ -413,7 +402,8 @@ public class MappersTest {
assertEquals(ModelConstants.DNET_PROVENANCE_ACTIONS, d.getDataInfo().getProvenanceaction().getSchemename());
assertValidId(d.getId());
- assertTrue(d.getOriginalId().size() == 1);
+ assertEquals(2, d.getOriginalId().size());
+
assertEquals("feabb67c-1fd1-423b-aec6-606d04ce53c6", d.getOriginalId().get(0));
assertValidId(d.getCollectedfrom().get(0).getKey());
@@ -663,8 +653,8 @@ public class MappersTest {
final Dataset p = (Dataset) list.get(0);
assertValidId(p.getId());
- assertTrue(p.getOriginalId().size() == 1);
- assertEquals("df76e73f-0483-49a4-a9bb-63f2f985574a", p.getOriginalId().get(0));
+ assertEquals(2, p.getOriginalId().size());
+ assertTrue(p.getOriginalId().stream().anyMatch(oid -> oid.equals("df76e73f-0483-49a4-a9bb-63f2f985574a")));
assertValidId(p.getCollectedfrom().get(0).getKey());
assertTrue(p.getAuthor().size() > 0);