@@ -6,6 +6,7 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
 
 import java.io.Serializable;
 import java.util.Arrays;
+import java.util.Optional;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
@@ -64,6 +65,18 @@ public class SparkResultToOrganizationFromSemRel implements Serializable {
         final String workingPath = parser.get("workingDir");
         log.info("workingPath: {}", workingPath);
 
+        final int iterations = Optional
+            .ofNullable(parser.get("iterations"))
+            .map(v -> {
+                if (Integer.valueOf(v) < MAX_ITERATION) {
+                    return Integer.valueOf(v);
+                } else
+                    return MAX_ITERATION;
+            })
+            .orElse(MAX_ITERATION);
+
+        log.info("iterations: {}", iterations);
+
         SparkConf conf = new SparkConf();
         conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
 
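For context only, not part of the patch: the block added above reads an optional "iterations" argument and caps it at MAX_ITERATION. A minimal sketch of the equivalent clamping logic, assuming only that MAX_ITERATION is the class constant referenced in the hunk (the class and helper names below are hypothetical):

    import java.util.Optional;

    class IterationsArgSketch {
        // Mirrors the added block: a missing argument, or any value above the cap,
        // resolves to maxIteration; anything smaller is used as-is.
        static int resolveIterations(String raw, int maxIteration) {
            return Optional
                .ofNullable(raw)                      // the argument may be absent
                .map(Integer::valueOf)                // parse when present
                .map(v -> Math.min(v, maxIteration))  // cap at the configured maximum
                .orElse(maxIteration);                // default to the cap itself
        }
    }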
@@ -77,7 +90,8 @@ public class SparkResultToOrganizationFromSemRel implements Serializable {
                 resultOrganizationPath,
                 relationPath,
                 workingPath,
-                outputPath));
+                outputPath,
+                iterations));
     }
 
     public static void execPropagation(SparkSession spark,
@@ -86,26 +100,45 @@ public class SparkResultToOrganizationFromSemRel implements Serializable {
         String resultOrganizationPath,
         String graphPath,
         String workingPath,
+        String outputPath,
+        int iterations) {
+        if (iterations == 1) {
+            doPropagateOnce(
+                spark, leavesPath, childParentPath, resultOrganizationPath, graphPath,
+                workingPath, outputPath);
+        } else {
+
+            final LongAccumulator iterationOne = spark.sparkContext().longAccumulator(ITERATION_ONE);
+            final LongAccumulator iterationTwo = spark.sparkContext().longAccumulator(ITERATION_TWO);
+            final LongAccumulator iterationThree = spark.sparkContext().longAccumulator(ITERATION_THREE);
+            final LongAccumulator iterationFour = spark.sparkContext().longAccumulator(ITERATION_FOUR);
+            final LongAccumulator iterationFive = spark.sparkContext().longAccumulator(ITERATION_FIVE);
+            final LongAccumulator notReachedFirstParent = spark.sparkContext().longAccumulator(ITERATION_NO_PARENT);
+
+            final PropagationCounter propagationCounter = new PropagationCounter(iterationOne,
+                iterationTwo,
+                iterationThree,
+                iterationFour,
+                iterationFive,
+                notReachedFirstParent);
+
+            doPropagate(
+                spark, leavesPath, childParentPath, resultOrganizationPath, graphPath,
+                workingPath, outputPath, propagationCounter);
+        }
+
+    }
+
+    private static void doPropagateOnce(SparkSession spark, String leavesPath, String childParentPath,
+        String resultOrganizationPath, String graphPath, String workingPath,
         String outputPath) {
 
-        final LongAccumulator iterationOne = spark.sparkContext().longAccumulator(ITERATION_ONE);
-        final LongAccumulator iterationTwo = spark.sparkContext().longAccumulator(ITERATION_TWO);
-        final LongAccumulator iterationThree = spark.sparkContext().longAccumulator(ITERATION_THREE);
-        final LongAccumulator iterationFour = spark.sparkContext().longAccumulator(ITERATION_FOUR);
-        final LongAccumulator iterationFive = spark.sparkContext().longAccumulator(ITERATION_FIVE);
-        final LongAccumulator notReachedFirstParent = spark.sparkContext().longAccumulator(ITERATION_NO_PARENT);
-
-        final PropagationCounter propagationCounter = new PropagationCounter(iterationOne,
-            iterationTwo,
-            iterationThree,
-            iterationFour,
-            iterationFive,
-            notReachedFirstParent);
-
-        doPropagate(
-            spark, leavesPath, childParentPath, resultOrganizationPath, graphPath,
-            workingPath, outputPath, propagationCounter);
-
+        StepActions
+            .execStep(
+                spark, graphPath, workingPath + NEW_RELATION_PATH,
+                leavesPath, childParentPath, resultOrganizationPath);
+
+        addNewRelations(spark, workingPath + NEW_RELATION_PATH, outputPath);
     }
 
     private static void doPropagate(SparkSession spark, String leavesPath, String childParentPath,
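A usage sketch, not part of the patch: how the extended entry point introduced above could be invoked once a SparkSession is available. Everything except the method name and parameter order (taken from the hunk) is a placeholder:

    // Illustrative only: "spark" is an existing SparkSession, all paths are hypothetical.
    SparkResultToOrganizationFromSemRel
        .execPropagation(
            spark,
            "/tmp/working/leaves",              // leavesPath
            "/tmp/working/childParent",         // childParentPath
            "/tmp/working/resultOrganization",  // resultOrganizationPath
            "/tmp/graph/relation",              // graphPath
            "/tmp/working",                     // workingPath
            "/tmp/output/relation",             // outputPath
            1);                                 // iterations == 1 takes the doPropagateOnce shortcut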