dnet-hadoop/dhp-workflows/dhp-aggregation/src/test/scala/eu/dnetlib/dhp/datacite/DataciteToOAFTest.scala

111 lines
3.0 KiB
Scala
Raw Normal View History

2021-10-20 17:37:42 +02:00
package eu.dnetlib.dhp.datacite
2021-10-20 17:37:42 +02:00
import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature}
import eu.dnetlib.dhp.aggregation.AbstractVocabularyTest
import eu.dnetlib.dhp.schema.oaf.Oaf
2021-11-25 11:02:38 +01:00
import org.apache.commons.io.FileUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{col, count}
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.extension.ExtendWith
2021-11-25 11:02:38 +01:00
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.junit.jupiter.MockitoExtension
import org.slf4j.{Logger, LoggerFactory}
2021-06-04 10:14:22 +02:00
import java.nio.file.{Files, Path}
import java.text.SimpleDateFormat
import java.util.Locale
import scala.io.Source
2021-12-03 11:15:09 +01:00
@ExtendWith(Array(classOf[MockitoExtension]))
class DataciteToOAFTest extends AbstractVocabularyTest {

  // Scratch directory, recreated for every test and deleted afterwards.
  private var workingDir: Path = _
  val log: Logger = LoggerFactory.getLogger(getClass)

  @BeforeEach
  def setUp(): Unit = {
    workingDir = Files.createTempDirectory(getClass.getSimpleName)
    super.setUpVocabulary()
  }

  @AfterEach
  def tearDown(): Unit =
    FileUtils.deleteDirectory(workingDir.toFile)

  /** Checks that an ISO-8601 timestamp with a numeric zone offset parses. */
  @Test
  def testDateMapping(): Unit = {
    val inputDate = "2021-07-14T11:52:54+0000"
    val ISO8601FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ", Locale.US)
    val dt = ISO8601FORMAT.parse(inputDate)
    println(dt.getTime)
  }

  /** End-to-end conversion of the bundled Datacite dump into OAF records. */
  @Test
  def testConvert(): Unit = {
    val path = getClass.getResource("/eu/dnetlib/dhp/actionmanager/datacite/dataset").getPath
    val conf = new SparkConf()
    val spark: SparkSession = SparkSession
      .builder()
      .config(conf)
      .appName(getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()

    try {
      implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
      val instance = new GenerateDataciteDatasetSpark(null, null, log)
      val targetPath = s"$workingDir/result"

      instance.generateDataciteDataset(path, exportLinks = true, vocabularies, targetPath, spark)

      import spark.implicits._

      // The test fixture is expected to contain exactly 100 native Datacite records.
      val nativeSize = spark.read.load(path).count()
      assertEquals(100, nativeSize)

      spark.read.load(targetPath).printSchema()

      val result: Dataset[Oaf] = spark.read.load(targetPath).as[Oaf]

      // Print the distribution of generated OAF entity/relation classes.
      result
        .map(s => s.getClass.getSimpleName)
        .groupBy(col("value").alias("class"))
        .agg(count("value").alias("Total"))
        .show(false)

      val t = spark.read.load(targetPath).count()
      assertTrue(t > 0)
    } finally {
      // BUGFIX: stop the session even when an assertion fails mid-test, so
      // later tests do not inherit a stale local Spark context.
      spark.stop()
    }
  }

  /** Transforms a single Datacite record and dumps the resulting OAF as JSON. */
  @Test
  def testMapping(): Unit = {
    val record = Source
      .fromInputStream(
        getClass.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/datacite/record.json")
      )
      .mkString

    val mapper = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT)
    // Last argument enables link export — presumably named exportLinks; confirm against signature.
    val res: List[Oaf] = DataciteToOAFTransformation.generateOAF(record, 0L, 0L, vocabularies, true)

    res.foreach { r =>
      println(mapper.writeValueAsString(r))
      println("----------------------------")
    }
  }
}