From 29ee1b9d82c8c06038c5ae533e1b26e5573a662a Mon Sep 17 00:00:00 2001 From: Enrico Ottonello Date: Thu, 3 Mar 2022 12:31:29 +0100 Subject: [PATCH] added datasource key to workflow parameter to properly choose collected from and id values --- .../bioschema/generate_dataset_params.json | 7 +- .../dhp/bioschema/oozie_app/workflow.xml | 6 ++ .../bioschema/BioschemaModelConstants.scala | 35 +++++++- .../BioschemaToOAFTransformation.scala | 82 +++---------------- .../GenerateBioschemaDatasetSpark.scala | 7 +- .../BioschemaDataciteToOAFTest.scala | 6 +- 6 files changed, 67 insertions(+), 76 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/bioschema/generate_dataset_params.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/bioschema/generate_dataset_params.json index 66f080000..1da9643dd 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/bioschema/generate_dataset_params.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/bioschema/generate_dataset_params.json @@ -34,6 +34,11 @@ "paramLongName": "exportLinks", "paramDescription": "should export also links", "paramRequired": false + }, + { + "paramName": "k", + "paramLongName": "datasourceKey", + "paramDescription": "the key that identifies the datasource", + "paramRequired": true } - ] \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/bioschema/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/bioschema/oozie_app/workflow.xml index 88beed396..39be85eed 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/bioschema/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/bioschema/oozie_app/workflow.xml @@ -5,6 +5,11 @@ /data/bioschema/ped the working path of Bioschema stores + + datasourceKey + ped + the key that identifies the datasource (eg ped, disprot) + @@ -51,6 +56,7 @@ --sourcePath${mainPath}/json-datacite --targetPath${mainPath}/dataset --exportLinkstrue + --datasourceKey${datasourceKey} --masteryarn-cluster diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/BioschemaModelConstants.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/BioschemaModelConstants.scala index a642be772..c1af04170 100644 --- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/BioschemaModelConstants.scala +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/BioschemaModelConstants.scala @@ -1,7 +1,7 @@ package eu.dnetlib.dhp.bioschema import eu.dnetlib.dhp.schema.common.ModelConstants -import eu.dnetlib.dhp.schema.oaf.DataInfo +import eu.dnetlib.dhp.schema.oaf.{DataInfo, KeyValue} import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils import java.time.format.DateTimeFormatter @@ -61,6 +61,39 @@ class BioschemaModelConstants extends Serializable {} object BioschemaModelConstants { + val DATA_INFO: DataInfo = OafMapperUtils.dataInfo( + false, + null, + false, + false, + ModelConstants.PROVENANCE_ACTION_SET_QUALIFIER, + "0.9" + ) + + val PED_PREFIX: String = "ped_________" + + val resolvedURL: Map[String, String] = Map( + "uniprot" -> "https://www.uniprot.org/uniprot/", + "pubmed" -> "https://pubmed.ncbi.nlm.nih.gov/" + ) + + val collectedFromMap: Map[String, KeyValue] = { + val PEDCollectedFrom: KeyValue = OafMapperUtils.keyValue( + //TODO create pedDatasourceId and update this value + "10|ped_________::pedDatasourceId", + "Protein Ensemble Database" + ) + PEDCollectedFrom.setDataInfo(DATA_INFO) + + Map( + "ped" -> PEDCollectedFrom + ) + } + + val datasourceKeyPrefix: Map[String, String] = Map( + "ped" -> PED_PREFIX + ) + val REL_TYPE_VALUE: String = "resultResult" val DATE_RELATION_KEY = "RelationDate" val dataInfo: DataInfo = bioschemaDataInfo("0.9") diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/BioschemaToOAFTransformation.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/BioschemaToOAFTransformation.scala index f558e6eca..6a38aa56b 100644 --- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/BioschemaToOAFTransformation.scala +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/BioschemaToOAFTransformation.scala @@ -18,33 +18,6 @@ object BioschemaToOAFTransformation { val mapper = new ObjectMapper() - val DATA_INFO: DataInfo = OafMapperUtils.dataInfo( - false, - null, - false, - false, - ModelConstants.PROVENANCE_ACTION_SET_QUALIFIER, - "0.9" - ) - - val resolvedURL: Map[String, String] = Map( - "uniprot" -> "https://www.uniprot.org/uniprot/", - "pubmed" -> "https://pubmed.ncbi.nlm.nih.gov/" - ) - - val collectedFromMap: Map[String, KeyValue] = { - val PEDCollectedFrom: KeyValue = OafMapperUtils.keyValue( - //TODO create pedDatasourceId and update this value - "10|ped_________::pedDatasourceId", - "Protein Ensemble Database" - ) - PEDCollectedFrom.setDataInfo(DATA_INFO) - - Map( - "ped" -> PEDCollectedFrom - ) - } - def extract_date(input: String): Option[String] = { val d = Date_regex .map(pattern => { @@ -89,38 +62,14 @@ object BioschemaToOAFTransformation { null } - def createDNetTargetIdentifier(pid: String, pidType: String, idPrefix: String): String = { - val f_part = s"$idPrefix|${pidType.toLowerCase}".padTo(15, '_') - s"$f_part::${IdentifierFactory.md5(pid.toLowerCase)}" - } - def generateOAFDate(dt: String, q: Qualifier): StructuredProperty = { OafMapperUtils.structuredProperty(dt, q, null) } - def generateRelation( - sourceId: String, - targetId: String, - relClass: String, - cf: KeyValue, - di: DataInfo - ): Relation = { - - val r = new Relation - r.setSource(sourceId) - r.setTarget(targetId) - r.setRelType(ModelConstants.RESULT_PROJECT) - r.setRelClass(relClass) - r.setSubRelType(ModelConstants.OUTCOME) - r.setCollectedfrom(List(cf).asJava) - r.setDataInfo(di) - r - - } - def generateOAF( input: String, - exportLinks: Boolean + exportLinks: Boolean, + datasourceKey: String ): List[Oaf] = { implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats @@ -142,15 +91,15 @@ object BioschemaToOAFTransformation { List( OafMapperUtils.structuredProperty( pid, - "ped", - "ped", + datasourceKey, + datasourceKey, ModelConstants.DNET_PID_TYPES, ModelConstants.DNET_PID_TYPES, DATA_INFO ) ).asJava ) - result.setId(OafMapperUtils.createOpenaireId(50, s"ped_________::$pid", true)) + result.setId(OafMapperUtils.createOpenaireId(50, s"${datasourceKeyPrefix(datasourceKey)}::$pid", true)) result.setOriginalId(List(pid).asJava) result.setDataInfo(dataInfo) @@ -215,7 +164,7 @@ object BioschemaToOAFTransformation { .asJava ) - result.setCollectedfrom(List(collectedFromMap("ped")).asJava) + result.setCollectedfrom(List(collectedFromMap(datasourceKey)).asJava) val descriptions = (json \\ "descriptions").extract[List[DescriptionType]] @@ -246,7 +195,7 @@ object BioschemaToOAFTransformation { }) .asJava ) - instance.setCollectedfrom(collectedFromMap("ped")) + instance.setCollectedfrom(collectedFromMap(datasourceKey)) instance.setPid(result.getPid) result.setId(IdentifierFactory.createIdentifier(result)) @@ -294,7 +243,8 @@ object BioschemaToOAFTransformation { rels, result.getId, if (i_date.isDefined && i_date.get.isDefined) i_date.get.get else null, - pid + pid, + datasourceKey ) } @@ -308,12 +258,13 @@ object BioschemaToOAFTransformation { rels: List[RelatedIdentifierType], id: String, date: String, - pid: String + pid: String, + datasourceKey: String ): List[Relation] = { rels .map(r => { val rel = new Relation - rel.setCollectedfrom(List(collectedFromMap("ped")).asJava) + rel.setCollectedfrom(List(collectedFromMap(datasourceKey)).asJava) rel.setDataInfo(dataInfo) val subRelType = subRelTypeMapping(r.relationType).relType @@ -341,16 +292,9 @@ object BioschemaToOAFTransformation { DHPUtils.generateUnresolvedIdentifier(r.relatedIdentifier, r.relatedIdentifierType) ) rel.setSource(id) - rel.setCollectedfrom(List(collectedFromMap("ped")).asJava) + rel.setCollectedfrom(List(collectedFromMap(datasourceKey)).asJava) rel.getCollectedfrom.asScala.map(c => c.getValue).toList rel }) } - - def generateDSId(input: String): String = { - val b = StringUtils.substringBefore(input, "::") - val a = StringUtils.substringAfter(input, "::") - s"10|$b::${DHPUtils.md5(a)}" - } - } diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/GenerateBioschemaDatasetSpark.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/GenerateBioschemaDatasetSpark.scala index 857d57b08..0c4c867d7 100644 --- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/GenerateBioschemaDatasetSpark.scala +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/GenerateBioschemaDatasetSpark.scala @@ -22,6 +22,8 @@ class GenerateBioschemaDatasetSpark(propertyPath: String, args: Array[String], l log.info(s"SourcePath is '$sourcePath'") val exportLinks = "true".equalsIgnoreCase(parser.get("exportLinks")) log.info(s"exportLinks is '$exportLinks'") + val datasourceKey = parser.get("datasourceKey").toLowerCase + log.info(s"datasourceKey is '$datasourceKey'") // val mdstoreOutputVersion = parser.get("mdstoreOutputVersion") // log.info(s"mdstoreOutputVersion is '$mdstoreOutputVersion'") @@ -34,7 +36,7 @@ class GenerateBioschemaDatasetSpark(propertyPath: String, args: Array[String], l val targetPath = parser.get("targetPath") - generateBioschemaDataset(sourcePath, exportLinks, targetPath, spark) + generateBioschemaDataset(sourcePath, exportLinks, targetPath, datasourceKey, spark) // reportTotalSize(targetPath, outputBasePath) } @@ -63,6 +65,7 @@ class GenerateBioschemaDatasetSpark(propertyPath: String, args: Array[String], l sourcePath: String, exportLinks: Boolean, targetPath: String, + datasourceKey: String, spark: SparkSession ): Unit = { require(spark != null) @@ -72,7 +75,7 @@ class GenerateBioschemaDatasetSpark(propertyPath: String, args: Array[String], l spark.createDataset( spark.sparkContext .textFile(sourcePath) - .flatMap(i => BioschemaToOAFTransformation.generateOAF(i, exportLinks)) + .flatMap(i => BioschemaToOAFTransformation.generateOAF(i, exportLinks, datasourceKey)) ), targetPath ) diff --git a/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/bioschema/BioschemaDataciteToOAFTest.scala b/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/bioschema/BioschemaDataciteToOAFTest.scala index 01c23284c..0e4ede5d0 100644 --- a/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/bioschema/BioschemaDataciteToOAFTest.scala +++ b/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/bioschema/BioschemaDataciteToOAFTest.scala @@ -46,13 +46,13 @@ class BioschemaDataciteToOAFTest { val instance = new GenerateBioschemaDatasetSpark(null, null, log) val targetPath = s"$workingDir/result" - instance.generateBioschemaDataset(path, exportLinks = true, targetPath, spark) + instance.generateBioschemaDataset(path, exportLinks = true, targetPath, "ped", spark) val total_items = spark.read.text(targetPath).count() println(s"total_items: $total_items") assertTrue(total_items == 50) - instance.generateBioschemaDataset(path, exportLinks = false, targetPath, spark) + instance.generateBioschemaDataset(path, exportLinks = false, targetPath, "ped", spark) val total_datasets = spark.read.text(targetPath).count() println(s"total_datasets: $total_datasets") @@ -67,7 +67,7 @@ class BioschemaDataciteToOAFTest { .fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/bioschema/ped_record.json")) .mkString val mapper = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT) - val res: List[Oaf] = BioschemaToOAFTransformation.generateOAF(record, true) + val res: List[Oaf] = BioschemaToOAFTransformation.generateOAF(record, true, "ped") res.foreach(r => { println(mapper.writeValueAsString(r)) println("----------------------------")