From 528231a287be05c251709773c62ed8386382e8a9 Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Fri, 13 Nov 2020 15:37:48 +0100
Subject: [PATCH] grouping graph entities by id turned out to be an easy
 extension for the already existing cleaning workflow

---
 .../oa/graph/clean/CleanGraphSparkJob.java    |  14 +-
 .../GroupEntitiesAndRelationsSparkJob.java    |   2 +-
 .../groupbyid/DispatchEntitiesSparkJob.java   |  97 ------
 .../dhp/oa/graph/clean/oozie_app/workflow.xml |  26 +-
 .../groupbyid/oozie_app/config-default.xml    |  18 --
 .../oa/graph/groupbyid/oozie_app/workflow.xml | 293 ------------------
 6 files changed, 35 insertions(+), 415 deletions(-)
 rename dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/{groupbyid => clean}/GroupEntitiesAndRelationsSparkJob.java (99%)
 delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/DispatchEntitiesSparkJob.java
 delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/groupbyid/oozie_app/config-default.xml
 delete mode 100644 dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/groupbyid/oozie_app/workflow.xml

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java
index 714b35dac..8231dd77e 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java
@@ -6,7 +6,9 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 import java.util.Optional;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.FilterFunction;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
@@ -69,12 +71,12 @@ public class CleanGraphSparkJob {
 			conf,
 			isSparkSessionManaged,
 			spark -> {
-				removeOutputDir(spark, outputPath);
-				fixGraphTable(spark, vocs, inputPath, entityClazz, outputPath);
+				HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
+				cleanGraphTable(spark, vocs, inputPath, entityClazz, outputPath);
 			});
 	}
 
-	private static <T extends Oaf> void fixGraphTable(
+	private static <T extends Oaf> void cleanGraphTable(
 		SparkSession spark,
 		VocabularyGroup vocs,
 		String inputPath,
@@ -100,13 +102,15 @@ public class CleanGraphSparkJob {
 		return spark
 			.read()
 			.textFile(inputEntityPath)
+			.filter((FilterFunction<String>) s -> isEntityType(s, clazz))
+			.map((MapFunction<String, String>) s -> StringUtils.substringAfter(s, "|"), Encoders.STRING())
 			.map(
 				(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, clazz),
 				Encoders.bean(clazz));
 	}
 
-	private static void removeOutputDir(SparkSession spark, String path) {
-		HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
+	private static <T> boolean isEntityType(final String s, final Class<T> clazz) {
+		return StringUtils.substringBefore(s, "|").equals(clazz.getName());
 	}
 
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/GroupEntitiesAndRelationsSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/GroupEntitiesAndRelationsSparkJob.java
similarity index 99%
rename from dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/GroupEntitiesAndRelationsSparkJob.java
rename to dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/GroupEntitiesAndRelationsSparkJob.java
index 1d887cecb..9c80528e3 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/GroupEntitiesAndRelationsSparkJob.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/GroupEntitiesAndRelationsSparkJob.java
@@ -1,5 +1,5 @@
 
-package eu.dnetlib.dhp.oa.graph.groupbyid;
+package eu.dnetlib.dhp.oa.graph.clean;
 
 import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 import static eu.dnetlib.dhp.utils.DHPUtils.toSeq;
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/DispatchEntitiesSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/DispatchEntitiesSparkJob.java
deleted file mode 100644
index 1b4226411..000000000
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/groupbyid/DispatchEntitiesSparkJob.java
+++ /dev/null
@@ -1,97 +0,0 @@
-
-package eu.dnetlib.dhp.oa.graph.groupbyid;
-
-import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
-
-import java.util.Optional;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.FilterFunction;
-import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.SparkSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.common.HdfsSupport;
-import eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication;
-import eu.dnetlib.dhp.schema.common.ModelSupport;
-import eu.dnetlib.dhp.schema.oaf.Oaf;
-import eu.dnetlib.dhp.schema.oaf.OafEntity;
-import scala.Tuple2;
-
-public class DispatchEntitiesSparkJob {
-
-	private static final Logger log = LoggerFactory.getLogger(DispatchEntitiesSparkJob.class);
-
-	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
-	public static void main(final String[] args) throws Exception {
-		final ArgumentApplicationParser parser = new ArgumentApplicationParser(
-			IOUtils
-				.toString(
-					MigrateMongoMdstoresApplication.class
-						.getResourceAsStream(
-							"/eu/dnetlib/dhp/oa/graph/dispatch_entities_bytype_parameters.json")));
-		parser.parseArgument(args);
-
-		Boolean isSparkSessionManaged = Optional
-			.ofNullable(parser.get("isSparkSessionManaged"))
-			.map(Boolean::valueOf)
-			.orElse(Boolean.TRUE);
-		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
-
-		final String entitiesPath = parser.get("entitiesPath");
-		log.info("entitiesPath: {}", entitiesPath);
-
-		final String outputPath = parser.get("outputPath");
-		log.info("outputPath: {}", outputPath);
-
-		String graphTableClassName = parser.get("graphTableClassName");
-		log.info("graphTableClassName: {}", graphTableClassName);
-
-		Class<? extends OafEntity> entityClazz = (Class<? extends OafEntity>) Class.forName(graphTableClassName);
-
-		SparkConf conf = new SparkConf();
-		conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-		conf.registerKryoClasses(ModelSupport.getOafModelClasses());
-
-		runWithSparkSession(
-			conf,
-			isSparkSessionManaged,
-			spark -> {
-				HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
-				dispatchOaf(spark, entityClazz, entitiesPath, outputPath);
-			});
-	}
-
-	private static <T extends Oaf> void dispatchOaf(
-		final SparkSession spark,
-		final Class<T> clazz,
-		final String sourcePath,
-		final String targetPath) {
-
-		log.info("Processing entities ({}) in file: {}", clazz.getName(), sourcePath);
-
-		spark
-			.read()
-			.textFile(sourcePath)
-			.filter((FilterFunction<String>) s -> isEntityType(s, clazz))
-			.map((MapFunction<String, String>) s -> StringUtils.substringAfter(s, "|"), Encoders.STRING())
-			.write()
-			.option("compression", "gzip")
-			.mode(SaveMode.Overwrite)
-			.text(targetPath);
-	}
-
-	private static <T> boolean isEntityType(final String s, final Class<T> clazz) {
-		return StringUtils.substringBefore(s, "|").equals(clazz.getName());
-	}
-
-}
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml
index 7329df29a..8b6ca9de6 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/clean/oozie_app/workflow.xml
@@ -50,12 +50,36 @@
 
     </global>
 
-    <start to="fork_clean_graph"/>
+    <start to="group_entities"/>
 
     <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
     </kill>
 
+    <action name="group_entities">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn</master>
+            <mode>cluster</mode>
+            <name>group graph entities and relations</name>
+            <class>eu.dnetlib.dhp.oa.graph.clean.GroupEntitiesAndRelationsSparkJob</class>
+            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-cores=${sparkExecutorCores}
+                --executor-memory=${sparkExecutorMemory}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.sql.shuffle.partitions=7680
+            </spark-opts>
+            <arg>--graphInputPath</arg><arg>${graphInputPath}</arg>
+            <arg>--outputPath</arg><arg>${workingDir}/grouped_entities</arg>
+        </spark>
+        <ok to="fork_clean_graph"/>
+        <error to="Kill"/>
+    </action>
+
     <fork name="fork_clean_graph">
        <path start="clean_publication"/>
        <path start="clean_dataset"/>
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/groupbyid/oozie_app/config-default.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/groupbyid/oozie_app/config-default.xml
deleted file mode 100644
index 2e0ed9aee..000000000
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/groupbyid/oozie_app/config-default.xml
+++ /dev/null
@@ -1,18 +0,0 @@
-<configuration>
-    <property>
-        <name>jobTracker</name>
-        <value>yarnRM</value>
-    </property>
-    <property>
-        <name>nameNode</name>
-        <value>hdfs://nameservice1</value>
-    </property>
-    <property>
-        <name>oozie.use.system.libpath</name>
-        <value>true</value>
-    </property>
-    <property>
-        <name>oozie.action.sharelib.for.spark</name>
-        <value>spark2</value>
-    </property>
-</configuration>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/groupbyid/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/groupbyid/oozie_app/workflow.xml
deleted file mode 100644
index 70b2ea68e..000000000
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/groupbyid/oozie_app/workflow.xml
+++ /dev/null
@@ -1,293 +0,0 @@
-<workflow-app name="group_graph_entities_by_id" xmlns="uri:oozie:workflow:0.5">
-
-    <parameters>
-        <property>
-            <name>graphInputPath</name>
-            <description>the graph root input path</description>
-        </property>
-        <property>
-            <name>outputPath</name>
-            <description>the graph root output path</description>
-        </property>
-
-        <property>
-            <name>sparkDriverMemory</name>
-            <description>memory for driver process</description>
-        </property>
-        <property>
-            <name>sparkExecutorMemory</name>
-            <description>memory for individual executor</description>
-        </property>
-        <property>
-            <name>sparkExecutorCores</name>
-            <description>number of cores used by single executor</description>
-        </property>
-        <property>
-            <name>oozieActionShareLibForSpark2</name>
-            <description>oozie action sharelib for spark 2.*</description>
-        </property>
-        <property>
-            <name>spark2ExtraListeners</name>
-            <value>com.cloudera.spark.lineage.NavigatorAppListener</value>
-            <description>spark 2.* extra listeners classname</description>
-        </property>
-        <property>
-            <name>spark2SqlQueryExecutionListeners</name>
-            <value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
-            <description>spark 2.* sql query execution listeners classname</description>
-        </property>
-        <property>
-            <name>spark2YarnHistoryServerAddress</name>
-            <description>spark 2.* yarn history server address</description>
-        </property>
-        <property>
-            <name>spark2EventLogDir</name>
-            <description>spark 2.* event log dir location</description>
-        </property>
-    </parameters>
-
-    <start to="group_entities"/>
-
-    <kill name="Kill">
-        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
-    </kill>
-
-    <action name="group_entities">
-        <spark xmlns="uri:oozie:spark-action:0.2">
-            <master>yarn</master>
-            <mode>cluster</mode>
-            <name>group graph entities and relations</name>
-            <class>eu.dnetlib.dhp.oa.graph.groupbyid.GroupEntitiesAndRelationsSparkJob</class>
-            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
-            <spark-opts>
-                --executor-cores=${sparkExecutorCores}
-                --executor-memory=${sparkExecutorMemory}
-                --driver-memory=${sparkDriverMemory}
-                --conf spark.extraListeners=${spark2ExtraListeners}
-                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
-                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
-                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-                --conf spark.sql.shuffle.partitions=7680
-            </spark-opts>
-            <arg>--graphInputPath</arg><arg>${graphInputPath}</arg>
-            <arg>--outputPath</arg><arg>${workingDir}/grouped_entities</arg>
-        </spark>
-        <ok to="fork_dispatch_entities"/>
-        <error to="Kill"/>
-    </action>
-
-    <fork name="fork_dispatch_entities">
-        <path start="dispatch_publication"/>
-        <path start="dispatch_dataset"/>
-        <path start="dispatch_software"/>
-        <path start="dispatch_otherresearchproduct"/>
-        <path start="dispatch_datasource"/>
-        <path start="dispatch_organization"/>
-        <path start="dispatch_project"/>
-        <path start="dispatch_relation"/>
-    </fork>
-
-    <action name="dispatch_publication">
-        <spark xmlns="uri:oozie:spark-action:0.2">
-            <master>yarn</master>
-            <mode>cluster</mode>
-            <name>Dispatch publications</name>
-            <class>eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob</class>
-            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
-            <spark-opts>
-                --executor-cores=${sparkExecutorCores}
-                --executor-memory=${sparkExecutorMemory}
-                --driver-memory=${sparkDriverMemory}
-                --conf spark.extraListeners=${spark2ExtraListeners}
-                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
-                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
-                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-                --conf spark.sql.shuffle.partitions=7680
-            </spark-opts>
-            <arg>--entitiesPath</arg><arg>${workingDir}/grouped_entities</arg>
-            <arg>--outputPath</arg><arg>${outputPath}/publication</arg>
-            <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
-        </spark>
-        <ok to="join_dispatch"/>
-        <error to="Kill"/>
-    </action>
-
-    <action name="dispatch_dataset">
-        <spark xmlns="uri:oozie:spark-action:0.2">
-            <master>yarn</master>
-            <mode>cluster</mode>
-            <name>Dispatch datasets</name>
-            <class>eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob</class>
-            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
-            <spark-opts>
-                --executor-cores=${sparkExecutorCores}
-                --executor-memory=${sparkExecutorMemory}
-                --driver-memory=${sparkDriverMemory}
-                --conf spark.extraListeners=${spark2ExtraListeners}
-                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
-                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
-                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-                --conf spark.sql.shuffle.partitions=7680
-            </spark-opts>
-            <arg>--entitiesPath</arg><arg>${workingDir}/grouped_entities</arg>
-            <arg>--outputPath</arg><arg>${outputPath}/dataset</arg>
-            <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
-        </spark>
-        <ok to="join_dispatch"/>
-        <error to="Kill"/>
-    </action>
-
-    <action name="dispatch_software">
-        <spark xmlns="uri:oozie:spark-action:0.2">
-            <master>yarn</master>
-            <mode>cluster</mode>
-            <name>Dispatch softwares</name>
-            <class>eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob</class>
-            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
-            <spark-opts>
-                --executor-cores=${sparkExecutorCores}
-                --executor-memory=${sparkExecutorMemory}
-                --driver-memory=${sparkDriverMemory}
-                --conf spark.extraListeners=${spark2ExtraListeners}
-                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
-                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
-                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-                --conf spark.sql.shuffle.partitions=7680
-            </spark-opts>
-            <arg>--entitiesPath</arg><arg>${workingDir}/grouped_entities</arg>
-            <arg>--outputPath</arg><arg>${outputPath}/software</arg>
-            <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
-        </spark>
-        <ok to="join_dispatch"/>
-        <error to="Kill"/>
-    </action>
-
-    <action name="dispatch_otherresearchproduct">
-        <spark xmlns="uri:oozie:spark-action:0.2">
-            <master>yarn</master>
-            <mode>cluster</mode>
-            <name>Dispatch otherresearchproducts</name>
-            <class>eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob</class>
-            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
-            <spark-opts>
-                --executor-cores=${sparkExecutorCores}
-                --executor-memory=${sparkExecutorMemory}
-                --driver-memory=${sparkDriverMemory}
-                --conf spark.extraListeners=${spark2ExtraListeners}
-                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
-                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
-                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-                --conf spark.sql.shuffle.partitions=7680
-            </spark-opts>
-            <arg>--entitiesPath</arg><arg>${workingDir}/grouped_entities</arg>
-            <arg>--outputPath</arg><arg>${outputPath}/otherresearchproduct</arg>
-            <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
-        </spark>
-        <ok to="join_dispatch"/>
-        <error to="Kill"/>
-    </action>
-
-    <action name="dispatch_datasource">
-        <spark xmlns="uri:oozie:spark-action:0.2">
-            <master>yarn</master>
-            <mode>cluster</mode>
-            <name>Dispatch datasources</name>
-            <class>eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob</class>
-            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
-            <spark-opts>
-                --executor-cores=${sparkExecutorCores}
-                --executor-memory=${sparkExecutorMemory}
-                --driver-memory=${sparkDriverMemory}
-                --conf spark.extraListeners=${spark2ExtraListeners}
-                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
-                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
-                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-                --conf spark.sql.shuffle.partitions=7680
-            </spark-opts>
-            <arg>--entitiesPath</arg><arg>${workingDir}/grouped_entities</arg>
-            <arg>--outputPath</arg><arg>${outputPath}/datasource</arg>
-            <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
-        </spark>
-        <ok to="join_dispatch"/>
-        <error to="Kill"/>
-    </action>
-
-    <action name="dispatch_organization">
-        <spark xmlns="uri:oozie:spark-action:0.2">
-            <master>yarn</master>
-            <mode>cluster</mode>
-            <name>Dispatch organizations</name>
-            <class>eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob</class>
-            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
-            <spark-opts>
-                --executor-cores=${sparkExecutorCores}
-                --executor-memory=${sparkExecutorMemory}
-                --driver-memory=${sparkDriverMemory}
-                --conf spark.extraListeners=${spark2ExtraListeners}
-                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
-                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
-                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-                --conf spark.sql.shuffle.partitions=7680
-            </spark-opts>
-            <arg>--entitiesPath</arg><arg>${workingDir}/grouped_entities</arg>
-            <arg>--outputPath</arg><arg>${outputPath}/organization</arg>
-            <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
-        </spark>
-        <ok to="join_dispatch"/>
-        <error to="Kill"/>
-    </action>
-
-    <action name="dispatch_project">
-        <spark xmlns="uri:oozie:spark-action:0.2">
-            <master>yarn</master>
-            <mode>cluster</mode>
-            <name>Dispatch project</name>
-            <class>eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob</class>
-            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
-            <spark-opts>
-                --executor-cores=${sparkExecutorCores}
-                --executor-memory=${sparkExecutorMemory}
-                --driver-memory=${sparkDriverMemory}
-                --conf spark.extraListeners=${spark2ExtraListeners}
-                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
-                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
-                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-                --conf spark.sql.shuffle.partitions=7680
-            </spark-opts>
-            <arg>--entitiesPath</arg><arg>${workingDir}/grouped_entities</arg>
-            <arg>--outputPath</arg><arg>${outputPath}/project</arg>
-            <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
-        </spark>
-        <ok to="join_dispatch"/>
-        <error to="Kill"/>
-    </action>
-
-    <action name="dispatch_relation">
-        <spark xmlns="uri:oozie:spark-action:0.2">
-            <master>yarn</master>
-            <mode>cluster</mode>
-            <name>Dispatch relations</name>
-            <class>eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob</class>
-            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
-            <spark-opts>
-                --executor-cores=${sparkExecutorCores}
-                --executor-memory=${sparkExecutorMemory}
-                --driver-memory=${sparkDriverMemory}
-                --conf spark.extraListeners=${spark2ExtraListeners}
-                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
-                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
-                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-                --conf spark.sql.shuffle.partitions=7680
-            </spark-opts>
-            <arg>--entitiesPath</arg><arg>${workingDir}/grouped_entities</arg>
-            <arg>--outputPath</arg><arg>${outputPath}/relation</arg>
-            <arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Relation</arg>
-        </spark>
-        <ok to="join_dispatch"/>
-        <error to="Kill"/>
-    </action>
-
-    <join name="join_dispatch" to="End"/>
-
-    <end name="End"/>
-
-</workflow-app>
\ No newline at end of file
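
The dispatch step that this patch folds into CleanGraphSparkJob relies on a
simple serialization convention: GroupEntitiesAndRelationsSparkJob emits each
grouped record as "<fully qualified class name>|<json>", so a per-type reader
keeps only the rows whose prefix matches the requested class and parses the
remainder. Below is a minimal sketch of that round trip in plain Java, using
only Jackson and commons-lang3; the Result bean is a hypothetical stand-in for
the eu.dnetlib.dhp.schema.oaf entities, not a class from the patch.

import org.apache.commons.lang3.StringUtils;

import com.fasterxml.jackson.databind.ObjectMapper;

public class TypePrefixRoundTrip {

	// hypothetical stand-in for an eu.dnetlib.dhp.schema.oaf entity bean
	public static class Result {
		public String id;
	}

	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

	// what GroupEntitiesAndRelationsSparkJob is assumed to emit for each record
	static String tag(final Object entity) throws Exception {
		return entity.getClass().getName() + "|" + OBJECT_MAPPER.writeValueAsString(entity);
	}

	// the predicate CleanGraphSparkJob now applies while reading grouped records
	static boolean isEntityType(final String s, final Class<?> clazz) {
		return StringUtils.substringBefore(s, "|").equals(clazz.getName());
	}

	// strip the prefix and parse the payload, mirroring the patch's filter+map chain
	static <T> T parse(final String s, final Class<T> clazz) throws Exception {
		return OBJECT_MAPPER.readValue(StringUtils.substringAfter(s, "|"), clazz);
	}

	public static void main(final String[] args) throws Exception {
		final Result r = new Result();
		r.id = "50|doi_________::0123456789abcdef"; // an illustrative identifier
		final String line = tag(r);
		if (isEntityType(line, Result.class)) {
			System.out.println(parse(line, Result.class).id);
		}
	}
}

Since substringBefore and substringAfter split on the first '|' only, pipe
characters inside the JSON payload (for example in OpenAIRE identifiers such
as the one above) survive the round trip intact.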