package eu.dnetlib.dhp.graph; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import; import; import com.jayway.jsonpath.DocumentContext; import com.jayway.jsonpath.JsonPath; import eu.dnetlib.dhp.schema.oaf.*; import net.minidev.json.JSONArray; import; import; import; import; import; import; import org.apache.spark.sql.SparkSession; import scala.Tuple2; import; import; import java.util.List; import; /** * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. * The operation considers all the entity types (publication, dataset, software, ORP, project, datasource, organization, * and all the possible relationships (similarity links produced by the Dedup process are excluded). * * The operation is implemented creating the union between the entity types (E), joined by the relationships (R), and again * by E, finally grouped by; * * Different manipulations of the E and R sets are introduced to reduce the complexity of the operation * 1) treat the object payload as string, extracting only the necessary information beforehand using json path, * it seems that deserializing it with jackson's object mapper has higher memory footprint. * * 2) only consider rels that are not virtually deleted ($.dataInfo.deletedbyinference == false) * 3) we only need a subset of fields from the related entities, so we introduce a distinction between E_source = S * and E_target = T. Objects in T are heavily pruned by all the unnecessary information * * 4) perform the join as (((T join R) union S) groupby yield S -> [ ] */ public class GraphJoiner implements Serializable { public static final int MAX_RELS = 100; public void join(final SparkSession spark, final String inputPath, final String hiveDbName, final String outPath) { final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); // read each entity JavaPairRDD datasource = readPathEntity(sc, inputPath, "datasource"); JavaPairRDD organization = readPathEntity(sc, inputPath, "organization"); JavaPairRDD project = readPathEntity(sc, inputPath, "project"); JavaPairRDD dataset = readPathEntity(sc, inputPath, "dataset"); JavaPairRDD otherresearchproduct = readPathEntity(sc, inputPath, "otherresearchproduct"); JavaPairRDD software = readPathEntity(sc, inputPath, "software"); JavaPairRDD publication = readPathEntity(sc, inputPath, "publication"); // create the union between all the entities final String entitiesPath = outPath + "/entities"; datasource .union(organization) .union(project) .union(dataset) .union(otherresearchproduct) .union(software) .union(publication) .map(e -> new EntityRelEntity().setSource(e._2())) .map(MappingUtils::serialize) .saveAsTextFile(entitiesPath, GzipCodec.class); JavaPairRDD entities = sc.textFile(entitiesPath) .map(t -> new ObjectMapper().readValue(t, EntityRelEntity.class)) .mapToPair(t -> new Tuple2<>(t.getSource().getSourceId(), t)); // reads the relationships final JavaPairRDD relation = readPathRelation(sc, inputPath) .filter(r -> !r.getDeleted()) //only consider those that are not virtually deleted .map(p -> new EntityRelEntity().setRelation(p)) .mapToPair(p -> new Tuple2<>(p.getRelation().getSourceId(), p)) .groupByKey() .map(p -> Iterables.limit(p._2(), MAX_RELS)) .flatMap(p -> p.iterator()) .mapToPair(p -> new Tuple2<>(p.getRelation().getTargetId(), p)); final String joinByTargetPath = outPath + "/join_by_target"; relation .join(entities .filter(e -> !e._2().getSource().getDeleted()) .mapToPair(e -> new Tuple2<>(e._1(), MappingUtils.pruneModel(e._2())))) .map(s -> new EntityRelEntity() .setRelation(s._2()._1().getRelation()) .setTarget(s._2()._2().getSource())) .map(MappingUtils::serialize) .saveAsTextFile(joinByTargetPath, GzipCodec.class); JavaPairRDD bySource = sc.textFile(joinByTargetPath) .map(t -> new ObjectMapper().readValue(t, EntityRelEntity.class)) .mapToPair(t -> new Tuple2<>(t.getRelation().getSourceId(), t)); final String linkedEntitiesPath = outPath + "/linked_entities"; entities .union(bySource) .groupByKey() // by source id .map(GraphJoiner::asLinkedEntityWrapper) .map(MappingUtils::serialize) .saveAsTextFile(linkedEntitiesPath, GzipCodec.class); } /** * Reads a set of eu.dnetlib.dhp.schema.oaf.OafEntity objects from a sequence file , * extracts necessary information using json path, wraps the oaf object in a eu.dnetlib.dhp.graph.TypedRow * @param sc * @param inputPath * @param type * @return the JavaPairRDD indexed by entity identifier */ private JavaPairRDD readPathEntity(final JavaSparkContext sc, final String inputPath, final String type) { return sc.sequenceFile(inputPath + "/" + type, Text.class, Text.class) .mapToPair((PairFunction, String, TypedRow>) item -> { final String json = item._2().toString(); final String id =, "$.id"); return new Tuple2<>(id, new TypedRow() .setSourceId(id) .setDeleted(, "$.dataInfo.deletedbyinference")) .setType(type) .setOaf(json)); }); } /** * Reads a set of eu.dnetlib.dhp.schema.oaf.Relation objects from a sequence file , * extracts necessary information using json path, wraps the oaf object in a eu.dnetlib.dhp.graph.TypedRow * @param sc * @param inputPath * @return the JavaRDD containing all the relationships */ private JavaRDD readPathRelation(final JavaSparkContext sc, final String inputPath) { return sc.sequenceFile(inputPath + "/relation", Text.class, Text.class) .map(item -> { final String s = item._2().toString(); final DocumentContext json = JsonPath.parse(s); return new TypedRow() .setSourceId("$.source")) .setTargetId("$.target")) .setDeleted("$.dataInfo.deletedbyinference")) .setType("relation") .setOaf(s); }); } private static LinkedEntityWrapper asLinkedEntityWrapper(Tuple2> p) { final LinkedEntityWrapper e = new LinkedEntityWrapper(); final List links = Lists.newArrayList(); for (EntityRelEntity rel : p._2()) { if (rel.hasMainEntity() & e.getEntity() == null) { e.setEntity(rel.getSource()); } if (rel.hasRelatedEntity()) { links.add(new TupleWrapper() .setRelation(rel.getRelation()) .setTarget(rel.getTarget())); } } e.setLinks(links); if (e.getEntity() == null) { throw new IllegalStateException("missing main entity on '" + p._1() + "'"); } return e; } }