From 8ec0d62d91fbc829222f77bfe3af9e31a00cc170 Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Thu, 2 Mar 2023 14:49:19 +0100
Subject: [PATCH] pre-group the records in each table before joining the
 contents from BETA and PROD together

---
 .../graph/merge/MergeGraphTableSparkJob.java  | 84 +++++++++++++++----
 .../dhp/oa/graph/merge/oozie_app/workflow.xml |  2 +-
 2 files changed, 69 insertions(+), 17 deletions(-)

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java
index 5f2cd6808..d4f1d9f55 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.java
@@ -11,10 +11,9 @@ import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.FilterFunction;
 import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.sql.*;
 import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.expressions.Aggregator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -25,6 +24,7 @@ import eu.dnetlib.dhp.common.HdfsSupport;
 import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
 import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
 import scala.Tuple2;
 
 /**
@@ -107,11 +107,11 @@ public class MergeGraphTableSparkJob {
 			Class<B> b_clazz,
 			String outputPath) {
 
-		Dataset<Tuple2<String, B>> beta = readTableFromPath(spark, betaInputPath, b_clazz);
-		Dataset<Tuple2<String, P>> prod = readTableFromPath(spark, prodInputPath, p_clazz);
+		Dataset<Tuple2<String, B>> beta = readTableAndGroupById(spark, betaInputPath, b_clazz);
+		Dataset<Tuple2<String, P>> prod = readTableAndGroupById(spark, prodInputPath, p_clazz);
 
 		prod
-			.joinWith(beta, prod.col("_1").equalTo(beta.col("_1")), "full_outer")
+			.joinWith(beta, prod.col("value").equalTo(beta.col("value")), "full_outer")
 			.map((MapFunction<Tuple2<Tuple2<String, P>, Tuple2<String, B>>, P>) value -> {
 				Optional<P> p = Optional.ofNullable(value._1()).map(Tuple2::_2);
 				Optional<B> b = Optional.ofNullable(value._2()).map(Tuple2::_2);
@@ -126,12 +126,13 @@ public class MergeGraphTableSparkJob {
 					case "PROD":
 						return mergeWithPriorityToPROD(p, b);
 				}
-			}, Encoders.bean(p_clazz))
+			}, Encoders.kryo(p_clazz))
 			.filter((FilterFunction<P>) Objects::nonNull)
+			.map((MapFunction<P, String>) OBJECT_MAPPER::writeValueAsString, Encoders.STRING())
 			.write()
 			.mode(SaveMode.Overwrite)
 			.option("compression", "gzip")
-			.json(outputPath);
+			.text(outputPath);
 	}
 
 	/**
@@ -212,20 +213,71 @@ public class MergeGraphTableSparkJob {
 		return null;
 	}
 
-	private static <T extends Oaf> Dataset<Tuple2<String, T>> readTableFromPath(
+	private static <T extends Oaf> Dataset<Tuple2<String, T>> readTableAndGroupById(
 		SparkSession spark, String inputEntityPath, Class<T> clazz) {
 
+		final TypedColumn<T, T> aggregator = new GroupingAggregator(clazz).toColumn();
+
 		log.info("Reading Graph table from: {}", inputEntityPath);
 		return spark
 			.read()
 			.textFile(inputEntityPath)
-			.map(
-				(MapFunction<String, Tuple2<String, T>>) value -> {
-					final T t = OBJECT_MAPPER.readValue(value, clazz);
-					final String id = ModelSupport.idFn().apply(t);
-					return new Tuple2<>(id, t);
-				},
-				Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
+			.map((MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.kryo(clazz))
+			.groupByKey((MapFunction<T, String>) oaf -> ModelSupport.idFn().apply(oaf), Encoders.STRING())
+			.agg(aggregator);
+	}
+
+	public static class GroupingAggregator<T extends Oaf> extends Aggregator<T, T, T> {
+
+		private Class<T> clazz;
+
+		public GroupingAggregator(Class<T> clazz) {
+			this.clazz = clazz;
+		}
+
+		@Override
+		public T zero() {
+			return null;
+		}
+
+		@Override
+		public T reduce(T b, T a) {
+			return mergeAndGet(b, a);
+		}
+
+		private T mergeAndGet(T b, T a) {
+			if (Objects.nonNull(a) && Objects.nonNull(b)) {
+				if (ModelSupport.isSubClass(a, OafEntity.class) && ModelSupport.isSubClass(b, OafEntity.class)) {
+					return (T) OafMapperUtils.mergeEntities((OafEntity) b, (OafEntity) a);
+				}
+				if (a instanceof Relation && b instanceof Relation) {
+					((Relation) a).mergeFrom(b);
+					return a;
+				}
+			}
+			return Objects.isNull(a) ? b : a;
+		}
+
+		@Override
+		public T merge(T b, T a) {
+			return mergeAndGet(b, a);
+		}
+
+		@Override
+		public T finish(T j) {
+			return j;
+		}
+
+		@Override
+		public Encoder<T> bufferEncoder() {
+			return Encoders.kryo(clazz);
+		}
+
+		@Override
+		public Encoder<T> outputEncoder() {
+			return Encoders.kryo(clazz);
+		}
+
 	}
 
 	private static void removeOutputDir(SparkSession spark, String path) {
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml
index 86fb51042..a8d0d5068 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/merge/oozie_app/workflow.xml
@@ -275,7 +275,7 @@
                     --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                     --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                     --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-                    --conf spark.sql.shuffle.partitions=7680
+                    --conf spark.sql.shuffle.partitions=10000
                </spark-opts>
                <arg>--betaInputPath</arg><arg>${betaInputGraphPath}/relation</arg>
                <arg>--prodInputPath</arg><arg>${prodInputGraphPath}/relation</arg>
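
In short, the patch pre-groups each graph table by record identifier with a typed Spark Aggregator, so that each side of the BETA/PROD full outer join contributes at most one record per id, and it writes the merged records as gzip-compressed JSON text instead of relying on the bean encoder. What follows is a minimal, standalone sketch of that pre-group-then-join pattern, not the job itself: the PreGroupThenJoinSketch class, the Rec type, its fields and the payload-based merge rule are illustrative placeholders for the Oaf model and the OafMapperUtils.mergeEntities / Relation.mergeFrom logic used by MergeGraphTableSparkJob.

import java.io.Serializable;
import java.util.Arrays;
import java.util.Objects;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.TypedColumn;
import org.apache.spark.sql.expressions.Aggregator;

import scala.Tuple2;

public class PreGroupThenJoinSketch {

	// Illustrative record type standing in for the Oaf model
	public static class Rec implements Serializable {
		public String id;
		public String payload;

		public Rec() {}

		public Rec(String id, String payload) {
			this.id = id;
			this.payload = payload;
		}
	}

	// Collapses all records sharing the same id into one, in the spirit of GroupingAggregator
	public static class MergeAggregator extends Aggregator<Rec, Rec, Rec> {

		@Override
		public Rec zero() { return null; } // empty buffer: nothing seen yet for this key

		@Override
		public Rec reduce(Rec buffer, Rec next) { return merge(buffer, next); }

		@Override
		public Rec merge(Rec left, Rec right) {
			if (Objects.isNull(left)) return right;
			if (Objects.isNull(right)) return left;
			if (left.payload == null) left.payload = right.payload; // illustrative merge rule
			return left;
		}

		@Override
		public Rec finish(Rec reduction) { return reduction; }

		@Override
		public Encoder<Rec> bufferEncoder() { return Encoders.kryo(Rec.class); }

		@Override
		public Encoder<Rec> outputEncoder() { return Encoders.kryo(Rec.class); }
	}

	// Groups a table by id so that it holds at most one record per key before the join
	static Dataset<Tuple2<String, Rec>> groupById(Dataset<Rec> table) {
		TypedColumn<Rec, Rec> agg = new MergeAggregator().toColumn();
		return table
			.groupByKey((MapFunction<Rec, String>) r -> r.id, Encoders.STRING())
			.agg(agg);
	}

	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();

		Dataset<Rec> beta = spark.createDataset(
			Arrays.asList(new Rec("1", "b1"), new Rec("1", null), new Rec("2", "b2")), Encoders.kryo(Rec.class));
		Dataset<Rec> prod = spark.createDataset(
			Arrays.asList(new Rec("1", "p1"), new Rec("3", "p3")), Encoders.kryo(Rec.class));

		Dataset<Tuple2<String, Rec>> groupedBeta = groupById(beta);
		Dataset<Tuple2<String, Rec>> groupedProd = groupById(prod);

		// "value" is the key column exposed by groupByKey(...).agg(...) for a String key,
		// which is the same column the job joins on after pre-grouping
		groupedProd
			.joinWith(groupedBeta, groupedProd.col("value").equalTo(groupedBeta.col("value")), "full_outer")
			.show(false);

		spark.stop();
	}
}

Pre-grouping keeps the full outer join one-to-one per identifier; the bump of spark.sql.shuffle.partitions from 7680 to 10000 in the relation step presumably accommodates the additional groupByKey shuffle on the largest table.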