From 822971f54f80cf40421ef0a00898738549492e76 Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Thu, 12 Nov 2020 09:22:59 +0100
Subject: [PATCH] no need to filter relations in
 CreateRelatedEntitiesJob_phase1; replaced 'left outer' join with 'left' join
 in CreateRelatedEntitiesJob_phase2; cleanup;

---
 .../oa/provision/AdjacencyListBuilderJob.java | 91 -------------------
 .../CreateRelatedEntitiesJob_phase1.java      |  1 -
 .../CreateRelatedEntitiesJob_phase2.java      |  4 +-
 .../dhp/oa/provision/PrepareRelationsJob.java | 13 ++-
 4 files changed, 9 insertions(+), 100 deletions(-)
 delete mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java

diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java
deleted file mode 100644
index 0bc270e8f..000000000
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java
+++ /dev/null
@@ -1,91 +0,0 @@
-
-package eu.dnetlib.dhp.oa.provision;
-
-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.api.java.function.MapGroupsFunction;
-import org.apache.spark.sql.*;
-import org.apache.spark.sql.expressions.Aggregator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.common.HdfsSupport;
-import eu.dnetlib.dhp.oa.provision.model.*;
-import scala.Tuple2;
-import scala.collection.JavaConverters;
-import scala.collection.Seq;
-
-/**
- * AdjacencyListBuilderJob: given the tuple (S - R - T) we need to group by S.id -> List [ R - T ], mapping the
- * result as JoinedEntity
- */
-public class AdjacencyListBuilderJob {
-
-	private static final Logger log = LoggerFactory.getLogger(AdjacencyListBuilderJob.class);
-
-	public static final int MAX_LINKS = 100;
-
-	public static void main(String[] args) throws Exception {
-
-		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
-			IOUtils
-				.toString(
-					AdjacencyListBuilderJob.class
-						.getResourceAsStream(
-							"/eu/dnetlib/dhp/oa/provision/input_params_build_adjacency_lists.json")));
-		parser.parseArgument(args);
-
-		Boolean isSparkSessionManaged = Optional
-			.ofNullable(parser.get("isSparkSessionManaged"))
-			.map(Boolean::valueOf)
-			.orElse(Boolean.TRUE);
-		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
-
-		String inputPath = parser.get("inputPath");
-		log.info("inputPath: {}", inputPath);
-
-		String outputPath = parser.get("outputPath");
-		log.info("outputPath: {}", outputPath);
-
-		SparkConf conf = new SparkConf();
-		conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-		conf.registerKryoClasses(ProvisionModelSupport.getModelClasses());
-
-		runWithSparkSession(
-			conf,
-			isSparkSessionManaged,
-			spark -> {
-				removeOutputDir(spark, outputPath);
-				createAdjacencyListsKryo(spark, inputPath, outputPath);
-			});
-	}
-
-	private static void createAdjacencyListsKryo(
-		SparkSession spark, String inputPath, String outputPath) {
-
-		log.info("Reading joined entities from: {}", inputPath);
-
-		final List<String> paths = HdfsSupport
-			.listFiles(inputPath, spark.sparkContext().hadoopConfiguration());
-
-		log.info("Found paths: {}", String.join(",", paths));
-
-	}
-
-	private static Seq<String> toSeq(List<String> list) {
-		return JavaConverters.asScalaIteratorConverter(list.iterator()).asScala().toSeq();
-	}
-
-	private static void removeOutputDir(SparkSession spark, String path) {
-		HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
-	}
-}

diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java
index 86d380409..dd251ec04 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java
@@ -92,7 +92,6 @@ public class CreateRelatedEntitiesJob_phase1 {
 		String outputPath) {
 
 		Dataset<Tuple2<String, Relation>> relsByTarget = readPathRelation(spark, inputRelationsPath)
-			.filter("dataInfo.deletedbyinference == false")
 			.map(
 				(MapFunction<Relation, Tuple2<String, Relation>>) r -> new Tuple2<>(r.getTarget(), r),
 				Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class)))

diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java
index 170835fdb..9cdf1cd2e 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java
@@ -105,7 +105,7 @@ public class CreateRelatedEntitiesJob_phase2 {
 			TypedColumn<JoinedEntity, JoinedEntity> aggregator = new AdjacencyListAggregator().toColumn();
 
 			entities
-				.joinWith(relatedEntities, entities.col("_1").equalTo(relatedEntities.col("_1")), "left_outer")
+				.joinWith(relatedEntities, entities.col("_1").equalTo(relatedEntities.col("_1")), "left")
 				.map((MapFunction<Tuple2<Tuple2<String, E>, Tuple2<String, RelatedEntityWrapper>>, JoinedEntity>) value -> {
 					JoinedEntity je = new JoinedEntity(value._1()._2());
 					Optional
@@ -114,15 +114,13 @@ public class CreateRelatedEntitiesJob_phase2 {
 						.ifPresent(r -> je.getLinks().add(r));
 					return je;
 				}, Encoders.kryo(JoinedEntity.class))
-				.filter(filterEmptyEntityFn())
 				.groupByKey(
 					(MapFunction<JoinedEntity, String>) value -> value.getEntity().getId(),
 					Encoders.STRING())
 				.agg(aggregator)
 				.map(
 					(MapFunction<Tuple2<String, JoinedEntity>, JoinedEntity>) value -> value._2(),
 					Encoders.kryo(JoinedEntity.class))
-				.filter(filterEmptyEntityFn())
 				.write()
 				.mode(SaveMode.Overwrite)
 				.parquet(outputPath);

diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java
index 20d27f0f3..c87f0cd94 100644
--- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java
+++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java
@@ -3,8 +3,10 @@ package eu.dnetlib.dhp.oa.provision;
 
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
-import java.util.*;
-import java.util.function.Supplier;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.commons.io.IOUtils;
@@ -15,8 +17,10 @@ import org.apache.spark.api.java.function.FilterFunction;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.rdd.RDD;
-import org.apache.spark.sql.*;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.expressions.Aggregator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -24,7 +28,6 @@ import org.slf4j.LoggerFactory;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
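
Note (not part of the applied diff): Spark parses the join-type strings "left" and "left_outer" into the same LeftOuter join, so the joinWith change in CreateRelatedEntitiesJob_phase2 is purely cosmetic. Likewise, dropping the "dataInfo.deletedbyinference == false" filter in CreateRelatedEntitiesJob_phase1 presumably relies on the input relations having already been filtered when they were prepared upstream. Below is a minimal, self-contained sketch of the join-alias equivalence; the class name, local-mode session and toy datasets are illustrative assumptions, not code from this repository.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class LeftJoinAliasCheck {

	public static void main(String[] args) {
		SparkSession spark = SparkSession
			.builder()
			.master("local[*]")
			.appName("left-join-alias-check")
			.getOrCreate();

		// Two small datasets standing in for 'entities' and 'relatedEntities'.
		Dataset<Row> entities = spark.range(3).toDF("id");
		Dataset<Row> related = spark.range(2).toDF("id");

		// Both join-type strings are accepted and resolve to the same LeftOuter join.
		Dataset<Row> left = entities
			.join(related, entities.col("id").equalTo(related.col("id")), "left");
		Dataset<Row> leftOuter = entities
			.join(related, entities.col("id").equalTo(related.col("id")), "left_outer");

		// Prints 'true': the optimized logical plans are equivalent, so the
		// rename does not change the job's behaviour.
		System.out
			.println(
				left
					.queryExecution()
					.optimizedPlan()
					.sameResult(leftOuter.queryExecution().optimizedPlan()));

		spark.stop();
	}
}

Running the sketch prints 'true', confirming that both strings yield the same optimized plan.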