1
0
Fork 0

[graph dedup] consistency wf should not remove the relations while dispatching the entities

This commit is contained in:
Claudio Atzori 2023-09-12 14:34:28 +02:00
parent 4786aa0e09
commit 8a6892cc63
1 changed files with 5 additions and 6 deletions

View File

@ -47,17 +47,14 @@ public class DispatchEntitiesSparkJob {
String outputPath = parser.get("outputPath"); String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath); log.info("outputPath: {}", outputPath);
boolean filterInvisible = Boolean.valueOf(parser.get("filterInvisible")); boolean filterInvisible = Boolean.parseBoolean(parser.get("filterInvisible"));
log.info("filterInvisible: {}", filterInvisible); log.info("filterInvisible: {}", filterInvisible);
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
runWithSparkSession( runWithSparkSession(
conf, conf,
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> dispatchEntities(spark, inputPath, outputPath, filterInvisible));
HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
dispatchEntities(spark, inputPath, outputPath, filterInvisible);
});
} }
private static void dispatchEntities( private static void dispatchEntities(
@ -72,7 +69,9 @@ public class DispatchEntitiesSparkJob {
String entityType = entry.getKey(); String entityType = entry.getKey();
Class<?> clazz = entry.getValue(); Class<?> clazz = entry.getValue();
final String entityPath = outputPath + "/" + entityType;
if (!entityType.equalsIgnoreCase("relation")) { if (!entityType.equalsIgnoreCase("relation")) {
HdfsSupport.remove(entityPath, spark.sparkContext().hadoopConfiguration());
Dataset<Row> entityDF = spark Dataset<Row> entityDF = spark
.read() .read()
.schema(Encoders.bean(clazz).schema()) .schema(Encoders.bean(clazz).schema())
@ -91,7 +90,7 @@ public class DispatchEntitiesSparkJob {
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression", "gzip") .option("compression", "gzip")
.json(outputPath + "/" + entityType); .json(entityPath);
} }
}); });
} }