adding tests and resources

Miriam Baglioni 2021-03-30 10:26:51 +02:00
parent d69c19e3fe
commit 9d617a0a58
4 changed files with 5128 additions and 203 deletions

View File

@@ -1,5 +1,22 @@
package eu.dnetlib.dhp.contextpropagation
class PropagationTest {
import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature}
import eu.dnetlib.dhp.contextpropagation.model.{EnrichedEntries, RelationPropagation}
import eu.dnetlib.dhp.provision.scholix.Scholix
import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary
import org.apache.spark.sql.{Encoder, Encoders}
class PropagationTest extends java.io.Serializable {
val m: ObjectMapper = new ObjectMapper()
m.enable(SerializationFeature.INDENT_OUTPUT)
implicit val scholixEncoder: Encoder[Scholix] = Encoders.kryo[Scholix]
implicit val summaryEncoder: Encoder[ScholixSummary] = Encoders.kryo[ScholixSummary]
implicit val propagationEncoder: Encoder[RelationPropagation] = Encoders.kryo[RelationPropagation]
implicit val enrichedEncoder: Encoder[EnrichedEntries] = Encoders.kryo[EnrichedEntries]
implicit val tupleForJoinEncoder: Encoder[(String, EnrichedEntries)] = Encoders.tuple(Encoders.STRING, enrichedEncoder)
}
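A minimal usage sketch (not part of the commit) of how the Kryo encoders declared in PropagationTest are typically brought into implicit scope to read one of the parquet fixtures as a typed Dataset in a local SparkSession; the fixture path and the object name are illustrative assumptions.

package eu.dnetlib.dhp.contextpropagation

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import eu.dnetlib.dhp.provision.scholix.Scholix

// Sketch only: shows the encoder-in-implicit-scope pattern the tests rely on.
object EncoderUsageSketch {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("EncoderUsageSketch")
      .master("local")
      .config(new SparkConf())
      .getOrCreate()

    // Scholix is a Java bean, so a Kryo encoder is needed to type the Dataset
    implicit val scholixEncoder: Encoder[Scholix] = Encoders.kryo[Scholix]

    // hypothetical path; the tests load an equivalent fixture from the classpath
    val scholix: Dataset[Scholix] = spark.read.load("/tmp/scholix-relations.parquet").as[Scholix]
    println(scholix.count())

    spark.stop()
  }
}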

View File

@@ -1,25 +1,30 @@
package eu.dnetlib.dhp.contextpropagation
import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature}
import eu.dnetlib.dhp.contextpropagation.model.RelationPropagation
import eu.dnetlib.dhp.contextpropagation.model.{EnrichedEntries, RelationPropagation}
import eu.dnetlib.dhp.provision.scholix.Scholix
import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.Assertions.{assertFalse, assertNotNull}
class SelectScholixRelationTest extends java.io.Serializable{
class ScholixTest extends java.io.Serializable{
val m: ObjectMapper = new ObjectMapper()
// m.enable(SerializationFeature.INDENT_OUTPUT)
m.enable(SerializationFeature.INDENT_OUTPUT)
implicit val scholixEncoder: Encoder[Scholix] = Encoders.kryo[Scholix]
implicit val summaryEncoder: Encoder[ScholixSummary] = Encoders.kryo[ScholixSummary]
implicit val propagationEncoder: Encoder[RelationPropagation] = Encoders.kryo[RelationPropagation]
implicit val enrichedEncoder: Encoder[EnrichedEntries] = Encoders.kryo[EnrichedEntries]
implicit val tupleForJoinEncoder: Encoder[(String, EnrichedEntries)] = Encoders.tuple(Encoders.STRING, enrichedEncoder)
@Test
def mappingScholixOpenAIRETest(): Unit ={
val sourcePath = getClass.getResource("/eu/dnetlib/dhp/contextpropagation/scholix-relations-00000.parquet").getPath
def selectScholexplorerRelationTest(): Unit ={
val sourcePath = getClass.getResource("/eu/dnetlib/dhp/contextpropagation/input/scholix-relations-00000.parquet").getPath
val conf : SparkConf = new SparkConf()
@@ -30,11 +35,110 @@ class SelectScholixRelationTest extends java.io.Serializable{
tmp.write.mode(SaveMode.Overwrite).save("/tmp/temp")
assert(tmp.count > 0)
//tmp.foreach(r => println(m.writeValueAsString(r)))
}
@Test
def SelectDistinctIDTest(): Unit ={
val sourcePath = getClass.getResource("/eu/dnetlib/dhp/contextpropagation/producedInfo/selectedRelations.parquet").getPath
val conf : SparkConf = new SparkConf()
val spark: SparkSession = SparkSession.builder().appName("SelectDistinctIdsTest").master("local").config(conf).getOrCreate()
implicit val propagationEncoder: Encoder[RelationPropagation] = Encoders.kryo[RelationPropagation]
val allowedRelations = spark.read.load(sourcePath).as[RelationPropagation]
val numberOfNodes = allowedRelations.map(r => r.getSource.getId)(Encoders.STRING)
.union(allowedRelations.map(r => r.getTarget.getId)(Encoders.STRING)).count()
val tmp : Dataset[String]= PropagationUtils.getSelectedNodes(sourcePath, spark)
assert (numberOfNodes > tmp.count())
}
@Test
def mappingScholixOpenAIRETest(): Unit ={
val sourcePath = getClass.getResource("/eu/dnetlib/dhp/contextpropagation/input/scholix-relations-00000.parquet").getPath
implicit val scholixEncoder: Encoder[Scholix] = Encoders.kryo[Scholix]
//val spark: SparkSession = SparkSession.builder().appName("Test").master("local[*]").config("spark.driver.bindAddress", "127.0.0.1").getOrCreate()
val spark: SparkSession = SparkSession.builder().appName("Test").master("local").config(new SparkConf()).getOrCreate()
val tmp = SparkEnrichScholixStep2.getMappingScholexplorerOpenAIRE(sourcePath, spark)
tmp.filter(e => e.getScholixId.contains("dedup"))
.foreach(e => assertFalse(!(e.getScholixId.substring(17).equals(e.getOaid.substring(17)))))
tmp.filter(e => !e.getScholixId.contains("dedup"))
.foreach(e => assertFalse(!(e.getOaid.substring(17).equals(e.getScholixId.substring(3)))))
}
@Test
def enrichScholixTest():Unit = {
val summaryPath = getClass.getResource("/eu/dnetlib/dhp/contextpropagation/input/part-00000-summaries.parquet").getPath
val relationPath = getClass.getResource("/eu/dnetlib/dhp/contextpropagation/producedInfo/selectedRelations.parquet").getPath
val conf : SparkConf = new SparkConf()
val spark: SparkSession = SparkSession.builder().config(conf) .appName("Test").master("local").getOrCreate()
val tmp = SparkEnrichScholixStep1.getEnrichedSubset(relationPath, summaryPath, spark)
assert(tmp.count() == 5)
//tmp.write.mode(SaveMode.Overwrite).save("/tmp/scholixEnriched")
}
@Test
def enrichOpenAIRETest():Unit = {
val conf : SparkConf = new SparkConf()
val spark: SparkSession = SparkSession.builder().config(conf) .appName("Test").master("local").getOrCreate()
val scholixPath = getClass.getResource("/eu/dnetlib/dhp/contextpropagation/input/scholix-relations-00000.parquet").getPath
val relationPath = getClass.getResource("/eu/dnetlib/dhp/contextpropagation/input/relation.json").getPath
val resultPath = getClass.getResource("/eu/dnetlib/dhp/contextpropagation/input/result/publication").getPath
val tmp = SparkEnrichScholixStep2.getEnrichedSubset(scholixPath, relationPath , resultPath , spark)
print(tmp.count())
assert(tmp.count() == 1)
tmp.write.mode(SaveMode.Overwrite).save("/tmp/openaireEnriched")
}
@Test
def mergeEnrichmentsTest():Unit = {
val conf : SparkConf = new SparkConf()
val spark: SparkSession = SparkSession.builder().config(conf) .appName("Test").master("local").getOrCreate()
val scholixPath = getClass.getResource("/eu/dnetlib/dhp/contextpropagation/producedInfo/scholixEnriched.parquet").getPath
val resultPath = getClass.getResource("/eu/dnetlib/dhp/contextpropagation/producedInfo/result").getPath
val tmp = SparkEnrichScholixStep3.getEnriched(scholixPath, resultPath , spark)
assert(tmp.count() == 5)
tmp.write.mode(SaveMode.Overwrite).save("/tmp/mergedEnriched")
tmp.foreach(r => print(m.writeValueAsString(r)))
}
}
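SelectDistinctIDTest above only asserts that PropagationUtils.getSelectedNodes returns fewer rows than the raw union of source and target ids. A hedged sketch of what such a helper could look like (an assumption: the real implementation is not part of this diff), reusing the encoder pattern from the test:

package eu.dnetlib.dhp.contextpropagation

import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import eu.dnetlib.dhp.contextpropagation.model.RelationPropagation

// Sketch only: the distinct union of node ids, which is why the test expects
// getSelectedNodes to return fewer rows than sources + targets combined.
object GetSelectedNodesSketch {
  def getSelectedNodes(sourcePath: String, spark: SparkSession): Dataset[String] = {
    implicit val propagationEncoder: Encoder[RelationPropagation] = Encoders.kryo[RelationPropagation]
    val relations: Dataset[RelationPropagation] = spark.read.load(sourcePath).as[RelationPropagation]
    relations.map(r => r.getSource.getId)(Encoders.STRING)
      .union(relations.map(r => r.getTarget.getId)(Encoders.STRING))
      .distinct()
  }
}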

View File

@@ -1,14 +1,17 @@
package eu.dnetlib.dhp.contextpropagation
import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature}
import eu.dnetlib.dhp.contextpropagation.model.{DatasetPropagationStructure, EnrichedEntries, Node, PropagationStructure, PropagationUse, RelationPropagation}
import eu.dnetlib.dhp.contextpropagation.model.{DatasetPropagationStructure, EnrichedEntries, MapSxOA, Node, PropagationStructure, PropagationUse, RelationPropagation}
import eu.dnetlib.dhp.provision.scholix.summary.{SchemeValue, ScholixSummary}
import eu.dnetlib.dhp.provision.scholix.{Scholix, ScholixEntityId}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import org.junit.jupiter.api.Assertions.{assertFalse, assertNotNull}
import org.junit.jupiter.api.Test
import scala.collection.JavaConverters._
class TestProva extends java.io.Serializable{
@@ -16,210 +19,151 @@ class TestProva extends java.io.Serializable{
val m: ObjectMapper = new ObjectMapper()
m.enable(SerializationFeature.INDENT_OUTPUT)
// implicit val subjectEncoder: Encoder[Subject] = Encoders.kryo[Subject]
@Test
def mappingScholixOpenAIRETest(): Unit ={
val sourcePath = getClass.getResource("/eu/dnetlib/dhp/contextpropagation/part-00000.parquet").getPath
implicit val scholixEncoder: Encoder[Scholix] = Encoders.kryo[Scholix]
val spark: SparkSession = SparkSession.builder().appName("Test").master("local[*]").config("spark.driver.bindAddress", "127.0.0.1").getOrCreate()
val tmp = PropagationUtils.getMappingScholexplorerOpenAIRE(sourcePath, spark)
tmp.filter(e => e.getScholixId.contains("dedup"))
.foreach(e => assertFalse(!(e.getScholixId.substring(17).equals(e.getOaid.substring(17)))))
tmp.filter(e => !e.getScholixId.contains("dedup"))
.foreach(e => assertFalse(!(e.getOaid.substring(17).equals(e.getScholixId.substring(3)))))
}
@Test
def enrichScholixTest():Unit = {
val sourcePath = getClass.getResource("/eu/dnetlib/dhp/contextpropagation/part-00000-summaries.parquet").getPath
val sourceResPath = getClass.getResource("/eu/dnetlib/dhp/contextpropagation/part-00000.parquet").getPath
implicit val summaryEncoder: Encoder[ScholixSummary] = Encoders.kryo[ScholixSummary]
implicit val scholixEncoder: Encoder[Scholix] = Encoders.kryo[Scholix]
implicit val enrichedEncoder: Encoder[EnrichedEntries] = Encoders.kryo[EnrichedEntries]
implicit val tupleForJoinEncoder: Encoder[(String, EnrichedEntries)] = Encoders.tuple(Encoders.STRING, enrichedEncoder)
val spark: SparkSession = SparkSession.builder().appName("Test").master("local[*]").config("spark.driver.bindAddress", "127.0.0.1").getOrCreate()
val tmp : Dataset[(String, EnrichedEntries)] = PropagationUtils.enrichScholix(sourcePath, spark).filter(o => o != null)
.map(e => (e.getScholixId, e))
tmp.show(false)
val distinctNodes: Dataset[String] = spark.read.load(sourceResPath).as[Scholix]
.filter(s => !s.getSource().getDnetIdentifier().substring(0, 2).equals("70")).map(s => s.getSource.getDnetIdentifier)(Encoders.STRING).distinct()
distinctNodes.joinWith(tmp, distinctNodes("value").equalTo(tmp("_1")))
.foreach(p => print(m.writeValueAsString(p)))
}
@Test
def enrichOpenaireTest():Unit = {
val relationPath = getClass.getResource("/eu/dnetlib/dhp/contextpropagation/relation.json").getPath
val publicationPath = getClass.getResource("/eu/dnetlib/dhp/contextpropagation/publication.json").getPath
val spark: SparkSession = SparkSession.builder().appName("Test").master("local[*]").config("spark.driver.bindAddress", "127.0.0.1").getOrCreate()
val tmp = PropagationUtils.enrichOpenAIRE(publicationPath, relationPath, spark).filter(o => o != null)
assert(tmp.count == 3)
tmp.foreach(r => print(m.writeValueAsString(r)))
}
@Test
def testFunderRelationshipsMapping(): Unit = {
def findInDats(dats: Dataset[(String, DatasetPropagationStructure)], elem:String) : Dataset[(String, DatasetPropagationStructure)] = {
dats.filter(dats("_1") === elem)
}
val sourcePath = getClass.getResource("/eu/dnetlib/dhp/contextpropagation/part-00000.parquet").getPath
implicit val summaryEncoder: Encoder[ScholixSummary] = Encoders.kryo[ScholixSummary]
implicit val scholixEncoder: Encoder[Scholix] = Encoders.kryo[Scholix]
implicit val propagationEncoder: Encoder[RelationPropagation] = Encoders.kryo[RelationPropagation]
implicit val mapEncoderPub: Encoder[PropagationStructure] = Encoders.kryo[PropagationStructure]
implicit val mapEncoderDats: Encoder[DatasetPropagationStructure] = Encoders.kryo[DatasetPropagationStructure]
implicit val tupleForPropagation: Encoder[(String, PropagationStructure)] = Encoders.tuple(Encoders.STRING, mapEncoderPub)
implicit val tupleForPropagationDars: Encoder[(String, DatasetPropagationStructure)] = Encoders.tuple(Encoders.STRING, mapEncoderDats)
implicit val stringEncoder: Encoder[String] = Encoders.STRING
val spark: SparkSession = SparkSession.builder().appName("Test").master("local[*]").config("spark.driver.bindAddress", "127.0.0.1").getOrCreate()
val ds: Dataset[Scholix] = spark.read.load(sourcePath).as[Scholix]
val allowedRelations : Dataset[RelationPropagation] = ds
.filter(s => !s.getSource().getDnetIdentifier().substring(0,2).equals("70") )
.filter(s => !s.getTarget().getDnetIdentifier().substring(0,2).equals("70"))
.map(s => {
val rp = new RelationPropagation
rp.setSource(Node.newInstance(s.getSource.getDnetIdentifier))//, getPublisherList(s.getSource.getPublisher.asScala.toList)))
rp.setTarget(Node.newInstance(s.getTarget.getDnetIdentifier))//, getPublisherList(s.getTarget.getPublisher.asScala.toList)))
rp.setSemantics(s.getRelationship.getName)
rp
})
//println(allowedRelations.count())
val pubs_rel : Dataset[RelationPropagation] = allowedRelations.filter(r => r.getSource.getId.startsWith("50"))
.filter(r => r.getTarget.getId.startsWith("60")).filter(r => Costants.containedInPubSem(r.getSemantics.toLowerCase()))
val dats_rel : Dataset[RelationPropagation] = allowedRelations
.filter(r => r.getSource.getId.startsWith("60")
&& r.getTarget.getId.startsWith("60")
&& Costants.containedInDatsSem(r.getSemantics.toLowerCase())
&& r.getSource.getId != r.getTarget.getId)
val publication_dataset : Dataset[(String, PropagationStructure)] = pubs_rel.map(r => {
val ps = new PropagationStructure
val pv : List[PropagationUse] = List(PropagationUse.copyInstance(Costants.getPublicationValue(r.getSemantics.toLowerCase())))
ps.add(r.getSource.getId, pv.asJava)
(r.getTarget.getId, ps)
})
val pl1 : Dataset[(String, PropagationStructure)] = publication_dataset.groupByKey(_._1)(Encoders.STRING)
.agg(PropagationAggregator.getDatasetAggregator().toColumn)
// print(pl1.count)
val dataset_dataset : Dataset[(String, DatasetPropagationStructure)] = dats_rel.map(r => {
val ps = new DatasetPropagationStructure
ps.add(r.getTarget.getId, PropagationUse.copyInstance(Costants.getDatasetValue(r.getSemantics.toLowerCase())))
(r.getSource.getId, ps)
})
//
// //pl1.foreach(r => print(m.writeValueAsString(r._1)))
// @Test
// def testFunderRelationshipsMapping(): Unit = {
//
//
//
val dataset_dataset_modified : Dataset[(String, DatasetPropagationStructure)] =
dataset_dataset.map(ds => {
if(ds._1 == "60|4b5e9fa8e91b206001589993179f69d1"){
("60|82368200e90cf75c714b58288a371bbe", ds._2)
}
else{
ds
}
})
//
// // findInDats(dataset_dataset_modified, "60|82368200e90cf75c714b58288a371bbe").show(false)
// def findInDats(dats: Dataset[(String, DatasetPropagationStructure)], elem:String) : Dataset[(String, DatasetPropagationStructure)] = {
// dats.filter(dats("_1") === elem)
// }
//
//
val pl2_step1 = pl1.joinWith(dataset_dataset_modified, pl1("value")
.equalTo(dataset_dataset_modified("_1")), "left")
.flatMap(PropagationUtils.propagateDataset)
val pl2= pl2_step1.groupByKey(_._1)(Encoders.STRING).agg(PropagationAggregator.getDatasetAggregator().toColumn)
print(pl2.count())
// pl1.foreach(i=> {
// if (i._1 =="60|b91b1296e3e37523887c2eaaf3f2e673")
// print(m.writeValueAsString(i))
// val sourcePath = getClass.getResource("/eu/dnetlib/dhp/contextpropagation/part-00000.parquet").getPath
//
//
// implicit val summaryEncoder: Encoder[ScholixSummary] = Encoders.kryo[ScholixSummary]
// implicit val scholixEncoder: Encoder[Scholix] = Encoders.kryo[Scholix]
// implicit val propagationEncoder: Encoder[RelationPropagation] = Encoders.kryo[RelationPropagation]
// implicit val mapEncoderPub: Encoder[PropagationStructure] = Encoders.kryo[PropagationStructure]
// implicit val mapEncoderDats: Encoder[DatasetPropagationStructure] = Encoders.kryo[DatasetPropagationStructure]
// implicit val tupleForPropagation: Encoder[(String, PropagationStructure)] = Encoders.tuple(Encoders.STRING, mapEncoderPub)
// implicit val tupleForPropagationDars: Encoder[(String, DatasetPropagationStructure)] = Encoders.tuple(Encoders.STRING, mapEncoderDats)
// implicit val stringEncoder: Encoder[String] = Encoders.STRING
//
//
// val spark: SparkSession = SparkSession.builder().appName("Test").master("local[*]").config("spark.driver.bindAddress", "127.0.0.1").getOrCreate()
//
//
// val ds: Dataset[Scholix] = spark.read.load(sourcePath).as[Scholix]
//
// val allowedRelations : Dataset[RelationPropagation] = ds
// .filter(s => !s.getSource().getDnetIdentifier().substring(0,2).equals("70") )
// .filter(s => !s.getTarget().getDnetIdentifier().substring(0,2).equals("70"))
// .map(s => {
// val rp = new RelationPropagation
// rp.setSource(Node.newInstance(s.getSource.getDnetIdentifier))//, getPublisherList(s.getSource.getPublisher.asScala.toList)))
// rp.setTarget(Node.newInstance(s.getTarget.getDnetIdentifier))//, getPublisherList(s.getTarget.getPublisher.asScala.toList)))
// rp.setSemantics(s.getRelationship.getName)
// rp
// })
//
//
// //println(allowedRelations.count())
//
// val pubs_rel : Dataset[RelationPropagation] = allowedRelations.filter(r => r.getSource.getId.startsWith("50"))
// .filter(r => r.getTarget.getId.startsWith("60")).filter(r => Costants.containedInPubSem(r.getSemantics.toLowerCase()))
//
// val dats_rel : Dataset[RelationPropagation] = allowedRelations
// .filter(r => r.getSource.getId.startsWith("60")
// && r.getTarget.getId.startsWith("60")
// && Costants.containedInDatsSem(r.getSemantics.toLowerCase())
// && r.getSource.getId != r.getTarget.getId)
//
// val publication_dataset : Dataset[(String, PropagationStructure)] = pubs_rel.map(r => {
// val ps = new PropagationStructure
//
// val pv : List[PropagationUse] = List(PropagationUse.copyInstance(Costants.getPublicationValue(r.getSemantics.toLowerCase())))
// ps.add(r.getSource.getId, pv.asJava)
// (r.getTarget.getId, ps)
//
// })
//
// print(pl1.count)
//
// print(m.writeValueAsString(dsprob.getPropagation.get(source).getUse))
// print(dataset_dataset.map(d => {
// var found : Boolean = false
// for (elem <- d._2.getPropagation.keySet().asScala){
// if (d._2.getPropagation.get(elem).getUse == "proxy"){
// found = true
// }
// }
// if (found){
// d
// }else{
// null
// }
// }).filter(o => o != null).first()._1)
// dataset_dataset.foreach(d => {
// val pl1 : Dataset[(String, PropagationStructure)] = publication_dataset.groupByKey(_._1)(Encoders.STRING)
// .agg(PropagationAggregator.getDatasetAggregator().toColumn)
//
//
//
//
//
// // print(pl1.count)
//
// val dataset_dataset : Dataset[(String, DatasetPropagationStructure)] = dats_rel.map(r => {
// val ps = new DatasetPropagationStructure
//
// ps.add(r.getTarget.getId, PropagationUse.copyInstance(Costants.getDatasetValue(r.getSemantics.toLowerCase())))
// (r.getSource.getId, ps)
//
// for (elem <- d._2.getPropagation.keySet().asScala){
// if (d._2.getPropagation.get(elem).getUse == "reuse"){
// print("reuse")
// }
// }
// println()
// })
}
////
//// //pl1.foreach(r => print(m.writeValueAsString(r._1)))
////
////
////
// val dataset_dataset_modified : Dataset[(String, DatasetPropagationStructure)] =
// dataset_dataset.map(ds => {
// if(ds._1 == "60|4b5e9fa8e91b206001589993179f69d1"){
// ("60|82368200e90cf75c714b58288a371bbe", ds._2)
// }
// else{
// ds
// }
// })
////
//// // findInDats(dataset_dataset_modified, "60|82368200e90cf75c714b58288a371bbe").show(false)
////
////
// val pl2_step1 = pl1.joinWith(dataset_dataset_modified, pl1("value")
// .equalTo(dataset_dataset_modified("_1")), "left")
// .flatMap(PropagationUtils.propagateDataset)
//
//
//
//
// val pl2= pl2_step1.groupByKey(_._1)(Encoders.STRING).agg(PropagationAggregator.getDatasetAggregator().toColumn)
// print(pl2.count())
//
//
//// pl1.foreach(i=> {
//// if (i._1 =="60|b91b1296e3e37523887c2eaaf3f2e673")
//// print(m.writeValueAsString(i))
//// })
////
//// print(pl1.count)
//
////
//
// // print(m.writeValueAsString(dsprob.getPropagation.get(source).getUse))
//
//// print(dataset_dataset.map(d => {
//// var found : Boolean = false
//// for (elem <- d._2.getPropagation.keySet().asScala){
//// if (d._2.getPropagation.get(elem).getUse == "proxy"){
//// found = true
//// }
//// }
//// if (found){
//// d
//// }else{
//// null
//// }
//// }).filter(o => o != null).first()._1)
//
//
//// dataset_dataset.foreach(d => {
////
//// for (elem <- d._2.getPropagation.keySet().asScala){
//// if (d._2.getPropagation.get(elem).getUse == "reuse"){
//// print("reuse")
//// }
//// }
//// println()
//// })
//
// }
}
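Finally, a hedged end-to-end sketch of how the three enrichment steps exercised in ScholixTest would chain outside the canned test resources. Assumptions: the Step1/Step2/Step3 signatures are taken from the test cases above, the input paths are illustrative, and Step3's second argument is taken to be the Step2 output, which the test resource layout suggests but this diff does not confirm.

package eu.dnetlib.dhp.contextpropagation

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

// Sketch only: chains the three enrichment steps exercised in ScholixTest.
object EnrichmentChainSketch {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("EnrichmentChainSketch")
      .master("local")
      .config(new SparkConf())
      .getOrCreate()

    // illustrative input paths, not the fixtures shipped with the tests
    val selectedRelations = "/tmp/input/selectedRelations.parquet"
    val summaries         = "/tmp/input/part-00000-summaries.parquet"
    val scholixRelations  = "/tmp/input/scholix-relations.parquet"
    val oafRelations      = "/tmp/input/relation.json"
    val oafResults        = "/tmp/input/result/publication"

    // Step 1: enrich the selected Scholexplorer relations with their summaries
    SparkEnrichScholixStep1.getEnrichedSubset(selectedRelations, summaries, spark)
      .write.mode(SaveMode.Overwrite).save("/tmp/scholixEnriched")

    // Step 2: enrich the same subset from the OpenAIRE graph side
    SparkEnrichScholixStep2.getEnrichedSubset(scholixRelations, oafRelations, oafResults, spark)
      .write.mode(SaveMode.Overwrite).save("/tmp/openaireEnriched")

    // Step 3: merge the two enriched views (second argument is an assumption)
    SparkEnrichScholixStep3.getEnriched("/tmp/scholixEnriched", "/tmp/openaireEnriched", spark)
      .write.mode(SaveMode.Overwrite).save("/tmp/mergedEnriched")
  }
}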