From f28d7e3b9d6b6c91de093a856ac23a28c45bfdf3 Mon Sep 17 00:00:00 2001 From: Enrico Ottonello Date: Wed, 2 Mar 2022 12:12:37 +0100 Subject: [PATCH] added spark dataset creation --- .../bioschema/BioschemaModelConstants.scala | 14 +-- .../BioschemaToOAFTransformation.scala | 119 ++---------------- .../GenerateBioschemaDatasetSpark.scala | 77 ++++++++++++ .../eu/dnetlib/dhp/bioschema/ped_dump | 10 ++ .../BioschemaDataciteToOAFTest.scala | 90 +++++-------- 5 files changed, 129 insertions(+), 181 deletions(-) create mode 100644 dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/GenerateBioschemaDatasetSpark.scala create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/bioschema/ped_dump diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/BioschemaModelConstants.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/BioschemaModelConstants.scala index b2ecb1a24..a642be772 100644 --- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/BioschemaModelConstants.scala +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/BioschemaModelConstants.scala @@ -1,22 +1,12 @@ package eu.dnetlib.dhp.bioschema import eu.dnetlib.dhp.schema.common.ModelConstants +import eu.dnetlib.dhp.schema.oaf.DataInfo import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils -import eu.dnetlib.dhp.schema.oaf.{DataInfo, KeyValue} -import java.io.InputStream import java.time.format.DateTimeFormatter import java.util.Locale import java.util.regex.Pattern -import scala.io.Source - -///** This class represent the dataModel of the input Dataset of Bioschema Datacite -// * @param doi THE DOI -// * @param timestamp timestamp of last update date -// * @param isActive the record is active or deleted -// * @param json the json native records -// */ -//case class DataciteType(doi: String, timestamp: Long, isActive: Boolean, json: String) {} /* The following class are utility 
class used for the mapping from @@ -73,8 +63,6 @@ object BioschemaModelConstants { val REL_TYPE_VALUE: String = "resultResult" val DATE_RELATION_KEY = "RelationDate" - val DOI_CLASS = "doi" - val SUBJ_CLASS = "keywords" val dataInfo: DataInfo = bioschemaDataInfo("0.9") val subRelTypeMapping: Map[String, OAFRelations] = Map( diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/BioschemaToOAFTransformation.scala b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/BioschemaToOAFTransformation.scala index 4d4ab98ce..f558e6eca 100644 --- a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/BioschemaToOAFTransformation.scala +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/BioschemaToOAFTransformation.scala @@ -2,7 +2,6 @@ package eu.dnetlib.dhp.bioschema import com.fasterxml.jackson.databind.ObjectMapper import eu.dnetlib.dhp.bioschema.BioschemaModelConstants._ -import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup import eu.dnetlib.dhp.schema.common.ModelConstants import eu.dnetlib.dhp.schema.oaf.utils.{IdentifierFactory, OafMapperUtils} import eu.dnetlib.dhp.schema.oaf.{Dataset => OafDataset, _} @@ -74,65 +73,7 @@ object BioschemaToOAFTransformation { d } - def getTypeQualifier( - resourceType: String, - resourceTypeGeneral: String, - schemaOrg: String, - vocabularies: VocabularyGroup - ): (Qualifier, Qualifier) = { - if (resourceType != null && resourceType.nonEmpty) { - val typeQualifier = - vocabularies.getSynonymAsQualifier(ModelConstants.DNET_PUBLICATION_RESOURCE, resourceType) - if (typeQualifier != null) - return ( - typeQualifier, - vocabularies.getSynonymAsQualifier( - ModelConstants.DNET_RESULT_TYPOLOGIES, - typeQualifier.getClassid - ) - ) - } - if (schemaOrg != null && schemaOrg.nonEmpty) { - val typeQualifier = - vocabularies.getSynonymAsQualifier(ModelConstants.DNET_PUBLICATION_RESOURCE, schemaOrg) - if (typeQualifier != null) - return ( - typeQualifier, - 
vocabularies.getSynonymAsQualifier( - ModelConstants.DNET_RESULT_TYPOLOGIES, - typeQualifier.getClassid - ) - ) - - } - if (resourceTypeGeneral != null && resourceTypeGeneral.nonEmpty) { - val typeQualifier = vocabularies.getSynonymAsQualifier( - ModelConstants.DNET_PUBLICATION_RESOURCE, - resourceTypeGeneral - ) - if (typeQualifier != null) - return ( - typeQualifier, - vocabularies.getSynonymAsQualifier( - ModelConstants.DNET_RESULT_TYPOLOGIES, - typeQualifier.getClassid - ) - ) - - } - null - } - - def getResult( - resourceType: String, - resourceTypeGeneral: String, - schemaOrg: String, - vocabularies: VocabularyGroup - ): Result = { - val typeQualifiers: (Qualifier, Qualifier) = - getTypeQualifier(resourceType, resourceTypeGeneral, schemaOrg, vocabularies) - if (typeQualifiers == null) - return null + def getResult(): Result = { val i = new Instance i.setInstancetype( OafMapperUtils.qualifier( @@ -142,41 +83,12 @@ object BioschemaToOAFTransformation { ModelConstants.DNET_PUBLICATION_RESOURCE ) ) -// i.setInstancetype(typeQualifiers._1) - typeQualifiers._2.getClassname match { - case "dataset" => - val r = new OafDataset - r.setInstance(List(i).asJava) - return r - case "publication" => - val r = new Publication - r.setInstance(List(i).asJava) - return r - case "software" => - val r = new Software - r.setInstance(List(i).asJava) - return r - case "other" => - val r = new OtherResearchProduct - r.setInstance(List(i).asJava) - return r - } + val d = new OafDataset + d.setInstance(List(i).asJava) + return d null } - def available_date(input: String): Boolean = { - - implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats - lazy val json: org.json4s.JValue = parse(input) - val l: List[String] = for { - JObject(dates) <- json \\ "dates" - JField("dateType", JString(dateTypes)) <- dates - } yield dateTypes - - l.exists(p => p.equalsIgnoreCase("available")) - - } - def createDNetTargetIdentifier(pid: String, pidType: String, idPrefix: String): 
String = { val f_part = s"$idPrefix|${pidType.toLowerCase}".padTo(15, '_') s"$f_part::${IdentifierFactory.md5(pid.toLowerCase)}" @@ -208,9 +120,6 @@ object BioschemaToOAFTransformation { def generateOAF( input: String, - ts: Long, - dateOfCollection: Long, - vocabularies: VocabularyGroup, exportLinks: Boolean ): List[Oaf] = { @@ -222,8 +131,8 @@ object BioschemaToOAFTransformation { (json \ "types" \ "resourceTypeGeneral").extractOrElse[String](null) val schemaOrg = (json \ "types" \ "schemaOrg").extractOrElse[String](null) - //Mapping type based on vocabularies dnet:publication_resource and dnet:result_typologies - val result = getResult(resourceType, resourceTypeGeneral, schemaOrg, vocabularies) + val result = getResult( + ) if (result == null) return List() @@ -246,8 +155,6 @@ object BioschemaToOAFTransformation { result.setDataInfo(dataInfo) - val creators = (json \\ "creators").extractOrElse[List[CreatorType]](List()) - val titles: List[TitleType] = (json \\ "titles").extractOrElse[List[TitleType]](List()) result.setTitle( @@ -295,7 +202,12 @@ object BioschemaToOAFTransformation { .map(d => ( d._1.get, - vocabularies.getTermAsQualifier(ModelConstants.DNET_DATACITE_DATE, d._2.toLowerCase()) + OafMapperUtils.qualifier( + d._2.toLowerCase(), + d._2.toLowerCase(), + ModelConstants.DNET_DATACITE_DATE, + ModelConstants.DNET_DATACITE_DATE + ) ) ) .filter(d => d._2 != null) @@ -319,13 +231,6 @@ object BioschemaToOAFTransformation { if (publisher != null) result.setPublisher(OafMapperUtils.field(publisher, null)) - val language: String = (json \\ "language").extractOrElse[String](null) - - if (language != null) - result.setLanguage( - vocabularies.getSynonymAsQualifier(ModelConstants.DNET_LANGUAGES, language) - ) - val instance = result.getInstance().get(0) val ids: List[IdentifierType] = for { diff --git a/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/GenerateBioschemaDatasetSpark.scala 
b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/GenerateBioschemaDatasetSpark.scala new file mode 100644 index 000000000..73ea0e374 --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/main/scala/eu/dnetlib/dhp/bioschema/GenerateBioschemaDatasetSpark.scala @@ -0,0 +1,77 @@ +package eu.dnetlib.dhp.bioschema + +import com.fasterxml.jackson.databind.ObjectMapper +import eu.dnetlib.dhp.application.AbstractScalaApplication +import eu.dnetlib.dhp.collection.CollectionUtils +import eu.dnetlib.dhp.common.Constants.{MDSTORE_DATA_PATH, MDSTORE_SIZE_PATH} +import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion +import eu.dnetlib.dhp.schema.oaf.Oaf +import eu.dnetlib.dhp.utils.DHPUtils.writeHdfsFile +import org.apache.spark.sql.{Encoder, Encoders, SparkSession} +import org.slf4j.Logger + +class GenerateBioschemaDatasetSpark(propertyPath: String, args: Array[String], log: Logger) + extends AbstractScalaApplication(propertyPath, args, log: Logger) { + + /** Entry point invoked by the Spark application runner, + * where the whole logic of this Spark node is defined. + */ + override def run(): Unit = { + + val sourcePath = parser.get("sourcePath") + log.info(s"SourcePath is '$sourcePath'") + val exportLinks = "true".equalsIgnoreCase(parser.get("exportLinks")) + log.info(s"exportLinks is '$exportLinks'") + val mdstoreOutputVersion = parser.get("mdstoreOutputVersion") + log.info(s"mdstoreOutputVersion is '$mdstoreOutputVersion'") + val mapper = new ObjectMapper() + val cleanedMdStoreVersion = mapper.readValue(mdstoreOutputVersion, classOf[MDStoreVersion]) + val outputBasePath = cleanedMdStoreVersion.getHdfsPath + log.info(s"outputBasePath is '$outputBasePath'") + val targetPath = s"$outputBasePath$MDSTORE_DATA_PATH" + log.info(s"targetPath is '$targetPath'") + + generateBioschemaDataset(sourcePath, exportLinks, targetPath, spark) + + reportTotalSize(targetPath, outputBasePath) + } + + /** For working with MDStore we need to store in a file on hdfs the size of + * the
current dataset + * @param targetPath the HDFS path of the stored dataset whose record count is reported + * @param outputBasePath the MDStore base path where the size file is written + */ + def reportTotalSize(targetPath: String, outputBasePath: String): Unit = { + val total_items = spark.read.text(targetPath).count() + writeHdfsFile( + spark.sparkContext.hadoopConfiguration, + s"$total_items", + outputBasePath + MDSTORE_SIZE_PATH + ) + } + + /** Generate the transformed and cleaned OAF Dataset from the native one + * + * @param sourcePath the path of the native Dataset, in JSON/Datacite format + * @param exportLinks if true, unresolved links are generated as well + * @param targetPath the target path of the resulting Dataset + */ + def generateBioschemaDataset( + sourcePath: String, + exportLinks: Boolean, + targetPath: String, + spark: SparkSession + ): Unit = { + require(spark != null) + + implicit val resultEncoder: Encoder[Oaf] = Encoders.kryo(classOf[Oaf]) + CollectionUtils.saveDataset( + spark.createDataset( + spark.sparkContext + .textFile(sourcePath) + .flatMap(i => BioschemaToOAFTransformation.generateOAF(i, exportLinks)) + ), + targetPath + ) + } +} diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/bioschema/ped_dump b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/bioschema/ped_dump new file mode 100644 index 000000000..1de55720a --- /dev/null +++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/bioschema/ped_dump @@ -0,0 +1,10 @@
+{"id":"PED00001#P38634_A_1","types":{"resourceType":"Protein","resourceTypeGeneral":"Dataset"},"creators":[],"identifiers":[{"identifier":"https://proteinensemble.org/PED00001#P38634_A_1","identifierType":"URL"}],"relatedIdentifiers":[{"relationType":"IsCitedBy","relatedIdentifier":"https://identifiers.org/pubmed:20399186","relatedIdentifierType":"URL"},{"relationType":"IsIdenticalTo","relatedIdentifier":"http://purl.uniprot.org/uniprot/P38634","relatedIdentifierType":"URL"}],"alternateIdentifiers":[{"alternateIdentifier":"https://identifiers.org/uniprot:P38634"}],"descriptions":[],"titles":[{"title":"Protein SIC1"}],"dates":[{"date":"2021-12-10T11:11:09","dateType":"Collected"}]} +{"id":"PED00002#O14558_A_0","types":{"resourceType":"Protein","resourceTypeGeneral":"Dataset"},"creators":[],"identifiers":[{"identifier":"https://proteinensemble.org/PED00002#O14558_A_0","identifierType":"URL"}],"relatedIdentifiers":[{"relationType":"IsCitedBy","relatedIdentifier":"https://identifiers.org/pubmed:24382496","relatedIdentifierType":"URL"},{"relationType":"IsIdenticalTo","relatedIdentifier":"http://purl.uniprot.org/uniprot/O14558","relatedIdentifierType":"URL"}],"alternateIdentifiers":[{"alternateIdentifier":"https://identifiers.org/uniprot:O14558"}],"descriptions":[],"titles":[{"title":"Heat shock protein beta-6"}],"dates":[{"date":"2021-12-10T11:11:14","dateType":"Collected"}]} 
+{"id":"PED00002#O14558_B_0","types":{"resourceType":"Protein","resourceTypeGeneral":"Dataset"},"creators":[],"identifiers":[{"identifier":"https://proteinensemble.org/PED00002#O14558_B_0","identifierType":"URL"}],"relatedIdentifiers":[{"relationType":"IsCitedBy","relatedIdentifier":"https://identifiers.org/pubmed:24382496","relatedIdentifierType":"URL"},{"relationType":"IsIdenticalTo","relatedIdentifier":"http://purl.uniprot.org/uniprot/O14558","relatedIdentifierType":"URL"}],"alternateIdentifiers":[{"alternateIdentifier":"https://identifiers.org/uniprot:O14558"}],"descriptions":[],"titles":[{"title":"Heat shock protein beta-6"}],"dates":[{"date":"2021-12-10T11:11:14","dateType":"Collected"}]} +{"id":"PED00003#Q16143_A_0","types":{"resourceType":"Protein","resourceTypeGeneral":"Dataset"},"creators":[],"identifiers":[{"identifier":"https://proteinensemble.org/PED00003#Q16143_A_0","identifierType":"URL"}],"relatedIdentifiers":[{"relationType":"IsCitedBy","relatedIdentifier":"https://identifiers.org/pubmed:25389903","relatedIdentifierType":"URL"},{"relationType":"IsIdenticalTo","relatedIdentifier":"http://purl.uniprot.org/uniprot/Q16143","relatedIdentifierType":"URL"}],"alternateIdentifiers":[{"alternateIdentifier":"https://identifiers.org/uniprot:Q16143"}],"descriptions":[],"titles":[{"title":"Beta-synuclein"}],"dates":[{"date":"2021-12-10T11:11:17","dateType":"Collected"}]} 
+{"id":"PED00004#O43806_A_0","types":{"resourceType":"Protein","resourceTypeGeneral":"Dataset"},"creators":[],"identifiers":[{"identifier":"https://proteinensemble.org/PED00004#O43806_A_0","identifierType":"URL"}],"relatedIdentifiers":[{"relationType":"IsCitedBy","relatedIdentifier":"https://identifiers.org/pubmed:16214166","relatedIdentifierType":"URL"},{"relationType":"IsIdenticalTo","relatedIdentifier":"http://purl.uniprot.org/uniprot/O43806","relatedIdentifierType":"URL"}],"alternateIdentifiers":[{"alternateIdentifier":"https://identifiers.org/uniprot:O43806"}],"descriptions":[],"titles":[{"title":"Cyclin-dependent kinase inhibitor 1B"}],"dates":[{"date":"2021-12-10T11:11:22","dateType":"Collected"}]} +{"id":"PED00005#O14558_A_0","types":{"resourceType":"Protein","resourceTypeGeneral":"Dataset"},"creators":[],"identifiers":[{"identifier":"https://proteinensemble.org/PED00005#O14558_A_0","identifierType":"URL"}],"relatedIdentifiers":[{"relationType":"IsCitedBy","relatedIdentifier":"https://identifiers.org/pubmed:24382496","relatedIdentifierType":"URL"},{"relationType":"IsIdenticalTo","relatedIdentifier":"http://purl.uniprot.org/uniprot/O14558","relatedIdentifierType":"URL"}],"alternateIdentifiers":[{"alternateIdentifier":"https://identifiers.org/uniprot:O14558"}],"descriptions":[],"titles":[{"title":"Heat shock protein beta-6"}],"dates":[{"date":"2021-12-10T11:11:24","dateType":"Collected"}]} 
+{"id":"PED00005#O14558_B_0","types":{"resourceType":"Protein","resourceTypeGeneral":"Dataset"},"creators":[],"identifiers":[{"identifier":"https://proteinensemble.org/PED00005#O14558_B_0","identifierType":"URL"}],"relatedIdentifiers":[{"relationType":"IsCitedBy","relatedIdentifier":"https://identifiers.org/pubmed:24382496","relatedIdentifierType":"URL"},{"relationType":"IsIdenticalTo","relatedIdentifier":"http://purl.uniprot.org/uniprot/O14558","relatedIdentifierType":"URL"}],"alternateIdentifiers":[{"alternateIdentifier":"https://identifiers.org/uniprot:O14558"}],"descriptions":[],"titles":[{"title":"Heat shock protein beta-6"}],"dates":[{"date":"2021-12-10T11:11:24","dateType":"Collected"}]} +{"id":"PED00006#P37840_A_1","types":{"resourceType":"Protein","resourceTypeGeneral":"Dataset"},"creators":[],"identifiers":[{"identifier":"https://proteinensemble.org/PED00006#P37840_A_1","identifierType":"URL"}],"relatedIdentifiers":[{"relationType":"IsCitedBy","relatedIdentifier":"https://identifiers.org/pubmed:25389903","relatedIdentifierType":"URL"},{"relationType":"IsIdenticalTo","relatedIdentifier":"http://purl.uniprot.org/uniprot/P37840","relatedIdentifierType":"URL"}],"alternateIdentifiers":[{"alternateIdentifier":"https://identifiers.org/uniprot:P37840"}],"descriptions":[],"titles":[{"title":"Alpha-synuclein"}],"dates":[{"date":"2021-12-10T11:11:27","dateType":"Collected"}]} 
+{"id":"PED00006#Q16143_A_0","types":{"resourceType":"Protein","resourceTypeGeneral":"Dataset"},"creators":[],"identifiers":[{"identifier":"https://proteinensemble.org/PED00006#Q16143_A_0","identifierType":"URL"}],"relatedIdentifiers":[{"relationType":"IsCitedBy","relatedIdentifier":"https://identifiers.org/pubmed:25389903","relatedIdentifierType":"URL"},{"relationType":"IsIdenticalTo","relatedIdentifier":"http://purl.uniprot.org/uniprot/Q16143","relatedIdentifierType":"URL"}],"alternateIdentifiers":[{"alternateIdentifier":"https://identifiers.org/uniprot:Q16143"}],"descriptions":[],"titles":[{"title":"Beta-synuclein"}],"dates":[{"date":"2021-12-10T11:11:27","dateType":"Collected"}]} +{"id":"PED00006#Q16143_A_2","types":{"resourceType":"Protein","resourceTypeGeneral":"Dataset"},"creators":[],"identifiers":[{"identifier":"https://proteinensemble.org/PED00006#Q16143_A_2","identifierType":"URL"}],"relatedIdentifiers":[{"relationType":"IsCitedBy","relatedIdentifier":"https://identifiers.org/pubmed:25389903","relatedIdentifierType":"URL"},{"relationType":"IsIdenticalTo","relatedIdentifier":"http://purl.uniprot.org/uniprot/Q16143","relatedIdentifierType":"URL"}],"alternateIdentifiers":[{"alternateIdentifier":"https://identifiers.org/uniprot:Q16143"}],"descriptions":[],"titles":[{"title":"Beta-synuclein"}],"dates":[{"date":"2021-12-10T11:11:27","dateType":"Collected"}]} \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/bioschema/BioschemaDataciteToOAFTest.scala b/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/bioschema/BioschemaDataciteToOAFTest.scala index f35749fa4..01c23284c 100644 --- a/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/bioschema/BioschemaDataciteToOAFTest.scala +++ b/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/bioschema/BioschemaDataciteToOAFTest.scala @@ -2,13 +2,10 @@ package eu.dnetlib.dhp.bioschema import com.fasterxml.jackson.databind.{ObjectMapper, 
SerializationFeature} import eu.dnetlib.dhp.aggregation.AbstractVocabularyTest -//import eu.dnetlib.dhp.bioschema.{BioschemaToOAFTransformation, GenerateDataciteDatasetSpark} -import eu.dnetlib.dhp.bioschema.BioschemaToOAFTransformation import eu.dnetlib.dhp.schema.oaf.Oaf import org.apache.commons.io.FileUtils import org.apache.spark.SparkConf -import org.apache.spark.sql.functions.{col, count} -import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession} +import org.apache.spark.sql.{Encoder, Encoders, SparkSession} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.extension.ExtendWith import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} @@ -16,21 +13,17 @@ import org.mockito.junit.jupiter.MockitoExtension import org.slf4j.{Logger, LoggerFactory} import java.nio.file.{Files, Path} -import java.text.SimpleDateFormat -import java.util.Locale import scala.io.Source @ExtendWith(Array(classOf[MockitoExtension])) -class BioschemaDataciteToOAFTest extends AbstractVocabularyTest { +class BioschemaDataciteToOAFTest { private var workingDir: Path = null val log: Logger = LoggerFactory.getLogger(getClass) @BeforeEach def setUp(): Unit = { - workingDir = Files.createTempDirectory(getClass.getSimpleName) - super.setUpVocabulary() } @AfterEach @@ -39,70 +32,45 @@ class BioschemaDataciteToOAFTest extends AbstractVocabularyTest { } @Test - def testDateMapping: Unit = { - val inputDate = "2021-07-14T11:52:54+0000" - val ISO8601FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ", Locale.US) - val dt = ISO8601FORMAT.parse(inputDate) - println(dt.getTime) + def testGeneratePED(): Unit = { + val path = getClass.getResource("/eu/dnetlib/dhp/bioschema/ped_dump").getPath + val conf = new SparkConf() + val spark: SparkSession = SparkSession + .builder() + .config(conf) + .appName(getClass.getSimpleName) + .master("local[*]") + .getOrCreate() + implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf] + val instance = new 
GenerateBioschemaDatasetSpark(null, null, log) + val targetPath = s"$workingDir/result" + + instance.generateBioschemaDataset(path, exportLinks = true, targetPath, spark) + + val total_items = spark.read.text(targetPath).count() + println(s"total_items: $total_items") + assertTrue(total_items == 50) + + instance.generateBioschemaDataset(path, exportLinks = false, targetPath, spark) + + val total_datasets = spark.read.text(targetPath).count() + println(s"total_datasets: $total_datasets") + assertTrue(total_datasets == 10) + + spark.stop() } -// @Test -// def testConvert(): Unit = { -// -// val path = getClass.getResource("/eu/dnetlib/dhp/actionmanager/datacite/dataset").getPath -// -// val conf = new SparkConf() -// val spark: SparkSession = SparkSession -// .builder() -// .config(conf) -// .appName(getClass.getSimpleName) -// .master("local[*]") -// .getOrCreate() -// -// implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf] -// val instance = new GenerateDataciteDatasetSpark(null, null, log) -// val targetPath = s"$workingDir/result" -// -// instance.generateDataciteDataset(path, exportLinks = true, vocabularies, targetPath, spark) -// -// import spark.implicits._ -// -// val nativeSize = spark.read.load(path).count() -// -// assertEquals(100, nativeSize) -// -// val result: Dataset[Oaf] = spark.read.load(targetPath).as[Oaf] -// -// result -// .map(s => s.getClass.getSimpleName) -// .groupBy(col("value").alias("class")) -// .agg(count("value").alias("Total")) -// .show(false) -// -// val t = spark.read.load(targetPath).count() -// -// assertTrue(t > 0) -// -// spark.stop() -// -// } - @Test def testMapping(): Unit = { val record = Source .fromInputStream(getClass.getResourceAsStream("/eu/dnetlib/dhp/bioschema/ped_record.json")) .mkString - val mapper = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT) - val res: List[Oaf] = BioschemaToOAFTransformation.generateOAF(record, 0L, 0L, vocabularies, true) - + val res: List[Oaf] = 
BioschemaToOAFTransformation.generateOAF(record, true) res.foreach(r => { println(mapper.writeValueAsString(r)) println("----------------------------") - }) - } - }