This commit is contained in:
Miriam Baglioni 2020-04-30 15:23:49 +02:00
parent 65a5d67b8b
commit 13f30664ea
2 changed files with 18 additions and 21 deletions

View File

@ -70,7 +70,7 @@ public class SparkResultToCommunityFromOrganizationJob2 {
if (isTest(parser)) { if (isTest(parser)) {
removeOutputDir(spark, outputPath); removeOutputDir(spark, outputPath);
} }
if(saveGraph) if (saveGraph)
execPropagation(spark, inputPath, outputPath, resultClazz, possibleupdatespath); execPropagation(spark, inputPath, outputPath, resultClazz, possibleupdatespath);
}); });
} }

View File

@ -58,8 +58,6 @@ public class SparkResultToOrganizationFromIstRepoJob2 {
final String resultClassName = parser.get("resultTableName"); final String resultClassName = parser.get("resultTableName");
log.info("resultTableName: {}", resultClassName); log.info("resultTableName: {}", resultClassName);
final Boolean saveGraph = Optional final Boolean saveGraph = Optional
.ofNullable(parser.get("saveGraph")) .ofNullable(parser.get("saveGraph"))
.map(Boolean::valueOf) .map(Boolean::valueOf)
@ -78,7 +76,7 @@ public class SparkResultToOrganizationFromIstRepoJob2 {
if (isTest(parser)) { if (isTest(parser)) {
removeOutputDir(spark, outputPath); removeOutputDir(spark, outputPath);
} }
if(saveGraph) if (saveGraph)
execPropagation( execPropagation(
spark, spark,
datasourceorganization, datasourceorganization,
@ -86,7 +84,7 @@ public class SparkResultToOrganizationFromIstRepoJob2 {
inputPath, inputPath,
outputPath, outputPath,
resultClazz); resultClazz);
}); });
} }
private static void execPropagation( private static void execPropagation(
@ -112,22 +110,21 @@ public class SparkResultToOrganizationFromIstRepoJob2 {
broadcast_datasourceorganizationassoc) broadcast_datasourceorganizationassoc)
.as(Encoders.bean(ResultOrganizationSet.class)); .as(Encoders.bean(ResultOrganizationSet.class));
getNewRelations(
getNewRelations( spark
spark .read()
.read() .textFile(alreadylinked)
.textFile(alreadylinked) .map(
.map( value -> OBJECT_MAPPER
value -> OBJECT_MAPPER .readValue(
.readValue( value, ResultOrganizationSet.class),
value, ResultOrganizationSet.class), Encoders.bean(ResultOrganizationSet.class)),
Encoders.bean(ResultOrganizationSet.class)), potentialUpdates)
potentialUpdates) .toJSON()
.toJSON() .write()
.write() .mode(SaveMode.Append)
.mode(SaveMode.Append) .option("compression", "gzip")
.option("compression", "gzip") .text(outputPath);
.text(outputPath);
} }