From fab99202719bb8636e51e5bb3b7cc87fd7166e9a Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Tue, 8 Aug 2023 15:52:20 +0200 Subject: [PATCH 1/2] DispatchEntitiesSparkJob: manage all entity types together, support filtering by dataInfo.invisible flag --- .../oa/merge/DispatchEntitiesSparkJob.java | 73 ++++---- .../merge/dispatch_entities_parameters.json | 6 +- .../dedup/consistency/oozie_app/workflow.xml | 174 +----------------- .../dnetlib/dhp/oa/dedup/SparkDedupTest.java | 106 ++++++----- .../dhp/oa/graph/group/oozie_app/workflow.xml | 174 +----------------- .../group/GroupEntitiesSparkJobTest.java | 58 +++--- 6 files changed, 137 insertions(+), 454 deletions(-) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java index 3f65d754f..88c32fdbf 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java @@ -11,25 +11,18 @@ import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FilterFunction; import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.databind.ObjectMapper; - import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.schema.oaf.Oaf; -import eu.dnetlib.dhp.schema.oaf.OafEntity; +import eu.dnetlib.dhp.schema.common.ModelSupport; public class DispatchEntitiesSparkJob { private static final Logger log = LoggerFactory.getLogger(DispatchEntitiesSparkJob.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - public static void main(String[] args) throws Exception { String jsonConfiguration = IOUtils @@ -54,11 +47,12 @@ public class DispatchEntitiesSparkJob { String outputPath = parser.get("outputPath"); log.info("outputPath: {}", outputPath); - String graphTableClassName = parser.get("graphTableClassName"); - log.info("graphTableClassName: {}", graphTableClassName); + boolean filterInvisible = Optional + .ofNullable(parser.get("filterInvisible")) + .map(Boolean::valueOf) + .orElse(Boolean.FALSE); - @SuppressWarnings("unchecked") - Class entityClazz = (Class) Class.forName(graphTableClassName); + log.info("filterInvisible: {}", filterInvisible); SparkConf conf = new SparkConf(); runWithSparkSession( @@ -66,32 +60,43 @@ public class DispatchEntitiesSparkJob { isSparkSessionManaged, spark -> { HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration()); - dispatchEntities(spark, inputPath, entityClazz, outputPath); + dispatchEntities(spark, inputPath, outputPath, filterInvisible); }); } - private static void dispatchEntities( + private static void dispatchEntities( SparkSession spark, String inputPath, - Class clazz, - String outputPath) { + String outputPath, + boolean filterInvisible) { - spark - .read() - .textFile(inputPath) - .filter((FilterFunction) s -> isEntityType(s, clazz)) - .map((MapFunction) s -> StringUtils.substringAfter(s, "|"), Encoders.STRING()) - .map( - (MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz), - Encoders.bean(clazz)) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - 
.json(outputPath); + Dataset df = spark.read().textFile(inputPath); + + ModelSupport.oafTypes.entrySet().parallelStream().forEach(entry -> { + String entityType = entry.getKey(); + Class clazz = entry.getValue(); + + if (!entityType.equalsIgnoreCase("relation")) { + Dataset entityDF = spark + .read() + .schema(Encoders.bean(clazz).schema()) + .json( + df + .filter((FilterFunction) s -> s.startsWith(clazz.getName())) + .map( + (MapFunction) s -> StringUtils.substringAfter(s, "|"), + Encoders.STRING())); + + if (filterInvisible) { + entityDF = entityDF.filter("dataInfo.invisible != true"); + } + + entityDF + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(outputPath + "/" + entityType); + } + }); } - - private static boolean isEntityType(final String s, final Class clazz) { - return StringUtils.substringBefore(s, "|").equals(clazz.getName()); - } - } diff --git a/dhp-common/src/main/resources/eu/dnetlib/dhp/oa/merge/dispatch_entities_parameters.json b/dhp-common/src/main/resources/eu/dnetlib/dhp/oa/merge/dispatch_entities_parameters.json index aa8d2a7c2..60f11ac84 100644 --- a/dhp-common/src/main/resources/eu/dnetlib/dhp/oa/merge/dispatch_entities_parameters.json +++ b/dhp-common/src/main/resources/eu/dnetlib/dhp/oa/merge/dispatch_entities_parameters.json @@ -18,9 +18,9 @@ "paramRequired": true }, { - "paramName": "c", - "paramLongName": "graphTableClassName", - "paramDescription": "the graph entity class name", + "paramName": "fi", + "paramLongName": "filterInvisible", + "paramDescription": "if true filters out invisible entities", "paramRequired": true } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml index fedc68d9d..b1ceca05a 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml @@ -144,25 +144,15 @@ --graphInputPath${graphBasePath} --outputPath${workingPath}/grouped_entities - + - - - - - - - - - - - + yarn cluster - Dispatch publications + Dispatch grouped entities eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob dhp-dedup-openaire-${projectVersion}.jar --executor-cores=${sparkExecutorCores} --executor-memory=${sparkExecutorMemory} --driver-memory=${sparkDriverMemory} --conf spark.extraListeners=${spark2ExtraListeners} --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.shuffle.partitions=7680 --inputPath${workingPath}/grouped_entities - --outputPath${graphOutputPath}/datasource - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Datasource + --outputPath${graphOutputPath} + --filterInvisibletrue - + - - - yarn - cluster - Dispatch project - eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob - dhp-dedup-openaire-${projectVersion}.jar - - --executor-cores=${sparkExecutorCores} - --executor-memory=${sparkExecutorMemory} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 - - --inputPath${workingPath}/grouped_entities - --outputPath${graphOutputPath}/project - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Project - - - - - - - - yarn - cluster - Dispatch organization - eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob -
dhp-dedup-openaire-${projectVersion}.jar - - --executor-cores=${sparkExecutorCores} - --executor-memory=${sparkExecutorMemory} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 - - --inputPath${workingPath}/grouped_entities - --outputPath${graphOutputPath}/organization - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Organization - - - - - - - - yarn - cluster - Dispatch publication - eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob - dhp-dedup-openaire-${projectVersion}.jar - - --executor-cores=${sparkExecutorCores} - --executor-memory=${sparkExecutorMemory} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 - - --inputPath${workingPath}/grouped_entities - --outputPath${graphOutputPath}/publication - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication - - - - - - - - yarn - cluster - Dispatch dataset - eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob - dhp-dedup-openaire-${projectVersion}.jar - - --executor-cores=${sparkExecutorCores} - --executor-memory=${sparkExecutorMemory} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 - - --inputPath${workingPath}/grouped_entities - --outputPath${graphOutputPath}/dataset - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset - - - - - - - - yarn - cluster - Dispatch software - eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob - dhp-dedup-openaire-${projectVersion}.jar - - --executor-cores=${sparkExecutorCores} - --executor-memory=${sparkExecutorMemory} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 - - --inputPath${workingPath}/grouped_entities - --outputPath${graphOutputPath}/software - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software - - - - - - - - yarn - cluster - Dispatch otherresearchproduct - eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob - dhp-dedup-openaire-${projectVersion}.jar - - --executor-cores=${sparkExecutorCores} - --executor-memory=${sparkExecutorMemory} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 - - --inputPath${workingPath}/grouped_entities - --outputPath${graphOutputPath}/otherresearchproduct - 
--graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct - - - - - - - \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java index c382e922e..38bd72a5e 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java +++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkDedupTest.java @@ -1,15 +1,25 @@ package eu.dnetlib.dhp.oa.dedup; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Sets; -import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.oaf.*; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; -import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; -import eu.dnetlib.pace.util.MapDocumentUtil; +import static java.nio.file.Files.createTempDirectory; + +import static org.apache.spark.sql.functions.col; +import static org.apache.spark.sql.functions.count; +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.lenient; + +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.io.Serializable; +import java.net.URISyntaxException; +import java.nio.file.Paths; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; @@ -26,26 +36,19 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Sets; + +import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.*; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; +import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import eu.dnetlib.pace.util.MapDocumentUtil; import scala.Tuple2; -import java.io.File; -import java.io.FileReader; -import java.io.IOException; -import java.io.Serializable; -import java.net.URISyntaxException; -import java.nio.file.Paths; -import java.util.HashSet; -import java.util.List; -import java.util.Optional; -import java.util.Set; -import java.util.stream.Collectors; - -import static java.nio.file.Files.createTempDirectory; -import static org.apache.spark.sql.functions.col; -import static org.apache.spark.sql.functions.count; -import static org.junit.jupiter.api.Assertions.*; -import static org.mockito.Mockito.lenient; - @ExtendWith(MockitoExtension.class) @TestMethodOrder(MethodOrderer.OrderAnnotation.class) public class SparkDedupTest implements Serializable { @@ -723,26 +726,32 @@ public class SparkDedupTest implements Serializable { @Order(8) void testCleanBaseRelations() throws Exception { ArgumentApplicationParser parser = new ArgumentApplicationParser( - classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json")); + classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json")); // append dangling relations to be 
cleaned up - Dataset df_before = spark.read().schema(Encoders.bean(Relation.class).schema()).json(testGraphBasePath + "/relation"); - Dataset df_input =df_before - .unionByName(df_before.drop("source").withColumn("source", functions.lit("n/a"))) - .unionByName(df_before.drop("target").withColumn("target", functions.lit("n/a"))); + Dataset df_before = spark + .read() + .schema(Encoders.bean(Relation.class).schema()) + .json(testGraphBasePath + "/relation"); + Dataset df_input = df_before + .unionByName(df_before.drop("source").withColumn("source", functions.lit("n/a"))) + .unionByName(df_before.drop("target").withColumn("target", functions.lit("n/a"))); df_input.write().mode(SaveMode.Overwrite).json(testOutputBasePath + "_tmp"); parser - .parseArgument( - new String[]{ - "--graphBasePath", testGraphBasePath, - "--inputPath", testGraphBasePath + "/relation", - "--outputPath", testDedupGraphBasePath + "/relation" - }); + .parseArgument( + new String[] { + "--graphBasePath", testGraphBasePath, + "--inputPath", testGraphBasePath + "/relation", + "--outputPath", testDedupGraphBasePath + "/relation" + }); new SparkCleanRelation(parser, spark).run(isLookUpService); - Dataset df_after = spark.read().schema(Encoders.bean(Relation.class).schema()).json(testDedupGraphBasePath + "/relation"); + Dataset df_after = spark + .read() + .schema(Encoders.bean(Relation.class).schema()) + .json(testDedupGraphBasePath + "/relation"); assertNotEquals(df_before.count(), df_input.count()); assertNotEquals(df_input.count(), df_after.count()); @@ -753,7 +762,7 @@ public class SparkDedupTest implements Serializable { @Order(9) void testCleanDedupedRelations() throws Exception { ArgumentApplicationParser parser = new ArgumentApplicationParser( - classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json")); + classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/cleanRelation_parameters.json")); String inputRelPath = testDedupGraphBasePath + "/propagaterelation/relation"; @@ -763,16 +772,19 @@ public class SparkDedupTest implements Serializable { df_before.filter(col("dataInfo.deletedbyinference").notEqual(true)).show(50, false); parser - .parseArgument( - new String[]{ - "--graphBasePath", testGraphBasePath, - "--inputPath", inputRelPath, - "--outputPath", testDedupGraphBasePath + "/relation" - }); + .parseArgument( + new String[] { + "--graphBasePath", testGraphBasePath, + "--inputPath", inputRelPath, + "--outputPath", testDedupGraphBasePath + "/relation" + }); new SparkCleanRelation(parser, spark).run(isLookUpService); - Dataset df_after = spark.read().schema(Encoders.bean(Relation.class).schema()).json(testDedupGraphBasePath + "/relation"); + Dataset df_after = spark + .read() + .schema(Encoders.bean(Relation.class).schema()) + .json(testDedupGraphBasePath + "/relation"); assertNotEquals(df_before.count(), df_after.count()); assertEquals(0, df_after.count()); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group/oozie_app/workflow.xml index 888a873c5..27a18207b 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group/oozie_app/workflow.xml @@ -94,25 +94,15 @@ --graphInputPath${graphBasePath} --outputPath${workingPath}/grouped_entities - + - - - - - - - - - - - + yarn cluster - Dispatch publications + Dispatch 
grouped entities eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob dhp-graph-mapper-${projectVersion}.jar @@ -126,165 +116,13 @@ --conf spark.sql.shuffle.partitions=7680 --inputPath${workingPath}/grouped_entities - --outputPath${graphOutputPath}/datasource - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Datasource + --outputPath${graphOutputPath} + --filterInvisiblefalse - + - - - yarn - cluster - Dispatch project - eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob - dhp-graph-mapper-${projectVersion}.jar - - --executor-cores=${sparkExecutorCores} - --executor-memory=${sparkExecutorMemory} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 - - --inputPath${workingPath}/grouped_entities - --outputPath${graphOutputPath}/project - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Project - - - - - - - - yarn - cluster - Dispatch organization - eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob - dhp-graph-mapper-${projectVersion}.jar - - --executor-cores=${sparkExecutorCores} - --executor-memory=${sparkExecutorMemory} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 - - --inputPath${workingPath}/grouped_entities - --outputPath${graphOutputPath}/organization - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Organization - - - - - - - - yarn - cluster - Dispatch publication - eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob - dhp-graph-mapper-${projectVersion}.jar - - --executor-cores=${sparkExecutorCores} - --executor-memory=${sparkExecutorMemory} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 - - --inputPath${workingPath}/grouped_entities - --outputPath${graphOutputPath}/publication - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication - - - - - - - - yarn - cluster - Dispatch dataset - eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob - dhp-graph-mapper-${projectVersion}.jar - - --executor-cores=${sparkExecutorCores} - --executor-memory=${sparkExecutorMemory} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 - - --inputPath${workingPath}/grouped_entities - --outputPath${graphOutputPath}/dataset - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset - - - - - - - - yarn - cluster - Dispatch software - eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob - dhp-graph-mapper-${projectVersion}.jar - - --executor-cores=${sparkExecutorCores} - --executor-memory=${sparkExecutorMemory} - 
--driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 - - --inputPath${workingPath}/grouped_entities - --outputPath${graphOutputPath}/software - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software - - - - - - - - yarn - cluster - Dispatch otherresearchproduct - eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob - dhp-graph-mapper-${projectVersion}.jar - - --executor-cores=${sparkExecutorCores} - --executor-memory=${sparkExecutorMemory} - --driver-memory=${sparkDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 - - --inputPath${workingPath}/grouped_entities - --outputPath${graphOutputPath}/otherresearchproduct - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct - - - - - - - diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/group/GroupEntitiesSparkJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/group/GroupEntitiesSparkJobTest.java index 3bd1c13de..4c86da47e 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/group/GroupEntitiesSparkJobTest.java +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/group/GroupEntitiesSparkJobTest.java @@ -1,14 +1,14 @@ package eu.dnetlib.dhp.oa.graph.group; -import static org.junit.jupiter.api.Assertions.*; - -import java.io.IOException; -import java.net.URISyntaxException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; - +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob; +import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob; +import eu.dnetlib.dhp.schema.common.ModelSupport; +import eu.dnetlib.dhp.schema.oaf.Result; +import eu.dnetlib.dhp.utils.DHPUtils; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; @@ -19,17 +19,13 @@ import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SparkSession; import org.junit.jupiter.api.*; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Lists; +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; -import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.oa.merge.DispatchEntitiesSparkJob; -import eu.dnetlib.dhp.oa.merge.GroupEntitiesSparkJob; -import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.oaf.Publication; -import eu.dnetlib.dhp.schema.oaf.Result; -import eu.dnetlib.dhp.utils.DHPUtils; +import static org.junit.jupiter.api.Assertions.assertEquals; @TestMethodOrder(MethodOrderer.OrderAnnotation.class) public class GroupEntitiesSparkJobTest { @@ -101,21 +97,16 @@ public class GroupEntitiesSparkJobTest { @Test 
@Order(2) void testDispatchEntities() throws Exception { - for (String type : Lists - .newArrayList( - Publication.class.getCanonicalName(), eu.dnetlib.dhp.schema.oaf.Dataset.class.getCanonicalName())) { - String directory = StringUtils.substringAfterLast(type, ".").toLowerCase(); - DispatchEntitiesSparkJob.main(new String[] { - "-isSparkSessionManaged", - Boolean.FALSE.toString(), - "-inputPath", - groupEntityPath.toString(), - "-outputPath", - dispatchEntityPath.resolve(directory).toString(), - "-graphTableClassName", - type - }); - } + DispatchEntitiesSparkJob.main(new String[] { + "-isSparkSessionManaged", + Boolean.FALSE.toString(), + "-inputPath", + groupEntityPath.toString(), + "-outputPath", + dispatchEntityPath.resolve(".").toString(), + "-filterInvisible", + Boolean.TRUE.toString() + }); Dataset output = spark .read() @@ -140,5 +131,4 @@ .filter((FilterFunction) s -> s.equals("dataset")) .count()); } - } From 95cd2b9b1e7f130d53b616773e9e016ccf970b83 Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Thu, 10 Aug 2023 11:53:48 +0200 Subject: [PATCH 2/2] Make filterInvisible a mandatory parameter of DispatchEntitiesSparkJob Make filterInvisible a mandatory parameter of both dedup/consistency and graph/group oozie workflows --- .../eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java | 6 +----- .../dhp/oa/dedup/consistency/oozie_app/workflow.xml | 7 +++++-- .../eu/dnetlib/dhp/oa/graph/group/oozie_app/workflow.xml | 6 +++++- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java index 88c32fdbf..4d2ccb178 100644 --- a/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java +++ b/dhp-common/src/main/java/eu/dnetlib/dhp/oa/merge/DispatchEntitiesSparkJob.java @@ -47,11 +47,7 @@ public class DispatchEntitiesSparkJob { String outputPath = parser.get("outputPath"); log.info("outputPath: {}", outputPath); - boolean filterInvisible = Optional - .ofNullable(parser.get("filterInvisible")) - .map(Boolean::valueOf) - .orElse(Boolean.FALSE); - + boolean filterInvisible = Boolean.valueOf(parser.get("filterInvisible")); log.info("filterInvisible: {}", filterInvisible); SparkConf conf = new SparkConf(); diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml index b1ceca05a..eb7e6d98d 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/consistency/oozie_app/workflow.xml @@ -12,6 +12,10 @@ graphOutputPath path of the output graph + + filterInvisible + whether to filter out invisible entities after merge + sparkDriverMemory memory for driver process @@ -24,7 +28,6 @@ sparkExecutorCores number of cores used by single executor - oozieActionShareLibForSpark2 oozie action sharelib for spark 2.* @@ -167,7 +170,7 @@ --inputPath${workingPath}/grouped_entities --outputPath${graphOutputPath} - --filterInvisibletrue + --filterInvisible${filterInvisible} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group/oozie_app/workflow.xml
b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group/oozie_app/workflow.xml index 27a18207b..219dc7331 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/group/oozie_app/workflow.xml @@ -12,6 +12,10 @@ graphOutputPath path of the output graph + + filterInvisible + whether to filter out invisible entities after merge + sparkDriverMemory memory for driver process @@ -117,7 +121,7 @@ --inputPath${workingPath}/grouped_entities --outputPath${graphOutputPath} - --filterInvisiblefalse + --filterInvisible${filterInvisible}
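
A note on the record convention the reworked job relies on: GroupEntitiesSparkJob emits each grouped entity as a text line of the form "<fully qualified class name>|<json payload>", and the new dispatchEntities selects each type by its class-name prefix, parses the payloads against that type's bean schema, and writes them under outputPath/<entityType>. Iterating with a parallelStream means the per-type Spark jobs are submitted concurrently from the driver, and the "relation" type is deliberately skipped (relations are not dispatched here). Below is a minimal, self-contained sketch of that convention; it is not part of the patches, and the sample records, class names and local master are illustrative only.

	import java.util.Arrays;

	import org.apache.commons.lang3.StringUtils;
	import org.apache.spark.api.java.function.FilterFunction;
	import org.apache.spark.api.java.function.MapFunction;
	import org.apache.spark.sql.Dataset;
	import org.apache.spark.sql.Encoders;
	import org.apache.spark.sql.Row;
	import org.apache.spark.sql.SparkSession;

	public class DispatchSketch {

		public static void main(String[] args) {
			SparkSession spark = SparkSession.builder().master("local[*]").appName("DispatchSketch").getOrCreate();

			// Two records in the "<class name>|<json>" layout produced by GroupEntitiesSparkJob.
			Dataset<String> lines = spark
				.createDataset(
					Arrays.asList(
						"eu.dnetlib.dhp.schema.oaf.Publication|{\"id\":\"p1\",\"dataInfo\":{\"invisible\":false}}",
						"eu.dnetlib.dhp.schema.oaf.Publication|{\"id\":\"p2\",\"dataInfo\":{\"invisible\":true}}"),
					Encoders.STRING());

			// Select one entity type by its class-name prefix, then parse the JSON payloads.
			Dataset<Row> publications = spark
				.read()
				.json(
					lines
						.filter((FilterFunction<String>) s -> s.startsWith("eu.dnetlib.dhp.schema.oaf.Publication"))
						.map((MapFunction<String, String>) s -> StringUtils.substringAfter(s, "|"), Encoders.STRING()));

			// The filterInvisible branch keeps only records whose flag is not true. Note the
			// Spark SQL three-valued logic: a row whose dataInfo.invisible is NULL also fails
			// the "!= true" test, so it is dropped too, not only the explicitly invisible ones.
			publications.filter("dataInfo.invisible != true").show(false); // prints only p1

			spark.stop();
		}
	}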
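
On the caller's side, a single invocation now covers every entity type, where the old test looped over graphTableClassName values with one output directory per class. A hedged usage example, patterned on the updated GroupEntitiesSparkJobTest; the paths are placeholders:

	// Single invocation for all entity types; each type lands in /tmp/graph_output/<entityType>.
	DispatchEntitiesSparkJob.main(new String[] {
		"-isSparkSessionManaged", Boolean.FALSE.toString(),
		"-inputPath", "/tmp/working/grouped_entities", // placeholder input path
		"-outputPath", "/tmp/graph_output",            // placeholder output path
		"-filterInvisible", Boolean.TRUE.toString()
	});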
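
One caveat on the second patch: replacing the Optional-based default with Boolean.valueOf(parser.get("filterInvisible")) does not by itself reject a missing flag, since Boolean.valueOf is lenient. It never throws, and anything other than a case-insensitive "true", including null, yields false:

	Boolean.valueOf("true");        // true
	Boolean.valueOf("TRUE");        // true, the comparison is case-insensitive
	Boolean.valueOf("yes");         // false
	Boolean.valueOf((String) null); // false, no exception

What makes the parameter effectively mandatory is the surrounding plumbing: the argument is declared with "paramRequired": true in dispatch_entities_parameters.json, and both workflows now declare filterInvisible as an explicit workflow parameter and pass ${filterInvisible} through instead of a hardcoded value.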