diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/bioschema/generate_dataset_params.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/bioschema/generate_dataset_params.json
index 66f080000..1da9643dd 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/bioschema/generate_dataset_params.json
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/bioschema/generate_dataset_params.json
@@ -34,6 +34,11 @@
"paramLongName": "exportLinks",
"paramDescription": "should export also links",
"paramRequired": false
+ },
+ {
+ "paramName": "k",
+ "paramLongName": "datasourceKey",
+ "paramDescription": "the key that identifies the datasource",
+ "paramRequired": true
}
-
]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/bioschema/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/bioschema/oozie_app/workflow.xml
index 88beed396..39be85eed 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/bioschema/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/bioschema/oozie_app/workflow.xml
@@ -5,6 +5,11 @@
/data/bioschema/ped
the working path of Bioschema stores
+        <property>
+            <name>datasourceKey</name>
+            <value>ped</value>
+            <description>the key that identifies the datasource (eg ped, disprot)</description>
+        </property>
@@ -51,6 +56,7 @@
--sourcePath${mainPath}/json-datacite
--targetPath${mainPath}/dataset
--exportLinkstrue
+                            <arg>--datasourceKey</arg><arg>${datasourceKey}</arg>
--masteryarn-cluster
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/BioschemaModelConstants.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/BioschemaModelConstants.scala
index a642be772..c1af04170 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/BioschemaModelConstants.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/BioschemaModelConstants.scala
@@ -1,7 +1,7 @@
package eu.dnetlib.dhp.bioschema
import eu.dnetlib.dhp.schema.common.ModelConstants
-import eu.dnetlib.dhp.schema.oaf.DataInfo
+import eu.dnetlib.dhp.schema.oaf.{DataInfo, KeyValue}
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils
import java.time.format.DateTimeFormatter
@@ -61,6 +61,39 @@ class BioschemaModelConstants extends Serializable {}
object BioschemaModelConstants {
+ val DATA_INFO: DataInfo = OafMapperUtils.dataInfo(
+ false,
+ null,
+ false,
+ false,
+ ModelConstants.PROVENANCE_ACTION_SET_QUALIFIER,
+ "0.9"
+ )
+
+ val PED_PREFIX: String = "ped_________"
+
+ val resolvedURL: Map[String, String] = Map(
+ "uniprot" -> "https://www.uniprot.org/uniprot/",
+ "pubmed" -> "https://pubmed.ncbi.nlm.nih.gov/"
+ )
+
+ val collectedFromMap: Map[String, KeyValue] = {
+ val PEDCollectedFrom: KeyValue = OafMapperUtils.keyValue(
+ //TODO create pedDatasourceId and update this value
+ "10|ped_________::pedDatasourceId",
+ "Protein Ensemble Database"
+ )
+ PEDCollectedFrom.setDataInfo(DATA_INFO)
+
+ Map(
+ "ped" -> PEDCollectedFrom
+ )
+ }
+
+ val datasourceKeyPrefix: Map[String, String] = Map(
+ "ped" -> PED_PREFIX
+ )
+
val REL_TYPE_VALUE: String = "resultResult"
val DATE_RELATION_KEY = "RelationDate"
val dataInfo: DataInfo = bioschemaDataInfo("0.9")
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/BioschemaToOAFTransformation.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/BioschemaToOAFTransformation.scala
index f558e6eca..6a38aa56b 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/BioschemaToOAFTransformation.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/BioschemaToOAFTransformation.scala
@@ -18,33 +18,6 @@ object BioschemaToOAFTransformation {
val mapper = new ObjectMapper()
- val DATA_INFO: DataInfo = OafMapperUtils.dataInfo(
- false,
- null,
- false,
- false,
- ModelConstants.PROVENANCE_ACTION_SET_QUALIFIER,
- "0.9"
- )
-
- val resolvedURL: Map[String, String] = Map(
- "uniprot" -> "https://www.uniprot.org/uniprot/",
- "pubmed" -> "https://pubmed.ncbi.nlm.nih.gov/"
- )
-
- val collectedFromMap: Map[String, KeyValue] = {
- val PEDCollectedFrom: KeyValue = OafMapperUtils.keyValue(
- //TODO create pedDatasourceId and update this value
- "10|ped_________::pedDatasourceId",
- "Protein Ensemble Database"
- )
- PEDCollectedFrom.setDataInfo(DATA_INFO)
-
- Map(
- "ped" -> PEDCollectedFrom
- )
- }
-
def extract_date(input: String): Option[String] = {
val d = Date_regex
.map(pattern => {
@@ -89,38 +62,14 @@ object BioschemaToOAFTransformation {
null
}
- def createDNetTargetIdentifier(pid: String, pidType: String, idPrefix: String): String = {
- val f_part = s"$idPrefix|${pidType.toLowerCase}".padTo(15, '_')
- s"$f_part::${IdentifierFactory.md5(pid.toLowerCase)}"
- }
-
def generateOAFDate(dt: String, q: Qualifier): StructuredProperty = {
OafMapperUtils.structuredProperty(dt, q, null)
}
- def generateRelation(
- sourceId: String,
- targetId: String,
- relClass: String,
- cf: KeyValue,
- di: DataInfo
- ): Relation = {
-
- val r = new Relation
- r.setSource(sourceId)
- r.setTarget(targetId)
- r.setRelType(ModelConstants.RESULT_PROJECT)
- r.setRelClass(relClass)
- r.setSubRelType(ModelConstants.OUTCOME)
- r.setCollectedfrom(List(cf).asJava)
- r.setDataInfo(di)
- r
-
- }
-
def generateOAF(
input: String,
- exportLinks: Boolean
+ exportLinks: Boolean,
+ datasourceKey: String
): List[Oaf] = {
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
@@ -142,15 +91,15 @@ object BioschemaToOAFTransformation {
List(
OafMapperUtils.structuredProperty(
pid,
- "ped",
- "ped",
+ datasourceKey,
+ datasourceKey,
ModelConstants.DNET_PID_TYPES,
ModelConstants.DNET_PID_TYPES,
DATA_INFO
)
).asJava
)
- result.setId(OafMapperUtils.createOpenaireId(50, s"ped_________::$pid", true))
+ result.setId(OafMapperUtils.createOpenaireId(50, s"${datasourceKeyPrefix(datasourceKey)}::$pid", true))
result.setOriginalId(List(pid).asJava)
result.setDataInfo(dataInfo)
@@ -215,7 +164,7 @@ object BioschemaToOAFTransformation {
.asJava
)
- result.setCollectedfrom(List(collectedFromMap("ped")).asJava)
+ result.setCollectedfrom(List(collectedFromMap(datasourceKey)).asJava)
val descriptions = (json \\ "descriptions").extract[List[DescriptionType]]
@@ -246,7 +195,7 @@ object BioschemaToOAFTransformation {
})
.asJava
)
- instance.setCollectedfrom(collectedFromMap("ped"))
+ instance.setCollectedfrom(collectedFromMap(datasourceKey))
instance.setPid(result.getPid)
result.setId(IdentifierFactory.createIdentifier(result))
@@ -294,7 +243,8 @@ object BioschemaToOAFTransformation {
rels,
result.getId,
if (i_date.isDefined && i_date.get.isDefined) i_date.get.get else null,
- pid
+ pid,
+ datasourceKey
)
}
@@ -308,12 +258,13 @@ object BioschemaToOAFTransformation {
rels: List[RelatedIdentifierType],
id: String,
date: String,
- pid: String
+ pid: String,
+ datasourceKey: String
): List[Relation] = {
rels
.map(r => {
val rel = new Relation
- rel.setCollectedfrom(List(collectedFromMap("ped")).asJava)
+ rel.setCollectedfrom(List(collectedFromMap(datasourceKey)).asJava)
rel.setDataInfo(dataInfo)
val subRelType = subRelTypeMapping(r.relationType).relType
@@ -341,16 +292,9 @@ object BioschemaToOAFTransformation {
DHPUtils.generateUnresolvedIdentifier(r.relatedIdentifier, r.relatedIdentifierType)
)
rel.setSource(id)
- rel.setCollectedfrom(List(collectedFromMap("ped")).asJava)
+ rel.setCollectedfrom(List(collectedFromMap(datasourceKey)).asJava)
rel.getCollectedfrom.asScala.map(c => c.getValue).toList
rel
})
}
-
- def generateDSId(input: String): String = {
- val b = StringUtils.substringBefore(input, "::")
- val a = StringUtils.substringAfter(input, "::")
- s"10|$b::${DHPUtils.md5(a)}"
- }
-
}
diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/GenerateBioschemaDatasetSpark.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/GenerateBioschemaDatasetSpark.scala
index 857d57b08..0c4c867d7 100644
--- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/GenerateBioschemaDatasetSpark.scala
+++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/GenerateBioschemaDatasetSpark.scala
@@ -22,6 +22,8 @@ class GenerateBioschemaDatasetSpark(propertyPath: String, args: Array[String], l
log.info(s"SourcePath is '$sourcePath'")
val exportLinks = "true".equalsIgnoreCase(parser.get("exportLinks"))
log.info(s"exportLinks is '$exportLinks'")
+ val datasourceKey = parser.get("datasourceKey").toLowerCase
+ log.info(s"datasourceKey is '$datasourceKey'")
// val mdstoreOutputVersion = parser.get("mdstoreOutputVersion")
// log.info(s"mdstoreOutputVersion is '$mdstoreOutputVersion'")
@@ -34,7 +36,7 @@ class GenerateBioschemaDatasetSpark(propertyPath: String, args: Array[String], l
val targetPath = parser.get("targetPath")
- generateBioschemaDataset(sourcePath, exportLinks, targetPath, spark)
+ generateBioschemaDataset(sourcePath, exportLinks, targetPath, datasourceKey, spark)
// reportTotalSize(targetPath, outputBasePath)
}
@@ -63,6 +65,7 @@ class GenerateBioschemaDatasetSpark(propertyPath: String, args: Array[String], l
sourcePath: String,
exportLinks: Boolean,
targetPath: String,
+ datasourceKey: String,
spark: SparkSession
): Unit = {
require(spark != null)
@@ -72,7 +75,7 @@ class GenerateBioschemaDatasetSpark(propertyPath: String, args: Array[String], l
spark.createDataset(
spark.sparkContext
.textFile(sourcePath)
- .flatMap(i => BioschemaToOAFTransformation.generateOAF(i, exportLinks))
+ .flatMap(i => BioschemaToOAFTransformation.generateOAF(i, exportLinks, datasourceKey))
),
targetPath
)
diff --git a/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/bioschema/BioschemaDataciteToOAFTest.scala b/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/bioschema/BioschemaDataciteToOAFTest.scala
index 01c23284c..0e4ede5d0 100644
--- a/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/bioschema/BioschemaDataciteToOAFTest.scala
+++ b/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/bioschema/BioschemaDataciteToOAFTest.scala
@@ -46,13 +46,13 @@ class BioschemaDataciteToOAFTest {
val instance = new GenerateBioschemaDatasetSpark(null, null, log)
val targetPath = s"$workingDir/result"
- instance.generateBioschemaDataset(path, exportLinks = true, targetPath, spark)
+ instance.generateBioschemaDataset(path, exportLinks = true, targetPath, "ped", spark)
val total_items = spark.read.text(targetPath).count()
println(s"total_items: $total_items")
assertTrue(total_items == 50)
- instance.generateBioschemaDataset(path, exportLinks = false, targetPath, spark)
+ instance.generateBioschemaDataset(path, exportLinks = false, targetPath, "ped", spark)
val total_datasets = spark.read.text(targetPath).count()
println(s"total_datasets: $total_datasets")
@@ -67,7 +67,7 @@ class BioschemaDataciteToOAFTest {
.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/bioschema/ped_record.json"))
.mkString
val mapper = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT)
- val res: List[Oaf] = BioschemaToOAFTransformation.generateOAF(record, true)
+ val res: List[Oaf] = BioschemaToOAFTransformation.generateOAF(record, true, "ped")
res.foreach(r => {
println(mapper.writeValueAsString(r))
println("----------------------------")