From 8047d16dd9f56787ad469746839f86d70facdadc Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Wed, 27 May 2020 12:38:12 +0200
Subject: [PATCH] added RDD based adjacency list creation procedure

---
 .../oa/provision/AdjacencyListBuilderJob.java | 40 ++++++++++++++++++-
 1 file changed, 39 insertions(+), 1 deletion(-)

diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java
index 99247b756..9f221ae45 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java
@@ -9,14 +9,20 @@ import java.util.Optional;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.MapGroupsFunction;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.rdd.RDD;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
+
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.common.HdfsSupport;
 import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity;
@@ -83,7 +89,7 @@ public class AdjacencyListBuilderJob {
 			isSparkSessionManaged,
 			spark -> {
 				removeOutputDir(spark, outputPath);
-				createAdjacencyLists(spark, inputPath, outputPath);
+				createAdjacencyListsRDD(spark, inputPath, outputPath);
 			});
 	}
 
@@ -118,6 +124,38 @@ public class AdjacencyListBuilderJob {
 			.parquet(outputPath);
 	}
 
+	private static void createAdjacencyListsRDD(
+		SparkSession spark, String inputPath, String outputPath) {
+
+		log.info("Reading joined entities from: {}", inputPath);
+		RDD<JoinedEntity> joinedEntities = spark
+			.read()
+			.load(inputPath)
+			.as(Encoders.bean(EntityRelEntity.class))
+			.javaRDD()
+			.mapToPair(re -> {
+				JoinedEntity je = new JoinedEntity();
+				je.setEntity(re.getEntity());
+				je.setLinks(Lists.newArrayList());
+				if (re.getRelation() != null && re.getTarget() != null) {
+					je.getLinks().add(new Tuple2(re.getRelation(), re.getTarget()));
+				}
+				return new scala.Tuple2<>(re.getEntity().getId(), je);
+			})
+			.reduceByKey((je1, je2) -> {
+				je1.getLinks().addAll(je2.getLinks());
+				return je1;
+			})
+			.map(t -> t._2())
+			.rdd();
+
+		spark
+			.createDataset(joinedEntities, Encoders.bean(JoinedEntity.class))
+			.write()
+			.mode(SaveMode.Overwrite)
+			.parquet(outputPath);
+	}
+
 	private static void removeOutputDir(SparkSession spark, String path) {
 		HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
 	}
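
Note (not part of the patch): createAdjacencyListsRDD builds the adjacency lists with the pair-and-reduce pattern of the RDD API. Each joined EntityRelEntity row is mapped to a (source entity id, JoinedEntity carrying at most one link) pair, reduceByKey merges the link lists of pairs sharing the same id, and the result is written back as a Dataset of JoinedEntity; the pre-existing createAdjacencyLists method stays in place. The sketch below reproduces the same pattern on simplified placeholder types so the grouping step can be tried in isolation: AdjacencySketch, Row, the plain String ids and the local[*] master are illustrative assumptions, not types or settings from this codebase.

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;

public class AdjacencySketch {

	// Placeholder for one joined row: a source entity plus at most one outgoing link.
	public static class Row implements Serializable {
		String sourceId;
		String targetId; // null when the row carries no link

		Row(String sourceId, String targetId) {
			this.sourceId = sourceId;
			this.targetId = targetId;
		}
	}

	public static void main(String[] args) {
		SparkSession spark = SparkSession
			.builder()
			.master("local[*]")
			.appName("adjacency-sketch")
			.getOrCreate();
		JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

		JavaRDD<Row> rows = sc.parallelize(Arrays.asList(
			new Row("A", "B"),
			new Row("A", "C"),
			new Row("B", null)));

		// 1) each row becomes a (sourceId, singleton link list) pair
		// 2) reduceByKey merges the lists, yielding one adjacency list per source id
		JavaPairRDD<String, List<String>> adjacency = rows
			.mapToPair(r -> {
				List<String> links = new ArrayList<>();
				if (r.targetId != null) {
					links.add(r.targetId);
				}
				return new Tuple2<String, List<String>>(r.sourceId, links);
			})
			.reduceByKey((l1, l2) -> {
				l1.addAll(l2);
				return l1;
			});

		// prints one line per source id, e.g. "A -> [B, C]" and "B -> []"
		adjacency.collect().forEach(t -> System.out.println(t._1() + " -> " + t._2()));

		spark.stop();
	}
}

The usual motivation for this shape is that reduceByKey combines values on the map side before the shuffle, rather than moving every (id, row) pair to a single task the way collecting whole groups does.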