diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateScholix.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateScholix.scala index 274bdb150d..bb1c9438e5 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateScholix.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkCreateScholix.scala @@ -5,8 +5,10 @@ import eu.dnetlib.dhp.schema.oaf.Relation import eu.dnetlib.dhp.schema.sx.scholix.Scholix import eu.dnetlib.dhp.schema.sx.summary.ScholixSummary import eu.dnetlib.dhp.sx.graph.scholix.ScholixUtils +import eu.dnetlib.dhp.sx.graph.scholix.ScholixUtils.RelatedEntities import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf +import org.apache.spark.sql.functions.count import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} import org.slf4j.{Logger, LoggerFactory} @@ -58,19 +60,57 @@ object SparkCreateScholix { scholixSource.joinWith(summaryDS, scholixSource("_1").equalTo(summaryDS("_1")), "left") .map { input: ((String, Scholix), (String, ScholixSummary)) => - val s: Scholix = input._1._2 - val target: ScholixSummary = input._2._2 - ScholixUtils.generateCompleteScholix(s, target) - }.write.mode(SaveMode.Overwrite).save(s"$targetPath/scholix_one_verse") + if (input._2== null) { + null + } else { + val s: Scholix = input._1._2 + val target: ScholixSummary = input._2._2 + ScholixUtils.generateCompleteScholix(s, target) + } + }.filter(s => s!= null).write.mode(SaveMode.Overwrite).save(s"$targetPath/scholix_one_verse") val scholix_o_v: Dataset[Scholix] = spark.read.load(s"$targetPath/scholix_one_verse").as[Scholix] - scholix_o_v.flatMap(s => List(s, ScholixUtils.createInverseScholixRelation(s))).groupByKey(_.getIdentifier).reduceGroups { (x, y) => - if (x != null) - x - else - y - }.write.mode(SaveMode.Overwrite).save(s"$targetPath/scholix") + scholix_o_v.flatMap(s => List(s, ScholixUtils.createInverseScholixRelation(s))).as[Scholix] + .map(s=> (s.getIdentifier,s))(Encoders.tuple(Encoders.STRING, scholixEncoder)) + .groupByKey(_._1) + .agg(ScholixUtils.scholixAggregator.toColumn) + .map(s => s._2) + .write.mode(SaveMode.Overwrite).save(s"$targetPath/scholix") + + val scholix_final:Dataset[Scholix] = spark.read.load(s"$targetPath/scholix").as[Scholix] + + + + + val stats:Dataset[(String,String,Long)]= scholix_final.map(s => (s.getSource.getDnetIdentifier, s.getTarget.getObjectType)).groupBy("_1", "_2").agg(count("_1")).as[(String,String,Long)] + + + stats + .map(s => RelatedEntities(s._1, if ("dataset".equalsIgnoreCase(s._2)) s._3 else 0, if ("publication".equalsIgnoreCase(s._2)) s._3 else 0 )) + .groupByKey(_.id) + .reduceGroups((a, b) => RelatedEntities(a.id, a.relatedDataset+b.relatedDataset, a.relatedPublication+b.relatedPublication)) + .map(_._2) + .write.mode(SaveMode.Overwrite).save(s"$targetPath/related_entities") + + + + val relatedEntitiesDS:Dataset[RelatedEntities] = spark.read.load(s"$targetPath/related_entities").as[RelatedEntities].filter(r => r.relatedPublication>0 || r.relatedDataset > 0) + + + relatedEntitiesDS.joinWith(summaryDS, relatedEntitiesDS("id").equalTo(summaryDS("_1")), "inner").map{i => + val re = i._1 + val sum = i._2._2 + + sum.setRelatedDatasets(re.relatedDataset) + sum.setRelatedPublications(re.relatedPublication) + sum + }.write.mode(SaveMode.Overwrite).save(s"${summaryPath}_filtered") + } + + + + } diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala index d38ea388a4..eacab631b1 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/scholix/ScholixUtils.scala @@ -5,6 +5,9 @@ import eu.dnetlib.dhp.schema.oaf.{Dataset, Relation, Result, StructuredProperty} import eu.dnetlib.dhp.schema.sx.scholix.{Scholix, ScholixCollectedFrom, ScholixEntityId, ScholixIdentifier, ScholixRelationship, ScholixResource} import eu.dnetlib.dhp.schema.sx.summary.{CollectedFromType, SchemeValue, ScholixSummary, Typology} import eu.dnetlib.dhp.utils.DHPUtils +import org.apache.spark.sql.Encoders.bean +import org.apache.spark.sql.expressions.Aggregator +import org.apache.spark.sql.{Encoder, Encoders} import org.json4s import org.json4s.DefaultFormats import org.json4s.jackson.JsonMethods.parse @@ -21,6 +24,8 @@ object ScholixUtils { val DATE_RELATION_KEY:String = "RelationDate" case class RelationVocabulary(original:String, inverse:String){} + case class RelatedEntities(id:String, relatedDataset:Long, relatedPublication:Long){} + val relations:Map[String, RelationVocabulary] = { val input =Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/relations.json")).mkString implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats @@ -62,6 +67,72 @@ object ScholixUtils { } + + val statsAggregator:Aggregator[(String,String, Long), RelatedEntities, RelatedEntities] = new Aggregator[(String,String, Long), RelatedEntities, RelatedEntities] with Serializable { + override def zero: RelatedEntities = null + + override def reduce(b: RelatedEntities, a: (String, String, Long)): RelatedEntities = { + val id = a._1 + val relatedDataset = if ("dataset".equalsIgnoreCase(a._2)) a._3 else 0 + val relatedPublication = if ("publication".equalsIgnoreCase(a._2)) a._3 else 0 + + if (b == null) + RelatedEntities(a._1, relatedDataset, relatedPublication) + else + RelatedEntities(a._1,b.relatedDataset+ relatedDataset, b.relatedPublication+ relatedPublication ) + } + + override def merge(b1: RelatedEntities, b2: RelatedEntities): RelatedEntities = { + if (b1!= null && b2!= null) + RelatedEntities(b1.id, b1.relatedDataset+ b2.relatedDataset, b1.relatedPublication+ b2.relatedPublication) + + else + if (b1!= null) + b1 + else + b2 + } + + override def finish(reduction: RelatedEntities): RelatedEntities = reduction + + override def bufferEncoder: Encoder[RelatedEntities] = Encoders.bean(classOf[RelatedEntities]) + + override def outputEncoder: Encoder[RelatedEntities] = Encoders.bean(classOf[RelatedEntities]) + } + + + val scholixAggregator: Aggregator[(String, Scholix), Scholix, Scholix] = new Aggregator[(String, Scholix), Scholix, Scholix] with Serializable { + override def zero: Scholix = null + + + def scholix_complete(s:Scholix):Boolean ={ + if (s== null || s.getIdentifier==null) { + false + } else if (s.getSource == null || s.getTarget == null) { + false + } + else if (s.getLinkprovider == null || s.getLinkprovider.isEmpty) + false + else + true + } + + override def reduce(b: Scholix, a: (String, Scholix)): Scholix = { + if (scholix_complete(b)) b else a._2 + } + + override def merge(b1: Scholix, b2: Scholix): Scholix = { + if (scholix_complete(b1)) b1 else b2 + } + + override def finish(reduction: Scholix): Scholix = reduction + + override def bufferEncoder: Encoder[Scholix] = Encoders.kryo[Scholix] + + override def outputEncoder: Encoder[Scholix] = Encoders.kryo[Scholix] + } + + def createInverseScholixRelation(scholix: Scholix):Scholix = { val s = new Scholix s.setPublicationDate(scholix.getPublicationDate)