hostedbymap #136

Merged
claudio.atzori merged 60 commits from hostedbymap into beta 2021-08-12 17:10:55 +02:00
1 changed files with 124 additions and 18 deletions
Showing only changes of commit 90e91486e2 - Show all commits

View File

@ -1,16 +1,31 @@
package eu.dnetlib.dhp.oa.graph.hostedbymap package eu.dnetlib.dhp.oa.graph.hostedbymap
import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.oa.graph.hostedbymap.model.DatasourceInfo import eu.dnetlib.dhp.oa.graph.hostedbymap.SparkPrepareHostedByInfoToApply.{joinResHBM, prepareResultInfo, toEntityInfo}
import eu.dnetlib.dhp.oa.graph.hostedbymap.model.EntityInfo
import eu.dnetlib.dhp.schema.oaf.{Datasource, OpenAccessRoute, Publication}
import javax.management.openmbean.OpenMBeanAttributeInfo
import org.apache.spark.SparkConf import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, SparkSession} import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import org.json4s
import org.json4s.DefaultFormats
import eu.dnetlib.dhp.schema.common.ModelConstants
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
class TestPrepare extends java.io.Serializable{ class TestPrepare extends java.io.Serializable{
def getString(input:HostedByItemType):String = {
import org.json4s.jackson.Serialization.write
implicit val formats = DefaultFormats
write(input)
}
@Test @Test
def testPrepareDatasource():Unit = { def testHostedByMaptoEntityInfo() : Unit = {
val conf = new SparkConf() val conf = new SparkConf()
conf.setMaster("local[*]") conf.setMaster("local[*]")
conf.set("spark.driver.host", "localhost") conf.set("spark.driver.host", "localhost")
@ -20,34 +35,125 @@ class TestPrepare extends java.io.Serializable{
.appName(getClass.getSimpleName) .appName(getClass.getSimpleName)
.config(conf) .config(conf)
.getOrCreate() .getOrCreate()
val path = getClass.getResource("datasource.json").getPath val hbm = getClass.getResource("hostedbymap.json").getPath
val ds :Dataset[DatasourceInfo]= SparkPrepareHostedByInfoToApply.prepareDatasourceInfo(spark, path)
import spark.implicits._
val mapper:ObjectMapper = new ObjectMapper() val mapper:ObjectMapper = new ObjectMapper()
assertEquals(9, ds.count) implicit val mapEncoderDSInfo: Encoder[EntityInfo] = Encoders.bean(classOf[EntityInfo])
assertEquals(8, ds.filter(hbi => !hbi.getIssn.equals("")).count) val ds :Dataset[EntityInfo] = spark.createDataset(spark.sparkContext.textFile(hbm)).map(toEntityInfo)
assertEquals(5, ds.filter(hbi => !hbi.getEissn.equals("")).count)
assertEquals(0, ds.filter(hbi => !hbi.getLissn.equals("")).count)
assertEquals(0, ds.filter(hbi => hbi.getIssn.equals("") && hbi.getEissn.equals("") && hbi.getLissn.equals("")).count)
assertTrue(ds.filter(hbi => hbi.getIssn.equals("0212-8365")).count == 1)
assertTrue(ds.filter(hbi => hbi.getEissn.equals("2253-900X")).count == 1)
assertTrue(ds.filter(hbi => hbi.getIssn.equals("0212-8365") && hbi.getEissn.equals("2253-900X")).count == 1)
assertTrue(ds.filter(hbi => hbi.getIssn.equals("0212-8365") && hbi.getOfficialname.equals("Thémata")).count == 1)
assertTrue(ds.filter(hbi => hbi.getIssn.equals("0212-8365") && hbi.getId.equals("10|doajarticles::abbc9265bea9ff62776a1c39785af00c")).count == 1)
ds.foreach(hbi => assertTrue(hbi.getId.startsWith("10|")))
ds.foreach(e => println(mapper.writeValueAsString(e))) ds.foreach(e => println(mapper.writeValueAsString(e)))
assertEquals(20, ds.count)
spark.close() spark.close()
} }
@Test @Test
def testPrepareHostedByMap():Unit = { def testPublicationtoEntityInfo() : Unit = {
val conf = new SparkConf()
conf.setMaster("local[*]")
conf.set("spark.driver.host", "localhost")
val spark: SparkSession =
SparkSession
.builder()
.appName(getClass.getSimpleName)
.config(conf)
.getOrCreate()
val path = getClass.getResource("publication.json").getPath
val mapper:ObjectMapper = new ObjectMapper()
implicit val mapEncoderDSInfo: Encoder[EntityInfo] = Encoders.bean(classOf[EntityInfo])
val ds :Dataset[EntityInfo] = prepareResultInfo(spark, path)
ds.foreach(e => println(mapper.writeValueAsString(e)))
assertEquals(2, ds.count)
assertEquals("50|4dc99724cf04::ed1ba83e1add6ce292433729acd8b0d9", ds.filter(ei => ei.getJournal_id.equals("1728-5852")).first().getId)
assertEquals("50|4dc99724cf04::ed1ba83e1add6ce292433729acd8b0d9", ds.filter(ei => ei.getJournal_id.equals("0001-396X")).first().getId)
spark.close()
}
@Test
def testJoinResHBM (): Unit = {
val conf = new SparkConf()
conf.setMaster("local[*]")
conf.set("spark.driver.host", "localhost")
val spark: SparkSession =
SparkSession
.builder()
.appName(getClass.getSimpleName)
.config(conf)
.getOrCreate()
val pub = getClass.getResource("iteminfofrompublication").getPath
val hbm = getClass.getResource("iteminfofromhostedbymap.json").getPath
val mapper:ObjectMapper = new ObjectMapper()
implicit val mapEncoderDSInfo: Encoder[EntityInfo] = Encoders.bean(classOf[EntityInfo])
val pub_ds :Dataset[EntityInfo] = spark.read.textFile(pub).map(p => mapper.readValue(p, classOf[EntityInfo]))
val hbm_ds :Dataset[EntityInfo] = spark.read.textFile(hbm).map(p => mapper.readValue(p, classOf[EntityInfo]))
val ds: Dataset[EntityInfo] = joinResHBM(pub_ds, hbm_ds)
assertEquals(1, ds.count)
val ei:EntityInfo = ds.first()
assertEquals("50|4dc99724cf04::ed1ba83e1add6ce292433729acd8b0d9", ei.getId)
assertEquals("10|issn___print::e4b6d6d978f67520f6f37679a98c5735", ei.getHb_id)
assertEquals("0001-396X", ei.getJournal_id)
assertEquals("Academic Therapy", ei.getName)
assertTrue(!ei.getOpenaccess)
spark.close()
}
@Test
def testJoinResHBM2 (): Unit = {
val conf = new SparkConf()
conf.setMaster("local[*]")
conf.set("spark.driver.host", "localhost")
val spark: SparkSession =
SparkSession
.builder()
.appName(getClass.getSimpleName)
.config(conf)
.getOrCreate()
val pub = getClass.getResource("iteminfofrompublication2").getPath
val hbm = getClass.getResource("iteminfofromhostedbymap2.json").getPath
val mapper:ObjectMapper = new ObjectMapper()
implicit val mapEncoderDSInfo: Encoder[EntityInfo] = Encoders.bean(classOf[EntityInfo])
val pub_ds :Dataset[EntityInfo] = spark.read.textFile(pub).map(p => mapper.readValue(p, classOf[EntityInfo]))
val hbm_ds :Dataset[EntityInfo] = spark.read.textFile(hbm).map(p => mapper.readValue(p, classOf[EntityInfo]))
val ds: Dataset[EntityInfo] = joinResHBM(pub_ds, hbm_ds)
assertEquals(1, ds.count)
val ei:EntityInfo = ds.first()
assertEquals("50|4dc99724cf04::ed1ba83e1add6ce292433729acd8b0d9", ei.getId)
assertEquals("10|issn___print::e4b6d6d978f67520f6f37679a98c5735", ei.getHb_id)
assertEquals("Academic Therapy", ei.getName)
assertTrue(ei.getOpenaccess)
ds.foreach(e => println(mapper.writeValueAsString(e)))
spark.close()
} }
} }