1
0
Fork 0

minor refactoring

This commit is contained in:
Miriam Baglioni 2020-04-15 12:21:31 +02:00
parent 27f1d3ee8f
commit 1859ce8902
1 changed file with 3 additions and 11 deletions

View File

@ -73,16 +73,15 @@ public class SparkResultToProjectThroughSemRelJob3 {
runWithSparkSession(conf, isSparkSessionManaged, runWithSparkSession(conf, isSparkSessionManaged,
spark -> { spark -> {
//createOutputDirs(outputPath, FileSystem.get(spark.sparkContext().hadoopConfiguration()));
if(isTest(parser)) { if(isTest(parser)) {
removeOutputDir(spark, outputPath); removeOutputDir(spark, outputPath);
} }
execPropagation(spark, inputPath, outputPath, alreadyLinkedPath, potentialUpdatePath, writeUpdates, saveGraph); execPropagation(spark, outputPath, alreadyLinkedPath, potentialUpdatePath, writeUpdates, saveGraph);
}); });
} }
private static void execPropagation(SparkSession spark, String inputPath, String outputPath, String alreadyLinkedPath, String potentialUpdatePath, private static void execPropagation(SparkSession spark, String outputPath, String alreadyLinkedPath, String potentialUpdatePath,
Boolean writeUpdate, Boolean saveGraph){ Boolean writeUpdate, Boolean saveGraph){
Dataset<ProjectResultSet> toaddrelations = readAssocProjectResults(spark, potentialUpdatePath); Dataset<ProjectResultSet> toaddrelations = readAssocProjectResults(spark, potentialUpdatePath);
@ -95,7 +94,6 @@ public class SparkResultToProjectThroughSemRelJob3 {
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression","gzip") .option("compression","gzip")
.text(outputPath +"/potential_updates"); .text(outputPath +"/potential_updates");
//writeUpdates(toaddrelations.toJavaRDD(), outputPath + "/potential_updates");
} }
if (saveGraph){ if (saveGraph){
getNewRelations(alreadyLinked, toaddrelations) getNewRelations(alreadyLinked, toaddrelations)
@ -104,13 +102,7 @@ public class SparkResultToProjectThroughSemRelJob3 {
.mode(SaveMode.Append) .mode(SaveMode.Append)
.option("compression", "gzip") .option("compression", "gzip")
.text(outputPath); .text(outputPath);
// JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
// sc.textFile(inputPath)
// .map(item -> OBJECT_MAPPER.readValue(item, Relation.class))
// .union(getNewRelations(alreadyLinked, toaddrelations)
// .toJavaRDD())
// .map(r -> OBJECT_MAPPER.writeValueAsString(r))
// .saveAsTextFile(outputPath , GzipCodec.class);
} }
} }