Code refactor:
- removed old workflows in doiboost
- split the doiboost workflow into preprocess and process phases
This commit is contained in:
parent
c35c117601
commit
3d8e2aa146
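The core of the refactor is visible in the first two hunks below: the conversion logic moves out of `main` into a `run(spark, workingPath, targetPath)` method, so the same code can be driven by Oozie in production or called directly from a test. A minimal sketch of the pattern (the object name, paths, and body are hypothetical; the `run` signature follows the diff):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Sketch of the run/main split applied below to SparkConvertORCIDToOAF:
// all Spark logic sits in run(), which receives its SparkSession from the
// caller, so a unit test can pass a local session while main() stays a thin
// command-line-facing entry point.
object RunMainSplitSketch {

  def run(spark: SparkSession, workingPath: String, targetPath: String): Unit = {
    // Stand-in for the real body, which loads s"$workingPath/orcidworksWithAuthor",
    // maps ORCIDItem records to OAF and writes the result to targetPath.
    spark.read.textFile(workingPath).write.mode("overwrite").text(targetPath)
  }

  def main(args: Array[String]): Unit = {
    // main() only builds the session and delegates; argument parsing is elided
    // (the real class uses ArgumentApplicationParser, as the diff shows).
    val spark = SparkSession.builder()
      .config(new SparkConf())
      .master(args(0))
      .getOrCreate()
    run(spark, args(1), args(2))
    spark.close()
  }
}
```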
@@ -10,6 +10,16 @@ import org.slf4j.{Logger, LoggerFactory}
 object SparkConvertORCIDToOAF {
   val logger: Logger = LoggerFactory.getLogger(SparkConvertORCIDToOAF.getClass)
 
+  def run(spark: SparkSession, workingPath: String, targetPath: String): Unit = {
+    implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication]
+    import spark.implicits._
+    val dataset: Dataset[ORCIDItem] = spark.read.load(s"$workingPath/orcidworksWithAuthor").as[ORCIDItem]
+
+    logger.info("Converting ORCID to OAF")
+    dataset.map(o => ORCIDToOAF.convertTOOAF(o)).write.mode(SaveMode.Overwrite).save(targetPath)
+  }
+
   def main(args: Array[String]): Unit = {
     val conf: SparkConf = new SparkConf()
     val parser = new ArgumentApplicationParser(IOUtils.toString(SparkConvertORCIDToOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_orcid_to_oaf_params.json")))
@@ -21,16 +31,11 @@ object SparkConvertORCIDToOAF {
       .appName(getClass.getSimpleName)
       .master(parser.get("master")).getOrCreate()
 
     implicit val mapEncoderPubs: Encoder[Publication] = Encoders.kryo[Publication]
     import spark.implicits._
 
     val workingPath = parser.get("workingPath")
     val targetPath = parser.get("targetPath")
 
-    val dataset: Dataset[ORCIDItem] = spark.read.load(s"$workingPath/orcidworksWithAuthor").as[ORCIDItem]
-
-    logger.info("Converting ORCID to OAF")
-    dataset.map(o => ORCIDToOAF.convertTOOAF(o)).write.mode(SaveMode.Overwrite).save(targetPath)
+    run(spark, workingPath, targetPath)
 
   }
@@ -50,7 +50,7 @@ object SparkPreprocessORCID {
 
   def main(args: Array[String]): Unit = {
     val conf: SparkConf = new SparkConf()
-    val parser = new ArgumentApplicationParser(IOUtils.toString(SparkConvertORCIDToOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/convert_orcid_to_oaf_params.json")))
+    val parser = new ArgumentApplicationParser(IOUtils.toString(SparkConvertORCIDToOAF.getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/preprocess_orcid_params.json")))
     parser.parseArgument(args)
     val spark: SparkSession =
       SparkSession
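With both jobs exposing `run` against an externally created session, the two ORCID phases can be chained in one SparkSession, which is exactly what the updated `MappingORCIDToOAFTest` does near the end of this diff. A minimal sketch of such a driver; the object name and paths are placeholders, while the `run` signatures come from the diff itself:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import eu.dnetlib.doiboost.orcid.{SparkConvertORCIDToOAF, SparkPreprocessORCID}

// Hypothetical local driver chaining the two ORCID phases, mirroring the
// calls made in MappingORCIDToOAFTest at the bottom of this diff.
object OrcidPhasesSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local[*]")
    conf.set("spark.driver.host", "localhost")
    val spark = SparkSession.builder().appName("orcid-phases").config(conf).getOrCreate()

    val sourcePath  = "/tmp/orcid/dump"    // placeholder: raw ORCID input
    val workingPath = "/tmp/orcid/working" // holds orcidworksWithAuthor
    val targetPath  = "/tmp/orcid/oaf"     // OAF publications

    SparkPreprocessORCID.run(spark, sourcePath, workingPath)   // preprocess phase
    SparkConvertORCIDToOAF.run(spark, workingPath, targetPath) // process phase
    spark.close()
  }
}
```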
@@ -1,101 +0,0 @@
-<workflow-app name="import Crossref from index into HDFS" xmlns="uri:oozie:workflow:0.5">
-    <parameters>
-        <property>
-            <name>workingPath</name>
-            <description>the working dir base path</description>
-        </property>
-        <property>
-            <name>sparkDriverMemory</name>
-            <description>memory for driver process</description>
-        </property>
-        <property>
-            <name>sparkExecutorMemory</name>
-            <description>memory for individual executor</description>
-        </property>
-        <property>
-            <name>sparkExecutorCores</name>
-            <description>number of cores used by single executor</description>
-        </property>
-        <property>
-            <name>timestamp</name>
-            <description>Timestamp for incremental Harvesting</description>
-        </property>
-    </parameters>
-
-    <start to="ImportCrossRef"/>
-
-    <kill name="Kill">
-        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
-    </kill>
-
-    <action name="ImportCrossRef">
-        <java>
-            <job-tracker>${jobTracker}</job-tracker>
-            <name-node>${nameNode}</name-node>
-            <main-class>eu.dnetlib.doiboost.crossref.CrossrefImporter</main-class>
-            <arg>-t</arg><arg>${workingPath}/input/crossref/index_update</arg>
-            <arg>-n</arg><arg>${nameNode}</arg>
-            <arg>-ts</arg><arg>${timestamp}</arg>
-        </java>
-        <ok to="GenerateDataset"/>
-        <error to="Kill"/>
-    </action>
-
-    <action name="GenerateDataset">
-        <spark xmlns="uri:oozie:spark-action:0.2">
-            <master>yarn-cluster</master>
-            <mode>cluster</mode>
-            <name>ExtractCrossrefToOAF</name>
-            <class>eu.dnetlib.doiboost.crossref.CrossrefDataset</class>
-            <jar>dhp-doiboost-${projectVersion}.jar</jar>
-            <spark-opts>
-                --executor-memory=${sparkExecutorMemory}
-                --executor-cores=${sparkExecutorCores}
-                --driver-memory=${sparkDriverMemory}
-                --conf spark.sql.shuffle.partitions=3840
-                ${sparkExtraOPT}
-            </spark-opts>
-            <arg>--workingPath</arg><arg>/data/doiboost/input/crossref</arg>
-            <arg>--master</arg><arg>yarn-cluster</arg>
-        </spark>
-        <ok to="RenameDataset"/>
-        <error to="Kill"/>
-    </action>
-
-    <action name="RenameDataset">
-        <fs>
-            <delete path='${workingPath}/input/crossref/crossref_ds'/>
-            <move source="${workingPath}/input/crossref/crossref_ds_updated"
-                  target="${workingPath}/input/crossref/crossref_ds"/>
-        </fs>
-        <ok to="ConvertCrossrefToOAF"/>
-        <error to="Kill"/>
-    </action>
-
-    <action name="ConvertCrossrefToOAF">
-        <spark xmlns="uri:oozie:spark-action:0.2">
-            <master>yarn-cluster</master>
-            <mode>cluster</mode>
-            <name>ConvertCrossrefToOAF</name>
-            <class>eu.dnetlib.doiboost.crossref.SparkMapDumpIntoOAF</class>
-            <jar>dhp-doiboost-${projectVersion}.jar</jar>
-            <spark-opts>
-                --executor-memory=${sparkExecutorMemory}
-                --executor-cores=${sparkExecutorCores}
-                --driver-memory=${sparkDriverMemory}
-                --conf spark.sql.shuffle.partitions=3840
-                ${sparkExtraOPT}
-            </spark-opts>
-            <arg>--sourcePath</arg><arg>${workingPath}/input/crossref/crossref_ds</arg>
-            <arg>--targetPath</arg><arg>${workingPath}/process/</arg>
-            <arg>--master</arg><arg>yarn-cluster</arg>
-        </spark>
-        <ok to="End"/>
-        <error to="Kill"/>
-    </action>
-
-    <end name="End"/>
-</workflow-app>
@@ -1,38 +0,0 @@
-<configuration>
-    <property>
-        <name>jobTracker</name>
-        <value>yarnRM</value>
-    </property>
-    <property>
-        <name>nameNode</name>
-        <value>hdfs://nameservice1</value>
-    </property>
-    <property>
-        <name>oozie.use.system.libpath</name>
-        <value>true</value>
-    </property>
-    <property>
-        <name>oozie.action.sharelib.for.spark</name>
-        <value>spark2</value>
-    </property>
-    <property>
-        <name>hive_metastore_uris</name>
-        <value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
-    </property>
-    <property>
-        <name>spark2YarnHistoryServerAddress</name>
-        <value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
-    </property>
-    <property>
-        <name>spark2EventLogDir</name>
-        <value>/user/spark/spark2ApplicationHistory</value>
-    </property>
-    <property>
-        <name>spark2ExtraListeners</name>
-        <value>"com.cloudera.spark.lineage.NavigatorAppListener"</value>
-    </property>
-    <property>
-        <name>spark2SqlQueryExecutionListeners</name>
-        <value>"com.cloudera.spark.lineage.NavigatorQueryListener"</value>
-    </property>
-</configuration>
@@ -1,96 +0,0 @@
-<workflow-app name="Create DOIBoostActionSet" xmlns="uri:oozie:workflow:0.5">
-    <parameters>
-        <property>
-            <name>hostedByMapPath</name>
-            <description>the Hosted By Map Path</description>
-        </property>
-        <property>
-            <name>affiliationPath</name>
-            <description>the Affliation Path</description>
-        </property>
-        <property>
-            <name>paperAffiliationPath</name>
-            <description>the paperAffiliation Path</description>
-        </property>
-        <property>
-            <name>workingDirPath</name>
-            <description>the Working Path</description>
-        </property>
-        <property>
-            <name>sparkDriverMemory</name>
-            <description>memory for driver process</description>
-        </property>
-        <property>
-            <name>sparkExecutorMemory</name>
-            <description>memory for individual executor</description>
-        </property>
-        <property>
-            <name>sparkExecutorCores</name>
-            <description>number of cores used by single executor</description>
-        </property>
-    </parameters>
-
-    <start to="CreateDOIBoost"/>
-
-    <kill name="Kill">
-        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
-    </kill>
-
-    <action name="CreateDOIBoost">
-        <spark xmlns="uri:oozie:spark-action:0.2">
-            <master>yarn-cluster</master>
-            <mode>cluster</mode>
-            <name>Create DOIBoost Infospace</name>
-            <class>eu.dnetlib.doiboost.SparkGenerateDoiBoost</class>
-            <jar>dhp-doiboost-${projectVersion}.jar</jar>
-            <spark-opts>
-                --executor-memory=${sparkExecutorMemory}
-                --executor-cores=${sparkExecutorCores}
-                --driver-memory=${sparkDriverMemory}
-                --conf spark.sql.shuffle.partitions=3840
-                ${sparkExtraOPT}
-            </spark-opts>
-            <arg>--hostedByMapPath</arg><arg>${hostedByMapPath}</arg>
-            <arg>--affiliationPath</arg><arg>${affiliationPath}</arg>
-            <arg>--paperAffiliationPath</arg><arg>${paperAffiliationPath}</arg>
-            <arg>--workingDirPath</arg><arg>${workingDirPath}</arg>
-            <arg>--master</arg><arg>yarn-cluster</arg>
-        </spark>
-        <ok to="GenerateActionSet"/>
-        <error to="Kill"/>
-    </action>
-
-    <action name="GenerateActionSet">
-        <spark xmlns="uri:oozie:spark-action:0.2">
-            <master>yarn-cluster</master>
-            <mode>cluster</mode>
-            <name>Generate DOIBoost ActionSet</name>
-            <class>eu.dnetlib.doiboost.SparkGenerateDOIBoostActionSet</class>
-            <jar>dhp-doiboost-${projectVersion}.jar</jar>
-            <spark-opts>
-                --executor-memory=${sparkExecutorMemory}
-                --executor-cores=${sparkExecutorCores}
-                --driver-memory=${sparkDriverMemory}
-                --conf spark.sql.shuffle.partitions=3840
-                ${sparkExtraOPT}
-            </spark-opts>
-            <arg>--dbPublicationPath</arg><arg>${workingDirPath}/doiBoostPublicationFiltered</arg>
-            <arg>--dbDatasetPath</arg><arg>${workingDirPath}/crossrefDataset</arg>
-            <arg>--crossRefRelation</arg><arg>${workingDirPath}/crossrefRelation</arg>
-            <arg>--dbaffiliationRelationPath</arg><arg>${workingDirPath}/doiBoostPublicationAffiliation</arg>
-            <arg>-do</arg><arg>${workingDirPath}/doiBoostOrganization</arg>
-            <arg>--targetPath</arg><arg>${workingDirPath}/actionDataSet</arg>
-            <arg>--master</arg><arg>yarn-cluster</arg>
-        </spark>
-        <ok to="End"/>
-        <error to="Kill"/>
-    </action>
-
-    <end name="End"/>
-</workflow-app>
@@ -1,42 +0,0 @@
-<configuration>
-    <property>
-        <name>jobTracker</name>
-        <value>yarnRM</value>
-    </property>
-    <property>
-        <name>nameNode</name>
-        <value>hdfs://nameservice1</value>
-    </property>
-    <property>
-        <name>oozie.use.system.libpath</name>
-        <value>true</value>
-    </property>
-    <property>
-        <name>oozie.action.sharelib.for.spark</name>
-        <value>spark2</value>
-    </property>
-    <property>
-        <name>oozie.wf.rerun.failnodes</name>
-        <value>false</value>
-    </property>
-    <property>
-        <name>hive_metastore_uris</name>
-        <value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
-    </property>
-    <property>
-        <name>spark2YarnHistoryServerAddress</name>
-        <value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
-    </property>
-    <property>
-        <name>spark2EventLogDir</name>
-        <value>/user/spark/spark2ApplicationHistory</value>
-    </property>
-    <property>
-        <name>spark2ExtraListeners</name>
-        <value>"com.cloudera.spark.lineage.NavigatorAppListener"</value>
-    </property>
-    <property>
-        <name>spark2SqlQueryExecutionListeners</name>
-        <value>"com.cloudera.spark.lineage.NavigatorQueryListener"</value>
-    </property>
-</configuration>
@@ -1,92 +0,0 @@
-<workflow-app name="import MAG into HDFS" xmlns="uri:oozie:workflow:0.5">
-    <parameters>
-        <property>
-            <name>sourcePath</name>
-            <description>the working dir base path</description>
-        </property>
-        <property>
-            <name>targetPath</name>
-            <description>the working dir base path</description>
-        </property>
-        <property>
-            <name>workingPath</name>
-            <description>the working dir base path</description>
-        </property>
-        <property>
-            <name>sparkDriverMemory</name>
-            <description>memory for driver process</description>
-        </property>
-        <property>
-            <name>sparkExecutorMemory</name>
-            <description>memory for individual executor</description>
-        </property>
-        <property>
-            <name>sparkExecutorCores</name>
-            <description>number of cores used by single executor</description>
-        </property>
-    </parameters>
-
-    <start to="ResetWorkingPath"/>
-
-    <kill name="Kill">
-        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
-    </kill>
-
-    <action name="ResetWorkingPath">
-        <fs>
-            <delete path='${workingPath}'/>
-            <mkdir path='${workingPath}'/>
-        </fs>
-        <ok to="ConvertMagToDataset"/>
-        <error to="Kill"/>
-    </action>
-
-    <action name="ConvertMagToDataset">
-        <spark xmlns="uri:oozie:spark-action:0.2">
-            <master>yarn-cluster</master>
-            <mode>cluster</mode>
-            <name>Convert Mag to Dataset</name>
-            <class>eu.dnetlib.doiboost.mag.SparkImportMagIntoDataset</class>
-            <jar>dhp-doiboost-${projectVersion}.jar</jar>
-            <spark-opts>
-                --executor-memory=${sparkExecutorMemory}
-                --executor-cores=${sparkExecutorCores}
-                --driver-memory=${sparkDriverMemory}
-                ${sparkExtraOPT}
-            </spark-opts>
-            <arg>--sourcePath</arg><arg>${sourcePath}</arg>
-            <arg>--targetPath</arg><arg>${workingPath}</arg>
-            <arg>--master</arg><arg>yarn-cluster</arg>
-        </spark>
-        <ok to="PreprocessMag"/>
-        <error to="Kill"/>
-    </action>
-
-    <action name="PreprocessMag">
-        <spark xmlns="uri:oozie:spark-action:0.2">
-            <master>yarn-cluster</master>
-            <mode>cluster</mode>
-            <name>Convert Mag to OAF Dataset</name>
-            <class>eu.dnetlib.doiboost.mag.SparkPreProcessMAG</class>
-            <jar>dhp-doiboost-${projectVersion}.jar</jar>
-            <spark-opts>
-                --executor-memory=${sparkExecutorMemory}
-                --executor-cores=${sparkExecutorCores}
-                --driver-memory=${sparkDriverMemory}
-                --conf spark.sql.shuffle.partitions=3840
-                ${sparkExtraOPT}
-            </spark-opts>
-            <arg>--sourcePath</arg><arg>${workingPath}</arg>
-            <arg>--workingPath</arg><arg>${workingPath}/process</arg>
-            <arg>--targetPath</arg><arg>${targetPath}</arg>
-            <arg>--master</arg><arg>yarn-cluster</arg>
-        </spark>
-        <ok to="End"/>
-        <error to="Kill"/>
-    </action>
-
-    <end name="End"/>
-</workflow-app>
@@ -0,0 +1,216 @@
+<workflow-app name="Generate DOIBoost ActionSet" xmlns="uri:oozie:workflow:0.5">
+    <parameters>
+        <property>
+            <name>sparkDriverMemory</name>
+            <description>memory for driver process</description>
+        </property>
+        <property>
+            <name>sparkExecutorMemory</name>
+            <description>memory for individual executor</description>
+        </property>
+        <property>
+            <name>sparkExecutorCores</name>
+            <description>number of cores used by single executor</description>
+        </property>
+
+        <!-- Crossref Parameters -->
+        <property>
+            <name>inputPathCrossref</name>
+            <description>the Crossref input path</description>
+        </property>
+        <property>
+            <name>crossrefDumpPath</name>
+            <description>the Crossref dump path</description>
+        </property>
+
+        <!-- MAG Parameters -->
+        <property>
+            <name>MAGDumpPath</name>
+            <description>the MAG dump working path</description>
+        </property>
+
+        <property>
+            <name>inputPathMAG</name>
+            <description>the MAG working path</description>
+        </property>
+
+        <!-- ORCID Parameters -->
+        <property>
+            <name>inputPathOrcid</name>
+            <description>the ORCID input path</description>
+        </property>
+
+        <property>
+            <name>workingPathOrcid</name>
+            <description>the ORCID working path</description>
+        </property>
+
+    </parameters>
+
+    <global>
+        <job-tracker>${jobTracker}</job-tracker>
+        <name-node>${nameNode}</name-node>
+        <configuration>
+            <property>
+                <name>oozie.action.sharelib.for.spark</name>
+                <value>${oozieActionShareLibForSpark2}</value>
+            </property>
+        </configuration>
+    </global>
+
+    <start to="resume_from"/>
+
+    <decision name="resume_from">
+        <switch>
+            <case to="UnpackCrossrefEntries">${wf:conf('resumeFrom') eq 'UnpackCrossrefEntries'}</case>
+            <case to="GenerateCrossrefDataset">${wf:conf('resumeFrom') eq 'GenerateCrossrefDataset'}</case>
+            <case to="ResetMagWorkingPath">${wf:conf('resumeFrom') eq 'ResetMagWorkingPath'}</case>
+            <case to="ConvertMagToDataset">${wf:conf('resumeFrom') eq 'ConvertMagToDataset'}</case>
+            <case to="PreProcessORCID">${wf:conf('resumeFrom') eq 'PreProcessORCID'}</case>
+            <default to="ImportCrossRef"/>
+        </switch>
+    </decision>
+
+    <kill name="Kill">
+        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+
+    <action name="ImportCrossRef">
+        <java>
+            <job-tracker>${jobTracker}</job-tracker>
+            <name-node>${nameNode}</name-node>
+            <main-class>eu.dnetlib.doiboost.crossref.ExtractCrossrefRecords</main-class>
+            <arg>--hdfsServerUri</arg><arg>${nameNode}</arg>
+            <arg>--crossrefFileNameTarGz</arg><arg>${crossrefDumpPath}/crossref.tar.gz</arg>
+            <arg>--workingPath</arg><arg>${crossrefDumpPath}</arg>
+            <arg>--outputPath</arg><arg>${crossrefDumpPath}/files/</arg>
+        </java>
+        <ok to="UnpackCrossrefEntries"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="UnpackCrossrefEntries">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn-cluster</master>
+            <mode>cluster</mode>
+            <name>SparkUnpackCrossrefEntries</name>
+            <class>eu.dnetlib.doiboost.crossref.UnpackCrtossrefEntries</class>
+            <jar>dhp-doiboost-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.sql.shuffle.partitions=3840
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+            </spark-opts>
+            <arg>--master</arg><arg>yarn-cluster</arg>
+            <arg>--sourcePath</arg><arg>${crossrefDumpPath}/files</arg>
+            <arg>--targetPath</arg><arg>${crossrefDumpPath}/crossref_unpack/</arg>
+        </spark>
+        <ok to="GenerateCrossrefDataset"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="GenerateCrossrefDataset">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn-cluster</master>
+            <mode>cluster</mode>
+            <name>SparkGenerateCrossrefDataset</name>
+            <class>eu.dnetlib.doiboost.crossref.GenerateCrossrefDataset</class>
+            <jar>dhp-doiboost-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=7G
+                --executor-cores=2
+                --driver-memory=7G
+                --conf spark.sql.shuffle.partitions=3840
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+            </spark-opts>
+            <arg>--master</arg><arg>yarn-cluster</arg>
+            <arg>--sourcePath</arg><arg>${crossrefDumpPath}/crossref_unpack/</arg>
+            <arg>--targetPath</arg><arg>${inputPathCrossref}/crossref_ds</arg>
+        </spark>
+        <ok to="removeFiles"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="removeFiles">
+        <fs>
+            <!-- <delete path="${crossrefDumpPath}/files"/>-->
+            <delete path="${crossrefDumpPath}/crossref_unpack/"/>
+        </fs>
+        <ok to="ResetMagWorkingPath"/>
+        <error to="Kill"/>
+    </action>
+
+    <!-- MAG SECTION -->
+    <action name="ResetMagWorkingPath">
+        <fs>
+            <delete path="${inputPathMAG}/dataset"/>
+            <delete path="${inputPathMAG}/process"/>
+        </fs>
+        <ok to="ConvertMagToDataset"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="ConvertMagToDataset">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn-cluster</master>
+            <mode>cluster</mode>
+            <name>Convert Mag to Dataset</name>
+            <class>eu.dnetlib.doiboost.mag.SparkImportMagIntoDataset</class>
+            <jar>dhp-doiboost-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.sql.shuffle.partitions=3840
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+            </spark-opts>
+            <arg>--sourcePath</arg><arg>${MAGDumpPath}</arg>
+            <arg>--targetPath</arg><arg>${inputPathMAG}/dataset</arg>
+            <arg>--master</arg><arg>yarn-cluster</arg>
+        </spark>
+        <ok to="PreProcessORCID"/>
+        <error to="Kill"/>
+    </action>
+
+    <!-- ORCID SECTION -->
+    <action name="PreProcessORCID">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn-cluster</master>
+            <mode>cluster</mode>
+            <name>Convert ORCID to Dataset</name>
+            <class>eu.dnetlib.doiboost.orcid.SparkPreprocessORCID</class>
+            <jar>dhp-doiboost-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.sql.shuffle.partitions=3840
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+            </spark-opts>
+            <arg>--sourcePath</arg><arg>${inputPathOrcid}</arg>
+            <arg>--workingPath</arg><arg>${workingPathOrcid}</arg>
+            <arg>--master</arg><arg>yarn-cluster</arg>
+        </spark>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <end name="End"/>
+</workflow-app>
@@ -1,4 +1,4 @@
-<workflow-app name="Generate DOIBoost ActionSet" xmlns="uri:oozie:workflow:0.5">
+<workflow-app name="Generate DOIBoost ActionSet for PROD" xmlns="uri:oozie:workflow:0.5">
     <parameters>
         <property>
             <name>sparkDriverMemory</name>
@@ -17,8 +17,6 @@
             <name>sparkExecutorCores</name>
             <description>number of cores used by single executor</description>
         </property>
-
-
         <!-- Itersection Parameters -->
         <property>
             <name>workingPath</name>
@@ -40,29 +38,8 @@
             <name>inputPathCrossref</name>
             <description>the Crossref input path</description>
         </property>
-        <property>
-            <name>crossrefDumpPath</name>
-            <description>the Crossref dump path</description>
-        </property>
-        <!-- <property>-->
-        <!--     <name>crossrefTimestamp</name>-->
-        <!--     <description>Timestamp for the Crossref incremental Harvesting</description>-->
-        <!-- </property>-->
-        <!-- <property>-->
-        <!--     <name>esServer</name>-->
-        <!--     <description>elasticsearch server url for the Crossref Harvesting</description>-->
-        <!-- </property>-->
-        <!-- <property>-->
-        <!--     <name>esIndex</name>-->
-        <!--     <description>elasticsearch index name for the Crossref Harvesting</description>-->
-        <!-- </property>-->
-
         <!-- MAG Parameters -->
         <property>
             <name>MAGDumpPath</name>
             <description>the MAG dump working path</description>
         </property>
 
         <property>
             <name>inputPathMAG</name>
             <description>the MAG working path</description>
@@ -76,11 +53,6 @@
         </property>
 
         <!-- ORCID Parameters -->
-        <property>
-            <name>inputPathOrcid</name>
-            <description>the ORCID input path</description>
-        </property>
-
         <property>
             <name>workingPathOrcid</name>
             <description>the ORCID working path</description>
@@ -103,15 +75,12 @@
 
     <decision name="resume_from">
         <switch>
             <case to="ConvertCrossrefToOAF">${wf:conf('resumeFrom') eq 'ConvertCrossrefToOAF'}</case>
-            <case to="ResetMagWorkingPath">${wf:conf('resumeFrom') eq 'ResetMagWorkingPath'}</case>
             <case to="ProcessMAG">${wf:conf('resumeFrom') eq 'PreprocessMag'}</case>
             <case to="ProcessUW">${wf:conf('resumeFrom') eq 'PreprocessUW'}</case>
-            <case to="ProcessORCID">${wf:conf('resumeFrom') eq 'PreprocessORCID'}</case>
+            <case to="ProcessORCID">${wf:conf('resumeFrom') eq 'ProcessORCID'}</case>
             <case to="CreateDOIBoost">${wf:conf('resumeFrom') eq 'CreateDOIBoost'}</case>
             <case to="GenerateActionSet">${wf:conf('resumeFrom') eq 'GenerateActionSet'}</case>
-            <case to="GenerateCrossrefDataset">${wf:conf('resumeFrom') eq 'GenerateCrossrefDataset'}</case>
-            <default to="ImportCrossRef"/>
+            <default to="ConvertCrossrefToOAF"/>
         </switch>
     </decision>
@@ -119,170 +88,6 @@
         <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
     </kill>
 
-    <!-- <action name="ImportCrossRef">-->
-    <!--     <java>-->
-    <!--         <main-class>eu.dnetlib.doiboost.crossref.CrossrefImporter</main-class>-->
-    <!--         <arg>--targetPath</arg><arg>${inputPathCrossref}/index_update</arg>-->
-    <!--         <arg>--namenode</arg><arg>${nameNode}</arg>-->
-    <!--         <arg>--esServer</arg><arg>${esServer}</arg>-->
-    <!--         <arg>--esIndex</arg><arg>${esIndex}</arg>-->
-    <!--         <arg>--timestamp</arg><arg>${crossrefTimestamp}</arg>-->
-    <!--     </java>-->
-    <!--     <ok to="GenerateCrossrefDataset"/>-->
-    <!--     <error to="Kill"/>-->
-    <!-- </action>-->
-
-    <action name="ImportCrossRef">
-        <java>
-            <job-tracker>${jobTracker}</job-tracker>
-            <name-node>${nameNode}</name-node>
-            <main-class>eu.dnetlib.doiboost.crossref.ExtractCrossrefRecords</main-class>
-            <arg>--hdfsServerUri</arg><arg>${nameNode}</arg>
-            <arg>--crossrefFileNameTarGz</arg><arg>${crossrefDumpPath}/crossref.tar.gz</arg>
-            <arg>--workingPath</arg><arg>${crossrefDumpPath}</arg>
-            <arg>--outputPath</arg><arg>${crossrefDumpPath}/files/</arg>
-        </java>
-        <ok to="UnpackCrossrefEntries"/>
-        <error to="Kill"/>
-    </action>
-
-    <action name="UnpackCrossrefEntries">
-        <spark xmlns="uri:oozie:spark-action:0.2">
-            <master>yarn-cluster</master>
-            <mode>cluster</mode>
-            <name>SparkUnpackCrossrefEntries</name>
-            <class>eu.dnetlib.doiboost.crossref.UnpackCrtossrefEntries</class>
-            <jar>dhp-doiboost-${projectVersion}.jar</jar>
-            <spark-opts>
-                --executor-memory=${sparkExecutorMemory}
-                --executor-cores=${sparkExecutorCores}
-                --driver-memory=${sparkDriverMemory}
-                --conf spark.sql.shuffle.partitions=3840
-                --conf spark.extraListeners=${spark2ExtraListeners}
-                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
-                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
-                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-            </spark-opts>
-            <arg>--master</arg><arg>yarn-cluster</arg>
-            <arg>--sourcePath</arg><arg>${crossrefDumpPath}/files</arg>
-            <arg>--targetPath</arg><arg>${crossrefDumpPath}/crossref_unpack/</arg>
-        </spark>
-        <ok to="GenerateCrossrefDataset"/>
-        <error to="Kill"/>
-    </action>
-
-    <action name="GenerateCrossrefDataset">
-        <spark xmlns="uri:oozie:spark-action:0.2">
-            <master>yarn-cluster</master>
-            <mode>cluster</mode>
-            <name>SparkGenerateCrossrefDataset</name>
-            <class>eu.dnetlib.doiboost.crossref.GenerateCrossrefDataset</class>
-            <jar>dhp-doiboost-${projectVersion}.jar</jar>
-            <spark-opts>
-                --executor-memory=7G
-                --executor-cores=2
-                --driver-memory=7G
-                --conf spark.sql.shuffle.partitions=3840
-                --conf spark.extraListeners=${spark2ExtraListeners}
-                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
-                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
-                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-            </spark-opts>
-            <arg>--master</arg><arg>yarn-cluster</arg>
-            <arg>--sourcePath</arg><arg>${crossrefDumpPath}/crossref_unpack/</arg>
-            <arg>--targetPath</arg><arg>${inputPathCrossref}/crossref_ds</arg>
-        </spark>
-        <ok to="removeFiles"/>
-        <error to="Kill"/>
-    </action>
-
-    <action name="removeFiles">
-        <fs>
-            <!-- <delete path="${crossrefDumpPath}/files"/>-->
-            <delete path="${crossrefDumpPath}/crossref_unpack/"/>
-        </fs>
-        <ok to="ResetMagWorkingPath"/>
-        <error to="Kill"/>
-    </action>
-
-    <!-- CROSSREF SECTION -->
-
-    <!-- <action name="GenerateCrossrefDataset">-->
-    <!--     <spark xmlns="uri:oozie:spark-action:0.2">-->
-    <!--         <master>yarn-cluster</master>-->
-    <!--         <mode>cluster</mode>-->
-    <!--         <name>GenerateCrossrefDataset</name>-->
-    <!--         <class>eu.dnetlib.doiboost.crossref.CrossrefDataset</class>-->
-    <!--         <jar>dhp-doiboost-${projectVersion}.jar</jar>-->
-    <!--         <spark-opts>-->
-    <!--             --executor-memory=${sparkExecutorMemory}-->
-    <!--             --executor-cores=${sparkExecutorCores}-->
-    <!--             --driver-memory=${sparkDriverMemory}-->
-    <!--             --conf spark.sql.shuffle.partitions=3840-->
-    <!--             --conf spark.extraListeners=${spark2ExtraListeners}-->
-    <!--             --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}-->
-    <!--             --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}-->
-    <!--             --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}-->
-    <!--         </spark-opts>-->
-    <!--         <arg>--workingPath</arg><arg>${inputPathCrossref}</arg>-->
-    <!--         <arg>--master</arg><arg>yarn-cluster</arg>-->
-    <!--     </spark>-->
-    <!--     <ok to="RenameDataset"/>-->
-    <!--     <error to="Kill"/>-->
-    <!-- </action>-->
-
-    <!-- <action name="RenameDataset">-->
-    <!--     <fs>-->
-    <!--         <delete path="${inputPathCrossref}/crossref_ds"/>-->
-    <!--         <move source="${inputPathCrossref}/crossref_ds_updated"-->
-    <!--               target="${inputPathCrossref}/crossref_ds"/>-->
-    <!--     </fs>-->
-    <!--     <ok to="ResetMagWorkingPath"/>-->
-    <!--     <error to="Kill"/>-->
-    <!-- </action>-->
-
-    <!-- MAG SECTION -->
-    <action name="ResetMagWorkingPath">
-        <fs>
-            <delete path="${inputPathMAG}/dataset"/>
-            <delete path="${inputPathMAG}/process"/>
-        </fs>
-        <ok to="ConvertMagToDataset"/>
-        <error to="Kill"/>
-    </action>
-
-    <action name="ConvertMagToDataset">
-        <spark xmlns="uri:oozie:spark-action:0.2">
-            <master>yarn-cluster</master>
-            <mode>cluster</mode>
-            <name>Convert Mag to Dataset</name>
-            <class>eu.dnetlib.doiboost.mag.SparkImportMagIntoDataset</class>
-            <jar>dhp-doiboost-${projectVersion}.jar</jar>
-            <spark-opts>
-                --executor-memory=${sparkExecutorMemory}
-                --executor-cores=${sparkExecutorCores}
-                --driver-memory=${sparkDriverMemory}
-                --conf spark.sql.shuffle.partitions=3840
-                --conf spark.extraListeners=${spark2ExtraListeners}
-                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
-                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
-                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-            </spark-opts>
-            <arg>--sourcePath</arg><arg>${MAGDumpPath}</arg>
-            <arg>--targetPath</arg><arg>${inputPathMAG}/dataset</arg>
-            <arg>--master</arg><arg>yarn-cluster</arg>
-        </spark>
-        <ok to="ConvertCrossrefToOAF"/>
-        <error to="Kill"/>
-    </action>
-
     <action name="ConvertCrossrefToOAF">
         <spark xmlns="uri:oozie:spark-action:0.2">
             <master>yarn-cluster</master>
@@ -326,7 +131,7 @@
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
             </spark-opts>
             <arg>--sourcePath</arg><arg>${inputPathMAG}/dataset</arg>
-            <arg>--workingPath</arg><arg>${inputPathMAG}/process</arg>
+            <arg>--workingPath</arg><arg>${inputPathMAG}/process_p</arg>
             <arg>--targetPath</arg><arg>${workingPath}</arg>
             <arg>--master</arg><arg>yarn-cluster</arg>
         </spark>
@@ -368,7 +173,7 @@
             <master>yarn-cluster</master>
             <mode>cluster</mode>
             <name>Convert ORCID to Dataset</name>
-            <class>eu.dnetlib.doiboost.orcid.SparkPreprocessORCID</class>
+            <class>eu.dnetlib.doiboost.orcid.SparkConvertORCIDToOAF</class>
             <jar>dhp-doiboost-${projectVersion}.jar</jar>
             <spark-opts>
                 --executor-memory=${sparkExecutorMemory}
@@ -380,7 +185,6 @@
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
             </spark-opts>
-            <arg>--sourcePath</arg><arg>${inputPathOrcid}</arg>
             <arg>--workingPath</arg><arg>${workingPathOrcid}</arg>
+            <arg>--targetPath</arg><arg>${workingPath}/orcidPublication</arg>
             <arg>--master</arg><arg>yarn-cluster</arg>
@@ -1,38 +0,0 @@
-<configuration>
-    <property>
-        <name>jobTracker</name>
-        <value>yarnRM</value>
-    </property>
-    <property>
-        <name>nameNode</name>
-        <value>hdfs://nameservice1</value>
-    </property>
-    <property>
-        <name>oozie.use.system.libpath</name>
-        <value>true</value>
-    </property>
-    <property>
-        <name>oozie.action.sharelib.for.spark</name>
-        <value>spark2</value>
-    </property>
-    <property>
-        <name>hive_metastore_uris</name>
-        <value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
-    </property>
-    <property>
-        <name>spark2YarnHistoryServerAddress</name>
-        <value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
-    </property>
-    <property>
-        <name>spark2EventLogDir</name>
-        <value>/user/spark/spark2ApplicationHistory</value>
-    </property>
-    <property>
-        <name>spark2ExtraListeners</name>
-        <value>"com.cloudera.spark.lineage.NavigatorAppListener"</value>
-    </property>
-    <property>
-        <name>spark2SqlQueryExecutionListeners</name>
-        <value>"com.cloudera.spark.lineage.NavigatorQueryListener"</value>
-    </property>
-</configuration>
@@ -1,55 +0,0 @@
-<workflow-app name="import UnpayWall into HDFS" xmlns="uri:oozie:workflow:0.5">
-    <parameters>
-        <property>
-            <name>sourcePath</name>
-            <description>the working dir base path</description>
-        </property>
-        <property>
-            <name>targetPath</name>
-            <description>the working dir base path</description>
-        </property>
-        <property>
-            <name>sparkDriverMemory</name>
-            <description>memory for driver process</description>
-        </property>
-        <property>
-            <name>sparkExecutorMemory</name>
-            <description>memory for individual executor</description>
-        </property>
-        <property>
-            <name>sparkExecutorCores</name>
-            <description>number of cores used by single executor</description>
-        </property>
-    </parameters>
-
-    <start to="PreprocessUW"/>
-
-    <kill name="Kill">
-        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
-    </kill>
-
-    <action name="PreprocessUW">
-        <spark xmlns="uri:oozie:spark-action:0.2">
-            <master>yarn-cluster</master>
-            <mode>cluster</mode>
-            <name>Convert UnpayWall to Dataset</name>
-            <class>eu.dnetlib.doiboost.uw.SparkMapUnpayWallToOAF</class>
-            <jar>dhp-doiboost-${projectVersion}.jar</jar>
-            <spark-opts>
-                --executor-memory=${sparkExecutorMemory}
-                --executor-cores=${sparkExecutorCores}
-                --driver-memory=${sparkDriverMemory}
-                --conf spark.sql.shuffle.partitions=3840
-                ${sparkExtraOPT}
-            </spark-opts>
-            <arg>--sourcePath</arg><arg>${sourcePath}/uw_extracted</arg>
-            <arg>--targetPath</arg><arg>${targetPath}</arg>
-            <arg>--master</arg><arg>yarn-cluster</arg>
-        </spark>
-        <ok to="End"/>
-        <error to="Kill"/>
-    </action>
-
-    <end name="End"/>
-</workflow-app>
@@ -1,22 +1,15 @@
 package eu.dnetlib.doiboost.mag
 
-import java.sql.Timestamp
-
-import eu.dnetlib.dhp.schema.oaf.Publication
-import org.apache.htrace.fasterxml.jackson.databind.SerializationFeature
-import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.api.java.function.MapFunction
-import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
-import org.codehaus.jackson.map.{ObjectMapper, SerializationConfig}
-import org.junit.jupiter.api.Test
-import org.slf4j.{Logger, LoggerFactory}
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.codehaus.jackson.map.ObjectMapper
+import org.junit.jupiter.api.Assertions._
+import org.apache.spark.sql.functions._
+import org.junit.jupiter.api.Test
+import org.json4s.DefaultFormats
+import org.slf4j.{Logger, LoggerFactory}
 
 import scala.collection.JavaConverters._
+import java.sql.Timestamp
 import scala.io.Source
 import scala.reflect.ClassTag
 import scala.util.matching.Regex
@@ -65,14 +58,19 @@ class MAGMappingTest {
   @Test
   def normalizeDoiTest(): Unit = {
 
     import org.json4s.jackson.Serialization.write
     import org.json4s.DefaultFormats
 
     implicit val formats = DefaultFormats
 
-    val conf = new SparkConf().setAppName("test").setMaster("local[2]")
-    val sc = new SparkContext(conf)
-    val spark = SparkSession.builder.config(sc.getConf).getOrCreate()
+    val conf = new SparkConf()
+    conf.setMaster("local[*]")
+    conf.set("spark.driver.host", "localhost")
+    val spark: SparkSession =
+      SparkSession
+        .builder()
+        .appName(getClass.getSimpleName)
+        .config(conf)
+        .getOrCreate()
     val path = getClass.getResource("magPapers.json").getPath
 
     import org.apache.spark.sql.Encoders
@@ -90,14 +88,19 @@ class MAGMappingTest {
   @Test
   def normalizeDoiTest2(): Unit = {
 
     import org.json4s.jackson.Serialization.write
     import org.json4s.DefaultFormats
 
     implicit val formats = DefaultFormats
 
-    val conf = new SparkConf().setAppName("test").setMaster("local[2]")
-    val sc = new SparkContext(conf)
-    val spark = SparkSession.builder.config(sc.getConf).getOrCreate()
+    val conf = new SparkConf()
+    conf.setMaster("local[*]")
+    conf.set("spark.driver.host", "localhost")
+    val spark: SparkSession =
+      SparkSession
+        .builder()
+        .appName(getClass.getSimpleName)
+        .config(conf)
+        .getOrCreate()
     val path = getClass.getResource("duplicatedMagPapers.json").getPath
 
     import org.apache.spark.sql.Encoders
@@ -48,6 +48,8 @@ class MappingORCIDToOAFTest {
 
     SparkPreprocessORCID.run(spark, sourcePath, workingPath)
 
+    SparkConvertORCIDToOAF.run(spark, workingPath, targetPath)
+
     val mapper = new ObjectMapper()
 
@@ -61,6 +63,8 @@ class MappingORCIDToOAFTest {
     assertTrue(oA == p.count())
     println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(p.first()))
 
+    spark.close()
+
   }
 
@@ -78,7 +82,7 @@ class MappingORCIDToOAFTest {
     val oaf = ORCIDToOAF.convertTOOAF(orcid)
     assert(oaf.getPid.size() == 1)
     oaf.getPid.toList.foreach(pid => assert(pid.getQualifier.getClassid.equals("doi")))
-    oaf.getPid.toList.foreach(pid => assert(pid.getValue.equals("10.1042/BCJ20160876".toLowerCase())))
+    oaf.getPid.toList.foreach(pid => assert(pid.getValue.equals("10.1042/BCJ20160876")))
     //println(mapper.writeValueAsString(ORCIDToOAF.convertTOOAF(orcid)))