minor changes

This commit is contained in:
Miriam Baglioni 2020-04-14 16:45:12 +02:00
parent e0038bde5b
commit ceb1f299bf
1 changed files with 18 additions and 8 deletions

View File

@ -55,8 +55,8 @@ public class PrepareProjectResultsAssociation {
runWithSparkHiveSession(conf, isSparkSessionManaged,
spark -> {
removeOutputDir(spark, potentialUpdatePath);
removeOutputDir(spark, alreadyLinkedPath);
// removeOutputDir(spark, potentialUpdatePath);
// removeOutputDir(spark, alreadyLinkedPath);
prepareResultProjProjectResults(spark, inputPath, potentialUpdatePath, alreadyLinkedPath, allowedsemrel);
});
@ -91,9 +91,14 @@ public class PrepareProjectResultsAssociation {
"GROUP BY projectId ";
spark.sql(query).as(Encoders.bean(ProjectResultSet.class))
.toJavaRDD()
.map(r -> OBJECT_MAPPER.writeValueAsString(r))
.saveAsTextFile(potentialUpdatePath, GzipCodec.class);
.toJSON()
.write()
.mode(SaveMode.Overwrite)
.option("compression","gzip")
.text(potentialUpdatePath);
// .toJavaRDD()
// .map(r -> OBJECT_MAPPER.writeValueAsString(r))
// .saveAsTextFile(potentialUpdatePath, GzipCodec.class);
query = "SELECT target projectId, collect_set(source) resultSet " +
@ -102,9 +107,14 @@ public class PrepareProjectResultsAssociation {
spark.sql(query)
.as(Encoders.bean(ProjectResultSet.class))
.toJavaRDD()
.map(r -> OBJECT_MAPPER.writeValueAsString(r))
.saveAsTextFile(alreadyLinkedPath, GzipCodec.class);
.toJSON()
.write()
.mode(SaveMode.Overwrite)
.option("compression","gzip")
.text(alreadyLinkedPath);
// .toJavaRDD()
// .map(r -> OBJECT_MAPPER.writeValueAsString(r))
// .saveAsTextFile(alreadyLinkedPath, GzipCodec.class);
}