improved scholix generation using bean encoders

Sandro La Bruzzo 2023-07-19 16:53:28 +02:00
parent f1ae28fe42
commit e4feedd67e
3 changed files with 92 additions and 82 deletions


@@ -50,13 +50,10 @@ object ScholixUtils extends Serializable {
     }
   }
 
-  def extractRelationDate(summary: ScholixSummary): String = {
-    if (summary.getDate == null || summary.getDate.isEmpty)
-      null
-    else {
-      summary.getDate.get(0)
-    }
+  def extractRelationDate(summary: ScholixResource): String = {
+    summary.getPublicationDate
   }
 
  def inverseRelationShip(rel: ScholixRelationship): ScholixRelationship = {
@@ -232,7 +229,7 @@ object ScholixUtils extends Serializable {
     if (summaryObject.getAuthor != null && !summaryObject.getAuthor.isEmpty) {
       val l: List[ScholixEntityId] =
-        summaryObject.getAuthor.asScala.map(a => new ScholixEntityId(a, null)).toList
+        summaryObject.getAuthor.asScala.map(a => new ScholixEntityId(a, null)).take(100).toList
       if (l.nonEmpty)
         r.setCreator(l.asJava)
     }
@@ -241,7 +238,7 @@ object ScholixUtils extends Serializable {
       r.setPublicationDate(summaryObject.getDate.get(0))
     if (summaryObject.getPublisher != null && !summaryObject.getPublisher.isEmpty) {
       val plist: List[ScholixEntityId] =
-        summaryObject.getPublisher.asScala.map(p => new ScholixEntityId(p, null)).toList
+        summaryObject.getPublisher.asScala.map(p => new ScholixEntityId(p, null)).take(100).toList
       if (plist.nonEmpty)
         r.setPublisher(plist.asJava)
@@ -260,6 +257,7 @@ object ScholixUtils extends Serializable {
             "complete"
           )
         )
+        .take(100)
         .toList
 
       if (l.nonEmpty)
@@ -269,38 +267,38 @@ object ScholixUtils extends Serializable {
     r
   }
 
+//  def scholixFromSource(relation: Relation, source: ScholixResource): Scholix = {
+//    if (relation == null || source == null)
+//      return null
+//    val s = new Scholix
+//    var l: List[ScholixEntityId] = extractCollectedFrom(relation)
+//    if (l.isEmpty)
+//      l = extractCollectedFrom(source)
+//    if (l.isEmpty)
+//      return null
+//    s.setLinkprovider(l.asJava)
+//    var d = extractRelationDate(relation)
+//    if (d == null)
+//      d = source.getPublicationDate
+//
+//    s.setPublicationDate(d)
+//
+//    if (source.getPublisher != null && !source.getPublisher.isEmpty) {
+//      s.setPublisher(source.getPublisher)
+//    }
+//
+//    val semanticRelation = relations.getOrElse(relation.getRelClass.toLowerCase, null)
+//    if (semanticRelation == null)
+//      return null
+//    s.setRelationship(
+//      new ScholixRelationship(semanticRelation.original, "datacite", semanticRelation.inverse)
+//    )
+//    s.setSource(source)
+//
+//    s
+//  }
+
   def scholixFromSource(relation: Relation, source: ScholixResource): Scholix = {
-    if (relation == null || source == null)
-      return null
-    val s = new Scholix
-    var l: List[ScholixEntityId] = extractCollectedFrom(relation)
-    if (l.isEmpty)
-      l = extractCollectedFrom(source)
-    if (l.isEmpty)
-      return null
-    s.setLinkprovider(l.asJava)
-    var d = extractRelationDate(relation)
-    if (d == null)
-      d = source.getPublicationDate
-
-    s.setPublicationDate(d)
-
-    if (source.getPublisher != null && !source.getPublisher.isEmpty) {
-      s.setPublisher(source.getPublisher)
-    }
-
-    val semanticRelation = relations.getOrElse(relation.getRelClass.toLowerCase, null)
-    if (semanticRelation == null)
-      return null
-    s.setRelationship(
-      new ScholixRelationship(semanticRelation.original, "datacite", semanticRelation.inverse)
-    )
-    s.setSource(source)
-
-    s
-  }
-
-  def scholixFromSource(relation: Relation, source: ScholixSummary): Scholix = {
     if (relation == null || source == null)
       return null
@@ -322,11 +320,8 @@ object ScholixUtils extends Serializable {
     s.setPublicationDate(d)
 
     if (source.getPublisher != null && !source.getPublisher.isEmpty) {
-      val l: List[ScholixEntityId] = source.getPublisher.asScala
-        .map { p =>
-          new ScholixEntityId(p, null)
-        }(collection.breakOut)
+      source.getPublisher
+      val l: List[ScholixEntityId] = source.getPublisher.asScala.toList
       if (l.nonEmpty)
         s.setPublisher(l.asJava)
     }
@@ -337,7 +332,7 @@ object ScholixUtils extends Serializable {
     s.setRelationship(
       new ScholixRelationship(semanticRelation.original, "datacite", semanticRelation.inverse)
     )
-    s.setSource(generateScholixResourceFromSummary(source))
+    s.setSource(source)
     s
   }
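Note: the new .take(100) calls cap the creator, publisher, and collected-from lists before they are converted to Java lists, presumably to bound the size of the arrays that the bean encoder now materializes per row. A minimal, self-contained sketch of the pattern; the EntityId class and author values are illustrative stand-ins, not dnet-hadoop types:

import scala.collection.JavaConverters._

// Illustrative stand-in for ScholixEntityId.
case class EntityId(name: String, identifiers: String)

object TakeCapSketch {
  def main(args: Array[String]): Unit = {
    // A summary record may carry thousands of authors.
    val authors: java.util.List[String] = (1 to 5000).map(i => s"author-$i").asJava

    // Cap before materializing, mirroring the
    // .asScala.map(...).take(100).toList chain in the diff.
    val capped: List[EntityId] =
      authors.asScala.map(a => EntityId(a, null)).take(100).toList

    println(s"kept ${capped.size} of ${authors.size} authors") // kept 100 of 5000
  }
}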


@@ -2,13 +2,13 @@ package eu.dnetlib.dhp.sx.graph
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser
 import eu.dnetlib.dhp.schema.oaf.Relation
-import eu.dnetlib.dhp.schema.sx.scholix.Scholix
+import eu.dnetlib.dhp.schema.sx.scholix.{Scholix, ScholixResource}
 import eu.dnetlib.dhp.schema.sx.summary.ScholixSummary
 import eu.dnetlib.dhp.sx.graph.scholix.ScholixUtils
 import eu.dnetlib.dhp.sx.graph.scholix.ScholixUtils.RelatedEntities
 import org.apache.commons.io.IOUtils
 import org.apache.spark.SparkConf
-import org.apache.spark.sql.functions.count
+import org.apache.spark.sql.functions.{col, count}
 import org.apache.spark.sql._
 import org.slf4j.{Logger, LoggerFactory}
@@ -42,57 +42,63 @@ object SparkCreateScholix {
     val dumpCitations = Try(parser.get("dumpCitations").toBoolean).getOrElse(false)
     log.info(s"dumpCitations -> $dumpCitations")
 
-    implicit val relEncoder: Encoder[Relation] = Encoders.kryo[Relation]
-    implicit val summaryEncoder: Encoder[ScholixSummary] = Encoders.kryo[ScholixSummary]
-    implicit val scholixEncoder: Encoder[Scholix] = Encoders.kryo[Scholix]
+    implicit val relEncoder: Encoder[Relation] = Encoders.bean(classOf[Relation])
+    implicit val summaryEncoder: Encoder[ScholixSummary] = Encoders.bean(classOf[ScholixSummary])
+    implicit val resourceEncoder: Encoder[ScholixResource] = Encoders.bean(classOf[ScholixResource])
+    implicit val scholixEncoder: Encoder[Scholix] = Encoders.bean(classOf[Scholix])
 
     import spark.implicits._
 
+    spark.sparkContext.setLogLevel("WARN")
+
-    val relationDS: Dataset[(String, Relation)] = spark.read
-      .load(relationPath)
+    val relationDS: Dataset[Relation] = spark.read
+      .schema(relEncoder.schema)
+      .json(relationPath)
       .as[Relation]
       .filter(r =>
         (r.getDataInfo == null || r.getDataInfo.getDeletedbyinference == false) && !r.getRelClass.toLowerCase
           .contains("merge")
       )
-      .map(r => (r.getSource, r))(Encoders.tuple(Encoders.STRING, relEncoder))
 
-    val summaryDS: Dataset[(String, ScholixSummary)] = spark.read
-      .load(summaryPath)
+    val summaryDS: Dataset[ScholixResource] = spark.read
+      .schema(summaryEncoder.schema)
+      .json(summaryPath)
       .as[ScholixSummary]
-      .map(r => (r.getId, r))(Encoders.tuple(Encoders.STRING, summaryEncoder))
+      .map(s => ScholixUtils.generateScholixResourceFromSummary(s))
 
     relationDS
-      .joinWith(summaryDS, relationDS("_1").equalTo(summaryDS("_1")), "left")
-      .map { input: ((String, Relation), (String, ScholixSummary)) =>
+      .joinWith(summaryDS, relationDS("source").equalTo(summaryDS("dnetIdentifier")), "left")
+      .map { input: (Relation, ScholixResource) =>
         if (input._1 != null && input._2 != null) {
-          val rel: Relation = input._1._2
-          val source: ScholixSummary = input._2._2
-          (rel.getTarget, ScholixUtils.scholixFromSource(rel, source))
+          val rel: Relation = input._1
+          val source: ScholixResource = input._2
+          val s = ScholixUtils.scholixFromSource(rel, source)
+          s.setIdentifier(rel.getTarget)
+          s
         } else null
-      }(Encoders.tuple(Encoders.STRING, scholixEncoder))
+      }(scholixEncoder)
       .filter(r => r != null)
       .write
+      .option("compression", "lz4")
       .mode(SaveMode.Overwrite)
       .save(s"$targetPath/scholix_from_source")
 
-    val scholixSource: Dataset[(String, Scholix)] = spark.read
+    val scholixSource: Dataset[Scholix] = spark.read
       .load(s"$targetPath/scholix_from_source")
-      .as[(String, Scholix)](Encoders.tuple(Encoders.STRING, scholixEncoder))
+      .as[Scholix]
 
     scholixSource
-      .joinWith(summaryDS, scholixSource("_1").equalTo(summaryDS("_1")), "left")
-      .map { input: ((String, Scholix), (String, ScholixSummary)) =>
+      .joinWith(summaryDS, scholixSource("identifier").equalTo(summaryDS("dnetIdentifier")), "left")
+      .map { input: (Scholix, ScholixResource) =>
         if (input._2 == null) {
           null
         } else {
-          val s: Scholix = input._1._2
-          val target: ScholixSummary = input._2._2
+          val s: Scholix = input._1
+          val target: ScholixResource = input._2
           ScholixUtils.generateCompleteScholix(s, target)
         }
       }
       .filter(s => s != null)
       .write
+      .option("compression", "lz4")
       .mode(SaveMode.Overwrite)
       .save(s"$targetPath/scholix_one_verse")
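Note: this hunk is the heart of the commit. Encoders.kryo stores each object as one opaque binary column, which is why the old code had to carry hand-built (String, T) key tuples and join on "_1"; Encoders.bean derives a real columnar schema from the JavaBean's getters, so joinWith can reference fields such as source and dnetIdentifier directly. A self-contained sketch with a stand-in bean, not the actual dnet-hadoop classes, and a local[*] master assumed for demonstration:

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

// Stand-in JavaBean: Encoders.bean needs a no-arg constructor plus getters/setters.
class Rel extends Serializable {
  private var source: String = _
  def getSource: String = source
  def setSource(v: String): Unit = source = v
}

object BeanEncoderSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("bean-sketch").getOrCreate()
    implicit val relEncoder: Encoder[Rel] = Encoders.bean(classOf[Rel])

    val r = new Rel
    r.setSource("dnet::id-1")
    val ds = spark.createDataset(Seq(r))

    // With kryo this would print a single `value: binary` column;
    // with bean it prints `source: string`, which is what makes
    // expressions like ds("source") legal in joinWith conditions.
    ds.printSchema()
    spark.stop()
  }
}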
@@ -102,11 +108,10 @@ object SparkCreateScholix {
     scholix_o_v
       .flatMap(s => List(s, ScholixUtils.createInverseScholixRelation(s)))
       .as[Scholix]
-      .map(s => (s.getIdentifier, s))(Encoders.tuple(Encoders.STRING, scholixEncoder))
-      .groupByKey(_._1)
-      .agg(ScholixUtils.scholixAggregator.toColumn)
-      .map(s => s._2)
+      .map(s => (s.getIdentifier, s))
+      .dropDuplicates("identifier")
       .write
+      .option("compression", "lz4")
       .mode(SaveMode.Overwrite)
       .save(s"$targetPath/scholix")
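Note: swapping groupByKey/agg(scholixAggregator) for dropDuplicates changes the semantics, not just the execution plan: the aggregator merged duplicate Scholix entries field by field, while dropDuplicates keeps one arbitrary row per key and discards the rest. A toy illustration of the difference, with illustrative column names:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.collect_list

object DedupSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("dedup-sketch").getOrCreate()
    import spark.implicits._

    val ds = Seq(("id-1", "linkA"), ("id-1", "linkB"), ("id-2", "linkC"))
      .toDF("identifier", "link")

    // Aggregator-style: combine everything that shares a key.
    ds.groupBy("identifier").agg(collect_list("link")).show()
    // id-1 -> [linkA, linkB]

    // dropDuplicates: keep one arbitrary row per key.
    ds.dropDuplicates("identifier").show()
    // id-1 -> linkA or linkB, non-deterministically

    spark.stop()
  }
}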
@@ -136,6 +141,7 @@ object SparkCreateScholix {
       )
       .map(_._2)
       .write
+      .option("compression", "lz4")
       .mode(SaveMode.Overwrite)
       .save(s"$targetPath/related_entities")
@@ -144,19 +150,23 @@ object SparkCreateScholix {
       .as[RelatedEntities]
       .filter(r => dumpCitations || r.relatedPublication > 0 || r.relatedDataset > 0)
 
+    val summaryDS2: Dataset[ScholixSummary] = spark.read
+      .schema(summaryEncoder.schema)
+      .json(summaryPath)
+      .as[ScholixSummary]
+
     relatedEntitiesDS
-      .joinWith(summaryDS, relatedEntitiesDS("id").equalTo(summaryDS("_1")), "inner")
+      .joinWith(summaryDS2, relatedEntitiesDS("id").equalTo(summaryDS("id")), "inner")
       .map { i =>
         val re = i._1
-        val sum = i._2._2
+        val sum = i._2
         sum.setRelatedDatasets(re.relatedDataset)
         sum.setRelatedPublications(re.relatedPublication)
         sum
       }
       .write
       .mode(SaveMode.Overwrite)
+      .option("compression", "lz4")
       .save(s"${summaryPath}_filtered")
   }
 }
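Note: the spark.read.schema(encoder.schema).json(path) pattern used throughout this file replaces spark.read.load(path) over kryo-serialized output. Passing the bean encoder's schema up front skips Spark's schema-inference scan of the JSON and pins column names to the bean's properties; fields absent from the input simply come back null. A minimal sketch, where the Summary bean and the path are illustrative:

import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

// Illustrative stand-in for ScholixSummary.
class Summary extends Serializable {
  private var id: String = _
  def getId: String = id
  def setId(v: String): Unit = id = v
}

object SchemaJsonSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("schema-json").getOrCreate()
    implicit val summaryEncoder: Encoder[Summary] = Encoders.bean(classOf[Summary])

    // The explicit schema avoids a full inference pass over the input files.
    val summaries: Dataset[Summary] = spark.read
      .schema(summaryEncoder.schema)
      .json("/tmp/summaries.json") // illustrative path
      .as[Summary]

    summaries.show()
    spark.stop()
  }
}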


@@ -3,7 +3,7 @@ package eu.dnetlib.dhp.sx.graph.scholix
 import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper, SerializationFeature}
 import eu.dnetlib.dhp.oa.graph.resolution.SparkResolveRelation
 import eu.dnetlib.dhp.schema.oaf.{Relation, Result}
-import eu.dnetlib.dhp.schema.sx.scholix.Scholix
+import eu.dnetlib.dhp.schema.sx.scholix.{Scholix, ScholixResource}
 import eu.dnetlib.dhp.schema.sx.summary.ScholixSummary
 import eu.dnetlib.dhp.sx.graph.bio.pubmed.AbstractVocabularyTest
 import org.json4s
@@ -69,15 +69,20 @@ class ScholixGraphTest extends AbstractVocabularyTest {
         getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/merge_result_scholix")
       )
       .mkString
-    val result: List[(Relation, ScholixSummary)] = inputRelations.linesWithSeparators
+    val result: List[(Relation, ScholixResource)] = inputRelations.linesWithSeparators
       .map(l => l.stripLineEnd)
-      .sliding(2)
+      .sliding(2, 2)
       .map(s => (s.head, s(1)))
-      .map(p => (mapper.readValue(p._1, classOf[Relation]), mapper.readValue(p._2, classOf[ScholixSummary])))
+      .map(p =>
+        (
+          mapper.readValue(p._1, classOf[Relation]),
+          ScholixUtils.generateScholixResourceFromSummary(mapper.readValue(p._2, classOf[ScholixSummary]))
+        )
+      )
       .toList
 
     assertNotNull(result)
     assertTrue(result.nonEmpty)
-    result.foreach(r => assertEquals(r._1.getSource, r._2.getId))
+    result.foreach(r => assertEquals(r._1.getSource, r._2.getDnetIdentifier))
     val scholix: List[Scholix] = result.map(r => ScholixUtils.scholixFromSource(r._1, r._2))
     println(mapper.writeValueAsString(scholix.head))
   }
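Note: the sliding(2) to sliding(2, 2) change in the test is a real fix, not cosmetics. With the default step of 1 the iterator yields overlapping windows, so each summary line would also be paired as the head of the next window; with step 2 the lines group into disjoint (relation, summary) pairs. A quick illustration:

object SlidingSketch {
  def main(args: Array[String]): Unit = {
    val lines = List("rel-1", "sum-1", "rel-2", "sum-2")

    // Default step 1: overlapping windows produce bogus pairs.
    println(lines.sliding(2).toList)
    // List(List(rel-1, sum-1), List(sum-1, rel-2), List(rel-2, sum-2))

    // Step 2: disjoint windows, one (relation, summary) pair per record.
    println(lines.sliding(2, 2).toList)
    // List(List(rel-1, sum-1), List(rel-2, sum-2))
  }
}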