[stats wf] indicators across stats dbs & updates in the org ids #248

Closed
dimitris.pierrakos wants to merge 1742 commits from beta into beta2master_sept_2022
3 changed files with 59 additions and 19 deletions
Showing only changes of commit 904e1c2667 - Show all commits

View File

@ -6,6 +6,7 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.io.Serializable; import java.io.Serializable;
import java.util.Arrays; import java.util.Arrays;
import java.util.Optional;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
@ -64,6 +65,18 @@ public class SparkResultToOrganizationFromSemRel implements Serializable {
final String workingPath = parser.get("workingDir"); final String workingPath = parser.get("workingDir");
log.info("workingPath: {}", workingPath); log.info("workingPath: {}", workingPath);
final int iterations = Optional
.ofNullable(parser.get("iterations"))
.map(v -> {
if (Integer.valueOf(v) < MAX_ITERATION) {
return Integer.valueOf(v);
} else
return MAX_ITERATION;
})
.orElse(MAX_ITERATION);
log.info("iterations: {}", iterations);
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
@ -77,7 +90,8 @@ public class SparkResultToOrganizationFromSemRel implements Serializable {
resultOrganizationPath, resultOrganizationPath,
relationPath, relationPath,
workingPath, workingPath,
outputPath)); outputPath,
iterations));
} }
public static void execPropagation(SparkSession spark, public static void execPropagation(SparkSession spark,
@ -86,7 +100,13 @@ public class SparkResultToOrganizationFromSemRel implements Serializable {
String resultOrganizationPath, String resultOrganizationPath,
String graphPath, String graphPath,
String workingPath, String workingPath,
String outputPath) { String outputPath,
int iterations) {
if (iterations == 1) {
doPropagateOnce(
spark, leavesPath, childParentPath, resultOrganizationPath, graphPath,
workingPath, outputPath);
} else {
final LongAccumulator iterationOne = spark.sparkContext().longAccumulator(ITERATION_ONE); final LongAccumulator iterationOne = spark.sparkContext().longAccumulator(ITERATION_ONE);
final LongAccumulator iterationTwo = spark.sparkContext().longAccumulator(ITERATION_TWO); final LongAccumulator iterationTwo = spark.sparkContext().longAccumulator(ITERATION_TWO);
@ -105,9 +125,22 @@ public class SparkResultToOrganizationFromSemRel implements Serializable {
doPropagate( doPropagate(
spark, leavesPath, childParentPath, resultOrganizationPath, graphPath, spark, leavesPath, childParentPath, resultOrganizationPath, graphPath,
workingPath, outputPath, propagationCounter); workingPath, outputPath, propagationCounter);
}
} }
private static void doPropagateOnce(SparkSession spark, String leavesPath, String childParentPath,
String resultOrganizationPath, String graphPath, String workingPath,
String outputPath) {
StepActions
.execStep(
spark, graphPath, workingPath + NEW_RELATION_PATH,
leavesPath, childParentPath, resultOrganizationPath);
addNewRelations(spark, workingPath + NEW_RELATION_PATH, outputPath);
}
private static void doPropagate(SparkSession spark, String leavesPath, String childParentPath, private static void doPropagate(SparkSession spark, String leavesPath, String childParentPath,
String resultOrganizationPath, String graphPath, String workingPath, String outputPath, String resultOrganizationPath, String graphPath, String workingPath, String outputPath,
PropagationCounter propagationCounter) { PropagationCounter propagationCounter) {

View File

@ -46,5 +46,11 @@
"paramLongName": "outputPath", "paramLongName": "outputPath",
"paramDescription": "the path used to store temporary output files", "paramDescription": "the path used to store temporary output files",
"paramRequired": true "paramRequired": true
},
{
"paramName": "it",
"paramLongName": "iterations",
"paramDescription": "the number of iterations to be computed",
"paramRequired": false
} }
] ]

View File

@ -1,4 +1,4 @@
<workflow-app name="affiliation_from_instrepo_propagation" xmlns="uri:oozie:workflow:0.5"> <workflow-app name="affiliation_from_semrel_propagation" xmlns="uri:oozie:workflow:0.5">
<parameters> <parameters>
<property> <property>
<name>sourcePath</name> <name>sourcePath</name>
@ -181,6 +181,7 @@
<arg>--resultOrgPath</arg><arg>${workingDir}/preparedInfo/resultOrgPath</arg> <arg>--resultOrgPath</arg><arg>${workingDir}/preparedInfo/resultOrgPath</arg>
<arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg> <arg>--hive_metastore_uris</arg><arg>${hive_metastore_uris}</arg>
<arg>--workingDir</arg><arg>${workingDir}/working</arg> <arg>--workingDir</arg><arg>${workingDir}/working</arg>
<arg>--iterations</arg><arg>${iterations}</arg>
</spark> </spark>
<ok to="End"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>