imported changes in stable_id into master

pull/115/head
Sandro La Bruzzo 3 years ago
parent 8c96a82a03
commit 7dc824fc23

@ -12,31 +12,10 @@ import org.slf4j.{Logger, LoggerFactory}
object SparkConvertORCIDToOAF {
val logger: Logger = LoggerFactory.getLogger(SparkConvertORCIDToOAF.getClass)
def run(spark:SparkSession,sourcePath:String,workingPath:String, targetPath:String):Unit = {
def run(spark:SparkSession,workingPath:String, targetPath:String):Unit = {
import spark.implicits._
implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication]
val inputRDD:RDD[OrcidAuthor] = spark.sparkContext.textFile(s"$sourcePath/authors").map(s => ORCIDToOAF.convertORCIDAuthor(s)).filter(s => s!= null).filter(s => ORCIDToOAF.authorValid(s))
spark.createDataset(inputRDD).as[OrcidAuthor].write.mode(SaveMode.Overwrite).save(s"$workingPath/author")
val res = spark.sparkContext.textFile(s"$sourcePath/works").flatMap(s => ORCIDToOAF.extractDOIWorks(s)).filter(s => s!= null)
spark.createDataset(res).as[OrcidWork].write.mode(SaveMode.Overwrite).save(s"$workingPath/works")
val authors :Dataset[OrcidAuthor] = spark.read.load(s"$workingPath/author").as[OrcidAuthor]
val works :Dataset[OrcidWork] = spark.read.load(s"$workingPath/works").as[OrcidWork]
works.joinWith(authors, authors("oid").equalTo(works("oid")))
.map(i =>{
val doi = i._1.doi
val author = i._2
(doi, author)
}).groupBy(col("_1").alias("doi"))
.agg(collect_list(col("_2")).alias("authors"))
.write.mode(SaveMode.Overwrite).save(s"$workingPath/orcidworksWithAuthor")
val dataset: Dataset[ORCIDItem] =spark.read.load(s"$workingPath/orcidworksWithAuthor").as[ORCIDItem]
logger.info("Converting ORCID to OAF")
@ -55,10 +34,10 @@ object SparkConvertORCIDToOAF {
.master(parser.get("master")).getOrCreate()
val sourcePath = parser.get("sourcePath")
val workingPath = parser.get("workingPath")
val targetPath = parser.get("targetPath")
run(spark, sourcePath, workingPath, targetPath)
run(spark, workingPath, targetPath)
}

@ -0,0 +1,56 @@
package eu.dnetlib.doiboost.orcid
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.Publication
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.apache.spark.sql.functions.{col, collect_list}
import org.slf4j.{Logger, LoggerFactory}
class SparkPreprocessORCID {
val logger: Logger = LoggerFactory.getLogger(getClass)
def run(spark:SparkSession,sourcePath:String,workingPath:String):Unit = {
import spark.implicits._
val inputRDD:RDD[OrcidAuthor] = spark.sparkContext.textFile(s"$sourcePath/authors").map(s => ORCIDToOAF.convertORCIDAuthor(s)).filter(s => s!= null).filter(s => ORCIDToOAF.authorValid(s))
spark.createDataset(inputRDD).as[OrcidAuthor].write.mode(SaveMode.Overwrite).save(s"$workingPath/author")
val res = spark.sparkContext.textFile(s"$sourcePath/works").flatMap(s => ORCIDToOAF.extractDOIWorks(s)).filter(s => s!= null)
spark.createDataset(res).as[OrcidWork].write.mode(SaveMode.Overwrite).save(s"$workingPath/works")
val authors :Dataset[OrcidAuthor] = spark.read.load(s"$workingPath/author").as[OrcidAuthor]
val works :Dataset[OrcidWork] = spark.read.load(s"$workingPath/works").as[OrcidWork]
works.joinWith(authors, authors("oid").equalTo(works("oid")))
.map(i =>{
val doi = i._1.doi
val author = i._2
(doi, author)
}).groupBy(col("_1").alias("doi"))
.agg(collect_list(col("_2")).alias("authors"))
.write.mode(SaveMode.Overwrite).save(s"$workingPath/orcidworksWithAuthor")
}
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkConvertORCIDToOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/preprocess_orcid_params.json")))
parser.parseArgument(args)
val spark: SparkSession =
SparkSession
.builder()
.config(conf)
.appName(getClass.getSimpleName)
.master(parser.get("master")).getOrCreate()
val sourcePath = parser.get("sourcePath")
val workingPath = parser.get("workingPath")
run(spark, sourcePath, workingPath)
}
}

@ -1,6 +1,6 @@
[
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true},
{"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the working dir path", "paramRequired": true},
{"paramName":"m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true}
{"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the working dir path", "paramRequired": true},
{"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the target dir path", "paramRequired": true},
{"paramName":"m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true}
]

@ -1,101 +0,0 @@
<workflow-app name="import Crossref from index into HDFS" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>workingPath</name>
<description>the working dir base path</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<property>
<name>timestamp</name>
<description>Timestamp for incremental Harvesting</description>
</property>
</parameters>
<start to="ImportCrossRef"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ImportCrossRef">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.doiboost.crossref.CrossrefImporter</main-class>
<arg>-t</arg><arg>${workingPath}/input/crossref/index_update</arg>
<arg>-n</arg><arg>${nameNode}</arg>
<arg>-ts</arg><arg>${timestamp}</arg>
</java>
<ok to="GenerateDataset"/>
<error to="Kill"/>
</action>
<action name="GenerateDataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>ExtractCrossrefToOAF</name>
<class>eu.dnetlib.doiboost.crossref.CrossrefDataset</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
${sparkExtraOPT}
</spark-opts>
<arg>--workingPath</arg><arg>/data/doiboost/input/crossref</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="RenameDataset"/>
<error to="Kill"/>
</action>
<action name="RenameDataset">
<fs>
<delete path='${workingPath}/input/crossref/crossref_ds'/>
<move source="${workingPath}/input/crossref/crossref_ds_updated"
target="${workingPath}/input/crossref/crossref_ds"/>
</fs>
<ok to="ConvertCrossrefToOAF"/>
<error to="Kill"/>
</action>
<action name="ConvertCrossrefToOAF">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>ConvertCrossrefToOAF</name>
<class>eu.dnetlib.doiboost.crossref.SparkMapDumpIntoOAF</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
${sparkExtraOPT}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingPath}/input/crossref/crossref_ds</arg>
<arg>--targetPath</arg><arg>${workingPath}/process/</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

@ -1,38 +0,0 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>hive_metastore_uris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
</property>
<property>
<name>spark2EventLogDir</name>
<value>/user/spark/spark2ApplicationHistory</value>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>"com.cloudera.spark.lineage.NavigatorAppListener"</value>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>"com.cloudera.spark.lineage.NavigatorQueryListener"</value>
</property>
</configuration>

@ -1,96 +0,0 @@
<workflow-app name="Create DOIBoostActionSet" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>hostedByMapPath</name>
<description>the Hosted By Map Path</description>
</property>
<property>
<name>affiliationPath</name>
<description>the Affliation Path</description>
</property>
<property>
<name>paperAffiliationPath</name>
<description>the paperAffiliation Path</description>
</property>
<property>
<name>workingDirPath</name>
<description>the Working Path</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
</parameters>
<start to="CreateDOIBoost"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="CreateDOIBoost">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Create DOIBoost Infospace</name>
<class>eu.dnetlib.doiboost.SparkGenerateDoiBoost</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
${sparkExtraOPT}
</spark-opts>
<arg>--hostedByMapPath</arg><arg>${hostedByMapPath}</arg>
<arg>--affiliationPath</arg><arg>${affiliationPath}</arg>
<arg>--paperAffiliationPath</arg><arg>${paperAffiliationPath}</arg>
<arg>--workingDirPath</arg><arg>${workingDirPath}</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="GenerateActionSet"/>
<error to="Kill"/>
</action>
<action name="GenerateActionSet">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Generate DOIBoost ActionSet</name>
<class>eu.dnetlib.doiboost.SparkGenerateDOIBoostActionSet</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
${sparkExtraOPT}
</spark-opts>
<arg>--dbPublicationPath</arg><arg>${workingDirPath}/doiBoostPublicationFiltered</arg>
<arg>--dbDatasetPath</arg><arg>${workingDirPath}/crossrefDataset</arg>
<arg>--crossRefRelation</arg><arg>${workingDirPath}/crossrefRelation</arg>
<arg>--dbaffiliationRelationPath</arg><arg>${workingDirPath}/doiBoostPublicationAffiliation</arg>
<arg>-do</arg><arg>${workingDirPath}/doiBoostOrganization</arg>
<arg>--targetPath</arg><arg>${workingDirPath}/actionDataSet</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

@ -1,92 +0,0 @@
<workflow-app name="import MAG into HDFS" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>
<description>the working dir base path</description>
</property>
<property>
<name>targetPath</name>
<description>the working dir base path</description>
</property>
<property>
<name>workingPath</name>
<description>the working dir base path</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
</parameters>
<start to="ResetWorkingPath"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ResetWorkingPath">
<fs>
<delete path='${workingPath}'/>
<mkdir path='${workingPath}'/>
</fs>
<ok to="ConvertMagToDataset"/>
<error to="Kill"/>
</action>
<action name="ConvertMagToDataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Convert Mag to Dataset</name>
<class>eu.dnetlib.doiboost.mag.SparkImportMagIntoDataset</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
${sparkExtraOPT}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--targetPath</arg><arg>${workingPath}</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="PreprocessMag"/>
<error to="Kill"/>
</action>
<action name="PreprocessMag">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Convert Mag to OAF Dataset</name>
<class>eu.dnetlib.doiboost.mag.SparkPreProcessMAG</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
${sparkExtraOPT}
</spark-opts>
<arg>--sourcePath</arg><arg>${workingPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}/process</arg>
<arg>--targetPath</arg><arg>${targetPath}</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

@ -0,0 +1,194 @@
<workflow-app name="preprocess DOIBoost ActionSet" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorIntersectionMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<!-- Crossref Parameters -->
<property>
<name>inputPathCrossref</name>
<description>the Crossref input path</description>
</property>
<property>
<name>crossrefTimestamp</name>
<description>Timestamp for the Crossref incremental Harvesting</description>
</property>
<property>
<name>esServer</name>
<description>elasticsearch server url for the Crossref Harvesting</description>
</property>
<property>
<name>esIndex</name>
<description>elasticsearch index name for the Crossref Harvesting</description>
</property>
<!-- MAG Parameters -->
<property>
<name>MAGDumpPath</name>
<description>the MAG dump working path</description>
</property>
<property>
<name>inputPathMAG</name>
<description>the MAG working path</description>
</property>
<!-- ORCID Parameters -->
<property>
<name>inputPathOrcid</name>
<description>the ORCID input path</description>
</property>
<property>
<name>workingPathOrcid</name>
<description>the ORCID working path</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="ImportCrossRef"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ImportCrossRef">
<java>
<main-class>eu.dnetlib.doiboost.crossref.CrossrefImporter</main-class>
<arg>--targetPath</arg><arg>${inputPathCrossref}/index_update</arg>
<arg>--namenode</arg><arg>${nameNode}</arg>
<arg>--esServer</arg><arg>${esServer}</arg>
<arg>--esIndex</arg><arg>${esIndex}</arg>
<arg>--timestamp</arg><arg>${crossrefTimestamp}</arg>
</java>
<ok to="GenerateCrossrefDataset"/>
<error to="Kill"/>
</action>
<!-- CROSSREF SECTION -->
<action name="GenerateCrossrefDataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>GenerateCrossrefDataset</name>
<class>eu.dnetlib.doiboost.crossref.CrossrefDataset</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--workingPath</arg><arg>${inputPathCrossref}</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="RenameDataset"/>
<error to="Kill"/>
</action>
<action name="RenameDataset">
<fs>
<delete path="${inputPathCrossref}/crossref_ds"/>
<move source="${inputPathCrossref}/crossref_ds_updated"
target="${inputPathCrossref}/crossref_ds"/>
</fs>
<ok to="ResetMagWorkingPath"/>
<error to="Kill"/>
</action>
<!-- MAG SECTION -->
<action name="ResetMagWorkingPath">
<fs>
<delete path="${inputPathMAG}/dataset"/>
<delete path="${inputPathMAG}/process"/>
</fs>
<ok to="ConvertMagToDataset"/>
<error to="Kill"/>
</action>
<action name="ConvertMagToDataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Convert Mag to Dataset</name>
<class>eu.dnetlib.doiboost.mag.SparkImportMagIntoDataset</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${MAGDumpPath}</arg>
<arg>--targetPath</arg><arg>${inputPathMAG}/dataset</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="PreprocessORCID"/>
<error to="Kill"/>
</action>
<!-- ORCID SECTION -->
<action name="PreprocessORCID">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Convert ORCID to Dataset</name>
<class>eu.dnetlib.doiboost.orcid.SparkPreprocessORCID</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${inputPathOrcid}</arg>
<arg>--workingPath</arg><arg>${workingPathOrcid}</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

@ -0,0 +1,6 @@
[
{"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true},
{"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the working dir path", "paramRequired": true},
{"paramName":"m", "paramLongName":"master", "paramDescription": "the master name", "paramRequired": true}
]

@ -16,8 +16,8 @@
<value>spark2</value>
</property>
<property>
<name>oozie.wf.rerun.failnodes</name>
<value>false</value>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
<property>
<name>hive_metastore_uris</name>

@ -0,0 +1,262 @@
<workflow-app name="Generate DOIBoost ActionSet" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorIntersectionMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
<!-- Itersection Parameters -->
<property>
<name>workingPath</name>
<description>the working Path</description>
</property>
<property>
<name>hostedByMapPath</name>
<description>the hostedByMap Path</description>
</property>
<property>
<name>outputPath</name>
<description>the Path of the sequence file action set</description>
</property>
<!-- Crossref Parameters -->
<property>
<name>inputPathCrossref</name>
<description>the Crossref input path</description>
</property>
<!-- MAG Parameters -->
<property>
<name>inputPathMAG</name>
<description>the MAG working path</description>
</property>
<!-- UnpayWall Parameters -->
<property>
<name>inputPathUnpayWall</name>
<description>the UnpayWall working path</description>
</property>
<!-- ORCID Parameters -->
<property>
<name>inputPathOrcid</name>
<description>the ORCID input path</description>
</property>
<property>
<name>workingPathOrcid</name>
<description>the ORCID working path</description>
</property>
</parameters>
<global>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>${oozieActionShareLibForSpark2}</value>
</property>
</configuration>
</global>
<start to="resume_from"/>
<decision name="resume_from">
<switch>
<case to="ProcessMAG">${wf:conf('resumeFrom') eq 'PreprocessMag'}</case>
<case to="ProcessUW">${wf:conf('resumeFrom') eq 'PreprocessUW'}</case>
<case to="ProcessORCID">${wf:conf('resumeFrom') eq 'PreprocessORCID'}</case>
<case to="CreateDOIBoost">${wf:conf('resumeFrom') eq 'CreateDOIBoost'}</case>
<case to="GenerateActionSet">${wf:conf('resumeFrom') eq 'GenerateActionSet'}</case>
<default to="ConvertCrossrefToOAF"/>
</switch>
</decision>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ConvertCrossrefToOAF">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>ConvertCrossrefToOAF</name>
<class>eu.dnetlib.doiboost.crossref.SparkMapDumpIntoOAF</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${inputPathCrossref}/crossref_ds</arg>
<arg>--targetPath</arg><arg>${workingPath}</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="ProcessMAG"/>
<error to="Kill"/>
</action>
<action name="ProcessMAG">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Convert Mag to OAF Dataset</name>
<class>eu.dnetlib.doiboost.mag.SparkProcessMAG</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorIntersectionMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${inputPathMAG}/dataset</arg>
<arg>--workingPath</arg><arg>${inputPathMAG}/process_p</arg>
<arg>--targetPath</arg><arg>${workingPath}</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="ProcessUW"/>
<error to="Kill"/>
</action>
<!-- UnpayWall SECTION -->
<action name="ProcessUW">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Convert UnpayWall to Dataset</name>
<class>eu.dnetlib.doiboost.uw.SparkMapUnpayWallToOAF</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
--conf spark.sql.shuffle.partitions=3840
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--sourcePath</arg><arg>${inputPathUnpayWall}/uw_extracted</arg>
<arg>--targetPath</arg><arg>${workingPath}/uwPublication</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="ProcessORCID"/>
<error to="Kill"/>
</action>
<!-- ORCID SECTION -->
<action name="ProcessORCID">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Convert ORCID to Dataset</name>
<class>eu.dnetlib.doiboost.orcid.SparkConvertORCIDToOAF</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--workingPath</arg><arg>${workingPathOrcid}</arg>
<arg>--targetPath</arg><arg>${workingPath}/orcidPublication</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="CreateDOIBoost"/>
<error to="Kill"/>
</action>
<!-- INTERSECTION SECTION-->
<action name="CreateDOIBoost">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Create DOIBoost Infospace</name>
<class>eu.dnetlib.doiboost.SparkGenerateDoiBoost</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorIntersectionMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
--conf spark.sql.shuffle.partitions=3840
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--hostedByMapPath</arg><arg>${hostedByMapPath}</arg>
<arg>--affiliationPath</arg><arg>${inputPathMAG}/dataset/Affiliations</arg>
<arg>--paperAffiliationPath</arg><arg>${inputPathMAG}/dataset/PaperAuthorAffiliations</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="GenerateActionSet"/>
<error to="Kill"/>
</action>
<action name="GenerateActionSet">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Generate DOIBoost ActionSet</name>
<class>eu.dnetlib.doiboost.SparkGenerateDOIBoostActionSet</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--dbPublicationPath</arg><arg>${workingPath}/doiBoostPublicationFiltered</arg>
<arg>--dbDatasetPath</arg><arg>${workingPath}/crossrefDataset</arg>
<arg>--crossRefRelation</arg><arg>${workingPath}/crossrefRelation</arg>
<arg>--dbaffiliationRelationPath</arg><arg>${workingPath}/doiBoostPublicationAffiliation</arg>
<arg>--dbOrganizationPath</arg><arg>${workingPath}/doiBoostOrganization</arg>
<arg>--targetPath</arg><arg>${workingPath}/actionDataSet</arg>
<arg>--sFilePath</arg><arg>${outputPath}</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>

@ -1,38 +0,0 @@
<configuration>
<property>
<name>jobTracker</name>
<value>yarnRM</value>
</property>
<property>
<name>nameNode</name>
<value>hdfs://nameservice1</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>oozie.action.sharelib.for.spark</name>
<value>spark2</value>
</property>
<property>
<name>hive_metastore_uris</name>
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
</property>
<property>
<name>spark2YarnHistoryServerAddress</name>
<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
</property>
<property>
<name>spark2EventLogDir</name>
<value>/user/spark/spark2ApplicationHistory</value>
</property>
<property>
<name>spark2ExtraListeners</name>
<value>"com.cloudera.spark.lineage.NavigatorAppListener"</value>
</property>
<property>
<name>spark2SqlQueryExecutionListeners</name>
<value>"com.cloudera.spark.lineage.NavigatorQueryListener"</value>
</property>
</configuration>

@ -1,55 +0,0 @@
<workflow-app name="import UnpayWall into HDFS" xmlns="uri:oozie:workflow:0.5">
<parameters>
<property>
<name>sourcePath</name>
<description>the working dir base path</description>
</property>
<property>
<name>targetPath</name>
<description>the working dir base path</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
</property>
<property>
<name>sparkExecutorMemory</name>
<description>memory for individual executor</description>
</property>
<property>
<name>sparkExecutorCores</name>
<description>number of cores used by single executor</description>
</property>
</parameters>
<start to="PreprocessUW"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="PreprocessUW">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>Convert UnpayWall to Dataset</name>
<class>eu.dnetlib.doiboost.uw.SparkMapUnpayWallToOAF</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
${sparkExtraOPT}
</spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}/uw_extracted</arg>
<arg>--targetPath</arg><arg>${targetPath}</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>
Loading…
Cancel
Save