forked from D-Net/dnet-hadoop
Merge branch 'beta' into doiboost_url
This commit is contained in:
commit
148289150f
|
@ -0,0 +1,107 @@
|
|||
package eu.dnetlib.dhp.oa.graph.resolution
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
||||
import eu.dnetlib.dhp.common.HdfsSupport
|
||||
import eu.dnetlib.dhp.schema.common.EntityType
|
||||
import eu.dnetlib.dhp.schema.oaf.{OtherResearchProduct, Publication, Result, Software, Dataset => OafDataset}
|
||||
import org.apache.commons.io.IOUtils
|
||||
import org.apache.hadoop.fs.{FileSystem, Path}
|
||||
import org.apache.spark.SparkConf
|
||||
import org.apache.spark.sql._
|
||||
import org.slf4j.{Logger, LoggerFactory}
|
||||
|
||||
object SparkResolveEntities {
|
||||
|
||||
val mapper = new ObjectMapper()
|
||||
val entities = List(EntityType.dataset,EntityType.publication, EntityType.software, EntityType.otherresearchproduct)
|
||||
|
||||
def main(args: Array[String]): Unit = {
|
||||
val log: Logger = LoggerFactory.getLogger(getClass)
|
||||
val conf: SparkConf = new SparkConf()
|
||||
val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/resolution/resolve_params.json")))
|
||||
parser.parseArgument(args)
|
||||
val spark: SparkSession =
|
||||
SparkSession
|
||||
.builder()
|
||||
.config(conf)
|
||||
.appName(getClass.getSimpleName)
|
||||
.master(parser.get("master")).getOrCreate()
|
||||
|
||||
|
||||
val graphBasePath = parser.get("graphBasePath")
|
||||
log.info(s"graphBasePath -> $graphBasePath")
|
||||
val workingPath = parser.get("workingPath")
|
||||
log.info(s"workingPath -> $workingPath")
|
||||
val unresolvedPath = parser.get("unresolvedPath")
|
||||
log.info(s"unresolvedPath -> $unresolvedPath")
|
||||
|
||||
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
|
||||
fs.mkdirs(new Path(workingPath))
|
||||
|
||||
resolveEntities(spark, workingPath, unresolvedPath)
|
||||
generateResolvedEntities(spark, workingPath, graphBasePath)
|
||||
|
||||
// TO BE conservative we keep the original entities in the working dir
|
||||
// and save the resolved entities on the graphBasePath
|
||||
//In future these lines of code should be removed
|
||||
entities.foreach {
|
||||
e =>
|
||||
fs.rename(new Path(s"$graphBasePath/$e"), new Path(s"$workingPath/${e}_old"))
|
||||
fs.rename(new Path(s"$workingPath/resolvedGraph/$e"), new Path(s"$graphBasePath/$e"))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
def resolveEntities(spark: SparkSession, workingPath: String, unresolvedPath: String) = {
|
||||
implicit val resEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])
|
||||
import spark.implicits._
|
||||
|
||||
val rPid: Dataset[(String, String)] = spark.read.load(s"$workingPath/relationResolvedPid").as[(String, String)]
|
||||
val up: Dataset[(String, Result)] = spark.read.text(unresolvedPath).as[String].map(s => mapper.readValue(s, classOf[Result])).map(r => (r.getId, r))(Encoders.tuple(Encoders.STRING, resEncoder))
|
||||
|
||||
rPid.joinWith(up, rPid("_2").equalTo(up("_1")), "inner").map {
|
||||
r =>
|
||||
val result = r._2._2
|
||||
val dnetId = r._1._1
|
||||
result.setId(dnetId)
|
||||
result
|
||||
}.write.mode(SaveMode.Overwrite).save(s"$workingPath/resolvedEntities")
|
||||
}
|
||||
|
||||
|
||||
def deserializeObject(input:String, entity:EntityType ) :Result = {
|
||||
|
||||
entity match {
|
||||
case EntityType.publication => mapper.readValue(input, classOf[Publication])
|
||||
case EntityType.dataset => mapper.readValue(input, classOf[OafDataset])
|
||||
case EntityType.software=> mapper.readValue(input, classOf[Software])
|
||||
case EntityType.otherresearchproduct=> mapper.readValue(input, classOf[OtherResearchProduct])
|
||||
}
|
||||
}
|
||||
|
||||
def generateResolvedEntities(spark:SparkSession, workingPath: String, graphBasePath:String) = {
|
||||
|
||||
implicit val resEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])
|
||||
import spark.implicits._
|
||||
|
||||
val re:Dataset[Result] = spark.read.load(s"$workingPath/resolvedEntities").as[Result]
|
||||
entities.foreach {
|
||||
e =>
|
||||
|
||||
spark.read.text(s"$graphBasePath/$e").as[String]
|
||||
.map(s => deserializeObject(s, e))
|
||||
.union(re)
|
||||
.groupByKey(_.getId)
|
||||
.reduceGroups {
|
||||
(x, y) =>
|
||||
x.mergeFrom(y)
|
||||
x
|
||||
}.map(_._2)
|
||||
.filter(r => r.getClass.getSimpleName.toLowerCase != "result")
|
||||
.map(r => mapper.writeValueAsString(r))(Encoders.STRING)
|
||||
.write.mode(SaveMode.Overwrite).option("compression", "gzip").text(s"$workingPath/resolvedGraph/$e")
|
||||
}
|
||||
}
|
||||
}
|
|
@ -128,7 +128,7 @@ object SparkResolveRelation {
|
|||
source != null
|
||||
}
|
||||
|
||||
private def extractPidResolvedTableFromJsonRDD(spark: SparkSession, graphPath: String, workingPath: String) = {
|
||||
def extractPidResolvedTableFromJsonRDD(spark: SparkSession, graphPath: String, workingPath: String) = {
|
||||
import spark.implicits._
|
||||
|
||||
val d: RDD[(String, String)] = spark.sparkContext.textFile(s"$graphPath/*")
|
||||
|
|
|
@ -4,6 +4,10 @@
|
|||
<name>graphBasePath</name>
|
||||
<description>the path of the graph</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>unresolvedPath</name>
|
||||
<description>the path of the unresolved Entities</description>
|
||||
</property>
|
||||
</parameters>
|
||||
|
||||
<start to="ResolveRelations"/>
|
||||
|
@ -36,5 +40,33 @@
|
|||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
<action name="ResolveEntities">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
<master>yarn</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Resolve Relations in raw graph</name>
|
||||
<class>eu.dnetlib.dhp.oa.graph.resolution.SparkResolveEntities</class>
|
||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||
<spark-opts>
|
||||
--executor-memory=${sparkExecutorMemory}
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--driver-memory=${sparkDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.shuffle.partitions=10000
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
</spark-opts>
|
||||
<arg>--master</arg><arg>yarn</arg>
|
||||
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
|
||||
<arg>--unresolvedPath</arg><arg>${unresolvedPath}</arg>
|
||||
<arg>--workingPath</arg><arg>${workingDir}</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
<error to="Kill"/>
|
||||
</action>
|
||||
|
||||
|
||||
<end name="End"/>
|
||||
</workflow-app>
|
|
@ -0,0 +1,6 @@
|
|||
[
|
||||
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
|
||||
{"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the source Path", "paramRequired": true},
|
||||
{"paramName":"u", "paramLongName":"unresolvedPath", "paramDescription": "the source Path", "paramRequired": true},
|
||||
{"paramName":"g", "paramLongName":"graphBasePath", "paramDescription": "the path of the raw graph", "paramRequired": true}
|
||||
]
|
|
@ -0,0 +1,190 @@
|
|||
package eu.dnetlib.dhp.oa.graph.resolution
|
||||
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import eu.dnetlib.dhp.schema.common.EntityType
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils
|
||||
import eu.dnetlib.dhp.schema.oaf.{Result, StructuredProperty}
|
||||
import org.apache.commons.io.FileUtils
|
||||
import org.apache.spark.SparkConf
|
||||
import org.apache.spark.sql._
|
||||
import org.junit.jupiter.api.Assertions._
|
||||
import org.junit.jupiter.api.TestInstance.Lifecycle
|
||||
import org.junit.jupiter.api.{AfterAll, BeforeAll, Test, TestInstance}
|
||||
|
||||
import java.nio.file.{Files, Path}
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.io.Source
|
||||
|
||||
@TestInstance(Lifecycle.PER_CLASS)
|
||||
class ResolveEntitiesTest extends Serializable {
|
||||
|
||||
var workingDir:Path = null
|
||||
|
||||
val FAKE_TITLE = "FAKETITLE"
|
||||
val FAKE_SUBJECT = "FAKESUBJECT"
|
||||
|
||||
var sparkSession:Option[SparkSession] = None
|
||||
|
||||
|
||||
@BeforeAll
|
||||
def setUp() :Unit = {
|
||||
workingDir = Files.createTempDirectory(getClass.getSimpleName)
|
||||
|
||||
val conf = new SparkConf()
|
||||
sparkSession = Some(SparkSession
|
||||
.builder()
|
||||
.config(conf)
|
||||
.appName(getClass.getSimpleName)
|
||||
.master("local[*]").getOrCreate())
|
||||
populateDatasets(sparkSession.get)
|
||||
generateUpdates(sparkSession.get)
|
||||
|
||||
}
|
||||
|
||||
|
||||
@AfterAll
|
||||
def tearDown():Unit = {
|
||||
FileUtils.deleteDirectory(workingDir.toFile)
|
||||
sparkSession.get.stop()
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
def generateUpdates(spark:SparkSession):Unit = {
|
||||
val template = Source.fromInputStream(this.getClass.getResourceAsStream("updates")).mkString
|
||||
|
||||
|
||||
val pids:List[String] = template.lines.map{id =>
|
||||
val r = new Result
|
||||
r.setId(id.toLowerCase.trim)
|
||||
r.setSubject(List(OafMapperUtils.structuredProperty(FAKE_SUBJECT, OafMapperUtils.qualifier("fos","fosCS", "fossSchema", "fossiFIgo"), null)).asJava)
|
||||
r.setTitle(List(OafMapperUtils.structuredProperty(FAKE_TITLE, OafMapperUtils.qualifier("fos","fosCS", "fossSchema", "fossiFIgo"), null)).asJava)
|
||||
r
|
||||
}.map{r =>
|
||||
val mapper = new ObjectMapper()
|
||||
|
||||
mapper.writeValueAsString(r)}.toList
|
||||
|
||||
|
||||
val sc =spark.sparkContext
|
||||
|
||||
println(sc.parallelize(pids).count())
|
||||
|
||||
spark.createDataset(sc.parallelize(pids))(Encoders.STRING).write.mode(SaveMode.Overwrite).option("compression", "gzip").text(s"$workingDir/updates")
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
import spark.implicits._
|
||||
implicit val resEncoder: Encoder[Result] = Encoders.bean(classOf[Result])
|
||||
val ds = spark.read.text(s"$workingDir/updates").as[String].map{s => val mapper = new ObjectMapper()
|
||||
mapper.readValue(s, classOf[Result])}.collect()
|
||||
|
||||
|
||||
|
||||
|
||||
assertEquals(4, ds.length)
|
||||
ds.foreach{r => assertNotNull(r.getSubject)}
|
||||
ds.foreach{r => assertEquals(1,r.getSubject.size())}
|
||||
ds.foreach{r => assertNotNull(r.getTitle)}
|
||||
ds.foreach{r => assertEquals(1,r.getTitle.size())}
|
||||
|
||||
|
||||
|
||||
ds.flatMap(r => r.getTitle.asScala.map(t => t.getValue)).foreach(t => assertEquals(FAKE_TITLE,t))
|
||||
ds.flatMap(r => r.getSubject.asScala.map(t => t.getValue)).foreach(t => assertEquals(FAKE_SUBJECT,t))
|
||||
|
||||
println("generated Updates")
|
||||
}
|
||||
|
||||
|
||||
def populateDatasets(spark:SparkSession):Unit = {
|
||||
import spark.implicits._
|
||||
val entities =SparkResolveEntities.entities
|
||||
|
||||
entities.foreach{
|
||||
e =>
|
||||
val template = Source.fromInputStream(this.getClass.getResourceAsStream(s"$e")).mkString
|
||||
spark.createDataset(spark.sparkContext.parallelize(template.lines.toList)).as[String].write.option("compression", "gzip").text(s"$workingDir/graph/$e")
|
||||
println(s"Created Dataset $e")
|
||||
}
|
||||
SparkResolveRelation.extractPidResolvedTableFromJsonRDD(spark, s"$workingDir/graph", s"$workingDir/work")
|
||||
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
def testResolution():Unit = {
|
||||
val spark:SparkSession = sparkSession.get
|
||||
implicit val resEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])
|
||||
SparkResolveEntities.resolveEntities(spark,s"$workingDir/work", s"$workingDir/updates" )
|
||||
|
||||
val ds = spark.read.load(s"$workingDir/work/resolvedEntities").as[Result]
|
||||
|
||||
assertEquals(3, ds.count())
|
||||
|
||||
ds.collect().foreach{
|
||||
r =>
|
||||
assertTrue(r.getId.startsWith("50"))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
private def structuredPContainsValue(l:java.util.List[StructuredProperty], exptectedValue:String):Boolean = {
|
||||
l.asScala.exists(p =>p.getValue!= null && p.getValue.equalsIgnoreCase(exptectedValue))
|
||||
}
|
||||
|
||||
@Test
|
||||
def testUpdate():Unit = {
|
||||
val spark:SparkSession = sparkSession.get
|
||||
import spark.implicits._
|
||||
implicit val resEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])
|
||||
val m = new ObjectMapper()
|
||||
SparkResolveEntities.resolveEntities(spark,s"$workingDir/work", s"$workingDir/updates" )
|
||||
SparkResolveEntities.generateResolvedEntities(spark,s"$workingDir/work",s"$workingDir/graph" )
|
||||
|
||||
|
||||
|
||||
|
||||
val pubDS:Dataset[Result] = spark.read.text(s"$workingDir/work/resolvedGraph/publication").as[String].map(s => SparkResolveEntities.deserializeObject(s, EntityType.publication))
|
||||
val t = pubDS.filter(p => p.getTitle!=null && p.getSubject!=null).filter(p => p.getTitle.asScala.exists(t => t.getValue.equalsIgnoreCase("FAKETITLE"))).count()
|
||||
|
||||
|
||||
|
||||
val datDS:Dataset[Result] = spark.read.text(s"$workingDir/work/resolvedGraph/dataset").as[String].map(s => SparkResolveEntities.deserializeObject(s, EntityType.dataset))
|
||||
val td = datDS.filter(p => p.getTitle!=null && p.getSubject!=null).filter(p => p.getTitle.asScala.exists(t => t.getValue.equalsIgnoreCase("FAKETITLE"))).count()
|
||||
|
||||
|
||||
val softDS:Dataset[Result] = spark.read.text(s"$workingDir/work/resolvedGraph/software").as[String].map(s => SparkResolveEntities.deserializeObject(s, EntityType.software))
|
||||
val ts = softDS.filter(p => p.getTitle!=null && p.getSubject!=null).filter(p => p.getTitle.asScala.exists(t => t.getValue.equalsIgnoreCase("FAKETITLE"))).count()
|
||||
|
||||
|
||||
val orpDS:Dataset[Result] = spark.read.text(s"$workingDir/work/resolvedGraph/otherresearchproduct").as[String].map(s => SparkResolveEntities.deserializeObject(s, EntityType.otherresearchproduct))
|
||||
val to = orpDS.filter(p => p.getTitle!=null && p.getSubject!=null).filter(p => p.getTitle.asScala.exists(t => t.getValue.equalsIgnoreCase("FAKETITLE"))).count()
|
||||
|
||||
|
||||
assertEquals(0, t)
|
||||
assertEquals(2, td)
|
||||
assertEquals(1, ts)
|
||||
assertEquals(0, to)
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
|
@ -0,0 +1,4 @@
|
|||
unresolved::10.17026/dans-x3z-fsq5::doi
|
||||
unresolved::10.17026/dans-xsw-qtnx::doi
|
||||
unresolved::10.5281/zenodo.1473694::doi
|
||||
unresolved::10.17632/fake::doi
|
Loading…
Reference in New Issue