From 6c31fddd0354c83a5b7522ab058a221c5fc3750f Mon Sep 17 00:00:00 2001
From: Claudio Atzori
Date: Tue, 7 Jan 2025 10:48:37 +0100
Subject: [PATCH] path existence works with globs, graph creation workflow to
 specify driver and executor memory overhead

---
 .../eu/dnetlib/dhp/common/HdfsSupport.java    |  8 +++++++
 .../graph/raw/VerifyRecordsApplication.java   |  2 ++
 .../common/AbstractMigrationApplication.java  |  2 +-
 .../oa/graph/raw_all/oozie_app/workflow.xml   | 24 ++++++++++++++++++-
 4 files changed, 34 insertions(+), 2 deletions(-)

diff --git a/dhp-common/src/main/java/eu/dnetlib/dhp/common/HdfsSupport.java b/dhp-common/src/main/java/eu/dnetlib/dhp/common/HdfsSupport.java
index 654fdd5ac..371878196 100644
--- a/dhp-common/src/main/java/eu/dnetlib/dhp/common/HdfsSupport.java
+++ b/dhp-common/src/main/java/eu/dnetlib/dhp/common/HdfsSupport.java
@@ -33,6 +33,14 @@ public class HdfsSupport {
 			() -> {
 				Path f = new Path(path);
 				FileSystem fileSystem = FileSystem.get(configuration);
+				if (path.contains("*")) {
+					FileStatus[] fileStatus = fileSystem.globStatus(f);
+					if (fileStatus != null) {
+						Arrays.stream(fileStatus).forEach(fs -> logger.info("Glob path: {}", fs.getPath()));
+					}
+					logger.info("Glob path exists: {}", fileStatus != null && fileStatus.length > 0);
+					return fileStatus != null && fileStatus.length > 0;
+				}
 				return fileSystem.exists(f);
 			});
 	}

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/VerifyRecordsApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/VerifyRecordsApplication.java
index de0003bd1..523a41b85 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/VerifyRecordsApplication.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/VerifyRecordsApplication.java
@@ -13,6 +13,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
 import org.apache.spark.rdd.RDD;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SaveMode;
@@ -78,6 +79,7 @@ public class VerifyRecordsApplication extends AbstractMigrationApplication {
 		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 
 		for (final String sp : existingSourcePaths) {
+
 			RDD invalidRecords = sc
 				.sequenceFile(sp, Text.class, Text.class)
 				.map(k -> tryApplyMapping(k._1().toString(), k._2().toString(), true, vocs))

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java
index 950abdcc6..3238886fe 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/common/AbstractMigrationApplication.java
@@ -106,7 +106,7 @@ public class AbstractMigrationApplication implements Closeable {
 		return Arrays
 			.stream(paths.split(","))
 			.filter(StringUtils::isNotBlank)
-			.filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration()) || p.contains("/*"))
+			.filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration()))
 			.collect(Collectors.toList());
 	}
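For context, not part of the patch: after the change above, HdfsSupport.exists resolves glob patterns itself, which is why the "|| p.contains("/*")" escape in AbstractMigrationApplication can be dropped. A minimal, self-contained sketch of the same globStatus-based check; the glob path and class name are hypothetical, for illustration only:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class GlobExistsSketch {
	public static void main(String[] args) throws Exception {
		FileSystem fs = FileSystem.get(new Configuration());
		// hypothetical glob; would match e.g. /data/mdstore/md-1/store, /data/mdstore/md-2/store
		Path glob = new Path("/data/mdstore/*/store");
		// globStatus returns null or an empty array when nothing matches,
		// so both cases must count as "path does not exist"
		FileStatus[] matches = fs.globStatus(glob);
		boolean exists = matches != null && matches.length > 0;
		System.out.println("Glob path exists: " + exists);
	}
}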
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml
index 1b3cb1111..3c674fccd 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-graph-mapper/src/main/resources/eu/dnetlib/dhp/oa/graph/raw_all/oozie_app/workflow.xml
@@ -357,6 +357,8 @@
                 --executor-memory ${sparkExecutorMemory}
                 --executor-cores ${sparkExecutorCores}
                 --driver-memory=${sparkDriverMemory}
+                --conf spark.executor.memoryOverhead=${sparkExecutorMemory}
+                --conf spark.driver.memoryOverhead=${sparkExecutorMemory}
                 --conf spark.extraListeners=${spark2ExtraListeners}
                 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@@ -383,6 +385,7 @@
                 --executor-memory ${sparkExecutorMemory}
                 --executor-cores ${sparkExecutorCores}
                 --driver-memory=${sparkDriverMemory}
+                --conf spark.executor.memoryOverhead=${sparkExecutorMemory}
                 --conf spark.extraListeners=${spark2ExtraListeners}
                 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@@ -417,6 +420,7 @@
                 --executor-memory ${sparkExecutorMemory}
                 --executor-cores ${sparkExecutorCores}
                 --driver-memory=${sparkDriverMemory}
+                --conf spark.executor.memoryOverhead=${sparkExecutorMemory}
                 --conf spark.extraListeners=${spark2ExtraListeners}
                 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@@ -477,6 +481,7 @@
                 --executor-memory ${sparkExecutorMemory}
                 --executor-cores ${sparkExecutorCores}
                 --driver-memory=${sparkDriverMemory}
+                --conf spark.executor.memoryOverhead=${sparkExecutorMemory}
                 --conf spark.extraListeners=${spark2ExtraListeners}
                 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@@ -501,6 +506,7 @@
                 --executor-memory ${sparkExecutorMemory}
                 --executor-cores ${sparkExecutorCores}
                 --driver-memory=${sparkDriverMemory}
+                --conf spark.executor.memoryOverhead=${sparkExecutorMemory}
                 --conf spark.extraListeners=${spark2ExtraListeners}
                 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@@ -527,6 +533,7 @@
                 --executor-memory ${sparkExecutorMemory}
                 --executor-cores ${sparkExecutorCores}
                 --driver-memory=${sparkDriverMemory}
+                --conf spark.executor.memoryOverhead=${sparkExecutorMemory}
                 --conf spark.extraListeners=${spark2ExtraListeners}
                 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@@ -550,6 +557,7 @@
                 --executor-memory ${sparkExecutorMemory}
                 --executor-cores ${sparkExecutorCores}
                 --driver-memory=${sparkDriverMemory}
+                --conf spark.executor.memoryOverhead=${sparkExecutorMemory}
                 --conf spark.extraListeners=${spark2ExtraListeners}
                 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@@ -574,6 +582,7 @@
                 --executor-memory ${sparkExecutorMemory}
                 --executor-cores ${sparkExecutorCores}
                 --driver-memory=${sparkDriverMemory}
+                --conf spark.executor.memoryOverhead=${sparkExecutorMemory}
                 --conf spark.extraListeners=${spark2ExtraListeners}
                 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@@ -599,6 +608,7 @@
                 --executor-memory ${sparkExecutorMemory}
                 --executor-cores ${sparkExecutorCores}
                 --driver-memory=${sparkDriverMemory}
+                --conf spark.executor.memoryOverhead=${sparkExecutorMemory}
                 --conf spark.extraListeners=${spark2ExtraListeners}
                 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@@ -623,16 +633,18 @@
                 --executor-memory ${sparkExecutorMemory}
                 --executor-cores ${sparkExecutorCores}
                 --driver-memory=${sparkDriverMemory}
+                --conf spark.executor.memoryOverhead=${sparkExecutorMemory}
+                --conf spark.driver.memoryOverhead=${sparkExecutorMemory}
                 --conf spark.extraListeners=${spark2ExtraListeners}
                 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
             </spark-opts>
+            <arg>--master</arg><arg>yarn</arg>
             <arg>--hdfsPath</arg><arg>${workingDir}/graph_raw</arg>
             <arg>--mdstoreManagerUrl</arg><arg>${mdstoreManagerUrl}</arg>
             <arg>--mdFormat</arg><arg>OAF</arg>
             <arg>--mdLayout</arg><arg>store</arg>
-            <arg>--master</arg><arg>yarn</arg>
             <arg>--mdInterpretation</arg><arg>graph</arg>
@@ -664,6 +676,7 @@
                 --executor-memory ${sparkExecutorMemory}
                 --executor-cores ${sparkExecutorCores}
                 --driver-memory=${sparkDriverMemory}
+                --conf spark.executor.memoryOverhead=${sparkExecutorMemory}
                 --conf spark.extraListeners=${spark2ExtraListeners}
                 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@@ -690,6 +703,7 @@
                 --executor-memory ${sparkExecutorMemory}
                 --executor-cores ${sparkExecutorCores}
                 --driver-memory=${sparkDriverMemory}
+                --conf spark.executor.memoryOverhead=${sparkExecutorMemory}
                 --conf spark.extraListeners=${spark2ExtraListeners}
                 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@@ -716,6 +730,7 @@
                 --executor-memory ${sparkExecutorMemory}
                 --executor-cores ${sparkExecutorCores}
                 --driver-memory=${sparkDriverMemory}
+                --conf spark.executor.memoryOverhead=${sparkExecutorMemory}
                 --conf spark.extraListeners=${spark2ExtraListeners}
                 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@@ -742,6 +757,7 @@
                 --executor-memory ${sparkExecutorMemory}
                 --executor-cores ${sparkExecutorCores}
                 --driver-memory=${sparkDriverMemory}
+                --conf spark.executor.memoryOverhead=${sparkExecutorMemory}
                 --conf spark.extraListeners=${spark2ExtraListeners}
                 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@@ -768,6 +784,7 @@
                 --executor-memory ${sparkExecutorMemory}
                 --executor-cores ${sparkExecutorCores}
                 --driver-memory=${sparkDriverMemory}
+                --conf spark.executor.memoryOverhead=${sparkExecutorMemory}
                 --conf spark.extraListeners=${spark2ExtraListeners}
                 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@@ -794,6 +811,7 @@
                 --executor-memory ${sparkExecutorMemory}
                 --executor-cores ${sparkExecutorCores}
                 --driver-memory=${sparkDriverMemory}
+                --conf spark.executor.memoryOverhead=${sparkExecutorMemory}
                 --conf spark.extraListeners=${spark2ExtraListeners}
                 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@@ -820,6 +838,7 @@
                 --executor-memory ${sparkExecutorMemory}
                 --executor-cores ${sparkExecutorCores}
                 --driver-memory=${sparkDriverMemory}
+                --conf spark.executor.memoryOverhead=${sparkExecutorMemory}
                 --conf spark.extraListeners=${spark2ExtraListeners}
                 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@@ -846,6 +865,7 @@
                 --executor-memory ${sparkExecutorMemory}
                 --executor-cores ${sparkExecutorCores}
                 --driver-memory=${sparkDriverMemory}
+                --conf spark.executor.memoryOverhead=${sparkExecutorMemory}
                 --conf spark.extraListeners=${spark2ExtraListeners}
                 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@@ -872,6 +892,7 @@
                 --executor-memory ${sparkExecutorMemory}
                 --executor-cores ${sparkExecutorCores}
                 --driver-memory=${sparkDriverMemory}
+                --conf spark.executor.memoryOverhead=${sparkExecutorMemory}
                 --conf spark.extraListeners=${spark2ExtraListeners}
                 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
@@ -910,6 +931,7 @@
                 --executor-memory ${sparkExecutorMemory}
                 --executor-cores ${sparkExecutorCores}
                 --driver-memory=${sparkDriverMemory}
+                --conf spark.executor.memoryOverhead=${sparkExecutorMemory}
                 --conf spark.extraListeners=${spark2ExtraListeners}
                 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
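For context, not part of the patch: each Spark action in the workflow now receives --conf spark.executor.memoryOverhead=${sparkExecutorMemory} (and, in two actions, spark.driver.memoryOverhead as well) through its spark-opts. A minimal sketch of the equivalent programmatic configuration; the "4g" value is a placeholder standing in for the workflow parameter:

import org.apache.spark.SparkConf;

public class MemoryOverheadSketch {
	public static void main(String[] args) {
		// "4g" is a placeholder for the ${sparkExecutorMemory} workflow parameter
		SparkConf conf = new SparkConf()
			.set("spark.executor.memoryOverhead", "4g")
			.set("spark.driver.memoryOverhead", "4g");
		System.out.println(conf.toDebugString());
	}
}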