From b21866a2da7be44106713c3f3982e3ee646d22d4 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 10 Jul 2020 13:59:48 +0200 Subject: [PATCH] allow to set different to relations cut points by source and by target; adjusted weight assigned to relationship types --- .../dhp/oa/provision/PrepareRelationsJob.java | 358 +++++++++--------- .../provision/model/SortableRelationKey.java | 22 +- .../input_params_prepare_relations.json | 13 +- 3 files changed, 206 insertions(+), 187 deletions(-) diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java index e1f7386e9..da0a81021 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java @@ -59,204 +59,216 @@ import scala.Tuple2; */ public class PrepareRelationsJob { - private static final Logger log = LoggerFactory.getLogger(PrepareRelationsJob.class); + private static final Logger log = LoggerFactory.getLogger(PrepareRelationsJob.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - public static final int MAX_RELS = 100; + public static final int MAX_RELS = 100; - public static final int DEFAULT_NUM_PARTITIONS = 3000; + public static final int DEFAULT_NUM_PARTITIONS = 3000; - public static void main(String[] args) throws Exception { - String jsonConfiguration = IOUtils - .toString( - PrepareRelationsJob.class - .getResourceAsStream( - "/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json")); - final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); - parser.parseArgument(args); + public static void main(String[] args) throws Exception { + String jsonConfiguration = IOUtils + .toString( + PrepareRelationsJob.class + .getResourceAsStream( + "/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json")); + final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); + parser.parseArgument(args); - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - log.info("isSparkSessionManaged: {}", isSparkSessionManaged); + Boolean isSparkSessionManaged = Optional + .ofNullable(parser.get("isSparkSessionManaged")) + .map(Boolean::valueOf) + .orElse(Boolean.TRUE); + log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - String inputRelationsPath = parser.get("inputRelationsPath"); - log.info("inputRelationsPath: {}", inputRelationsPath); + String inputRelationsPath = parser.get("inputRelationsPath"); + log.info("inputRelationsPath: {}", inputRelationsPath); - String outputPath = parser.get("outputPath"); - log.info("outputPath: {}", outputPath); + String outputPath = parser.get("outputPath"); + log.info("outputPath: {}", outputPath); - int relPartitions = Optional - .ofNullable(parser.get("relPartitions")) - .map(Integer::valueOf) - .orElse(DEFAULT_NUM_PARTITIONS); - log.info("relPartitions: {}", relPartitions); + int relPartitions = Optional + .ofNullable(parser.get("relPartitions")) + .map(Integer::valueOf) + .orElse(DEFAULT_NUM_PARTITIONS); + log.info("relPartitions: {}", relPartitions); - Set relationFilter = Optional - .ofNullable(parser.get("relationFilter")) - .map(s -> Sets.newHashSet(Splitter.on(",").split(s))) - .orElse(new HashSet<>()); - log.info("relationFilter: {}", relationFilter); + Set relationFilter = Optional + .ofNullable(parser.get("relationFilter")) + .map(s -> Sets.newHashSet(Splitter.on(",").split(s))) + .orElse(new HashSet<>()); + log.info("relationFilter: {}", relationFilter); - int maxRelations = Optional - .ofNullable(parser.get("maxRelations")) - .map(Integer::valueOf) - .orElse(MAX_RELS); - log.info("maxRelations: {}", maxRelations); + int sourceMaxRelations = Optional + .ofNullable(parser.get("sourceMaxRelations")) + .map(Integer::valueOf) + .orElse(MAX_RELS); + log.info("sourceMaxRelations: {}", sourceMaxRelations); - SparkConf conf = new SparkConf(); - conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - conf.registerKryoClasses(ProvisionModelSupport.getModelClasses()); + int targetMaxRelations = Optional + .ofNullable(parser.get("targetMaxRelations")) + .map(Integer::valueOf) + .orElse(MAX_RELS); + log.info("targetMaxRelations: {}", targetMaxRelations); - runWithSparkSession( - conf, - isSparkSessionManaged, - spark -> { - removeOutputDir(spark, outputPath); - prepareRelationsRDD( - spark, inputRelationsPath, outputPath, relationFilter, maxRelations, relPartitions); - }); - } + SparkConf conf = new SparkConf(); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(ProvisionModelSupport.getModelClasses()); - /** - * RDD based implementation that prepares the graph relations by limiting the number of outgoing links and filtering - * the relation types according to the given criteria. Moreover, outgoing links kept within the given limit are - * prioritized according to the weights indicated in eu.dnetlib.dhp.oa.provision.model.SortableRelation. - * - * @param spark the spark session - * @param inputRelationsPath source path for the graph relations - * @param outputPath output path for the processed relations - * @param relationFilter set of relation filters applied to the `relClass` field - * @param maxRelations maximum number of allowed outgoing edges - * @param relPartitions number of partitions for the output RDD - */ - private static void prepareRelationsRDD(SparkSession spark, String inputRelationsPath, String outputPath, - Set relationFilter, int maxRelations, int relPartitions) { - - JavaRDD rels = readPathRelationRDD(spark, inputRelationsPath); - - JavaRDD pruned = pruneRels( - pruneRels(rels, relationFilter, maxRelations, relPartitions, (Function) r -> r.getSource()), - relationFilter, maxRelations, relPartitions, (Function) r -> r.getTarget()); - spark - .createDataset(pruned.rdd(), Encoders.bean(Relation.class)) - .repartition(relPartitions) - .write() - .mode(SaveMode.Overwrite) - .parquet(outputPath); - } - - private static JavaRDD pruneRels(JavaRDD rels, Set relationFilter, int maxRelations, int relPartitions, Function idFn) { - return rels - .filter(rel -> rel.getDataInfo().getDeletedbyinference() == false) - .filter(rel -> relationFilter.contains(rel.getRelClass()) == false) - .mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, idFn.call(r)), r)) - .repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions)) - .groupBy(Tuple2::_1) - .map(Tuple2::_2) - .map(t -> Iterables.limit(t, maxRelations)) - .flatMap(Iterable::iterator).map(Tuple2::_2); + runWithSparkSession( + conf, + isSparkSessionManaged, + spark -> { + removeOutputDir(spark, outputPath); + prepareRelationsRDD( + spark, inputRelationsPath, outputPath, relationFilter, sourceMaxRelations, targetMaxRelations, + relPartitions); + }); } - // experimental - private static void prepareRelationsDataset( - SparkSession spark, String inputRelationsPath, String outputPath, Set relationFilter, int maxRelations, - int relPartitions) { - spark - .read() - .textFile(inputRelationsPath) - .repartition(relPartitions) - .map( - (MapFunction) s -> OBJECT_MAPPER.readValue(s, Relation.class), - Encoders.kryo(Relation.class)) - .filter((FilterFunction) rel -> rel.getDataInfo().getDeletedbyinference() == false) - .filter((FilterFunction) rel -> relationFilter.contains(rel.getRelClass()) == false) - .groupByKey( - (MapFunction) Relation::getSource, - Encoders.STRING()) - .agg(new RelationAggregator(maxRelations).toColumn()) - .flatMap( - (FlatMapFunction, Relation>) t -> Iterables - .limit(t._2().getRelations(), maxRelations) - .iterator(), - Encoders.bean(Relation.class)) - .repartition(relPartitions) - .write() - .mode(SaveMode.Overwrite) - .parquet(outputPath); - } + /** + * RDD based implementation that prepares the graph relations by limiting the number of outgoing links and filtering + * the relation types according to the given criteria. Moreover, outgoing links kept within the given limit are + * prioritized according to the weights indicated in eu.dnetlib.dhp.oa.provision.model.SortableRelation. + * + * @param spark the spark session + * @param inputRelationsPath source path for the graph relations + * @param outputPath output path for the processed relations + * @param relationFilter set of relation filters applied to the `relClass` field + * @param sourceMaxRelations maximum number of allowed outgoing edges grouping by relation.source + * @param targetMaxRelations maximum number of allowed outgoing edges grouping by relation.target + * @param relPartitions number of partitions for the output RDD + */ + private static void prepareRelationsRDD(SparkSession spark, String inputRelationsPath, String outputPath, + Set relationFilter, int sourceMaxRelations, int targetMaxRelations, int relPartitions) { - public static class RelationAggregator - extends Aggregator { + JavaRDD rels = readPathRelationRDD(spark, inputRelationsPath) + .filter(rel -> rel.getDataInfo().getDeletedbyinference() == false) + .filter(rel -> relationFilter.contains(rel.getRelClass()) == false); - private int maxRelations; + JavaRDD pruned = pruneRels( + pruneRels( + rels, + sourceMaxRelations, relPartitions, (Function) r -> r.getSource()), + targetMaxRelations, relPartitions, (Function) r -> r.getTarget()); + spark + .createDataset(pruned.rdd(), Encoders.bean(Relation.class)) + .repartition(relPartitions) + .write() + .mode(SaveMode.Overwrite) + .parquet(outputPath); + } - public RelationAggregator(int maxRelations) { - this.maxRelations = maxRelations; - } + private static JavaRDD pruneRels(JavaRDD rels, int maxRelations, + int relPartitions, Function idFn) { + return rels + .mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, idFn.call(r)), r)) + .repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions)) + .groupBy(Tuple2::_1) + .map(Tuple2::_2) + .map(t -> Iterables.limit(t, maxRelations)) + .flatMap(Iterable::iterator) + .map(Tuple2::_2); + } - @Override - public RelationList zero() { - return new RelationList(); - } + // experimental + private static void prepareRelationsDataset( + SparkSession spark, String inputRelationsPath, String outputPath, Set relationFilter, int maxRelations, + int relPartitions) { + spark + .read() + .textFile(inputRelationsPath) + .repartition(relPartitions) + .map( + (MapFunction) s -> OBJECT_MAPPER.readValue(s, Relation.class), + Encoders.kryo(Relation.class)) + .filter((FilterFunction) rel -> rel.getDataInfo().getDeletedbyinference() == false) + .filter((FilterFunction) rel -> relationFilter.contains(rel.getRelClass()) == false) + .groupByKey( + (MapFunction) Relation::getSource, + Encoders.STRING()) + .agg(new RelationAggregator(maxRelations).toColumn()) + .flatMap( + (FlatMapFunction, Relation>) t -> Iterables + .limit(t._2().getRelations(), maxRelations) + .iterator(), + Encoders.bean(Relation.class)) + .repartition(relPartitions) + .write() + .mode(SaveMode.Overwrite) + .parquet(outputPath); + } - @Override - public RelationList reduce(RelationList b, Relation a) { - b.getRelations().add(a); - return getSortableRelationList(b); - } + public static class RelationAggregator + extends Aggregator { - @Override - public RelationList merge(RelationList b1, RelationList b2) { - b1.getRelations().addAll(b2.getRelations()); - return getSortableRelationList(b1); - } + private int maxRelations; - @Override - public RelationList finish(RelationList r) { - return getSortableRelationList(r); - } + public RelationAggregator(int maxRelations) { + this.maxRelations = maxRelations; + } - private RelationList getSortableRelationList(RelationList b1) { - RelationList sr = new RelationList(); - sr - .setRelations( - b1 - .getRelations() - .stream() - .limit(maxRelations) - .collect(Collectors.toCollection(() -> new PriorityQueue<>(new RelationComparator())))); - return sr; - } + @Override + public RelationList zero() { + return new RelationList(); + } - @Override - public Encoder bufferEncoder() { - return Encoders.kryo(RelationList.class); - } + @Override + public RelationList reduce(RelationList b, Relation a) { + b.getRelations().add(a); + return getSortableRelationList(b); + } - @Override - public Encoder outputEncoder() { - return Encoders.kryo(RelationList.class); - } - } + @Override + public RelationList merge(RelationList b1, RelationList b2) { + b1.getRelations().addAll(b2.getRelations()); + return getSortableRelationList(b1); + } - /** - * Reads a JavaRDD of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline delimited json text - * file, - * - * @param spark - * @param inputPath - * @return the JavaRDD containing all the relationships - */ - private static JavaRDD readPathRelationRDD( - SparkSession spark, final String inputPath) { - JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - return sc.textFile(inputPath).map(s -> OBJECT_MAPPER.readValue(s, Relation.class)); - } + @Override + public RelationList finish(RelationList r) { + return getSortableRelationList(r); + } - private static void removeOutputDir(SparkSession spark, String path) { - HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); - } + private RelationList getSortableRelationList(RelationList b1) { + RelationList sr = new RelationList(); + sr + .setRelations( + b1 + .getRelations() + .stream() + .limit(maxRelations) + .collect(Collectors.toCollection(() -> new PriorityQueue<>(new RelationComparator())))); + return sr; + } + + @Override + public Encoder bufferEncoder() { + return Encoders.kryo(RelationList.class); + } + + @Override + public Encoder outputEncoder() { + return Encoders.kryo(RelationList.class); + } + } + + /** + * Reads a JavaRDD of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline delimited json text + * file, + * + * @param spark + * @param inputPath + * @return the JavaRDD containing all the relationships + */ + private static JavaRDD readPathRelationRDD( + SparkSession spark, final String inputPath) { + JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + return sc.textFile(inputPath).map(s -> OBJECT_MAPPER.readValue(s, Relation.class)); + } + + private static void removeOutputDir(SparkSession spark, String path) { + HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); + } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java index bf7f9330d..bd7b4d78e 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java @@ -16,18 +16,18 @@ public class SortableRelationKey implements Comparable, Ser private static final Map weights = Maps.newHashMap(); static { - weights.put("outcome", 0); - weights.put("supplement", 1); - weights.put("review", 2); - weights.put("citation", 3); - weights.put("affiliation", 4); - weights.put("relationship", 5); - weights.put("publicationDataset", 6); - weights.put("similarity", 7); + weights.put("participation", 0); - weights.put("provision", 8); - weights.put("participation", 9); - weights.put("dedup", 10); + weights.put("outcome", 1); + weights.put("affiliation", 2); + weights.put("dedup", 3); + weights.put("publicationDataset", 4); + weights.put("citation", 5); + weights.put("supplement", 6); + weights.put("review", 7); + weights.put("relationship", 8); + weights.put("provision", 9); + weights.put("similarity", 10); } private static final long serialVersionUID = 3232323; diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json index 71b2becc4..33fa1dc8d 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json @@ -30,9 +30,16 @@ "paramRequired": false }, { - "paramName": "mr", - "paramLongName": "maxRelations", - "paramDescription": "maximum number of relations allowed for a each entity", + "paramName": "smr", + "paramLongName": "sourceMaxRelations", + "paramDescription": "maximum number of relations allowed for a each entity grouping by source", + "paramRequired": false + }, + { + "paramName": "tmr", + "paramLongName": "targetMaxRelations", + "paramDescription": "maximum number of relations allowed for a each entity grouping by target", "paramRequired": false } + ]