From 652114c6419d8ee877b2ce593058ec0b83afd7d3 Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Wed, 20 Oct 2021 11:44:23 +0200
Subject: [PATCH] [affiliationPropagation] first try: preparation step

---
 .../main/java/eu/dnetlib/dhp/KeyValueSet.java |  26 ++
 .../PrepareResultInstRepoAssociation.java     |   5 +-
 .../ResultOrganizationSet.java                |  26 --
 ...arkResultToOrganizationFromIstRepoJob.java |  25 +-
 .../PrepareInfo.java                          | 118 +++++++
 .../SparkResultToOrganizationFromSemRel.java  |  29 ++
 .../PrepareInfoJobTest.java                   | 323 ++++++++++++++++++
 .../childparenttest1/relation                 |   7 +
 .../childparenttest2/relation                 |   7 +
 .../resultorganizationtest/relation           |   7 +
 10 files changed, 533 insertions(+), 40 deletions(-)
 create mode 100644 dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/KeyValueSet.java
 delete mode 100644 dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/ResultOrganizationSet.java
 create mode 100644 dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfo.java
 create mode 100644 dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java
 create mode 100644 dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfoJobTest.java
 create mode 100644 dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/childparenttest1/relation
 create mode 100644 dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/childparenttest2/relation
 create mode 100644 dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/resultorganizationtest/relation

diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/KeyValueSet.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/KeyValueSet.java
new file mode 100644
index 000000000..57ab716b3
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/KeyValueSet.java
@@ -0,0 +1,26 @@
+
+package eu.dnetlib.dhp;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+public class KeyValueSet implements Serializable {
+	private String key;
+	private ArrayList<String> valueSet;
+
+	public String getKey() {
+		return key;
+	}
+
+	public void setKey(String key) {
+		this.key = key;
+	}
+
+	public ArrayList<String> getValueSet() {
+		return valueSet;
+	}
+
+	public void setValueSet(ArrayList<String> valueSet) {
+		this.valueSet = valueSet;
+	}
+}
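Reviewer sketch (not part of the patch): the bean above is consumed via Encoders.bean, which maps SQL columns to bean properties by name. That is why the queries touched by this patch alias their output columns as key and valueSet. A minimal self-contained illustration, with a made-up class name and toy data:

import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import eu.dnetlib.dhp.KeyValueSet;

public class KeyValueSetEncoderSketch {
	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().master("local[*]").appName("kvSketch").getOrCreate();

		// toy "relation" view with source/target pairs
		spark
			.createDataset(Arrays.asList("r1,o1", "r1,o2", "r2,o1"), Encoders.STRING())
			.selectExpr("split(value, ',')[0] as source", "split(value, ',')[1] as target")
			.createOrReplaceTempView("relation");

		// the column aliases must match the bean properties: key, valueSet
		Dataset<KeyValueSet> kv = spark
			.sql("SELECT source key, collect_set(target) valueSet FROM relation GROUP BY source")
			.as(Encoders.bean(KeyValueSet.class));

		kv.show(false); // r1 -> {o1, o2}, r2 -> {o1} (set order not guaranteed)
		spark.stop();
	}
}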
query = "Select source resultId, collect_set(target) organizationSet " + String query = "Select source key, collect_set(target) valueSet " + "from relation " + "where datainfo.deletedbyinference = false " + "and lower(relClass) = '" @@ -134,7 +135,7 @@ public class PrepareResultInstRepoAssociation { spark .sql(query) - .as(Encoders.bean(ResultOrganizationSet.class)) + .as(Encoders.bean(KeyValueSet.class)) // TODO retry to stick with datasets .toJavaRDD() .map(r -> OBJECT_MAPPER.writeValueAsString(r)) diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/ResultOrganizationSet.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/ResultOrganizationSet.java deleted file mode 100644 index 3bce14cdb..000000000 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/ResultOrganizationSet.java +++ /dev/null @@ -1,26 +0,0 @@ - -package eu.dnetlib.dhp.resulttoorganizationfrominstrepo; - -import java.io.Serializable; -import java.util.ArrayList; - -public class ResultOrganizationSet implements Serializable { - private String resultId; - private ArrayList organizationSet; - - public String getResultId() { - return resultId; - } - - public void setResultId(String resultId) { - this.resultId = resultId; - } - - public ArrayList getOrganizationSet() { - return organizationSet; - } - - public void setOrganizationSet(ArrayList organizationSet) { - this.organizationSet = organizationSet; - } -} diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob.java index 63824f1a8..be0a4804a 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/SparkResultToOrganizationFromIstRepoJob.java @@ -8,6 +8,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; +import eu.dnetlib.dhp.KeyValueSet; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FlatMapFunction; @@ -28,7 +29,7 @@ public class SparkResultToOrganizationFromIstRepoJob { private static final Logger log = LoggerFactory.getLogger(SparkResultToOrganizationFromIstRepoJob.class); - private static final String RESULT_ORGANIZATIONSET_QUERY = "SELECT id resultId, collect_set(organizationId) organizationSet " + private static final String RESULT_ORGANIZATIONSET_QUERY = "SELECT id key, collect_set(organizationId) valueSet " + "FROM ( SELECT id, organizationId " + "FROM rels " + "JOIN cfhb " @@ -107,14 +108,14 @@ public class SparkResultToOrganizationFromIstRepoJob { Dataset dsOrg = readPath(spark, datasourceorganization, DatasourceOrganization.class); - Dataset potentialUpdates = getPotentialRelations(spark, inputPath, clazz, dsOrg); + Dataset potentialUpdates = getPotentialRelations(spark, inputPath, clazz, dsOrg); - Dataset alreadyLinked = readPath(spark, alreadyLinkedPath, ResultOrganizationSet.class); + Dataset alreadyLinked = readPath(spark, alreadyLinkedPath, KeyValueSet.class); potentialUpdates .joinWith( alreadyLinked, - potentialUpdates.col("resultId").equalTo(alreadyLinked.col("resultId")), + 
potentialUpdates.col("key").equalTo(alreadyLinked.col("key")), "left_outer") .flatMap(createRelationFn(), Encoders.bean(Relation.class)) .write() @@ -123,18 +124,18 @@ public class SparkResultToOrganizationFromIstRepoJob { .json(outputPath); } - private static FlatMapFunction, Relation> createRelationFn() { + private static FlatMapFunction, Relation> createRelationFn() { return value -> { List newRelations = new ArrayList<>(); - ResultOrganizationSet potentialUpdate = value._1(); - Optional alreadyLinked = Optional.ofNullable(value._2()); - List organizations = potentialUpdate.getOrganizationSet(); + KeyValueSet potentialUpdate = value._1(); + Optional alreadyLinked = Optional.ofNullable(value._2()); + List organizations = potentialUpdate.getValueSet(); alreadyLinked .ifPresent( resOrg -> resOrg - .getOrganizationSet() + .getValueSet() .forEach(organizations::remove)); - String resultId = potentialUpdate.getResultId(); + String resultId = potentialUpdate.getKey(); organizations .forEach( orgId -> { @@ -165,7 +166,7 @@ public class SparkResultToOrganizationFromIstRepoJob { }; } - private static Dataset getPotentialRelations( + private static Dataset getPotentialRelations( SparkSession spark, String inputPath, Class resultClazz, @@ -179,7 +180,7 @@ public class SparkResultToOrganizationFromIstRepoJob { return spark .sql(RESULT_ORGANIZATIONSET_QUERY) - .as(Encoders.bean(ResultOrganizationSet.class)); + .as(Encoders.bean(KeyValueSet.class)); } } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfo.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfo.java new file mode 100644 index 000000000..34817d9a9 --- /dev/null +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfo.java @@ -0,0 +1,118 @@ +package eu.dnetlib.dhp.resulttoorganizationfromsemrel; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import eu.dnetlib.dhp.KeyValueSet; +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.Relation; + + +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; + +import org.apache.spark.sql.SaveMode; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Tuple2; + +import java.io.Serializable; +import java.util.*; + +import static eu.dnetlib.dhp.PropagationConstant.*; + +/** + * Searches for all the association between result and organization already existing in the graph + * Creates also the parenthood hierarchy from the organizations + */ + +public class PrepareInfo implements Serializable { + + //leggo le relazioni e seleziono quelle fra result ed organizzazioni + //raggruppo per result e salvo + //r => {o1, o2, o3} + + //leggo le relazioni fra le organizzazioni e creo la gerarchia delle parentele: + //hashMap key organizzazione -> value tutti i suoi padri + // o => {p1, p2} + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final Logger log = LoggerFactory.getLogger(PrepareInfo.class); + + //associa i figli con tutti i loro padri + private static final String relOrgQuery = + "SELECT target key, collect_set(source) as valueSet " + + "FROM relation " + + "WHERE lower(relclass) = '" + ModelConstants.IS_PARENT_OF.toLowerCase() + "' and datainfo.deletedbyinference = false " + + "GROUP BY target"; + + private static final 
String relResQuery = "SELECT source key, collect_set(target) as valueSet " + + "FROM relation " + + "WHERE lower(relclass) = '" + ModelConstants.HAS_AUTHOR_INSTITUTION.toLowerCase() + "' and datainfo.deletedbyinference = false " + + "GROUP BY source"; + + + public static void prepareChildrenParent(SparkSession spark, String inputPath, String childParentOrganizationPath){ + Dataset relation = readPath(spark, inputPath + "/relation", Relation.class); + relation.createOrReplaceTempView("relation"); + + spark + .sql(relOrgQuery) + .as(Encoders.bean(KeyValueSet.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression","gzip") + .json(childParentOrganizationPath); + } + + public static void prepareResultOrgnization(SparkSession spark, String inputPath, String resultOrganizationPath){ + spark + .sql(relResQuery) + .as(Encoders.bean(KeyValueSet.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression","gzip") + .json(resultOrganizationPath); + } + + private static void prepareInfo(SparkSession spark, String inputPath, String childParentOrganizationPath, + String currentIterationPath, String resultOrganizationPath){ + Dataset relation = readPath(spark, inputPath + "/relation", Relation.class); + relation.createOrReplaceTempView("relation"); + + spark + .sql(relOrgQuery) + .as(Encoders.bean(KeyValueSet.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression","gzip") + .json(childParentOrganizationPath); + + spark + .sql(relResQuery) + .as(Encoders.bean(KeyValueSet.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression","gzip") + .json(resultOrganizationPath); + + Dataset children = spark.sql("Select distinct target from relation where relclass='IsParentOf' and datainfo.deletedbyinference = false").as(Encoders.STRING()); + + Dataset parent = spark.sql("Select distinct source from relation where relclass='IsParentOf' and datainfo.deletedbyinference = false").as(Encoders.STRING()); + + //prendo dalla join i risultati che hanno solo il lato sinistro: sono foglie + children.joinWith(parent, children.col("_1").equalTo(parent.col("_1")), "left") + .map((MapFunction, String>) value -> { + if (Optional.ofNullable(value._2()).isPresent()) { + return null; + } + + return value._1(); + }, Encoders.STRING()).filter(Objects::nonNull) + .write() + .mode(SaveMode.Overwrite) + .json(currentIterationPath); + } + +} diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java new file mode 100644 index 000000000..bd1b7803f --- /dev/null +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java @@ -0,0 +1,29 @@ +package eu.dnetlib.dhp.resulttoorganizationfromsemrel; + +public class SparkResultToOrganizationFromSemRel { + + //per ogni figlio nel file delle organizzazioni + //devo fare una roba iterativa che legge info da un file e le cambia via via + //passo 1: creo l'informazione iniale: organizzazioni che non hanno figli con almeno un padre + //ogni organizzazione punta alla lista di padri + //eseguo la propagazione dall'organizzazione figlio all'organizzazione padre + //ricerco nel dataset delle relazioni se l'organizzazione a cui ho propagato ha, a sua volta, dei padri + //e propago anche a quelli e cosi' via fino a che arrivo ad organizzazione senza padre + + 
diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java
new file mode 100644
index 000000000..bd1b7803f
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java
@@ -0,0 +1,29 @@
+package eu.dnetlib.dhp.resulttoorganizationfromsemrel;
+
+public class SparkResultToOrganizationFromSemRel {
+
+	// for each child in the organizations file an iterative process is needed:
+	// it reads the information from a file and updates it step by step.
+	// step 1: create the initial information: the organizations that have no children and at least one parent,
+	// each organization pointing to its list of parents.
+	// run the propagation from the child organization to the parent organizations,
+	// then search the relation dataset to see whether the organizations just propagated to have, in turn, parents,
+	// and propagate to those as well, and so on, until organizations without parents are reached.
+
+	// organizationFile:
+	// f => {p1, p2, ..., pn}
+	// resultFile
+	// o => {r1, r2, ... rm}
+
+	// suppose f => {r1, r2} and that none of the parents already has an association with these results;
+	// then:
+	// p1 => {r1, r2}
+	// p2 => {r1, r2}
+	// pn => {r1, r2}
+
+	// needed: a file with the whole organization hierarchy,
+	// a file with the leaf organizations, to be joined with the other file,
+	// a file with the organization -> result associations (maybe better result -> organization)
+
+}
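Reviewer sketch (not part of the patch): an in-memory rendering of the iteration described in the comments above, to pin down the intended semantics before the Spark version is written. childToParents and resultsByOrg stand for the two prepared files, the hierarchy is assumed acyclic, and all names are illustrative:

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class PropagationSketch {

	public static Map<String, Set<String>> propagate(
		Map<String, List<String>> childToParents, // o => {p1, p2, ...}
		Map<String, Set<String>> resultsByOrg, // o => {r1, r2, ...}
		Set<String> leaves) { // organizations with parents but no children

		// deep copy, so the caller's sets are not mutated
		Map<String, Set<String>> propagated = new HashMap<>();
		resultsByOrg.forEach((org, results) -> propagated.put(org, new HashSet<>(results)));

		Set<String> current = leaves;
		while (!current.isEmpty()) {
			Set<String> next = new HashSet<>();
			for (String child : current) {
				Set<String> results = propagated.getOrDefault(child, Collections.emptySet());
				for (String parent : childToParents.getOrDefault(child, Collections.emptyList())) {
					// copy the child's results to the parent; the set absorbs duplicates
					propagated.computeIfAbsent(parent, k -> new HashSet<>()).addAll(results);
					// if the parent has parents itself, keep climbing in the next iteration
					if (childToParents.containsKey(parent)) {
						next.add(parent);
					}
				}
			}
			current = next;
		}
		return propagated;
	}
}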
diff --git a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfoJobTest.java b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfoJobTest.java
new file mode 100644
index 000000000..1d3498d04
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfoJobTest.java
@@ -0,0 +1,323 @@
+
+package eu.dnetlib.dhp.resulttoorganizationfromsemrel;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.KeyValueSet;
+import eu.dnetlib.dhp.projecttoresult.SparkResultToProjectThroughSemRelJob;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import org.apache.commons.io.FileUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FilterFunction;
+import org.apache.spark.api.java.function.ForeachFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+
+public class PrepareInfoJobTest {
+
+	private static final Logger log = LoggerFactory.getLogger(PrepareInfoJobTest.class);
+
+	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+	private static SparkSession spark;
+
+	private static Path workingDir;
+
+	@BeforeAll
+	public static void beforeAll() throws IOException {
+		workingDir = Files.createTempDirectory(PrepareInfoJobTest.class.getSimpleName());
+		log.info("using work dir {}", workingDir);
+
+		SparkConf conf = new SparkConf();
+		conf.setAppName(PrepareInfoJobTest.class.getSimpleName());
+
+		conf.setMaster("local[*]");
+		conf.set("spark.driver.host", "localhost");
+		conf.set("hive.metastore.local", "true");
+		conf.set("spark.ui.enabled", "false");
+		conf.set("spark.sql.warehouse.dir", workingDir.toString());
+		conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
+
+		spark = SparkSession
+			.builder()
+			.appName(PrepareInfoJobTest.class.getSimpleName())
+			.config(conf)
+			.getOrCreate();
+	}
+
+	@AfterAll
+	public static void afterAll() throws IOException {
+		FileUtils.deleteDirectory(workingDir.toFile());
+		spark.stop();
+	}
+
+	@Test
+	public void childParentTest1() {
+
+		PrepareInfo.prepareChildrenParent(spark, getClass()
+			.getResource(
+				"/eu/dnetlib/dhp/resulttoorganizationfromsemrel/childparenttest1")
+			.getPath(), workingDir.toString() + "/childParentOrg/");
+
+		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+		JavaRDD<KeyValueSet> tmp = sc
+			.textFile(workingDir.toString() + "/childParentOrg/")
+			.map(item -> OBJECT_MAPPER.readValue(item, KeyValueSet.class));
+
+		Dataset<KeyValueSet> verificationDs = spark.createDataset(tmp.rdd(), Encoders.bean(KeyValueSet.class));
+
+		Assertions.assertEquals(6, verificationDs.count());
+
+		Assertions.assertEquals(1, verificationDs.filter("key = '20|dedup_wf_001::2899e571609779168222fdeb59cb916d'")
+			.collectAsList().get(0).getValueSet().size());
+		Assertions.assertEquals("20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f",
+			verificationDs.filter("key = '20|dedup_wf_001::2899e571609779168222fdeb59cb916d'")
+				.collectAsList().get(0).getValueSet().get(0));
+
+		Assertions.assertEquals(2, verificationDs.filter("key = '20|pippo_wf_001::2899e571609779168222fdeb59cb916d'")
+			.collectAsList().get(0).getValueSet().size());
+		Assertions.assertTrue(verificationDs.filter("key = '20|pippo_wf_001::2899e571609779168222fdeb59cb916d'")
+			.collectAsList().get(0).getValueSet().contains("20|dedup_wf_001::2899e571609779168222fdeb59cb916d"));
+		Assertions.assertTrue(verificationDs.filter("key = '20|pippo_wf_001::2899e571609779168222fdeb59cb916d'")
+			.collectAsList().get(0).getValueSet().contains("20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f"));
+
+		Assertions.assertEquals(1, verificationDs.filter("key = '20|doajarticles::396262ee936f3d3e26ff0e60bea6cae0'")
+			.collectAsList().get(0).getValueSet().size());
+		Assertions.assertTrue(verificationDs.filter("key = '20|doajarticles::396262ee936f3d3e26ff0e60bea6cae0'")
+			.collectAsList().get(0).getValueSet().contains("20|dedup_wf_001::2899e571609779168222fdeb59cb916d"));
+
+		Assertions.assertEquals(1, verificationDs.filter("key = '20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f'")
+			.collectAsList().get(0).getValueSet().size());
+		Assertions.assertTrue(verificationDs.filter("key = '20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f'")
+			.collectAsList().get(0).getValueSet().contains("20|doajarticles::03748bcb5d754c951efec9700e18a56d"));
+
+		Assertions.assertEquals(1, verificationDs.filter("key = '20|doajarticles::1cae0b82b56ccd97c2db1f698def7074'")
+			.collectAsList().get(0).getValueSet().size());
+		Assertions.assertTrue(verificationDs.filter("key = '20|doajarticles::1cae0b82b56ccd97c2db1f698def7074'")
+			.collectAsList().get(0).getValueSet().contains("20|openaire____::ec653e804967133b9436fdd30d3ff51d"));
+
+		Assertions.assertEquals(1, verificationDs.filter("key = '20|opendoar____::a5fcb8eb25ebd6f7cd219e0fa1e6ddc1'")
+			.collectAsList().get(0).getValueSet().size());
+		Assertions.assertTrue(verificationDs.filter("key = '20|opendoar____::a5fcb8eb25ebd6f7cd219e0fa1e6ddc1'")
+			.collectAsList().get(0).getValueSet().contains("20|doajarticles::1cae0b82b56ccd97c2db1f698def7074"));
+
+	}
+
+	@Test
+	public void childParentTest2() {
+
+		PrepareInfo.prepareChildrenParent(spark, getClass()
+			.getResource(
+				"/eu/dnetlib/dhp/resulttoorganizationfromsemrel/childparenttest2")
+			.getPath(), workingDir.toString() + "/childParentOrg/");
+
+		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+		JavaRDD<KeyValueSet> tmp = sc
+			.textFile(workingDir.toString() + "/childParentOrg/")
+			.map(item -> OBJECT_MAPPER.readValue(item, KeyValueSet.class));
+
+		Dataset<KeyValueSet> verificationDs = spark.createDataset(tmp.rdd(), Encoders.bean(KeyValueSet.class));
+
+		Assertions.assertEquals(5, verificationDs.count());
+
+		Assertions.assertEquals(0, verificationDs.filter("key = '20|dedup_wf_001::2899e571609779168222fdeb59cb916d'").count());
+
+		Assertions.assertEquals(1, verificationDs.filter("key = '20|pippo_wf_001::2899e571609779168222fdeb59cb916d'")
+			.collectAsList().get(0).getValueSet().size());
+		Assertions.assertEquals("20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f",
+			verificationDs.filter("key = '20|pippo_wf_001::2899e571609779168222fdeb59cb916d'")
+				.collectAsList().get(0).getValueSet().get(0));
+
+		Assertions.assertEquals(1, verificationDs.filter("key = '20|doajarticles::396262ee936f3d3e26ff0e60bea6cae0'")
+			.collectAsList().get(0).getValueSet().size());
+		Assertions.assertTrue(verificationDs.filter("key = '20|doajarticles::396262ee936f3d3e26ff0e60bea6cae0'")
+			.collectAsList().get(0).getValueSet().contains("20|dedup_wf_001::2899e571609779168222fdeb59cb916d"));
+
+		Assertions.assertEquals(1, verificationDs.filter("key = '20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f'")
+			.collectAsList().get(0).getValueSet().size());
+		Assertions.assertTrue(verificationDs.filter("key = '20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f'")
+			.collectAsList().get(0).getValueSet().contains("20|doajarticles::03748bcb5d754c951efec9700e18a56d"));
+
+		Assertions.assertEquals(1, verificationDs.filter("key = '20|doajarticles::1cae0b82b56ccd97c2db1f698def7074'")
+			.collectAsList().get(0).getValueSet().size());
+		Assertions.assertTrue(verificationDs.filter("key = '20|doajarticles::1cae0b82b56ccd97c2db1f698def7074'")
+			.collectAsList().get(0).getValueSet().contains("20|openaire____::ec653e804967133b9436fdd30d3ff51d"));
+
+		Assertions.assertEquals(1, verificationDs.filter("key = '20|opendoar____::a5fcb8eb25ebd6f7cd219e0fa1e6ddc1'")
+			.collectAsList().get(0).getValueSet().size());
+		Assertions.assertTrue(verificationDs.filter("key = '20|opendoar____::a5fcb8eb25ebd6f7cd219e0fa1e6ddc1'")
+			.collectAsList().get(0).getValueSet().contains("20|doajarticles::1cae0b82b56ccd97c2db1f698def7074"));
+
+	}
+
+	@Test
+	public void resultOrganizationTest1() {
+
+	}
+
+	/**
+	 * All the possible updates will produce a new relation.
+	 * No relations are already linked in the graph.
+	 *
+	 * @throws Exception
+	 */
+	@Test
+	void UpdateTenTest() throws Exception {
+		final String potentialUpdatePath = getClass()
+			.getResource(
+				"/eu/dnetlib/dhp/projecttoresult/preparedInfo/tenupdates/potentialUpdates")
+			.getPath();
+		final String alreadyLinkedPath = getClass()
+			.getResource(
+				"/eu/dnetlib/dhp/projecttoresult/preparedInfo/alreadyLinked")
+			.getPath();
+		SparkResultToProjectThroughSemRelJob
+			.main(
+				new String[] {
+					"-isTest", Boolean.TRUE.toString(),
+					"-isSparkSessionManaged", Boolean.FALSE.toString(),
+					"-hive_metastore_uris", "",
+					"-saveGraph", "true",
+					"-outputPath", workingDir.toString() + "/relation",
+					"-potentialUpdatePath", potentialUpdatePath,
+					"-alreadyLinkedPath", alreadyLinkedPath,
+				});
+
+		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+		JavaRDD<Relation> tmp = sc
+			.textFile(workingDir.toString() + "/relation")
+			.map(item -> OBJECT_MAPPER.readValue(item, Relation.class));
+
+		// 10 new relations expected: each of the 5 new associations adds both "produces" and "isProducedBy"
+		Assertions.assertEquals(10, tmp.count());
+
+		Dataset<Relation> verificationDs = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class));
+
+		Assertions.assertEquals(5, verificationDs.filter("relClass = 'produces'").count());
+		Assertions.assertEquals(5, verificationDs.filter("relClass = 'isProducedBy'").count());
+
+		Assertions
+			.assertEquals(
+				5,
+				verificationDs
+					.filter(
+						(FilterFunction<Relation>) r -> r.getSource().startsWith("50")
+							&& r.getTarget().startsWith("40")
+							&& r.getRelClass().equals("isProducedBy"))
+					.count());
+		Assertions
+			.assertEquals(
+				5,
+				verificationDs
+					.filter(
+						(FilterFunction<Relation>) r -> r.getSource().startsWith("40")
+							&& r.getTarget().startsWith("50")
+							&& r.getRelClass().equals("produces"))
+					.count());
+
+		verificationDs.createOrReplaceTempView("temporary");
+
+		Assertions
+			.assertEquals(
+				10,
+				spark
+					.sql(
+						"Select * from temporary where datainfo.inferenceprovenance = 'propagation'")
+					.count());
+	}
+
+	/**
+	 * One of the relations in the possible updates is already linked to the project in the graph. All the others are
+	 * not.
+	 * There will be 4 new associations, leading to 8 new relations.
+	 *
+	 * @throws Exception
+	 */
+	@Test
+	void UpdateMixTest() throws Exception {
+		final String potentialUpdatePath = getClass()
+			.getResource(
+				"/eu/dnetlib/dhp/projecttoresult/preparedInfo/updatesmixed/potentialUpdates")
+			.getPath();
+		final String alreadyLinkedPath = getClass()
+			.getResource(
+				"/eu/dnetlib/dhp/projecttoresult/preparedInfo/alreadyLinked")
+			.getPath();
+		SparkResultToProjectThroughSemRelJob
+			.main(
+				new String[] {
+					"-isTest", Boolean.TRUE.toString(),
+					"-isSparkSessionManaged", Boolean.FALSE.toString(),
+					"-hive_metastore_uris", "",
+					"-saveGraph", "true",
+					"-outputPath", workingDir.toString() + "/relation",
+					"-potentialUpdatePath", potentialUpdatePath,
+					"-alreadyLinkedPath", alreadyLinkedPath,
+				});
+
+		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+		JavaRDD<Relation> tmp = sc
+			.textFile(workingDir.toString() + "/relation")
+			.map(item -> OBJECT_MAPPER.readValue(item, Relation.class));
+
+		// 8 new relations expected: each of the 4 new associations adds both "produces" and "isProducedBy"
+		Assertions.assertEquals(8, tmp.count());
+
+		Dataset<Relation> verificationDs = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class));
+
+		Assertions.assertEquals(4, verificationDs.filter("relClass = 'produces'").count());
+		Assertions.assertEquals(4, verificationDs.filter("relClass = 'isProducedBy'").count());
+
+		Assertions
+			.assertEquals(
+				4,
+				verificationDs
+					.filter(
+						(FilterFunction<Relation>) r -> r.getSource().startsWith("50")
+							&& r.getTarget().startsWith("40")
+							&& r.getRelClass().equals("isProducedBy"))
+					.count());
+		Assertions
+			.assertEquals(
+				4,
+				verificationDs
+					.filter(
+						(FilterFunction<Relation>) r -> r.getSource().startsWith("40")
+							&& r.getTarget().startsWith("50")
+							&& r.getRelClass().equals("produces"))
+					.count());
+
+		verificationDs.createOrReplaceTempView("temporary");
+
+		Assertions
+			.assertEquals(
+				8,
+				spark
+					.sql(
+						"Select * from temporary where datainfo.inferenceprovenance = 'propagation'")
+					.count());
+	}
+}
diff --git a/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/childparenttest1/relation b/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/childparenttest1/relation
new file mode 100644
index 000000000..c63a2e0ac
--- /dev/null
+++ b/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/childparenttest1/relation
@@ -0,0 +1,7 @@
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isparentof","relType":"datasourceOrganization","source":"20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f","subRelType":"provision","target":"20|dedup_wf_001::2899e571609779168222fdeb59cb916d"}
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isparentof","relType":"datasourceOrganization","source":"20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f","subRelType":"provision","target":"20|pippo_wf_001::2899e571609779168222fdeb59cb916d"} +{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isparentof","relType":"datasourceOrganization","source":"20|dedup_wf_001::2899e571609779168222fdeb59cb916d","subRelType":"provision","target":"20|pippo_wf_001::2899e571609779168222fdeb59cb916d"} +{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isparentof","relType":"datasourceOrganization","source":"20|dedup_wf_001::2899e571609779168222fdeb59cb916d","subRelType":"provision","target":"20|doajarticles::396262ee936f3d3e26ff0e60bea6cae0"} +{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isparentof","relType":"datasourceOrganization","source":"20|doajarticles::03748bcb5d754c951efec9700e18a56d","subRelType":"provision","target":"20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f"} +{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isparentof","relType":"datasourceOrganization","source":"20|openaire____::ec653e804967133b9436fdd30d3ff51d","subRelType":"provision","target":"20|doajarticles::1cae0b82b56ccd97c2db1f698def7074"} 
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isparentof","relType":"datasourceOrganization","source":"20|doajarticles::1cae0b82b56ccd97c2db1f698def7074","subRelType":"provision","target":"20|opendoar____::a5fcb8eb25ebd6f7cd219e0fa1e6ddc1"} \ No newline at end of file diff --git a/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/childparenttest2/relation b/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/childparenttest2/relation new file mode 100644 index 000000000..54589de32 --- /dev/null +++ b/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/childparenttest2/relation @@ -0,0 +1,7 @@ +{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":true,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isparentof","relType":"datasourceOrganization","source":"20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f","subRelType":"provision","target":"20|dedup_wf_001::2899e571609779168222fdeb59cb916d"} +{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isparentof","relType":"datasourceOrganization","source":"20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f","subRelType":"provision","target":"20|pippo_wf_001::2899e571609779168222fdeb59cb916d"} +{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":true,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isparentof","relType":"datasourceOrganization","source":"20|dedup_wf_001::2899e571609779168222fdeb59cb916d","subRelType":"provision","target":"20|pippo_wf_001::2899e571609779168222fdeb59cb916d"} 
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isparentof","relType":"datasourceOrganization","source":"20|dedup_wf_001::2899e571609779168222fdeb59cb916d","subRelType":"provision","target":"20|doajarticles::396262ee936f3d3e26ff0e60bea6cae0"} +{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isparentof","relType":"datasourceOrganization","source":"20|doajarticles::03748bcb5d754c951efec9700e18a56d","subRelType":"provision","target":"20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f"} +{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isparentof","relType":"datasourceOrganization","source":"20|openaire____::ec653e804967133b9436fdd30d3ff51d","subRelType":"provision","target":"20|doajarticles::1cae0b82b56ccd97c2db1f698def7074"} +{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"isparentof","relType":"datasourceOrganization","source":"20|doajarticles::1cae0b82b56ccd97c2db1f698def7074","subRelType":"provision","target":"20|opendoar____::a5fcb8eb25ebd6f7cd219e0fa1e6ddc1"} \ No newline at end of file diff --git a/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/resultorganizationtest/relation b/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/resultorganizationtest/relation new file mode 100644 index 000000000..5aeabb71b --- /dev/null +++ b/dhp-workflows/dhp-enrichment/src/test/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/resultorganizationtest/relation @@ -0,0 +1,7 @@ 
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"hasAuthorInstitution","relType":"datasourceOrganization","source":"50|doajarticles::2baa9032dc058d3c8ff780c426b0c19f","subRelType":"provision","target":"20|dedup_wf_001::2899e571609779168222fdeb59cb916d"} +{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"hasAuthorInstitution","relType":"datasourceOrganization","source":"50|doajarticles::2baa9032dc058d3c8ff780c426b0c19f","subRelType":"provision","target":"20|pippo_wf_001::2899e571609779168222fdeb59cb916d"} +{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"hasAuthorInstitution","relType":"datasourceOrganization","source":"50|dedup_wf_001::2899e571609779168222fdeb59cb916d","subRelType":"provision","target":"20|pippo_wf_001::2899e571609779168222fdeb59cb916d"} +{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"hasAuthorInstitution","relType":"datasourceOrganization","source":"50|dedup_wf_001::2899e571609779168222fdeb59cb916d","subRelType":"provision","target":"20|doajarticles::396262ee936f3d3e26ff0e60bea6cae0"} +{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"hasAuthorInstitution","relType":"datasourceOrganization","source":"50|doajarticles::03748bcb5d754c951efec9700e18a56d","subRelType":"provision","target":"20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f"} 
+{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"hasAuthorInstitution","relType":"datasourceOrganization","source":"50|openaire____::ec653e804967133b9436fdd30d3ff51d","subRelType":"provision","target":"20|doajarticles::1cae0b82b56ccd97c2db1f698def7074"} +{"collectedfrom":[{"key":"10|driver______::bee53aa31dc2cbb538c10c2b65fa5824","value":"DOAJ-Articles"}],"dataInfo":{"deletedbyinference":false,"inferred":false,"invisible":false,"provenanceaction":{"classid":"sysimport:crosswalk:entityregistry","classname":"Harvested","schemeid":"dnet:provenanceActions","schemename":"dnet:provenanceActions"},"trust":"0.9"},"lastupdatetimestamp":1592688952862,"properties":[],"relClass":"hasAuthorInstitution","relType":"datasourceOrganization","source":"50|doajarticles::1cae0b82b56ccd97c2db1f698def7074","subRelType":"provision","target":"20|opendoar____::a5fcb8eb25ebd6f7cd219e0fa1e6ddc1"} \ No newline at end of file