1
0
Fork 0

GroupEntitiesSparkJob must read all graph paths but relations

This commit is contained in:
Claudio Atzori 2020-11-26 11:04:01 +01:00
parent 76363a8512
commit 13eae4b31e
1 changed files with 3 additions and 3 deletions

View File

@ -90,7 +90,7 @@ public class GroupEntitiesSparkJob {
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
spark spark
.read() .read()
.textFile(toSeq(listPaths(inputPath, sc))) .textFile(toSeq(listEntityPaths(inputPath, sc)))
.map((MapFunction<String, OafEntity>) s -> parseOaf(s), Encoders.kryo(OafEntity.class)) .map((MapFunction<String, OafEntity>) s -> parseOaf(s), Encoders.kryo(OafEntity.class))
.filter((FilterFunction<OafEntity>) e -> StringUtils.isNotBlank(ModelSupport.idFn().apply(e))) .filter((FilterFunction<OafEntity>) e -> StringUtils.isNotBlank(ModelSupport.idFn().apply(e)))
.groupByKey((MapFunction<OafEntity, String>) oaf -> ModelSupport.idFn().apply(oaf), Encoders.STRING()) .groupByKey((MapFunction<OafEntity, String>) oaf -> ModelSupport.idFn().apply(oaf), Encoders.STRING())
@ -191,11 +191,11 @@ public class GroupEntitiesSparkJob {
} }
} }
private static List<String> listPaths(String inputPath, JavaSparkContext sc) { private static List<String> listEntityPaths(String inputPath, JavaSparkContext sc) {
return HdfsSupport return HdfsSupport
.listFiles(inputPath, sc.hadoopConfiguration()) .listFiles(inputPath, sc.hadoopConfiguration())
.stream() .stream()
.filter(f -> !f.equals("relation")) .filter(f -> !f.toLowerCase().contains("relation"))
.collect(Collectors.toList()); .collect(Collectors.toList());
} }