Improved extract entity/relation workflows using Dataset

Sandro La Bruzzo 2020-08-06 10:28:24 +02:00
parent 0c3bc9ea4b
commit 9d9e9edbd2
17 changed files with 95 additions and 145 deletions

View File

@ -1,30 +0,0 @@
package eu.dnetlib.dhp.schema.scholexplorer;
import java.util.List;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class DLIRelation extends Relation {
private String dateOfCollection;
private List<KeyValue> collectedFrom;
public List<KeyValue> getCollectedFrom() {
return collectedFrom;
}
public void setCollectedFrom(List<KeyValue> collectedFrom) {
this.collectedFrom = collectedFrom;
}
public String getDateOfCollection() {
return dateOfCollection;
}
public void setDateOfCollection(String dateOfCollection) {
this.dateOfCollection = dateOfCollection;
}
}
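With DLIRelation removed, relations no longer carry dateOfCollection or a separate collectedFrom list; the workflows below build plain eu.dnetlib.dhp.schema.oaf.Relation objects instead. A minimal sketch of that construction pattern, using only setters that appear in the diffs below (the helper itself is hypothetical):

import eu.dnetlib.dhp.schema.oaf.Relation
import eu.dnetlib.dhp.schema.scholexplorer.OafUtils

// Hypothetical helper: how a plain Relation is now assembled where a DLIRelation used to be.
def buildRelation(source: String, target: String, relType: String): Relation = {
  val r = new Relation
  r.setSource(source)
  r.setTarget(target)
  r.setRelType(relType)
  // default provenance when none is present, as SparkPropagateRelationsJob now does
  r.setDataInfo(OafUtils.generateDataInfo("0.9", false))
  r
}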

View File

@ -100,7 +100,7 @@ public class DedupRecordFactory {
.forEach(
pub -> {
try {
- Publication publication = mapper.readValue(pub, Publication.class);
+ DLIPublication publication = mapper.readValue(pub, DLIPublication.class);
p.mergeFrom(publication);
p.setAuthor(DedupUtility.mergeAuthor(p.getAuthor(), publication.getAuthor()));

View File

@ -55,6 +55,7 @@ public class SparkCreateSimRels {
.as(Encoders.kryo(Oaf.class))
.map((MapFunction<Oaf, String>) p -> new ObjectMapper().writeValueAsString(p), Encoders.STRING())
.javaRDD()
+ .repartition(1000)
.mapToPair(
s -> {
MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);
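The new repartition(1000) spreads the serialized entities over more partitions before the per-record MapDocument conversion and blocking; 1000 mirrors the diff but is a tuning value that depends on cluster size and input volume. A small sketch of the same pattern:

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Sketch: widen parallelism ahead of an expensive per-record map step.
def withWiderParallelism[T: ClassTag](input: RDD[String], parse: String => T, partitions: Int = 1000): RDD[T] =
  input.repartition(partitions).map(parse)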

View File

@ -1,21 +1,16 @@
package eu.dnetlib.dedup.sx;
- import java.io.IOException;
+ import eu.dnetlib.dhp.schema.scholexplorer.OafUtils;
import org.apache.commons.io.IOUtils;
- import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
- import org.apache.spark.sql.*;
+ import org.apache.spark.sql.Dataset;
+ import org.apache.spark.sql.Encoders;
- import com.fasterxml.jackson.databind.DeserializationFeature;
+ import org.apache.spark.sql.SaveMode;
- import com.fasterxml.jackson.databind.ObjectMapper;
+ import org.apache.spark.sql.SparkSession;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
- import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Relation;
- import eu.dnetlib.dhp.schema.scholexplorer.DLIRelation;
- import eu.dnetlib.dhp.utils.DHPUtils;
import scala.Tuple2;
public class SparkPropagateRelationsJob {
@ -44,37 +39,39 @@ public class SparkPropagateRelationsJob {
.as(Encoders.bean(Relation.class))
.where("relClass == 'merges'");
- final Dataset<DLIRelation> rels = spark
+ final Dataset<Relation> rels = spark
.read()
.load(relationPath)
- .as(Encoders.kryo(DLIRelation.class))
+ .as(Encoders.kryo(Relation.class))
.map(
- (MapFunction<DLIRelation, DLIRelation>) r -> r,
+ (MapFunction<Relation, Relation>) r -> r,
- Encoders.bean(DLIRelation.class));
+ Encoders.bean(Relation.class));
- final Dataset<DLIRelation> firstJoin = rels
+ final Dataset<Relation> firstJoin = rels
.joinWith(merge, merge.col("target").equalTo(rels.col("source")), "left_outer")
.map(
- (MapFunction<Tuple2<DLIRelation, Relation>, DLIRelation>) r -> {
+ (MapFunction<Tuple2<Relation, Relation>, Relation>) r -> {
final Relation mergeRelation = r._2();
- final DLIRelation relation = r._1();
+ final Relation relation = r._1();
if (mergeRelation != null)
relation.setSource(mergeRelation.getSource());
+ if (relation.getDataInfo()==null)
+ relation.setDataInfo(OafUtils.generateDataInfo("0.9",false));
return relation;
},
- Encoders.bean(DLIRelation.class));
+ Encoders.bean(Relation.class));
- final Dataset<DLIRelation> secondJoin = firstJoin
+ final Dataset<Relation> secondJoin = firstJoin
.joinWith(merge, merge.col("target").equalTo(firstJoin.col("target")), "left_outer")
.map(
- (MapFunction<Tuple2<DLIRelation, Relation>, DLIRelation>) r -> {
+ (MapFunction<Tuple2<Relation, Relation>, Relation>) r -> {
final Relation mergeRelation = r._2();
- final DLIRelation relation = r._1();
+ final Relation relation = r._1();
if (mergeRelation != null)
relation.setTarget(mergeRelation.getSource());
return relation;
},
- Encoders.kryo(DLIRelation.class));
+ Encoders.kryo(Relation.class));
secondJoin.write().mode(SaveMode.Overwrite).save(targetRelPath);
}
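For readers of this job: the two left-outer joins above redirect first the source and then the target of every relation onto its dedup representative (the source of the matching 'merges' relation), filling in a default DataInfo when missing. A condensed Scala sketch of the same logic, assuming the column names used above:

import org.apache.spark.sql.{Dataset, Encoder, Encoders}
import eu.dnetlib.dhp.schema.oaf.Relation
import eu.dnetlib.dhp.schema.scholexplorer.OafUtils

// Sketch: rewrite relation endpoints onto the dedup representative via two left-outer joins.
def propagate(rels: Dataset[Relation], merges: Dataset[Relation]): Dataset[Relation] = {
  implicit val relEnc: Encoder[Relation] = Encoders.bean(classOf[Relation])

  val sourceFixed = rels
    .joinWith(merges, merges.col("target").equalTo(rels.col("source")), "left_outer")
    .map { t =>
      val (rel, m) = (t._1, t._2)
      if (m != null) rel.setSource(m.getSource)
      if (rel.getDataInfo == null) rel.setDataInfo(OafUtils.generateDataInfo("0.9", false))
      rel
    }

  sourceFixed
    .joinWith(merges, merges.col("target").equalTo(sourceFixed.col("target")), "left_outer")
    .map { t =>
      val (rel, m) = (t._1, t._2)
      if (m != null) rel.setTarget(m.getSource)
      rel
    }
}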

View File

@ -2,7 +2,7 @@ package eu.dnetlib.dedup.sx
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{Oaf, OafEntity, Relation}
- import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation, DLIUnknown, OafUtils}
+ import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIUnknown, OafUtils}
import org.apache.commons.io.IOUtils
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.LoggerFactory
@ -11,7 +11,7 @@ import org.apache.spark.sql.functions.col
object SparkUpdateEntityWithDedupInfo {
def main(args: Array[String]): Unit = {
- val parser = new ArgumentApplicationParser(IOUtils.toString(SparkUpdateEntityWithDedupInfo.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json")))
+ val parser = new ArgumentApplicationParser(IOUtils.toString(SparkUpdateEntityWithDedupInfo.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/dedup/dedup_delete_by_inference_parameters.json")))
val logger = LoggerFactory.getLogger(SparkUpdateEntityWithDedupInfo.getClass)
parser.parseArgument(args)
@ -24,7 +24,7 @@ object SparkUpdateEntityWithDedupInfo {
implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo[DLIPublication]
implicit val datEncoder: Encoder[DLIDataset] = Encoders.kryo[DLIDataset]
implicit val unkEncoder: Encoder[DLIUnknown] = Encoders.kryo[DLIUnknown]
- implicit val dlirelEncoder: Encoder[DLIRelation] = Encoders.kryo[DLIRelation]
val spark: SparkSession = SparkSession

View File

@ -53,6 +53,7 @@
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
+ --executor-cores=${sparkExecutorCores}
${sparkExtraOPT}
</spark-opts>
<arg>-mt</arg><arg>yarn-cluster</arg>
@ -77,6 +78,7 @@
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
+ --executor-cores=${sparkExecutorCores}
${sparkExtraOPT}
</spark-opts>
<arg>-mt</arg><arg>yarn-cluster</arg>
@ -101,6 +103,7 @@
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
+ --executor-cores=${sparkExecutorCores}
${sparkExtraOPT}
</spark-opts>
<arg>-mt</arg><arg>yarn-cluster</arg>
@ -125,6 +128,7 @@
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
+ --executor-cores=${sparkExecutorCores}
${sparkExtraOPT}
</spark-opts>
<arg>-mt</arg><arg>yarn-cluster</arg>
@ -149,6 +153,7 @@
<spark-opts>
--executor-memory ${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
+ --executor-cores=${sparkExecutorCores}
${sparkExtraOPT}
</spark-opts>
<arg>-mt</arg><arg>yarn-cluster</arg>
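Every Spark action in the workflow now also forwards --executor-cores=${sparkExecutorCores}. A quick way to verify the setting actually reached a job (a sketch; sparkExecutorCores is an Oozie workflow property, while the Spark-side key is spark.executor.cores):

import org.apache.spark.sql.SparkSession

// Sketch: log the executor-core setting the running job received; "(unset)" means the
// workflow property was not forwarded.
def logExecutorCores(spark: SparkSession): Unit = {
  val cores = spark.sparkContext.getConf.get("spark.executor.cores", "(unset)")
  println(s"spark.executor.cores = $cores")
}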

View File

@ -1,6 +1,6 @@
package eu.dnetlib.dhp.sx.ebi
import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset}
- import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation, DLIUnknown}
+ import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIUnknown}
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
@ -168,26 +168,7 @@ object EBIAggregator {
}
def getDLIRelationAggregator(): Aggregator[(String, DLIRelation), DLIRelation, DLIRelation] = new Aggregator[(String, DLIRelation), DLIRelation, DLIRelation]{
override def zero: DLIRelation = new DLIRelation()
override def reduce(b: DLIRelation, a: (String, DLIRelation)): DLIRelation = {
a._2
}
override def merge(a: DLIRelation, b: DLIRelation): DLIRelation = {
if(b!= null) b else a
}
override def finish(reduction: DLIRelation): DLIRelation = reduction
override def bufferEncoder: Encoder[DLIRelation] =
Encoders.kryo(classOf[DLIRelation])
override def outputEncoder: Encoder[DLIRelation] =
Encoders.kryo(classOf[DLIRelation])
}
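The jobs below now call EBIAggregator.getRelationAggregator() in place of the removed method; presumably it has the same shape, just parameterised on the base Relation type. A sketch under that assumption:

import eu.dnetlib.dhp.schema.oaf.Relation
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Assumed counterpart of the removed getDLIRelationAggregator(): keep one Relation per key.
def getRelationAggregator(): Aggregator[(String, Relation), Relation, Relation] =
  new Aggregator[(String, Relation), Relation, Relation] {
    override def zero: Relation = new Relation()
    override def reduce(b: Relation, a: (String, Relation)): Relation = a._2
    override def merge(a: Relation, b: Relation): Relation = if (b != null) b else a
    override def finish(reduction: Relation): Relation = reduction
    override def bufferEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation])
    override def outputEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation])
  }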

View File

@ -1,8 +1,8 @@
package eu.dnetlib.dhp.sx.ebi
import eu.dnetlib.dhp.application.ArgumentApplicationParser
- import eu.dnetlib.dhp.schema.oaf.{Author, Instance, Journal, KeyValue, Oaf, Publication, Dataset => OafDataset}
+ import eu.dnetlib.dhp.schema.oaf.{Author, Instance, Journal, KeyValue, Oaf, Publication, Relation, Dataset => OafDataset}
import eu.dnetlib.dhp.schema.scholexplorer.OafUtils.createQualifier
- import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation, OafUtils, ProvenaceInfo}
+ import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, OafUtils, ProvenaceInfo}
import eu.dnetlib.dhp.sx.ebi.model.{PMArticle, PMAuthor, PMJournal}
import eu.dnetlib.dhp.utils.DHPUtils
import eu.dnetlib.scholexplorer.relation.RelationMapper
@ -115,8 +115,8 @@ case class EBILinks(relation:String, pubdate:String, tpid:String, tpidType:Strin
val dnetPublicationId = s"50|${DHPUtils.md5(s"$pmid::pmid")}"
targets.flatMap(l => {
- val relation = new DLIRelation
+ val relation = new Relation
- val inverseRelation = new DLIRelation
+ val inverseRelation = new Relation
val targetDnetId = s"50|${DHPUtils.md5(s"${l.tpid.toLowerCase.trim}::${l.tpidType.toLowerCase.trim}")}"
val relInfo = relationMapper.get(l.relation.toLowerCase)
val relationSemantic = relInfo.getOriginal
@ -177,7 +177,7 @@ case class EBILinks(relation:String, pubdate:String, tpid:String, tpidType:Strin
val workingPath = parser.get("workingPath")
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
implicit val oafpubEncoder: Encoder[Publication] = Encoders.kryo[Publication]
- implicit val relEncoder: Encoder[DLIRelation] = Encoders.kryo(classOf[DLIRelation])
+ implicit val relEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation])
implicit val datEncoder: Encoder[DLIDataset] = Encoders.kryo(classOf[DLIDataset])
implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo(classOf[DLIPublication])
implicit val atEncoder: Encoder[Author] = Encoders.kryo(classOf[Author])
@ -197,7 +197,7 @@ case class EBILinks(relation:String, pubdate:String, tpid:String, tpidType:Strin
val oDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/baseline_links_updates_oaf").as[Oaf]
- oDataset.filter(p =>p.isInstanceOf[DLIRelation]).map(p => p.asInstanceOf[DLIRelation]).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_links_updates_relation")
+ oDataset.filter(p =>p.isInstanceOf[Relation]).map(p => p.asInstanceOf[Relation]).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_links_updates_relation")
oDataset.filter(p =>p.isInstanceOf[DLIDataset]).map(p => p.asInstanceOf[DLIDataset]).write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_links_updates_dataset")
@ -230,14 +230,14 @@ case class EBILinks(relation:String, pubdate:String, tpid:String, tpidType:Strin
.write.mode(SaveMode.Overwrite).save(s"$workingPath/baseline_dataset_ebi")
- val rel: Dataset[DLIRelation] = spark.read.load(s"$workingPath/relation").as[DLIRelation]
+ val rel: Dataset[Relation] = spark.read.load(s"$workingPath/relation").as[Relation]
- val relupdate : Dataset[DLIRelation] = spark.read.load(s"$workingPath/ebi_garr/baseline_links_updates_relation").as[DLIRelation]
+ val relupdate : Dataset[Relation] = spark.read.load(s"$workingPath/ebi_garr/baseline_links_updates_relation").as[Relation]
rel.union(relupdate)
.map(d => (s"${d.getSource}::${d.getRelType}::${d.getTarget}", d))(Encoders.tuple(Encoders.STRING, relEncoder))
.groupByKey(_._1)(Encoders.STRING)
- .agg(EBIAggregator.getDLIRelationAggregator().toColumn)
+ .agg(EBIAggregator.getRelationAggregator().toColumn)
.map(p => p._2)
.write.mode(SaveMode.Overwrite)
.save(s"$workingPath/baseline_relation_ebi")
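The EBI link updates are now emitted as plain Relations and collapsed on a source::relType::target key before being aggregated and written; the key format is taken verbatim from the diff:

import eu.dnetlib.dhp.schema.oaf.Relation

// Key used to group duplicate relations before aggregation.
def relationKey(r: Relation): String =
  s"${r.getSource}::${r.getRelType}::${r.getTarget}"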

View File

@ -2,7 +2,7 @@ package eu.dnetlib.dhp.sx.ebi
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Relation, Dataset => OafDataset}
- import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation}
+ import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication}
import eu.dnetlib.dhp.sx.graph.parser.{DatasetScholexplorerParser, PublicationScholexplorerParser}
import eu.dnetlib.scholexplorer.relation.RelationMapper
import org.apache.commons.io.IOUtils
@ -38,7 +38,7 @@ object SparkCreateEBIDataFrame {
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
implicit val datasetEncoder: Encoder[DLIDataset] = Encoders.kryo(classOf[DLIDataset])
implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo(classOf[DLIPublication])
- implicit val relEncoder: Encoder[DLIRelation] = Encoders.kryo(classOf[DLIRelation])
+ implicit val relEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation])
// logger.info("Extract Publication and relation from publication_xml")
// val oafPubsRDD:RDD[Oaf] = sc.textFile(s"$workingPath/publication_xml").map(s =>
@ -63,7 +63,7 @@ object SparkCreateEBIDataFrame {
// spark.createDataset(oafDatsRDD).write.mode(SaveMode.Append).save(s"$workingPath/oaf")
val dataset: Dataset[DLIDataset] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[DLIDataset]).map(d => d.asInstanceOf[DLIDataset])
val publication: Dataset[DLIPublication] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[DLIPublication]).map(d => d.asInstanceOf[DLIPublication])
- val relations: Dataset[DLIRelation] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[DLIRelation]).map(d => d.asInstanceOf[DLIRelation])
+ val relations: Dataset[Relation] = spark.read.load(s"$workingPath/oaf").as[Oaf].filter(o => o.isInstanceOf[Relation]).map(d => d.asInstanceOf[Relation])
publication.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, pubEncoder))
.groupByKey(_._1)(Encoders.STRING)
.agg(EBIAggregator.getDLIPublicationAggregator().toColumn)
@ -78,7 +78,7 @@ object SparkCreateEBIDataFrame {
relations.map(d => (s"${d.getSource}::${d.getRelType}::${d.getTarget}", d))(Encoders.tuple(Encoders.STRING, relEncoder))
.groupByKey(_._1)(Encoders.STRING)
- .agg(EBIAggregator.getDLIRelationAggregator().toColumn)
+ .agg(EBIAggregator.getRelationAggregator().toColumn)
.map(p => p._2)
.write.mode(SaveMode.Overwrite).save(s"$workingPath/relation")

View File

@ -10,7 +10,7 @@ import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
- import eu.dnetlib.dhp.schema.scholexplorer.DLIRelation;
+ import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.DHPUtils;
import scala.Tuple2;
@ -55,18 +55,18 @@ public class SparkSXGeneratePidSimlarity {
.equalsIgnoreCase(StringUtils.substringAfter(t._2(), "::")))
.distinct();
- JavaRDD<DLIRelation> simRel = datasetSimRel
+ JavaRDD<Relation> simRel = datasetSimRel
.union(publicationSimRel)
.map(
s -> {
- final DLIRelation r = new DLIRelation();
+ final Relation r = new Relation();
r.setSource(s._1());
r.setTarget(s._2());
r.setRelType("similar");
return r;
});
spark
- .createDataset(simRel.rdd(), Encoders.bean(DLIRelation.class))
+ .createDataset(simRel.rdd(), Encoders.bean(Relation.class))
.distinct()
.write()
.mode(SaveMode.Overwrite)

View File

@ -31,7 +31,6 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.scholexplorer.DLIDataset;
import eu.dnetlib.dhp.schema.scholexplorer.DLIPublication;
- import eu.dnetlib.dhp.schema.scholexplorer.DLIRelation;
import eu.dnetlib.dhp.schema.scholexplorer.DLIUnknown;
import eu.dnetlib.dhp.utils.DHPUtils;
import net.minidev.json.JSONArray;
@ -156,9 +155,9 @@ public class SparkScholexplorerCreateRawGraphJob {
SparkSXGeneratePidSimlarity
.generateDataFrame(
spark, sc, inputPath.replace("/relation", ""), targetPath.replace("/relation", ""));
- RDD<DLIRelation> rdd = union
+ RDD<Relation> rdd = union
.mapToPair(
- (PairFunction<String, String, DLIRelation>) f -> {
+ (PairFunction<String, String, Relation>) f -> {
final String source = getJPathString(SOURCEJSONPATH, f);
final String target = getJPathString(TARGETJSONPATH, f);
final String reltype = getJPathString(RELJSONPATH, f);
@ -175,7 +174,7 @@ public class SparkScholexplorerCreateRawGraphJob {
source.toLowerCase(),
reltype.toLowerCase(),
target.toLowerCase())),
- mapper.readValue(f, DLIRelation.class));
+ mapper.readValue(f, Relation.class));
})
.reduceByKey(
(a, b) -> {
@ -186,7 +185,7 @@ public class SparkScholexplorerCreateRawGraphJob {
.rdd();
spark
- .createDataset(rdd, Encoders.bean(DLIRelation.class))
+ .createDataset(rdd, Encoders.bean(Relation.class))
.write()
.mode(SaveMode.Overwrite)
.save(targetPath);

View File

@ -1,8 +1,8 @@
package eu.dnetlib.dhp.sx.graph
import eu.dnetlib.dhp.application.ArgumentApplicationParser
- import eu.dnetlib.dhp.schema.oaf.Oaf
+ import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation}
- import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation, DLIUnknown}
+ import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIUnknown}
import eu.dnetlib.dhp.sx.ebi.EBIAggregator
import eu.dnetlib.dhp.sx.ebi.model.{PMArticle, PMAuthor, PMJournal}
import org.apache.commons.io.IOUtils
@ -12,7 +12,7 @@ import org.slf4j.LoggerFactory
object SparkSplitOafTODLIEntities {
- def getKeyRelation(rel:DLIRelation):String = {
+ def getKeyRelation(rel:Relation):String = {
s"${rel.getSource}::${rel.getRelType}::${rel.getTarget}"
@ -30,13 +30,14 @@ object SparkSplitOafTODLIEntities {
implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo[DLIPublication]
implicit val datEncoder: Encoder[DLIDataset] = Encoders.kryo[DLIDataset]
implicit val unkEncoder: Encoder[DLIUnknown] = Encoders.kryo[DLIUnknown]
- implicit val relEncoder: Encoder[DLIRelation] = Encoders.kryo[DLIRelation]
+ implicit val relEncoder: Encoder[Relation] = Encoders.kryo[Relation]
val spark:SparkSession = SparkSession
.builder()
.appName(SparkSplitOafTODLIEntities.getClass.getSimpleName)
+ .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.master(parser.get("master"))
.getOrCreate()
@ -47,7 +48,7 @@ object SparkSplitOafTODLIEntities {
val ebi_dataset:Dataset[DLIDataset] = spark.read.load(s"$workingPath/ebi/baseline_dataset_ebi").as[DLIDataset]
val ebi_publication:Dataset[DLIPublication] = spark.read.load(s"$workingPath/ebi/baseline_publication_ebi").as[DLIPublication]
- val ebi_relation:Dataset[DLIRelation] = spark.read.load(s"$workingPath/ebi/baseline_relation_ebi").as[DLIRelation]
+ val ebi_relation:Dataset[Relation] = spark.read.load(s"$workingPath/ebi/baseline_relation_ebi").as[Relation]
@ -86,12 +87,12 @@ object SparkSplitOafTODLIEntities {
OAFDataset
- .filter(s => s != null && s.isInstanceOf[DLIRelation])
+ .filter(s => s != null && s.isInstanceOf[Relation])
- .map(s =>s.asInstanceOf[DLIRelation])
+ .map(s =>s.asInstanceOf[Relation])
.union(ebi_relation)
.map(d => (getKeyRelation(d), d))(Encoders.tuple(Encoders.STRING, relEncoder))
.groupByKey(_._1)(Encoders.STRING)
- .agg(EBIAggregator.getDLIRelationAggregator().toColumn)
+ .agg(EBIAggregator.getRelationAggregator().toColumn)
.map(p => p._2)
.repartition(1000)
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/relation")
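The builder now registers Kryo explicitly, which matches the kryo encoders declared above; a minimal sketch of the added configuration (the master value is an assumption for local testing):

import org.apache.spark.sql.SparkSession

// Sketch: SparkSession with the Kryo serializer enabled, as the diff configures it.
val spark: SparkSession = SparkSession
  .builder()
  .appName("SparkSplitOafTODLIEntities")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .master("local[*]")
  .getOrCreate()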

View File

@ -1,8 +1,8 @@
package eu.dnetlib.dhp.sx.graph
import eu.dnetlib.dhp.application.ArgumentApplicationParser
- import eu.dnetlib.dhp.schema.oaf.Oaf
+ import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation}
- import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation}
+ import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication}
import eu.dnetlib.dhp.sx.graph.parser.{DatasetScholexplorerParser, PublicationScholexplorerParser}
import eu.dnetlib.scholexplorer.relation.RelationMapper
import org.apache.commons.io.IOUtils
@ -40,7 +40,7 @@ object SparkXMLToOAFDataset {
implicit val oafEncoder:Encoder[Oaf] = Encoders.kryo[Oaf]
implicit val datasetEncoder:Encoder[DLIDataset] = Encoders.kryo[DLIDataset]
implicit val publicationEncoder:Encoder[DLIPublication] = Encoders.kryo[DLIPublication]
- implicit val relationEncoder:Encoder[DLIRelation] = Encoders.kryo[DLIRelation]
+ implicit val relationEncoder:Encoder[Relation] = Encoders.kryo[Relation]
val relationMapper = RelationMapper.load

View File

@ -14,7 +14,6 @@ import org.apache.commons.logging.LogFactory;
import eu.dnetlib.dhp.parser.utility.VtdUtilityParser;
import eu.dnetlib.dhp.schema.oaf.*;
- import eu.dnetlib.dhp.schema.scholexplorer.DLIRelation;
import eu.dnetlib.dhp.schema.scholexplorer.DLIUnknown;
import eu.dnetlib.dhp.schema.scholexplorer.ProvenaceInfo;
import eu.dnetlib.dhp.utils.DHPUtils;
@ -175,8 +174,8 @@ public abstract class AbstractScholexplorerParser {
.stream()
.flatMap(
n -> {
- final List<DLIRelation> rels = new ArrayList<>();
+ final List<Relation> rels = new ArrayList<>();
- DLIRelation r = new DLIRelation();
+ Relation r = new Relation();
r.setSource(parsedObject.getId());
final String relatedPid = n.getTextValue();
final String relatedPidType = n.getAttributes().get("relatedIdentifierType");
@ -184,7 +183,6 @@ public abstract class AbstractScholexplorerParser {
String relationSemantic = n.getAttributes().get("relationType");
String inverseRelation;
final String targetId = generateId(relatedPid, relatedPidType, relatedType);
- r.setDateOfCollection(dateOfCollection);
if (relationMapper.containsKey(relationSemantic.toLowerCase())) {
RelInfo relInfo = relationMapper.get(relationSemantic.toLowerCase());
relationSemantic = relInfo.getOriginal();
@ -199,14 +197,13 @@ public abstract class AbstractScholexplorerParser {
r.setCollectedfrom(parsedObject.getCollectedfrom());
r.setDataInfo(di);
rels.add(r);
- r = new DLIRelation();
+ r = new Relation();
r.setDataInfo(di);
r.setSource(targetId);
r.setTarget(parsedObject.getId());
r.setRelType(inverseRelation);
r.setRelClass("datacite");
r.setCollectedfrom(parsedObject.getCollectedfrom());
- r.setDateOfCollection(dateOfCollection);
rels.add(r);
if ("unknown".equalsIgnoreCase(relatedType))
result
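The parser still emits a direct and an inverse relation per related identifier, but on the base Relation type and without stamping dateOfCollection. A compact sketch of that pairing (the helper and its signature are hypothetical; the setters match the diff):

import java.util
import eu.dnetlib.dhp.schema.oaf.{DataInfo, KeyValue, Relation}

// Hypothetical helper mirroring the parser's pattern: one direct and one inverse relation,
// sharing dataInfo and collectedfrom, with no dateOfCollection any more.
def relationPair(sourceId: String, targetId: String, semantic: String, inverse: String,
                 di: DataInfo, collectedFrom: util.List[KeyValue]): List[Relation] = {
  def mk(s: String, t: String, relType: String): Relation = {
    val r = new Relation
    r.setSource(s)
    r.setTarget(t)
    r.setRelType(relType)
    r.setRelClass("datacite")
    r.setDataInfo(di)
    r.setCollectedfrom(collectedFrom)
    r
  }
  List(mk(sourceId, targetId, semantic), mk(targetId, sourceId, inverse))
}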

View File

@ -6,7 +6,7 @@ import java.time.format.DateTimeFormatter
import eu.dnetlib.dhp.common.PacePerson
import eu.dnetlib.dhp.schema.action.AtomicAction
import eu.dnetlib.dhp.schema.oaf.{Author, Dataset, ExternalReference, Field, Instance, KeyValue, Oaf, Publication, Qualifier, Relation, Result, StructuredProperty}
- import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation}
+ import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication}
import eu.dnetlib.dhp.utils.DHPUtils
import org.apache.commons.lang3.StringUtils
import org.codehaus.jackson.map.ObjectMapper
@ -273,29 +273,29 @@ object DLIToOAF {
}
- def convertDLIRelation(r: DLIRelation): Relation = {
+ // def convertDLIRelation(r: DLIRelation): Relation = {
+ //
- val result = new Relation
+ // val result = new Relation
- if (!relationTypeMapping.contains(r.getRelType))
+ // if (!relationTypeMapping.contains(r.getRelType))
- return null
+ // return null
+ //
- if (r.getCollectedFrom == null || r.getCollectedFrom.size() == 0 || (r.getCollectedFrom.size() == 1 && r.getCollectedFrom.get(0) == null))
+ // if (r.getProperties == null || r.getProperties.size() == 0 || (r.getProperties.size() == 1 && r.getProperties.get(0) == null))
- return null
+ // return null
- val t = relationTypeMapping.get(r.getRelType)
+ // val t = relationTypeMapping.get(r.getRelType)
+ //
- result.setRelType("resultResult")
+ // result.setRelType("resultResult")
- result.setRelClass(t.get._1)
+ // result.setRelClass(t.get._1)
- result.setSubRelType(t.get._2)
+ // result.setSubRelType(t.get._2)
- result.setCollectedfrom(r.getCollectedFrom.asScala.map(c => collectedFromMap.getOrElse(c.getKey, null)).filter(p => p != null).asJava)
+ // result.setCollectedfrom(r.getProperties.asScala.map(c => collectedFromMap.getOrElse(c.getKey, null)).filter(p => p != null).asJava)
- result.setSource(generateId(r.getSource))
+ // result.setSource(generateId(r.getSource))
- result.setTarget(generateId(r.getTarget))
+ // result.setTarget(generateId(r.getTarget))
+ //
- if (result.getSource.equals(result.getTarget))
+ // if (result.getSource.equals(result.getTarget))
- return null
+ // return null
- result.setDataInfo(generateDataInfo())
+ // result.setDataInfo(generateDataInfo())
+ //
- result
+ // result
- }
+ // }
def convertDLIDatasetTOOAF(d: DLIDataset): Dataset = {

View File

@ -2,7 +2,7 @@ package eu.dnetlib.dhp.`export`
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{Instance, Publication, Relation, Dataset => OafDataset}
- import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation}
+ import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication}
import org.apache.commons.io.IOUtils
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.compress.GzipCodec
@ -39,14 +39,13 @@ object SparkExportContentForOpenAire {
implicit val pubEncoder: Encoder[Publication] = Encoders.bean(classOf[Publication])
implicit val datEncoder: Encoder[OafDataset] = Encoders.bean(classOf[OafDataset])
implicit val relEncoder: Encoder[Relation] = Encoders.bean(classOf[Relation])
- implicit val dliRelEncoder: Encoder[DLIRelation] = Encoders.bean(classOf[DLIRelation])
import spark.implicits._
val relRDD:RDD[Relation] = sc.textFile(s"$workingPath/relation_j")
- .map(s => new ObjectMapper().readValue(s, classOf[DLIRelation]))
+ .map(s => new ObjectMapper().readValue(s, classOf[Relation]))
.filter(p => p.getDataInfo.getDeletedbyinference == false)
- .map(DLIToOAF.convertDLIRelation).filter(p=>p!= null)
spark.createDataset(relRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/relationDS")
val datRDD:RDD[OafDataset] = sc.textFile(s"$workingPath/dataset")
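With convertDLIRelation commented out, the OpenAIRE export parses relation JSON straight into Relation and saves it unchanged; a condensed sketch of the new path (assuming the fasterxml ObjectMapper used elsewhere in this commit and the workingPath layout shown above):

import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.schema.oaf.Relation
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}

// Sketch: relations are read directly as Relation (no DLIRelation conversion) and exported.
def exportRelations(spark: SparkSession, workingPath: String): Unit = {
  implicit val relEncoder: Encoder[Relation] = Encoders.bean(classOf[Relation])
  val relRDD: RDD[Relation] = spark.sparkContext
    .textFile(s"$workingPath/relation_j")
    .map(s => new ObjectMapper().readValue(s, classOf[Relation]))
    .filter(r => !r.getDataInfo.getDeletedbyinference)
  spark.createDataset(relRDD).write.mode(SaveMode.Overwrite).save(s"$workingPath/relationDS")
}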

View File

@ -4,7 +4,7 @@ import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import eu.dnetlib.dhp.schema.oaf.Relation
- import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIRelation}
+ import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
@ -65,7 +65,7 @@ class ExportDLITOOAFTest {
val json = Source.fromInputStream(getClass.getResourceAsStream("relation.json")).mkString
- val oaf =DLIToOAF.convertDLIRelation(mapper.readValue(json, classOf[DLIRelation]))
+ val oaf = mapper.readValue(json, classOf[Relation])
println(mapper.writeValueAsString(oaf))