forked from D-Net/dnet-hadoop
added date of collection, resource type as workflow parameter
This commit is contained in:
parent
bd37f14941
commit
e53a606afc
|
@ -40,5 +40,11 @@
|
|||
"paramLongName": "datasourceKey",
|
||||
"paramDescription": "the key that identifies the datasource",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "p",
|
||||
"paramLongName": "profile",
|
||||
"paramDescription": "resource profile",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
|
@ -10,6 +10,11 @@
|
|||
<value>ped</value>
|
||||
<description>the key that identifies the datasource (eg ped, disprot)</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>profile</name>
|
||||
<value>protein</value>
|
||||
<description>resource profile (eg protein, gene)</description>
|
||||
</property>
|
||||
</parameters>
|
||||
|
||||
<start to="TransformJob"/>
|
||||
|
@ -57,6 +62,7 @@
|
|||
<arg>--targetPath</arg><arg>${mainPath}/dataset</arg>
|
||||
<arg>--exportLinks</arg><arg>true</arg>
|
||||
<arg>--datasourceKey</arg><arg>${datasourceKey}</arg>
|
||||
<arg>--profile</arg><arg>${profile}</arg>
|
||||
<arg>--master</arg><arg>yarn-cluster</arg>
|
||||
</spark>
|
||||
<ok to="End"/>
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
package eu.dnetlib.dhp.bioschema
|
||||
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants
|
||||
import eu.dnetlib.dhp.schema.oaf.{DataInfo, KeyValue}
|
||||
import eu.dnetlib.dhp.schema.oaf.{DataInfo, KeyValue, Qualifier}
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils
|
||||
|
||||
import java.time.format.DateTimeFormatter
|
||||
|
@ -61,6 +61,9 @@ class BioschemaModelConstants extends Serializable {}
|
|||
|
||||
object BioschemaModelConstants {
|
||||
|
||||
val PROTEIN_RESOURCETYPE: Qualifier =
|
||||
qualifier("protein", "protein", ModelConstants.DNET_PUBLICATION_RESOURCE, ModelConstants.DNET_PUBLICATION_RESOURCE)
|
||||
|
||||
val DATA_INFO: DataInfo = OafMapperUtils.dataInfo(
|
||||
false,
|
||||
null,
|
||||
|
@ -273,4 +276,12 @@ object BioschemaModelConstants {
|
|||
Pattern.compile("(19|20)\\d\\d", Pattern.MULTILINE)
|
||||
)
|
||||
|
||||
private def qualifier(classid: String, classname: String, schemeid: String, schemename: String) = {
|
||||
val q = new Qualifier
|
||||
q.setClassid(classid)
|
||||
q.setClassname(classname)
|
||||
q.setSchemeid(schemeid)
|
||||
q.setSchemename(schemename)
|
||||
q
|
||||
}
|
||||
}
|
||||
|
|
|
@ -11,7 +11,9 @@ import org.json4s.DefaultFormats
|
|||
import org.json4s.JsonAST.{JField, JObject, JString}
|
||||
import org.json4s.jackson.JsonMethods.parse
|
||||
|
||||
import java.text.SimpleDateFormat
|
||||
import java.time.LocalDate
|
||||
import java.util.{Date, Locale}
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
object BioschemaToOAFTransformation {
|
||||
|
@ -46,19 +48,20 @@ object BioschemaToOAFTransformation {
|
|||
d
|
||||
}
|
||||
|
||||
def getResult(): Result = {
|
||||
def getResult(resourceClassName: String): Result = {
|
||||
val i = new Instance
|
||||
resourceClassName.toUpperCase() match {
|
||||
case "PROTEIN" =>
|
||||
i.setInstancetype(
|
||||
OafMapperUtils.qualifier(
|
||||
"0046",
|
||||
"Bioentity",
|
||||
ModelConstants.DNET_PUBLICATION_RESOURCE,
|
||||
ModelConstants.DNET_PUBLICATION_RESOURCE
|
||||
)
|
||||
PROTEIN_RESOURCETYPE
|
||||
)
|
||||
val d = new OafDataset
|
||||
d.setInstance(List(i).asJava)
|
||||
d.setResourcetype(
|
||||
PROTEIN_RESOURCETYPE
|
||||
)
|
||||
return d
|
||||
}
|
||||
null
|
||||
}
|
||||
|
||||
|
@ -69,19 +72,14 @@ object BioschemaToOAFTransformation {
|
|||
def generateOAF(
|
||||
input: String,
|
||||
exportLinks: Boolean,
|
||||
datasourceKey: String
|
||||
datasourceKey: String,
|
||||
resourceClassName: String
|
||||
): List[Oaf] = {
|
||||
|
||||
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
|
||||
lazy val json = parse(input)
|
||||
|
||||
val resourceType = (json \ "types" \ "resourceType").extractOrElse[String](null)
|
||||
val resourceTypeGeneral =
|
||||
(json \ "types" \ "resourceTypeGeneral").extractOrElse[String](null)
|
||||
val schemaOrg = (json \ "types" \ "schemaOrg").extractOrElse[String](null)
|
||||
|
||||
val result = getResult(
|
||||
)
|
||||
val result = getResult(resourceClassName)
|
||||
if (result == null)
|
||||
return List()
|
||||
|
||||
|
@ -101,7 +99,6 @@ object BioschemaToOAFTransformation {
|
|||
)
|
||||
result.setId(OafMapperUtils.createOpenaireId(50, s"${datasourceKeyPrefix(datasourceKey)}::$pid", true))
|
||||
result.setOriginalId(List(pid).asJava)
|
||||
|
||||
result.setDataInfo(dataInfo)
|
||||
|
||||
val titles: List[TitleType] = (json \\ "titles").extractOrElse[List[TitleType]](List())
|
||||
|
@ -117,30 +114,15 @@ object BioschemaToOAFTransformation {
|
|||
)
|
||||
|
||||
val dates = (json \\ "dates").extract[List[DateType]]
|
||||
val publication_year = (json \\ "publicationYear").extractOrElse[String](null)
|
||||
|
||||
val i_date = dates
|
||||
val collected_date = dates
|
||||
.filter(d => d.date.isDefined && d.dateType.isDefined)
|
||||
.find(d => d.dateType.get.equalsIgnoreCase("issued"))
|
||||
.map(d => extract_date(d.date.get))
|
||||
val a_date: Option[String] = dates
|
||||
.filter(d => d.date.isDefined && d.dateType.isDefined && d.dateType.get.equalsIgnoreCase("available"))
|
||||
.find(d => d.dateType.get.equalsIgnoreCase("collected"))
|
||||
.map(d => extract_date(d.date.get))
|
||||
.find(d => d != null && d.isDefined)
|
||||
.map(d => d.get)
|
||||
|
||||
if (a_date.isDefined) {
|
||||
result.setEmbargoenddate(OafMapperUtils.field(a_date.get, null))
|
||||
}
|
||||
if (i_date.isDefined && i_date.get.isDefined) {
|
||||
result.setDateofacceptance(OafMapperUtils.field(i_date.get.get, null))
|
||||
result.getInstance().get(0).setDateofacceptance(OafMapperUtils.field(i_date.get.get, null))
|
||||
} else if (publication_year != null) {
|
||||
result.setDateofacceptance(OafMapperUtils.field(s"01-01-$publication_year", null))
|
||||
result
|
||||
.getInstance()
|
||||
.get(0)
|
||||
.setDateofacceptance(OafMapperUtils.field(s"01-01-$publication_year", null))
|
||||
if (collected_date.isDefined) {
|
||||
result.setDateofcollection(collected_date.get)
|
||||
}
|
||||
|
||||
result.setRelevantdate(
|
||||
|
@ -242,7 +224,7 @@ object BioschemaToOAFTransformation {
|
|||
relations = relations ::: generateRelations(
|
||||
rels,
|
||||
result.getId,
|
||||
if (i_date.isDefined && i_date.get.isDefined) i_date.get.get else null,
|
||||
null,
|
||||
pid,
|
||||
datasourceKey
|
||||
)
|
||||
|
|
|
@ -24,6 +24,8 @@ class GenerateBioschemaDatasetSpark(propertyPath: String, args: Array[String], l
|
|||
log.info(s"exportLinks is '$exportLinks'")
|
||||
val datasourceKey = parser.get("datasourceKey").toLowerCase
|
||||
log.info(s"datasourceKey is '$datasourceKey'")
|
||||
val profile = parser.get("profile").toLowerCase
|
||||
log.info(s"profile is '$profile'")
|
||||
|
||||
// val mdstoreOutputVersion = parser.get("mdstoreOutputVersion")
|
||||
// log.info(s"mdstoreOutputVersion is '$mdstoreOutputVersion'")
|
||||
|
@ -36,7 +38,7 @@ class GenerateBioschemaDatasetSpark(propertyPath: String, args: Array[String], l
|
|||
|
||||
val targetPath = parser.get("targetPath")
|
||||
|
||||
generateBioschemaDataset(sourcePath, exportLinks, targetPath, datasourceKey, spark)
|
||||
generateBioschemaDataset(sourcePath, exportLinks, targetPath, datasourceKey, profile, spark)
|
||||
|
||||
// reportTotalSize(targetPath, outputBasePath)
|
||||
}
|
||||
|
@ -66,6 +68,7 @@ class GenerateBioschemaDatasetSpark(propertyPath: String, args: Array[String], l
|
|||
exportLinks: Boolean,
|
||||
targetPath: String,
|
||||
datasourceKey: String,
|
||||
profile: String,
|
||||
spark: SparkSession
|
||||
): Unit = {
|
||||
require(spark != null)
|
||||
|
@ -75,7 +78,7 @@ class GenerateBioschemaDatasetSpark(propertyPath: String, args: Array[String], l
|
|||
spark.createDataset(
|
||||
spark.sparkContext
|
||||
.textFile(sourcePath)
|
||||
.flatMap(i => BioschemaToOAFTransformation.generateOAF(i, exportLinks, datasourceKey))
|
||||
.flatMap(i => BioschemaToOAFTransformation.generateOAF(i, exportLinks, datasourceKey, profile))
|
||||
),
|
||||
targetPath
|
||||
)
|
||||
|
|
|
@ -46,13 +46,13 @@ class BioschemaDataciteToOAFTest {
|
|||
val instance = new GenerateBioschemaDatasetSpark(null, null, log)
|
||||
val targetPath = s"$workingDir/result"
|
||||
|
||||
instance.generateBioschemaDataset(path, exportLinks = true, targetPath, "ped", spark)
|
||||
instance.generateBioschemaDataset(path, exportLinks = true, targetPath, "ped", "protein", spark)
|
||||
|
||||
val total_items = spark.read.text(targetPath).count()
|
||||
println(s"total_items: $total_items")
|
||||
assertTrue(total_items == 50)
|
||||
|
||||
instance.generateBioschemaDataset(path, exportLinks = false, targetPath, "ped", spark)
|
||||
instance.generateBioschemaDataset(path, exportLinks = false, targetPath, "ped", "protein", spark)
|
||||
|
||||
val total_datasets = spark.read.text(targetPath).count()
|
||||
println(s"total_datasets: $total_datasets")
|
||||
|
@ -67,7 +67,7 @@ class BioschemaDataciteToOAFTest {
|
|||
.fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/bioschema/ped_record.json"))
|
||||
.mkString
|
||||
val mapper = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT)
|
||||
val res: List[Oaf] = BioschemaToOAFTransformation.generateOAF(record, true, "ped")
|
||||
val res: List[Oaf] = BioschemaToOAFTransformation.generateOAF(record, true, "ped", "protein")
|
||||
res.foreach(r => {
|
||||
println(mapper.writeValueAsString(r))
|
||||
println("----------------------------")
|
||||
|
|
Loading…
Reference in New Issue