diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/baseline_oaf/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/baseline_oaf/oozie_app/workflow.xml
index 4f9871a5c..05bc854d9 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/baseline_oaf/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/baseline_oaf/oozie_app/workflow.xml
@@ -65,7 +65,7 @@
                <arg>--master</arg><arg>yarn</arg>
-
+
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/db/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/db/oozie_app/workflow.xml
index 071d202b6..08693a987 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/db/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/db/oozie_app/workflow.xml
@@ -8,44 +8,121 @@
             <name>database</name>
             <description>the PDB Database Working Path</description>
         </property>
-        <property>
-            <name>targetPath</name>
-            <description>the Target Working dir path</description>
-        </property>
+        <property>
+            <name>mdStoreOutputId</name>
+            <description>the identifier of the cleaned MDStore</description>
+        </property>
+        <property>
+            <name>mdStoreManagerURI</name>
+            <description>the path of the cleaned mdstore</description>
+        </property>
     </parameters>
 
-    <start to="ConvertDB"/>
+    <start to="StartTransaction"/>
 
     <kill name="Kill">
         <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
     </kill>
 
-    <action name="ConvertDB">
-        <spark xmlns="uri:oozie:spark-action:0.2">
-            <master>yarn</master>
-            <mode>cluster</mode>
-            <name>Convert Bio DB to OAF Dataset</name>
-            <class>eu.dnetlib.dhp.sx.bio.SparkTransformBioDatabaseToOAF</class>
-            <jar>dhp-aggregation-${projectVersion}.jar</jar>
-            <spark-opts>
-                --executor-memory=${sparkExecutorMemory}
-                --executor-cores=${sparkExecutorCores}
-                --driver-memory=${sparkDriverMemory}
-                --conf spark.extraListeners=${spark2ExtraListeners}
-                --conf spark.sql.shuffle.partitions=2000
-                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
-                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
-                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
-            </spark-opts>
-            <arg>--master</arg><arg>yarn</arg>
-            <arg>--dbPath</arg><arg>${sourcePath}</arg>
-            <arg>--database</arg><arg>${database}</arg>
-            <arg>--targetPath</arg><arg>${targetPath}</arg>
-        </spark>
-        <ok to="End"/>
-        <error to="Kill"/>
-    </action>
-
+    <action name="StartTransaction">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>NEW_VERSION</arg>
+            <arg>--mdStoreID</arg><arg>${mdStoreOutputId}</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+            <capture-output/>
+        </java>
+        <ok to="ConvertDB"/>
+        <error to="RollBack"/>
+    </action>
+
+    <action name="ConvertDB">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn</master>
+            <mode>cluster</mode>
+            <name>Convert Bio DB to OAF Dataset</name>
+            <class>eu.dnetlib.dhp.sx.bio.SparkTransformBioDatabaseToOAF</class>
+            <jar>dhp-aggregation-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.shuffle.partitions=2000
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+            </spark-opts>
+            <arg>--master</arg><arg>yarn</arg>
+            <arg>--dbPath</arg><arg>${sourcePath}</arg>
+            <arg>--database</arg><arg>${database}</arg>
+            <arg>--mdstoreOutputVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
+        </spark>
+        <ok to="CommitVersion"/>
+        <error to="RollBack"/>
+    </action>
+
+    <action name="CommitVersion">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>COMMIT</arg>
+            <arg>--namenode</arg><arg>${nameNode}</arg>
+            <arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+        </java>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="EndReadRollBack">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>READ_UNLOCK</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+            <arg>--readMDStoreId</arg><arg>${wf:actionData('BeginRead')['mdStoreReadLockVersion']}</arg>
+            <capture-output/>
+        </java>
+        <ok to="RollBack"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="RollBack">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>ROLLBACK</arg>
+            <arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+        </java>
+        <ok to="Kill"/>
+        <error to="Kill"/>
+    </action>
+
+    <end name="End"/>
+</workflow-app>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/bio_to_oaf_params.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/bio_to_oaf_params.json
index 76d0bfd6d..ec4af6ac9 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/bio_to_oaf_params.json
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/bio_to_oaf_params.json
@@ -2,5 +2,5 @@
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
{"paramName":"db", "paramLongName":"database", "paramDescription": "should be PDB or UNIPROT", "paramRequired": true},
{"paramName":"p", "paramLongName":"dbPath", "paramDescription": "the path of the database to transform", "paramRequired": true},
- {"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the OAF target path ", "paramRequired": true}
+ {"paramName":"mo", "paramLongName":"mdstoreOutputVersion", "paramDescription": "the mdstore Output Version", "paramRequired": true}
]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/transform/oozie_app/config-default.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/transform/oozie_app/config-default.xml
new file mode 100644
index 000000000..17cd6c9a3
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/transform/oozie_app/config-default.xml
@@ -0,0 +1,68 @@
+<configuration>
+
+    <property>
+        <name>jobTracker</name>
+        <value>yarnRM</value>
+    </property>
+    <property>
+        <name>nameNode</name>
+        <value>hdfs://nameservice1</value>
+    </property>
+    <property>
+        <name>hive_metastore_uris</name>
+        <value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
+    </property>
+    <property>
+        <name>spark2YarnHistoryServerAddress</name>
+        <value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
+    </property>
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+    <property>
+        <name>oozie.launcher.mapreduce.user.classpath.first</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>oozie.use.system.libpath</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>oozie.action.sharelib.for.spark</name>
+        <value>spark2</value>
+    </property>
+    <property>
+        <name>spark2EventLogDir</name>
+        <value>/user/spark/spark2ApplicationHistory</value>
+    </property>
+    <property>
+        <name>spark2ExtraListeners</name>
+        <value>"com.cloudera.spark.lineage.NavigatorAppListener"</value>
+    </property>
+    <property>
+        <name>spark2SqlQueryExecutionListeners</name>
+        <value>"com.cloudera.spark.lineage.NavigatorQueryListener"</value>
+    </property>
+</configuration>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/transform/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/transform/oozie_app/workflow.xml
new file mode 100644
index 000000000..502ac0149
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/transform/oozie_app/workflow.xml
@@ -0,0 +1,118 @@
+
+    <parameters>
+        <property>
+            <name>sourcePath</name>
+            <description>the PDB Database Working Path</description>
+        </property>
+        <property>
+            <name>mdStoreOutputId</name>
+            <description>the identifier of the cleaned MDStore</description>
+        </property>
+        <property>
+            <name>mdStoreManagerURI</name>
+            <description>the path of the cleaned mdstore</description>
+        </property>
+    </parameters>
+
+    <start to="StartTransaction"/>
+
+    <kill name="Kill">
+        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+
+    <action name="StartTransaction">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>NEW_VERSION</arg>
+            <arg>--mdStoreID</arg><arg>${mdStoreOutputId}</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+            <capture-output/>
+        </java>
+        <ok to="CreateOAFDataSet"/>
+        <error to="RollBack"/>
+    </action>
+
+    <action name="CreateOAFDataSet">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn-cluster</master>
+            <mode>cluster</mode>
+            <name>Create OAF DataSet</name>
+            <class>eu.dnetlib.dhp.sx.bio.ebi.SparkTransformEBILinksToOaf</class>
+            <jar>dhp-aggregation-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.sql.shuffle.partitions=2000
+                ${sparkExtraOPT}
+            </spark-opts>
+            <arg>--sourcePath</arg><arg>${sourcePath}</arg>
+            <arg>--mdstoreOutputVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
+            <arg>--master</arg><arg>yarn</arg>
+        </spark>
+        <ok to="CommitVersion"/>
+        <error to="RollBack"/>
+    </action>
+
+    <action name="CommitVersion">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>COMMIT</arg>
+            <arg>--namenode</arg><arg>${nameNode}</arg>
+            <arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+        </java>
+        <ok to="End"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="EndReadRollBack">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>READ_UNLOCK</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+            <arg>--readMDStoreId</arg><arg>${wf:actionData('BeginRead')['mdStoreReadLockVersion']}</arg>
+            <capture-output/>
+        </java>
+        <ok to="RollBack"/>
+        <error to="Kill"/>
+    </action>
+
+    <action name="RollBack">
+        <java>
+            <configuration>
+                <property>
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+            </configuration>
+            <main-class>eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode</main-class>
+            <arg>--action</arg><arg>ROLLBACK</arg>
+            <arg>--mdStoreVersion</arg><arg>${wf:actionData('StartTransaction')['mdStoreVersion']}</arg>
+            <arg>--mdStoreManagerURI</arg><arg>${mdStoreManagerURI}</arg>
+        </java>
+        <ok to="Kill"/>
+        <error to="Kill"/>
+    </action>
+
+    <end name="End"/>
+</workflow-app>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/transform_ebi_to_df_params.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/transform_ebi_to_df_params.json
new file mode 100644
index 000000000..9aa5b178c
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/sx/bio/ebi/transform_ebi_to_df_params.json
@@ -0,0 +1,5 @@
+[
+ {"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
+ {"paramName":"s", "paramLongName":"sourcePath","paramDescription": "the source Path", "paramRequired": true},
+ {"paramName":"mo", "paramLongName":"mdstoreOutputVersion", "paramDescription": "the mdstore Output Version", "paramRequired": true}
+]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/BioDBToOAF.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/BioDBToOAF.scala
index ffdab1799..ceee14155 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/BioDBToOAF.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/BioDBToOAF.scala
@@ -82,8 +82,8 @@ object BioDBToOAF {
"Springer Nature"
)
val EBICollectedFrom: KeyValue = OafMapperUtils.keyValue(
- "10|opendoar____::83e60e09c222f206c725385f53d7e567c",
- "EMBL-EBIs Protein Data Bank in Europe (PDBe)"
+ "10|fairsharing_::96c5c28becf18e71190460a9955aa4d8",
+ "Protein Data Bank in Europe"
)
val pubmedCollectedFrom: KeyValue =
OafMapperUtils.keyValue(ModelConstants.EUROPE_PUBMED_CENTRAL_ID, "Europe PubMed Central")
@@ -120,7 +120,7 @@ object BioDBToOAF {
val relation_semantic = (json \ "RelationshipType" \ "Name").extract[String]
- val date = GraphCleaningFunctions.cleanDate((json \ "LinkedPublicationDate").extract[String])
+ val date = GraphCleaningFunctions.cleanDate((json \ "LinkPublicationDate").extract[String])
createRelation(
target_pid,
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala
index 96075b4f3..10bfdce3c 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/SparkTransformBioDatabaseToOAF.scala
@@ -1,42 +1,26 @@
package eu.dnetlib.dhp.sx.bio
-import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import com.fasterxml.jackson.databind.ObjectMapper
+import eu.dnetlib.dhp.application.AbstractScalaApplication
import eu.dnetlib.dhp.collection.CollectionUtils
+import eu.dnetlib.dhp.common.Constants.MDSTORE_DATA_PATH
+import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion
import eu.dnetlib.dhp.schema.oaf.Oaf
import eu.dnetlib.dhp.sx.bio.BioDBToOAF.ScholixResolved
-import org.apache.commons.io.IOUtils
-import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
-object SparkTransformBioDatabaseToOAF {
+class SparkTransformBioDatabaseToOAF(propertyPath: String, args: Array[String], log: Logger)
+ extends AbstractScalaApplication(propertyPath, args, log: Logger) {
- def main(args: Array[String]): Unit = {
- val conf: SparkConf = new SparkConf()
- val log: Logger = LoggerFactory.getLogger(getClass)
- val parser = new ArgumentApplicationParser(
- IOUtils.toString(
- getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/bio/ebi/bio_to_oaf_params.json")
- )
- )
- parser.parseArgument(args)
- val database: String = parser.get("database")
- log.info("database: {}", database)
-
- val dbPath: String = parser.get("dbPath")
- log.info("dbPath: {}", database)
- val targetPath: String = parser.get("targetPath")
- log.info("targetPath: {}", database)
-
- val spark: SparkSession =
- SparkSession
- .builder()
- .config(conf)
- .appName(getClass.getSimpleName)
- .master(parser.get("master"))
- .getOrCreate()
+ private def convertDatabase(
+ spark: SparkSession,
+ dbPath: String,
+ database: String,
+ targetPath: String,
+ outputBasePath: String
+ ): Unit = {
val sc = spark.sparkContext
-
implicit val resultEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
import spark.implicits._
database.toUpperCase() match {
@@ -45,22 +29,60 @@ object SparkTransformBioDatabaseToOAF {
spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.uniprotToOAF(i))),
targetPath
)
+ reportTotalSize(targetPath, outputBasePath)
case "PDB" =>
CollectionUtils.saveDataset(
spark.createDataset(sc.textFile(dbPath).flatMap(i => BioDBToOAF.pdbTOOaf(i))),
targetPath
)
+ reportTotalSize(targetPath, outputBasePath)
case "SCHOLIX" =>
CollectionUtils.saveDataset(
spark.read.load(dbPath).as[ScholixResolved].map(i => BioDBToOAF.scholixResolvedToOAF(i)),
targetPath
)
+ reportTotalSize(targetPath, outputBasePath)
case "CROSSREF_LINKS" =>
CollectionUtils.saveDataset(
- spark.createDataset(sc.textFile(dbPath).map(i => BioDBToOAF.crossrefLinksToOaf(i))),
+ spark.read.load(dbPath).select("json").as[String].map(i => BioDBToOAF.crossrefLinksToOaf(i)),
targetPath
)
+ reportTotalSize(targetPath, outputBasePath)
}
}
+  /** Here all the spark applications run this method
+ * where the whole logic of the spark node is defined
+ */
+ override def run(): Unit = {
+ val database: String = parser.get("database")
+ log.info("database: {}", database)
+
+ val dbPath: String = parser.get("dbPath")
+    log.info("dbPath: {}", dbPath)
+
+ val mdstoreOutputVersion = parser.get("mdstoreOutputVersion")
+ log.info(s"mdstoreOutputVersion is '$mdstoreOutputVersion'")
+
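+    // mdstoreOutputVersion carries a JSON-serialized MDStoreVersion; its HDFS path becomes the base output path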
+ val mapper = new ObjectMapper()
+ val cleanedMdStoreVersion = mapper.readValue(mdstoreOutputVersion, classOf[MDStoreVersion])
+ val outputBasePath = cleanedMdStoreVersion.getHdfsPath
+ log.info(s"outputBasePath is '$outputBasePath'")
+
+ val targetPath = s"$outputBasePath$MDSTORE_DATA_PATH"
+ log.info(s"targetPath is '$targetPath'")
+ convertDatabase(spark, dbPath, database, targetPath, outputBasePath)
+
+ }
+}
+
+object SparkTransformBioDatabaseToOAF {
+ val log: Logger = LoggerFactory.getLogger(SparkTransformBioDatabaseToOAF.getClass)
+
+ def main(args: Array[String]): Unit = {
+ new SparkTransformBioDatabaseToOAF("/eu/dnetlib/dhp/sx/bio/ebi/bio_to_oaf_params.json", args, log)
+ .initialize()
+ .run()
+ }
+
}
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkTransformEBILinksToOaf.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkTransformEBILinksToOaf.scala
new file mode 100644
index 000000000..ff6f0563a
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/sx/bio/ebi/SparkTransformEBILinksToOaf.scala
@@ -0,0 +1,66 @@
+package eu.dnetlib.dhp.sx.bio.ebi
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import eu.dnetlib.dhp.application.AbstractScalaApplication
+import eu.dnetlib.dhp.collection.CollectionUtils
+import eu.dnetlib.dhp.common.Constants.MDSTORE_DATA_PATH
+import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion
+import eu.dnetlib.dhp.schema.oaf.Oaf
+import eu.dnetlib.dhp.sx.bio.BioDBToOAF
+import eu.dnetlib.dhp.sx.bio.BioDBToOAF.EBILinkItem
+import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
+import org.slf4j.{Logger, LoggerFactory}
+
+class SparkTransformEBILinksToOaf(propertyPath: String, args: Array[String], log: Logger)
+ extends AbstractScalaApplication(propertyPath, args, log: Logger) {
+
+ def transformLinks(spark: SparkSession, sourcePath: String, outputBasePath: String, targetPath: String) = {
+ implicit val PMEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf])
+ import spark.implicits._
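+    // keep only EBI link records whose links payload is present and looks like a JSON object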
+ val ebLinks: Dataset[EBILinkItem] = spark.read
+ .load(sourcePath)
+ .as[EBILinkItem]
+ .filter(l => l.links != null && l.links.startsWith("{"))
+
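+    // parse each links payload, keep only the relevant EBI targets and convert them to OAF before saving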
+ CollectionUtils.saveDataset(
+ ebLinks
+ .flatMap(j => BioDBToOAF.parse_ebi_links(j.links))
+ .filter(p => BioDBToOAF.EBITargetLinksFilter(p))
+ .flatMap(p => BioDBToOAF.convertEBILinksToOaf(p)),
+ targetPath
+ )
+ reportTotalSize(targetPath, outputBasePath)
+ }
+
+  /** Here all the spark applications run this method
+ * where the whole logic of the spark node is defined
+ */
+ override def run(): Unit = {
+
+ val sourcePath = parser.get("sourcePath")
+ log.info(s"sourcePath is '$sourcePath'")
+
+ val mdstoreOutputVersion = parser.get("mdstoreOutputVersion")
+ log.info(s"mdstoreOutputVersion is '$mdstoreOutputVersion'")
+
+ val mapper = new ObjectMapper()
+ val cleanedMdStoreVersion = mapper.readValue(mdstoreOutputVersion, classOf[MDStoreVersion])
+ val outputBasePath = cleanedMdStoreVersion.getHdfsPath
+ log.info(s"outputBasePath is '$outputBasePath'")
+
+ val targetPath = s"$outputBasePath$MDSTORE_DATA_PATH"
+ log.info(s"targetPath is '$targetPath'")
+ transformLinks(spark, sourcePath, outputBasePath, targetPath)
+
+ }
+}
+
+object SparkTransformEBILinksToOaf {
+ val log: Logger = LoggerFactory.getLogger(SparkTransformEBILinksToOaf.getClass)
+
+ def main(args: Array[String]): Unit = {
+ new SparkTransformEBILinksToOaf("/eu/dnetlib/dhp/sx/bio/ebi/transform_ebi_to_df_params.json", args, log)
+ .initialize()
+ .run()
+ }
+}