diff --git a/dhp-common/src/main/scala/eu/dnetlib/dhp/application/SparkScalaApplication.scala b/dhp-common/src/main/scala/eu/dnetlib/dhp/application/SparkScalaApplication.scala
index f8afe9af41..a14c258379 100644
--- a/dhp-common/src/main/scala/eu/dnetlib/dhp/application/SparkScalaApplication.scala
+++ b/dhp-common/src/main/scala/eu/dnetlib/dhp/application/SparkScalaApplication.scala
@@ -1,5 +1,8 @@
package eu.dnetlib.dhp.application
+import eu.dnetlib.dhp.common.Constants
+import eu.dnetlib.dhp.utils.DHPUtils.writeHdfsFile
+
import scala.io.Source
/** This is the main Interface SparkApplication
@@ -70,4 +73,13 @@ abstract class AbstractScalaApplication(
.getOrCreate()
}
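+
+  /** For working with MDStore we need to store in a file on hdfs the size of
+    * the current dataset
+    *
+    * @param targetPath     the path of the generated dataset
+    * @param outputBasePath the base path of the MDStore version on hdfs
+    */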
+  def reportTotalSize(targetPath: String, outputBasePath: String): Unit = {
+    val totalItems = spark.read.text(targetPath).count()
+    writeHdfsFile(
+      spark.sparkContext.hadoopConfiguration,
+      s"$totalItems",
+      outputBasePath + Constants.MDSTORE_SIZE_PATH
+    )
+  }
+
}
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/crossref/convert_crossref_dump_to_oaf_params.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/crossref/convert_crossref_dump_to_oaf_params.json
index cc7333e489..9aaf968ead 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/crossref/convert_crossref_dump_to_oaf_params.json
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/crossref/convert_crossref_dump_to_oaf_params.json
@@ -18,9 +18,9 @@
"paramRequired": true
},
{
- "paramName": "t",
- "paramLongName": "targetPath",
- "paramDescription": "The target path",
+ "paramName": "mov",
+ "paramLongName": "mdstoreOutputVersion",
+ "paramDescription": "The mdstore Output Version",
"paramRequired": false
},
{
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/crossref/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/crossref/oozie_app/workflow.xml
index c9c09d20fa..bcfb40b35e 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/crossref/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/crossref/oozie_app/workflow.xml
@@ -4,10 +4,6 @@
             <name>sourcePath</name>
             <description>The base path of Crossref DUMP</description>
         </property>
-        <property>
-            <name>targetPath</name>
-            <description>The targetPath</description>
-        </property>
         <property>
             <name>unpaywallPath</name>
             <description>The base path of unpaywall DUMP</description>
@@ -16,16 +12,39 @@
         <property>
             <name>isLookupUrl</name>
             <description>The Information service Lookup URL</description>
         </property>
+        <property>
+            <name>mdStoreOutputId</name>
+            <description>the identifier of the cleaned MDStore</description>
+        </property>
+        <property>
+            <name>mdStoreManagerURI</name>
+            <description>the path of the cleaned mdstore</description>
+        </property>
     </parameters>
 
-    <start to="generateOAF"/>
+    <start to="StartTransaction"/>
 
     <kill name="Kill">
         <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
     </kill>
-
-    <action name="generateOAF">
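+
+    <!-- Open a new MDStore version (a write transaction) before the transformation;
+         capture-output exposes the resulting mdStoreVersion to the downstream actions -->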
+    <action name="StartTransaction">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>NEW_VERSION</arg>
+            <arg>--mdStoreID</arg><arg>${mdStoreOutputId}</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+            <capture-output/>
+        </java>
+        <ok to="generateOAF"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="generateOAF">
@@ -47,13 +66,66 @@
             <arg>--sourcePath</arg><arg>${sourcePath}</arg>
             <arg>--unpaywallPath</arg><arg>${unpaywallPath}</arg>
-            <arg>--targetPath</arg><arg>${targetPath}</arg>
+            <arg>--mdstoreOutputVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
             <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
             <arg>--master</arg><arg>yarn</arg>
         </spark>
-        <ok to="End"/>
-        <error to="Kill"/>
+        <ok to="CommitVersion"/>
+        <error to="RollBack"/>
     </action>
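+
+    <!-- On success the MDStore version is committed (the manager reads the size file
+         written by the job); on failure it is rolled back so no partial output is published -->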
+    <action name="CommitVersion">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>COMMIT</arg>
+            <arg>--namenode</arg><arg>${nameNode}</arg>
+            <arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+        </java>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="EndReadRollBack">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>READ_UNLOCK</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+            <arg>--readMDStoreId</arg><arg>${wf:actionData('BeginRead')['mdStoreReadLockVersion']}</arg>
+            <capture-output/>
+        </java>
+        <ok to="RollBack"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="RollBack">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>ROLLBACK</arg>
+            <arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+        </java>
+        <ok to="Kill"/>
+        <error to="Kill"/>
+    </action>
+
+    <end name="End"/>
+</workflow-app>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/convert_MAG_to_OAF_properties.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/convert_MAG_to_OAF_properties.json
index 48926f3e99..d283845cf6 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/convert_MAG_to_OAF_properties.json
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/convert_MAG_to_OAF_properties.json
@@ -6,9 +6,9 @@
"paramRequired": true
},
{
- "paramName": "md",
- "paramLongName": "mdstorePath",
- "paramDescription": "The base path of MAG DUMP CSV Tables",
+ "paramName": "mo",
+ "paramLongName": "mdstoreOutputVersion",
+ "paramDescription": "The mdstore output",
"paramRequired": true
},
{
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/oozie_app/workflow.xml
index 511eacb5f8..f452372189 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/collection/mag/oozie_app/workflow.xml
@@ -5,8 +5,12 @@
             <description>The base path of MAG DUMP CSV Tables</description>
         </property>
         <property>
-            <name>mdstorePath</name>
-            <description>The base path of MAG DUMP CSV Tables</description>
+            <name>mdStoreOutputId</name>
+            <description>the identifier of the cleaned MDStore</description>
+        </property>
+        <property>
+            <name>mdStoreManagerURI</name>
+            <description>the path of the cleaned mdstore</description>
         </property>
         <property>
             <name>resume_from</name>
@@ -25,7 +29,7 @@
     <decision name="resume_from">
         <switch>
             <case to="generateTable">${wf:conf('resume_from') eq 'generateTable'}</case>
-            <default to="convertToOAF"/>
+            <default to="StartTransaction"/>
         </switch>
     </decision>
@@ -51,9 +55,26 @@
             <arg>--magBasePath</arg><arg>${magBasePath}</arg>
             <arg>--master</arg><arg>yarn</arg>
         </spark>
-        <ok to="convertToOAF"/>
+        <ok to="StartTransaction"/>
         <error to="Kill"/>
     </action>
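+
+    <!-- Open a new MDStore version (a write transaction); the conversion job reads its
+         HDFS path from wf:actionData('StartTransaction') -->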
+    <action name="StartTransaction">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>NEW_VERSION</arg>
+            <arg>--mdStoreID</arg><arg>${mdStoreOutputId}</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+            <capture-output/>
+        </java>
+        <ok to="convertToOAF"/>
+        <error to="Kill"/>
+    </action>
@@ -73,13 +94,67 @@
             --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
             --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
         </spark-opts>
-            <arg>--mdstorePath</arg><arg>${mdstorePath}</arg>
+            <arg>--mdstoreOutputVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
             <arg>--magBasePath</arg><arg>${magBasePath}</arg>
             <arg>--master</arg><arg>yarn</arg>
         </spark>
-        <ok to="End"/>
-        <error to="Kill"/>
+        <ok to="CommitVersion"/>
+        <error to="RollBack"/>
     </action>
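+
+    <!-- Commit the MDStore version on success; roll it back on failure so the previous
+         version remains the current one -->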
+    <action name="CommitVersion">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>COMMIT</arg>
+            <arg>--namenode</arg><arg>${nameNode}</arg>
+            <arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+        </java>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="EndReadRollBack">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>READ_UNLOCK</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+            <arg>--readMDStoreId</arg><arg>${wf:actionData('BeginRead')['mdStoreReadLockVersion']}</arg>
+            <capture-output/>
+        </java>
+        <ok to="RollBack"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="RollBack">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>ROLLBACK</arg>
+            <arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+        </java>
+        <ok to="Kill"/>
+        <error to="Kill"/>
+    </action>
+
+    <end name="End"/>
+</workflow-app>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/SparkMapDumpIntoOAF.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/SparkMapDumpIntoOAF.scala
index 8377fa4384..660dd01b4e 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/SparkMapDumpIntoOAF.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/crossref/SparkMapDumpIntoOAF.scala
@@ -3,8 +3,10 @@ package eu.dnetlib.dhp.collection.crossref
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.AbstractScalaApplication
import eu.dnetlib.dhp.collection.crossref.Crossref2Oaf.{TransformationType, mergeUnpayWall}
+import eu.dnetlib.dhp.common.Constants.MDSTORE_DATA_PATH
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
-import eu.dnetlib.dhp.schema.oaf.{Oaf, Publication, Relation, Result, Dataset => OafDataset}
+import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion
+import eu.dnetlib.dhp.schema.oaf.{Oaf, Result}
import eu.dnetlib.dhp.utils.ISLookupClientFactory
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{col, explode, lower}
@@ -20,8 +22,6 @@ class SparkMapDumpIntoOAF(propertyPath: String, args: Array[String], log: Logger
override def run(): Unit = {
val sourcePath = parser.get("sourcePath")
log.info("sourcePath: {}", sourcePath)
- val targetPath = parser.get("targetPath")
- log.info("targetPath: {}", targetPath)
val unpaywallPath = parser.get("unpaywallPath")
log.info("unpaywallPath: {}", unpaywallPath)
val isLookupUrl: String = parser.get("isLookupUrl")
@@ -29,8 +29,17 @@ class SparkMapDumpIntoOAF(propertyPath: String, args: Array[String], log: Logger
val isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl)
val vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService)
require(vocabularies != null)
- transformCrossref(spark, sourcePath, targetPath, unpaywallPath, vocabularies)
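+    // the mdstoreOutputVersion parameter carries the MDStoreVersion created by the
+    // workflow's NEW_VERSION action, serialized as JSON: its HDFS path determines
+    // where the OAF records and the size report are written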
+ val mdstoreOutputVersion = parser.get("mdstoreOutputVersion")
+ log.info(s"mdstoreOutputVersion is '$mdstoreOutputVersion'")
+ val mapper = new ObjectMapper()
+ val cleanedMdStoreVersion = mapper.readValue(mdstoreOutputVersion, classOf[MDStoreVersion])
+ val outputBasePath = cleanedMdStoreVersion.getHdfsPath
+ log.info(s"outputBasePath is '$outputBasePath'")
+ val targetPath = s"$outputBasePath$MDSTORE_DATA_PATH"
+ log.info(s"targetPath is '$targetPath'")
+ transformCrossref(spark, sourcePath, targetPath, unpaywallPath, vocabularies)
+ reportTotalSize(targetPath, outputBasePath)
}
def transformUnpayWall(spark: SparkSession, unpaywallPath: String, crossrefPath: String): Dataset[UnpayWall] = {
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/MagUtility.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/MagUtility.scala
index 31202c455b..3d311d1280 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/MagUtility.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/MagUtility.scala
@@ -490,7 +490,7 @@ object MagUtility extends Serializable {
result.setDataInfo(MAGDataInfo)
val i = new Instance
i.setInstancetype(tp)
- i.setInstanceTypeMapping(List(instanceTypeMapping(currentType)).asJava)
+      i.setInstanceTypeMapping(List(instanceTypeMapping(currentType, ModelConstants.OPENAIRE_COAR_RESOURCE_TYPES_3_1)).asJava)
result.setInstance(List(i).asJava)
}
result
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkMAGtoOAF.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkMAGtoOAF.scala
index 2fde0ab18d..5dd38970de 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkMAGtoOAF.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/collection/mag/SparkMAGtoOAF.scala
@@ -1,7 +1,10 @@
package eu.dnetlib.dhp.collection.mag
+import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.AbstractScalaApplication
-import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Result}
+import eu.dnetlib.dhp.common.Constants.MDSTORE_DATA_PATH
+import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion
+import eu.dnetlib.dhp.schema.oaf.Relation
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
@@ -14,12 +17,19 @@ class SparkMAGtoOAF(propertyPath: String, args: Array[String], log: Logger)
* where the whole logic of the spark node is defined
*/
override def run(): Unit = {
- val mdstorePath: String = parser.get("mdstorePath")
- log.info("found parameters mdstorePath: {}", mdstorePath)
+ val mdstoreOutputVersion = parser.get("mdstoreOutputVersion")
+ log.info(s"mdstoreOutputVersion is '$mdstoreOutputVersion'")
+
+ val mapper = new ObjectMapper()
+ val cleanedMdStoreVersion = mapper.readValue(mdstoreOutputVersion, classOf[MDStoreVersion])
+ val outputBasePath = cleanedMdStoreVersion.getHdfsPath
+ log.info(s"outputBasePath is '$outputBasePath'")
+    val mdstorePath = s"$outputBasePath$MDSTORE_DATA_PATH"
+    log.info(s"mdstorePath is '$mdstorePath'")
val magBasePath: String = parser.get("magBasePath")
log.info("found parameters magBasePath: {}", magBasePath)
convertMAG(spark, magBasePath, mdstorePath)
generateAffiliations(spark, magBasePath, mdstorePath)
+ reportTotalSize(mdstorePath, outputBasePath)
}
def convertMAG(spark: SparkSession, magBasePath: String, mdStorePath: String): Unit = {
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/GenerateDataciteDatasetSpark.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/GenerateDataciteDatasetSpark.scala
index 0462909693..a3c35e8d04 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/GenerateDataciteDatasetSpark.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/datacite/GenerateDataciteDatasetSpark.scala
@@ -46,20 +46,6 @@ class GenerateDataciteDatasetSpark(propertyPath: String, args: Array[String], lo
reportTotalSize(targetPath, outputBasePath)
}
- /** For working with MDStore we need to store in a file on hdfs the size of
- * the current dataset
- * @param targetPath
- * @param outputBasePath
- */
- def reportTotalSize(targetPath: String, outputBasePath: String): Unit = {
- val total_items = spark.read.text(targetPath).count()
- writeHdfsFile(
- spark.sparkContext.hadoopConfiguration,
- s"$total_items",
- outputBasePath + MDSTORE_SIZE_PATH
- )
- }
-
/** Generate the transformed and cleaned OAF Dataset from the native one
*
* @param sourcePath sourcePath of the native Dataset in format JSON/Datacite