From e62333192c80e9a2307239244fe31a01bea6d77b Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Thu, 25 Jun 2020 12:22:18 +0200
Subject: [PATCH] WIP: prepare relation job

---
 .../dhp/oa/provision/PrepareRelationsJob.java | 58 +++++++++----------
 .../provision/model/SortableRelationKey.java  |  2 +-
 2 files changed, 27 insertions(+), 33 deletions(-)

diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java
index cf311c690..cb1a3b327 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java
@@ -1,42 +1,33 @@
 
 package eu.dnetlib.dhp.oa.provision;
 
-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
-
-import java.io.Serializable;
-import java.util.*;
-import java.util.function.Supplier;
-
-import javax.annotation.Nullable;
-
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.oa.provision.model.SortableRelationKey;
+import eu.dnetlib.dhp.oa.provision.utils.RelationPartitioner;
+import eu.dnetlib.dhp.schema.oaf.Relation;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.rdd.RDD;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Predicate;
-import com.google.common.base.Splitter;
-import com.google.common.collect.ComparisonChain;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.common.HdfsSupport;
-import eu.dnetlib.dhp.oa.provision.model.SortableRelationKey;
-import eu.dnetlib.dhp.oa.provision.utils.RelationPartitioner;
-import eu.dnetlib.dhp.schema.oaf.Relation;
 import scala.Tuple2;
 
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+
+import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
+
 /**
  * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. The
  * operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization, and
@@ -136,32 +127,35 @@ public class PrepareRelationsJob {
 		SparkSession spark, String inputRelationsPath, String outputPath, Set<String> relationFilter, int maxRelations,
 		int relPartitions) {
 
-		RDD<Relation> cappedRels = readPathRelationRDD(spark, inputRelationsPath)
+		// group by SOURCE and apply limit
+		RDD<Relation> bySource = readPathRelationRDD(spark, inputRelationsPath)
 			.filter(rel -> rel.getDataInfo().getDeletedbyinference() == false)
 			.filter(rel -> relationFilter.contains(rel.getRelClass()) == false)
-
-			// group by SOURCE and apply limit
 			.mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, r.getSource()), r))
 			.repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions))
 			.groupBy(Tuple2::_1)
 			.map(Tuple2::_2)
-			.map(t -> Iterables.filter(t, input -> input._1().getSubRelType().equals("outcome")))
+			.map(t -> Iterables.limit(t, maxRelations))
 			.flatMap(Iterable::iterator)
 			.map(Tuple2::_2)
+			.rdd();
 
-			// group by TARGET and apply limit
+		// group by TARGET and apply limit
+		RDD<Relation> byTarget = readPathRelationRDD(spark, inputRelationsPath)
+			.filter(rel -> rel.getDataInfo().getDeletedbyinference() == false)
+			.filter(rel -> relationFilter.contains(rel.getRelClass()) == false)
 			.mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, r.getTarget()), r))
 			.repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions))
 			.groupBy(Tuple2::_1)
 			.map(Tuple2::_2)
-			.map(t -> Iterables.filter(t, input -> input._1().getSubRelType().equals("outcome")))
-			// .map(t -> Iterables.limit(t, maxRelations))
+			.map(t -> Iterables.limit(t, maxRelations))
 			.flatMap(Iterable::iterator)
 			.map(Tuple2::_2)
 			.rdd();
 
 		spark
-			.createDataset(cappedRels, Encoders.bean(Relation.class))
+			.createDataset(bySource.union(byTarget), Encoders.bean(Relation.class))
+			.repartition(relPartitions)
 			.write()
 			.mode(SaveMode.Overwrite)
 			.parquet(outputPath);
diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java
index 09a1a9d33..bf7f9330d 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java
@@ -50,7 +50,7 @@ public class SortableRelationKey implements Comparable<SortableRelationKey>, Ser
 		if (o == null || getClass() != o.getClass())
 			return false;
 		SortableRelationKey that = (SortableRelationKey) o;
-		return Objects.equal(getGroupingKey(), that.getGroupingKey());
+		return getGroupingKey().equals(that.getGroupingKey());
 	}
 
 	@Override
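Note on the per-group cap introduced above: the patch replaces the per-group filter on subRelType == "outcome" with a cap of at most maxRelations relations per grouping key, computed once keyed by source and once keyed by target, then unioned before writing. The following is a minimal, self-contained sketch of that group-and-cap pattern, assuming only Spark and Guava on the classpath; the class and variable names (GroupAndCapExample, rels) are illustrative and not part of the dnet-hadoop codebase. The real job also sorts within partitions through RelationPartitioner and SortableRelationKey so the retained relations per group follow a defined priority; the sketch leaves that step out.

import com.google.common.collect.Iterables;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

import java.util.Arrays;

public class GroupAndCapExample {

	public static void main(String[] args) {
		SparkSession spark = SparkSession
			.builder()
			.appName("group-and-cap")
			.master("local[*]")
			.getOrCreate();
		JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

		// cap per grouping key, analogous to the job's maxRelations parameter
		int maxRelations = 2;

		// (source, target) pairs standing in for relations
		JavaRDD<Tuple2<String, String>> rels = sc
			.parallelize(
				Arrays
					.asList(
						new Tuple2<>("A", "x"), new Tuple2<>("A", "y"), new Tuple2<>("A", "z"),
						new Tuple2<>("B", "x")));

		// group by the source id and keep at most maxRelations relations per group
		JavaRDD<Tuple2<String, String>> capped = rels
			.groupBy(Tuple2::_1)
			.map(Tuple2::_2)
			.map(group -> Iterables.limit(group, maxRelations))
			.flatMap(Iterable::iterator);

		capped.collect().forEach(t -> System.out.println(t._1() + " -> " + t._2()));

		spark.stop();
	}
}

Running the sketch locally keeps only two of the three "A" relations while leaving "B" untouched, which is the behaviour the patch applies symmetrically on the source and target side before the union.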
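One further detail on the SortableRelationKey hunk: the rewritten equals() calls equals() directly on the grouping key, so it assumes getGroupingKey() never returns null, whereas the removed Guava Objects.equal was null-safe. The fragment below is a hypothetical, simplified stand-in (not the project's class) showing how a null-safe equals() and a matching hashCode() can both be derived from the grouping key with java.util.Objects:

import java.io.Serializable;
import java.util.Objects;

// Hypothetical, simplified stand-in for a key object whose identity is its grouping key.
public class GroupingKeyExample implements Serializable {

	private final String groupingKey;

	public GroupingKeyExample(String groupingKey) {
		this.groupingKey = groupingKey;
	}

	public String getGroupingKey() {
		return groupingKey;
	}

	@Override
	public boolean equals(Object o) {
		if (this == o)
			return true;
		if (o == null || getClass() != o.getClass())
			return false;
		GroupingKeyExample that = (GroupingKeyExample) o;
		// null-safe comparison on the grouping key only
		return Objects.equals(getGroupingKey(), that.getGroupingKey());
	}

	@Override
	public int hashCode() {
		// derived from the same field used by equals, so grouping stays consistent
		return Objects.hash(getGroupingKey());
	}
}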