updated resolution wf:

- generate a new version of the graph
 - changed merge from union to join
This commit is contained in:
Sandro La Bruzzo 2021-11-22 11:48:55 +01:00
parent fdb75b180e
commit 35e20b0647
6 changed files with 97 additions and 47 deletions

View File

@ -14,7 +14,7 @@ import org.slf4j.{Logger, LoggerFactory}
object SparkResolveEntities {
val mapper = new ObjectMapper()
val entities = List(EntityType.dataset,EntityType.publication, EntityType.software, EntityType.otherresearchproduct)
val entities = List(EntityType.dataset, EntityType.publication, EntityType.software, EntityType.otherresearchproduct)
def main(args: Array[String]): Unit = {
val log: Logger = LoggerFactory.getLogger(getClass)
@ -36,25 +36,19 @@ object SparkResolveEntities {
val unresolvedPath = parser.get("unresolvedPath")
log.info(s"unresolvedPath -> $unresolvedPath")
val targetPath = parser.get("targetPath")
log.info(s"targetPath -> $targetPath")
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
fs.mkdirs(new Path(workingPath))
resolveEntities(spark, workingPath, unresolvedPath)
generateResolvedEntities(spark, workingPath, graphBasePath)
// TO BE conservative we keep the original entities in the working dir
// and save the resolved entities on the graphBasePath
//In future these lines of code should be removed
entities.foreach {
e =>
fs.rename(new Path(s"$graphBasePath/$e"), new Path(s"$workingPath/${e}_old"))
fs.rename(new Path(s"$workingPath/resolvedGraph/$e"), new Path(s"$graphBasePath/$e"))
}
}
generateResolvedEntities(spark, workingPath, graphBasePath, targetPath)
}
def resolveEntities(spark: SparkSession, workingPath: String, unresolvedPath: String) = {
def resolveEntities(spark: SparkSession, workingPath: String, unresolvedPath: String) = {
implicit val resEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])
import spark.implicits._
@ -71,37 +65,43 @@ def resolveEntities(spark: SparkSession, workingPath: String, unresolvedPath: St
}
def deserializeObject(input:String, entity:EntityType ) :Result = {
def deserializeObject(input: String, entity: EntityType): Result = {
entity match {
case EntityType.publication => mapper.readValue(input, classOf[Publication])
case EntityType.dataset => mapper.readValue(input, classOf[OafDataset])
case EntityType.software=> mapper.readValue(input, classOf[Software])
case EntityType.otherresearchproduct=> mapper.readValue(input, classOf[OtherResearchProduct])
}
entity match {
case EntityType.publication => mapper.readValue(input, classOf[Publication])
case EntityType.dataset => mapper.readValue(input, classOf[OafDataset])
case EntityType.software => mapper.readValue(input, classOf[Software])
case EntityType.otherresearchproduct => mapper.readValue(input, classOf[OtherResearchProduct])
}
}
def generateResolvedEntities(spark:SparkSession, workingPath: String, graphBasePath:String) = {
def generateResolvedEntities(spark: SparkSession, workingPath: String, graphBasePath: String, targetPath:String) = {
implicit val resEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])
import spark.implicits._
val re:Dataset[Result] = spark.read.load(s"$workingPath/resolvedEntities").as[Result]
val re: Dataset[(String, Result)] = spark.read.load(s"$workingPath/resolvedEntities").as[Result].map(r => (r.getId, r))
entities.foreach {
e =>
e => {
val currentEntityDataset: Dataset[(String, Result)] = spark.read.text(s"$graphBasePath/$e").as[String].map(s => deserializeObject(s, e)).map(r => (r.getId, r))
currentEntityDataset.joinWith(re, currentEntityDataset("_1").equalTo(re("_1")), "left").map(k => {
val a = k._1
val b = k._2
if (b == null)
a._2
else {
a._2.mergeFrom(b._2)
a._2
}
}).map(r => mapper.writeValueAsString(r))(Encoders.STRING)
.write.mode(SaveMode.Overwrite).option("compression", "gzip").text(s"$targetPath/$e")
}
spark.read.text(s"$graphBasePath/$e").as[String]
.map(s => deserializeObject(s, e))
.union(re)
.groupByKey(_.getId)
.reduceGroups {
(x, y) =>
x.mergeFrom(y)
x
}.map(_._2)
.filter(r => r.getClass.getSimpleName.toLowerCase != "result")
.map(r => mapper.writeValueAsString(r))(Encoders.STRING)
.write.mode(SaveMode.Overwrite).option("compression", "gzip").text(s"$workingPath/resolvedGraph/$e")
}
}
}

View File

