package eu.dnetlib.dhp.oa.graph.resolution

import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.schema.oaf.common.EntityType
import eu.dnetlib.dhp.schema.oaf.utils.{MergeUtils, OafMapperUtils}
import eu.dnetlib.dhp.schema.oaf.{Publication, Result, StructuredProperty}
import org.apache.commons.io.FileUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.TestInstance.Lifecycle
import org.junit.jupiter.api.{AfterAll, BeforeAll, Test, TestInstance}

import java.nio.file.{Files, Path}
import scala.collection.JavaConverters._
import scala.io.Source

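/** Tests for the graph entity-resolution step: a small test graph and a set of "update"
  * Result records carrying a fake title and subject are materialised in a temporary
  * working directory, and SparkResolveEntities is exercised against them.
  */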
@TestInstance(Lifecycle.PER_CLASS)
class ResolveEntitiesTest extends Serializable {

  var workingDir: Path = null

  val FAKE_TITLE = "FAKETITLE"
  val FAKE_SUBJECT = "FAKESUBJECT"

  var sparkSession: Option[SparkSession] = None

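  // Creates a temporary working directory and a local SparkSession, then seeds the
  // test graph and generates the update records used by the tests below.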
  @BeforeAll
  def setUp(): Unit = {
    workingDir = Files.createTempDirectory(getClass.getSimpleName)

    val conf = new SparkConf()
    sparkSession = Some(
      SparkSession
        .builder()
        .config(conf)
        .appName(getClass.getSimpleName)
        .master("local[*]")
        .getOrCreate()
    )
    populateDatasets(sparkSession.get)
    generateUpdates(sparkSession.get)
  }

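  // Deletes the temporary working directory and stops the local SparkSession.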
  @AfterAll
  def tearDown(): Unit = {
    FileUtils.deleteDirectory(workingDir.toFile)
    sparkSession.get.stop()
  }

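  /** Builds one Result per line of the "updates" test resource, each carrying the fake
    * subject and title, writes them as gzip-compressed JSON text under
    * s"$workingDir/updates" and verifies the Jackson round trip.
    */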
  def generateUpdates(spark: SparkSession): Unit = {
    val template = Source.fromInputStream(this.getClass.getResourceAsStream("updates")).mkString

    val pids: List[String] = template.linesWithSeparators
      .map(l => l.stripLineEnd)
      .map { id =>
        val r = new Result
        r.setId(id.toLowerCase.trim)
        r.setSubject(
          List(
            OafMapperUtils.subject(
              FAKE_SUBJECT,
              OafMapperUtils.qualifier("fos", "fosCS", "fossSchema"),
              null
            )
          ).asJava
        )
        r.setTitle(
          List(
            OafMapperUtils.structuredProperty(
              FAKE_TITLE,
              OafMapperUtils.qualifier("fos", "fosCS", "fossSchema")
            )
          ).asJava
        )
        r
      }
      .map { r =>
        val mapper = new ObjectMapper()
        mapper.writeValueAsString(r)
      }
      .toList

    val sc = spark.sparkContext

    println(sc.parallelize(pids).count())

    spark
      .createDataset(sc.parallelize(pids))(Encoders.STRING)
      .write
      .mode(SaveMode.Overwrite)
      .option("compression", "gzip")
      .text(s"$workingDir/updates")

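    // Read the updates back and check that the serialization round trip preserved the
    // fake subject and title on every record.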
    import spark.implicits._
    implicit val resEncoder: Encoder[Result] = Encoders.bean(classOf[Result])
    val ds = spark.read
      .text(s"$workingDir/updates")
      .as[String]
      .map { s =>
        val mapper = new ObjectMapper()
        mapper.readValue(s, classOf[Result])
      }
      .collect()

    assertEquals(4, ds.length)
    ds.foreach { r => assertNotNull(r.getSubject) }
    ds.foreach { r => assertEquals(1, r.getSubject.size()) }
    ds.foreach { r => assertNotNull(r.getTitle) }
    ds.foreach { r => assertEquals(1, r.getTitle.size()) }

    ds.flatMap(r => r.getTitle.asScala.map(t => t.getValue))
      .foreach(t => assertEquals(FAKE_TITLE, t))
    ds.flatMap(r => r.getSubject.asScala.map(t => t.getValue))
      .foreach(t => assertEquals(FAKE_SUBJECT, t))

    println("generated Updates")
  }

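  /** Copies each entity type listed in SparkResolveEntities.entities from the test
    * resources into s"$workingDir/graph/$e" (one folder per type), then extracts the
    * PID-resolution table into s"$workingDir/work".
    */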
  def populateDatasets(spark: SparkSession): Unit = {
    import spark.implicits._
    val entities = SparkResolveEntities.entities

    entities.foreach { e =>
      val template = Source.fromInputStream(this.getClass.getResourceAsStream(s"$e")).mkString
      spark
        .createDataset(
          spark.sparkContext.parallelize(template.linesWithSeparators.map(l => l.stripLineEnd).toList)
        )
        .as[String]
        .write
        .option("compression", "gzip")
        .text(s"$workingDir/graph/$e")
      println(s"Created Dataset $e")
    }
    SparkResolveRelation.extractPidResolvedTableFromJsonRDD(
      spark,
      s"$workingDir/graph",
      s"$workingDir/work"
    )
  }

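  // Resolves the update records against the PID table and expects exactly three
  // resolved entities, all with identifiers starting with "50".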
  @Test
  def testResolution(): Unit = {
    val spark: SparkSession = sparkSession.get
    implicit val resEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])
    SparkResolveEntities.resolveEntities(spark, s"$workingDir/work", s"$workingDir/updates")

    val ds = spark.read.load(s"$workingDir/work/resolvedEntities").as[Result]

    assertEquals(3, ds.count())

    ds.collect().foreach { r =>
      assertTrue(r.getId.startsWith("50"))
    }
  }

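  // Helper: true when any StructuredProperty in the list matches the expected value
  // (case-insensitive), ignoring null values.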
  private def structuredPContainsValue(
    l: java.util.List[StructuredProperty],
    expectedValue: String
  ): Boolean = {
    l.asScala.exists(p => p.getValue != null && p.getValue.equalsIgnoreCase(expectedValue))
  }

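  /** Runs entity resolution followed by generateResolvedEntities and, for each result
    * type written under s"$workingDir/target", checks that no record lost its titles
    * and counts how many records carry the fake title.
    */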
  @Test
  def testUpdate(): Unit = {
    val spark: SparkSession = sparkSession.get
    import spark.implicits._
    implicit val resEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])
    val m = new ObjectMapper()
    SparkResolveEntities.resolveEntities(spark, s"$workingDir/work", s"$workingDir/updates")
    SparkResolveEntities.generateResolvedEntities(
      spark,
      s"$workingDir/work",
      s"$workingDir/graph",
      s"$workingDir/target"
    )

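    // Publications: count the records enriched with the fake title and make sure every
    // record still carries non-empty titles after the merge.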
    val pubDS: Dataset[Result] = spark.read
      .text(s"$workingDir/target/publication")
      .as[String]
      .map(s => SparkResolveEntities.deserializeObject(s, EntityType.publication))
    val t = pubDS
      .filter(p => p.getTitle != null && p.getSubject != null)
      .filter(p => p.getTitle.asScala.exists(t => t.getValue.equalsIgnoreCase("FAKETITLE")))
      .count()

    var ct = pubDS.count()
    var et = pubDS
      .filter(p => p.getTitle != null && p.getTitle.asScala.forall(t => t.getValue != null && t.getValue.nonEmpty))
      .count()

    assertEquals(ct, et)

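    // Datasets: same consistency check.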
    val datDS: Dataset[Result] = spark.read
      .text(s"$workingDir/target/dataset")
      .as[String]
      .map(s => SparkResolveEntities.deserializeObject(s, EntityType.dataset))
    val td = datDS
      .filter(p => p.getTitle != null && p.getSubject != null)
      .filter(p => p.getTitle.asScala.exists(t => t.getValue.equalsIgnoreCase("FAKETITLE")))
      .count()

    ct = datDS.count()
    et = datDS
      .filter(p => p.getTitle != null && p.getTitle.asScala.forall(t => t.getValue != null && t.getValue.nonEmpty))
      .count()
    assertEquals(ct, et)

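    // Software records: same consistency check.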
    val softDS: Dataset[Result] = spark.read
      .text(s"$workingDir/target/software")
      .as[String]
      .map(s => SparkResolveEntities.deserializeObject(s, EntityType.software))
    val ts = softDS
      .filter(p => p.getTitle != null && p.getSubject != null)
      .filter(p => p.getTitle.asScala.exists(t => t.getValue.equalsIgnoreCase("FAKETITLE")))
      .count()

    ct = softDS.count()
    et = softDS
      .filter(p => p.getTitle != null && p.getTitle.asScala.forall(t => t.getValue != null && t.getValue.nonEmpty))
      .count()
    assertEquals(ct, et)

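    // Other research products: same consistency check.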
    val orpDS: Dataset[Result] = spark.read
      .text(s"$workingDir/target/otherresearchproduct")
      .as[String]
      .map(s => SparkResolveEntities.deserializeObject(s, EntityType.otherresearchproduct))
    val to = orpDS
      .filter(p => p.getTitle != null && p.getSubject != null)
      .filter(p => p.getTitle.asScala.exists(t => t.getValue.equalsIgnoreCase("FAKETITLE")))
      .count()

    ct = orpDS.count()
    et = orpDS
      .filter(p => p.getTitle != null && p.getTitle.asScala.forall(t => t.getValue != null && t.getValue.nonEmpty))
      .count()
    assertEquals(ct, et)

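    // Expected number of records per type that picked up the fake title.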
    assertEquals(0, t)
    assertEquals(2, td)
    assertEquals(1, ts)
    assertEquals(0, to)
  }

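  // Smoke test: merges a Result carrying only the fake subject with a Publication read
  // from the test resources and prints the merged record.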
  @Test
  def testMerge(): Unit = {
    var r = new Result
    r.setSubject(
      List(
        OafMapperUtils.subject(
          FAKE_SUBJECT,
          OafMapperUtils.qualifier("fos", "fosCS", "fossSchema"),
          null
        )
      ).asJava
    )

    val mapper = new ObjectMapper()

    val p = mapper.readValue(
      Source
        .fromInputStream(this.getClass.getResourceAsStream(s"publication"))
        .mkString
        .linesWithSeparators
        .map(l => l.stripLineEnd)
        .next(),
      classOf[Publication]
    )

    r = MergeUtils.mergeResult(r, p)

    println(mapper.writeValueAsString(r))
  }
}