allow to set different to relations cut points by source and by target; adjusted weight assigned to relationship types

This commit is contained in:
Claudio Atzori 2020-07-10 13:59:48 +02:00
parent ff4d6214f1
commit b21866a2da
3 changed files with 206 additions and 187 deletions

View File

@ -100,11 +100,17 @@ public class PrepareRelationsJob {
.orElse(new HashSet<>());
log.info("relationFilter: {}", relationFilter);
int maxRelations = Optional
.ofNullable(parser.get("maxRelations"))
int sourceMaxRelations = Optional
.ofNullable(parser.get("sourceMaxRelations"))
.map(Integer::valueOf)
.orElse(MAX_RELS);
log.info("maxRelations: {}", maxRelations);
log.info("sourceMaxRelations: {}", sourceMaxRelations);
int targetMaxRelations = Optional
.ofNullable(parser.get("targetMaxRelations"))
.map(Integer::valueOf)
.orElse(MAX_RELS);
log.info("targetMaxRelations: {}", targetMaxRelations);
SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
@ -116,7 +122,8 @@ public class PrepareRelationsJob {
spark -> {
removeOutputDir(spark, outputPath);
prepareRelationsRDD(
spark, inputRelationsPath, outputPath, relationFilter, maxRelations, relPartitions);
spark, inputRelationsPath, outputPath, relationFilter, sourceMaxRelations, targetMaxRelations,
relPartitions);
});
}
@ -129,17 +136,22 @@ public class PrepareRelationsJob {
* @param inputRelationsPath source path for the graph relations
* @param outputPath output path for the processed relations
* @param relationFilter set of relation filters applied to the `relClass` field
* @param maxRelations maximum number of allowed outgoing edges
* @param sourceMaxRelations maximum number of allowed outgoing edges grouping by relation.source
* @param targetMaxRelations maximum number of allowed outgoing edges grouping by relation.target
* @param relPartitions number of partitions for the output RDD
*/
private static void prepareRelationsRDD(SparkSession spark, String inputRelationsPath, String outputPath,
Set<String> relationFilter, int maxRelations, int relPartitions) {
Set<String> relationFilter, int sourceMaxRelations, int targetMaxRelations, int relPartitions) {
JavaRDD<Relation> rels = readPathRelationRDD(spark, inputRelationsPath);
JavaRDD<Relation> rels = readPathRelationRDD(spark, inputRelationsPath)
.filter(rel -> rel.getDataInfo().getDeletedbyinference() == false)
.filter(rel -> relationFilter.contains(rel.getRelClass()) == false);
JavaRDD<Relation> pruned = pruneRels(
pruneRels(rels, relationFilter, maxRelations, relPartitions, (Function<Relation, String>) r -> r.getSource()),
relationFilter, maxRelations, relPartitions, (Function<Relation, String>) r -> r.getTarget());
pruneRels(
rels,
sourceMaxRelations, relPartitions, (Function<Relation, String>) r -> r.getSource()),
targetMaxRelations, relPartitions, (Function<Relation, String>) r -> r.getTarget());
spark
.createDataset(pruned.rdd(), Encoders.bean(Relation.class))
.repartition(relPartitions)
@ -148,16 +160,16 @@ public class PrepareRelationsJob {
.parquet(outputPath);
}
private static JavaRDD<Relation> pruneRels(JavaRDD<Relation> rels, Set<String> relationFilter, int maxRelations, int relPartitions, Function<Relation, String> idFn) {
private static JavaRDD<Relation> pruneRels(JavaRDD<Relation> rels, int maxRelations,
int relPartitions, Function<Relation, String> idFn) {
return rels
.filter(rel -> rel.getDataInfo().getDeletedbyinference() == false)
.filter(rel -> relationFilter.contains(rel.getRelClass()) == false)
.mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, idFn.call(r)), r))
.repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions))
.groupBy(Tuple2::_1)
.map(Tuple2::_2)
.map(t -> Iterables.limit(t, maxRelations))
.flatMap(Iterable::iterator).map(Tuple2::_2);
.flatMap(Iterable::iterator)
.map(Tuple2::_2);
}
// experimental

View File

@ -16,18 +16,18 @@ public class SortableRelationKey implements Comparable<SortableRelationKey>, Ser
private static final Map<String, Integer> weights = Maps.newHashMap();
static {
weights.put("outcome", 0);
weights.put("supplement", 1);
weights.put("review", 2);
weights.put("citation", 3);
weights.put("affiliation", 4);
weights.put("relationship", 5);
weights.put("publicationDataset", 6);
weights.put("similarity", 7);
weights.put("participation", 0);
weights.put("provision", 8);
weights.put("participation", 9);
weights.put("dedup", 10);
weights.put("outcome", 1);
weights.put("affiliation", 2);
weights.put("dedup", 3);
weights.put("publicationDataset", 4);
weights.put("citation", 5);
weights.put("supplement", 6);
weights.put("review", 7);
weights.put("relationship", 8);
weights.put("provision", 9);
weights.put("similarity", 10);
}
private static final long serialVersionUID = 3232323;

View File

@ -30,9 +30,16 @@
"paramRequired": false
},
{
"paramName": "mr",
"paramLongName": "maxRelations",
"paramDescription": "maximum number of relations allowed for a each entity",
"paramName": "smr",
"paramLongName": "sourceMaxRelations",
"paramDescription": "maximum number of relations allowed for a each entity grouping by source",
"paramRequired": false
},
{
"paramName": "tmr",
"paramLongName": "targetMaxRelations",
"paramDescription": "maximum number of relations allowed for a each entity grouping by target",
"paramRequired": false
}
]