From 0e723d378b3e1cf63cb40bdb27e5ab4cb272bfa2 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 24 Jun 2020 18:34:42 +0200 Subject: [PATCH 1/6] added default from vocab for missing instance.refereed; remove spurious prefixes from orcid values; WIP: prepare relation job --- .../oa/graph/clean/CleanGraphSparkJob.java | 3 + .../CreateRelatedEntitiesJob_phase1.java | 15 +- .../dhp/oa/provision/PrepareRelationsJob.java | 31 +- .../model/ProvisionModelSupport.java | 2 +- .../provision/model/RelatedEntityWrapper.java | 12 +- .../oa/provision/model/SortableRelation.java | 38 -- .../provision/model/SortableRelationKey.java | 90 ++++ .../provision/utils/RelationPartitioner.java | 6 +- .../oa/provision/utils/XmlRecordFactory.java | 2 +- .../dhp/oa/provision/oozie_app/workflow.xml | 480 +----------------- 10 files changed, 128 insertions(+), 551 deletions(-) delete mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelation.java create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java index 8f43ab1cf..bdbd64160 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java @@ -147,6 +147,9 @@ public class CleanGraphSparkJob { if (Objects.isNull(i.getHostedby()) || StringUtils.isBlank(i.getHostedby().getKey())) { i.setHostedby(ModelConstants.UNKNOWN_REPOSITORY); } + if (Objects.isNull(i.getRefereed())) { + i.setRefereed(qualifier("0000", "Unknown", ModelConstants.DNET_REVIEW_LEVELS)); + } } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java index 4d2633bc5..80b800017 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java @@ -25,9 +25,7 @@ import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport; import eu.dnetlib.dhp.oa.provision.model.RelatedEntity; import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper; -import eu.dnetlib.dhp.oa.provision.model.SortableRelation; import eu.dnetlib.dhp.schema.common.EntityType; -import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; import scala.Tuple2; @@ -109,11 +107,12 @@ public class CreateRelatedEntitiesJob_phase1 { Class clazz, String outputPath) { - Dataset> relsByTarget = readPathRelation(spark, inputRelationsPath) + Dataset> relsByTarget = readPathRelation(spark, inputRelationsPath) .filter("dataInfo.deletedbyinference == false") .map( - (MapFunction>) r -> new Tuple2<>(r.getTarget(), r), - Encoders.tuple(Encoders.STRING(), Encoders.kryo(SortableRelation.class))) + (MapFunction>) r -> new Tuple2<>(r.getTarget(), + r), + Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class))) .cache(); Dataset> entities = readPathEntity(spark, inputEntityPath, clazz) @@ -129,7 +128,7 @@ public class CreateRelatedEntitiesJob_phase1 { relsByTarget .joinWith(entities, entities.col("_1").equalTo(relsByTarget.col("_1")), "inner") .map( - (MapFunction, Tuple2>, RelatedEntityWrapper>) t -> new RelatedEntityWrapper( + (MapFunction, Tuple2>, RelatedEntityWrapper>) t -> new RelatedEntityWrapper( t._1()._2(), t._2()._2()), Encoders.kryo(RelatedEntityWrapper.class)) .write() @@ -232,11 +231,11 @@ public class CreateRelatedEntitiesJob_phase1 { * @param relationPath * @return the Dataset containing all the relationships */ - private static Dataset readPathRelation( + private static Dataset readPathRelation( SparkSession spark, final String relationPath) { log.info("Reading relations from: {}", relationPath); - return spark.read().load(relationPath).as(Encoders.bean(SortableRelation.class)); + return spark.read().load(relationPath).as(Encoders.bean(Relation.class)); } private static void removeOutputDir(SparkSession spark, String path) { diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java index 6b184071a..d69b75b65 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java @@ -4,35 +4,28 @@ package eu.dnetlib.dhp.oa.provision; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.util.*; -import java.util.function.Function; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.*; import org.apache.spark.rdd.RDD; -import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.clearspring.analytics.util.Lists; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Splitter; import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; import com.google.common.collect.Sets; import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.common.FunctionalInterfaceSupport; import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.oa.provision.model.SortableRelation; +import eu.dnetlib.dhp.oa.provision.model.SortableRelationKey; import eu.dnetlib.dhp.oa.provision.utils.RelationPartitioner; -import scala.Function1; -import scala.Tuple2; +import eu.dnetlib.dhp.schema.oaf.Relation; /** * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. The @@ -133,22 +126,22 @@ public class PrepareRelationsJob { SparkSession spark, String inputRelationsPath, String outputPath, Set relationFilter, int maxRelations, int relPartitions) { - RDD cappedRels = readPathRelationRDD(spark, inputRelationsPath) + RDD cappedRels = readPathRelationRDD(spark, inputRelationsPath) .repartition(relPartitions) .filter(rel -> !rel.getDataInfo().getDeletedbyinference()) .filter(rel -> !relationFilter.contains(rel.getRelClass())) // group by SOURCE and apply limit - .mapToPair(rel -> new Tuple2<>(rel.getSource(), rel)) - .groupByKey(new RelationPartitioner(relPartitions)) - .flatMap(group -> Iterables.limit(group._2(), maxRelations).iterator()) + .groupBy(r -> SortableRelationKey.create(r, r.getSource())) + .repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions)) + .flatMap(t -> Iterables.limit(t._2(), maxRelations).iterator()) // group by TARGET and apply limit - .mapToPair(rel -> new Tuple2<>(rel.getTarget(), rel)) - .groupByKey(new RelationPartitioner(relPartitions)) - .flatMap(group -> Iterables.limit(group._2(), maxRelations).iterator()) + .groupBy(r -> SortableRelationKey.create(r, r.getTarget())) + .repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions)) + .flatMap(t -> Iterables.limit(t._2(), maxRelations).iterator()) .rdd(); spark - .createDataset(cappedRels, Encoders.bean(SortableRelation.class)) + .createDataset(cappedRels, Encoders.bean(Relation.class)) .write() .mode(SaveMode.Overwrite) .parquet(outputPath); @@ -162,10 +155,10 @@ public class PrepareRelationsJob { * @param inputPath * @return the JavaRDD containing all the relationships */ - private static JavaRDD readPathRelationRDD( + private static JavaRDD readPathRelationRDD( SparkSession spark, final String inputPath) { JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - return sc.textFile(inputPath).map(s -> OBJECT_MAPPER.readValue(s, SortableRelation.class)); + return sc.textFile(inputPath).map(s -> OBJECT_MAPPER.readValue(s, Relation.class)); } private static void removeOutputDir(SparkSession spark, String path) { diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java index f9fde14e5..051fe923d 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java @@ -19,7 +19,7 @@ public class ProvisionModelSupport { RelatedEntityWrapper.class, JoinedEntity.class, RelatedEntity.class, - SortableRelation.class)); + SortableRelationKey.class)); return modelClasses.toArray(new Class[] {}); } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntityWrapper.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntityWrapper.java index d708b6ed0..cbb143ee2 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntityWrapper.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntityWrapper.java @@ -5,28 +5,30 @@ import java.io.Serializable; import com.google.common.base.Objects; +import eu.dnetlib.dhp.schema.oaf.Relation; + public class RelatedEntityWrapper implements Serializable { - private SortableRelation relation; + private Relation relation; private RelatedEntity target; public RelatedEntityWrapper() { } - public RelatedEntityWrapper(SortableRelation relation, RelatedEntity target) { + public RelatedEntityWrapper(Relation relation, RelatedEntity target) { this(null, relation, target); } - public RelatedEntityWrapper(TypedRow entity, SortableRelation relation, RelatedEntity target) { + public RelatedEntityWrapper(TypedRow entity, Relation relation, RelatedEntity target) { this.relation = relation; this.target = target; } - public SortableRelation getRelation() { + public Relation getRelation() { return relation; } - public void setRelation(SortableRelation relation) { + public void setRelation(Relation relation) { this.relation = relation; } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelation.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelation.java deleted file mode 100644 index b6571b9bf..000000000 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelation.java +++ /dev/null @@ -1,38 +0,0 @@ - -package eu.dnetlib.dhp.oa.provision.model; - -import java.io.Serializable; -import java.util.Map; - -import com.google.common.collect.ComparisonChain; -import com.google.common.collect.Maps; - -import eu.dnetlib.dhp.schema.oaf.Relation; - -public class SortableRelation extends Relation implements Comparable, Serializable { - - private static final Map weights = Maps.newHashMap(); - - static { - weights.put("outcome", 0); - weights.put("supplement", 1); - weights.put("affiliation", 2); - weights.put("relationship", 3); - weights.put("publicationDataset", 4); - weights.put("similarity", 5); - - weights.put("provision", 6); - weights.put("participation", 7); - weights.put("dedup", 8); - } - - @Override - public int compareTo(Relation o) { - return ComparisonChain - .start() - .compare(weights.get(getSubRelType()), weights.get(o.getSubRelType())) - .compare(getSource(), o.getSource()) - .compare(getTarget(), o.getTarget()) - .result(); - } -} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java new file mode 100644 index 000000000..ad61fa044 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java @@ -0,0 +1,90 @@ + +package eu.dnetlib.dhp.oa.provision.model; + +import java.io.Serializable; +import java.util.Map; +import java.util.Optional; + +import com.google.common.collect.ComparisonChain; +import com.google.common.collect.Maps; + +import eu.dnetlib.dhp.schema.oaf.Relation; + +public class SortableRelationKey implements Comparable, Serializable { + + private static final Map weights = Maps.newHashMap(); + + static { + weights.put("outcome", 0); + weights.put("supplement", 1); + weights.put("affiliation", 2); + weights.put("relationship", 3); + weights.put("publicationDataset", 4); + weights.put("similarity", 5); + + weights.put("provision", 6); + weights.put("participation", 7); + weights.put("dedup", 8); + } + + private String groupingKey; + + private String source; + + private String target; + + private String subRelType; + + public String getSource() { + return source; + } + + public static SortableRelationKey create(Relation r, String groupingKey) { + SortableRelationKey sr = new SortableRelationKey(); + sr.setGroupingKey(groupingKey); + sr.setSource(r.getSource()); + sr.setTarget(r.getTarget()); + sr.setSubRelType(r.getSubRelType()); + return sr; + } + + @Override + public int compareTo(SortableRelationKey o) { + final Integer wt = Optional.ofNullable(weights.get(getSubRelType())).orElse(Integer.MAX_VALUE); + final Integer wo = Optional.ofNullable(weights.get(o.getSubRelType())).orElse(Integer.MAX_VALUE); + return ComparisonChain + .start() + .compare(wt, wo) + .compare(getSource(), o.getSource()) + .compare(getTarget(), o.getTarget()) + .result(); + } + + public void setSource(String source) { + this.source = source; + } + + public String getTarget() { + return target; + } + + public void setTarget(String target) { + this.target = target; + } + + public String getSubRelType() { + return subRelType; + } + + public void setSubRelType(String subRelType) { + this.subRelType = subRelType; + } + + public String getGroupingKey() { + return groupingKey; + } + + public void setGroupingKey(String groupingKey) { + this.groupingKey = groupingKey; + } +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/RelationPartitioner.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/RelationPartitioner.java index c7862b48a..bdece36ab 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/RelationPartitioner.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/RelationPartitioner.java @@ -4,6 +4,8 @@ package eu.dnetlib.dhp.oa.provision.utils; import org.apache.spark.Partitioner; import org.apache.spark.util.Utils; +import eu.dnetlib.dhp.oa.provision.model.SortableRelationKey; + /** * Used in combination with SortableRelationKey, allows to partition the records by source id, therefore allowing to * sort relations sharing the same source id by the ordering defined in SortableRelationKey. @@ -23,8 +25,8 @@ public class RelationPartitioner extends Partitioner { @Override public int getPartition(Object key) { - String partitionKey = (String) key; - return Utils.nonNegativeMod(partitionKey.hashCode(), numPartitions()); + SortableRelationKey partitionKey = (SortableRelationKey) key; + return Utils.nonNegativeMod(partitionKey.getGroupingKey().hashCode(), numPartitions()); } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java index b2aa01dc7..5d8d9fa20 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java @@ -276,7 +276,7 @@ public class XmlRecordFactory implements Serializable { pidType, pidValue .toLowerCase() - .replaceAll("orcid", ""))); + .replaceAll("^.*orcid\\.org\\/", ""))); } } }); diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml index 0d5121cf1..e98cbbc73 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml @@ -97,18 +97,7 @@ - - - - - ${wf:conf('resumeFrom') eq 'prepare_relations'} - ${wf:conf('resumeFrom') eq 'fork_join_related_entities'} - ${wf:conf('resumeFrom') eq 'fork_join_all_entities'} - ${wf:conf('resumeFrom') eq 'convert_to_xml'} - ${wf:conf('resumeFrom') eq 'to_solr_index'} - - - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] @@ -135,475 +124,12 @@ --outputPath${workingDir}/relation --relPartitions5000 - - - - - - - - - - - - - - - - - yarn - cluster - Join[relation.target = publication.id] - eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1 - dhp-graph-provision-${projectVersion}.jar - - --executor-cores=${sparkExecutorCoresForJoining} - --executor-memory=${sparkExecutorMemoryForJoining} - --driver-memory=${sparkDriverMemoryForJoining} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 - --conf spark.network.timeout=${sparkNetworkTimeout} - - --inputRelationsPath${workingDir}/relation - --inputEntityPath${inputGraphRootPath}/publication - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication - --outputPath${workingDir}/join_partial/publication - - - - - - - - yarn - cluster - Join[relation.target = dataset.id] - eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1 - dhp-graph-provision-${projectVersion}.jar - - --executor-cores=${sparkExecutorCoresForJoining} - --executor-memory=${sparkExecutorMemoryForJoining} - --driver-memory=${sparkDriverMemoryForJoining} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 - --conf spark.network.timeout=${sparkNetworkTimeout} - - --inputRelationsPath${workingDir}/relation - --inputEntityPath${inputGraphRootPath}/dataset - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset - --outputPath${workingDir}/join_partial/dataset - - - - - - - - yarn - cluster - Join[relation.target = otherresearchproduct.id] - eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1 - dhp-graph-provision-${projectVersion}.jar - - --executor-cores=${sparkExecutorCoresForJoining} - --executor-memory=${sparkExecutorMemoryForJoining} - --driver-memory=${sparkDriverMemoryForJoining} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 - --conf spark.network.timeout=${sparkNetworkTimeout} - - --inputRelationsPath${workingDir}/relation - --inputEntityPath${inputGraphRootPath}/otherresearchproduct - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct - --outputPath${workingDir}/join_partial/otherresearchproduct - - - - - - - - yarn - cluster - Join[relation.target = software.id] - eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1 - dhp-graph-provision-${projectVersion}.jar - - --executor-cores=${sparkExecutorCoresForJoining} - --executor-memory=${sparkExecutorMemoryForJoining} - --driver-memory=${sparkDriverMemoryForJoining} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 - --conf spark.network.timeout=${sparkNetworkTimeout} - - --inputRelationsPath${workingDir}/relation - --inputEntityPath${inputGraphRootPath}/software - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software - --outputPath${workingDir}/join_partial/software - - - - - - - - yarn - cluster - Join[relation.target = datasource.id] - eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1 - dhp-graph-provision-${projectVersion}.jar - - --executor-cores=${sparkExecutorCoresForJoining} - --executor-memory=${sparkExecutorMemoryForJoining} - --driver-memory=${sparkDriverMemoryForJoining} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 - --conf spark.network.timeout=${sparkNetworkTimeout} - - --inputRelationsPath${workingDir}/relation - --inputEntityPath${inputGraphRootPath}/datasource - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Datasource - --outputPath${workingDir}/join_partial/datasource - - - - - - - - yarn - cluster - Join[relation.target = organization.id] - eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1 - dhp-graph-provision-${projectVersion}.jar - - --executor-cores=${sparkExecutorCoresForJoining} - --executor-memory=${sparkExecutorMemoryForJoining} - --driver-memory=${sparkDriverMemoryForJoining} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 - --conf spark.network.timeout=${sparkNetworkTimeout} - - --inputRelationsPath${workingDir}/relation - --inputEntityPath${inputGraphRootPath}/organization - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Organization - --outputPath${workingDir}/join_partial/organization - - - - - - - - yarn - cluster - Join[relation.target = project.id] - eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1 - dhp-graph-provision-${projectVersion}.jar - - --executor-cores=${sparkExecutorCoresForJoining} - --executor-memory=${sparkExecutorMemoryForJoining} - --driver-memory=${sparkDriverMemoryForJoining} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 - --conf spark.network.timeout=${sparkNetworkTimeout} - - --inputRelationsPath${workingDir}/relation - --inputEntityPath${inputGraphRootPath}/project - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Project - --outputPath${workingDir}/join_partial/project - - - - - - - - - - - - - - - - - - - - yarn - cluster - Join[publication.id = relatedEntity.source] - eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2 - dhp-graph-provision-${projectVersion}.jar - - --executor-cores=${sparkExecutorCoresForJoining} - --executor-memory=${sparkExecutorMemoryForJoining} - --driver-memory=${sparkDriverMemoryForJoining} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=15360 - --conf spark.network.timeout=${sparkNetworkTimeout} - - --inputEntityPath${inputGraphRootPath}/publication - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication - --inputRelatedEntitiesPath${workingDir}/join_partial - --outputPath${workingDir}/join_entities/publication - --numPartitions30000 - - - - - - - - yarn - cluster - Join[dataset.id = relatedEntity.source] - eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2 - dhp-graph-provision-${projectVersion}.jar - - --executor-cores=${sparkExecutorCoresForJoining} - --executor-memory=${sparkExecutorMemoryForJoining} - --driver-memory=${sparkDriverMemoryForJoining} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 - --conf spark.network.timeout=${sparkNetworkTimeout} - - --inputEntityPath${inputGraphRootPath}/dataset - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset - --inputRelatedEntitiesPath${workingDir}/join_partial - --outputPath${workingDir}/join_entities/dataset - --numPartitions20000 - - - - - - - - yarn - cluster - Join[otherresearchproduct.id = relatedEntity.source] - eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2 - dhp-graph-provision-${projectVersion}.jar - - --executor-cores=${sparkExecutorCoresForJoining} - --executor-memory=${sparkExecutorMemoryForJoining} - --driver-memory=${sparkDriverMemoryForJoining} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 - --conf spark.network.timeout=${sparkNetworkTimeout} - - --inputEntityPath${inputGraphRootPath}/otherresearchproduct - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct - --inputRelatedEntitiesPath${workingDir}/join_partial - --outputPath${workingDir}/join_entities/otherresearchproduct - --numPartitions10000 - - - - - - - - yarn - cluster - Join[software.id = relatedEntity.source] - eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2 - dhp-graph-provision-${projectVersion}.jar - - --executor-cores=${sparkExecutorCoresForJoining} - --executor-memory=${sparkExecutorMemoryForJoining} - --driver-memory=${sparkDriverMemoryForJoining} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 - --conf spark.network.timeout=${sparkNetworkTimeout} - - --inputEntityPath${inputGraphRootPath}/software - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software - --inputRelatedEntitiesPath${workingDir}/join_partial - --outputPath${workingDir}/join_entities/software - --numPartitions10000 - - - - - - - - yarn - cluster - Join[datasource.id = relatedEntity.source] - eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2 - dhp-graph-provision-${projectVersion}.jar - - --executor-cores=${sparkExecutorCoresForJoining} - --executor-memory=${sparkExecutorMemoryForJoining} - --driver-memory=${sparkDriverMemoryForJoining} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 - --conf spark.network.timeout=${sparkNetworkTimeout} - - --inputEntityPath${inputGraphRootPath}/datasource - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Datasource - --inputRelatedEntitiesPath${workingDir}/join_partial - --outputPath${workingDir}/join_entities/datasource - --numPartitions1000 - - - - - - - - yarn - cluster - Join[organization.id = relatedEntity.source] - eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2 - dhp-graph-provision-${projectVersion}.jar - - --executor-cores=${sparkExecutorCoresForJoining} - --executor-memory=${sparkExecutorMemoryForJoining} - --driver-memory=${sparkDriverMemoryForJoining} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 - --conf spark.network.timeout=${sparkNetworkTimeout} - - --inputEntityPath${inputGraphRootPath}/organization - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Organization - --inputRelatedEntitiesPath${workingDir}/join_partial - --outputPath${workingDir}/join_entities/organization - --numPartitions20000 - - - - - - - - yarn - cluster - Join[project.id = relatedEntity.source] - eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2 - dhp-graph-provision-${projectVersion}.jar - - --executor-cores=${sparkExecutorCoresForJoining} - --executor-memory=${sparkExecutorMemoryForJoining} - --driver-memory=${sparkDriverMemoryForJoining} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 - --conf spark.network.timeout=${sparkNetworkTimeout} - - --inputEntityPath${inputGraphRootPath}/project - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Project - --inputRelatedEntitiesPath${workingDir}/join_partial - --outputPath${workingDir}/join_entities/project - --numPartitions10000 - - - - - - - - - - yarn - cluster - convert_to_xml - eu.dnetlib.dhp.oa.provision.XmlConverterJob - dhp-graph-provision-${projectVersion}.jar - - --executor-cores=${sparkExecutorCoresForJoining} - --executor-memory=${sparkExecutorMemoryForJoining} - --driver-memory=${sparkDriverMemoryForJoining} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 - --conf spark.network.timeout=${sparkNetworkTimeout} - - --inputPath${workingDir}/join_entities - --outputPath${workingDir}/xml - --isLookupUrl${isLookupUrl} - --otherDsTypeId${otherDsTypeId} - - - - - - - - yarn - cluster - to_solr_index - eu.dnetlib.dhp.oa.provision.XmlIndexingJob - dhp-graph-provision-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemoryForIndexing} - --driver-memory=${sparkDriverMemoryForIndexing} - --conf spark.dynamicAllocation.enabled=true - --conf spark.dynamicAllocation.maxExecutors=${sparkExecutorCoresForIndexing} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.speculation=false - --conf spark.hadoop.mapreduce.map.speculative=false - --conf spark.hadoop.mapreduce.reduce.speculative=false - - --inputPath${workingDir}/xml - --isLookupUrl${isLookupUrl} - --format${format} - --batchSize${batchSize} - + + \ No newline at end of file From 46e76affeb8658e7aae037c9f20b84f803e9de6b Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 24 Jun 2020 19:01:15 +0200 Subject: [PATCH 2/6] WIP: prepare relation job --- .../dhp/oa/provision/PrepareRelationsJob.java | 45 ++++++++++++++++++- .../provision/model/SortableRelationKey.java | 39 ++++++---------- 2 files changed, 57 insertions(+), 27 deletions(-) diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java index d69b75b65..6b34899c8 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java @@ -3,7 +3,9 @@ package eu.dnetlib.dhp.oa.provision; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; +import java.io.Serializable; import java.util.*; +import java.util.function.Supplier; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; @@ -18,7 +20,9 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Splitter; +import com.google.common.collect.ComparisonChain; import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import eu.dnetlib.dhp.application.ArgumentApplicationParser; @@ -59,6 +63,21 @@ public class PrepareRelationsJob { public static final int DEFAULT_NUM_PARTITIONS = 3000; + private static final Map weights = Maps.newHashMap(); + + static { + weights.put("outcome", 0); + weights.put("supplement", 1); + weights.put("affiliation", 2); + weights.put("relationship", 3); + weights.put("publicationDataset", 4); + weights.put("similarity", 5); + + weights.put("provision", 6); + weights.put("participation", 7); + weights.put("dedup", 8); + } + public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils .toString( @@ -132,11 +151,15 @@ public class PrepareRelationsJob { .filter(rel -> !relationFilter.contains(rel.getRelClass())) // group by SOURCE and apply limit .groupBy(r -> SortableRelationKey.create(r, r.getSource())) - .repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions)) + .repartitionAndSortWithinPartitions( + new RelationPartitioner(relPartitions), + (SerializableComparator) (o1, o2) -> compare(o1, o2)) .flatMap(t -> Iterables.limit(t._2(), maxRelations).iterator()) // group by TARGET and apply limit .groupBy(r -> SortableRelationKey.create(r, r.getTarget())) - .repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions)) + .repartitionAndSortWithinPartitions( + new RelationPartitioner(relPartitions), + (SerializableComparator) (o1, o2) -> compare(o1, o2)) .flatMap(t -> Iterables.limit(t._2(), maxRelations).iterator()) .rdd(); @@ -147,6 +170,24 @@ public class PrepareRelationsJob { .parquet(outputPath); } + private static int compare(SortableRelationKey o1, SortableRelationKey o2) { + final Integer w1 = Optional.ofNullable(weights.get(o1.getSubRelType())).orElse(Integer.MAX_VALUE); + final Integer w2 = Optional.ofNullable(weights.get(o2.getSubRelType())).orElse(Integer.MAX_VALUE); + return ComparisonChain + .start() + .compare(w1, w2) + .compare(o1.getSource(), o2.getSource()) + .compare(o1.getTarget(), o2.getTarget()) + .result(); + } + + @FunctionalInterface + public interface SerializableComparator extends Comparator, Serializable { + + @Override + int compare(T o1, T o2); + } + /** * Reads a JavaRDD of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline delimited json text * file, diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java index ad61fa044..ab6518809 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java @@ -5,27 +5,13 @@ import java.io.Serializable; import java.util.Map; import java.util.Optional; +import com.google.common.base.Objects; import com.google.common.collect.ComparisonChain; import com.google.common.collect.Maps; import eu.dnetlib.dhp.schema.oaf.Relation; -public class SortableRelationKey implements Comparable, Serializable { - - private static final Map weights = Maps.newHashMap(); - - static { - weights.put("outcome", 0); - weights.put("supplement", 1); - weights.put("affiliation", 2); - weights.put("relationship", 3); - weights.put("publicationDataset", 4); - weights.put("similarity", 5); - - weights.put("provision", 6); - weights.put("participation", 7); - weights.put("dedup", 8); - } +public class SortableRelationKey implements Serializable { private String groupingKey; @@ -49,15 +35,18 @@ public class SortableRelationKey implements Comparable, Ser } @Override - public int compareTo(SortableRelationKey o) { - final Integer wt = Optional.ofNullable(weights.get(getSubRelType())).orElse(Integer.MAX_VALUE); - final Integer wo = Optional.ofNullable(weights.get(o.getSubRelType())).orElse(Integer.MAX_VALUE); - return ComparisonChain - .start() - .compare(wt, wo) - .compare(getSource(), o.getSource()) - .compare(getTarget(), o.getTarget()) - .result(); + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + SortableRelationKey that = (SortableRelationKey) o; + return Objects.equal(getGroupingKey(), that.getGroupingKey()); + } + + @Override + public int hashCode() { + return Objects.hashCode(getGroupingKey()); } public void setSource(String source) { From 69b0391708edcc5b3d0ef3dcd73b68ae3a0ca51a Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 25 Jun 2020 10:19:56 +0200 Subject: [PATCH 3/6] WIP: prepare relation job --- .../dhp/oa/provision/PrepareRelationsJob.java | 67 ++++++------------- .../provision/model/SortableRelationKey.java | 48 +++++++------ .../provision/utils/RelationPartitioner.java | 12 ++++ 3 files changed, 62 insertions(+), 65 deletions(-) diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java index 6b34899c8..4ae822df7 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java @@ -11,6 +11,8 @@ import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.rdd.RDD; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; @@ -30,6 +32,7 @@ import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.oa.provision.model.SortableRelationKey; import eu.dnetlib.dhp.oa.provision.utils.RelationPartitioner; import eu.dnetlib.dhp.schema.oaf.Relation; +import scala.Tuple2; /** * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. The @@ -63,21 +66,6 @@ public class PrepareRelationsJob { public static final int DEFAULT_NUM_PARTITIONS = 3000; - private static final Map weights = Maps.newHashMap(); - - static { - weights.put("outcome", 0); - weights.put("supplement", 1); - weights.put("affiliation", 2); - weights.put("relationship", 3); - weights.put("publicationDataset", 4); - weights.put("similarity", 5); - - weights.put("provision", 6); - weights.put("participation", 7); - weights.put("dedup", 8); - } - public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils .toString( @@ -146,21 +134,26 @@ public class PrepareRelationsJob { int relPartitions) { RDD cappedRels = readPathRelationRDD(spark, inputRelationsPath) - .repartition(relPartitions) - .filter(rel -> !rel.getDataInfo().getDeletedbyinference()) - .filter(rel -> !relationFilter.contains(rel.getRelClass())) + .filter(rel -> rel.getDataInfo().getDeletedbyinference() == false) + .filter(rel -> relationFilter.contains(rel.getRelClass()) == false) + // group by SOURCE and apply limit - .groupBy(r -> SortableRelationKey.create(r, r.getSource())) - .repartitionAndSortWithinPartitions( - new RelationPartitioner(relPartitions), - (SerializableComparator) (o1, o2) -> compare(o1, o2)) - .flatMap(t -> Iterables.limit(t._2(), maxRelations).iterator()) + .mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, r.getSource()), r)) + .repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions)) + .groupBy(Tuple2::_1) + .map(Tuple2::_2) + .map(t -> Iterables.limit(t, maxRelations)) + .flatMap(Iterable::iterator) + .map(Tuple2::_2) + // group by TARGET and apply limit - .groupBy(r -> SortableRelationKey.create(r, r.getTarget())) - .repartitionAndSortWithinPartitions( - new RelationPartitioner(relPartitions), - (SerializableComparator) (o1, o2) -> compare(o1, o2)) - .flatMap(t -> Iterables.limit(t._2(), maxRelations).iterator()) + .mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, r.getTarget()), r)) + .repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions)) + .groupBy(Tuple2::_1) + .map(Tuple2::_2) + .map(t -> Iterables.limit(t, maxRelations)) + .flatMap(Iterable::iterator) + .map(Tuple2::_2) .rdd(); spark @@ -170,24 +163,6 @@ public class PrepareRelationsJob { .parquet(outputPath); } - private static int compare(SortableRelationKey o1, SortableRelationKey o2) { - final Integer w1 = Optional.ofNullable(weights.get(o1.getSubRelType())).orElse(Integer.MAX_VALUE); - final Integer w2 = Optional.ofNullable(weights.get(o2.getSubRelType())).orElse(Integer.MAX_VALUE); - return ComparisonChain - .start() - .compare(w1, w2) - .compare(o1.getSource(), o2.getSource()) - .compare(o1.getTarget(), o2.getTarget()) - .result(); - } - - @FunctionalInterface - public interface SerializableComparator extends Comparator, Serializable { - - @Override - int compare(T o1, T o2); - } - /** * Reads a JavaRDD of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline delimited json text * file, diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java index ab6518809..e96c4ca5c 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java @@ -11,25 +11,34 @@ import com.google.common.collect.Maps; import eu.dnetlib.dhp.schema.oaf.Relation; -public class SortableRelationKey implements Serializable { +public class SortableRelationKey implements Comparable, Serializable { + + private static final Map weights = Maps.newHashMap(); + + static { + weights.put("outcome", 0); + weights.put("supplement", 1); + weights.put("review", 2); + weights.put("citation", 3); + weights.put("affiliation", 4); + weights.put("relationship", 5); + weights.put("publicationDataset", 6); + weights.put("similarity", 7); + + weights.put("provision", 8); + weights.put("participation", 9); + weights.put("dedup", 10); + } + + private static final long serialVersionUID = 3232323; private String groupingKey; - private String source; - - private String target; - private String subRelType; - public String getSource() { - return source; - } - public static SortableRelationKey create(Relation r, String groupingKey) { SortableRelationKey sr = new SortableRelationKey(); sr.setGroupingKey(groupingKey); - sr.setSource(r.getSource()); - sr.setTarget(r.getTarget()); sr.setSubRelType(r.getSubRelType()); return sr; } @@ -49,16 +58,16 @@ public class SortableRelationKey implements Serializable { return Objects.hashCode(getGroupingKey()); } - public void setSource(String source) { - this.source = source; + @Override + public int compareTo(SortableRelationKey o) { + return ComparisonChain + .start() + .compare(getWeight(this), getWeight(o)) + .result() * -1; } - public String getTarget() { - return target; - } - - public void setTarget(String target) { - this.target = target; + private Integer getWeight(SortableRelationKey o) { + return Optional.ofNullable(weights.get(o.getSubRelType())).orElse(Integer.MAX_VALUE); } public String getSubRelType() { @@ -76,4 +85,5 @@ public class SortableRelationKey implements Serializable { public void setGroupingKey(String groupingKey) { this.groupingKey = groupingKey; } + } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/RelationPartitioner.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/RelationPartitioner.java index bdece36ab..7bd8b9217 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/RelationPartitioner.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/RelationPartitioner.java @@ -12,6 +12,8 @@ import eu.dnetlib.dhp.oa.provision.model.SortableRelationKey; */ public class RelationPartitioner extends Partitioner { + private static final long serialVersionUID = 343434456L; + private final int numPartitions; public RelationPartitioner(int numPartitions) { @@ -29,4 +31,14 @@ public class RelationPartitioner extends Partitioner { return Utils.nonNegativeMod(partitionKey.getGroupingKey().hashCode(), numPartitions()); } + @Override + public boolean equals(Object obj) { + if (obj instanceof RelationPartitioner) { + RelationPartitioner p = (RelationPartitioner) obj; + if (p.numPartitions() == numPartitions()) + return true; + } + return false; + } + } From a6c0faac7064e64b7d122926005de014aea3919e Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Thu, 25 Jun 2020 10:48:15 +0200 Subject: [PATCH 4/6] added test to verify secondary sorting --- .../doiboost/DoiBoostMappingUtil.scala | 22 ++ .../dnetlib/doiboost/mag/MagDataModel.scala | 6 +- .../doiboost/mag/SparkPreProcessMAG.scala | 2 +- .../intersection/oozie_app/workflow.xml | 2 +- .../crossref/CrossrefMappingTest.scala | 24 ++ .../dnetlib/doiboost/crossref/orcid_data.json | 271 ++++++++++++++++++ .../oa/provision/SortableRelationKeyTest.java | 41 +++ .../dnetlib/dhp/oa/provision/relations.json | 90 ++++++ 8 files changed, 453 insertions(+), 5 deletions(-) create mode 100644 dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/crossref/orcid_data.json create mode 100644 dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SortableRelationKeyTest.java create mode 100644 dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/relations.json diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala index 7b21ecda2..1a45defb0 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/DoiBoostMappingUtil.scala @@ -271,6 +271,26 @@ object DoiBoostMappingUtil { } + + def createSP(value: String, classId: String,className:String, schemeId: String, schemeName:String): StructuredProperty = { + val sp = new StructuredProperty + sp.setQualifier(createQualifier(classId,className, schemeId, schemeName)) + sp.setValue(value) + sp + + } + + + + def createSP(value: String, classId: String,className:String, schemeId: String, schemeName:String, dataInfo: DataInfo): StructuredProperty = { + val sp = new StructuredProperty + sp.setQualifier(createQualifier(classId,className, schemeId, schemeName)) + sp.setValue(value) + sp.setDataInfo(dataInfo) + sp + + } + def createSP(value: String, classId: String, schemeId: String): StructuredProperty = { val sp = new StructuredProperty sp.setQualifier(createQualifier(classId, schemeId)) @@ -279,6 +299,8 @@ object DoiBoostMappingUtil { } + + def createSP(value: String, classId: String, schemeId: String, dataInfo: DataInfo): StructuredProperty = { val sp = new StructuredProperty sp.setQualifier(createQualifier(classId, schemeId)) diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala index 2419f86a3..7bb4686cf 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/MagDataModel.scala @@ -129,16 +129,16 @@ case object ConversionUtil { val fieldOfStudy = item._2 if (fieldOfStudy != null && fieldOfStudy.subjects != null && fieldOfStudy.subjects.nonEmpty) { val p: List[StructuredProperty] = fieldOfStudy.subjects.flatMap(s => { - val s1 = createSP(s.DisplayName, "keyword", "dnet:subject_classification_typologies") + val s1 = createSP(s.DisplayName, "MAG","Microsoft Academic Graph classification", "dnet:subject_classification_typologies", "dnet:subject_classification_typologies") val di = DoiBoostMappingUtil.generateDataInfo(s.Score.toString) var resList: List[StructuredProperty] = List(s1) if (s.MainType.isDefined) { val maintp = s.MainType.get - val s2 = createSP(s.MainType.get, "keyword", "dnet:subject_classification_typologies") + val s2 = createSP(s.MainType.get, "MAG","Microsoft Academic Graph classification", "dnet:subject_classification_typologies", "dnet:subject_classification_typologies") s2.setDataInfo(di) resList = resList ::: List(s2) if (maintp.contains(".")) { - val s3 = createSP(maintp.split("\\.").head, "keyword", "dnet:subject_classification_typologies") + val s3 = createSP(maintp.split("\\.").head, "MAG","Microsoft Academic Graph classification", "dnet:subject_classification_typologies", "dnet:subject_classification_typologies") s3.setDataInfo(di) resList = resList ::: List(s3) } diff --git a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala index f3d051bd6..a24f0e6bb 100644 --- a/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala +++ b/dhp-workflows/dhp-doiboost/src/main/java/eu/dnetlib/doiboost/mag/SparkPreProcessMAG.scala @@ -43,7 +43,7 @@ object SparkPreProcessMAG { val distinctPaper: Dataset[MagPapers] = spark.createDataset(result) distinctPaper.write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/Papers_distinct") - logger.info("Phase 6) Enrich Publication with description") + logger.info("Phase 0) Enrich Publication with description") val pa = spark.read.load(s"${parser.get("sourcePath")}/PaperAbstractsInvertedIndex").as[MagPaperAbstract] pa.map(ConversionUtil.transformPaperAbstract).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/PaperAbstract") diff --git a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml index 34ba5d89d..bf91958cf 100644 --- a/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-doiboost/src/main/resources/eu/dnetlib/dhp/doiboost/intersection/oozie_app/workflow.xml @@ -32,7 +32,7 @@ - + diff --git a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala index d31f80248..f62ac2b67 100644 --- a/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala +++ b/dhp-workflows/dhp-doiboost/src/test/java/eu/dnetlib/doiboost/crossref/CrossrefMappingTest.scala @@ -18,6 +18,9 @@ class CrossrefMappingTest { val mapper = new ObjectMapper() + + + @Test def testFunderRelationshipsMapping(): Unit = { val template = Source.fromInputStream(getClass.getResourceAsStream("article_funder_template.json")).mkString @@ -58,6 +61,27 @@ class CrossrefMappingTest { } + @Test + def testOrcidID() :Unit = { + val json = Source.fromInputStream(getClass.getResourceAsStream("orcid_data.json")).mkString + + + assertNotNull(json) + assertFalse(json.isEmpty); + + val resultList: List[Oaf] = Crossref2Oaf.convert(json) + + assertTrue(resultList.nonEmpty) + + val items = resultList.filter(p => p.isInstanceOf[Result]) + + + mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT) + items.foreach(p => println(mapper.writeValueAsString(p))) + + + } + @Test def testEmptyTitle() :Unit = { val json = Source.fromInputStream(getClass.getResourceAsStream("empty_title.json")).mkString diff --git a/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/crossref/orcid_data.json b/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/crossref/orcid_data.json new file mode 100644 index 000000000..def546ddb --- /dev/null +++ b/dhp-workflows/dhp-doiboost/src/test/resources/eu/dnetlib/doiboost/crossref/orcid_data.json @@ -0,0 +1,271 @@ +{ + "DOI":"10.1016/j.carbpol.2020.115930", + "issued":{ + "date-parts":[ + [ + 2020, + 4 + ] + ] + }, + "published-print":{ + "date-parts":[ + [ + 2020, + 4 + ] + ] + }, + "prefix":"10.1016", + "subject":[ + "Organic Chemistry", + "Materials Chemistry", + "Polymers and Plastics" + ], + "author":[ + { + "affiliation":[ + + ], + "given":"Lei", + "family":"Fang", + "sequence":"first" + }, + { + "affiliation":[ + + ], + "given":"Hua", + "family":"Lin", + "sequence":"additional" + }, + { + "affiliation":[ + + ], + "given":"Zhenfeng", + "family":"Wu", + "sequence":"additional" + }, + { + "affiliation":[ + + ], + "given":"Zhen", + "family":"Wang", + "sequence":"additional" + }, + { + "affiliation":[ + + ], + "given":"Xinxin", + "family":"Fan", + "sequence":"additional" + }, + { + "affiliation":[ + + ], + "given":"Ziting", + "family":"Cheng", + "sequence":"additional" + }, + { + "affiliation":[ + + ], + "given":"Xiaoya", + "family":"Hou", + "sequence":"additional" + }, + { + "authenticated-orcid":false, + "given":"Daquan", + "family":"Chen", + "sequence":"additional", + "affiliation":[ + + ], + "ORCID":"http://orcid.org/0000-0002-6796-0204" + } + ], + "reference-count":41, + "ISSN":[ + "0144-8617" + ], + "assertion":[ + { + "name":"publisher", + "value":"Elsevier", + "label":"This article is maintained by" + }, + { + "name":"articletitle", + "value":"In vitro/vivo evaluation of novel mitochondrial targeting charge-reversal polysaccharide-based antitumor nanoparticle", + "label":"Article Title" + }, + { + "name":"journaltitle", + "value":"Carbohydrate Polymers", + "label":"Journal Title" + }, + { + "name":"articlelink", + "value":"https://doi.org/10.1016/j.carbpol.2020.115930", + "label":"CrossRef DOI link to publisher maintained version" + }, + { + "name":"content_type", + "value":"article", + "label":"Content Type" + }, + { + "name":"copyright", + "value":"\\u00a9 2020 Elsevier Ltd. All rights reserved.", + "label":"Copyright" + } + ], + "member":"78", + "source":"Crossref", + "score":1.0, + "deposited":{ + "timestamp":1584590965000, + "date-time":"2020-03-19T04:09:25Z", + "date-parts":[ + [ + 2020, + 3, + 19 + ] + ] + }, + "indexed":{ + "timestamp":1584592912467, + "date-time":"2020-03-19T04:41:52Z", + "date-parts":[ + [ + 2020, + 3, + 19 + ] + ] + }, + "type":"journal-article", + "URL":"http://dx.doi.org/10.1016/j.carbpol.2020.115930", + "is-referenced-by-count":0, + "volume":"234", + "issn-type":[ + { + "type":"print", + "value":"0144-8617" + } + ], + "link":[ + { + "URL":"https://api.elsevier.com/content/article/PII:S0144861720301041?httpAccept=text/xml", + "intended-application":"text-mining", + "content-version":"vor", + "content-type":"text/xml" + }, + { + "URL":"https://api.elsevier.com/content/article/PII:S0144861720301041?httpAccept=text/plain", + "intended-application":"text-mining", + "content-version":"vor", + "content-type":"text/plain" + } + ], + "update-policy":"http://dx.doi.org/10.1016/elsevier_cm_policy", + "references-count":41, + "short-container-title":[ + "Carbohydrate Polymers" + ], + "publisher":"Elsevier BV", + "content-domain":{ + "domain":[ + "elsevier.com", + "sciencedirect.com" + ], + "crossmark-restriction":true + }, + "language":"en", + "license":[ + { + "URL":"https://www.elsevier.com/tdm/userlicense/1.0/", + "start":{ + "timestamp":1585699200000, + "date-time":"2020-04-01T00:00:00Z", + "date-parts":[ + [ + 2020, + 4, + 1 + ] + ] + }, + "content-version":"tdm", + "delay-in-days":0 + } + ], + "created":{ + "timestamp":1581759678000, + "date-time":"2020-02-15T09:41:18Z", + "date-parts":[ + [ + 2020, + 2, + 15 + ] + ] + }, + "title":[ + "In vitro/vivo evaluation of novel mitochondrial targeting charge-reversal polysaccharide-based antitumor nanoparticle" + ], + "alternative-id":[ + "S0144861720301041" + ], + "container-title":[ + "Carbohydrate Polymers" + ], + "funder":[ + { + "doi-asserted-by":"publisher", + "DOI":"10.13039/501100007129", + "name":"Natural Science Foundation of Shandong Province", + "award":[ + "ZR2019ZD24", + "ZR2019YQ30" + ] + }, + { + "doi-asserted-by":"publisher", + "DOI":"10.13039/100010449", + "name":"Ministry of Education, Libya", + "award":[ + + ] + }, + { + "doi-asserted-by":"publisher", + "DOI":"10.13039/501100012249", + "name":"Jiangxi University of Traditional Chinese Medicine", + "award":[ + "TCM-0906" + ] + }, + { + "name":"Taishan Scholar Program", + "award":[ + "qnts20161035" + ] + }, + { + "name":"Open fund project of Key Laboratory of Modern Preparation of TCM", + "award":[ + + ] + } + ], + "page":"115930", + "article-number":"115930" +} \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SortableRelationKeyTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SortableRelationKeyTest.java new file mode 100644 index 000000000..200800bd8 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SortableRelationKeyTest.java @@ -0,0 +1,41 @@ +package eu.dnetlib.dhp.oa.provision; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.oa.provision.model.SortableRelationKey; +import eu.dnetlib.dhp.schema.oaf.Relation; +import org.apache.commons.io.IOUtils; +import org.junit.jupiter.api.Test; +import java.io.IOException; +import java.util.List; + +public class SortableRelationKeyTest { + + @Test + public void doTesSorting() throws IOException { + final ObjectMapper mapper = new ObjectMapper(); + final String json = IOUtils.toString(this.getClass().getResourceAsStream("relations.json")); + final List relations = mapper.readValue(json, new TypeReference>() { }); + + + relations.stream().map(r -> SortableRelationKey.create(r, r.getSource())).sorted() + .forEach( + + it -> { + try { + System.out.println(mapper.writeValueAsString(it)); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } + }); + + + + + + + } + + +} diff --git a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/relations.json b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/relations.json new file mode 100644 index 000000000..3280d0d61 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/relations.json @@ -0,0 +1,90 @@ +[ + { + "collectedfrom": [], + "dataInfo": { + "deletedbyinference": false, + "inferred": false, + "invisible": false, + "provenanceaction": { + "classid": "sysimport:crosswalk:entityregistry", + "classname": "Harvested", + "schemeid": "dnet:provenanceActions", + "schemename": "dnet:provenanceActions" + }, + "trust": "0.9" + }, + "lastupdatetimestamp": 1592688952862, + "properties": [], + "relClass": "hasAuthorInstitution", + "relType": "resultOrganization", + "source": "1", + "subRelType": "affiliation", + "target": "2" + }, + { + "collectedfrom": [], + "dataInfo": { + "deletedbyinference": false, + "inferred": false, + "invisible": false, + "provenanceaction": { + "classid": "sysimport:crosswalk:entityregistry", + "classname": "Harvested", + "schemeid": "dnet:provenanceActions", + "schemename": "dnet:provenanceActions" + }, + "trust": "0.9" + }, + "lastupdatetimestamp": 1592688952862, + "properties": [], + "relClass": "isAuthorInstitutionOf", + "relType": "resultOrganization", + "source": "2", + "subRelType": "affiliation", + "target": "1" + }, + { + "collectedfrom": [], + "dataInfo": { + "deletedbyinference": false, + "inferred": false, + "invisible": false, + "provenanceaction": { + "classid": "sysimport:crosswalk:entityregistry", + "classname": "Harvested", + "schemeid": "dnet:provenanceActions", + "schemename": "dnet:provenanceActions" + }, + "trust": "0.9" + }, + "lastupdatetimestamp": 1592688952862, + "properties": [], + "relClass": "isProducedBy", + "relType": "resultProject", + "source": "1", + "subRelType": "outcome", + "target": "2" + }, + { + "collectedfrom": [], + "dataInfo": { + "deletedbyinference": false, + "inferred": false, + "invisible": false, + "provenanceaction": { + "classid": "sysimport:crosswalk:entityregistry", + "classname": "Harvested", + "schemeid": "dnet:provenanceActions", + "schemename": "dnet:provenanceActions" + }, + "trust": "0.9" + }, + "lastupdatetimestamp": 1592688952862, + "properties": [], + "relClass": "produces", + "relType": "resultProject", + "source": "2", + "subRelType": "outcome", + "target": "1" + } +] \ No newline at end of file From 6933ec11fbf5fecdacbad4f3782c2316a322ea69 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 25 Jun 2020 11:04:12 +0200 Subject: [PATCH 5/6] WIP: prepare relation job --- .../dhp/oa/provision/PrepareRelationsJob.java | 8 ++- .../provision/model/SortableRelationKey.java | 3 +- .../oa/provision/SortableRelationKeyTest.java | 53 ++++++++++--------- 3 files changed, 35 insertions(+), 29 deletions(-) diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java index 4ae822df7..cf311c690 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java @@ -7,6 +7,8 @@ import java.io.Serializable; import java.util.*; import java.util.function.Supplier; +import javax.annotation.Nullable; + import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; @@ -21,6 +23,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Predicate; import com.google.common.base.Splitter; import com.google.common.collect.ComparisonChain; import com.google.common.collect.Iterables; @@ -142,7 +145,7 @@ public class PrepareRelationsJob { .repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions)) .groupBy(Tuple2::_1) .map(Tuple2::_2) - .map(t -> Iterables.limit(t, maxRelations)) + .map(t -> Iterables.filter(t, input -> input._1().getSubRelType().equals("outcome"))) .flatMap(Iterable::iterator) .map(Tuple2::_2) @@ -151,7 +154,8 @@ public class PrepareRelationsJob { .repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions)) .groupBy(Tuple2::_1) .map(Tuple2::_2) - .map(t -> Iterables.limit(t, maxRelations)) + .map(t -> Iterables.filter(t, input -> input._1().getSubRelType().equals("outcome"))) + // .map(t -> Iterables.limit(t, maxRelations)) .flatMap(Iterable::iterator) .map(Tuple2::_2) .rdd(); diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java index e96c4ca5c..09a1a9d33 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java @@ -62,8 +62,9 @@ public class SortableRelationKey implements Comparable, Ser public int compareTo(SortableRelationKey o) { return ComparisonChain .start() + .compare(getGroupingKey(), o.getGroupingKey()) .compare(getWeight(this), getWeight(o)) - .result() * -1; + .result(); } private Integer getWeight(SortableRelationKey o) { diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SortableRelationKeyTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SortableRelationKeyTest.java index 200800bd8..72f28fdf2 100644 --- a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SortableRelationKeyTest.java +++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/SortableRelationKeyTest.java @@ -1,41 +1,42 @@ + package eu.dnetlib.dhp.oa.provision; +import java.io.IOException; +import java.util.List; + +import org.apache.commons.io.IOUtils; +import org.junit.jupiter.api.Test; + import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; + import eu.dnetlib.dhp.oa.provision.model.SortableRelationKey; import eu.dnetlib.dhp.schema.oaf.Relation; -import org.apache.commons.io.IOUtils; -import org.junit.jupiter.api.Test; -import java.io.IOException; -import java.util.List; public class SortableRelationKeyTest { - @Test - public void doTesSorting() throws IOException { - final ObjectMapper mapper = new ObjectMapper(); - final String json = IOUtils.toString(this.getClass().getResourceAsStream("relations.json")); - final List relations = mapper.readValue(json, new TypeReference>() { }); + @Test + public void doTesSorting() throws IOException { + final ObjectMapper mapper = new ObjectMapper(); + final String json = IOUtils.toString(this.getClass().getResourceAsStream("relations.json")); + final List relations = mapper.readValue(json, new TypeReference>() { + }); + relations + .stream() + .map(r -> SortableRelationKey.create(r, r.getSource())) + .sorted() + .forEach( - relations.stream().map(r -> SortableRelationKey.create(r, r.getSource())).sorted() - .forEach( - - it -> { - try { - System.out.println(mapper.writeValueAsString(it)); - } catch (JsonProcessingException e) { - e.printStackTrace(); - } - }); - - - - - - - } + it -> { + try { + System.out.println(mapper.writeValueAsString(it)); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } + }); + } } From e62333192c80e9a2307239244fe31a01bea6d77b Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Thu, 25 Jun 2020 12:22:18 +0200 Subject: [PATCH 6/6] WIP: prepare relation job --- .../dhp/oa/provision/PrepareRelationsJob.java | 58 +++++++++---------- .../provision/model/SortableRelationKey.java | 2 +- 2 files changed, 27 insertions(+), 33 deletions(-) diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java index cf311c690..cb1a3b327 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java @@ -1,42 +1,33 @@ package eu.dnetlib.dhp.oa.provision; -import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; - -import java.io.Serializable; -import java.util.*; -import java.util.function.Supplier; - -import javax.annotation.Nullable; - +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Splitter; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.oa.provision.model.SortableRelationKey; +import eu.dnetlib.dhp.oa.provision.utils.RelationPartitioner; +import eu.dnetlib.dhp.schema.oaf.Relation; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.rdd.RDD; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Predicate; -import com.google.common.base.Splitter; -import com.google.common.collect.ComparisonChain; -import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.oa.provision.model.SortableRelationKey; -import eu.dnetlib.dhp.oa.provision.utils.RelationPartitioner; -import eu.dnetlib.dhp.schema.oaf.Relation; import scala.Tuple2; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; + +import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; + /** * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. The * operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization, and @@ -136,32 +127,35 @@ public class PrepareRelationsJob { SparkSession spark, String inputRelationsPath, String outputPath, Set relationFilter, int maxRelations, int relPartitions) { - RDD cappedRels = readPathRelationRDD(spark, inputRelationsPath) + // group by SOURCE and apply limit + RDD bySource = readPathRelationRDD(spark, inputRelationsPath) .filter(rel -> rel.getDataInfo().getDeletedbyinference() == false) .filter(rel -> relationFilter.contains(rel.getRelClass()) == false) - - // group by SOURCE and apply limit .mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, r.getSource()), r)) .repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions)) .groupBy(Tuple2::_1) .map(Tuple2::_2) - .map(t -> Iterables.filter(t, input -> input._1().getSubRelType().equals("outcome"))) + .map(t -> Iterables.limit(t, maxRelations)) .flatMap(Iterable::iterator) .map(Tuple2::_2) + .rdd(); - // group by TARGET and apply limit + // group by TARGET and apply limit + RDD byTarget = readPathRelationRDD(spark, inputRelationsPath) + .filter(rel -> rel.getDataInfo().getDeletedbyinference() == false) + .filter(rel -> relationFilter.contains(rel.getRelClass()) == false) .mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, r.getTarget()), r)) .repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions)) .groupBy(Tuple2::_1) .map(Tuple2::_2) - .map(t -> Iterables.filter(t, input -> input._1().getSubRelType().equals("outcome"))) - // .map(t -> Iterables.limit(t, maxRelations)) + .map(t -> Iterables.limit(t, maxRelations)) .flatMap(Iterable::iterator) .map(Tuple2::_2) .rdd(); spark - .createDataset(cappedRels, Encoders.bean(Relation.class)) + .createDataset(bySource.union(byTarget), Encoders.bean(Relation.class)) + .repartition(relPartitions) .write() .mode(SaveMode.Overwrite) .parquet(outputPath); diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java index 09a1a9d33..bf7f9330d 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java @@ -50,7 +50,7 @@ public class SortableRelationKey implements Comparable, Ser if (o == null || getClass() != o.getClass()) return false; SortableRelationKey that = (SortableRelationKey) o; - return Objects.equal(getGroupingKey(), that.getGroupingKey()); + return getGroupingKey().equals(that.getGroupingKey()); } @Override