completed workflow of generation of scholix and summaries

This commit is contained in:
Sandro La Bruzzo 2021-07-07 23:10:34 +02:00
parent ed684874f2
commit a01dbe0ab0
2 changed files with 121 additions and 10 deletions

View File

@ -5,8 +5,10 @@ import eu.dnetlib.dhp.schema.oaf.Relation
import eu.dnetlib.dhp.schema.sx.scholix.Scholix import eu.dnetlib.dhp.schema.sx.scholix.Scholix
import eu.dnetlib.dhp.schema.sx.summary.ScholixSummary import eu.dnetlib.dhp.schema.sx.summary.ScholixSummary
import eu.dnetlib.dhp.sx.graph.scholix.ScholixUtils import eu.dnetlib.dhp.sx.graph.scholix.ScholixUtils
import eu.dnetlib.dhp.sx.graph.scholix.ScholixUtils.RelatedEntities
import org.apache.commons.io.IOUtils import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.count
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory} import org.slf4j.{Logger, LoggerFactory}
@ -58,19 +60,57 @@ object SparkCreateScholix {
scholixSource.joinWith(summaryDS, scholixSource("_1").equalTo(summaryDS("_1")), "left") scholixSource.joinWith(summaryDS, scholixSource("_1").equalTo(summaryDS("_1")), "left")
.map { input: ((String, Scholix), (String, ScholixSummary)) => .map { input: ((String, Scholix), (String, ScholixSummary)) =>
val s: Scholix = input._1._2 if (input._2== null) {
val target: ScholixSummary = input._2._2 null
ScholixUtils.generateCompleteScholix(s, target) } else {
}.write.mode(SaveMode.Overwrite).save(s"$targetPath/scholix_one_verse") val s: Scholix = input._1._2
val target: ScholixSummary = input._2._2
ScholixUtils.generateCompleteScholix(s, target)
}
}.filter(s => s!= null).write.mode(SaveMode.Overwrite).save(s"$targetPath/scholix_one_verse")
val scholix_o_v: Dataset[Scholix] = spark.read.load(s"$targetPath/scholix_one_verse").as[Scholix] val scholix_o_v: Dataset[Scholix] = spark.read.load(s"$targetPath/scholix_one_verse").as[Scholix]
scholix_o_v.flatMap(s => List(s, ScholixUtils.createInverseScholixRelation(s))).groupByKey(_.getIdentifier).reduceGroups { (x, y) => scholix_o_v.flatMap(s => List(s, ScholixUtils.createInverseScholixRelation(s))).as[Scholix]
if (x != null) .map(s=> (s.getIdentifier,s))(Encoders.tuple(Encoders.STRING, scholixEncoder))
x .groupByKey(_._1)
else .agg(ScholixUtils.scholixAggregator.toColumn)
y .map(s => s._2)
}.write.mode(SaveMode.Overwrite).save(s"$targetPath/scholix") .write.mode(SaveMode.Overwrite).save(s"$targetPath/scholix")
val scholix_final:Dataset[Scholix] = spark.read.load(s"$targetPath/scholix").as[Scholix]
val stats:Dataset[(String,String,Long)]= scholix_final.map(s => (s.getSource.getDnetIdentifier, s.getTarget.getObjectType)).groupBy("_1", "_2").agg(count("_1")).as[(String,String,Long)]
stats
.map(s => RelatedEntities(s._1, if ("dataset".equalsIgnoreCase(s._2)) s._3 else 0, if ("publication".equalsIgnoreCase(s._2)) s._3 else 0 ))
.groupByKey(_.id)
.reduceGroups((a, b) => RelatedEntities(a.id, a.relatedDataset+b.relatedDataset, a.relatedPublication+b.relatedPublication))
.map(_._2)
.write.mode(SaveMode.Overwrite).save(s"$targetPath/related_entities")
val relatedEntitiesDS:Dataset[RelatedEntities] = spark.read.load(s"$targetPath/related_entities").as[RelatedEntities].filter(r => r.relatedPublication>0 || r.relatedDataset > 0)
relatedEntitiesDS.joinWith(summaryDS, relatedEntitiesDS("id").equalTo(summaryDS("_1")), "inner").map{i =>
val re = i._1
val sum = i._2._2
sum.setRelatedDatasets(re.relatedDataset)
sum.setRelatedPublications(re.relatedPublication)
sum
}.write.mode(SaveMode.Overwrite).save(s"${summaryPath}_filtered")
} }
} }

