diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java
index 9ceebf222..cfc69a8f0 100644
--- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java
@@ -6,6 +6,7 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
 
 import java.io.Serializable;
 import java.util.Arrays;
+import java.util.Optional;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
@@ -64,6 +65,18 @@ public class SparkResultToOrganizationFromSemRel implements Serializable {
 		final String workingPath = parser.get("workingDir");
 		log.info("workingPath: {}", workingPath);
 
+		final int iterations = Optional
+			.ofNullable(parser.get("iterations"))
+			.map(v -> {
+				if (Integer.valueOf(v) < MAX_ITERATION) {
+					return Integer.valueOf(v);
+				} else
+					return MAX_ITERATION;
+			})
+			.orElse(MAX_ITERATION);
+
+		log.info("iterations: {}", iterations);
+
 		SparkConf conf = new SparkConf();
 		conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
 
@@ -77,7 +90,8 @@ public class SparkResultToOrganizationFromSemRel implements Serializable {
 				resultOrganizationPath,
 				relationPath,
 				workingPath,
-				outputPath));
+				outputPath,
+				iterations));
 	}
 
 	public static void execPropagation(SparkSession spark,
@@ -86,26 +100,45 @@
 		String resultOrganizationPath,
 		String graphPath,
 		String workingPath,
+		String outputPath,
+		int iterations) {
+		if (iterations == 1) {
+			doPropagateOnce(
+				spark, leavesPath, childParentPath, resultOrganizationPath, graphPath,
+				workingPath, outputPath);
+		} else {
+
+			final LongAccumulator iterationOne = spark.sparkContext().longAccumulator(ITERATION_ONE);
+			final LongAccumulator iterationTwo = spark.sparkContext().longAccumulator(ITERATION_TWO);
+			final LongAccumulator iterationThree = spark.sparkContext().longAccumulator(ITERATION_THREE);
+			final LongAccumulator iterationFour = spark.sparkContext().longAccumulator(ITERATION_FOUR);
+			final LongAccumulator iterationFive = spark.sparkContext().longAccumulator(ITERATION_FIVE);
+			final LongAccumulator notReachedFirstParent = spark.sparkContext().longAccumulator(ITERATION_NO_PARENT);
+
+			final PropagationCounter propagationCounter = new PropagationCounter(iterationOne,
+				iterationTwo,
+				iterationThree,
+				iterationFour,
+				iterationFive,
+				notReachedFirstParent);
+
+			doPropagate(
+				spark, leavesPath, childParentPath, resultOrganizationPath, graphPath,
+				workingPath, outputPath, propagationCounter);
+		}
+
+	}
+
+	private static void doPropagateOnce(SparkSession spark, String leavesPath, String childParentPath,
+		String resultOrganizationPath, String graphPath, String workingPath,
 		String outputPath) {
 
-		final LongAccumulator iterationOne = spark.sparkContext().longAccumulator(ITERATION_ONE);
-		final LongAccumulator iterationTwo = spark.sparkContext().longAccumulator(ITERATION_TWO);
-		final LongAccumulator iterationThree = spark.sparkContext().longAccumulator(ITERATION_THREE);
-		final LongAccumulator iterationFour = spark.sparkContext().longAccumulator(ITERATION_FOUR);
-		final LongAccumulator iterationFive = spark.sparkContext().longAccumulator(ITERATION_FIVE);
-		final LongAccumulator notReachedFirstParent = spark.sparkContext().longAccumulator(ITERATION_NO_PARENT);
-
-		final PropagationCounter propagationCounter = new PropagationCounter(iterationOne,
-			iterationTwo,
-			iterationThree,
-			iterationFour,
-			iterationFive,
-			notReachedFirstParent);
-
-		doPropagate(
-			spark, leavesPath, childParentPath, resultOrganizationPath, graphPath,
-			workingPath, outputPath, propagationCounter);
+		StepActions
+			.execStep(
+				spark, graphPath, workingPath + NEW_RELATION_PATH,
+				leavesPath, childParentPath, resultOrganizationPath);
+		addNewRelations(spark, workingPath + NEW_RELATION_PATH, outputPath);
 
 	}
 
 	private static void doPropagate(SparkSession spark, String leavesPath, String childParentPath,
diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/input_propagation_parameter.json b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/input_propagation_parameter.json
index f73cc221e..e09cd62fa 100644
--- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/input_propagation_parameter.json
+++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/input_propagation_parameter.json
@@ -46,5 +46,11 @@
     "paramLongName": "outputPath",
     "paramDescription": "the path used to store temporary output files",
     "paramRequired": true
+  },
+  {
+    "paramName": "it",
+    "paramLongName": "iterations",
+    "paramDescription": "the number of iterations to be computed",
+    "paramRequired": false
   }
 ]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/oozie_app/workflow.xml
index 17502abea..3f0530aaf 100644
--- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/oozie_app/workflow.xml
@@ -1,4 +1,4 @@
-
+
     <parameters>
         <property>
             <name>sourcePath</name>
@@ -181,6 +181,7 @@
             <arg>--resultOrgPath</arg><arg>${workingDir}/preparedInfo/resultOrgPath</arg>
             <arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
             <arg>--workingDir</arg><arg>${workingDir}/working</arg>
+            <arg>--iterations</arg><arg>${iterations}</arg>
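
Side note (not part of the patch): the new Optional chain in SparkResultToOrganizationFromSemRel reads the optional "iterations" argument, caps any supplied value at MAX_ITERATION, and falls back to MAX_ITERATION when the argument is absent. A minimal standalone sketch of the same clamping logic, assuming MAX_ITERATION = 5 for illustration (the real constant is defined elsewhere in the module) and the hypothetical names IterationsArgSketch/resolveIterations:

    import java.util.Optional;

    public class IterationsArgSketch {
        // Assumed value for this sketch; the actual MAX_ITERATION constant
        // lives outside the diff shown above.
        private static final int MAX_ITERATION = 5;

        // Same semantics as the patch: a missing argument yields MAX_ITERATION,
        // a supplied value is capped at MAX_ITERATION.
        static int resolveIterations(String raw) {
            return Optional
                .ofNullable(raw)
                .map(v -> Math.min(Integer.parseInt(v), MAX_ITERATION))
                .orElse(MAX_ITERATION);
        }

        public static void main(String[] args) {
            System.out.println(resolveIterations(null)); // 5: default, full propagation
            System.out.println(resolveIterations("1"));  // 1: selects the doPropagateOnce branch
            System.out.println(resolveIterations("99")); // 5: capped at MAX_ITERATION
        }
    }

With iterations == 1 the job runs a single StepActions.execStep pass and writes the collected relations via addNewRelations, skipping the per-iteration accumulators; any other value (or omitting --iterations) keeps the original doPropagate loop with its PropagationCounter.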