1
0
Fork 0

experimenting with pruning of relations

This commit is contained in:
Claudio Atzori 2020-07-10 10:06:41 +02:00
parent 67e1d222b6
commit ff4d6214f1
1 changed files with 174 additions and 170 deletions

View File

@ -135,27 +135,31 @@ public class PrepareRelationsJob {
private static void prepareRelationsRDD(SparkSession spark, String inputRelationsPath, String outputPath, private static void prepareRelationsRDD(SparkSession spark, String inputRelationsPath, String outputPath,
Set<String> relationFilter, int maxRelations, int relPartitions) { Set<String> relationFilter, int maxRelations, int relPartitions) {
// group by SOURCE and apply limit JavaRDD<Relation> rels = readPathRelationRDD(spark, inputRelationsPath);
RDD<Relation> bySource = readPathRelationRDD(spark, inputRelationsPath)
.filter(rel -> rel.getDataInfo().getDeletedbyinference() == false)
.filter(rel -> relationFilter.contains(rel.getRelClass()) == false)
.mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, r.getSource()), r))
.repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions))
.groupBy(Tuple2::_1)
.map(Tuple2::_2)
.map(t -> Iterables.limit(t, maxRelations))
.flatMap(Iterable::iterator)
.map(Tuple2::_2)
.rdd();
JavaRDD<Relation> pruned = pruneRels(
pruneRels(rels, relationFilter, maxRelations, relPartitions, (Function<Relation, String>) r -> r.getSource()),
relationFilter, maxRelations, relPartitions, (Function<Relation, String>) r -> r.getTarget());
spark spark
.createDataset(bySource, Encoders.bean(Relation.class)) .createDataset(pruned.rdd(), Encoders.bean(Relation.class))
.repartition(relPartitions) .repartition(relPartitions)
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.parquet(outputPath); .parquet(outputPath);
} }
private static JavaRDD<Relation> pruneRels(JavaRDD<Relation> rels, Set<String> relationFilter, int maxRelations, int relPartitions, Function<Relation, String> idFn) {
return rels
.filter(rel -> rel.getDataInfo().getDeletedbyinference() == false)
.filter(rel -> relationFilter.contains(rel.getRelClass()) == false)
.mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, idFn.call(r)), r))
.repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions))
.groupBy(Tuple2::_1)
.map(Tuple2::_2)
.map(t -> Iterables.limit(t, maxRelations))
.flatMap(Iterable::iterator).map(Tuple2::_2);
}
// experimental // experimental
private static void prepareRelationsDataset( private static void prepareRelationsDataset(
SparkSession spark, String inputRelationsPath, String outputPath, Set<String> relationFilter, int maxRelations, SparkSession spark, String inputRelationsPath, String outputPath, Set<String> relationFilter, int maxRelations,