diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphJoiner.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphJoiner.java index d8641f272..96d1f150a 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphJoiner.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/graph/GraphJoiner.java @@ -18,6 +18,24 @@ import scala.Tuple2; import java.io.Serializable; import java.util.List; +/** + * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. + * The operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization, + * and all the possible relationships (similarity links produced by the Dedup process are excluded). + * + * The operation is implemented creating the union between the entity types (E), joined by the relationships (R), and again + * by E, finally grouped by E.id; + * + * Different manipulations of the E and R sets are introduced to reduce the complexity of the operation + * 1) treat the object payload as string, extracting only the necessary information beforehand using json path, + * it seems that deserializing it with jackson's object mapper has higher memory footprint. + * + * 2) only consider rels that are not virtually deleted ($.dataInfo.deletedbyinference == false) + * 3) we only need a subset of fields from the related entities, so we introduce a distinction between E_source = S + * and E_target = T. Objects in T are heavily pruned by all the unnecessary information + * + * 4) perform the join as (((T join R) union S) groupby S.id) yield S -> [ ] + */ public class GraphJoiner implements Serializable { public static final int MAX_RELS = 10; @@ -26,6 +44,7 @@ public class GraphJoiner implements Serializable { final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); + // read each entity JavaPairRDD datasource = readPathEntity(sc, inputPath, "datasource"); JavaPairRDD organization = readPathEntity(sc, inputPath, "organization"); JavaPairRDD project = readPathEntity(sc, inputPath, "project"); @@ -34,6 +53,7 @@ public class GraphJoiner implements Serializable { JavaPairRDD software = readPathEntity(sc, inputPath, "software"); JavaPairRDD publication = readPathEntity(sc, inputPath, "publication"); + // create the union between all the entities final String entitiesPath = outPath + "/entities"; datasource .union(organization) @@ -50,8 +70,9 @@ public class GraphJoiner implements Serializable { .map(t -> new ObjectMapper().readValue(t, EntityRelEntity.class)) .mapToPair(t -> new Tuple2<>(t.getSource().getSourceId(), t)); + // reads the relationships final JavaPairRDD relation = readPathRelation(sc, inputPath) - .filter(r -> !r.getDeleted()) + .filter(r -> !r.getDeleted()) //only consider those that are not virtually deleted .map(p -> new EntityRelEntity().setRelation(p)) .mapToPair(p -> new Tuple2<>(p.getRelation().getSourceId(), p)) .groupByKey() @@ -98,8 +119,16 @@ public class GraphJoiner implements Serializable { }) .map(e -> new ObjectMapper().writeValueAsString(e)) .saveAsTextFile(outPath + "/linked_entities", GzipCodec.class); - } + } + /** + * Reads a set of eu.dnetlib.dhp.schema.oaf.OafEntity objects from a sequence file , + * extracts necessary information using json path, wraps the oaf object in a eu.dnetlib.dhp.graph.TypedRow + * @param sc + * @param inputPath + * @param type + * @return the JavaPairRDD indexed by entity identifier + */ private JavaPairRDD readPathEntity(final JavaSparkContext sc, final String inputPath, final String type) { return sc.sequenceFile(inputPath + "/" + type, Text.class, Text.class) .mapToPair((PairFunction, String, TypedRow>) item -> { @@ -114,6 +143,13 @@ public class GraphJoiner implements Serializable { }); } + /** + * Reads a set of eu.dnetlib.dhp.schema.oaf.Relation objects from a sequence file , + * extracts necessary information using json path, wraps the oaf object in a eu.dnetlib.dhp.graph.TypedRow + * @param sc + * @param inputPath + * @return the JavaRDD containing all the relationships + */ private JavaRDD readPathRelation(final JavaSparkContext sc, final String inputPath) { return sc.sequenceFile(inputPath + "/relation", Text.class, Text.class) .map(item -> {