diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_db/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_db/oozie_app/workflow.xml
index 575f9229ed..eea8d0a5ab 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_db/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_db/oozie_app/workflow.xml
@@ -16,6 +16,11 @@
<name>postgresPassword</name>
<description>the password postgres</description>
</property>
+ <property>
+ <name>dbSchema</name>
+ <value>beta</value>
+ <description>the database schema according to the D-Net infrastructure (beta or production)</description>
+ </property>
<property>
<name>isLookupUrl</name>
<description>the address of the lookUp service</description>
@@ -93,6 +98,7 @@
<arg>--postgresUser</arg><arg>${postgresUser}</arg>
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
+ <arg>--dbschema</arg><arg>${dbSchema}</arg>
@@ -109,6 +115,7 @@
<arg>--postgresUser</arg><arg>${postgresUser}</arg>
<arg>--postgresPassword</arg><arg>${postgresPassword}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
+ <arg>--dbschema</arg><arg>${dbSchema}</arg>
<arg>--action</arg><arg>claims</arg>
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java
index 80b8000173..57dca7bb15 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java
@@ -9,6 +9,7 @@ import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
@@ -115,11 +116,21 @@ public class CreateRelatedEntitiesJob_phase1 {
Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class)))
.cache();
- Dataset<Tuple2<String, RelatedEntity>> entities = readPathEntity(spark, inputEntityPath, clazz)
+ final String relatedEntityPath = outputPath + "_relatedEntity";
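+ // write the mapped RelatedEntity records to a temporary parquet path and read them back below;
+ // presumably a checkpoint that materialises the mapping once and shortens the execution plan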
+ readPathEntity(spark, inputEntityPath, clazz)
.filter("dataInfo.invisible == false")
.map(
(MapFunction<E, RelatedEntity>) value -> asRelatedEntity(value, clazz),
Encoders.kryo(RelatedEntity.class))
+ .repartition(5000)
+ .write()
+ .mode(SaveMode.Overwrite)
+ .parquet(relatedEntityPath);
+
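+ // read the checkpointed records back and key them by entity id for the join with relations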
+ Dataset<Tuple2<String, RelatedEntity>> entities = spark
+ .read()
+ .load(relatedEntityPath)
+ .as(Encoders.kryo(RelatedEntity.class))
.map(
(MapFunction<RelatedEntity, Tuple2<String, RelatedEntity>>) e -> new Tuple2<>(e.getId(), e),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntity.class)))
@@ -165,13 +176,21 @@ public class CreateRelatedEntitiesJob_phase1 {
Result result = (Result) entity;
if (result.getTitle() != null && !result.getTitle().isEmpty()) {
- re.setTitle(result.getTitle().stream().findFirst().get());
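+ // keep only the first title, truncated to ProvisionConstants.MAX_TITLE_LENGTH characters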
+ final StructuredProperty title = result.getTitle().stream().findFirst().get();
+ title.setValue(StringUtils.left(title.getValue(), ProvisionConstants.MAX_TITLE_LENGTH));
+ re.setTitle(title);
}
re.setDateofacceptance(getValue(result.getDateofacceptance()));
re.setPublisher(getValue(result.getPublisher()));
re.setResulttype(result.getResulttype());
- re.setInstances(result.getInstance());
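+ // cap the instances carried by each related entity at ProvisionConstants.MAX_INSTANCES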
+ re
+ .setInstances(
+ result
+ .getInstance()
+ .stream()
+ .limit(ProvisionConstants.MAX_INSTANCES)
+ .collect(Collectors.toList()));
// TODO still to be mapped
// re.setCodeRepositoryUrl(j.read("$.coderepositoryurl"));
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java
index bfcc648a35..7e175121e5 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java
@@ -61,12 +61,6 @@ public class CreateRelatedEntitiesJob_phase2 {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
- private static final int MAX_EXTERNAL_ENTITIES = 50;
- private static final int MAX_AUTHORS = 200;
- private static final int MAX_AUTHOR_FULLNAME_LENGTH = 1000;
- private static final int MAX_TITLE_LENGTH = 5000;
- private static final int MAX_ABSTRACT_LENGTH = 100000;
-
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
@@ -246,15 +240,15 @@ public class CreateRelatedEntitiesJob_phase2 {
List<ExternalReference> refs = r
.getExternalReference()
.stream()
- .limit(MAX_EXTERNAL_ENTITIES)
+ .limit(ProvisionConstants.MAX_EXTERNAL_ENTITIES)
.collect(Collectors.toList());
r.setExternalReference(refs);
}
if (r.getAuthor() != null) {
List<Author> authors = Lists.newArrayList();
for (Author a : r.getAuthor()) {
- a.setFullname(StringUtils.left(a.getFullname(), MAX_AUTHOR_FULLNAME_LENGTH));
- if (authors.size() < MAX_AUTHORS || hasORCID(a)) {
+ a.setFullname(StringUtils.left(a.getFullname(), ProvisionConstants.MAX_AUTHOR_FULLNAME_LENGTH));
+ if (authors.size() < ProvisionConstants.MAX_AUTHORS || hasORCID(a)) {
authors.add(a);
}
}
@@ -266,7 +260,7 @@ public class CreateRelatedEntitiesJob_phase2 {
.stream()
.filter(Objects::nonNull)
.map(d -> {
- d.setValue(StringUtils.left(d.getValue(), MAX_ABSTRACT_LENGTH));
+ d.setValue(StringUtils.left(d.getValue(), ProvisionConstants.MAX_ABSTRACT_LENGTH));
return d;
})
.collect(Collectors.toList());
@@ -278,9 +272,10 @@ public class CreateRelatedEntitiesJob_phase2 {
.stream()
.filter(Objects::nonNull)
.map(t -> {
- t.setValue(StringUtils.left(t.getValue(), MAX_TITLE_LENGTH));
+ t.setValue(StringUtils.left(t.getValue(), ProvisionConstants.MAX_TITLE_LENGTH));
return t;
})
+ .limit(ProvisionConstants.MAX_TITLES)
.collect(Collectors.toList());
r.setTitle(titles);
}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java
index eb63d4423d..da0a810217 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java
@@ -100,11 +100,17 @@ public class PrepareRelationsJob {
.orElse(new HashSet<>());
log.info("relationFilter: {}", relationFilter);
- int maxRelations = Optional
- .ofNullable(parser.get("maxRelations"))
+ int sourceMaxRelations = Optional
+ .ofNullable(parser.get("sourceMaxRelations"))
.map(Integer::valueOf)
.orElse(MAX_RELS);
- log.info("maxRelations: {}", maxRelations);
+ log.info("sourceMaxRelations: {}", sourceMaxRelations);
+
+ int targetMaxRelations = Optional
+ .ofNullable(parser.get("targetMaxRelations"))
+ .map(Integer::valueOf)
+ .orElse(MAX_RELS);
+ log.info("targetMaxRelations: {}", targetMaxRelations);
SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
@@ -116,7 +122,8 @@ public class PrepareRelationsJob {
spark -> {
removeOutputDir(spark, outputPath);
prepareRelationsRDD(
- spark, inputRelationsPath, outputPath, relationFilter, maxRelations, relPartitions);
+ spark, inputRelationsPath, outputPath, relationFilter, sourceMaxRelations, targetMaxRelations,
+ relPartitions);
});
}
@@ -129,31 +136,40 @@ public class PrepareRelationsJob {
* @param inputRelationsPath source path for the graph relations
* @param outputPath output path for the processed relations
* @param relationFilter set of relation filters applied to the `relClass` field
- * @param maxRelations maximum number of allowed outgoing edges
+ * @param sourceMaxRelations maximum number of allowed outgoing edges grouping by relation.source
+ * @param targetMaxRelations maximum number of allowed outgoing edges grouping by relation.target
* @param relPartitions number of partitions for the output RDD
*/
private static void prepareRelationsRDD(SparkSession spark, String inputRelationsPath, String outputPath,
- Set<String> relationFilter, int maxRelations, int relPartitions) {
+ Set<String> relationFilter, int sourceMaxRelations, int targetMaxRelations, int relPartitions) {
- // group by SOURCE and apply limit
- RDD<Relation> bySource = readPathRelationRDD(spark, inputRelationsPath)
+ JavaRDD<Relation> rels = readPathRelationRDD(spark, inputRelationsPath)
.filter(rel -> rel.getDataInfo().getDeletedbyinference() == false)
- .filter(rel -> relationFilter.contains(rel.getRelClass()) == false)
- .mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, r.getSource()), r))
+ .filter(rel -> relationFilter.contains(rel.getRelClass()) == false);
+
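+ // apply the cap twice: first limit the edges grouped by source id, then re-group the
+ // survivors by target id and limit again, so neither endpoint exceeds its bound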
+ JavaRDD<Relation> pruned = pruneRels(
+ pruneRels(
+ rels,
+ sourceMaxRelations, relPartitions, (Function<Relation, String>) r -> r.getSource()),
+ targetMaxRelations, relPartitions, (Function<Relation, String>) r -> r.getTarget());
+ spark
+ .createDataset(pruned.rdd(), Encoders.bean(Relation.class))
+ .repartition(relPartitions)
+ .write()
+ .mode(SaveMode.Overwrite)
+ .parquet(outputPath);
+ }
+
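+ // groups relations by the id selected by idFn (source or target), sorts them within
+ // partitions (presumably by the relClass weights defined in SortableRelationKey) and
+ // keeps at most maxRelations per group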
+ private static JavaRDD<Relation> pruneRels(JavaRDD<Relation> rels, int maxRelations,
+ int relPartitions, Function<Relation, String> idFn) {
+ return rels
+ .mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, idFn.call(r)), r))
.repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions))
.groupBy(Tuple2::_1)
.map(Tuple2::_2)
.map(t -> Iterables.limit(t, maxRelations))
.flatMap(Iterable::iterator)
- .map(Tuple2::_2)
- .rdd();
-
- spark
- .createDataset(bySource, Encoders.bean(Relation.class))
- .repartition(relPartitions)
- .write()
- .mode(SaveMode.Overwrite)
- .parquet(outputPath);
+ .map(Tuple2::_2);
}
// experimental
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/ProvisionConstants.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/ProvisionConstants.java
new file mode 100644
index 0000000000..9bc3706cdd
--- /dev/null
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/ProvisionConstants.java
@@ -0,0 +1,14 @@
+
+package eu.dnetlib.dhp.oa.provision;
+
+public class ProvisionConstants {
+
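+ // upper bounds applied while assembling provision records; presumably chosen to keep
+ // serialized entities within limits acceptable to the downstream indexing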
+ public static final int MAX_EXTERNAL_ENTITIES = 50;
+ public static final int MAX_AUTHORS = 200;
+ public static final int MAX_AUTHOR_FULLNAME_LENGTH = 1000;
+ public static final int MAX_TITLE_LENGTH = 5000;
+ public static final int MAX_TITLES = 10;
+ public static final int MAX_ABSTRACT_LENGTH = 100000;
+ public static final int MAX_INSTANCES = 10;
+
+}
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java
index bf7f9330d1..bd7b4d78ee 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java
@@ -16,18 +16,18 @@ public class SortableRelationKey implements Comparable<SortableRelationKey>, Serializable {
private static final Map<String, Integer> weights = Maps.newHashMap();
static {
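+ // NOTE: lower weight = kept first when PrepareRelationsJob truncates a group of relations
+ // (assumed from the ascending sort order in compareTo)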
- weights.put("outcome", 0);
- weights.put("supplement", 1);
- weights.put("review", 2);
- weights.put("citation", 3);
- weights.put("affiliation", 4);
- weights.put("relationship", 5);
- weights.put("publicationDataset", 6);
- weights.put("similarity", 7);
+ weights.put("participation", 0);
- weights.put("provision", 8);
- weights.put("participation", 9);
- weights.put("dedup", 10);
+ weights.put("outcome", 1);
+ weights.put("affiliation", 2);
+ weights.put("dedup", 3);
+ weights.put("publicationDataset", 4);
+ weights.put("citation", 5);
+ weights.put("supplement", 6);
+ weights.put("review", 7);
+ weights.put("relationship", 8);
+ weights.put("provision", 9);
+ weights.put("similarity", 10);
}
private static final long serialVersionUID = 3232323;
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json
index 71b2becc4d..33fa1dc8df 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json
@@ -30,9 +30,16 @@
"paramRequired": false
},
{
- "paramName": "mr",
- "paramLongName": "maxRelations",
- "paramDescription": "maximum number of relations allowed for a each entity",
+ "paramName": "smr",
+ "paramLongName": "sourceMaxRelations",
+ "paramDescription": "maximum number of relations allowed for a each entity grouping by source",
+ "paramRequired": false
+ },
+ {
+ "paramName": "tmr",
+ "paramLongName": "targetMaxRelations",
+ "paramDescription": "maximum number of relations allowed for a each entity grouping by target",
"paramRequired": false
}
+
]
diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
index faa81ad644..32bf7ce83d 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml
@@ -18,8 +18,12 @@
<description>filter applied reading relations (by relClass)</description>
</property>
<property>
- <name>maxRelations</name>
- <description>maximum number of relations allowed for a each entity</description>
+ <name>sourceMaxRelations</name>
+ <description>maximum number of relations allowed for each entity, grouping by source</description>
+ </property>
+ <property>
+ <name>targetMaxRelations</name>
+ <description>maximum number of relations allowed for each entity, grouping by target</description>
</property>
<property>
<name>otherDsTypeId</name>
@@ -133,7 +137,8 @@
<arg>--inputRelationsPath</arg><arg>${inputGraphRootPath}/relation</arg>
<arg>--outputPath</arg><arg>${workingDir}/relation</arg>
- <arg>--maxRelations</arg><arg>${maxRelations}</arg>
+ <arg>--sourceMaxRelations</arg><arg>${sourceMaxRelations}</arg>
+ <arg>--targetMaxRelations</arg><arg>${targetMaxRelations}</arg>
<arg>--relationFilter</arg><arg>${relationFilter}</arg>
<arg>--relPartitions</arg><arg>5000</arg>
@@ -166,7 +171,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=7680
+ --conf spark.sql.shuffle.partitions=15000
--conf spark.network.timeout=${sparkNetworkTimeout}
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
@@ -193,7 +198,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=3840
+ --conf spark.sql.shuffle.partitions=15000
--conf spark.network.timeout=${sparkNetworkTimeout}
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
@@ -220,7 +225,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=3840
+ --conf spark.sql.shuffle.partitions=10000
--conf spark.network.timeout=${sparkNetworkTimeout}
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
@@ -247,7 +252,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=3840
+ --conf spark.sql.shuffle.partitions=5000
--conf spark.network.timeout=${sparkNetworkTimeout}
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
@@ -274,7 +279,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=3840
+ --conf spark.sql.shuffle.partitions=5000
--conf spark.network.timeout=${sparkNetworkTimeout}
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
@@ -301,7 +306,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=3840
+ --conf spark.sql.shuffle.partitions=5000
--conf spark.network.timeout=${sparkNetworkTimeout}
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
@@ -328,7 +333,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=3840
+ --conf spark.sql.shuffle.partitions=5000
--conf spark.network.timeout=${sparkNetworkTimeout}
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
@@ -367,7 +372,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=15360
+ --conf spark.sql.shuffle.partitions=15000
--conf spark.network.timeout=${sparkNetworkTimeout}
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/publication</arg>
@@ -395,7 +400,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=7680
+ --conf spark.sql.shuffle.partitions=10000
--conf spark.network.timeout=${sparkNetworkTimeout}
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/dataset</arg>
@@ -423,7 +428,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=7680
+ --conf spark.sql.shuffle.partitions=10000
--conf spark.network.timeout=${sparkNetworkTimeout}
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/otherresearchproduct</arg>
@@ -451,7 +456,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=3840
+ --conf spark.sql.shuffle.partitions=5000
--conf spark.network.timeout=${sparkNetworkTimeout}
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/software</arg>
@@ -479,7 +484,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=7680
+ --conf spark.sql.shuffle.partitions=8000
--conf spark.network.timeout=${sparkNetworkTimeout}
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/datasource</arg>
@@ -507,7 +512,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=7680
+ --conf spark.sql.shuffle.partitions=10000
--conf spark.network.timeout=${sparkNetworkTimeout}
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/organization</arg>
@@ -535,7 +540,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
- --conf spark.sql.shuffle.partitions=3840
+ --conf spark.sql.shuffle.partitions=5000
--conf spark.network.timeout=${sparkNetworkTimeout}
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/project</arg>
@@ -607,5 +612,4 @@
-
</workflow-app>
\ No newline at end of file