diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/baseline_to_oaf_params.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/baseline_to_oaf_params.json
index 8326fab0f3..0fcc03266e 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/baseline_to_oaf_params.json
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/baseline_to_oaf_params.json
@@ -1,7 +1,7 @@
[
- {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
- {"paramName":"i", "paramLongName":"isLookupUrl", "paramDescription": "isLookupUrl", "paramRequired": true},
- {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the baseline path", "paramRequired": true},
- {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the mdstore path to save", "paramRequired": true}
+ {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
+ {"paramName":"i", "paramLongName":"isLookupUrl", "paramDescription": "isLookupUrl", "paramRequired": true},
+ {"paramName":"s", "paramLongName":"sourcePath", "paramDescription": "the baseline path", "paramRequired": true},
+ {"paramName":"mo", "paramLongName":"mdstoreOutputVersion", "paramDescription": "the mdstore path to save", "paramRequired": true}
]
\ No newline at end of file
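Note on the renamed parameter: `mdstoreOutputVersion` no longer receives a plain target path but the JSON-serialized `MDStoreVersion` record emitted by the workflow's `StartTransaction` node, from which the Spark job derives its output directory. A minimal sketch of that deserialization step, mirroring the Jackson call used in the Scala change below; the payload is hypothetical and shows only the `hdfsPath` field the job actually reads:

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion

object MdStoreVersionPayloadSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical payload: the real value is produced by the StartTransaction
    // Oozie node and injected via ${wf:actionData('StartTransaction')['mdStoreVersion']}.
    val payload = """{"hdfsPath":"/data/mdstore/md-1234/version-0"}"""
    val version = new ObjectMapper().readValue(payload, classOf[MDStoreVersion])
    // The dump job writes the OAF dataset under this directory.
    println(version.getHdfsPath)
  }
}
```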
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml
index 30eb414698..0f4c5c2495 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/pubmed/oozie_app/workflow.xml
@@ -16,11 +16,6 @@
        <property>
            <name>mdStoreManagerURI</name>
            <description>the path of the cleaned mdstore</description>
        </property>
-        <property>
-            <name>skipUpdate</name>
-            <value>false</value>
-            <description>The request block size</description>
-        </property>
@@ -44,16 +39,16 @@
            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
-
+
-
+
            <master>yarn</master>
            <mode>cluster</mode>
-            <name>Convert Baseline to OAF Dataset</name>
-            <class>eu.dnetlib.dhp.sx.bio.ebi.SparkCreateBaselineDataFrame</class>
+            <name>Convert Baseline Pubmed to OAF Dataset</name>
+            <class>eu.dnetlib.dhp.sx.bio.ebi.SparkCreatePubmedDump</class>
            <jar>dhp-aggregation-${projectVersion}.jar</jar>
--executor-memory=${sparkExecutorMemory}
@@ -65,12 +60,10 @@
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-            <arg>--workingPath</arg><arg>${baselineWorkingPath}</arg>
+            <arg>--sourcePath</arg><arg>${baselineWorkingPath}</arg>
            <arg>--mdstoreOutputVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
            <arg>--master</arg><arg>yarn</arg>
            <arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
-            <arg>--hdfsServerUri</arg><arg>${nameNode}</arg>
-            <arg>--skipUpdate</arg><arg>${skipUpdate}</arg>
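With the action now invoking `SparkCreatePubmedDump` directly, the job can also be exercised outside Oozie by passing the same arguments by hand. A hedged sketch, assuming the `initialize().run()` pattern of `AbstractScalaApplication`; the lookup URL, paths, and mdstore JSON below are placeholders, not values from this patch:

```scala
import org.slf4j.LoggerFactory

object RunPubmedDumpLocally {
  def main(args: Array[String]): Unit = {
    // Placeholder arguments mirroring baseline_to_oaf_params.json; the
    // mdstoreOutputVersion value is a made-up minimal MDStoreVersion JSON.
    new SparkCreatePubmedDump(
      "/eu/dnetlib/dhp/sx/bio/ebi/baseline_to_oaf_params.json",
      Array(
        "--master", "local[*]",
        "--isLookupUrl", "http://localhost:8280/is/services/isLookUp?wsdl",
        "--sourcePath", "/tmp/baseline",
        "--mdstoreOutputVersion", """{"hdfsPath":"/tmp/mdstore/version-0"}"""
      ),
      LoggerFactory.getLogger(classOf[SparkCreatePubmedDump])
    ).initialize().run()
  }
}
```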
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreatePubmedDump.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreatePubmedDump.scala
index c21bfd7c3b..1bdd2a4bcc 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreatePubmedDump.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkCreatePubmedDump.scala
@@ -1,18 +1,14 @@
package eu.dnetlib.dhp.sx.bio.ebi
import com.fasterxml.jackson.databind.ObjectMapper
-import eu.dnetlib.dhp.application.{AbstractScalaApplication, ArgumentApplicationParser}
+import eu.dnetlib.dhp.application.AbstractScalaApplication
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup
-import eu.dnetlib.dhp.schema.oaf.Oaf
-import eu.dnetlib.dhp.sx.bio.pubmed.{PMArticle, PMAuthor, PMJournal, PMParser, PMParser2, PubMedToOaf}
+import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion
+import eu.dnetlib.dhp.sx.bio.pubmed.{PMArticle, PMParser2, PubMedToOaf}
import eu.dnetlib.dhp.utils.ISLookupClientFactory
-import org.apache.spark.sql.functions._
-import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
+import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
-import java.io.ByteArrayInputStream
-import javax.xml.stream.XMLInputFactory
-
class SparkCreatePubmedDump(propertyPath: String, args: Array[String], log: Logger)
extends AbstractScalaApplication(propertyPath, args, log: Logger) {
@@ -24,16 +20,26 @@ class SparkCreatePubmedDump(propertyPath: String, args: Array[String], log: Logg
log.info("isLookupUrl: {}", isLookupUrl)
val sourcePath = parser.get("sourcePath")
log.info(s"SourcePath is '$sourcePath'")
- val targetPath = parser.get("targetPath")
- log.info(s"TargetPath is '$targetPath'")
+ val mdstoreOutputVersion = parser.get("mdstoreOutputVersion")
+ log.info(s"mdstoreOutputVersion is '$mdstoreOutputVersion'")
+ val mapper = new ObjectMapper()
+ val cleanedMdStoreVersion = mapper.readValue(mdstoreOutputVersion, classOf[MDStoreVersion])
+ val outputBasePath = cleanedMdStoreVersion.getHdfsPath
+ log.info(s"outputBasePath is '$outputBasePath'")
val isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl)
val vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService)
- createPubmedDump(spark, sourcePath, targetPath, vocabularies)
+ createPubmedDump(spark, sourcePath, outputBasePath, vocabularies)
}
+ /** Creates a dump of the PubMed articles.
+ * @param spark the Spark session
+ * @param sourcePath the path of the PubMed baseline files
+ * @param targetPath the path where the resulting OAF dataset is written
+ * @param vocabularies the vocabularies used to map PubMed metadata to OAF
+ */
def createPubmedDump(
spark: SparkSession,
sourcePath: String,
@@ -54,6 +60,7 @@ class SparkCreatePubmedDump(propertyPath: String, args: Array[String], log: Logg
})
.filter(s => s != null)
.map { i =>
+          // TODO: remove this try/catch once parser failures are handled explicitly
try {
new PMParser2().parse(i)
} catch {
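The TODO above suggests the defensive parsing is meant to go away. If it stays, a variant that surfaces failures instead of mapping them to `null` for a downstream filter to discard could look like the sketch below; that `PMParser2.parse` returns a `PMArticle` and throws on malformed input are assumptions taken from the surrounding code:

```scala
import scala.util.{Failure, Success, Try}
import org.slf4j.Logger
import eu.dnetlib.dhp.sx.bio.pubmed.{PMArticle, PMParser2}

object SafePubmedParsing {
  // Parse one PubMed XML record, logging failures and returning None
  // instead of silently producing nulls.
  def parseArticle(xml: String, log: Logger): Option[PMArticle] =
    Try(new PMParser2().parse(xml)) match {
      case Success(article) => Some(article)
      case Failure(e) =>
        log.error(s"Failed to parse PubMed record: ${e.getMessage}")
        None
    }
}
```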