diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java index 5a70e258f..337a2ebbb 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java @@ -1,21 +1,26 @@ package eu.dnetlib.dhp.oa.provision; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.oa.provision.model.SortableRelation; +import eu.dnetlib.dhp.oa.provision.utils.RelationPartitioner; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.io.compress.GzipCodec; import org.apache.spark.SparkConf; -import org.apache.spark.api.java.function.FilterFunction; -import org.apache.spark.api.java.function.FlatMapGroupsFunction; -import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.*; +import org.apache.spark.rdd.RDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.Tuple2; import java.util.Optional; @@ -87,7 +92,7 @@ public class PrepareRelationsJob { runWithSparkSession(conf, isSparkSessionManaged, spark -> { removeOutputDir(spark, outputPath); - prepareRelationsFromPaths(spark, inputRelationsPath, outputPath, numPartitions); + prepareRelationsRDDFromPaths(spark, inputRelationsPath, outputPath, numPartitions); }); } @@ -115,6 +120,30 @@ public class PrepareRelationsJob { .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, SortableRelation.class), Encoders.bean(SortableRelation.class)); } + private static void prepareRelationsRDDFromPaths(SparkSession spark, String inputRelationsPath, String outputPath, int numPartitions) { + JavaRDD rels = readPathRelationRDD(spark, inputRelationsPath) + .repartition(numPartitions); + + RDD d = rels + .filter(rel -> !rel.getDataInfo().getDeletedbyinference()) //only consider those that are not virtually deleted + .mapToPair((PairFunction) rel -> new Tuple2<>(rel, rel)) + .groupByKey(new RelationPartitioner(rels.getNumPartitions())) + .map(p -> Iterables.limit(p._2(), MAX_RELS)) + .flatMap(p -> p.iterator()) + .rdd(); + + spark.createDataset(d, Encoders.bean(SortableRelation.class)) + .write() + .mode(SaveMode.Overwrite) + .parquet(outputPath); + } + + private static JavaRDD readPathRelationRDD(SparkSession spark, final String inputPath) { + JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + return sc.textFile(inputPath) + .map(s -> OBJECT_MAPPER.readValue(s, SortableRelation.class)); + } + private static void removeOutputDir(SparkSession spark, String path) { HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java index 910cd8543..059cb31f2 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/XmlConverterJob.java @@ -115,14 +115,6 @@ public class XmlConverterJob { spark.read() .load(inputPath) .as(Encoders.bean(JoinedEntity.class)) - /* .map((MapFunction) value -> OBJECT_MAPPER.writeValueAsString(value), Encoders.STRING()) - .write() - .option("codec", "org.apache.hadoop.io.compress.GzipCodec") - .text("/tmp/json"); - - spark.read() - .textFile("/tmp/json") - .map((MapFunction) value -> OBJECT_MAPPER.readValue(value, JoinedEntity.class), Encoders.bean(JoinedEntity.class)) .map((MapFunction) j -> { if (j.getLinks() != null) { j.setLinks(j.getLinks() @@ -132,8 +124,6 @@ public class XmlConverterJob { } return j; }, Encoders.bean(JoinedEntity.class)) - - */ .map((MapFunction>) je -> new Tuple2<>( je.getEntity().getId(), recordFactory.build(je) diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/EntityRelEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/EntityRelEntity.java index 35dfa41d3..e1ca8e316 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/EntityRelEntity.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/EntityRelEntity.java @@ -1,5 +1,7 @@ package eu.dnetlib.dhp.oa.provision.model; +import com.google.common.base.Objects; + import java.io.Serializable; public class EntityRelEntity implements Serializable { @@ -44,4 +46,19 @@ public class EntityRelEntity implements Serializable { public void setTarget(RelatedEntity target) { this.target = target; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + EntityRelEntity that = (EntityRelEntity) o; + return Objects.equal(entity, that.entity) && + Objects.equal(relation, that.relation) && + Objects.equal(target, that.target); + } + + @Override + public int hashCode() { + return Objects.hashCode(entity, relation, target); + } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntity.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntity.java index 2e5b4186c..011d9276d 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntity.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntity.java @@ -1,5 +1,6 @@ package eu.dnetlib.dhp.oa.provision.model; +import com.google.common.base.Objects; import eu.dnetlib.dhp.schema.oaf.Instance; import eu.dnetlib.dhp.schema.oaf.KeyValue; import eu.dnetlib.dhp.schema.oaf.Qualifier; @@ -228,4 +229,39 @@ public class RelatedEntity implements Serializable { public void setFundingtree(List fundingtree) { this.fundingtree = fundingtree; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + RelatedEntity that = (RelatedEntity) o; + return Objects.equal(id, that.id) && + Objects.equal(type, that.type) && + Objects.equal(title, that.title) && + Objects.equal(websiteurl, that.websiteurl) && + Objects.equal(dateofacceptance, that.dateofacceptance) && + Objects.equal(publisher, that.publisher) && + Objects.equal(pid, that.pid) && + Objects.equal(codeRepositoryUrl, that.codeRepositoryUrl) && + Objects.equal(resulttype, that.resulttype) && + Objects.equal(collectedfrom, that.collectedfrom) && + Objects.equal(instances, that.instances) && + Objects.equal(officialname, that.officialname) && + Objects.equal(datasourcetype, that.datasourcetype) && + Objects.equal(datasourcetypeui, that.datasourcetypeui) && + Objects.equal(openairecompatibility, that.openairecompatibility) && + Objects.equal(legalname, that.legalname) && + Objects.equal(legalshortname, that.legalshortname) && + Objects.equal(country, that.country) && + Objects.equal(projectTitle, that.projectTitle) && + Objects.equal(code, that.code) && + Objects.equal(acronym, that.acronym) && + Objects.equal(contracttype, that.contracttype) && + Objects.equal(fundingtree, that.fundingtree); + } + + @Override + public int hashCode() { + return Objects.hashCode(id, type, title, websiteurl, dateofacceptance, publisher, pid, codeRepositoryUrl, resulttype, collectedfrom, instances, officialname, datasourcetype, datasourcetypeui, openairecompatibility, legalname, legalshortname, country, projectTitle, code, acronym, contracttype, fundingtree); + } } \ No newline at end of file diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java deleted file mode 100644 index fef9915e8..000000000 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java +++ /dev/null @@ -1,96 +0,0 @@ -package eu.dnetlib.dhp.oa.provision.model; - -import com.google.common.collect.ComparisonChain; -import com.google.common.collect.Maps; -import eu.dnetlib.dhp.schema.oaf.Relation; - -import java.io.Serializable; -import java.util.Map; - -/** - * Allows to sort relationships according to the priority defined in weights map. - */ -public class SortableRelationKey implements Comparable, Serializable { - - private String sourceId; - private String targetId; - - private String relType; - private String subRelType; - private String relClass; - - private final static Map weights = Maps.newHashMap(); - - static { - weights.put("outcome", 0); - weights.put("supplement", 1); - weights.put("publicationDataset", 2); - weights.put("relationship", 3); - weights.put("similarity", 4); - weights.put("affiliation", 5); - - weights.put("provision", 6); - weights.put("participation", 7); - weights.put("dedup", 8); - } - - public static SortableRelationKey from(final Relation r) { - final SortableRelationKey s = new SortableRelationKey(); - s.setSourceId(r.getSource()); - s.setTargetId(r.getTarget()); - s.setRelType(r.getRelType()); - s.setSubRelType(r.getSubRelType()); - s.setRelClass(r.getRelClass()); - return s; - } - - public String getSourceId() { - return sourceId; - } - - public void setSourceId(String sourceId) { - this.sourceId = sourceId; - } - - public String getTargetId() { - return targetId; - } - - public void setTargetId(String targetId) { - this.targetId = targetId; - } - - public String getRelType() { - return relType; - } - - public void setRelType(String relType) { - this.relType = relType; - } - - public String getSubRelType() { - return subRelType; - } - - public void setSubRelType(String subRelType) { - this.subRelType = subRelType; - } - - public String getRelClass() { - return relClass; - } - - public void setRelClass(String relClass) { - this.relClass = relClass; - } - - @Override - public int compareTo(SortableRelationKey o) { - return ComparisonChain.start() - .compare(weights.get(getSubRelType()), weights.get(o.getSubRelType())) - .compare(getSourceId(), o.getSourceId()) - .compare(getTargetId(), o.getTargetId()) - .result(); - } - -} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/RelationPartitioner.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/RelationPartitioner.java index 9714830d3..c8e7a2429 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/RelationPartitioner.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/RelationPartitioner.java @@ -1,6 +1,6 @@ package eu.dnetlib.dhp.oa.provision.utils; -import eu.dnetlib.dhp.oa.provision.model.SortableRelationKey; +import eu.dnetlib.dhp.oa.provision.model.SortableRelation; import org.apache.spark.Partitioner; import org.apache.spark.util.Utils; @@ -23,7 +23,7 @@ public class RelationPartitioner extends Partitioner { @Override public int getPartition(Object key) { - return Utils.nonNegativeMod(((SortableRelationKey) key).getSourceId().hashCode(), numPartitions()); + return Utils.nonNegativeMod(((SortableRelation) key).getSource().hashCode(), numPartitions()); } }