
added SparkResolveEntities node to the oozie wf

Sandro La Bruzzo 2021-11-11 10:25:42 +01:00
parent 9cb195314f
commit 2ca0a436ad
2 changed files with 46 additions and 3 deletions

SparkResolveEntities.scala

@@ -2,9 +2,11 @@ package eu.dnetlib.dhp.oa.graph.resolution
 import com.fasterxml.jackson.databind.ObjectMapper
 import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import eu.dnetlib.dhp.common.HdfsSupport
 import eu.dnetlib.dhp.schema.common.EntityType
 import eu.dnetlib.dhp.schema.oaf.{OtherResearchProduct, Publication, Result, Software, Dataset => OafDataset}
 import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.SparkConf
 import org.apache.spark.sql._
 import org.slf4j.{Logger, LoggerFactory}
@@ -34,13 +36,22 @@ object SparkResolveEntities {
     val unresolvedPath = parser.get("unresolvedPath")
     log.info(s"unresolvedPath -> $unresolvedPath")
+
+    val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
+    fs.mkdirs(new Path(workingPath))
+
     resolveEntities(spark, workingPath, unresolvedPath)
     generateResolvedEntities(spark, workingPath, graphBasePath)
-  }
+
+    // To be conservative we keep the original entities in the working dir
+    // and save the resolved entities on the graphBasePath.
+    // In future these lines of code should be removed.
+    entities.foreach { e =>
+      fs.rename(new Path(s"$graphBasePath/$e"), new Path(s"$workingPath/${e}_old"))
+      fs.rename(new Path(s"$workingPath/resolvedGraph/$e"), new Path(s"$graphBasePath/$e"))
+    }
+  }
 
   def resolveEntities(spark: SparkSession, workingPath: String, unresolvedPath: String) = {
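The notable part of this hunk is the swap at the end: once the resolved entities have been written under $workingPath/resolvedGraph, each entity folder in the graph is renamed aside as a rollback copy and the resolved folder is promoted in its place. A minimal standalone sketch of that pattern follows; the paths and the entityTypes list are hypothetical placeholders, not the exact values the class uses.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object SwapResolvedEntities {
  def main(args: Array[String]): Unit = {
    // Hypothetical paths and entity list, for illustration only.
    val graphBasePath = "/data/graph/raw"
    val workingPath   = "/data/working/resolution"
    val entityTypes   = Seq("publication", "dataset", "otherresearchproduct", "software")

    val fs = FileSystem.get(new Configuration())
    fs.mkdirs(new Path(workingPath))

    entityTypes.foreach { e =>
      // Park the current entity folder as a rollback copy...
      fs.rename(new Path(s"$graphBasePath/$e"), new Path(s"$workingPath/${e}_old"))
      // ...then promote the resolved output into its place.
      fs.rename(new Path(s"$workingPath/resolvedGraph/$e"), new Path(s"$graphBasePath/$e"))
    }
  }
}

On HDFS, rename is a pure metadata operation, so the swap is cheap; the two renames per entity are not atomic together, though, so a failure mid-loop can leave the graph half-swapped, which fits the commit comment flagging these lines as temporary.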

oozie_app/workflow.xml

@@ -4,6 +4,10 @@
             <name>graphBasePath</name>
             <description>the path of the graph</description>
         </property>
+        <property>
+            <name>unresolvedPath</name>
+            <description>the path of the unresolved entities</description>
+        </property>
     </parameters>
 
     <start to="ResolveRelations"/>
@@ -36,5 +40,33 @@
         <ok to="End"/>
         <error to="Kill"/>
     </action>
+    <action name="ResolveEntities">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn</master>
+            <mode>cluster</mode>
+            <name>Resolve Entities in raw graph</name>
+            <class>eu.dnetlib.dhp.oa.graph.resolution.SparkResolveEntities</class>
+            <jar>dhp-graph-mapper-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.shuffle.partitions=10000
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+            </spark-opts>
+            <arg>--master</arg><arg>yarn</arg>
+            <arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
+            <arg>--unresolvedPath</arg><arg>${unresolvedPath}</arg>
+            <arg>--workingPath</arg><arg>${workingDir}</arg>
+        </spark>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
     <end name="End"/>
 </workflow-app>
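For context on how the <arg> pairs reach the job: Oozie passes them verbatim to the main class, where ArgumentApplicationParser (imported in the first file) validates them against a JSON parameter description and exposes them via get. Below is a rough sketch of that handshake, assuming an inline spec; the real class loads an equivalent JSON resource via IOUtils, and the short/long parameter names here are illustrative guesses.

import eu.dnetlib.dhp.application.ArgumentApplicationParser

object ResolveEntitiesArgsSketch {
  // Hypothetical inline spec standing in for the bundled JSON resource.
  val spec: String =
    """[
      |  {"paramName": "mt", "paramLongName": "master", "paramDescription": "the spark master", "paramRequired": true},
      |  {"paramName": "g", "paramLongName": "graphBasePath", "paramDescription": "the path of the graph", "paramRequired": true},
      |  {"paramName": "u", "paramLongName": "unresolvedPath", "paramDescription": "the path of the unresolved entities", "paramRequired": true},
      |  {"paramName": "w", "paramLongName": "workingPath", "paramDescription": "the working path", "paramRequired": true}
      |]""".stripMargin

  def main(args: Array[String]): Unit = {
    val parser = new ArgumentApplicationParser(spec)
    parser.parseArgument(args) // e.g. --master yarn --graphBasePath /tmp/graph ...
    println(s"graphBasePath -> ${parser.get("graphBasePath")}")
  }
}

Missing required parameters should make parseArgument fail fast, which is presumably why this commit also declares unresolvedPath as a top-level workflow parameter.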