Implemented the last part of the workflows that generate the Scholix graph

Sandro La Bruzzo 2021-07-23 16:38:32 +02:00
parent cfde63a7c3
commit d9e3b89937
5 changed files with 75 additions and 53 deletions

View File

@@ -0,0 +1,43 @@
package eu.dnetlib.dhp.sx.graph

import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.{OtherResearchProduct, Publication, Result, Software, Dataset => OafDataset}
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory}

object SparkConvertRDDtoDataset {

  def main(args: Array[String]): Unit = {
    // entity sub-folders of the raw graph and the OAF class each one deserializes to
    val entities = List(
      ("dataset", classOf[OafDataset]),
      ("otherresearchproduct", classOf[OtherResearchProduct]),
      ("publication", classOf[Publication]),
      ("software", classOf[Software])
    )

    val log: Logger = LoggerFactory.getLogger(getClass)
    val conf: SparkConf = new SparkConf()
    val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/convert_dataset_json_params.json")))
    parser.parseArgument(args)
    val spark: SparkSession =
      SparkSession
        .builder()
        .config(conf)
        .appName(getClass.getSimpleName)
        .master(parser.get("master"))
        .getOrCreate()

    val sourcePath = parser.get("sourcePath")
    log.info(s"sourcePath -> $sourcePath")
    val targetPath = parser.get("targetPath")
    log.info(s"targetPath -> $targetPath")

    val mapper = new ObjectMapper()
    implicit val resultEncoder: Encoder[Result] = Encoders.kryo(classOf[Result])

    // for each entity type: read the JSON text RDD, deserialize it and save it as a kryo-encoded Dataset[Result]
    entities.foreach {
      e =>
        val rdd = spark.sparkContext.textFile(s"$sourcePath/${e._1}").map(s => mapper.readValue(s, e._2))
        spark.createDataset(rdd).as[Result].write.mode(SaveMode.Overwrite).save(s"$targetPath/${e._1}")
    }
  }
}
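
Note: the job reads the master, sourcePath and targetPath options defined in convert_dataset_json_params.json, and the updated Scholix workflow passes them as --master/--sourcePath/--targetPath arguments. Below is a minimal, hypothetical local invocation sketch for smoke testing outside Oozie; the paths are placeholders, not values from this commit.

// Hypothetical local smoke test for SparkConvertRDDtoDataset; paths are placeholders.
object SparkConvertRDDtoDatasetLocalRun {
  def main(args: Array[String]): Unit = {
    SparkConvertRDDtoDataset.main(Array(
      "--master", "local[*]",
      // expected to contain the JSON text folders dataset/, otherresearchproduct/, publication/, software/
      "--sourcePath", "/tmp/raw_graph",
      // receives one kryo-encoded Dataset[Result] folder per entity type
      "--targetPath", "/tmp/scholix/entities"
    ))
  }
}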

View File

@@ -30,7 +30,7 @@ object SparkResolveRelation {
     val relationPath = parser.get("relationPath")
     log.info(s"sourcePath -> $relationPath")
     val entityPath = parser.get("entityPath")
-    log.info(s"targetPath -> $entityPath")
+    log.info(s"entityPath -> $entityPath")
     val workingPath = parser.get("workingPath")
     log.info(s"workingPath -> $workingPath")
@@ -48,8 +48,8 @@ object SparkResolveRelation {
       m =>
         val sourceResolved = m._2
         val currentRelation = m._1._2
-        if (sourceResolved!=null && sourceResolved._2!=null && sourceResolved._2.nonEmpty)
-          currentRelation.setSource(sourceResolved._2)
+        if (sourceResolved!=null && sourceResolved._1!=null && sourceResolved._1.nonEmpty)
+          currentRelation.setSource(sourceResolved._1)
         currentRelation
     }.write
       .mode(SaveMode.Overwrite)
@@ -61,13 +61,13 @@ object SparkResolveRelation {
       m =>
        val targetResolved = m._2
        val currentRelation = m._1._2
-        if (targetResolved!=null && targetResolved._2.nonEmpty)
-          currentRelation.setTarget(targetResolved._2)
+        if (targetResolved!=null && targetResolved._1.nonEmpty)
+          currentRelation.setTarget(targetResolved._1)
         currentRelation
     }.filter(r => r.getSource.startsWith("50")&& r.getTarget.startsWith("50"))
       .write
       .mode(SaveMode.Overwrite)
-      .save(s"$workingPath/relation")
+      .save(s"$workingPath/relation_resolved")
   }
@@ -89,16 +89,16 @@ object SparkResolveRelation {
     val d: RDD[(String,String)] = spark.sparkContext.textFile(s"$entityPath/*")
       .map(i => extractPidsFromRecord(i))
-      .filter(s => s != null && s._2!=null && s._2.nonEmpty)
+      .filter(s => s != null && s._1!= null && s._2!=null && s._2.nonEmpty)
       .flatMap{ p =>
         p._2.map(pid =>
-          (p._1,convertPidToDNETIdentifier(pid._1, pid._2))
+          (p._1, convertPidToDNETIdentifier(pid._1, pid._2))
         )
-      }
+      }.filter(r =>r._1 != null || r._2 != null)

     spark.createDataset(d)
-      .groupByKey(_._1)
-      .reduceGroups((x, y) => if (x._2.startsWith("50|doi") || x._2.startsWith("50|pmid")) x else y)
+      .groupByKey(_._2)
+      .reduceGroups((x, y) => if (x._1.startsWith("50|doi") || x._1.startsWith("50|pmid")) x else y)
       .map(s => s._2)
       .write
       .mode(SaveMode.Overwrite)
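
Note: the last hunk swaps which element of the pair drives the grouping: records are now grouped by the pid-derived identifier (._2) and the reducer keeps a single DNet id per pid, preferring doi/pmid-based ids. The self-contained sketch below illustrates that grouping logic only; the tuple layout (dnetId, pidBasedId) and the sample identifier strings are illustrative assumptions, not taken from this repository.

// Illustrative sketch of the corrected grouping (assumed tuple layout: (dnetId, pidBasedId)).
import org.apache.spark.sql.SparkSession

object PidGroupingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("PidGroupingSketch").getOrCreate()
    import spark.implicits._

    // Two DNet ids sharing the same pid-derived identifier (sample values are made up).
    val pairs = Seq(
      ("50|doi_________::aaa", "unresolved::10.1000/xyz::doi"),
      ("50|od______1234::bbb", "unresolved::10.1000/xyz::doi")
    ).toDS()

    val resolved = pairs
      .groupByKey(_._2)   // group by the pid-derived identifier, as in the fixed code
      .reduceGroups((x, y) =>
        if (x._1.startsWith("50|doi") || x._1.startsWith("50|pmid")) x else y)  // prefer doi/pmid-based DNet ids
      .map(_._2)          // keep only the winning (dnetId, pidBasedId) pair

    resolved.show(false)  // -> (50|doi_________::aaa, unresolved::10.1000/xyz::doi)
    spark.stop()
  }
}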

View File

@@ -1,4 +1,4 @@
-<workflow-app name="Create Raw Graph Step 1: extract Entities in raw graph" xmlns="uri:oozie:workflow:0.5">
+<workflow-app name="Create Scholix final Graph" xmlns="uri:oozie:workflow:0.5">
     <parameters>
         <property>
             <name>sourcePath</name>
@@ -6,48 +6,22 @@
         </property>
         <property>
             <name>targetPath</name>
-            <description>the graph Raw base path</description>
+            <description>the final graph path</description>
         </property>
     </parameters>

-    <start to="ExtractEntities"/>
+    <start to="ImportDatasetEntities"/>

    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>

-    <action name="ExtractEntities">
+    <action name="ImportDatasetEntities">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <master>yarn</master>
            <mode>cluster</mode>
-            <name>Extract entities in raw graph</name>
-            <class>eu.dnetlib.dhp.sx.graph.SparkCreateInputGraph</class>
-            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
-            <spark-opts>
-                --executor-memory=${sparkExecutorMemory}
-                --executor-cores=${sparkExecutorCores}
-                --driver-memory=${sparkDriverMemory}
-                --conf spark.extraListeners=${spark2ExtraListeners}
-                --conf spark.sql.shuffle.partitions=2000
-                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
-                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
-                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-            </spark-opts>
-            <arg>--master</arg><arg>yarn</arg>
-            <arg>--sourcePath</arg><arg>${sourcePath}</arg>
-            <arg>--targetPath</arg><arg>${targetPath}</arg>
-        </spark>
-        <ok to="ResolveRelations"/>
-        <error to="Kill"/>
-    </action>
-
-    <action name="ResolveRelations">
-        <spark xmlns="uri:oozie:spark-action:0.2">
-            <master>yarn</master>
-            <mode>cluster</mode>
-            <name>Resolve Relations in raw graph</name>
-            <class>eu.dnetlib.dhp.sx.graph.SparkResolveRelation</class>
+            <name>Import JSONRDD to Dataset kryo</name>
+            <class>eu.dnetlib.dhp.sx.graph.SparkConvertRDDtoDataset</class>
            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
            <spark-opts>
                --executor-memory=${sparkExecutorMemory}
@@ -60,9 +34,8 @@
                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
            </spark-opts>
            <arg>--master</arg><arg>yarn</arg>
-            <arg>--relationPath</arg><arg>${targetPath}/extracted/relation</arg>
-            <arg>--workingPath</arg><arg>${targetPath}/resolved/</arg>
-            <arg>--entityPath</arg><arg>${targetPath}/dedup</arg>
+            <arg>--sourcePath</arg><arg>${sourcePath}</arg>
+            <arg>--targetPath</arg><arg>${targetPath}/entities</arg>
        </spark>
        <ok to="CreateSummaries"/>
        <error to="Kill"/>
@@ -87,7 +60,7 @@
                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
            </spark-opts>
            <arg>--master</arg><arg>yarn</arg>
-            <arg>--sourcePath</arg><arg>${targetPath}/dedup</arg>
+            <arg>--sourcePath</arg><arg>${targetPath}/entities</arg>
            <arg>--targetPath</arg><arg>${targetPath}/provision/summaries</arg>
        </spark>
        <ok to="CreateScholix"/>
@@ -114,7 +87,7 @@
            <arg>--master</arg><arg>yarn</arg>
            <arg>--summaryPath</arg><arg>${targetPath}/provision/summaries</arg>
            <arg>--targetPath</arg><arg>${targetPath}/provision/scholix</arg>
-            <arg>--relationPath</arg><arg>${targetPath}/resolved/resolvedRelation</arg>
+            <arg>--relationPath</arg><arg>${sourcePath}/relation_resolved</arg>
        </spark>
        <ok to="DropJSONPath"/>
@@ -182,9 +155,5 @@
        <ok to="End"/>
        <error to="Kill"/>
    </action>

    <end name="End"/>
</workflow-app>

View File

@@ -15,14 +15,24 @@
    </parameters>

-    <start to="ResolveRelations"/>
+    <start to="DropRelFolder"/>

    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>

+    <action name="DropRelFolder">
+        <fs>
+            <delete path='${targetPath}/relation'/>
+            <delete path='${targetPath}/relation_resolved'/>
+            <delete path='${targetPath}/resolvedSource'/>
+            <delete path='${targetPath}/resolvedPid'/>
+        </fs>
+        <ok to="ResolveRelations"/>
+        <error to="Kill"/>
+    </action>
+
    <action name="ResolveRelations">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <master>yarn</master>