diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala
index 7b21ecda2..1a45defb0 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala
@@ -271,6 +271,26 @@ object DoiBoostMappingUtil {
}
+
+ def createSP(value: String, classId: String, className: String, schemeId: String, schemeName: String): StructuredProperty = {
+ val sp = new StructuredProperty
+ sp.setQualifier(createQualifier(classId,className, schemeId, schemeName))
+ sp.setValue(value)
+ sp
+
+ }
+
+
+
+ def createSP(value: String, classId: String, className: String, schemeId: String, schemeName: String, dataInfo: DataInfo): StructuredProperty = {
+ val sp = new StructuredProperty
+ sp.setQualifier(createQualifier(classId,className, schemeId, schemeName))
+ sp.setValue(value)
+ sp.setDataInfo(dataInfo)
+ sp
+
+ }
+
def createSP(value: String, classId: String, schemeId: String): StructuredProperty = {
val sp = new StructuredProperty
sp.setQualifier(createQualifier(classId, schemeId))
@@ -279,6 +299,8 @@ object DoiBoostMappingUtil {
}
+
+
def createSP(value: String, classId: String, schemeId: String, dataInfo: DataInfo): StructuredProperty = {
val sp = new StructuredProperty
sp.setQualifier(createQualifier(classId, schemeId))
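Note: the two new overloads above just pair a value with a fully specified Qualifier (class id/name plus scheme id/name), with or without a DataInfo. A minimal usage sketch in Scala, mirroring how the MAG mapping below calls it (assumes the DoiBoost module and the dhp Oaf schema classes are on the classpath; the four-argument `createQualifier` is taken as given since the new code calls it):

```scala
import eu.dnetlib.dhp.schema.oaf.StructuredProperty
import eu.dnetlib.doiboost.DoiBoostMappingUtil.createSP

// Build a subject the way MagDataModel does after this change: the class
// identifies the source ("MAG"), the scheme stays the dnet subject typology.
val subject: StructuredProperty = createSP(
  "Organic Chemistry",
  "MAG", "Microsoft Academic Graph classification",
  "dnet:subject_classification_typologies", "dnet:subject_classification_typologies")

println(subject.getQualifier.getClassname) // Microsoft Academic Graph classification
println(subject.getValue)                  // Organic Chemistry
```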
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala
index 2419f86a3..7bb4686cf 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala
@@ -129,16 +129,16 @@ case object ConversionUtil {
val fieldOfStudy = item._2
if (fieldOfStudy != null && fieldOfStudy.subjects != null && fieldOfStudy.subjects.nonEmpty) {
val p: List[StructuredProperty] = fieldOfStudy.subjects.flatMap(s => {
- val s1 = createSP(s.DisplayName, "keyword", "dnet:subject_classification_typologies")
+ val s1 = createSP(s.DisplayName, "MAG","Microsoft Academic Graph classification", "dnet:subject_classification_typologies", "dnet:subject_classification_typologies")
val di = DoiBoostMappingUtil.generateDataInfo(s.Score.toString)
var resList: List[StructuredProperty] = List(s1)
if (s.MainType.isDefined) {
val maintp = s.MainType.get
- val s2 = createSP(s.MainType.get, "keyword", "dnet:subject_classification_typologies")
+ val s2 = createSP(s.MainType.get, "MAG","Microsoft Academic Graph classification", "dnet:subject_classification_typologies", "dnet:subject_classification_typologies")
s2.setDataInfo(di)
resList = resList ::: List(s2)
if (maintp.contains(".")) {
- val s3 = createSP(maintp.split("\\.").head, "keyword", "dnet:subject_classification_typologies")
+ val s3 = createSP(maintp.split("\\.").head, "MAG","Microsoft Academic Graph classification", "dnet:subject_classification_typologies", "dnet:subject_classification_typologies")
s3.setDataInfo(di)
resList = resList ::: List(s3)
}
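Note: with the change above each MAG field of study can expand into up to three subjects: the display name, the optional MainType, and, when the MainType is dotted, its top-level prefix. A Spark-free sketch of just that expansion, with the `createSP`/`DataInfo` plumbing stripped out and hypothetical values:

```scala
// Hypothetical MAG field-of-study values, mirroring the branching above.
val displayName = "Oncology"
val mainType: Option[String] = Some("medicine.oncology")

val subjectValues: List[String] =
  List(displayName) :::
    mainType.toList :::
    mainType.filter(_.contains(".")).map(_.split("\\.").head).toList

println(subjectValues) // List(Oncology, medicine.oncology, medicine)
```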
diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala
index f3d051bd6..a24f0e6bb 100644
--- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala
+++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala
@@ -43,7 +43,7 @@ object SparkPreProcessMAG {
val distinctPaper: Dataset[MagPapers] = spark.createDataset(result)
distinctPaper.write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/Papers_distinct")
- logger.info("Phase 6) Enrich Publication with description")
+ logger.info("Phase 0) Enrich Publication with description")
val pa = spark.read.load(s"${parser.get("sourcePath")}/PaperAbstractsInvertedIndex").as[MagPaperAbstract]
pa.map(ConversionUtil.transformPaperAbstract).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/PaperAbstract")
diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml
index 34ba5d89d..bf91958cf 100644
--- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml
@@ -32,7 +32,7 @@
-
+
diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala
index d31f80248..f62ac2b67 100644
--- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala
+++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala
@@ -18,6 +18,9 @@ class CrossrefMappingTest {
val mapper = new ObjectMapper()
+
+
+
@Test
def testFunderRelationshipsMapping(): Unit = {
val template = Source.fromInputStream(getClass.getResourceAsStream("article_funder_template.json")).mkString
@@ -58,6 +61,27 @@ class CrossrefMappingTest {
}
+ @Test
+ def testOrcidID() :Unit = {
+ val json = Source.fromInputStream(getClass.getResourceAsStream("orcid_data.json")).mkString
+
+
+ assertNotNull(json)
+ assertFalse(json.isEmpty);
+
+ val resultList: List[Oaf] = Crossref2Oaf.convert(json)
+
+ assertTrue(resultList.nonEmpty)
+
+ val items = resultList.filter(p => p.isInstanceOf[Result])
+
+
+ mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT)
+ items.foreach(p => println(mapper.writeValueAsString(p)))
+
+
+ }
+
@Test
def testEmptyTitle() :Unit = {
val json = Source.fromInputStream(getClass.getResourceAsStream("empty_title.json")).mkString
diff --git a/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/crossref/orcid_data.json b/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/crossref/orcid_data.json
new file mode 100644
index 000000000..def546ddb
--- /dev/null
+++ b/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/crossref/orcid_data.json
@@ -0,0 +1,271 @@
+{
+ "DOI":"10.1016/j.carbpol.2020.115930",
+ "issued":{
+ "date-parts":[
+ [
+ 2020,
+ 4
+ ]
+ ]
+ },
+ "published-print":{
+ "date-parts":[
+ [
+ 2020,
+ 4
+ ]
+ ]
+ },
+ "prefix":"10.1016",
+ "subject":[
+ "Organic Chemistry",
+ "Materials Chemistry",
+ "Polymers and Plastics"
+ ],
+ "author":[
+ {
+ "affiliation":[
+
+ ],
+ "given":"Lei",
+ "family":"Fang",
+ "sequence":"first"
+ },
+ {
+ "affiliation":[
+
+ ],
+ "given":"Hua",
+ "family":"Lin",
+ "sequence":"additional"
+ },
+ {
+ "affiliation":[
+
+ ],
+ "given":"Zhenfeng",
+ "family":"Wu",
+ "sequence":"additional"
+ },
+ {
+ "affiliation":[
+
+ ],
+ "given":"Zhen",
+ "family":"Wang",
+ "sequence":"additional"
+ },
+ {
+ "affiliation":[
+
+ ],
+ "given":"Xinxin",
+ "family":"Fan",
+ "sequence":"additional"
+ },
+ {
+ "affiliation":[
+
+ ],
+ "given":"Ziting",
+ "family":"Cheng",
+ "sequence":"additional"
+ },
+ {
+ "affiliation":[
+
+ ],
+ "given":"Xiaoya",
+ "family":"Hou",
+ "sequence":"additional"
+ },
+ {
+ "authenticated-orcid":false,
+ "given":"Daquan",
+ "family":"Chen",
+ "sequence":"additional",
+ "affiliation":[
+
+ ],
+ "ORCID":"http://orcid.org/0000-0002-6796-0204"
+ }
+ ],
+ "reference-count":41,
+ "ISSN":[
+ "0144-8617"
+ ],
+ "assertion":[
+ {
+ "name":"publisher",
+ "value":"Elsevier",
+ "label":"This article is maintained by"
+ },
+ {
+ "name":"articletitle",
+ "value":"In vitro/vivo evaluation of novel mitochondrial targeting charge-reversal polysaccharide-based antitumor nanoparticle",
+ "label":"Article Title"
+ },
+ {
+ "name":"journaltitle",
+ "value":"Carbohydrate Polymers",
+ "label":"Journal Title"
+ },
+ {
+ "name":"articlelink",
+ "value":"https://doi.org/10.1016/j.carbpol.2020.115930",
+ "label":"CrossRef DOI link to publisher maintained version"
+ },
+ {
+ "name":"content_type",
+ "value":"article",
+ "label":"Content Type"
+ },
+ {
+ "name":"copyright",
+ "value":"\\u00a9 2020 Elsevier Ltd. All rights reserved.",
+ "label":"Copyright"
+ }
+ ],
+ "member":"78",
+ "source":"Crossref",
+ "score":1.0,
+ "deposited":{
+ "timestamp":1584590965000,
+ "date-time":"2020-03-19T04:09:25Z",
+ "date-parts":[
+ [
+ 2020,
+ 3,
+ 19
+ ]
+ ]
+ },
+ "indexed":{
+ "timestamp":1584592912467,
+ "date-time":"2020-03-19T04:41:52Z",
+ "date-parts":[
+ [
+ 2020,
+ 3,
+ 19
+ ]
+ ]
+ },
+ "type":"journal-article",
+ "URL":"http://dx.doi.org/10.1016/j.carbpol.2020.115930",
+ "is-referenced-by-count":0,
+ "volume":"234",
+ "issn-type":[
+ {
+ "type":"print",
+ "value":"0144-8617"
+ }
+ ],
+ "link":[
+ {
+ "URL":"https://api.elsevier.com/content/article/PII:S0144861720301041?httpAccept=text/xml",
+ "intended-application":"text-mining",
+ "content-version":"vor",
+ "content-type":"text/xml"
+ },
+ {
+ "URL":"https://api.elsevier.com/content/article/PII:S0144861720301041?httpAccept=text/plain",
+ "intended-application":"text-mining",
+ "content-version":"vor",
+ "content-type":"text/plain"
+ }
+ ],
+ "update-policy":"http://dx.doi.org/10.1016/elsevier_cm_policy",
+ "references-count":41,
+ "short-container-title":[
+ "Carbohydrate Polymers"
+ ],
+ "publisher":"Elsevier BV",
+ "content-domain":{
+ "domain":[
+ "elsevier.com",
+ "sciencedirect.com"
+ ],
+ "crossmark-restriction":true
+ },
+ "language":"en",
+ "license":[
+ {
+ "URL":"https://www.elsevier.com/tdm/userlicense/1.0/",
+ "start":{
+ "timestamp":1585699200000,
+ "date-time":"2020-04-01T00:00:00Z",
+ "date-parts":[
+ [
+ 2020,
+ 4,
+ 1
+ ]
+ ]
+ },
+ "content-version":"tdm",
+ "delay-in-days":0
+ }
+ ],
+ "created":{
+ "timestamp":1581759678000,
+ "date-time":"2020-02-15T09:41:18Z",
+ "date-parts":[
+ [
+ 2020,
+ 2,
+ 15
+ ]
+ ]
+ },
+ "title":[
+ "In vitro/vivo evaluation of novel mitochondrial targeting charge-reversal polysaccharide-based antitumor nanoparticle"
+ ],
+ "alternative-id":[
+ "S0144861720301041"
+ ],
+ "container-title":[
+ "Carbohydrate Polymers"
+ ],
+ "funder":[
+ {
+ "doi-asserted-by":"publisher",
+ "DOI":"10.13039/501100007129",
+ "name":"Natural Science Foundation of Shandong Province",
+ "award":[
+ "ZR2019ZD24",
+ "ZR2019YQ30"
+ ]
+ },
+ {
+ "doi-asserted-by":"publisher",
+ "DOI":"10.13039/100010449",
+ "name":"Ministry of Education, Libya",
+ "award":[
+
+ ]
+ },
+ {
+ "doi-asserted-by":"publisher",
+ "DOI":"10.13039/501100012249",
+ "name":"Jiangxi University of Traditional Chinese Medicine",
+ "award":[
+ "TCM-0906"
+ ]
+ },
+ {
+ "name":"Taishan Scholar Program",
+ "award":[
+ "qnts20161035"
+ ]
+ },
+ {
+ "name":"Open fund project of Key Laboratory of Modern Preparation of TCM",
+ "award":[
+
+ ]
+ }
+ ],
+ "page":"115930",
+ "article-number":"115930"
+}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java
index 8f43ab1cf..bdbd64160 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java
@@ -147,6 +147,9 @@ public class CleanGraphSparkJob {
if (Objects.isNull(i.getHostedby()) || StringUtils.isBlank(i.getHostedby().getKey())) {
i.setHostedby(ModelConstants.UNKNOWN_REPOSITORY);
}
+ if (Objects.isNull(i.getRefereed())) {
+ i.setRefereed(qualifier("0000", "Unknown", ModelConstants.DNET_REVIEW_LEVELS));
+ }
}
}
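Note: the added cleaning rule defaults a missing `refereed` qualifier to code `0000` / `Unknown` in the `dnet:review_levels` vocabulary. A rough Scala sketch of the same default, assuming the `qualifier(...)` helper used above simply fills the four Qualifier fields (classid, classname, schemeid, schemename):

```scala
import eu.dnetlib.dhp.schema.oaf.{Instance, Qualifier}

// Assumed equivalent of qualifier("0000", "Unknown", ModelConstants.DNET_REVIEW_LEVELS)
def unknownReviewLevel(): Qualifier = {
  val q = new Qualifier
  q.setClassid("0000")
  q.setClassname("Unknown")
  q.setSchemeid("dnet:review_levels")
  q.setSchemename("dnet:review_levels")
  q
}

val instance = new Instance
if (instance.getRefereed == null) // same null check as Objects.isNull(i.getRefereed())
  instance.setRefereed(unknownReviewLevel())
```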
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java
index 4d2633bc5..80b800017 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java
@@ -25,9 +25,7 @@ import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport;
import eu.dnetlib.dhp.oa.provision.model.RelatedEntity;
import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper;
-import eu.dnetlib.dhp.oa.provision.model.SortableRelation;
import eu.dnetlib.dhp.schema.common.EntityType;
-import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import scala.Tuple2;
@@ -109,11 +107,12 @@ public class CreateRelatedEntitiesJob_phase1 {
Class<E> clazz,
String outputPath) {
- Dataset<Tuple2<String, SortableRelation>> relsByTarget = readPathRelation(spark, inputRelationsPath)
+ Dataset<Tuple2<String, Relation>> relsByTarget = readPathRelation(spark, inputRelationsPath)
.filter("dataInfo.deletedbyinference == false")
.map(
- (MapFunction<SortableRelation, Tuple2<String, SortableRelation>>) r -> new Tuple2<>(r.getTarget(), r),
- Encoders.tuple(Encoders.STRING(), Encoders.kryo(SortableRelation.class)))
+ (MapFunction<Relation, Tuple2<String, Relation>>) r -> new Tuple2<>(r.getTarget(),
+ r),
+ Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class)))
.cache();
Dataset<Tuple2<String, RelatedEntity>> entities = readPathEntity(spark, inputEntityPath, clazz)
@@ -129,7 +128,7 @@ public class CreateRelatedEntitiesJob_phase1 {
relsByTarget
.joinWith(entities, entities.col("_1").equalTo(relsByTarget.col("_1")), "inner")
.map(
- (MapFunction<Tuple2<Tuple2<String, SortableRelation>, Tuple2<String, RelatedEntity>>, RelatedEntityWrapper>) t -> new RelatedEntityWrapper(
+ (MapFunction<Tuple2<Tuple2<String, Relation>, Tuple2<String, RelatedEntity>>, RelatedEntityWrapper>) t -> new RelatedEntityWrapper(
t._1()._2(), t._2()._2()),
Encoders.kryo(RelatedEntityWrapper.class))
.write()
@@ -232,11 +231,11 @@ public class CreateRelatedEntitiesJob_phase1 {
* @param relationPath
* @return the Dataset containing all the relationships
*/
- private static Dataset<SortableRelation> readPathRelation(
+ private static Dataset<Relation> readPathRelation(
SparkSession spark, final String relationPath) {
log.info("Reading relations from: {}", relationPath);
- return spark.read().load(relationPath).as(Encoders.bean(SortableRelation.class));
+ return spark.read().load(relationPath).as(Encoders.bean(Relation.class));
}
private static void removeOutputDir(SparkSession spark, String path) {
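Note: after this change the relations parquet written by PrepareRelationsJob is read back as plain `Relation` beans instead of `SortableRelation`. A small Scala sketch of the equivalent read (hypothetical local path, Spark local master):

```scala
import eu.dnetlib.dhp.schema.oaf.Relation
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}

val spark = SparkSession.builder().master("local[*]").appName("read-relations").getOrCreate()
val relationPath = "/tmp/provision/relation" // hypothetical path

// Mirrors readPathRelation: decode the parquet rows as Relation beans,
// then keep only the relations not deleted by inference.
val rels: Dataset[Relation] = spark.read.load(relationPath).as(Encoders.bean(classOf[Relation]))
println(rels.filter("dataInfo.deletedbyinference == false").count())
```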
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java
index 6b184071a..cb1a3b327 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java
@@ -1,39 +1,33 @@
package eu.dnetlib.dhp.oa.provision;
-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
-
-import java.util.*;
-import java.util.function.Function;
-
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.oa.provision.model.SortableRelationKey;
+import eu.dnetlib.dhp.oa.provision.utils.RelationPartitioner;
+import eu.dnetlib.dhp.schema.oaf.Relation;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.*;
import org.apache.spark.rdd.RDD;
-import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import com.clearspring.analytics.util.Lists;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Sets;
-
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.common.FunctionalInterfaceSupport;
-import eu.dnetlib.dhp.common.HdfsSupport;
-import eu.dnetlib.dhp.oa.provision.model.SortableRelation;
-import eu.dnetlib.dhp.oa.provision.utils.RelationPartitioner;
-import scala.Function1;
import scala.Tuple2;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
/**
* Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. The
* operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization, and
@@ -133,22 +127,35 @@ public class PrepareRelationsJob {
SparkSession spark, String inputRelationsPath, String outputPath, Set<String> relationFilter, int maxRelations,
int relPartitions) {
- RDD<SortableRelation> cappedRels = readPathRelationRDD(spark, inputRelationsPath)
- .repartition(relPartitions)
- .filter(rel -> !rel.getDataInfo().getDeletedbyinference())
- .filter(rel -> !relationFilter.contains(rel.getRelClass()))
- // group by SOURCE and apply limit
- .mapToPair(rel -> new Tuple2<>(rel.getSource(), rel))
- .groupByKey(new RelationPartitioner(relPartitions))
- .flatMap(group -> Iterables.limit(group._2(), maxRelations).iterator())
- // group by TARGET and apply limit
- .mapToPair(rel -> new Tuple2<>(rel.getTarget(), rel))
- .groupByKey(new RelationPartitioner(relPartitions))
- .flatMap(group -> Iterables.limit(group._2(), maxRelations).iterator())
+ // group by SOURCE and apply limit
+ RDD<Relation> bySource = readPathRelationRDD(spark, inputRelationsPath)
+ .filter(rel -> rel.getDataInfo().getDeletedbyinference() == false)
+ .filter(rel -> relationFilter.contains(rel.getRelClass()) == false)
+ .mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, r.getSource()), r))
+ .repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions))
+ .groupBy(Tuple2::_1)
+ .map(Tuple2::_2)
+ .map(t -> Iterables.limit(t, maxRelations))
+ .flatMap(Iterable::iterator)
+ .map(Tuple2::_2)
+ .rdd();
+
+ // group by TARGET and apply limit
+ RDD<Relation> byTarget = readPathRelationRDD(spark, inputRelationsPath)
+ .filter(rel -> rel.getDataInfo().getDeletedbyinference() == false)
+ .filter(rel -> relationFilter.contains(rel.getRelClass()) == false)
+ .mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, r.getTarget()), r))
+ .repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions))
+ .groupBy(Tuple2::_1)
+ .map(Tuple2::_2)
+ .map(t -> Iterables.limit(t, maxRelations))
+ .flatMap(Iterable::iterator)
+ .map(Tuple2::_2)
.rdd();
spark
- .createDataset(cappedRels, Encoders.bean(SortableRelation.class))
+ .createDataset(bySource.union(byTarget), Encoders.bean(Relation.class))
+ .repartition(relPartitions)
.write()
.mode(SaveMode.Overwrite)
.parquet(outputPath);
@@ -162,10 +169,10 @@ public class PrepareRelationsJob {
* @param inputPath
* @return the JavaRDD containing all the relationships
*/
- private static JavaRDD<SortableRelation> readPathRelationRDD(
+ private static JavaRDD<Relation> readPathRelationRDD(
SparkSession spark, final String inputPath) {
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
- return sc.textFile(inputPath).map(s -> OBJECT_MAPPER.readValue(s, SortableRelation.class));
+ return sc.textFile(inputPath).map(s -> OBJECT_MAPPER.readValue(s, Relation.class));
}
private static void removeOutputDir(SparkSession spark, String path) {
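Note: the rewritten `prepareRelationsRDD` keys every relation twice (once by source, once by target) with a `SortableRelationKey`, lets `repartitionAndSortWithinPartitions` plus the `RelationPartitioner` co-locate and order each group, and then keeps at most `maxRelations` per group. A Spark-free Scala sketch of the intended per-group behaviour, using `(groupingKey, subRelType)` pairs as stand-ins for relations and the weight table copied from `SortableRelationKey`:

```scala
// (groupingKey, subRelType) pairs standing in for Relation objects.
val rels = List(
  ("org::1", "dedup"), ("org::1", "outcome"), ("org::1", "affiliation"),
  ("org::1", "supplement"), ("res::2", "outcome"))

// Weight table copied from SortableRelationKey; unknown types sort last.
val weights = Map(
  "outcome" -> 0, "supplement" -> 1, "review" -> 2, "citation" -> 3, "affiliation" -> 4,
  "relationship" -> 5, "publicationDataset" -> 6, "similarity" -> 7,
  "provision" -> 8, "participation" -> 9, "dedup" -> 10)

val maxRelations = 3
val capped = rels
  .groupBy(_._1)
  .map { case (key, group) =>
    key -> group.sortBy(r => weights.getOrElse(r._2, Int.MaxValue)).take(maxRelations)
  }

println(capped("org::1")) // keeps outcome, supplement, affiliation; the dedup relation is dropped
```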
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java
index f9fde14e5..051fe923d 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java
@@ -19,7 +19,7 @@ public class ProvisionModelSupport {
RelatedEntityWrapper.class,
JoinedEntity.class,
RelatedEntity.class,
- SortableRelation.class));
+ SortableRelationKey.class));
return modelClasses.toArray(new Class[] {});
}
}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntityWrapper.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntityWrapper.java
index d708b6ed0..cbb143ee2 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntityWrapper.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntityWrapper.java
@@ -5,28 +5,30 @@ import java.io.Serializable;
import com.google.common.base.Objects;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+
public class RelatedEntityWrapper implements Serializable {
- private SortableRelation relation;
+ private Relation relation;
private RelatedEntity target;
public RelatedEntityWrapper() {
}
- public RelatedEntityWrapper(SortableRelation relation, RelatedEntity target) {
+ public RelatedEntityWrapper(Relation relation, RelatedEntity target) {
this(null, relation, target);
}
- public RelatedEntityWrapper(TypedRow entity, SortableRelation relation, RelatedEntity target) {
+ public RelatedEntityWrapper(TypedRow entity, Relation relation, RelatedEntity target) {
this.relation = relation;
this.target = target;
}
- public SortableRelation getRelation() {
+ public Relation getRelation() {
return relation;
}
- public void setRelation(SortableRelation relation) {
+ public void setRelation(Relation relation) {
this.relation = relation;
}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelation.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelation.java
deleted file mode 100644
index b6571b9bf..000000000
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelation.java
+++ /dev/null
@@ -1,38 +0,0 @@
-
-package eu.dnetlib.dhp.oa.provision.model;
-
-import java.io.Serializable;
-import java.util.Map;
-
-import com.google.common.collect.ComparisonChain;
-import com.google.common.collect.Maps;
-
-import eu.dnetlib.dhp.schema.oaf.Relation;
-
-public class SortableRelation extends Relation implements Comparable<Relation>, Serializable {
-
- private static final Map<String, Integer> weights = Maps.newHashMap();
-
- static {
- weights.put("outcome", 0);
- weights.put("supplement", 1);
- weights.put("affiliation", 2);
- weights.put("relationship", 3);
- weights.put("publicationDataset", 4);
- weights.put("similarity", 5);
-
- weights.put("provision", 6);
- weights.put("participation", 7);
- weights.put("dedup", 8);
- }
-
- @Override
- public int compareTo(Relation o) {
- return ComparisonChain
- .start()
- .compare(weights.get(getSubRelType()), weights.get(o.getSubRelType()))
- .compare(getSource(), o.getSource())
- .compare(getTarget(), o.getTarget())
- .result();
- }
-}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java
new file mode 100644
index 000000000..bf7f9330d
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java
@@ -0,0 +1,90 @@
+
+package eu.dnetlib.dhp.oa.provision.model;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Optional;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.Maps;
+
+import eu.dnetlib.dhp.schema.oaf.Relation;
+
+public class SortableRelationKey implements Comparable<SortableRelationKey>, Serializable {
+
+ private static final Map<String, Integer> weights = Maps.newHashMap();
+
+ static {
+ weights.put("outcome", 0);
+ weights.put("supplement", 1);
+ weights.put("review", 2);
+ weights.put("citation", 3);
+ weights.put("affiliation", 4);
+ weights.put("relationship", 5);
+ weights.put("publicationDataset", 6);
+ weights.put("similarity", 7);
+
+ weights.put("provision", 8);
+ weights.put("participation", 9);
+ weights.put("dedup", 10);
+ }
+
+ private static final long serialVersionUID = 3232323;
+
+ private String groupingKey;
+
+ private String subRelType;
+
+ public static SortableRelationKey create(Relation r, String groupingKey) {
+ SortableRelationKey sr = new SortableRelationKey();
+ sr.setGroupingKey(groupingKey);
+ sr.setSubRelType(r.getSubRelType());
+ return sr;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ SortableRelationKey that = (SortableRelationKey) o;
+ return getGroupingKey().equals(that.getGroupingKey());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(getGroupingKey());
+ }
+
+ @Override
+ public int compareTo(SortableRelationKey o) {
+ return ComparisonChain
+ .start()
+ .compare(getGroupingKey(), o.getGroupingKey())
+ .compare(getWeight(this), getWeight(o))
+ .result();
+ }
+
+ private Integer getWeight(SortableRelationKey o) {
+ return Optional.ofNullable(weights.get(o.getSubRelType())).orElse(Integer.MAX_VALUE);
+ }
+
+ public String getSubRelType() {
+ return subRelType;
+ }
+
+ public void setSubRelType(String subRelType) {
+ this.subRelType = subRelType;
+ }
+
+ public String getGroupingKey() {
+ return groupingKey;
+ }
+
+ public void setGroupingKey(String groupingKey) {
+ this.groupingKey = groupingKey;
+ }
+
+}
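Note: `equals`/`hashCode` intentionally look only at the grouping key (a group is "all relations of one source or target id"), while `compareTo` additionally orders by the subRelType weight. A small ordering check in Scala (assumes dhp-graph-provision and the dhp schema on the classpath; relations are built inline rather than read from relations.json as the new test below does):

```scala
import eu.dnetlib.dhp.oa.provision.model.SortableRelationKey
import eu.dnetlib.dhp.schema.oaf.Relation

def rel(source: String, subRelType: String): Relation = {
  val r = new Relation
  r.setSource(source)
  r.setSubRelType(subRelType)
  r
}

val keys = List(rel("1", "dedup"), rel("1", "outcome"), rel("1", "affiliation"))
  .map(r => SortableRelationKey.create(r, r.getSource))
  .sortWith((a, b) => a.compareTo(b) < 0)

keys.foreach(k => println(s"${k.getGroupingKey} ${k.getSubRelType}"))
// expected order for the same grouping key: outcome, affiliation, dedup
```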
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/RelationPartitioner.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/RelationPartitioner.java
index c7862b48a..7bd8b9217 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/RelationPartitioner.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/RelationPartitioner.java
@@ -4,12 +4,16 @@ package eu.dnetlib.dhp.oa.provision.utils;
import org.apache.spark.Partitioner;
import org.apache.spark.util.Utils;
+import eu.dnetlib.dhp.oa.provision.model.SortableRelationKey;
+
/**
* Used in combination with SortableRelationKey, allows to partition the records by source id, therefore allowing to
* sort relations sharing the same source id by the ordering defined in SortableRelationKey.
*/
public class RelationPartitioner extends Partitioner {
+ private static final long serialVersionUID = 343434456L;
+
private final int numPartitions;
public RelationPartitioner(int numPartitions) {
@@ -23,8 +27,18 @@ public class RelationPartitioner extends Partitioner {
@Override
public int getPartition(Object key) {
- String partitionKey = (String) key;
- return Utils.nonNegativeMod(partitionKey.hashCode(), numPartitions());
+ SortableRelationKey partitionKey = (SortableRelationKey) key;
+ return Utils.nonNegativeMod(partitionKey.getGroupingKey().hashCode(), numPartitions());
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof RelationPartitioner) {
+ RelationPartitioner p = (RelationPartitioner) obj;
+ if (p.numPartitions() == numPartitions())
+ return true;
+ }
+ return false;
}
}
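Note: `getPartition` now hashes only the grouping key, so every relation sharing a source (or target) id is routed to the same partition no matter its subRelType; ordering within the partition is left to `SortableRelationKey.compareTo`. The partition assignment itself is just a non-negative modulo of the key's hash; a self-contained Scala sketch (reimplementing Spark's `Utils.nonNegativeMod` locally and using a hypothetical id):

```scala
// Local stand-in for org.apache.spark.util.Utils.nonNegativeMod
def nonNegativeMod(x: Int, mod: Int): Int = { val r = x % mod; if (r < 0) r + mod else r }

val numPartitions = 5000
val groupingKey = "20|openaire____::someOrganisationId" // hypothetical id

// Same partition for every relation of this source/target, regardless of subRelType.
println(nonNegativeMod(groupingKey.hashCode, numPartitions))
```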
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java
index b2aa01dc7..5d8d9fa20 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java
@@ -276,7 +276,7 @@ public class XmlRecordFactory implements Serializable {
pidType,
pidValue
.toLowerCase()
- .replaceAll("orcid", "")));
+ .replaceAll("^.*orcid\\.org\\/", "")));
}
}
});
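Note: the previous `replaceAll("orcid", "")` only removed the literal substring, so URL-shaped ORCID pids kept scheme and host residue; the new pattern strips everything up to and including `orcid.org/`. A quick Scala comparison on hypothetical pid values:

```scala
val pids = List(
  "http://orcid.org/0000-0002-6796-0204",
  "https://orcid.org/0000-0002-6796-0204",
  "0000-0002-6796-0204")

pids.foreach { pid =>
  val before = pid.toLowerCase.replaceAll("orcid", "")             // old behaviour
  val after  = pid.toLowerCase.replaceAll("^.*orcid\\.org\\/", "") // new behaviour
  println(s"$before -> $after")
}
// e.g. "http://.org/0000-0002-6796-0204 -> 0000-0002-6796-0204"
```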
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
index 0d5121cf1..e98cbbc73 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
@@ -97,18 +97,7 @@
-
-
-
-
- ${wf:conf('resumeFrom') eq 'prepare_relations'}
- ${wf:conf('resumeFrom') eq 'fork_join_related_entities'}
- ${wf:conf('resumeFrom') eq 'fork_join_all_entities'}
- ${wf:conf('resumeFrom') eq 'convert_to_xml'}
- ${wf:conf('resumeFrom') eq 'to_solr_index'}
-
-
-
+
Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
@@ -135,475 +124,12 @@
--outputPath${workingDir}/relation
--relPartitions5000
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- yarn
- cluster
- Join[relation.target = publication.id]
- eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1
- dhp-graph-provision-${projectVersion}.jar
-
- --executor-cores=${sparkExecutorCoresForJoining}
- --executor-memory=${sparkExecutorMemoryForJoining}
- --driver-memory=${sparkDriverMemoryForJoining}
- --conf spark.extraListeners=${spark2ExtraListeners}
- --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
- --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
- --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=7680
- --conf spark.network.timeout=${sparkNetworkTimeout}
-
- --inputRelationsPath${workingDir}/relation
- --inputEntityPath${inputGraphRootPath}/publication
- --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication
- --outputPath${workingDir}/join_partial/publication
-
-
-
-
-
-
-
- yarn
- cluster
- Join[relation.target = dataset.id]
- eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1
- dhp-graph-provision-${projectVersion}.jar
-
- --executor-cores=${sparkExecutorCoresForJoining}
- --executor-memory=${sparkExecutorMemoryForJoining}
- --driver-memory=${sparkDriverMemoryForJoining}
- --conf spark.extraListeners=${spark2ExtraListeners}
- --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
- --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
- --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=3840
- --conf spark.network.timeout=${sparkNetworkTimeout}
-
- --inputRelationsPath${workingDir}/relation
- --inputEntityPath${inputGraphRootPath}/dataset
- --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset
- --outputPath${workingDir}/join_partial/dataset
-
-
-
-
-
-
-
- yarn
- cluster
- Join[relation.target = otherresearchproduct.id]
- eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1
- dhp-graph-provision-${projectVersion}.jar
-
- --executor-cores=${sparkExecutorCoresForJoining}
- --executor-memory=${sparkExecutorMemoryForJoining}
- --driver-memory=${sparkDriverMemoryForJoining}
- --conf spark.extraListeners=${spark2ExtraListeners}
- --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
- --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
- --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=3840
- --conf spark.network.timeout=${sparkNetworkTimeout}
-
- --inputRelationsPath${workingDir}/relation
- --inputEntityPath${inputGraphRootPath}/otherresearchproduct
- --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct
- --outputPath${workingDir}/join_partial/otherresearchproduct
-
-
-
-
-
-
-
- yarn
- cluster
- Join[relation.target = software.id]
- eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1
- dhp-graph-provision-${projectVersion}.jar
-
- --executor-cores=${sparkExecutorCoresForJoining}
- --executor-memory=${sparkExecutorMemoryForJoining}
- --driver-memory=${sparkDriverMemoryForJoining}
- --conf spark.extraListeners=${spark2ExtraListeners}
- --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
- --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
- --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=3840
- --conf spark.network.timeout=${sparkNetworkTimeout}
-
- --inputRelationsPath${workingDir}/relation
- --inputEntityPath${inputGraphRootPath}/software
- --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software
- --outputPath${workingDir}/join_partial/software
-
-
-
-
-
-
-
- yarn
- cluster
- Join[relation.target = datasource.id]
- eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1
- dhp-graph-provision-${projectVersion}.jar
-
- --executor-cores=${sparkExecutorCoresForJoining}
- --executor-memory=${sparkExecutorMemoryForJoining}
- --driver-memory=${sparkDriverMemoryForJoining}
- --conf spark.extraListeners=${spark2ExtraListeners}
- --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
- --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
- --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=3840
- --conf spark.network.timeout=${sparkNetworkTimeout}
-
- --inputRelationsPath${workingDir}/relation
- --inputEntityPath${inputGraphRootPath}/datasource
- --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Datasource
- --outputPath${workingDir}/join_partial/datasource
-
-
-
-
-
-
-
- yarn
- cluster
- Join[relation.target = organization.id]
- eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1
- dhp-graph-provision-${projectVersion}.jar
-
- --executor-cores=${sparkExecutorCoresForJoining}
- --executor-memory=${sparkExecutorMemoryForJoining}
- --driver-memory=${sparkDriverMemoryForJoining}
- --conf spark.extraListeners=${spark2ExtraListeners}
- --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
- --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
- --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=3840
- --conf spark.network.timeout=${sparkNetworkTimeout}
-
- --inputRelationsPath${workingDir}/relation
- --inputEntityPath${inputGraphRootPath}/organization
- --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Organization
- --outputPath${workingDir}/join_partial/organization
-
-
-
-
-
-
-
- yarn
- cluster
- Join[relation.target = project.id]
- eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1
- dhp-graph-provision-${projectVersion}.jar
-
- --executor-cores=${sparkExecutorCoresForJoining}
- --executor-memory=${sparkExecutorMemoryForJoining}
- --driver-memory=${sparkDriverMemoryForJoining}
- --conf spark.extraListeners=${spark2ExtraListeners}
- --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
- --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
- --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=3840
- --conf spark.network.timeout=${sparkNetworkTimeout}
-
- --inputRelationsPath${workingDir}/relation
- --inputEntityPath${inputGraphRootPath}/project
- --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Project
- --outputPath${workingDir}/join_partial/project
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- yarn
- cluster
- Join[publication.id = relatedEntity.source]
- eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2
- dhp-graph-provision-${projectVersion}.jar
-
- --executor-cores=${sparkExecutorCoresForJoining}
- --executor-memory=${sparkExecutorMemoryForJoining}
- --driver-memory=${sparkDriverMemoryForJoining}
- --conf spark.extraListeners=${spark2ExtraListeners}
- --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
- --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
- --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=15360
- --conf spark.network.timeout=${sparkNetworkTimeout}
-
- --inputEntityPath${inputGraphRootPath}/publication
- --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication
- --inputRelatedEntitiesPath${workingDir}/join_partial
- --outputPath${workingDir}/join_entities/publication
- --numPartitions30000
-
-
-
-
-
-
-
- yarn
- cluster
- Join[dataset.id = relatedEntity.source]
- eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2
- dhp-graph-provision-${projectVersion}.jar
-
- --executor-cores=${sparkExecutorCoresForJoining}
- --executor-memory=${sparkExecutorMemoryForJoining}
- --driver-memory=${sparkDriverMemoryForJoining}
- --conf spark.extraListeners=${spark2ExtraListeners}
- --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
- --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
- --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=7680
- --conf spark.network.timeout=${sparkNetworkTimeout}
-
- --inputEntityPath${inputGraphRootPath}/dataset
- --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset
- --inputRelatedEntitiesPath${workingDir}/join_partial
- --outputPath${workingDir}/join_entities/dataset
- --numPartitions20000
-
-
-
-
-
-
-
- yarn
- cluster
- Join[otherresearchproduct.id = relatedEntity.source]
- eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2
- dhp-graph-provision-${projectVersion}.jar
-
- --executor-cores=${sparkExecutorCoresForJoining}
- --executor-memory=${sparkExecutorMemoryForJoining}
- --driver-memory=${sparkDriverMemoryForJoining}
- --conf spark.extraListeners=${spark2ExtraListeners}
- --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
- --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
- --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=7680
- --conf spark.network.timeout=${sparkNetworkTimeout}
-
- --inputEntityPath${inputGraphRootPath}/otherresearchproduct
- --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct
- --inputRelatedEntitiesPath${workingDir}/join_partial
- --outputPath${workingDir}/join_entities/otherresearchproduct
- --numPartitions10000
-
-
-
-
-
-
-
- yarn
- cluster
- Join[software.id = relatedEntity.source]
- eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2
- dhp-graph-provision-${projectVersion}.jar
-
- --executor-cores=${sparkExecutorCoresForJoining}
- --executor-memory=${sparkExecutorMemoryForJoining}
- --driver-memory=${sparkDriverMemoryForJoining}
- --conf spark.extraListeners=${spark2ExtraListeners}
- --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
- --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
- --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=3840
- --conf spark.network.timeout=${sparkNetworkTimeout}
-
- --inputEntityPath${inputGraphRootPath}/software
- --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software
- --inputRelatedEntitiesPath${workingDir}/join_partial
- --outputPath${workingDir}/join_entities/software
- --numPartitions10000
-
-
-
-
-
-
-
- yarn
- cluster
- Join[datasource.id = relatedEntity.source]
- eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2
- dhp-graph-provision-${projectVersion}.jar
-
- --executor-cores=${sparkExecutorCoresForJoining}
- --executor-memory=${sparkExecutorMemoryForJoining}
- --driver-memory=${sparkDriverMemoryForJoining}
- --conf spark.extraListeners=${spark2ExtraListeners}
- --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
- --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
- --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=7680
- --conf spark.network.timeout=${sparkNetworkTimeout}
-
- --inputEntityPath${inputGraphRootPath}/datasource
- --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Datasource
- --inputRelatedEntitiesPath${workingDir}/join_partial
- --outputPath${workingDir}/join_entities/datasource
- --numPartitions1000
-
-
-
-
-
-
-
- yarn
- cluster
- Join[organization.id = relatedEntity.source]
- eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2
- dhp-graph-provision-${projectVersion}.jar
-
- --executor-cores=${sparkExecutorCoresForJoining}
- --executor-memory=${sparkExecutorMemoryForJoining}
- --driver-memory=${sparkDriverMemoryForJoining}
- --conf spark.extraListeners=${spark2ExtraListeners}
- --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
- --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
- --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=7680
- --conf spark.network.timeout=${sparkNetworkTimeout}
-
- --inputEntityPath${inputGraphRootPath}/organization
- --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Organization
- --inputRelatedEntitiesPath${workingDir}/join_partial
- --outputPath${workingDir}/join_entities/organization
- --numPartitions20000
-
-
-
-
-
-
-
- yarn
- cluster
- Join[project.id = relatedEntity.source]
- eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2
- dhp-graph-provision-${projectVersion}.jar
-
- --executor-cores=${sparkExecutorCoresForJoining}
- --executor-memory=${sparkExecutorMemoryForJoining}
- --driver-memory=${sparkDriverMemoryForJoining}
- --conf spark.extraListeners=${spark2ExtraListeners}
- --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
- --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
- --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=3840
- --conf spark.network.timeout=${sparkNetworkTimeout}
-
- --inputEntityPath${inputGraphRootPath}/project
- --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Project
- --inputRelatedEntitiesPath${workingDir}/join_partial
- --outputPath${workingDir}/join_entities/project
- --numPartitions10000
-
-
-
-
-
-
-
-
-
- yarn
- cluster
- convert_to_xml
- eu.dnetlib.dhp.oa.provision.XmlConverterJob
- dhp-graph-provision-${projectVersion}.jar
-
- --executor-cores=${sparkExecutorCoresForJoining}
- --executor-memory=${sparkExecutorMemoryForJoining}
- --driver-memory=${sparkDriverMemoryForJoining}
- --conf spark.extraListeners=${spark2ExtraListeners}
- --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
- --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
- --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=3840
- --conf spark.network.timeout=${sparkNetworkTimeout}
-
- --inputPath${workingDir}/join_entities
- --outputPath${workingDir}/xml
- --isLookupUrl${isLookupUrl}
- --otherDsTypeId${otherDsTypeId}
-
-
-
-
-
-
-
- yarn
- cluster
- to_solr_index
- eu.dnetlib.dhp.oa.provision.XmlIndexingJob
- dhp-graph-provision-${projectVersion}.jar
-
- --executor-memory=${sparkExecutorMemoryForIndexing}
- --driver-memory=${sparkDriverMemoryForIndexing}
- --conf spark.dynamicAllocation.enabled=true
- --conf spark.dynamicAllocation.maxExecutors=${sparkExecutorCoresForIndexing}
- --conf spark.extraListeners=${spark2ExtraListeners}
- --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
- --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
- --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.speculation=false
- --conf spark.hadoop.mapreduce.map.speculative=false
- --conf spark.hadoop.mapreduce.reduce.speculative=false
-
- --inputPath${workingDir}/xml
- --isLookupUrl${isLookupUrl}
- --format${format}
- --batchSize${batchSize}
-
+
+
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SortableRelationKeyTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SortableRelationKeyTest.java
new file mode 100644
index 000000000..72f28fdf2
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SortableRelationKeyTest.java
@@ -0,0 +1,42 @@
+
+package eu.dnetlib.dhp.oa.provision;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.io.IOUtils;
+import org.junit.jupiter.api.Test;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.oa.provision.model.SortableRelationKey;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+
+public class SortableRelationKeyTest {
+
+ @Test
+ public void doTestSorting() throws IOException {
+ final ObjectMapper mapper = new ObjectMapper();
+ final String json = IOUtils.toString(this.getClass().getResourceAsStream("relations.json"));
+ final List<Relation> relations = mapper.readValue(json, new TypeReference<List<Relation>>() {
+ });
+
+ relations
+ .stream()
+ .map(r -> SortableRelationKey.create(r, r.getSource()))
+ .sorted()
+ .forEach(
+
+ it -> {
+ try {
+ System.out.println(mapper.writeValueAsString(it));
+ } catch (JsonProcessingException e) {
+ e.printStackTrace();
+ }
+ });
+
+ }
+
+}
diff --git a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/relations.json b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/relations.json
new file mode 100644
index 000000000..3280d0d61
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/relations.json
@@ -0,0 +1,90 @@
+[
+ {
+ "collectedfrom": [],
+ "dataInfo": {
+ "deletedbyinference": false,
+ "inferred": false,
+ "invisible": false,
+ "provenanceaction": {
+ "classid": "sysimport:crosswalk:entityregistry",
+ "classname": "Harvested",
+ "schemeid": "dnet:provenanceActions",
+ "schemename": "dnet:provenanceActions"
+ },
+ "trust": "0.9"
+ },
+ "lastupdatetimestamp": 1592688952862,
+ "properties": [],
+ "relClass": "hasAuthorInstitution",
+ "relType": "resultOrganization",
+ "source": "1",
+ "subRelType": "affiliation",
+ "target": "2"
+ },
+ {
+ "collectedfrom": [],
+ "dataInfo": {
+ "deletedbyinference": false,
+ "inferred": false,
+ "invisible": false,
+ "provenanceaction": {
+ "classid": "sysimport:crosswalk:entityregistry",
+ "classname": "Harvested",
+ "schemeid": "dnet:provenanceActions",
+ "schemename": "dnet:provenanceActions"
+ },
+ "trust": "0.9"
+ },
+ "lastupdatetimestamp": 1592688952862,
+ "properties": [],
+ "relClass": "isAuthorInstitutionOf",
+ "relType": "resultOrganization",
+ "source": "2",
+ "subRelType": "affiliation",
+ "target": "1"
+ },
+ {
+ "collectedfrom": [],
+ "dataInfo": {
+ "deletedbyinference": false,
+ "inferred": false,
+ "invisible": false,
+ "provenanceaction": {
+ "classid": "sysimport:crosswalk:entityregistry",
+ "classname": "Harvested",
+ "schemeid": "dnet:provenanceActions",
+ "schemename": "dnet:provenanceActions"
+ },
+ "trust": "0.9"
+ },
+ "lastupdatetimestamp": 1592688952862,
+ "properties": [],
+ "relClass": "isProducedBy",
+ "relType": "resultProject",
+ "source": "1",
+ "subRelType": "outcome",
+ "target": "2"
+ },
+ {
+ "collectedfrom": [],
+ "dataInfo": {
+ "deletedbyinference": false,
+ "inferred": false,
+ "invisible": false,
+ "provenanceaction": {
+ "classid": "sysimport:crosswalk:entityregistry",
+ "classname": "Harvested",
+ "schemeid": "dnet:provenanceActions",
+ "schemename": "dnet:provenanceActions"
+ },
+ "trust": "0.9"
+ },
+ "lastupdatetimestamp": 1592688952862,
+ "properties": [],
+ "relClass": "produces",
+ "relType": "resultProject",
+ "source": "2",
+ "subRelType": "outcome",
+ "target": "1"
+ }
+]
\ No newline at end of file