View File

@ -5,6 +5,9 @@ import eu.dnetlib.dhp.schema.oaf.{Dataset, Relation, Result, StructuredProperty}
import eu.dnetlib.dhp.schema.sx.scholix.{Scholix, ScholixCollectedFrom, ScholixEntityId, ScholixIdentifier, ScholixRelationship, ScholixResource} import eu.dnetlib.dhp.schema.sx.scholix.{Scholix, ScholixCollectedFrom, ScholixEntityId, ScholixIdentifier, ScholixRelationship, ScholixResource}
import eu.dnetlib.dhp.schema.sx.summary.{CollectedFromType, SchemeValue, ScholixSummary, Typology} import eu.dnetlib.dhp.schema.sx.summary.{CollectedFromType, SchemeValue, ScholixSummary, Typology}
import eu.dnetlib.dhp.utils.DHPUtils import eu.dnetlib.dhp.utils.DHPUtils
import org.apache.spark.sql.Encoders.bean
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}
import org.json4s import org.json4s
import org.json4s.DefaultFormats import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse import org.json4s.jackson.JsonMethods.parse
@ -21,6 +24,8 @@ object ScholixUtils {
val DATE_RELATION_KEY:String = "RelationDate" val DATE_RELATION_KEY:String = "RelationDate"
case class RelationVocabulary(original:String, inverse:String){} case class RelationVocabulary(original:String, inverse:String){}
case class RelatedEntities(id:String, relatedDataset:Long, relatedPublication:Long){}
val relations:Map[String, RelationVocabulary] = { val relations:Map[String, RelationVocabulary] = {
val input =Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/relations.json")).mkString val input =Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/relations.json")).mkString
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
@ -62,6 +67,72 @@ object ScholixUtils {
} }
val statsAggregator:Aggregator[(String,String, Long), RelatedEntities, RelatedEntities] = new Aggregator[(String,String, Long), RelatedEntities, RelatedEntities] with Serializable {
override def zero: RelatedEntities = null
override def reduce(b: RelatedEntities, a: (String, String, Long)): RelatedEntities = {
val id = a._1
val relatedDataset = if ("dataset".equalsIgnoreCase(a._2)) a._3 else 0
val relatedPublication = if ("publication".equalsIgnoreCase(a._2)) a._3 else 0
if (b == null)
RelatedEntities(a._1, relatedDataset, relatedPublication)
else
RelatedEntities(a._1,b.relatedDataset+ relatedDataset, b.relatedPublication+ relatedPublication )
}
override def merge(b1: RelatedEntities, b2: RelatedEntities): RelatedEntities = {
if (b1!= null && b2!= null)
RelatedEntities(b1.id, b1.relatedDataset+ b2.relatedDataset, b1.relatedPublication+ b2.relatedPublication)
else
if (b1!= null)
b1
else
b2
}
override def finish(reduction: RelatedEntities): RelatedEntities = reduction
override def bufferEncoder: Encoder[RelatedEntities] = Encoders.bean(classOf[RelatedEntities])
override def outputEncoder: Encoder[RelatedEntities] = Encoders.bean(classOf[RelatedEntities])
}
val scholixAggregator: Aggregator[(String, Scholix), Scholix, Scholix] = new Aggregator[(String, Scholix), Scholix, Scholix] with Serializable {
override def zero: Scholix = null
def scholix_complete(s:Scholix):Boolean ={
if (s== null || s.getIdentifier==null) {
false
} else if (s.getSource == null || s.getTarget == null) {
false
}
else if (s.getLinkprovider == null || s.getLinkprovider.isEmpty)
false
else
true
}
override def reduce(b: Scholix, a: (String, Scholix)): Scholix = {
if (scholix_complete(b)) b else a._2
}
override def merge(b1: Scholix, b2: Scholix): Scholix = {
if (scholix_complete(b1)) b1 else b2
}
override def finish(reduction: Scholix): Scholix = reduction
override def bufferEncoder: Encoder[Scholix] = Encoders.kryo[Scholix]
override def outputEncoder: Encoder[Scholix] = Encoders.kryo[Scholix]
}
def createInverseScholixRelation(scholix: Scholix):Scholix = { def createInverseScholixRelation(scholix: Scholix):Scholix = {
val s = new Scholix val s = new Scholix
s.setPublicationDate(scholix.getPublicationDate) s.setPublicationDate(scholix.getPublicationDate)