@ -35,6 +35,9 @@ object SparkResolveRelation {
val workingPath = parser.get("workingPath")
log.info(s"workingPath -> $workingPath")
val targetPath = parser.get("targetPath")
log.info(s"targetPath -> $targetPath")
implicit val relEncoder: Encoder[Relation] = Encoders.kryo(classOf[Relation])
import spark.implicits._
@ -80,20 +83,13 @@ object SparkResolveRelation {
.mode(SaveMode.Overwrite)
.save(s"$workingPath/relation_resolved")
// TO BE conservative we keep the original relation in the working dir
// and save the relation resolved on the graphBasePath
//In future this two line of code should be removed
fs.rename(new Path(s"$graphBasePath/relation"), new Path(s"$workingPath/relation"))
spark.read.load(s"$workingPath/relation_resolved").as[Relation]
.filter(r => !r.getSource.startsWith("unresolved") && !r.getTarget.startsWith("unresolved"))
.map(r => mapper.writeValueAsString(r))
.write
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.text(s"$graphBasePath/relation")
.text(s"$targetPath/relation")
}
def extractInstanceCF(input: String): List[(String, String)] = {

View File

@ -8,6 +8,10 @@
<name>unresolvedPath</name>
<description>the path of the unresolved Entities</description>
</property>
<property>
<name>targetPath</name>
<description>the target path after resolution</description>
</property>
</parameters>
<start to="ResolveRelations"/>
@ -36,6 +40,7 @@
<arg>--master</arg><arg>yarn</arg>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--workingPath</arg><arg>${workingDir}</arg>
<arg>--targetPath</arg><arg>${targetPath}</arg>
</spark>
<ok to="ResolveEntities"/>
<error to="Kill"/>
@ -62,6 +67,7 @@
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--unresolvedPath</arg><arg>${unresolvedPath}</arg>
<arg>--workingPath</arg><arg>${workingDir}</arg>
<arg>--targetPath</arg><arg>${targetPath}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>

View File

@ -2,5 +2,6 @@
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the source Path", "paramRequired": true},
{"paramName":"u", "paramLongName":"unresolvedPath", "paramDescription": "the source Path", "paramRequired": true},
{"paramName":"g", "paramLongName":"graphBasePath", "paramDescription": "the path of the raw graph", "paramRequired": true}
{"paramName":"g", "paramLongName":"graphBasePath", "paramDescription": "the path of the raw graph", "paramRequired": true},
{"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the target path", "paramRequired": true}
]

View File

@ -1,5 +1,6 @@
[
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the source Path", "paramRequired": true},
{"paramName":"g", "paramLongName":"graphBasePath", "paramDescription": "the path of the raw graph", "paramRequired": true}
{"paramName":"g", "paramLongName":"graphBasePath", "paramDescription": "the path of the raw graph", "paramRequired": true},
{"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the target path", "paramRequired": true}
]

View File

@ -4,7 +4,7 @@ package eu.dnetlib.dhp.oa.graph.resolution
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.schema.common.EntityType
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils
import eu.dnetlib.dhp.schema.oaf.{Result, StructuredProperty}
import eu.dnetlib.dhp.schema.oaf.{Publication, Result, StructuredProperty}
import org.apache.commons.io.FileUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql._
@ -154,19 +154,39 @@ class ResolveEntitiesTest extends Serializable {
val t = pubDS.filter(p => p.getTitle!=null && p.getSubject!=null).filter(p => p.getTitle.asScala.exists(t => t.getValue.equalsIgnoreCase("FAKETITLE"))).count()
var ct = pubDS.count()
var et = pubDS.filter(p => p.getTitle!= null && p.getTitle.asScala.forall(t => t.getValue != null && t.getValue.nonEmpty)).count()
assertEquals(ct, et)
val datDS:Dataset[Result] = spark.read.text(s"$workingDir/work/resolvedGraph/dataset").as[String].map(s => SparkResolveEntities.deserializeObject(s, EntityType.dataset))
val td = datDS.filter(p => p.getTitle!=null && p.getSubject!=null).filter(p => p.getTitle.asScala.exists(t => t.getValue.equalsIgnoreCase("FAKETITLE"))).count()
ct = datDS.count()
et = datDS.filter(p => p.getTitle!= null && p.getTitle.asScala.forall(t => t.getValue != null && t.getValue.nonEmpty)).count()
assertEquals(ct, et)
val softDS:Dataset[Result] = spark.read.text(s"$workingDir/work/resolvedGraph/software").as[String].map(s => SparkResolveEntities.deserializeObject(s, EntityType.software))
val ts = softDS.filter(p => p.getTitle!=null && p.getSubject!=null).filter(p => p.getTitle.asScala.exists(t => t.getValue.equalsIgnoreCase("FAKETITLE"))).count()
ct = softDS.count()
et = softDS.filter(p => p.getTitle!= null && p.getTitle.asScala.forall(t => t.getValue != null && t.getValue.nonEmpty)).count()
assertEquals(ct, et)
val orpDS:Dataset[Result] = spark.read.text(s"$workingDir/work/resolvedGraph/otherresearchproduct").as[String].map(s => SparkResolveEntities.deserializeObject(s, EntityType.otherresearchproduct))
val to = orpDS.filter(p => p.getTitle!=null && p.getSubject!=null).filter(p => p.getTitle.asScala.exists(t => t.getValue.equalsIgnoreCase("FAKETITLE"))).count()
ct = orpDS.count()
et = orpDS.filter(p => p.getTitle!= null && p.getTitle.asScala.forall(t => t.getValue != null && t.getValue.nonEmpty)).count()
assertEquals(ct, et)
assertEquals(0, t)
assertEquals(2, td)
assertEquals(1, ts)
@ -178,6 +198,32 @@ class ResolveEntitiesTest extends Serializable {
@Test
def testMerge():Unit = {
val r = new Result
r.setSubject(List(OafMapperUtils.structuredProperty(FAKE_SUBJECT, OafMapperUtils.qualifier("fos","fosCS", "fossSchema", "fossiFIgo"), null)).asJava)
val mapper = new ObjectMapper()
val p = mapper.readValue(Source.fromInputStream(this.getClass.getResourceAsStream(s"publication")).mkString.lines.next(), classOf[Publication])
r.mergeFrom(p)
println(mapper.writeValueAsString(r))
}