diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/hostedbymap/TestPrepare.scala b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/hostedbymap/TestPrepare.scala index f01e4f59f..efd598fef 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/hostedbymap/TestPrepare.scala +++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/hostedbymap/TestPrepare.scala @@ -1,16 +1,31 @@ package eu.dnetlib.dhp.oa.graph.hostedbymap import com.fasterxml.jackson.databind.ObjectMapper -import eu.dnetlib.dhp.oa.graph.hostedbymap.model.DatasourceInfo +import eu.dnetlib.dhp.oa.graph.hostedbymap.SparkPrepareHostedByInfoToApply.{joinResHBM, prepareResultInfo, toEntityInfo} +import eu.dnetlib.dhp.oa.graph.hostedbymap.model.EntityInfo +import eu.dnetlib.dhp.schema.oaf.{Datasource, OpenAccessRoute, Publication} +import javax.management.openmbean.OpenMBeanAttributeInfo import org.apache.spark.SparkConf -import org.apache.spark.sql.{Dataset, SparkSession} +import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession} +import org.json4s +import org.json4s.DefaultFormats +import eu.dnetlib.dhp.schema.common.ModelConstants import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} import org.junit.jupiter.api.Test class TestPrepare extends java.io.Serializable{ + def getString(input:HostedByItemType):String = { + + import org.json4s.jackson.Serialization.write + implicit val formats = DefaultFormats + + write(input) + } + + @Test - def testPrepareDatasource():Unit = { + def testHostedByMaptoEntityInfo() : Unit = { val conf = new SparkConf() conf.setMaster("local[*]") conf.set("spark.driver.host", "localhost") @@ -20,34 +35,125 @@ class TestPrepare extends java.io.Serializable{ .appName(getClass.getSimpleName) .config(conf) .getOrCreate() - val path = getClass.getResource("datasource.json").getPath + val hbm = getClass.getResource("hostedbymap.json").getPath - val ds :Dataset[DatasourceInfo]= SparkPrepareHostedByInfoToApply.prepareDatasourceInfo(spark, path) - val mapper :ObjectMapper = new ObjectMapper() + import spark.implicits._ - assertEquals(9, ds.count) + val mapper:ObjectMapper = new ObjectMapper() - assertEquals(8, ds.filter(hbi => !hbi.getIssn.equals("")).count) - assertEquals(5, ds.filter(hbi => !hbi.getEissn.equals("")).count) - assertEquals(0, ds.filter(hbi => !hbi.getLissn.equals("")).count) + implicit val mapEncoderDSInfo: Encoder[EntityInfo] = Encoders.bean(classOf[EntityInfo]) - assertEquals(0, ds.filter(hbi => hbi.getIssn.equals("") && hbi.getEissn.equals("") && hbi.getLissn.equals("")).count) + val ds :Dataset[EntityInfo] = spark.createDataset(spark.sparkContext.textFile(hbm)).map(toEntityInfo) - assertTrue(ds.filter(hbi => hbi.getIssn.equals("0212-8365")).count == 1) - assertTrue(ds.filter(hbi => hbi.getEissn.equals("2253-900X")).count == 1) - assertTrue(ds.filter(hbi => hbi.getIssn.equals("0212-8365") && hbi.getEissn.equals("2253-900X")).count == 1) - assertTrue(ds.filter(hbi => hbi.getIssn.equals("0212-8365") && hbi.getOfficialname.equals("Thémata")).count == 1) - assertTrue(ds.filter(hbi => hbi.getIssn.equals("0212-8365") && hbi.getId.equals("10|doajarticles::abbc9265bea9ff62776a1c39785af00c")).count == 1) - ds.foreach(hbi => assertTrue(hbi.getId.startsWith("10|"))) ds.foreach(e => println(mapper.writeValueAsString(e))) + + assertEquals(20, ds.count) spark.close() } @Test - def testPrepareHostedByMap():Unit = { + def testPublicationtoEntityInfo() : Unit = { + val conf = new SparkConf() + conf.setMaster("local[*]") + conf.set("spark.driver.host", "localhost") + val spark: SparkSession = + SparkSession + .builder() + .appName(getClass.getSimpleName) + .config(conf) + .getOrCreate() + val path = getClass.getResource("publication.json").getPath + val mapper:ObjectMapper = new ObjectMapper() + + implicit val mapEncoderDSInfo: Encoder[EntityInfo] = Encoders.bean(classOf[EntityInfo]) + + val ds :Dataset[EntityInfo] = prepareResultInfo(spark, path) + + ds.foreach(e => println(mapper.writeValueAsString(e))) + + assertEquals(2, ds.count) + + assertEquals("50|4dc99724cf04::ed1ba83e1add6ce292433729acd8b0d9", ds.filter(ei => ei.getJournal_id.equals("1728-5852")).first().getId) + assertEquals("50|4dc99724cf04::ed1ba83e1add6ce292433729acd8b0d9", ds.filter(ei => ei.getJournal_id.equals("0001-396X")).first().getId) + + spark.close() + } + + @Test + def testJoinResHBM (): Unit = { + val conf = new SparkConf() + conf.setMaster("local[*]") + conf.set("spark.driver.host", "localhost") + val spark: SparkSession = + SparkSession + .builder() + .appName(getClass.getSimpleName) + .config(conf) + .getOrCreate() + val pub = getClass.getResource("iteminfofrompublication").getPath + val hbm = getClass.getResource("iteminfofromhostedbymap.json").getPath + + val mapper:ObjectMapper = new ObjectMapper() + + implicit val mapEncoderDSInfo: Encoder[EntityInfo] = Encoders.bean(classOf[EntityInfo]) + + val pub_ds :Dataset[EntityInfo] = spark.read.textFile(pub).map(p => mapper.readValue(p, classOf[EntityInfo])) + val hbm_ds :Dataset[EntityInfo] = spark.read.textFile(hbm).map(p => mapper.readValue(p, classOf[EntityInfo])) + + val ds: Dataset[EntityInfo] = joinResHBM(pub_ds, hbm_ds) + + assertEquals(1, ds.count) + + val ei:EntityInfo = ds.first() + + assertEquals("50|4dc99724cf04::ed1ba83e1add6ce292433729acd8b0d9", ei.getId) + assertEquals("10|issn___print::e4b6d6d978f67520f6f37679a98c5735", ei.getHb_id) + assertEquals("0001-396X", ei.getJournal_id) + assertEquals("Academic Therapy", ei.getName) + assertTrue(!ei.getOpenaccess) + + spark.close() + } + + @Test + def testJoinResHBM2 (): Unit = { + val conf = new SparkConf() + conf.setMaster("local[*]") + conf.set("spark.driver.host", "localhost") + val spark: SparkSession = + SparkSession + .builder() + .appName(getClass.getSimpleName) + .config(conf) + .getOrCreate() + val pub = getClass.getResource("iteminfofrompublication2").getPath + val hbm = getClass.getResource("iteminfofromhostedbymap2.json").getPath + + val mapper:ObjectMapper = new ObjectMapper() + + implicit val mapEncoderDSInfo: Encoder[EntityInfo] = Encoders.bean(classOf[EntityInfo]) + + val pub_ds :Dataset[EntityInfo] = spark.read.textFile(pub).map(p => mapper.readValue(p, classOf[EntityInfo])) + val hbm_ds :Dataset[EntityInfo] = spark.read.textFile(hbm).map(p => mapper.readValue(p, classOf[EntityInfo])) + + val ds: Dataset[EntityInfo] = joinResHBM(pub_ds, hbm_ds) + + assertEquals(1, ds.count) + + val ei:EntityInfo = ds.first() + + assertEquals("50|4dc99724cf04::ed1ba83e1add6ce292433729acd8b0d9", ei.getId) + assertEquals("10|issn___print::e4b6d6d978f67520f6f37679a98c5735", ei.getHb_id) + assertEquals("Academic Therapy", ei.getName) + assertTrue(ei.getOpenaccess) + + ds.foreach(e => println(mapper.writeValueAsString(e))) + + spark.close() } + }