dataset based provision WIP

This commit is contained in:
Claudio Atzori 2020-04-06 17:34:25 +02:00
parent ca345aaad3
commit e355961997
6 changed files with 88 additions and 112 deletions

eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java

@@ -1,21 +1,26 @@
 package eu.dnetlib.dhp.oa.provision;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.common.HdfsSupport;
 import eu.dnetlib.dhp.oa.provision.model.SortableRelation;
+import eu.dnetlib.dhp.oa.provision.utils.RelationPartitioner;
 import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.function.FilterFunction;
-import org.apache.spark.api.java.function.FlatMapGroupsFunction;
-import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.*;
+import org.apache.spark.rdd.RDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Tuple2;
 import java.util.Optional;
@@ -87,7 +92,7 @@ public class PrepareRelationsJob {
         runWithSparkSession(conf, isSparkSessionManaged,
                 spark -> {
                     removeOutputDir(spark, outputPath);
-                    prepareRelationsFromPaths(spark, inputRelationsPath, outputPath, numPartitions);
+                    prepareRelationsRDDFromPaths(spark, inputRelationsPath, outputPath, numPartitions);
                 });
     }
@@ -115,6 +120,30 @@ public class PrepareRelationsJob {
                 .map((MapFunction<String, SortableRelation>) value -> OBJECT_MAPPER.readValue(value, SortableRelation.class), Encoders.bean(SortableRelation.class));
     }
+
+    private static void prepareRelationsRDDFromPaths(SparkSession spark, String inputRelationsPath, String outputPath, int numPartitions) {
+        JavaRDD<SortableRelation> rels = readPathRelationRDD(spark, inputRelationsPath)
+                .repartition(numPartitions);
+
+        RDD<SortableRelation> d = rels
+                .filter(rel -> !rel.getDataInfo().getDeletedbyinference()) // only consider those that are not virtually deleted
+                .mapToPair((PairFunction<SortableRelation, SortableRelation, SortableRelation>) rel -> new Tuple2<>(rel, rel))
+                .groupByKey(new RelationPartitioner(rels.getNumPartitions()))
+                .map(p -> Iterables.limit(p._2(), MAX_RELS))
+                .flatMap(p -> p.iterator())
+                .rdd();
+
+        spark.createDataset(d, Encoders.bean(SortableRelation.class))
+                .write()
+                .mode(SaveMode.Overwrite)
+                .parquet(outputPath);
+    }
+
+    private static JavaRDD<SortableRelation> readPathRelationRDD(SparkSession spark, final String inputPath) {
+        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+        return sc.textFile(inputPath)
+                .map(s -> OBJECT_MAPPER.readValue(s, SortableRelation.class));
+    }
+
     private static void removeOutputDir(SparkSession spark, String path) {
         HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
     }
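Note: the new prepareRelationsRDDFromPaths drops to the JavaRDD API so that grouping can use the custom RelationPartitioner, and it caps each group produced by groupByKey at MAX_RELS relations via Iterables.limit. A minimal, self-contained sketch of that capping step (class name, list content and the cap value below are illustrative only; MAX_RELS itself is defined elsewhere in PrepareRelationsJob):

import com.google.common.collect.Iterables;

import java.util.Arrays;
import java.util.List;

public class LimitSketch {
    // hypothetical cap, standing in for PrepareRelationsJob.MAX_RELS
    private static final int MAX_RELS = 2;

    public static void main(String[] args) {
        // a group of relations sharing the same key, as produced by groupByKey
        List<String> group = Arrays.asList("rel-1", "rel-2", "rel-3", "rel-4");
        // Iterables.limit is lazy: at most MAX_RELS elements are ever exposed downstream
        Iterable<String> capped = Iterables.limit(group, MAX_RELS);
        capped.forEach(System.out::println); // prints rel-1 and rel-2 only
    }
}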

eu/dnetlib/dhp/oa/provision/XmlConverterJob.java

@@ -115,14 +115,6 @@ public class XmlConverterJob {
         spark.read()
                 .load(inputPath)
                 .as(Encoders.bean(JoinedEntity.class))
-/* .map((MapFunction<JoinedEntity, String>) value -> OBJECT_MAPPER.writeValueAsString(value), Encoders.STRING())
-                .write()
-                .option("codec", "org.apache.hadoop.io.compress.GzipCodec")
-                .text("/tmp/json");
-        spark.read()
-                .textFile("/tmp/json")
-                .map((MapFunction<String, JoinedEntity>) value -> OBJECT_MAPPER.readValue(value, JoinedEntity.class), Encoders.bean(JoinedEntity.class))
                 .map((MapFunction<JoinedEntity, JoinedEntity>) j -> {
                     if (j.getLinks() != null) {
                         j.setLinks(j.getLinks()
@@ -132,8 +124,6 @@ public class XmlConverterJob {
                     }
                     return j;
                 }, Encoders.bean(JoinedEntity.class))
-*/
                 .map((MapFunction<JoinedEntity, Tuple2<String, String>>) je -> new Tuple2<>(
                         je.getEntity().getId(),
                         recordFactory.build(je)

eu/dnetlib/dhp/oa/provision/model/EntityRelEntity.java

@@ -1,5 +1,7 @@
 package eu.dnetlib.dhp.oa.provision.model;
+import com.google.common.base.Objects;
 import java.io.Serializable;
 public class EntityRelEntity implements Serializable {
@@ -44,4 +46,19 @@ public class EntityRelEntity implements Serializable {
     public void setTarget(RelatedEntity target) {
         this.target = target;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        EntityRelEntity that = (EntityRelEntity) o;
+        return Objects.equal(entity, that.entity) &&
+                Objects.equal(relation, that.relation) &&
+                Objects.equal(target, that.target);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(entity, relation, target);
+    }
 }
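Note: equals/hashCode here (and in RelatedEntity below) rely on Guava's com.google.common.base.Objects. A quick standalone illustration, not part of this commit: Objects.equal is null-safe, and Objects.hashCode combines the same fields used in equals, keeping the equals/hashCode contract consistent.

import com.google.common.base.Objects;

public class GuavaObjectsSketch {
    public static void main(String[] args) {
        // null-safe equality: no NullPointerException when a field is unset
        System.out.println(Objects.equal(null, null)); // true
        System.out.println(Objects.equal("a", null));  // false
        // varargs hash over the fields compared in equals
        System.out.println(Objects.hashCode("entity", "relation", "target"));
    }
}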

eu/dnetlib/dhp/oa/provision/model/RelatedEntity.java

@@ -1,5 +1,6 @@
 package eu.dnetlib.dhp.oa.provision.model;
+import com.google.common.base.Objects;
 import eu.dnetlib.dhp.schema.oaf.Instance;
 import eu.dnetlib.dhp.schema.oaf.KeyValue;
 import eu.dnetlib.dhp.schema.oaf.Qualifier;
@@ -228,4 +229,39 @@ public class RelatedEntity implements Serializable {
     public void setFundingtree(List<String> fundingtree) {
         this.fundingtree = fundingtree;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        RelatedEntity that = (RelatedEntity) o;
+        return Objects.equal(id, that.id) &&
+                Objects.equal(type, that.type) &&
+                Objects.equal(title, that.title) &&
+                Objects.equal(websiteurl, that.websiteurl) &&
+                Objects.equal(dateofacceptance, that.dateofacceptance) &&
+                Objects.equal(publisher, that.publisher) &&
+                Objects.equal(pid, that.pid) &&
+                Objects.equal(codeRepositoryUrl, that.codeRepositoryUrl) &&
+                Objects.equal(resulttype, that.resulttype) &&
+                Objects.equal(collectedfrom, that.collectedfrom) &&
+                Objects.equal(instances, that.instances) &&
+                Objects.equal(officialname, that.officialname) &&
+                Objects.equal(datasourcetype, that.datasourcetype) &&
+                Objects.equal(datasourcetypeui, that.datasourcetypeui) &&
+                Objects.equal(openairecompatibility, that.openairecompatibility) &&
+                Objects.equal(legalname, that.legalname) &&
+                Objects.equal(legalshortname, that.legalshortname) &&
+                Objects.equal(country, that.country) &&
+                Objects.equal(projectTitle, that.projectTitle) &&
+                Objects.equal(code, that.code) &&
+                Objects.equal(acronym, that.acronym) &&
+                Objects.equal(contracttype, that.contracttype) &&
+                Objects.equal(fundingtree, that.fundingtree);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(id, type, title, websiteurl, dateofacceptance, publisher, pid, codeRepositoryUrl, resulttype, collectedfrom, instances, officialname, datasourcetype, datasourcetypeui, openairecompatibility, legalname, legalshortname, country, projectTitle, code, acronym, contracttype, fundingtree);
+    }
 }

eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java (deleted)

@@ -1,96 +0,0 @@
-package eu.dnetlib.dhp.oa.provision.model;
-
-import com.google.common.collect.ComparisonChain;
-import com.google.common.collect.Maps;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-
-import java.io.Serializable;
-import java.util.Map;
-
-/**
- * Allows to sort relationships according to the priority defined in weights map.
- */
-public class SortableRelationKey implements Comparable<SortableRelationKey>, Serializable {
-
-    private String sourceId;
-    private String targetId;
-    private String relType;
-    private String subRelType;
-    private String relClass;
-
-    private final static Map<String, Integer> weights = Maps.newHashMap();
-
-    static {
-        weights.put("outcome", 0);
-        weights.put("supplement", 1);
-        weights.put("publicationDataset", 2);
-        weights.put("relationship", 3);
-        weights.put("similarity", 4);
-        weights.put("affiliation", 5);
-        weights.put("provision", 6);
-        weights.put("participation", 7);
-        weights.put("dedup", 8);
-    }
-
-    public static SortableRelationKey from(final Relation r) {
-        final SortableRelationKey s = new SortableRelationKey();
-        s.setSourceId(r.getSource());
-        s.setTargetId(r.getTarget());
-        s.setRelType(r.getRelType());
-        s.setSubRelType(r.getSubRelType());
-        s.setRelClass(r.getRelClass());
-        return s;
-    }
-
-    public String getSourceId() {
-        return sourceId;
-    }
-
-    public void setSourceId(String sourceId) {
-        this.sourceId = sourceId;
-    }
-
-    public String getTargetId() {
-        return targetId;
-    }
-
-    public void setTargetId(String targetId) {
-        this.targetId = targetId;
-    }
-
-    public String getRelType() {
-        return relType;
-    }
-
-    public void setRelType(String relType) {
-        this.relType = relType;
-    }
-
-    public String getSubRelType() {
-        return subRelType;
-    }
-
-    public void setSubRelType(String subRelType) {
-        this.subRelType = subRelType;
-    }
-
-    public String getRelClass() {
-        return relClass;
-    }
-
-    public void setRelClass(String relClass) {
-        this.relClass = relClass;
-    }
-
-    @Override
-    public int compareTo(SortableRelationKey o) {
-        return ComparisonChain.start()
-                .compare(weights.get(getSubRelType()), weights.get(o.getSubRelType()))
-                .compare(getSourceId(), o.getSourceId())
-                .compare(getTargetId(), o.getTargetId())
-                .result();
-    }
-}
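Note: this file is removed entirely; the weight-based ordering it provided appears to be superseded by SortableRelation, which RelationPartitioner now expects (see the next file). For reference, the effect of the deleted compareTo can be reproduced in isolation: ComparisonChain compares the subRelType weights first, so an "outcome" relation (weight 0) sorts before a "supplement" one (weight 1) regardless of how the ids compare. A small illustrative sketch (class name and literal values are made up):

import com.google.common.collect.ComparisonChain;

public class WeightOrderingSketch {
    public static void main(String[] args) {
        // weight("outcome") = 0 vs weight("supplement") = 1 decides the chain;
        // the later sourceId comparison ("z" vs "a") no longer changes the outcome
        int cmp = ComparisonChain.start()
                .compare(0, 1)
                .compare("z", "a")
                .result();
        System.out.println(cmp < 0); // true: the "outcome" relation comes first
    }
}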

eu/dnetlib/dhp/oa/provision/utils/RelationPartitioner.java

@@ -1,6 +1,6 @@
 package eu.dnetlib.dhp.oa.provision.utils;
-import eu.dnetlib.dhp.oa.provision.model.SortableRelationKey;
+import eu.dnetlib.dhp.oa.provision.model.SortableRelation;
 import org.apache.spark.Partitioner;
 import org.apache.spark.util.Utils;
@@ -23,7 +23,7 @@ public class RelationPartitioner extends Partitioner {
     @Override
     public int getPartition(Object key) {
-        return Utils.nonNegativeMod(((SortableRelationKey) key).getSourceId().hashCode(), numPartitions());
+        return Utils.nonNegativeMod(((SortableRelation) key).getSource().hashCode(), numPartitions());
     }
 }
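Note: RelationPartitioner now keys on SortableRelation.getSource() instead of SortableRelationKey.getSourceId(), so relations sharing a source id are routed to the same partition by the groupByKey call in PrepareRelationsJob. A minimal sketch of the same partitioning pattern (the class, constructor and field names below are assumptions; the hunk above only shows getPartition):

import org.apache.spark.Partitioner;
import org.apache.spark.util.Utils;

// illustrative source-hash partitioner; not the actual RelationPartitioner source
public class SourceHashPartitioner extends Partitioner {
    private final int numPartitions;

    public SourceHashPartitioner(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    @Override
    public int numPartitions() {
        return numPartitions;
    }

    @Override
    public int getPartition(Object key) {
        // map an arbitrary hashCode onto [0, numPartitions), avoiding negative bucket indices
        return Utils.nonNegativeMod(key.hashCode(), numPartitions);
    }
}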