From 2581672c11b605590297b5016fdfbafd0beaed7d Mon Sep 17 00:00:00 2001
From: Sandro La Bruzzo
Date: Thu, 11 Apr 2024 17:27:49 +0200
Subject: [PATCH] updated the MAG and Crossref workflows to use MDStore
 transactions

---
 .../application/SparkScalaApplication.scala   | 12 +++
 .../convert_crossref_dump_to_oaf_params.json  |  6 +-
 .../crossref/oozie_app/workflow.xml           | 88 +++++++++++++++++--
 .../mag/convert_MAG_to_OAF_properties.json    |  6 +-
 .../dhp/collection/mag/oozie_app/workflow.xml | 85 ++++++++++++++++--
 .../crossref/SparkMapDumpIntoOAF.scala        | 17 +++-
 .../dhp/collection/mag/MagUtility.scala       |  2 +-
 .../dhp/collection/mag/SparkMAGtoOAF.scala    | 16 +++-
 .../GenerateDataciteDatasetSpark.scala        | 14 ---
 9 files changed, 205 insertions(+), 41 deletions(-)

diff --git a/dhp-common/src/main/scala/eu/dnetlib/dhp/application/SparkScalaApplication.scala b/dhp-common/src/main/scala/eu/dnetlib/dhp/application/SparkScalaApplication.scala
index f8afe9af4..a14c25837 100644
--- a/dhp-common/src/main/scala/eu/dnetlib/dhp/application/SparkScalaApplication.scala
+++ b/dhp-common/src/main/scala/eu/dnetlib/dhp/application/SparkScalaApplication.scala
@@ -1,5 +1,8 @@
 package eu.dnetlib.dhp.application
 
+import eu.dnetlib.dhp.common.Constants
+import eu.dnetlib.dhp.utils.DHPUtils.writeHdfsFile
+
 import scala.io.Source
 
 /** This is the main Interface SparkApplication
@@ -70,4 +73,13 @@ abstract class AbstractScalaApplication(
       .getOrCreate()
   }
 
+  def reportTotalSize(targetPath: String, outputBasePath: String): Unit = {
+    val total_items = spark.read.text(targetPath).count()
+    writeHdfsFile(
+      spark.sparkContext.hadoopConfiguration,
+      s"$total_items",
+      outputBasePath + Constants.MDSTORE_SIZE_PATH
+    )
+  }
+
 }
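
Note: with the helper promoted to AbstractScalaApplication, any MDStore-writing Spark node can report its size the same way. A minimal sketch of a subclass using it; the class name and paths are illustrative, and "/store" is an assumed value of Constants.MDSTORE_DATA_PATH:

    import org.slf4j.Logger

    // Illustrative subclass, not part of the patch.
    class MyDumpToOafJob(propertyPath: String, args: Array[String], log: Logger)
        extends AbstractScalaApplication(propertyPath, args, log) {

      override def run(): Unit = {
        // Normally derived from the mdstoreOutputVersion JSON argument
        // (see the Spark nodes below); hardcoded to keep the sketch short.
        val outputBasePath = "/data/mdstore/md-1/v-1"
        val targetPath     = outputBasePath + "/store" // assumed MDSTORE_DATA_PATH

        // ... transform and write the OAF records to targetPath ...

        // Write the record count next to the data, so the workflow's
        // COMMIT step can register the final size of the new version.
        reportTotalSize(targetPath, outputBasePath)
      }
    }
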
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/crossref/convert_crossref_dump_to_oaf_params.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/crossref/convert_crossref_dump_to_oaf_params.json
index cc7333e48..9aaf968ea 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/crossref/convert_crossref_dump_to_oaf_params.json
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/crossref/convert_crossref_dump_to_oaf_params.json
@@ -18,9 +18,9 @@
     "paramRequired": true
   },
   {
-    "paramName": "t",
-    "paramLongName": "targetPath",
-    "paramDescription": "The target path",
+    "paramName": "mov",
+    "paramLongName": "mdstoreOutputVersion",
+    "paramDescription": "The mdstore Output Version",
     "paramRequired": false
   },
   {
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/crossref/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/crossref/oozie_app/workflow.xml
index c9c09d20f..bcfb40b35 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/crossref/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/crossref/oozie_app/workflow.xml
@@ -4,10 +4,6 @@
         <property>
             <name>sourcePath</name>
             <description>The base path of Crossref DUMP</description>
         </property>
-        <property>
-            <name>targetPath</name>
-            <description>The targetPath</description>
-        </property>
         <property>
             <name>unpaywallPath</name>
             <description>The base path of unpaywall DUMP</description>
@@ -16,16 +12,39 @@
         <property>
             <name>isLookupUrl</name>
             <description>The Information service Lookup URL</description>
         </property>
+        <property>
+            <name>mdStoreOutputId</name>
+            <description>the identifier of the cleaned MDStore</description>
+        </property>
+        <property>
+            <name>mdStoreManagerURI</name>
+            <description>the path of the cleaned mdstore</description>
+        </property>
     </parameters>
 
-    <start to="generateOAF"/>
+    <start to="StartTransaction"/>
 
     <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
     </kill>
 
+    <action name="StartTransaction">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg>
+            <arg>NEW_VERSION</arg>
+            <arg>--mdStoreID</arg>
+            <arg>${mdStoreOutputId}</arg>
+            <arg>--mdStoreManagerURI</arg>
+            <arg>${mdStoreManagerURI}</arg>
+            <capture-output/>
+        </java>
+        <ok to="generateOAF"/>
+        <error to="RollBack"/>
+    </action>
+
@@ -47,13 +66,66 @@
             <arg>--sourcePath</arg>
             <arg>${sourcePath}</arg>
             <arg>--unpaywallPath</arg>
             <arg>${unpaywallPath}</arg>
-            <arg>--targetPath</arg>
-            <arg>${targetPath}</arg>
+            <arg>--mdstoreOutputVersion</arg>
+            <arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
             <arg>--isLookupUrl</arg>
             <arg>${isLookupUrl}</arg>
             <arg>--master</arg>
             <arg>yarn</arg>
         </spark>
-        <ok to="End"/>
+        <ok to="CommitVersion"/>
         <error to="Kill"/>
     </action>
 
+    <action name="CommitVersion">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg>
+            <arg>COMMIT</arg>
+            <arg>--namenode</arg>
+            <arg>${nameNode}</arg>
+            <arg>--mdStoreVersion</arg>
+            <arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
+            <arg>--mdStoreManagerURI</arg>
+            <arg>${mdStoreManagerURI}</arg>
+        </java>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="EndReadRollBack">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg>
+            <arg>READ_UNLOCK</arg>
+            <arg>--mdStoreManagerURI</arg>
+            <arg>${mdStoreManagerURI}</arg>
+            <arg>--readMDStoreId</arg>
+            <arg>${wf:actionData('BeginRead')['mdStoreReadLockVersion']}</arg>
+            <capture-output/>
+        </java>
+        <ok to="RollBack"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="RollBack">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg>
+            <arg>ROLLBACK</arg>
+            <arg>--mdStoreVersion</arg>
+            <arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
+            <arg>--mdStoreManagerURI</arg>
+            <arg>${mdStoreManagerURI}</arg>
+        </java>
+        <ok to="Kill"/>
+        <error to="Kill"/>
+    </action>
+
     <end name="End"/>
 </workflow-app>
\ No newline at end of file
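
Note: the handoff between StartTransaction and the Spark action relies on Oozie's standard capture-output mechanism: a <java> action with <capture-output/> writes a Properties file to the path Oozie passes in the oozie.action.output.properties system property, and those entries become readable via wf:actionData(). A minimal sketch of that contract, assuming MDStoreActionNode does the equivalent after calling the MDStore manager; the JSON value is illustrative:

    import java.io.FileOutputStream
    import java.util.Properties

    // Sketch of how a <java> action exposes 'mdStoreVersion' to the workflow.
    object CaptureOutputSketch {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        // Illustrative value; the real payload is the MDStoreVersion bean as JSON.
        props.setProperty(
          "mdStoreVersion",
          """{"id":"md-1-v-1","hdfsPath":"/data/mdstore/md-1/v-1"}"""
        )
        // Oozie provides this path and reads the file back after the action ends.
        val out = new FileOutputStream(System.getProperty("oozie.action.output.properties"))
        try props.store(out, "") finally out.close()
      }
    }
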
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/convert_MAG_to_OAF_properties.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/convert_MAG_to_OAF_properties.json
index 48926f3e9..d283845cf 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/convert_MAG_to_OAF_properties.json
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/convert_MAG_to_OAF_properties.json
@@ -6,9 +6,9 @@
     "paramRequired": true
   },
   {
-    "paramName": "md",
-    "paramLongName": "mdstorePath",
-    "paramDescription": "The base path of MAG DUMP CSV Tables",
+    "paramName": "mo",
+    "paramLongName": "mdstoreOutputVersion",
+    "paramDescription": "The mdstore output",
     "paramRequired": true
   },
   {
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/oozie_app/workflow.xml
index 511eacb5f..f45237218 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/oozie_app/workflow.xml
@@ -5,8 +5,12 @@
             <description>The base path of MAG DUMP CSV Tables</description>
         </property>
         <property>
-            <name>mdstorePath</name>
-            <description>The base path of MAG DUMP CSV Tables</description>
+            <name>mdStoreOutputId</name>
+            <description>the identifier of the cleaned MDStore</description>
+        </property>
+        <property>
+            <name>mdStoreManagerURI</name>
+            <description>the path of the cleaned mdstore</description>
         </property>
         <property>
             <name>resume_from</name>
@@ -25,7 +29,7 @@
     <decision name="resume_from">
         <switch>
             <case to="generateTable">${wf:conf('resume_from') eq 'generateTable'}</case>
-            <default to="convertToOAF"/>
+            <default to="StartTransaction"/>
         </switch>
     </decision>
 
@@ -51,9 +55,26 @@
             <arg>--magBasePath</arg>
             <arg>${magBasePath}</arg>
             <arg>--master</arg>
             <arg>yarn</arg>
         </spark>
-        <ok to="convertToOAF"/>
+        <ok to="StartTransaction"/>
         <error to="Kill"/>
     </action>
 
+    <action name="StartTransaction">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg>
+            <arg>NEW_VERSION</arg>
+            <arg>--mdStoreID</arg>
+            <arg>${mdStoreOutputId}</arg>
+            <arg>--mdStoreManagerURI</arg>
+            <arg>${mdStoreManagerURI}</arg>
+            <capture-output/>
+        </java>
+        <ok to="convertToOAF"/>
+        <error to="RollBack"/>
+    </action>
+
@@ -73,13 +94,67 @@
             --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
             --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
         </spark-opts>
-            <arg>--mdstorePath</arg>
-            <arg>${mdstorePath}</arg>
+            <arg>--mdstoreOutputVersion</arg>
+            <arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
             <arg>--magBasePath</arg>
             <arg>${magBasePath}</arg>
             <arg>--master</arg>
             <arg>yarn</arg>
         </spark>
-        <ok to="End"/>
+        <ok to="CommitVersion"/>
         <error to="Kill"/>
     </action>
 
+    <action name="CommitVersion">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg>
+            <arg>COMMIT</arg>
+            <arg>--namenode</arg>
+            <arg>${nameNode}</arg>
+            <arg>--mdStoreVersion</arg>
+            <arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
+            <arg>--mdStoreManagerURI</arg>
+            <arg>${mdStoreManagerURI}</arg>
+        </java>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="EndReadRollBack">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg>
+            <arg>READ_UNLOCK</arg>
+            <arg>--mdStoreManagerURI</arg>
+            <arg>${mdStoreManagerURI}</arg>
+            <arg>--readMDStoreId</arg>
+            <arg>${wf:actionData('BeginRead')['mdStoreReadLockVersion']}</arg>
+            <capture-output/>
+        </java>
+        <ok to="RollBack"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="RollBack">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg>
+            <arg>ROLLBACK</arg>
+            <arg>--mdStoreVersion</arg>
+            <arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
+            <arg>--mdStoreManagerURI</arg>
+            <arg>${mdStoreManagerURI}</arg>
+        </java>
+        <ok to="Kill"/>
+        <error to="Kill"/>
+    </action>
+
     <end name="End"/>
 </workflow-app>
\ No newline at end of file
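
Note: on COMMIT, the size file written by reportTotalSize is how the record count reaches the MDStore manager. A hypothetical sketch of the commit-side read, assuming "/size" as the value of Constants.MDSTORE_SIZE_PATH; the object and method names are made up, not taken from MDStoreActionNode:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    import scala.io.Source

    // Hypothetical counterpart of reportTotalSize: read back the record
    // count from <outputBasePath>/size before committing the version.
    object CommitSizeSketch {
      def readMdStoreSize(conf: Configuration, outputBasePath: String): Long = {
        val fs = FileSystem.get(conf)
        val in = fs.open(new Path(outputBasePath + "/size")) // assumed MDSTORE_SIZE_PATH
        try Source.fromInputStream(in).mkString.trim.toLong
        finally in.close()
      }
    }
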
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/SparkMapDumpIntoOAF.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/SparkMapDumpIntoOAF.scala
index 8377fa438..660dd01b4 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/SparkMapDumpIntoOAF.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/SparkMapDumpIntoOAF.scala
@@ -3,8 +3,10 @@ package eu.dnetlib.dhp.collection.crossref
 import com.fasterxml.jackson.databind.ObjectMapper
 import eu.dnetlib.dhp.application.AbstractScalaApplication
 import eu.dnetlib.dhp.collection.crossref.Crossref2Oaf.{TransformationType, mergeUnpayWall}
+import eu.dnetlib.dhp.common.Constants.MDSTORE_DATA_PATH
 import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
-import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Relation, Result, Dataset => OafDataset}
+import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion
+import eu.dnetlib.dhp.schema.oaf.{Oaf, Result}
 import eu.dnetlib.dhp.utils.ISLookupClientFactory
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions.{col, explode, lower}
@@ -20,8 +22,6 @@ class SparkMapDumpIntoOAF(propertyPath: String, args: Array[String], log: Logger)
   override def run(): Unit = {
     val sourcePath = parser.get("sourcePath")
     log.info("sourcePath: {}", sourcePath)
-    val targetPath = parser.get("targetPath")
-    log.info("targetPath: {}", targetPath)
     val unpaywallPath = parser.get("unpaywallPath")
     log.info("unpaywallPath: {}", unpaywallPath)
     val isLookupUrl: String = parser.get("isLookupUrl")
@@ -29,8 +29,17 @@ class SparkMapDumpIntoOAF(propertyPath: String, args: Array[String], log: Logger)
     val isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl)
     val vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService)
     require(vocabularies != null)
-    transformCrossref(spark, sourcePath, targetPath, unpaywallPath, vocabularies)
+    val mdstoreOutputVersion = parser.get("mdstoreOutputVersion")
+    log.info(s"mdstoreOutputVersion is '$mdstoreOutputVersion'")
 
+    val mapper = new ObjectMapper()
+    val cleanedMdStoreVersion = mapper.readValue(mdstoreOutputVersion, classOf[MDStoreVersion])
+    val outputBasePath = cleanedMdStoreVersion.getHdfsPath
+    log.info(s"outputBasePath is '$outputBasePath'")
+    val targetPath = s"$outputBasePath$MDSTORE_DATA_PATH"
+    log.info(s"targetPath is '$targetPath'")
+
+    transformCrossref(spark, sourcePath, targetPath, unpaywallPath, vocabularies)
+    reportTotalSize(targetPath, outputBasePath)
   }
 
   def transformUnpayWall(spark: SparkSession, unpaywallPath: String, crossrefPath: String): Dataset[UnpayWall] = {
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/MagUtility.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/MagUtility.scala
index 31202c455..3d311d128 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/MagUtility.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/MagUtility.scala
@@ -490,7 +490,7 @@ object MagUtility extends Serializable {
       result.setDataInfo(MAGDataInfo)
       val i = new Instance
       i.setInstancetype(tp)
-      i.setInstanceTypeMapping(List(instanceTypeMapping(currentType)).asJava)
+      i.setInstanceTypeMapping(List(instanceTypeMapping(currentType, ModelConstants.OPENAIRE_COAR_RESOURCE_TYPES_3_1)).asJava)
       result.setInstance(List(i).asJava)
     }
     result
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkMAGtoOAF.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkMAGtoOAF.scala
index 2fde0ab18..5dd38970d 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkMAGtoOAF.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkMAGtoOAF.scala
@@ -1,7 +1,10 @@
 package eu.dnetlib.dhp.collection.mag
 
+import com.fasterxml.jackson.databind.ObjectMapper
 import eu.dnetlib.dhp.application.AbstractScalaApplication
-import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Result}
+import eu.dnetlib.dhp.common.Constants.MDSTORE_DATA_PATH
+import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion
+import eu.dnetlib.dhp.schema.oaf.Relation
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}
 import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
@@ -14,12 +17,19 @@ class SparkMAGtoOAF(propertyPath: String, args: Array[String], log: Logger)
    * where the whole logic of the spark node is defined
    */
   override def run(): Unit = {
-    val mdstorePath: String = parser.get("mdstorePath")
-    log.info("found parameters mdstorePath: {}", mdstorePath)
+    val mdstoreOutputVersion = parser.get("mdstoreOutputVersion")
+    log.info(s"mdstoreOutputVersion is '$mdstoreOutputVersion'")
+
+    val mapper = new ObjectMapper()
+    val cleanedMdStoreVersion = mapper.readValue(mdstoreOutputVersion, classOf[MDStoreVersion])
+    val outputBasePath = cleanedMdStoreVersion.getHdfsPath
+    log.info(s"outputBasePath is '$outputBasePath'")
+    val mdstorePath = s"$outputBasePath$MDSTORE_DATA_PATH"
     val magBasePath: String = parser.get("magBasePath")
     log.info("found parameters magBasePath: {}", magBasePath)
     convertMAG(spark, magBasePath, mdstorePath)
     generateAffiliations(spark, magBasePath, mdstorePath)
+    reportTotalSize(mdstorePath, outputBasePath)
   }
 
   def convertMAG(spark: SparkSession, magBasePath: String, mdStorePath: String): Unit = {
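
Note: both Spark nodes now resolve their output location identically, so the pattern is worth isolating. A standalone sketch under the assumption that MDSTORE_DATA_PATH is "/store"; the JSON field values are invented and only hdfsPath is actually read:

    import com.fasterxml.jackson.databind.ObjectMapper
    import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion

    object ResolveOutputPathSketch {
      def main(args: Array[String]): Unit = {
        // Illustrative payload of ${wf:actionData('StartTransaction')['mdStoreVersion']};
        // the real JSON carries the full MDStoreVersion bean.
        val json = """{"id":"md-1-v-1","hdfsPath":"/data/mdstore/md-1/v-1"}"""

        val version = new ObjectMapper().readValue(json, classOf[MDStoreVersion])
        val outputBasePath = version.getHdfsPath
        val targetPath = outputBasePath + "/store" // assumed MDSTORE_DATA_PATH

        println(targetPath) // /data/mdstore/md-1/v-1/store
      }
    }
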
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/GenerateDataciteDatasetSpark.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/GenerateDataciteDatasetSpark.scala
index 046290969..a3c35e8d0 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/GenerateDataciteDatasetSpark.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/GenerateDataciteDatasetSpark.scala
@@ -46,20 +46,6 @@ class GenerateDataciteDatasetSpark(propertyPath: String, args: Array[String], log: Logger)
     reportTotalSize(targetPath, outputBasePath)
   }
 
-  /** For working with MDStore we need to store in a file on hdfs the size of
-   * the current dataset
-   * @param targetPath
-   * @param outputBasePath
-   */
-  def reportTotalSize(targetPath: String, outputBasePath: String): Unit = {
-    val total_items = spark.read.text(targetPath).count()
-    writeHdfsFile(
-      spark.sparkContext.hadoopConfiguration,
-      s"$total_items",
-      outputBasePath + MDSTORE_SIZE_PATH
-    )
-  }
-
   /** Generate the transformed and cleaned OAF Dataset from the native one
    *
    * @param sourcePath sourcePath of the native Dataset in format JSON/Datacite