changed the workflow to extract info from the dump

Miriam Baglioni 2021-06-15 09:22:54 +02:00
parent d6e21bb6ea
commit f7379255b6
3 changed files with 143 additions and 120 deletions

View File

@@ -1,54 +0,0 @@
package eu.dnetlib.doiboost.crossref

import eu.dnetlib.dhp.application.ArgumentApplicationParser
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
import org.json4s
import org.json4s.DefaultFormats
import org.json4s.JsonAST._
import org.json4s.jackson.JsonMethods._

import scala.io.Source

object UnpackCrossrefDumpEntries {

  val log: Logger = LoggerFactory.getLogger(UnpackCrossrefDumpEntries.getClass)

  def extractDump(input: String): List[String] = {
    implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
    lazy val json: json4s.JValue = parse(input)

    val a = (json \ "items").extract[JArray]
    a.arr.map(s => compact(render(s)))
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf
    val parser = new ArgumentApplicationParser(Source.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/crossref_dump_reader/generate_dataset_params.json")).mkString)
    parser.parseArgument(args)
    val master = parser.get("master")
    val sourcePath = parser.get("sourcePath")
    val targetPath = parser.get("targetPath")

    val spark: SparkSession = SparkSession.builder().config(conf)
      .appName(UnpackCrossrefDumpEntries.getClass.getSimpleName)
      .master(master)
      .getOrCreate()

    val sc: SparkContext = spark.sparkContext

    sc.wholeTextFiles(sourcePath, 6000).flatMap(d => extractDump(d._2))
      .saveAsTextFile(targetPath, classOf[GzipCodec]);
  }
}
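
For reference, the heart of the class removed above is the json4s step that splits each dump file's "items" array into one JSON string per record. A minimal standalone sketch of just that step, with a hypothetical two-record sample used only to exercise the function:

import org.json4s._
import org.json4s.jackson.JsonMethods._

object ExtractItemsSketch {

  // Each Crossref dump file is a JSON object whose "items" array holds the records;
  // keep every element as its own compact JSON string, as extractDump did.
  def extractItems(input: String): List[String] = {
    implicit val formats: DefaultFormats.type = org.json4s.DefaultFormats
    (parse(input) \ "items").extract[JArray].arr.map(s => compact(render(s)))
  }

  def main(args: Array[String]): Unit = {
    // hypothetical sample: only the "items" shape matters here
    val sample = """{"items":[{"DOI":"10.1000/a"},{"DOI":"10.1000/b"}]}"""
    extractItems(sample).foreach(println)
  }
}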

View File

@@ -1,9 +1,13 @@
<workflow-app name="read Crossref dump from HDFS" xmlns="uri:oozie:workflow:0.5">
<parameters>
<!-- <property>-->
<!-- <name>workingPath</name>-->
<!-- <description>the working dir base path</description>-->
<!-- </property>-->
<property>
<name>crossrefDumpPath</name>
<description>the working dir base path</description>
</property>
<property>
<name>inputPathCrossref</name>
<description>the working dir base path</description>
</property>
<property>
<name>sparkDriverMemory</name>
<description>memory for driver process</description>
@@ -14,25 +18,27 @@
</property>
<property>
<name>sparkExecutorCores</name>
<value>2</value>
<description>number of cores used by single executor</description>
</property>
</parameters>
<start to="generateCrossrefDataset"/>
<start to="ImportCrossRef"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="ReadCrossRefDump">
<action name="ImportCrossRef">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.doiboost.crossref.ExtractCrossrefRecords</main-class>
<arg>--hdfsServerUri</arg><arg>${nameNode}</arg>
<arg>--workingPath</arg><arg>/data/doiboost/crossref/</arg>
<arg>--crossrefFileNameTarGz</arg><arg>crossref.tar.gz</arg>
<arg>--crossrefFileNameTarGz</arg><arg>${crossrefDumpPath}/crossref.tar.gz</arg>
<arg>--workingPath</arg><arg>${crossrefDumpPath}</arg>
<arg>--outputPath</arg><arg>${workingDir}/files/</arg>
</java>
<ok to="generateCrossrefDataset"/>
<error to="Kill"/>
@@ -42,24 +48,42 @@
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>SparkCreateCrossredDataset</name>
<class>eu.dnetlib.doiboost.crossref.GenerateCrossrefDatasetSpark</class>
<name>SparkGenerateCrossrefDataset</name>
<class>eu.dnetlib.doiboost.crossref.GenerateCrossrefDataset</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=20
--executor-memory=6G
--driver-memory=7G
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
</spark-opts>
<arg>--master</arg><arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>/data/doiboost/crossref/filess</arg>
<arg>--targetPath</arg><arg>/tmp/miriam/crossref/crossrefDataset</arg>
<arg>--sourcePath</arg><arg>${workingDir}/files</arg>
<arg>--targetPath</arg><arg>${inputPathCrossref}/crossref_ds_updated</arg>
</spark>
<ok to="removeFiles"/>
<error to="Kill"/>
</action>
<action name="removeFiles">
<fs>
<delete path="${workingDir}/files"/>
</fs>
<ok to="renameDataset"/>
<error to="Kill"/>
</action>
<action name="renameDataset">
<fs>
<delete path="${inputPathCrossref}/crossref_ds"/>
<move source="${inputPathCrossref}/crossref_ds_updated"
target="${inputPathCrossref}/crossref_ds"/>
</fs>
<ok to="End"/>
<error to="Kill"/>
</action>
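
Taken together, the two fs actions above amount to a cleanup of the unpacked dump followed by swapping the freshly generated dataset into place. A minimal sketch of the equivalent calls through the Hadoop FileSystem API, with hypothetical concrete values standing in for ${workingDir} and ${inputPathCrossref}:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object SwapCrossrefDatasetSketch {

  def main(args: Array[String]): Unit = {
    // hypothetical stand-ins for ${workingDir} and ${inputPathCrossref}
    val workingDir = "/tmp/doiboost/wf"
    val inputPathCrossref = "/data/doiboost/input/crossref"

    val fs = FileSystem.get(new Configuration())

    // removeFiles: drop the unpacked dump once crossref_ds_updated has been written
    fs.delete(new Path(s"$workingDir/files"), true)

    // renameDataset: replace the previous dataset with the updated one
    fs.delete(new Path(s"$inputPathCrossref/crossref_ds"), true)
    fs.rename(
      new Path(s"$inputPathCrossref/crossref_ds_updated"),
      new Path(s"$inputPathCrossref/crossref_ds"))
  }
}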

View File

@@ -41,17 +41,21 @@
<description>the Crossref input path</description>
</property>
<property>
<name>crossrefTimestamp</name>
<description>Timestamp for the Crossref incremental Harvesting</description>
</property>
<property>
<name>esServer</name>
<description>elasticsearch server url for the Crossref Harvesting</description>
</property>
<property>
<name>esIndex</name>
<description>elasticsearch index name for the Crossref Harvesting</description>
<name>crossrefDumpPath</name>
<description>the Crossref dump path</description>
</property>
<!-- <property>-->
<!-- <name>crossrefTimestamp</name>-->
<!-- <description>Timestamp for the Crossref incremental Harvesting</description>-->
<!-- </property>-->
<!-- <property>-->
<!-- <name>esServer</name>-->
<!-- <description>elasticsearch server url for the Crossref Harvesting</description>-->
<!-- </property>-->
<!-- <property>-->
<!-- <name>esIndex</name>-->
<!-- <description>elasticsearch index name for the Crossref Harvesting</description>-->
<!-- </property>-->
<!-- MAG Parameters -->
<property>
@@ -114,55 +118,104 @@
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<!-- <action name="ImportCrossRef">-->
<!-- <java>-->
<!-- <main-class>eu.dnetlib.doiboost.crossref.CrossrefImporter</main-class>-->
<!-- <arg>&#45;&#45;targetPath</arg><arg>${inputPathCrossref}/index_update</arg>-->
<!-- <arg>&#45;&#45;namenode</arg><arg>${nameNode}</arg>-->
<!-- <arg>&#45;&#45;esServer</arg><arg>${esServer}</arg>-->
<!-- <arg>&#45;&#45;esIndex</arg><arg>${esIndex}</arg>-->
<!-- <arg>&#45;&#45;timestamp</arg><arg>${crossrefTimestamp}</arg>-->
<!-- </java>-->
<!-- <ok to="GenerateCrossrefDataset"/>-->
<!-- <error to="Kill"/>-->
<!-- </action>-->
<action name="ImportCrossRef">
<java>
<main-class>eu.dnetlib.doiboost.crossref.CrossrefImporter</main-class>
<arg>--targetPath</arg><arg>${inputPathCrossref}/index_update</arg>
<arg>--namenode</arg><arg>${nameNode}</arg>
<arg>--esServer</arg><arg>${esServer}</arg>
<arg>--esIndex</arg><arg>${esIndex}</arg>
<arg>--timestamp</arg><arg>${crossrefTimestamp}</arg>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<main-class>eu.dnetlib.doiboost.crossref.ExtractCrossrefRecords</main-class>
<arg>--hdfsServerUri</arg><arg>${nameNode}</arg>
<arg>--crossrefFileNameTarGz</arg><arg>${crossrefDumpPath}/crossref.tar.gz</arg>
<arg>--workingPath</arg><arg>${crossrefDumpPath}</arg>
<arg>--outputPath</arg><arg>${workingDir}/files</arg>
</java>
<ok to="GenerateCrossrefDataset"/>
<ok to="generateCrossrefDataset"/>
<error to="Kill"/>
</action>
<action name="generateCrossrefDataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>SparkGenerateCrossrefDataset</name>
<class>eu.dnetlib.doiboost.crossref.GenerateCrossrefDataset</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--master</arg><arg>yarn-cluster</arg>
<arg>--sourcePath</arg><arg>${workingDir}/files/</arg>
<arg>--targetPath</arg><arg>${inputPathCrossref}/crossref_ds_updated</arg>
</spark>
<ok to="removeFiles"/>
<error to="Kill"/>
</action>
<action name="removeFiles">
<fs>
<delete path="${workingDir}/files"/>
</fs>
<ok to="ResetMagWorkingPath"/>
<error to="Kill"/>
</action>
<!-- CROSSREF SECTION -->
<action name="GenerateCrossrefDataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn-cluster</master>
<mode>cluster</mode>
<name>GenerateCrossrefDataset</name>
<class>eu.dnetlib.doiboost.crossref.CrossrefDataset</class>
<jar>dhp-doiboost-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemory}
--executor-cores=${sparkExecutorCores}
--driver-memory=${sparkDriverMemory}
--conf spark.sql.shuffle.partitions=3840
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts>
<arg>--workingPath</arg><arg>${inputPathCrossref}</arg>
<arg>--master</arg><arg>yarn-cluster</arg>
</spark>
<ok to="RenameDataset"/>
<error to="Kill"/>
</action>
<!-- <action name="GenerateCrossrefDataset">-->
<!-- <spark xmlns="uri:oozie:spark-action:0.2">-->
<!-- <master>yarn-cluster</master>-->
<!-- <mode>cluster</mode>-->
<!-- <name>GenerateCrossrefDataset</name>-->
<!-- <class>eu.dnetlib.doiboost.crossref.CrossrefDataset</class>-->
<!-- <jar>dhp-doiboost-${projectVersion}.jar</jar>-->
<!-- <spark-opts>-->
<!-- &#45;&#45;executor-memory=${sparkExecutorMemory}-->
<!-- &#45;&#45;executor-cores=${sparkExecutorCores}-->
<!-- &#45;&#45;driver-memory=${sparkDriverMemory}-->
<!-- &#45;&#45;conf spark.sql.shuffle.partitions=3840-->
<!-- &#45;&#45;conf spark.extraListeners=${spark2ExtraListeners}-->
<!-- &#45;&#45;conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}-->
<!-- &#45;&#45;conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}-->
<!-- &#45;&#45;conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}-->
<!-- </spark-opts>-->
<!-- <arg>&#45;&#45;workingPath</arg><arg>${inputPathCrossref}</arg>-->
<!-- <arg>&#45;&#45;master</arg><arg>yarn-cluster</arg>-->
<!-- </spark>-->
<!-- <ok to="RenameDataset"/>-->
<!-- <error to="Kill"/>-->
<!-- </action>-->
<action name="RenameDataset">
<fs>
<delete path="${inputPathCrossref}/crossref_ds"/>
<move source="${inputPathCrossref}/crossref_ds_updated"
target="${inputPathCrossref}/crossref_ds"/>
</fs>
<ok to="ResetMagWorkingPath"/>
<error to="Kill"/>
</action>
<!-- <action name="RenameDataset">-->
<!-- <fs>-->
<!-- <delete path="${inputPathCrossref}/crossref_ds"/>-->
<!-- <move source="${inputPathCrossref}/crossref_ds_updated"-->
<!-- target="${inputPathCrossref}/crossref_ds"/>-->
<!-- </fs>-->
<!-- <ok to="ResetMagWorkingPath"/>-->
<!-- <error to="Kill"/>-->
<!-- </action>-->
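
In this workflow the reworked ImportCrossRef action delegates the unpacking of ${crossrefDumpPath}/crossref.tar.gz into ${workingDir}/files to eu.dnetlib.doiboost.crossref.ExtractCrossrefRecords, whose source is not part of this diff. As a rough illustration only (not the actual class), that kind of step can be sketched with Commons Compress and the Hadoop FileSystem API, using hypothetical paths:

import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.IOUtils

object UnpackDumpSketch {

  def main(args: Array[String]): Unit = {
    // hypothetical stand-ins for ${crossrefDumpPath}/crossref.tar.gz and ${workingDir}/files
    val dumpTarGz = "/data/doiboost/crossref/crossref.tar.gz"
    val outputPath = "/tmp/doiboost/wf/files"

    val fs = FileSystem.get(new Configuration())
    val tar = new TarArchiveInputStream(new GzipCompressorInputStream(fs.open(new Path(dumpTarGz))))

    // copy every regular entry of the archive to its own file under outputPath
    var entry = tar.getNextTarEntry
    while (entry != null) {
      if (!entry.isDirectory) {
        val out = fs.create(new Path(s"$outputPath/${entry.getName}"))
        IOUtils.copyBytes(tar, out, 4096, false) // keep the tar stream open between entries
        out.close()
      }
      entry = tar.getNextTarEntry
    }
    tar.close()
  }
}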