fixed export Scholexplorer to OpenAire

This commit is contained in:
Sandro La Bruzzo 2020-10-13 08:47:58 +02:00
parent 734934e2eb
commit 34bf64c94f
4 changed files with 56 additions and 23 deletions


@@ -1,17 +1,13 @@
package eu.dnetlib.dhp.doiboost
import eu.dnetlib.dhp.schema.oaf.Project
import eu.dnetlib.dhp.schema.oaf.Publication
import org.apache.spark.SparkContext
import org.apache.spark.sql.functions.{col, sum}
import org.apache.hadoop.io.Text
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import org.codehaus.jackson.map.ObjectMapper
import org.json4s.DefaultFormats
import org.codehaus.jackson.map.{ObjectMapper, SerializationConfig}
import org.json4s
import org.json4s.DefaultFormats
import org.json4s.JsonAST._
import org.json4s.jackson.JsonMethods._
import scala.::
import scala.collection.JavaConverters._
class QueryTest {
@@ -27,19 +23,32 @@ class QueryTest {
}
def hasInstanceWithUrl(p:Publication):Boolean = {
val c = p.getInstance.asScala.count(i => i.getUrl != null && !i.getUrl.isEmpty)
!(!p.getInstance.isEmpty && c == p.getInstance().size)
}
def hasNullAccessRights(p:Publication):Boolean = {
val c = p.getInstance.asScala.count(i => i.getAccessright != null && i.getAccessright.getClassname.nonEmpty)
!p.getInstance.isEmpty && c == p.getInstance().size()
}
def myQuery(spark:SparkSession, sc:SparkContext): Unit = {
implicit val mapEncoderPub: Encoder[Project] = Encoders.kryo[Project]
implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication]
val mapper = new ObjectMapper()
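// enable indented (pretty-printed) JSON output so dumped records are easier to inspect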
mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT)
// val ds:Dataset[Project] = spark.createDataset(sc.sequenceFile("", classOf[Text], classOf[Text])
// .map(_._2.toString)
// .map(s => new ObjectMapper().readValue(s, classOf[Project])))
//
// ds.write.saveAsTable()
val ds:Dataset[Publication] = spark.read.load("/tmp/p").as[Publication]
ds.filter(p => p.getBestaccessright != null && p.getBestaccessright.getClassname.nonEmpty).count()
}
}


@@ -47,6 +47,7 @@ object DLIToOAF {
"References" -> ("isRelatedTo", "relationship"),
"IsRelatedTo" -> ("isRelatedTo", "relationship"),
"IsSupplementedBy" -> ("isSupplementedBy", "supplement"),
"Documents"-> ("isRelatedTo", "relationship"),
"Cites" -> ("cites", "citation"),
"Unknown" -> ("isRelatedTo", "relationship"),
"IsSourceOf" -> ("isRelatedTo", "relationship"),
@@ -83,7 +84,7 @@ object DLIToOAF {
val rel_inverse: Map[String, String] = Map(
"isRelatedTo" -> "isRelatedTo",
"IsSupplementedBy" -> "isSupplementTo",
"isSupplementedBy" -> "isSupplementTo",
"cites" -> "IsCitedBy",
"IsCitedBy" -> "cites",
"reviews" -> "IsReviewedBy"
@@ -272,9 +273,17 @@ object DLIToOAF {
result
}
def convertDLIRelation(r: Relation): Relation = {
r.setSource(r.getSource.replaceFirst("50|","50|scholix_____::" ).replaceFirst("60|", "60|scholix_____::"))
r.setTarget(r.getTarget.replaceFirst("50|","50|scholix_____::" ).replaceFirst("60|", "60|scholix_____::"))
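// map the DLI relation semantics onto the OpenAIRE resultResult relType:
// relClass and subRelType come from relationTypeMapping, and relations whose
// relType has no mapping are dropped by returning null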
val rt = r.getRelType
if (!relationTypeMapping.contains(rt))
return null
r.setRelType("resultResult")
r.setRelClass(relationTypeMapping(rt)._1)
r.setSubRelType(relationTypeMapping(rt)._2)
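// rewrite source and target identifiers through generateId, replacing the previous inline prefix rewriting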
r.setSource(generateId(r.getSource))
r.setTarget(generateId(r.getTarget))
r
}
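As a rough usage sketch (not part of this commit; it assumes the same imports as ExportDLITOOAFTest below, and the unmapped relType value is hypothetical), the reworked convertDLIRelation is expected to behave as follows:

val supplemented = new Relation
supplemented.setSource("60|fbff1d424e045eecf24151a5fe3aa738")
supplemented.setTarget("50|dedup_wf_001::ec409f09e63347d4e834087fe1483877")
supplemented.setRelType("IsSupplementedBy")
val converted = DLIToOAF.convertDLIRelation(supplemented)
// converted.getRelType == "resultResult", converted.getRelClass == "isSupplementedBy",
// converted.getSubRelType == "supplement"; source and target are rewritten through generateId

val unknown = new Relation
unknown.setRelType("SomeUnmappedType") // hypothetical value, not present in relationTypeMapping
// convertDLIRelation(unknown) == null, and such relations are filtered out by the export job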


@@ -15,11 +15,13 @@ import org.apache.spark.{SparkConf, SparkContext}
import org.codehaus.jackson.map.ObjectMapper
import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConverters._
object SparkExportContentForOpenAire {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkExportContentForOpenAire.getClass.getResourceAsStream("input_export_content_parameters.json")))
@@ -42,9 +44,11 @@ object SparkExportContentForOpenAire {
import spark.implicits._
val dsRel = spark.read.load(s"$workingPath/relation_b").as[Relation]
dsRel.filter(r => r.getDataInfo==null || r.getDataInfo.getDeletedbyinference ==false).map(DLIToOAF.convertDLIRelation).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/relationDS")
dsRel.filter(r => r.getDataInfo==null || r.getDataInfo.getDeletedbyinference ==false)
.map(DLIToOAF.convertDLIRelation)
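// convertDLIRelation returns null for relations with an unmapped relType: drop them before writing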
.filter(r => r != null)
.write.mode(SaveMode.Overwrite).save(s"$workingPath/export/relationDS")
val dsPubs = spark.read.load(s"$workingPath/publication").as[DLIPublication]


@@ -5,9 +5,7 @@ import java.time.format.DateTimeFormatter
import eu.dnetlib.dhp.schema.oaf.Relation
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.codehaus.jackson.map.{ObjectMapper, SerializationConfig}
import org.junit.jupiter.api.Test
@@ -23,6 +21,19 @@ class ExportDLITOOAFTest {
}
@Test
def testMappingRele():Unit = {
val r:Relation = new Relation
r.setSource("60|fbff1d424e045eecf24151a5fe3aa738")
r.setTarget("50|dedup_wf_001::ec409f09e63347d4e834087fe1483877")
// convertDLIRelation drops relations whose relType is not in relationTypeMapping,
// so set a mapped relType before converting
r.setRelType("IsSupplementedBy")
val r1 = DLIToOAF.convertDLIRelation(r)
println(r1.getSource, r1.getTarget)
}
@Test
def testPublicationMapping():Unit = {