From e53a606afc86123fa8fed47ca23a7ca49c7bc680 Mon Sep 17 00:00:00 2001 From: Enrico Ottonello Date: Tue, 15 Mar 2022 17:36:48 +0100 Subject: [PATCH] added date of collection, resource type as workflow parameter --- .../bioschema/generate_dataset_params.json | 6 ++ .../dhp/bioschema/oozie_app/workflow.xml | 6 ++ .../bioschema/BioschemaModelConstants.scala | 13 +++- .../BioschemaToOAFTransformation.scala | 64 +++++++------------ .../GenerateBioschemaDatasetSpark.scala | 7 +- .../BioschemaDataciteToOAFTest.scala | 6 +- 6 files changed, 55 insertions(+), 47 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/bioschema/generate_dataset_params.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/bioschema/generate_dataset_params.json index 1da9643dd..69cd709fc 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/bioschema/generate_dataset_params.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/bioschema/generate_dataset_params.json @@ -40,5 +40,11 @@ "paramLongName": "datasourceKey", "paramDescription": "the key that identifies the datasource", "paramRequired": true + }, + { + "paramName": "p", + "paramLongName": "profile", + "paramDescription": "resource profile", + "paramRequired": true } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/bioschema/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/bioschema/oozie_app/workflow.xml index 39be85eed..92205ed96 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/bioschema/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/bioschema/oozie_app/workflow.xml @@ -10,6 +10,11 @@ ped the key that identifies the datasource (eg ped, disprot) + + profile + protein + resource profile (eg protein, gene) + @@ -57,6 +62,7 @@ --targetPath${mainPath}/dataset --exportLinkstrue --datasourceKey${datasourceKey} + --profile${profile} --masteryarn-cluster diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/BioschemaModelConstants.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/BioschemaModelConstants.scala index c1af04170..b0a51de93 100644 --- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/BioschemaModelConstants.scala +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/BioschemaModelConstants.scala @@ -1,7 +1,7 @@ package eu.dnetlib.dhp.bioschema import eu.dnetlib.dhp.schema.common.ModelConstants -import eu.dnetlib.dhp.schema.oaf.{DataInfo, KeyValue} +import eu.dnetlib.dhp.schema.oaf.{DataInfo, KeyValue, Qualifier} import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils import java.time.format.DateTimeFormatter @@ -61,6 +61,9 @@ class BioschemaModelConstants extends Serializable {} object BioschemaModelConstants { + val PROTEIN_RESOURCETYPE: Qualifier = + qualifier("protein", "protein", ModelConstants.DNET_PUBLICATION_RESOURCE, ModelConstants.DNET_PUBLICATION_RESOURCE) + val DATA_INFO: DataInfo = OafMapperUtils.dataInfo( false, null, @@ -273,4 +276,12 @@ object BioschemaModelConstants { Pattern.compile("(19|20)\\d\\d", Pattern.MULTILINE) ) + private def qualifier(classid: String, classname: String, schemeid: String, schemename: String) = { + val q = new Qualifier + q.setClassid(classid) + q.setClassname(classname) + q.setSchemeid(schemeid) + q.setSchemename(schemename) + q + } } diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/BioschemaToOAFTransformation.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/BioschemaToOAFTransformation.scala index 6a38aa56b..3c1e52b87 100644 --- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/BioschemaToOAFTransformation.scala +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/BioschemaToOAFTransformation.scala @@ -11,7 +11,9 @@ import org.json4s.DefaultFormats import org.json4s.JsonAST.{JField, JObject, JString} import org.json4s.jackson.JsonMethods.parse +import java.text.SimpleDateFormat import java.time.LocalDate +import java.util.{Date, Locale} import scala.collection.JavaConverters._ object BioschemaToOAFTransformation { @@ -46,19 +48,20 @@ object BioschemaToOAFTransformation { d } - def getResult(): Result = { + def getResult(resourceClassName: String): Result = { val i = new Instance - i.setInstancetype( - OafMapperUtils.qualifier( - "0046", - "Bioentity", - ModelConstants.DNET_PUBLICATION_RESOURCE, - ModelConstants.DNET_PUBLICATION_RESOURCE - ) - ) - val d = new OafDataset - d.setInstance(List(i).asJava) - return d + resourceClassName.toUpperCase() match { + case "PROTEIN" => + i.setInstancetype( + PROTEIN_RESOURCETYPE + ) + val d = new OafDataset + d.setInstance(List(i).asJava) + d.setResourcetype( + PROTEIN_RESOURCETYPE + ) + return d + } null } @@ -69,19 +72,14 @@ object BioschemaToOAFTransformation { def generateOAF( input: String, exportLinks: Boolean, - datasourceKey: String + datasourceKey: String, + resourceClassName: String ): List[Oaf] = { implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats lazy val json = parse(input) - val resourceType = (json \ "types" \ "resourceType").extractOrElse[String](null) - val resourceTypeGeneral = - (json \ "types" \ "resourceTypeGeneral").extractOrElse[String](null) - val schemaOrg = (json \ "types" \ "schemaOrg").extractOrElse[String](null) - - val result = getResult( - ) + val result = getResult(resourceClassName) if (result == null) return List() @@ -101,7 +99,6 @@ object BioschemaToOAFTransformation { ) result.setId(OafMapperUtils.createOpenaireId(50, s"${datasourceKeyPrefix(datasourceKey)}::$pid", true)) result.setOriginalId(List(pid).asJava) - result.setDataInfo(dataInfo) val titles: List[TitleType] = (json \\ "titles").extractOrElse[List[TitleType]](List()) @@ -117,30 +114,15 @@ object BioschemaToOAFTransformation { ) val dates = (json \\ "dates").extract[List[DateType]] - val publication_year = (json \\ "publicationYear").extractOrElse[String](null) - val i_date = dates + val collected_date = dates .filter(d => d.date.isDefined && d.dateType.isDefined) - .find(d => d.dateType.get.equalsIgnoreCase("issued")) - .map(d => extract_date(d.date.get)) - val a_date: Option[String] = dates - .filter(d => d.date.isDefined && d.dateType.isDefined && d.dateType.get.equalsIgnoreCase("available")) + .find(d => d.dateType.get.equalsIgnoreCase("collected")) .map(d => extract_date(d.date.get)) .find(d => d != null && d.isDefined) .map(d => d.get) - - if (a_date.isDefined) { - result.setEmbargoenddate(OafMapperUtils.field(a_date.get, null)) - } - if (i_date.isDefined && i_date.get.isDefined) { - result.setDateofacceptance(OafMapperUtils.field(i_date.get.get, null)) - result.getInstance().get(0).setDateofacceptance(OafMapperUtils.field(i_date.get.get, null)) - } else if (publication_year != null) { - result.setDateofacceptance(OafMapperUtils.field(s"01-01-$publication_year", null)) - result - .getInstance() - .get(0) - .setDateofacceptance(OafMapperUtils.field(s"01-01-$publication_year", null)) + if (collected_date.isDefined) { + result.setDateofcollection(collected_date.get) } result.setRelevantdate( @@ -242,7 +224,7 @@ object BioschemaToOAFTransformation { relations = relations ::: generateRelations( rels, result.getId, - if (i_date.isDefined && i_date.get.isDefined) i_date.get.get else null, + null, pid, datasourceKey ) diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/GenerateBioschemaDatasetSpark.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/GenerateBioschemaDatasetSpark.scala index 0c4c867d7..298b9d404 100644 --- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/GenerateBioschemaDatasetSpark.scala +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/GenerateBioschemaDatasetSpark.scala @@ -24,6 +24,8 @@ class GenerateBioschemaDatasetSpark(propertyPath: String, args: Array[String], l log.info(s"exportLinks is '$exportLinks'") val datasourceKey = parser.get("datasourceKey").toLowerCase log.info(s"datasourceKey is '$datasourceKey'") + val profile = parser.get("profile").toLowerCase + log.info(s"profile is '$profile'") // val mdstoreOutputVersion = parser.get("mdstoreOutputVersion") // log.info(s"mdstoreOutputVersion is '$mdstoreOutputVersion'") @@ -36,7 +38,7 @@ class GenerateBioschemaDatasetSpark(propertyPath: String, args: Array[String], l val targetPath = parser.get("targetPath") - generateBioschemaDataset(sourcePath, exportLinks, targetPath, datasourceKey, spark) + generateBioschemaDataset(sourcePath, exportLinks, targetPath, datasourceKey, profile, spark) // reportTotalSize(targetPath, outputBasePath) } @@ -66,6 +68,7 @@ class GenerateBioschemaDatasetSpark(propertyPath: String, args: Array[String], l exportLinks: Boolean, targetPath: String, datasourceKey: String, + profile: String, spark: SparkSession ): Unit = { require(spark != null) @@ -75,7 +78,7 @@ class GenerateBioschemaDatasetSpark(propertyPath: String, args: Array[String], l spark.createDataset( spark.sparkContext .textFile(sourcePath) - .flatMap(i => BioschemaToOAFTransformation.generateOAF(i, exportLinks, datasourceKey)) + .flatMap(i => BioschemaToOAFTransformation.generateOAF(i, exportLinks, datasourceKey, profile)) ), targetPath ) diff --git a/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/bioschema/BioschemaDataciteToOAFTest.scala b/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/bioschema/BioschemaDataciteToOAFTest.scala index 0e4ede5d0..3d0e66a2a 100644 --- a/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/bioschema/BioschemaDataciteToOAFTest.scala +++ b/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/bioschema/BioschemaDataciteToOAFTest.scala @@ -46,13 +46,13 @@ class BioschemaDataciteToOAFTest { val instance = new GenerateBioschemaDatasetSpark(null, null, log) val targetPath = s"$workingDir/result" - instance.generateBioschemaDataset(path, exportLinks = true, targetPath, "ped", spark) + instance.generateBioschemaDataset(path, exportLinks = true, targetPath, "ped", "protein", spark) val total_items = spark.read.text(targetPath).count() println(s"total_items: $total_items") assertTrue(total_items == 50) - instance.generateBioschemaDataset(path, exportLinks = false, targetPath, "ped", spark) + instance.generateBioschemaDataset(path, exportLinks = false, targetPath, "ped", "protein", spark) val total_datasets = spark.read.text(targetPath).count() println(s"total_datasets: $total_datasets") @@ -67,7 +67,7 @@ class BioschemaDataciteToOAFTest { .fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/bioschema/ped_record.json")) .mkString val mapper = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT) - val res: List[Oaf] = BioschemaToOAFTransformation.generateOAF(record, true, "ped") + val res: List[Oaf] = BioschemaToOAFTransformation.generateOAF(record, true, "ped", "protein") res.foreach(r => { println(mapper.writeValueAsString(r)) println("----------------------------")