implemented and tested resolution of entities

This commit is contained in:
Sandro La Bruzzo 2021-11-11 10:17:40 +01:00
parent 6477a40670
commit 9cb195314f
9 changed files with 306 additions and 1 deletions

View File

@ -0,0 +1,96 @@
package eu.dnetlib.dhp.oa.graph.resolution
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.common.EntityType
import eu.dnetlib.dhp.schema.oaf.{OtherResearchProduct, Publication, Result, Software, Dataset => OafDataset}
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.slf4j.{Logger, LoggerFactory}
object SparkResolveEntities {
val mapper = new ObjectMapper()
val entities = List(EntityType.dataset,EntityType.publication, EntityType.software, EntityType.otherresearchproduct)
def main(args: Array[String]): Unit = {
val log: Logger = LoggerFactory.getLogger(getClass)
val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/resolution/resolve_params.json")))
parser.parseArgument(args)
val spark: SparkSession =
SparkSession
.builder()
.config(conf)
.appName(getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
val graphBasePath = parser.get("graphBasePath")
log.info(s"graphBasePath -> $graphBasePath")
val workingPath = parser.get("workingPath")
log.info(s"workingPath -> $workingPath")
val unresolvedPath = parser.get("unresolvedPath")
log.info(s"unresolvedPath -> $unresolvedPath")
resolveEntities(spark, workingPath, unresolvedPath)
generateResolvedEntities(spark, workingPath, graphBasePath)
}
def resolveEntities(spark: SparkSession, workingPath: String, unresolvedPath: String) = {
implicit val resEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])
import spark.implicits._
val rPid: Dataset[(String, String)] = spark.read.load(s"$workingPath/relationResolvedPid").as[(String, String)]
val up: Dataset[(String, Result)] = spark.read.text(unresolvedPath).as[String].map(s => mapper.readValue(s, classOf[Result])).map(r => (r.getId, r))(Encoders.tuple(Encoders.STRING, resEncoder))
rPid.joinWith(up, rPid("_2").equalTo(up("_1")), "inner").map {
r =>
val result = r._2._2
val dnetId = r._1._1
result.setId(dnetId)
result
}.write.mode(SaveMode.Overwrite).save(s"$workingPath/resolvedEntities")
}
def deserializeObject(input:String, entity:EntityType ) :Result = {
entity match {
case EntityType.publication => mapper.readValue(input, classOf[Publication])
case EntityType.dataset => mapper.readValue(input, classOf[OafDataset])
case EntityType.software=> mapper.readValue(input, classOf[Software])
case EntityType.otherresearchproduct=> mapper.readValue(input, classOf[OtherResearchProduct])
}
}
def generateResolvedEntities(spark:SparkSession, workingPath: String, graphBasePath:String) = {
implicit val resEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])
import spark.implicits._
val re:Dataset[Result] = spark.read.load(s"$workingPath/resolvedEntities").as[Result]
entities.foreach {
e =>
spark.read.text(s"$graphBasePath/$e").as[String]
.map(s => deserializeObject(s, e))
.union(re)
.groupByKey(_.getId)
.reduceGroups {
(x, y) =>
x.mergeFrom(y)
x
}.map(_._2)
.filter(r => r.getClass.getSimpleName.toLowerCase != "result")
.map(r => mapper.writeValueAsString(r))(Encoders.STRING)
.write.mode(SaveMode.Overwrite).option("compression", "gzip").text(s"$workingPath/resolvedGraph/$e")
}
}
}

View File

