updated wf of MAG and crossref to use transaction

This commit is contained in:
Sandro La Bruzzo 2024-04-11 17:27:49 +02:00
parent a0642bd190
commit 2581672c11
9 changed files with 205 additions and 41 deletions

View File

@ -1,5 +1,8 @@
package eu.dnetlib.dhp.application package eu.dnetlib.dhp.application
import eu.dnetlib.dhp.common.Constants
import eu.dnetlib.dhp.utils.DHPUtils.writeHdfsFile
import scala.io.Source import scala.io.Source
/** This is the main Interface SparkApplication /** This is the main Interface SparkApplication
@ -70,4 +73,13 @@ abstract class AbstractScalaApplication(
.getOrCreate() .getOrCreate()
} }
def reportTotalSize(targetPath: String, outputBasePath: String): Unit = {
val total_items = spark.read.text(targetPath).count()
writeHdfsFile(
spark.sparkContext.hadoopConfiguration,
s"$total_items",
outputBasePath + Constants.MDSTORE_SIZE_PATH
)
}
} }

View File

@ -18,9 +18,9 @@
"paramRequired": true "paramRequired": true
}, },
{ {
"paramName": "t", "paramName": "mov",
"paramLongName": "targetPath", "paramLongName": "mdstoreOutputVersion",
"paramDescription": "The target path", "paramDescription": "The mdstore Output Version",
"paramRequired": false "paramRequired": false
}, },
{ {

View File

@ -4,10 +4,6 @@
<name>sourcePath</name> <name>sourcePath</name>
<description>The base path of Crossref DUMP </description> <description>The base path of Crossref DUMP </description>
</property> </property>
<property>
<name>targetPath</name>
<description>The targetPath</description>
</property>
<property> <property>
<name>unpaywallPath</name> <name>unpaywallPath</name>
<description>The base path of unpaywall DUMP </description> <description>The base path of unpaywall DUMP </description>
@ -16,16 +12,39 @@
<name>isLookupUrl</name> <name>isLookupUrl</name>
<description>The Information service Lookup URL</description> <description>The Information service Lookup URL</description>
</property> </property>
<property>
<name>mdStoreOutputId</name>
<description>the identifier of the cleaned MDStore</description>
</property>
<property>
<name>mdStoreManagerURI</name>
<description>the path of the cleaned mdstore</description>
</property>
</parameters> </parameters>
<start to="generateOAF"/> <start to="StartTransaction"/>
<kill name="Kill"> <kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message> <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill> </kill>
<action name="StartTransaction">
<java>
<configuration>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>
<main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
<arg>--action</arg><arg>NEW_VERSION</arg>
<arg>--mdStoreID</arg><arg>${mdStoreOutputId}</arg>
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
<capture-output/>
</java>
<ok to="generateOAF"/>
<error to="EndReadRollBack"/>
</action>
<action name="generateOAF"> <action name="generateOAF">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
@ -47,13 +66,66 @@
</spark-opts> </spark-opts>
<arg>--sourcePath</arg><arg>${sourcePath}</arg> <arg>--sourcePath</arg><arg>${sourcePath}</arg>
<arg>--unpaywallPath</arg><arg>${unpaywallPath}</arg> <arg>--unpaywallPath</arg><arg>${unpaywallPath}</arg>
<arg>--targetPath</arg><arg>${targetPath}</arg> <arg>--mdstoreOutputVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg> <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--master</arg><arg>yarn</arg> <arg>--master</arg><arg>yarn</arg>
</spark> </spark>
<ok to="CommitVersion"/>
<error to="RollBack"/>
</action>
<action name="CommitVersion">
<java>
<configuration>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>
<main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
<arg>--action</arg><arg>COMMIT</arg>
<arg>--namenode</arg><arg>${nameNode}</arg>
<arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
</java>
<ok to="End"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="EndReadRollBack">
<java>
<configuration>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>
<main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
<arg>--action</arg><arg>READ_UNLOCK</arg>
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
<arg>--readMDStoreId</arg><arg>${wf:actionData('BeginRead')['mdStoreReadLockVersion']}</arg>
<capture-output/>
</java>
<ok to="RollBack"/>
<error to="Kill"/>
</action>
<action name="RollBack">
<java>
<configuration>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>
<main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
<arg>--action</arg><arg>ROLLBACK</arg>
<arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
</java>
<ok to="Kill"/>
<error to="Kill"/>
</action>
<end name="End"/> <end name="End"/>
</workflow-app> </workflow-app>

View File

@ -6,9 +6,9 @@
"paramRequired": true "paramRequired": true
}, },
{ {
"paramName": "md", "paramName": "mo",
"paramLongName": "mdstorePath", "paramLongName": "mdstoreOutputVersion",
"paramDescription": "The base path of MAG DUMP CSV Tables", "paramDescription": "The mdstore output",
"paramRequired": true "paramRequired": true
}, },
{ {

View File

@ -5,8 +5,12 @@
<description>The base path of MAG DUMP CSV Tables</description> <description>The base path of MAG DUMP CSV Tables</description>
</property> </property>
<property> <property>
<name>mdstorePath</name> <name>mdStoreOutputId</name>
<description>The base path of MAG DUMP CSV Tables</description> <description>the identifier of the cleaned MDStore</description>
</property>
<property>
<name>mdStoreManagerURI</name>
<description>the path of the cleaned mdstore</description>
</property> </property>
<property> <property>
<name>resume_from</name> <name>resume_from</name>
@ -25,7 +29,7 @@
<decision name="resume_from"> <decision name="resume_from">
<switch> <switch>
<case to="generateTable">${wf:conf('resume_from') eq 'generateTable'}</case> <case to="generateTable">${wf:conf('resume_from') eq 'generateTable'}</case>
<default to="generateOAF"/> <!-- first action to be done when downloadDump is to be performed --> <default to="StartTransaction"/> <!-- first action to be done when downloadDump is to be performed -->
</switch> </switch>
</decision> </decision>
@ -51,9 +55,26 @@
<arg>--magBasePath</arg><arg>${magBasePath}</arg> <arg>--magBasePath</arg><arg>${magBasePath}</arg>
<arg>--master</arg><arg>yarn</arg> <arg>--master</arg><arg>yarn</arg>
</spark> </spark>
<ok to="generateOAF"/> <ok to="StartTransaction"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="StartTransaction">
<java>
<configuration>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>
<main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
<arg>--action</arg><arg>NEW_VERSION</arg>
<arg>--mdStoreID</arg><arg>${mdStoreOutputId}</arg>
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
<capture-output/>
</java>
<ok to="generateOAF"/>
<error to="EndReadRollBack"/>
</action>
<action name="generateOAF"> <action name="generateOAF">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
@ -73,13 +94,67 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
</spark-opts> </spark-opts>
<arg>--mdstorePath</arg><arg>${mdstorePath}</arg> <arg>--mdstoreOutputVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
<arg>--magBasePath</arg><arg>${magBasePath}</arg> <arg>--magBasePath</arg><arg>${magBasePath}</arg>
<arg>--master</arg><arg>yarn</arg> <arg>--master</arg><arg>yarn</arg>
</spark> </spark>
<ok to="CommitVersion"/>
<error to="RollBack"/>
</action>
<action name="CommitVersion">
<java>
<configuration>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>
<main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
<arg>--action</arg><arg>COMMIT</arg>
<arg>--namenode</arg><arg>${nameNode}</arg>
<arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
</java>
<ok to="End"/> <ok to="End"/>
<error to="Kill"/> <error to="Kill"/>
</action> </action>
<action name="EndReadRollBack">
<java>
<configuration>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>
<main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
<arg>--action</arg><arg>READ_UNLOCK</arg>
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
<arg>--readMDStoreId</arg><arg>${wf:actionData('BeginRead')['mdStoreReadLockVersion']}</arg>
<capture-output/>
</java>
<ok to="RollBack"/>
<error to="Kill"/>
</action>
<action name="RollBack">
<java>
<configuration>
<property>
<name>oozie.launcher.mapreduce.user.classpath.first</name>
<value>true</value>
</property>
</configuration>
<main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
<arg>--action</arg><arg>ROLLBACK</arg>
<arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
<arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
</java>
<ok to="Kill"/>
<error to="Kill"/>
</action>
<end name="End"/> <end name="End"/>
</workflow-app> </workflow-app>

View File

@ -3,8 +3,10 @@ package eu.dnetlib.dhp.collection.crossref
import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.AbstractScalaApplication import eu.dnetlib.dhp.application.AbstractScalaApplication
import eu.dnetlib.dhp.collection.crossref.Crossref2Oaf.{TransformationType, mergeUnpayWall} import eu.dnetlib.dhp.collection.crossref.Crossref2Oaf.{TransformationType, mergeUnpayWall}
import eu.dnetlib.dhp.common.Constants.MDSTORE_DATA_PATH
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Relation, Result, Dataset => OafDataset} import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion
import eu.dnetlib.dhp.schema.oaf.{Oaf, Result}
import eu.dnetlib.dhp.utils.ISLookupClientFactory import eu.dnetlib.dhp.utils.ISLookupClientFactory
import org.apache.spark.sql._ import org.apache.spark.sql._
import org.apache.spark.sql.functions.{col, explode, lower} import org.apache.spark.sql.functions.{col, explode, lower}
@ -20,8 +22,6 @@ class SparkMapDumpIntoOAF(propertyPath: String, args: Array[String], log: Logger
override def run(): Unit = { override def run(): Unit = {
val sourcePath = parser.get("sourcePath") val sourcePath = parser.get("sourcePath")
log.info("sourcePath: {}", sourcePath) log.info("sourcePath: {}", sourcePath)
val targetPath = parser.get("targetPath")
log.info("targetPath: {}", targetPath)
val unpaywallPath = parser.get("unpaywallPath") val unpaywallPath = parser.get("unpaywallPath")
log.info("unpaywallPath: {}", unpaywallPath) log.info("unpaywallPath: {}", unpaywallPath)
val isLookupUrl: String = parser.get("isLookupUrl") val isLookupUrl: String = parser.get("isLookupUrl")
@ -29,8 +29,17 @@ class SparkMapDumpIntoOAF(propertyPath: String, args: Array[String], log: Logger
val isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl) val isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl)
val vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService) val vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService)
require(vocabularies != null) require(vocabularies != null)
transformCrossref(spark, sourcePath, targetPath, unpaywallPath, vocabularies) val mdstoreOutputVersion = parser.get("mdstoreOutputVersion")
log.info(s"mdstoreOutputVersion is '$mdstoreOutputVersion'")
val mapper = new ObjectMapper()
val cleanedMdStoreVersion = mapper.readValue(mdstoreOutputVersion, classOf[MDStoreVersion])
val outputBasePath = cleanedMdStoreVersion.getHdfsPath
log.info(s"outputBasePath is '$outputBasePath'")
val targetPath = s"$outputBasePath$MDSTORE_DATA_PATH"
log.info(s"targetPath is '$targetPath'")
transformCrossref(spark, sourcePath, targetPath, unpaywallPath, vocabularies)
reportTotalSize(targetPath, outputBasePath)
} }
def transformUnpayWall(spark: SparkSession, unpaywallPath: String, crossrefPath: String): Dataset[UnpayWall] = { def transformUnpayWall(spark: SparkSession, unpaywallPath: String, crossrefPath: String): Dataset[UnpayWall] = {

View File

@ -490,7 +490,7 @@ object MagUtility extends Serializable {
result.setDataInfo(MAGDataInfo) result.setDataInfo(MAGDataInfo)
val i = new Instance val i = new Instance
i.setInstancetype(tp) i.setInstancetype(tp)
i.setInstanceTypeMapping(List(instanceTypeMapping(currentType)).asJava) i.setInstanceTypeMapping(List(instanceTypeMapping(currentType,ModelConstants.OPENAIRE_COAR_RESOURCE_TYPES_3_1)).asJava)
result.setInstance(List(i).asJava) result.setInstance(List(i).asJava)
} }
result result

View File

@ -1,7 +1,10 @@
package eu.dnetlib.dhp.collection.mag package eu.dnetlib.dhp.collection.mag
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.AbstractScalaApplication import eu.dnetlib.dhp.application.AbstractScalaApplication
import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Result} import eu.dnetlib.dhp.common.Constants.MDSTORE_DATA_PATH
import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion
import eu.dnetlib.dhp.schema.oaf.Relation
import org.apache.spark.sql.functions.col import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType} import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession} import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
@ -14,12 +17,19 @@ class SparkMAGtoOAF(propertyPath: String, args: Array[String], log: Logger)
* where the whole logic of the spark node is defined * where the whole logic of the spark node is defined
*/ */
override def run(): Unit = { override def run(): Unit = {
val mdstorePath: String = parser.get("mdstorePath") val mdstoreOutputVersion = parser.get("mdstoreOutputVersion")
log.info("found parameters mdstorePath: {}", mdstorePath) log.info(s"mdstoreOutputVersion is '$mdstoreOutputVersion'")
val mapper = new ObjectMapper()
val cleanedMdStoreVersion = mapper.readValue(mdstoreOutputVersion, classOf[MDStoreVersion])
val outputBasePath = cleanedMdStoreVersion.getHdfsPath
log.info(s"outputBasePath is '$outputBasePath'")
val mdstorePath = s"$outputBasePath$MDSTORE_DATA_PATH"
val magBasePath: String = parser.get("magBasePath") val magBasePath: String = parser.get("magBasePath")
log.info("found parameters magBasePath: {}", magBasePath) log.info("found parameters magBasePath: {}", magBasePath)
convertMAG(spark, magBasePath, mdstorePath) convertMAG(spark, magBasePath, mdstorePath)
generateAffiliations(spark, magBasePath, mdstorePath) generateAffiliations(spark, magBasePath, mdstorePath)
reportTotalSize(mdstorePath, outputBasePath)
} }
def convertMAG(spark: SparkSession, magBasePath: String, mdStorePath: String): Unit = { def convertMAG(spark: SparkSession, magBasePath: String, mdStorePath: String): Unit = {

View File

@ -46,20 +46,6 @@ class GenerateDataciteDatasetSpark(propertyPath: String, args: Array[String], lo
reportTotalSize(targetPath, outputBasePath) reportTotalSize(targetPath, outputBasePath)
} }
/** For working with MDStore we need to store in a file on hdfs the size of
* the current dataset
* @param targetPath
* @param outputBasePath
*/
def reportTotalSize(targetPath: String, outputBasePath: String): Unit = {
val total_items = spark.read.text(targetPath).count()
writeHdfsFile(
spark.sparkContext.hadoopConfiguration,
s"$total_items",
outputBasePath + MDSTORE_SIZE_PATH
)
}
/** Generate the transformed and cleaned OAF Dataset from the native one /** Generate the transformed and cleaned OAF Dataset from the native one
* *
* @param sourcePath sourcePath of the native Dataset in format JSON/Datacite * @param sourcePath sourcePath of the native Dataset in format JSON/Datacite