@ -128,7 +128,7 @@ object SparkResolveRelation {
source != null source != null
} }
private def extractPidResolvedTableFromJsonRDD(spark: SparkSession, graphPath: String, workingPath: String) = { def extractPidResolvedTableFromJsonRDD(spark: SparkSession, graphPath: String, workingPath: String) = {
import spark.implicits._ import spark.implicits._
val d: RDD[(String, String)] = spark.sparkContext.textFile(s"$graphPath/*") val d: RDD[(String, String)] = spark.sparkContext.textFile(s"$graphPath/*")

View File

@ -0,0 +1,6 @@
[
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the source Path", "paramRequired": true},
{"paramName":"u", "paramLongName":"unresolvedPath", "paramDescription": "the source Path", "paramRequired": true},
{"paramName":"g", "paramLongName":"graphBasePath", "paramDescription": "the path of the raw graph", "paramRequired": true}
]

View File

@ -0,0 +1,190 @@
package eu.dnetlib.dhp.oa.graph.resolution
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.schema.common.EntityType
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils
import eu.dnetlib.dhp.schema.oaf.{Result, StructuredProperty}
import org.apache.commons.io.FileUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.TestInstance.Lifecycle
import org.junit.jupiter.api.{AfterAll, BeforeAll, Test, TestInstance}
import java.nio.file.{Files, Path}
import scala.collection.JavaConverters._
import scala.io.Source
@TestInstance(Lifecycle.PER_CLASS)
class ResolveEntitiesTest extends Serializable {
var workingDir:Path = null
val FAKE_TITLE = "FAKETITLE"
val FAKE_SUBJECT = "FAKESUBJECT"
var sparkSession:Option[SparkSession] = None
@BeforeAll
def setUp() :Unit = {
workingDir = Files.createTempDirectory(getClass.getSimpleName)
val conf = new SparkConf()
sparkSession = Some(SparkSession
.builder()
.config(conf)
.appName(getClass.getSimpleName)
.master("local[*]").getOrCreate())
populateDatasets(sparkSession.get)
generateUpdates(sparkSession.get)
}
@AfterAll
def tearDown():Unit = {
FileUtils.deleteDirectory(workingDir.toFile)
sparkSession.get.stop()
}
def generateUpdates(spark:SparkSession):Unit = {
val template = Source.fromInputStream(this.getClass.getResourceAsStream("updates")).mkString
val pids:List[String] = template.lines.map{id =>
val r = new Result
r.setId(id.toLowerCase.trim)
r.setSubject(List(OafMapperUtils.structuredProperty(FAKE_SUBJECT, OafMapperUtils.qualifier("fos","fosCS", "fossSchema", "fossiFIgo"), null)).asJava)
r.setTitle(List(OafMapperUtils.structuredProperty(FAKE_TITLE, OafMapperUtils.qualifier("fos","fosCS", "fossSchema", "fossiFIgo"), null)).asJava)
r
}.map{r =>
val mapper = new ObjectMapper()
mapper.writeValueAsString(r)}.toList
val sc =spark.sparkContext
println(sc.parallelize(pids).count())
spark.createDataset(sc.parallelize(pids))(Encoders.STRING).write.mode(SaveMode.Overwrite).option("compression", "gzip").text(s"$workingDir/updates")
import spark.implicits._
implicit val resEncoder: Encoder[Result] = Encoders.bean(classOf[Result])
val ds = spark.read.text(s"$workingDir/updates").as[String].map{s => val mapper = new ObjectMapper()
mapper.readValue(s, classOf[Result])}.collect()
assertEquals(4, ds.length)
ds.foreach{r => assertNotNull(r.getSubject)}
ds.foreach{r => assertEquals(1,r.getSubject.size())}
ds.foreach{r => assertNotNull(r.getTitle)}
ds.foreach{r => assertEquals(1,r.getTitle.size())}
ds.flatMap(r => r.getTitle.asScala.map(t => t.getValue)).foreach(t => assertEquals(FAKE_TITLE,t))
ds.flatMap(r => r.getSubject.asScala.map(t => t.getValue)).foreach(t => assertEquals(FAKE_SUBJECT,t))
println("generated Updates")
}
def populateDatasets(spark:SparkSession):Unit = {
import spark.implicits._
val entities =SparkResolveEntities.entities
entities.foreach{
e =>
val template = Source.fromInputStream(this.getClass.getResourceAsStream(s"$e")).mkString
spark.createDataset(spark.sparkContext.parallelize(template.lines.toList)).as[String].write.option("compression", "gzip").text(s"$workingDir/graph/$e")
println(s"Created Dataset $e")
}
SparkResolveRelation.extractPidResolvedTableFromJsonRDD(spark, s"$workingDir/graph", s"$workingDir/work")
}
@Test
def testResolution():Unit = {
val spark:SparkSession = sparkSession.get
implicit val resEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])
SparkResolveEntities.resolveEntities(spark,s"$workingDir/work", s"$workingDir/updates" )
val ds = spark.read.load(s"$workingDir/work/resolvedEntities").as[Result]
assertEquals(3, ds.count())
ds.collect().foreach{
r =>
assertTrue(r.getId.startsWith("50"))
}
}
private def structuredPContainsValue(l:java.util.List[StructuredProperty], exptectedValue:String):Boolean = {
l.asScala.exists(p =>p.getValue!= null && p.getValue.equalsIgnoreCase(exptectedValue))
}
@Test
def testUpdate():Unit = {
val spark:SparkSession = sparkSession.get
import spark.implicits._
implicit val resEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])
val m = new ObjectMapper()
SparkResolveEntities.resolveEntities(spark,s"$workingDir/work", s"$workingDir/updates" )
SparkResolveEntities.generateResolvedEntities(spark,s"$workingDir/work",s"$workingDir/graph" )
val pubDS:Dataset[Result] = spark.read.text(s"$workingDir/work/resolvedGraph/publication").as[String].map(s => SparkResolveEntities.deserializeObject(s, EntityType.publication))
val t = pubDS.filter(p => p.getTitle!=null && p.getSubject!=null).filter(p => p.getTitle.asScala.exists(t => t.getValue.equalsIgnoreCase("FAKETITLE"))).count()
val datDS:Dataset[Result] = spark.read.text(s"$workingDir/work/resolvedGraph/dataset").as[String].map(s => SparkResolveEntities.deserializeObject(s, EntityType.dataset))
val td = datDS.filter(p => p.getTitle!=null && p.getSubject!=null).filter(p => p.getTitle.asScala.exists(t => t.getValue.equalsIgnoreCase("FAKETITLE"))).count()
val softDS:Dataset[Result] = spark.read.text(s"$workingDir/work/resolvedGraph/software").as[String].map(s => SparkResolveEntities.deserializeObject(s, EntityType.software))
val ts = softDS.filter(p => p.getTitle!=null && p.getSubject!=null).filter(p => p.getTitle.asScala.exists(t => t.getValue.equalsIgnoreCase("FAKETITLE"))).count()
val orpDS:Dataset[Result] = spark.read.text(s"$workingDir/work/resolvedGraph/otherresearchproduct").as[String].map(s => SparkResolveEntities.deserializeObject(s, EntityType.otherresearchproduct))
val to = orpDS.filter(p => p.getTitle!=null && p.getSubject!=null).filter(p => p.getTitle.asScala.exists(t => t.getValue.equalsIgnoreCase("FAKETITLE"))).count()
assertEquals(0, t)
assertEquals(2, td)
assertEquals(1, ts)
assertEquals(0, to)
}
}

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,4 @@
unresolved::10.17026/dans-x3z-fsq5::doi
unresolved::10.17026/dans-xsw-qtnx::doi
unresolved::10.5281/zenodo.1473694::doi
unresolved::10.17632/fake::doi