From daf4d7971b6b1a355b8930b4a2e1f4fc25efd380 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Wed, 31 May 2023 18:56:58 +0200 Subject: [PATCH] refactoring --- .../eu/dnetlib/dhp/PropagationConstant.java | 48 +- .../PrepareInfo.java | 50 +- .../SparkResultToOrganizationFromSemRel.java | 70 ++- .../StepActions.java | 88 ++- .../PrepareInfoJobTest.java | 246 ++++----- .../SparkJobTest.java | 509 +++++++++--------- .../StepActionsTest.java | 3 +- 7 files changed, 518 insertions(+), 496 deletions(-) diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java index 87528ef58..053300696 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/PropagationConstant.java @@ -174,37 +174,39 @@ public class PropagationConstant { return newRelations; } - public static Relation getRelation(String source, String target, String rel_class){ - if (ModelConstants.HAS_PARTICIPANT.equals(rel_class)){ + public static Relation getRelation(String source, String target, String rel_class) { + if (ModelConstants.HAS_PARTICIPANT.equals(rel_class)) { return getParticipantRelation(source, target, rel_class); - }else + } else return getAffiliationRelation(source, target, rel_class); } public static Relation getParticipantRelation( - String source, - String target, - String rel_class) { - return getRelation(source, target , - rel_class, - ModelConstants.PROJECT_ORGANIZATION, - ModelConstants.PARTICIPATION, - PROPAGATION_DATA_INFO_TYPE, - PROPAGATION_RELATION_PROJECT_ORGANIZATION_SEM_REL_CLASS_ID, - PROPAGATION_RELATION_PROJECT_ORGANIZATION_SEM_REL_CLASS_NAME); + String source, + String target, + String rel_class) { + return getRelation( + source, target, + rel_class, + ModelConstants.PROJECT_ORGANIZATION, + ModelConstants.PARTICIPATION, + PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RELATION_PROJECT_ORGANIZATION_SEM_REL_CLASS_ID, + PROPAGATION_RELATION_PROJECT_ORGANIZATION_SEM_REL_CLASS_NAME); } public static Relation getAffiliationRelation( - String source, - String target, - String rel_class) { - return getRelation(source, target , - rel_class, - ModelConstants.RESULT_ORGANIZATION, - ModelConstants.AFFILIATION, - PROPAGATION_DATA_INFO_TYPE, - PROPAGATION_RELATION_RESULT_ORGANIZATION_SEM_REL_CLASS_ID, - PROPAGATION_RELATION_RESULT_ORGANIZATION_SEM_REL_CLASS_NAME); + String source, + String target, + String rel_class) { + return getRelation( + source, target, + rel_class, + ModelConstants.RESULT_ORGANIZATION, + ModelConstants.AFFILIATION, + PROPAGATION_DATA_INFO_TYPE, + PROPAGATION_RELATION_RESULT_ORGANIZATION_SEM_REL_CLASS_ID, + PROPAGATION_RELATION_RESULT_ORGANIZATION_SEM_REL_CLASS_NAME); } public static Relation getRelation( diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/entitytoorganizationfromsemrel/PrepareInfo.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/entitytoorganizationfromsemrel/PrepareInfo.java index 971ef436f..8d3432f06 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/entitytoorganizationfromsemrel/PrepareInfo.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/entitytoorganizationfromsemrel/PrepareInfo.java @@ -7,7 +7,6 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession; import java.io.Serializable; import java.util.*; -import com.fasterxml.jackson.databind.ObjectMapper; 
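// (editorial aside, not a patch line) ObjectMapper is not actually dropped here:
// the formatter regroups third-party imports, so the com.fasterxml import removed
// at this point is re-added a few lines below, after the org.slf4j imports.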
import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.*; @@ -15,6 +14,8 @@ import org.apache.spark.sql.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.databind.ObjectMapper; + import eu.dnetlib.dhp.KeyValueSet; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.resulttoorganizationfrominstrepo.SparkResultToOrganizationFromIstRepoJob; @@ -48,10 +49,10 @@ public class PrepareInfo implements Serializable { // associate projects to all the participant orgs private static final String PROJECT_ORGANIZATION_QUERY = "SELECT source key, collect_set(target) as valueSet " + - "FROM relation " + - "WHERE lower(relclass) = '" + ModelConstants.HAS_PARTICIPANT.toLowerCase() + - "' and datainfo.deletedbyinference = false " + - "GROUP BY source"; + "FROM relation " + + "WHERE lower(relclass) = '" + ModelConstants.HAS_PARTICIPANT.toLowerCase() + + "' and datainfo.deletedbyinference = false " + + "GROUP BY source"; public static void main(String[] args) throws Exception { @@ -98,12 +99,13 @@ public class PrepareInfo implements Serializable { childParentPath, leavesPath, resultOrganizationPath, - projectOrgPath, + projectOrgPath, relationPath)); } private static void prepareInfo(SparkSession spark, String inputPath, String childParentOrganizationPath, - String currentIterationPath, String resultOrganizationPath, String projectOrganizationPath, String relationPath) { + String currentIterationPath, String resultOrganizationPath, String projectOrganizationPath, + String relationPath) { Dataset relation = readPath(spark, inputPath + "/relation", Relation.class); relation.createOrReplaceTempView("relation"); @@ -124,30 +126,30 @@ public class PrepareInfo implements Serializable { .json(resultOrganizationPath); spark - .sql(PROJECT_ORGANIZATION_QUERY) - .as(Encoders.bean(KeyValueSet.class)) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(projectOrganizationPath); + .sql(PROJECT_ORGANIZATION_QUERY) + .as(Encoders.bean(KeyValueSet.class)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(projectOrganizationPath); relation - .filter( - (FilterFunction) r -> !r.getDataInfo().getDeletedbyinference() && - r.getRelClass().equals(ModelConstants.HAS_AUTHOR_INSTITUTION)) - .write() + .filter( + (FilterFunction) r -> !r.getDataInfo().getDeletedbyinference() && + r.getRelClass().equals(ModelConstants.HAS_AUTHOR_INSTITUTION)) + .write() .mode(SaveMode.Overwrite) .option("compression", "gzip") .json(relationPath + "/result"); relation - .filter( - (FilterFunction) r -> !r.getDataInfo().getDeletedbyinference() && - r.getRelClass().equals(ModelConstants.HAS_PARTICIPANT)) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(relationPath + "/project"); + .filter( + (FilterFunction) r -> !r.getDataInfo().getDeletedbyinference() && + r.getRelClass().equals(ModelConstants.HAS_PARTICIPANT)) + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(relationPath + "/project"); Dataset children = spark .sql( diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/entitytoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/entitytoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java index dd32552ad..27e502aba 100644 --- 
a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/entitytoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/entitytoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java @@ -92,7 +92,7 @@ public class SparkResultToOrganizationFromSemRel implements Serializable { leavesPath, childParentPath, resultOrganizationPath, - projectOrganizationPath, + projectOrganizationPath, relationPath, workingPath, outputPath, @@ -147,15 +147,16 @@ public class SparkResultToOrganizationFromSemRel implements Serializable { addNewRelations(spark, workingPath + NEW_RESULT_RELATION_PATH, outputPath); StepActions - .execStep( - spark, graphPath + "/project", workingPath + NEW_PROJECT_RELATION_PATH, - leavesPath, childParentPath, projectOrganizationPath, ModelConstants.HAS_PARTICIPANT); + .execStep( + spark, graphPath + "/project", workingPath + NEW_PROJECT_RELATION_PATH, + leavesPath, childParentPath, projectOrganizationPath, ModelConstants.HAS_PARTICIPANT); addNewRelations(spark, workingPath + NEW_PROJECT_RELATION_PATH, outputPath); } private static void doPropagate(SparkSession spark, String leavesPath, String childParentPath, - String resultOrganizationPath, String projectOrganizationPath, String graphPath, String workingPath, String outputPath, + String resultOrganizationPath, String projectOrganizationPath, String graphPath, String workingPath, + String outputPath, PropagationCounter propagationCounter) { int iteration = 0; long leavesCount; @@ -167,13 +168,13 @@ public class SparkResultToOrganizationFromSemRel implements Serializable { spark, graphPath + "/result", workingPath + NEW_RESULT_RELATION_PATH, leavesPath, childParentPath, resultOrganizationPath, ModelConstants.HAS_AUTHOR_INSTITUTION); StepActions - .execStep( - spark, graphPath + "/project", workingPath + NEW_PROJECT_RELATION_PATH, - leavesPath, childParentPath, projectOrganizationPath, ModelConstants.HAS_PARTICIPANT); + .execStep( + spark, graphPath + "/project", workingPath + NEW_PROJECT_RELATION_PATH, + leavesPath, childParentPath, projectOrganizationPath, ModelConstants.HAS_PARTICIPANT); StepActions .prepareForNextStep( - spark, workingPath , resultOrganizationPath, projectOrganizationPath, leavesPath, + spark, workingPath, resultOrganizationPath, projectOrganizationPath, leavesPath, childParentPath, workingPath + "/leaves", workingPath + "/resOrg", workingPath + "/projOrg"); moveOutput(spark, workingPath, leavesPath, resultOrganizationPath, projectOrganizationPath); leavesCount = readPath(spark, leavesPath, Leaves.class).count(); @@ -224,24 +225,24 @@ public class SparkResultToOrganizationFromSemRel implements Serializable { } private static void moveOutput(SparkSession spark, String workingPath, String leavesPath, - String resultOrganizationPath, String projectOrganizationPath) { + String resultOrganizationPath, String projectOrganizationPath) { readPath(spark, workingPath + "/leaves", Leaves.class) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(leavesPath); + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(leavesPath); readPath(spark, workingPath + "/resOrg", KeyValueSet.class) - .write() - .mode(SaveMode.Overwrite) - .option("compression", "gzip") - .json(resultOrganizationPath); + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(resultOrganizationPath); readPath(spark, workingPath + "/projOrg", KeyValueSet.class) - .write() - .mode(SaveMode.Overwrite) - 
.option("compression", "gzip") - .json(projectOrganizationPath); + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .json(projectOrganizationPath); } @@ -253,25 +254,22 @@ public class SparkResultToOrganizationFromSemRel implements Serializable { .mapGroups( (MapGroupsFunction) (k, it) -> it.next(), Encoders.bean(Relation.class)) .flatMap( - (FlatMapFunction) r -> - { - if(r.getSource().startsWith("50|")){ + (FlatMapFunction) r -> { + if (r.getSource().startsWith("50|")) { return Arrays - .asList( - r, getAffiliationRelation( - r.getTarget(), r.getSource(), ModelConstants.IS_AUTHOR_INSTITUTION_OF)) - .iterator(); - }else{ + .asList( + r, getAffiliationRelation( + r.getTarget(), r.getSource(), ModelConstants.IS_AUTHOR_INSTITUTION_OF)) + .iterator(); + } else { return Arrays - .asList( - r, getParticipantRelation( - r.getTarget(), r.getSource(), ModelConstants.IS_PARTICIPANT)) - .iterator(); + .asList( + r, getParticipantRelation( + r.getTarget(), r.getSource(), ModelConstants.IS_PARTICIPANT)) + .iterator(); } } - - , Encoders.bean(Relation.class)) .write() diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/entitytoorganizationfromsemrel/StepActions.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/entitytoorganizationfromsemrel/StepActions.java index de5034d38..386ea1a5c 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/entitytoorganizationfromsemrel/StepActions.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/entitytoorganizationfromsemrel/StepActions.java @@ -50,22 +50,25 @@ public class StepActions implements Serializable { spark, resultOrgPath, readPath(spark, selectedRelsPath, Relation.class), orgOutputPath); } - public static void prepareForNextStep(SparkSession spark, String selectedRelsPath, String resultOrgPath, String projectOrgPath, - String leavesPath, String chldParentOrgPath, String leavesOutputPath, - String orgOutputPath, String outputProjectPath) { + public static void prepareForNextStep(SparkSession spark, String selectedRelsPath, String resultOrgPath, + String projectOrgPath, + String leavesPath, String chldParentOrgPath, String leavesOutputPath, + String orgOutputPath, String outputProjectPath) { // use of the parents as new leaves set changeLeavesSet(spark, leavesPath, chldParentOrgPath, leavesOutputPath); // add the new relations obtained from propagation to the keyvalueset result organization updateEntityOrganization( - spark, resultOrgPath, readPath(spark, selectedRelsPath + NEW_RESULT_RELATION_PATH, Relation.class), orgOutputPath); + spark, resultOrgPath, readPath(spark, selectedRelsPath + NEW_RESULT_RELATION_PATH, Relation.class), + orgOutputPath); updateEntityOrganization( - spark, projectOrgPath, readPath(spark, selectedRelsPath + NEW_PROJECT_RELATION_PATH, Relation.class), outputProjectPath); + spark, projectOrgPath, readPath(spark, selectedRelsPath + NEW_PROJECT_RELATION_PATH, Relation.class), + outputProjectPath); } private static void updateEntityOrganization(SparkSession spark, String entityOrgPath, - Dataset selectedRels, String outputPath) { + Dataset selectedRels, String outputPath) { Dataset entityOrg = readPath(spark, entityOrgPath, KeyValueSet.class); entityOrg .joinWith( @@ -128,45 +131,43 @@ public class StepActions implements Serializable { // construction of the set) // if at least one relation in the set was not produced by propagation no new relation will be returned - relationDataset - .union(newRels) - .groupByKey((MapFunction) r -> r.getSource() 
+ r.getTarget(), Encoders.STRING()) - .mapGroups((MapGroupsFunction) (k, it) -> { + .union(newRels) + .groupByKey((MapFunction) r -> r.getSource() + r.getTarget(), Encoders.STRING()) + .mapGroups((MapGroupsFunction) (k, it) -> { - ArrayList relationList = new ArrayList<>(); - relationList.add(it.next()); - it.forEachRemaining(rel -> relationList.add(rel)); + ArrayList relationList = new ArrayList<>(); + relationList.add(it.next()); + it.forEachRemaining(rel -> relationList.add(rel)); - if (relationList - .stream() - .filter( - rel -> !rel - .getDataInfo() - .getProvenanceaction() - .getClassid() - .equals(PROPAGATION_RELATION_RESULT_ORGANIZATION_SEM_REL_CLASS_ID) && !rel - .getDataInfo() - .getProvenanceaction() - .getClassid() - .equals(PROPAGATION_RELATION_PROJECT_ORGANIZATION_SEM_REL_CLASS_ID)) - .count() > 0) { - return null; - } - - return new ObjectMapper().writeValueAsString(relationList.get(0)); - - }, Encoders.STRING()) - .filter(Objects::nonNull) - .map( - (MapFunction) r -> new ObjectMapper().readValue(r, Relation.class), - Encoders.bean(Relation.class)) - .write() - .mode(SaveMode.Append) - .option("compression", "gzip") - .json(newRelationPath); + if (relationList + .stream() + .filter( + rel -> !rel + .getDataInfo() + .getProvenanceaction() + .getClassid() + .equals(PROPAGATION_RELATION_RESULT_ORGANIZATION_SEM_REL_CLASS_ID) + && !rel + .getDataInfo() + .getProvenanceaction() + .getClassid() + .equals(PROPAGATION_RELATION_PROJECT_ORGANIZATION_SEM_REL_CLASS_ID)) + .count() > 0) { + return null; + } + return new ObjectMapper().writeValueAsString(relationList.get(0)); + }, Encoders.STRING()) + .filter(Objects::nonNull) + .map( + (MapFunction) r -> new ObjectMapper().readValue(r, Relation.class), + Encoders.bean(Relation.class)) + .write() + .mode(SaveMode.Append) + .option("compression", "gzip") + .json(newRelationPath); } @@ -175,7 +176,7 @@ public class StepActions implements Serializable { String leavesPath, String chldParentOrgPath, String entityOrgPath, - String semantics) { + String semantics) { Dataset childParent = readPath(spark, chldParentOrgPath, KeyValueSet.class); Dataset entityOrg = readPath(spark, entityOrgPath, KeyValueSet.class); @@ -202,7 +203,6 @@ public class StepActions implements Serializable { "GROUP BY entityId") .as(Encoders.bean(KeyValueSet.class)); - // create new relations from entity to organization for each entity linked to a leaf return resultParent .flatMap( @@ -213,13 +213,11 @@ public class StepActions implements Serializable { orgId -> getRelation( v.getKey(), orgId, - semantics)) + semantics)) .collect(Collectors.toList()) .iterator(), Encoders.bean(Relation.class)); } - - } diff --git a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/entitytoorganizationfromsemrel/PrepareInfoJobTest.java b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/entitytoorganizationfromsemrel/PrepareInfoJobTest.java index f29e8d24a..7c9c2b97b 100644 --- a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/entitytoorganizationfromsemrel/PrepareInfoJobTest.java +++ b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/entitytoorganizationfromsemrel/PrepareInfoJobTest.java @@ -77,7 +77,7 @@ public class PrepareInfoJobTest { "-hive_metastore_uris", "", "-leavesPath", workingDir.toString() + "/currentIteration/", "-resultOrgPath", workingDir.toString() + "/resultOrganization/", - "-projectOrganizationPath", workingDir.toString() + "/projectOrganization/", + "-projectOrganizationPath", workingDir.toString() + "/projectOrganization/", 
"-childParentPath", workingDir.toString() + "/childParentOrg/", "-relationPath", workingDir.toString() + "/relation" @@ -223,7 +223,7 @@ public class PrepareInfoJobTest { "-hive_metastore_uris", "", "-leavesPath", workingDir.toString() + "/currentIteration/", "-resultOrgPath", workingDir.toString() + "/resultOrganization/", - "-projectOrganizationPath", workingDir.toString() + "/projectOrganization/", + "-projectOrganizationPath", workingDir.toString() + "/projectOrganization/", "-childParentPath", workingDir.toString() + "/childParentOrg/", "-relationPath", workingDir.toString() + "/relation" @@ -344,7 +344,7 @@ public class PrepareInfoJobTest { "-hive_metastore_uris", "", "-leavesPath", workingDir.toString() + "/currentIteration/", "-resultOrgPath", workingDir.toString() + "/resultOrganization/", - "-projectOrganizationPath", workingDir.toString() + "/projectOrganization/", + "-projectOrganizationPath", workingDir.toString() + "/projectOrganization/", "-childParentPath", workingDir.toString() + "/childParentOrg/", "-relationPath", workingDir.toString() + "/relation" @@ -365,26 +365,26 @@ public class PrepareInfoJobTest { public void relationProjectTest() throws Exception { PrepareInfo - .main( - new String[] { - "-isSparkSessionManaged", Boolean.FALSE.toString(), - "-graphPath", getClass() - .getResource( - "/eu/dnetlib/dhp/entitytoorganizationfromsemrel/projectorganizationtest") - .getPath(), - "-hive_metastore_uris", "", - "-leavesPath", workingDir.toString() + "/currentIteration/", - "-resultOrgPath", workingDir.toString() + "/resultOrganization/", - "-projectOrganizationPath", workingDir.toString() + "/projectOrganization/", - "-childParentPath", workingDir.toString() + "/childParentOrg/", - "-relationPath", workingDir.toString() + "/relation" + .main( + new String[] { + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-graphPath", getClass() + .getResource( + "/eu/dnetlib/dhp/entitytoorganizationfromsemrel/projectorganizationtest") + .getPath(), + "-hive_metastore_uris", "", + "-leavesPath", workingDir.toString() + "/currentIteration/", + "-resultOrgPath", workingDir.toString() + "/resultOrganization/", + "-projectOrganizationPath", workingDir.toString() + "/projectOrganization/", + "-childParentPath", workingDir.toString() + "/childParentOrg/", + "-relationPath", workingDir.toString() + "/relation" - }); + }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaRDD tmp = sc - .textFile(workingDir.toString() + "/relation/project") - .map(item -> OBJECT_MAPPER.readValue(item, Relation.class)); + .textFile(workingDir.toString() + "/relation/project") + .map(item -> OBJECT_MAPPER.readValue(item, Relation.class)); Dataset verificationDs = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class)); @@ -406,7 +406,7 @@ public class PrepareInfoJobTest { "-hive_metastore_uris", "", "-leavesPath", workingDir.toString() + "/currentIteration/", "-resultOrgPath", workingDir.toString() + "/resultOrganization/", - "-projectOrganizationPath", workingDir.toString() + "/projectOrganization/", + "-projectOrganizationPath", workingDir.toString() + "/projectOrganization/", "-childParentPath", workingDir.toString() + "/childParentOrg/", "-relationPath", workingDir.toString() + "/relation" @@ -531,134 +531,134 @@ public class PrepareInfoJobTest { public void projectOrganizationTest1() throws Exception { PrepareInfo - .main( - new String[] { - "-isSparkSessionManaged", Boolean.FALSE.toString(), - "-graphPath", getClass() - .getResource( - 
"/eu/dnetlib/dhp/entitytoorganizationfromsemrel/projectorganizationtest") - .getPath(), - "-hive_metastore_uris", "", - "-leavesPath", workingDir.toString() + "/currentIteration/", - "-resultOrgPath", workingDir.toString() + "/resultOrganization/", - "-projectOrganizationPath", workingDir.toString() + "/projectOrganization/", - "-childParentPath", workingDir.toString() + "/childParentOrg/", - "-relationPath", workingDir.toString() + "/relation" + .main( + new String[] { + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-graphPath", getClass() + .getResource( + "/eu/dnetlib/dhp/entitytoorganizationfromsemrel/projectorganizationtest") + .getPath(), + "-hive_metastore_uris", "", + "-leavesPath", workingDir.toString() + "/currentIteration/", + "-resultOrgPath", workingDir.toString() + "/resultOrganization/", + "-projectOrganizationPath", workingDir.toString() + "/projectOrganization/", + "-childParentPath", workingDir.toString() + "/childParentOrg/", + "-relationPath", workingDir.toString() + "/relation" - }); + }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaRDD tmp = sc - .textFile(workingDir.toString() + "/projectOrganization/") - .map(item -> OBJECT_MAPPER.readValue(item, KeyValueSet.class)); + .textFile(workingDir.toString() + "/projectOrganization/") + .map(item -> OBJECT_MAPPER.readValue(item, KeyValueSet.class)); Dataset verificationDs = spark.createDataset(tmp.rdd(), Encoders.bean(KeyValueSet.class)); Assertions.assertEquals(5, verificationDs.count()); Assertions - .assertEquals( - 2, verificationDs - .filter("key = '40|doajarticles::2baa9032dc058d3c8ff780c426b0c19f'") - .collectAsList() - .get(0) - .getValueSet() - .size()); + .assertEquals( + 2, verificationDs + .filter("key = '40|doajarticles::2baa9032dc058d3c8ff780c426b0c19f'") + .collectAsList() + .get(0) + .getValueSet() + .size()); Assertions - .assertTrue( - verificationDs - .filter("key = '40|doajarticles::2baa9032dc058d3c8ff780c426b0c19f'") - .collectAsList() - .get(0) - .getValueSet() - .contains("20|dedup_wf_001::2899e571609779168222fdeb59cb916d")); + .assertTrue( + verificationDs + .filter("key = '40|doajarticles::2baa9032dc058d3c8ff780c426b0c19f'") + .collectAsList() + .get(0) + .getValueSet() + .contains("20|dedup_wf_001::2899e571609779168222fdeb59cb916d")); Assertions - .assertTrue( - verificationDs - .filter("key = '40|doajarticles::2baa9032dc058d3c8ff780c426b0c19f'") - .collectAsList() - .get(0) - .getValueSet() - .contains("20|pippo_wf_001::2899e571609779168222fdeb59cb916d")); + .assertTrue( + verificationDs + .filter("key = '40|doajarticles::2baa9032dc058d3c8ff780c426b0c19f'") + .collectAsList() + .get(0) + .getValueSet() + .contains("20|pippo_wf_001::2899e571609779168222fdeb59cb916d")); Assertions - .assertEquals( - 2, verificationDs - .filter("key = '40|dedup_wf_001::2899e571609779168222fdeb59cb916d'") - .collectAsList() - .get(0) - .getValueSet() - .size()); + .assertEquals( + 2, verificationDs + .filter("key = '40|dedup_wf_001::2899e571609779168222fdeb59cb916d'") + .collectAsList() + .get(0) + .getValueSet() + .size()); Assertions - .assertTrue( - verificationDs - .filter("key = '40|dedup_wf_001::2899e571609779168222fdeb59cb916d'") - .collectAsList() - .get(0) - .getValueSet() - .contains("20|doajarticles::396262ee936f3d3e26ff0e60bea6cae0")); + .assertTrue( + verificationDs + .filter("key = '40|dedup_wf_001::2899e571609779168222fdeb59cb916d'") + .collectAsList() + .get(0) + .getValueSet() + .contains("20|doajarticles::396262ee936f3d3e26ff0e60bea6cae0")); 
Assertions - .assertTrue( - verificationDs - .filter("key = '40|dedup_wf_001::2899e571609779168222fdeb59cb916d'") - .collectAsList() - .get(0) - .getValueSet() - .contains("20|pippo_wf_001::2899e571609779168222fdeb59cb916d")); + .assertTrue( + verificationDs + .filter("key = '40|dedup_wf_001::2899e571609779168222fdeb59cb916d'") + .collectAsList() + .get(0) + .getValueSet() + .contains("20|pippo_wf_001::2899e571609779168222fdeb59cb916d")); Assertions - .assertEquals( - 1, verificationDs - .filter("key = '40|doajarticles::03748bcb5d754c951efec9700e18a56d'") - .collectAsList() - .get(0) - .getValueSet() - .size()); + .assertEquals( + 1, verificationDs + .filter("key = '40|doajarticles::03748bcb5d754c951efec9700e18a56d'") + .collectAsList() + .get(0) + .getValueSet() + .size()); Assertions - .assertTrue( - verificationDs - .filter("key = '40|doajarticles::03748bcb5d754c951efec9700e18a56d'") - .collectAsList() - .get(0) - .getValueSet() - .contains("20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f")); + .assertTrue( + verificationDs + .filter("key = '40|doajarticles::03748bcb5d754c951efec9700e18a56d'") + .collectAsList() + .get(0) + .getValueSet() + .contains("20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f")); Assertions - .assertEquals( - 1, verificationDs - .filter("key = '40|openaire____::ec653e804967133b9436fdd30d3ff51d'") - .collectAsList() - .get(0) - .getValueSet() - .size()); + .assertEquals( + 1, verificationDs + .filter("key = '40|openaire____::ec653e804967133b9436fdd30d3ff51d'") + .collectAsList() + .get(0) + .getValueSet() + .size()); Assertions - .assertTrue( - verificationDs - .filter("key = '40|openaire____::ec653e804967133b9436fdd30d3ff51d'") - .collectAsList() - .get(0) - .getValueSet() - .contains("20|doajarticles::1cae0b82b56ccd97c2db1f698def7074")); + .assertTrue( + verificationDs + .filter("key = '40|openaire____::ec653e804967133b9436fdd30d3ff51d'") + .collectAsList() + .get(0) + .getValueSet() + .contains("20|doajarticles::1cae0b82b56ccd97c2db1f698def7074")); Assertions - .assertEquals( - 1, verificationDs - .filter("key = '40|doajarticles::1cae0b82b56ccd97c2db1f698def7074'") - .collectAsList() - .get(0) - .getValueSet() - .size()); + .assertEquals( + 1, verificationDs + .filter("key = '40|doajarticles::1cae0b82b56ccd97c2db1f698def7074'") + .collectAsList() + .get(0) + .getValueSet() + .size()); Assertions - .assertTrue( - verificationDs - .filter("key = '40|doajarticles::1cae0b82b56ccd97c2db1f698def7074'") - .collectAsList() - .get(0) - .getValueSet() - .contains("20|opendoar____::a5fcb8eb25ebd6f7cd219e0fa1e6ddc1")); + .assertTrue( + verificationDs + .filter("key = '40|doajarticles::1cae0b82b56ccd97c2db1f698def7074'") + .collectAsList() + .get(0) + .getValueSet() + .contains("20|opendoar____::a5fcb8eb25ebd6f7cd219e0fa1e6ddc1")); verificationDs - .foreach((ForeachFunction) v -> System.out.println(OBJECT_MAPPER.writeValueAsString(v))); + .foreach((ForeachFunction) v -> System.out.println(OBJECT_MAPPER.writeValueAsString(v))); } @@ -676,7 +676,7 @@ public class PrepareInfoJobTest { "-hive_metastore_uris", "", "-leavesPath", workingDir.toString() + "/currentIteration/", "-resultOrgPath", workingDir.toString() + "/resultOrganization/", - "-projectOrganizationPath", workingDir.toString() + "/projectOrganization/", + "-projectOrganizationPath", workingDir.toString() + "/projectOrganization/", "-childParentPath", workingDir.toString() + "/childParentOrg/", "-relationPath", workingDir.toString() + "/relation" @@ -704,7 +704,7 @@ public class PrepareInfoJobTest { 
"-hive_metastore_uris", "", "-leavesPath", workingDir.toString() + "/currentIteration/", "-resultOrgPath", workingDir.toString() + "/resultOrganization/", - "-projectOrganizationPath", workingDir.toString() + "/projectOrganization/", + "-projectOrganizationPath", workingDir.toString() + "/projectOrganization/", "-childParentPath", workingDir.toString() + "/childParentOrg/", "-relationPath", workingDir.toString() + "/relation" diff --git a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/entitytoorganizationfromsemrel/SparkJobTest.java b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/entitytoorganizationfromsemrel/SparkJobTest.java index eb4ade0da..2e75c75ad 100644 --- a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/entitytoorganizationfromsemrel/SparkJobTest.java +++ b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/entitytoorganizationfromsemrel/SparkJobTest.java @@ -85,9 +85,9 @@ public class SparkJobTest { .getPath(); final String projectOrgPath = getClass() - .getResource( - "/eu/dnetlib/dhp/entitytoorganizationfromsemrel/execstep/projectOrganization/") - .getPath(); + .getResource( + "/eu/dnetlib/dhp/entitytoorganizationfromsemrel/execstep/projectOrganization/") + .getPath(); readPath(spark, leavesPath, Leaves.class) .write() @@ -100,9 +100,9 @@ public class SparkJobTest { .json(workingDir.toString() + "/orgsInput"); readPath(spark, projectOrgPath, KeyValueSet.class) - .write() - .option("compression", "gzip") - .json(workingDir.toString() + "/projectInput"); + .write() + .option("compression", "gzip") + .json(workingDir.toString() + "/projectInput"); SparkResultToOrganizationFromSemRel @@ -114,7 +114,7 @@ public class SparkJobTest { "-outputPath", workingDir.toString() + "/finalrelation", "-leavesPath", workingDir.toString() + "/leavesInput", "-resultOrgPath", workingDir.toString() + "/orgsInput", - "-projectOrganizationPath", workingDir.toString() + "/projectInput", + "-projectOrganizationPath", workingDir.toString() + "/projectInput", "-childParentPath", childParentPath, "-workingDir", workingDir.toString() }); @@ -161,19 +161,24 @@ public class SparkJobTest { .foreach(r -> Assertions.assertEquals(ModelConstants.HAS_AUTHOR_INSTITUTION, r.getRelClass())); Assertions .assertEquals( - 2, result.filter(r -> r.getSource().equals("50|doajarticles::1cae0b82b56ccd97c2db1f698def7074")).count()); + 2, + result.filter(r -> r.getSource().equals("50|doajarticles::1cae0b82b56ccd97c2db1f698def7074")).count()); Assertions .assertEquals( - 3, result.filter(r -> r.getSource().equals("50|dedup_wf_001::2899e571609779168222fdeb59cb916d")).count()); + 3, + result.filter(r -> r.getSource().equals("50|dedup_wf_001::2899e571609779168222fdeb59cb916d")).count()); Assertions .assertEquals( - 2, result.filter(r -> r.getSource().equals("50|doajarticles::2baa9032dc058d3c8ff780c426b0c19f")).count()); + 2, + result.filter(r -> r.getSource().equals("50|doajarticles::2baa9032dc058d3c8ff780c426b0c19f")).count()); Assertions .assertEquals( - 1, result.filter(r -> r.getSource().equals("50|openaire____::ec653e804967133b9436fdd30d3ff51d")).count()); + 1, + result.filter(r -> r.getSource().equals("50|openaire____::ec653e804967133b9436fdd30d3ff51d")).count()); Assertions .assertEquals( - 1, result.filter(r -> r.getSource().equals("50|doajarticles::03748bcb5d754c951efec9700e18a56d")).count()); + 1, + result.filter(r -> r.getSource().equals("50|doajarticles::03748bcb5d754c951efec9700e18a56d")).count()); Assertions.assertEquals(9, result.filter(r -> r.getSource().substring(0, 
3).equals("20|")).count()); result @@ -181,19 +186,24 @@ public class SparkJobTest { .foreach(r -> Assertions.assertEquals(ModelConstants.IS_AUTHOR_INSTITUTION_OF, r.getRelClass())); Assertions .assertEquals( - 1, result.filter(r -> r.getSource().equals("20|doajarticles::1cae0b82b56ccd97c2db1f698def7074")).count()); + 1, + result.filter(r -> r.getSource().equals("20|doajarticles::1cae0b82b56ccd97c2db1f698def7074")).count()); Assertions .assertEquals( - 1, result.filter(r -> r.getSource().equals("20|dedup_wf_001::2899e571609779168222fdeb59cb916d")).count()); + 1, + result.filter(r -> r.getSource().equals("20|dedup_wf_001::2899e571609779168222fdeb59cb916d")).count()); Assertions .assertEquals( - 2, result.filter(r -> r.getSource().equals("20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f")).count()); + 2, + result.filter(r -> r.getSource().equals("20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f")).count()); Assertions .assertEquals( - 2, result.filter(r -> r.getSource().equals("20|openaire____::ec653e804967133b9436fdd30d3ff51d")).count()); + 2, + result.filter(r -> r.getSource().equals("20|openaire____::ec653e804967133b9436fdd30d3ff51d")).count()); Assertions .assertEquals( - 3, result.filter(r -> r.getSource().equals("20|doajarticles::03748bcb5d754c951efec9700e18a56d")).count()); + 3, + result.filter(r -> r.getSource().equals("20|doajarticles::03748bcb5d754c951efec9700e18a56d")).count()); Assertions .assertTrue( @@ -336,332 +346,343 @@ public class SparkJobTest { public void completeProjectExecution() throws Exception { final String graphPath = getClass() - .getResource("/eu/dnetlib/dhp/entitytoorganizationfromsemrel/execstep/graph") - .getPath(); + .getResource("/eu/dnetlib/dhp/entitytoorganizationfromsemrel/execstep/graph") + .getPath(); final String leavesPath = getClass() - .getResource( - "/eu/dnetlib/dhp/entitytoorganizationfromsemrel/execstep/currentIteration/") - .getPath(); + .getResource( + "/eu/dnetlib/dhp/entitytoorganizationfromsemrel/execstep/currentIteration/") + .getPath(); final String childParentPath = getClass() - .getResource( - "/eu/dnetlib/dhp/entitytoorganizationfromsemrel/execstep/childParentOrg/") - .getPath(); + .getResource( + "/eu/dnetlib/dhp/entitytoorganizationfromsemrel/execstep/childParentOrg/") + .getPath(); final String resultOrgPath = getClass() - .getResource( - "/eu/dnetlib/dhp/entitytoorganizationfromsemrel/execstep/resultOrganization/") - .getPath(); + .getResource( + "/eu/dnetlib/dhp/entitytoorganizationfromsemrel/execstep/resultOrganization/") + .getPath(); final String projectOrgPath = getClass() - .getResource( - "/eu/dnetlib/dhp/entitytoorganizationfromsemrel/execstep/projectOrganization/") - .getPath(); + .getResource( + "/eu/dnetlib/dhp/entitytoorganizationfromsemrel/execstep/projectOrganization/") + .getPath(); readPath(spark, leavesPath, Leaves.class) - .write() - .option("compression", "gzip") - .json(workingDir.toString() + "/leavesInput"); + .write() + .option("compression", "gzip") + .json(workingDir.toString() + "/leavesInput"); readPath(spark, resultOrgPath, KeyValueSet.class) - .write() - .option("compression", "gzip") - .json(workingDir.toString() + "/orgsInput"); + .write() + .option("compression", "gzip") + .json(workingDir.toString() + "/orgsInput"); readPath(spark, projectOrgPath, KeyValueSet.class) - .write() - .option("compression", "gzip") - .json(workingDir.toString() + "/projectInput"); + .write() + .option("compression", "gzip") + .json(workingDir.toString() + "/projectInput"); SparkResultToOrganizationFromSemRel - .main( - 
new String[] { - "-isSparkSessionManaged", Boolean.FALSE.toString(), - "-relationPath", graphPath, - "-hive_metastore_uris", "", - "-outputPath", workingDir.toString() + "/finalrelation", - "-leavesPath", workingDir.toString() + "/leavesInput", - "-resultOrgPath", workingDir.toString() + "/orgsInput", - "-projectOrganizationPath", workingDir.toString() + "/projectInput", - "-childParentPath", childParentPath, - "-workingDir", workingDir.toString() - }); + .main( + new String[] { + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-relationPath", graphPath, + "-hive_metastore_uris", "", + "-outputPath", workingDir.toString() + "/finalrelation", + "-leavesPath", workingDir.toString() + "/leavesInput", + "-resultOrgPath", workingDir.toString() + "/orgsInput", + "-projectOrganizationPath", workingDir.toString() + "/projectInput", + "-childParentPath", childParentPath, + "-workingDir", workingDir.toString() + }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaRDD temp = sc - .textFile(workingDir.toString() + "/finalrelation") - .map(item -> OBJECT_MAPPER.readValue(item, Relation.class)); + .textFile(workingDir.toString() + "/finalrelation") + .map(item -> OBJECT_MAPPER.readValue(item, Relation.class)); Assertions.assertEquals(36, temp.count()); - JavaRDD project = temp.filter(r -> r.getSource().startsWith("40|") || r.getTarget().startsWith("40|")); + JavaRDD project = temp + .filter(r -> r.getSource().startsWith("40|") || r.getTarget().startsWith("40|")); Assertions.assertEquals(18, project.count()); project.foreach(r -> Assertions.assertEquals(ModelConstants.PARTICIPATION, r.getSubRelType())); project.foreach(r -> Assertions.assertEquals(ModelConstants.PROJECT_ORGANIZATION, r.getRelType())); project - .foreach( - r -> Assertions - .assertEquals( - PropagationConstant.PROPAGATION_DATA_INFO_TYPE, r.getDataInfo().getInferenceprovenance())); + .foreach( + r -> Assertions + .assertEquals( + PropagationConstant.PROPAGATION_DATA_INFO_TYPE, r.getDataInfo().getInferenceprovenance())); project - .foreach( - r -> Assertions - .assertEquals( - PropagationConstant.PROPAGATION_RELATION_PROJECT_ORGANIZATION_SEM_REL_CLASS_ID, - r.getDataInfo().getProvenanceaction().getClassid())); + .foreach( + r -> Assertions + .assertEquals( + PropagationConstant.PROPAGATION_RELATION_PROJECT_ORGANIZATION_SEM_REL_CLASS_ID, + r.getDataInfo().getProvenanceaction().getClassid())); project - .foreach( - r -> Assertions - .assertEquals( - PropagationConstant.PROPAGATION_RELATION_PROJECT_ORGANIZATION_SEM_REL_CLASS_NAME, - r.getDataInfo().getProvenanceaction().getClassname())); + .foreach( + r -> Assertions + .assertEquals( + PropagationConstant.PROPAGATION_RELATION_PROJECT_ORGANIZATION_SEM_REL_CLASS_NAME, + r.getDataInfo().getProvenanceaction().getClassname())); project - .foreach( - r -> Assertions - .assertEquals( - "0.85", - r.getDataInfo().getTrust())); + .foreach( + r -> Assertions + .assertEquals( + "0.85", + r.getDataInfo().getTrust())); Assertions.assertEquals(9, project.filter(r -> r.getSource().substring(0, 3).equals("40|")).count()); project - .filter(r -> r.getSource().substring(0, 3).equals("40|")) - .foreach(r -> Assertions.assertEquals(ModelConstants.HAS_PARTICIPANT, r.getRelClass())); + .filter(r -> r.getSource().substring(0, 3).equals("40|")) + .foreach(r -> Assertions.assertEquals(ModelConstants.HAS_PARTICIPANT, r.getRelClass())); Assertions - .assertEquals( - 2, project.filter(r -> r.getSource().equals("40|doajarticles::1cae0b82b56ccd97c2db1f698def7074")).count()); 
+ .assertEquals( + 2, + project.filter(r -> r.getSource().equals("40|doajarticles::1cae0b82b56ccd97c2db1f698def7074")).count()); Assertions - .assertEquals( - 3, project.filter(r -> r.getSource().equals("40|dedup_wf_001::2899e571609779168222fdeb59cb916d")).count()); + .assertEquals( + 3, + project.filter(r -> r.getSource().equals("40|dedup_wf_001::2899e571609779168222fdeb59cb916d")).count()); Assertions - .assertEquals( - 2, project.filter(r -> r.getSource().equals("40|doajarticles::2baa9032dc058d3c8ff780c426b0c19f")).count()); + .assertEquals( + 2, + project.filter(r -> r.getSource().equals("40|doajarticles::2baa9032dc058d3c8ff780c426b0c19f")).count()); Assertions - .assertEquals( - 1, project.filter(r -> r.getSource().equals("40|openaire____::ec653e804967133b9436fdd30d3ff51d")).count()); + .assertEquals( + 1, + project.filter(r -> r.getSource().equals("40|openaire____::ec653e804967133b9436fdd30d3ff51d")).count()); Assertions - .assertEquals( - 1, project.filter(r -> r.getSource().equals("40|doajarticles::03748bcb5d754c951efec9700e18a56d")).count()); + .assertEquals( + 1, + project.filter(r -> r.getSource().equals("40|doajarticles::03748bcb5d754c951efec9700e18a56d")).count()); Assertions.assertEquals(9, project.filter(r -> r.getSource().substring(0, 3).equals("20|")).count()); project - .filter(r -> r.getSource().substring(0, 3).equals("20|")) - .foreach(r -> Assertions.assertEquals(ModelConstants.IS_PARTICIPANT, r.getRelClass())); + .filter(r -> r.getSource().substring(0, 3).equals("20|")) + .foreach(r -> Assertions.assertEquals(ModelConstants.IS_PARTICIPANT, r.getRelClass())); Assertions - .assertEquals( - 1, project.filter(r -> r.getSource().equals("20|doajarticles::1cae0b82b56ccd97c2db1f698def7074")).count()); + .assertEquals( + 1, + project.filter(r -> r.getSource().equals("20|doajarticles::1cae0b82b56ccd97c2db1f698def7074")).count()); Assertions - .assertEquals( - 1, project.filter(r -> r.getSource().equals("20|dedup_wf_001::2899e571609779168222fdeb59cb916d")).count()); + .assertEquals( + 1, + project.filter(r -> r.getSource().equals("20|dedup_wf_001::2899e571609779168222fdeb59cb916d")).count()); Assertions - .assertEquals( - 2, project.filter(r -> r.getSource().equals("20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f")).count()); + .assertEquals( + 2, + project.filter(r -> r.getSource().equals("20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f")).count()); Assertions - .assertEquals( - 2, project.filter(r -> r.getSource().equals("20|openaire____::ec653e804967133b9436fdd30d3ff51d")).count()); + .assertEquals( + 2, + project.filter(r -> r.getSource().equals("20|openaire____::ec653e804967133b9436fdd30d3ff51d")).count()); Assertions - .assertEquals( - 3, project.filter(r -> r.getSource().equals("20|doajarticles::03748bcb5d754c951efec9700e18a56d")).count()); + .assertEquals( + 3, + project.filter(r -> r.getSource().equals("20|doajarticles::03748bcb5d754c951efec9700e18a56d")).count()); Assertions - .assertTrue( - project - .filter(r -> r.getSource().equals("40|doajarticles::1cae0b82b56ccd97c2db1f698def7074")) - .map(r -> r.getTarget()) - .collect() - .contains("20|doajarticles::1cae0b82b56ccd97c2db1f698def7074")); + .assertTrue( + project + .filter(r -> r.getSource().equals("40|doajarticles::1cae0b82b56ccd97c2db1f698def7074")) + .map(r -> r.getTarget()) + .collect() + .contains("20|doajarticles::1cae0b82b56ccd97c2db1f698def7074")); Assertions - .assertTrue( - project - .filter(r -> r.getSource().equals("40|doajarticles::1cae0b82b56ccd97c2db1f698def7074")) - .map(r -> r.getTarget()) - 
.collect() - .contains("20|openaire____::ec653e804967133b9436fdd30d3ff51d")); + .assertTrue( + project + .filter(r -> r.getSource().equals("40|doajarticles::1cae0b82b56ccd97c2db1f698def7074")) + .map(r -> r.getTarget()) + .collect() + .contains("20|openaire____::ec653e804967133b9436fdd30d3ff51d")); Assertions - .assertTrue( - project - .filter(r -> r.getSource().equals("40|doajarticles::2baa9032dc058d3c8ff780c426b0c19f")) - .map(r -> r.getTarget()) - .collect() - .contains("20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f")); + .assertTrue( + project + .filter(r -> r.getSource().equals("40|doajarticles::2baa9032dc058d3c8ff780c426b0c19f")) + .map(r -> r.getTarget()) + .collect() + .contains("20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f")); Assertions - .assertTrue( - project - .filter(r -> r.getSource().equals("40|doajarticles::2baa9032dc058d3c8ff780c426b0c19f")) - .map(r -> r.getTarget()) - .collect() - .contains("20|doajarticles::03748bcb5d754c951efec9700e18a56d")); + .assertTrue( + project + .filter(r -> r.getSource().equals("40|doajarticles::2baa9032dc058d3c8ff780c426b0c19f")) + .map(r -> r.getTarget()) + .collect() + .contains("20|doajarticles::03748bcb5d754c951efec9700e18a56d")); Assertions - .assertTrue( - project - .filter(r -> r.getSource().equals("40|dedup_wf_001::2899e571609779168222fdeb59cb916d")) - .map(r -> r.getTarget()) - .collect() - .contains("20|dedup_wf_001::2899e571609779168222fdeb59cb916d")); + .assertTrue( + project + .filter(r -> r.getSource().equals("40|dedup_wf_001::2899e571609779168222fdeb59cb916d")) + .map(r -> r.getTarget()) + .collect() + .contains("20|dedup_wf_001::2899e571609779168222fdeb59cb916d")); Assertions - .assertTrue( - project - .filter(r -> r.getSource().equals("40|dedup_wf_001::2899e571609779168222fdeb59cb916d")) - .map(r -> r.getTarget()) - .collect() - .contains("20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f")); + .assertTrue( + project + .filter(r -> r.getSource().equals("40|dedup_wf_001::2899e571609779168222fdeb59cb916d")) + .map(r -> r.getTarget()) + .collect() + .contains("20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f")); Assertions - .assertTrue( - project - .filter(r -> r.getSource().equals("40|dedup_wf_001::2899e571609779168222fdeb59cb916d")) - .map(r -> r.getTarget()) - .collect() - .contains("20|doajarticles::03748bcb5d754c951efec9700e18a56d")); + .assertTrue( + project + .filter(r -> r.getSource().equals("40|dedup_wf_001::2899e571609779168222fdeb59cb916d")) + .map(r -> r.getTarget()) + .collect() + .contains("20|doajarticles::03748bcb5d754c951efec9700e18a56d")); Assertions - .assertTrue( - project - .filter(r -> r.getSource().equals("40|openaire____::ec653e804967133b9436fdd30d3ff51d")) - .map(r -> r.getTarget()) - .collect() - .contains("20|openaire____::ec653e804967133b9436fdd30d3ff51d")); + .assertTrue( + project + .filter(r -> r.getSource().equals("40|openaire____::ec653e804967133b9436fdd30d3ff51d")) + .map(r -> r.getTarget()) + .collect() + .contains("20|openaire____::ec653e804967133b9436fdd30d3ff51d")); Assertions - .assertTrue( - project - .filter(r -> r.getSource().equals("40|doajarticles::03748bcb5d754c951efec9700e18a56d")) - .map(r -> r.getTarget()) - .collect() - .contains("20|doajarticles::03748bcb5d754c951efec9700e18a56d")); + .assertTrue( + project + .filter(r -> r.getSource().equals("40|doajarticles::03748bcb5d754c951efec9700e18a56d")) + .map(r -> r.getTarget()) + .collect() + .contains("20|doajarticles::03748bcb5d754c951efec9700e18a56d")); Assertions - .assertTrue( - project - .filter(r -> 
r.getSource().equals("20|openaire____::ec653e804967133b9436fdd30d3ff51d")) - .map(r -> r.getTarget()) - .collect() - .contains("40|doajarticles::1cae0b82b56ccd97c2db1f698def7074")); + .assertTrue( + project + .filter(r -> r.getSource().equals("20|openaire____::ec653e804967133b9436fdd30d3ff51d")) + .map(r -> r.getTarget()) + .collect() + .contains("40|doajarticles::1cae0b82b56ccd97c2db1f698def7074")); Assertions - .assertTrue( - project - .filter(r -> r.getSource().equals("20|openaire____::ec653e804967133b9436fdd30d3ff51d")) - .map(r -> r.getTarget()) - .collect() - .contains("40|openaire____::ec653e804967133b9436fdd30d3ff51d")); + .assertTrue( + project + .filter(r -> r.getSource().equals("20|openaire____::ec653e804967133b9436fdd30d3ff51d")) + .map(r -> r.getTarget()) + .collect() + .contains("40|openaire____::ec653e804967133b9436fdd30d3ff51d")); Assertions - .assertTrue( - project - .filter(r -> r.getSource().equals("20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f")) - .map(r -> r.getTarget()) - .collect() - .contains("40|dedup_wf_001::2899e571609779168222fdeb59cb916d")); + .assertTrue( + project + .filter(r -> r.getSource().equals("20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f")) + .map(r -> r.getTarget()) + .collect() + .contains("40|dedup_wf_001::2899e571609779168222fdeb59cb916d")); Assertions - .assertTrue( - project - .filter(r -> r.getSource().equals("20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f")) - .map(r -> r.getTarget()) - .collect() - .contains("40|doajarticles::2baa9032dc058d3c8ff780c426b0c19f")); + .assertTrue( + project + .filter(r -> r.getSource().equals("20|doajarticles::2baa9032dc058d3c8ff780c426b0c19f")) + .map(r -> r.getTarget()) + .collect() + .contains("40|doajarticles::2baa9032dc058d3c8ff780c426b0c19f")); Assertions - .assertTrue( - project - .filter(r -> r.getSource().equals("20|doajarticles::03748bcb5d754c951efec9700e18a56d")) - .map(r -> r.getTarget()) - .collect() - .contains("40|dedup_wf_001::2899e571609779168222fdeb59cb916d")); + .assertTrue( + project + .filter(r -> r.getSource().equals("20|doajarticles::03748bcb5d754c951efec9700e18a56d")) + .map(r -> r.getTarget()) + .collect() + .contains("40|dedup_wf_001::2899e571609779168222fdeb59cb916d")); Assertions - .assertTrue( - project - .filter(r -> r.getSource().equals("20|doajarticles::03748bcb5d754c951efec9700e18a56d")) - .map(r -> r.getTarget()) - .collect() - .contains("40|doajarticles::2baa9032dc058d3c8ff780c426b0c19f")); + .assertTrue( + project + .filter(r -> r.getSource().equals("20|doajarticles::03748bcb5d754c951efec9700e18a56d")) + .map(r -> r.getTarget()) + .collect() + .contains("40|doajarticles::2baa9032dc058d3c8ff780c426b0c19f")); Assertions - .assertTrue( - project - .filter(r -> r.getSource().equals("20|doajarticles::03748bcb5d754c951efec9700e18a56d")) - .map(r -> r.getTarget()) - .collect() - .contains("40|doajarticles::03748bcb5d754c951efec9700e18a56d")); + .assertTrue( + project + .filter(r -> r.getSource().equals("20|doajarticles::03748bcb5d754c951efec9700e18a56d")) + .map(r -> r.getTarget()) + .collect() + .contains("40|doajarticles::03748bcb5d754c951efec9700e18a56d")); Assertions - .assertTrue( - project - .filter(r -> r.getSource().equals("20|dedup_wf_001::2899e571609779168222fdeb59cb916d")) - .map(r -> r.getTarget()) - .collect() - .contains("40|dedup_wf_001::2899e571609779168222fdeb59cb916d")); + .assertTrue( + project + .filter(r -> r.getSource().equals("20|dedup_wf_001::2899e571609779168222fdeb59cb916d")) + .map(r -> r.getTarget()) + .collect() + 
.contains("40|dedup_wf_001::2899e571609779168222fdeb59cb916d")); Assertions - .assertTrue( - project - .filter(r -> r.getSource().equals("20|doajarticles::1cae0b82b56ccd97c2db1f698def7074")) - .map(r -> r.getTarget()) - .collect() - .contains("40|doajarticles::1cae0b82b56ccd97c2db1f698def7074")); + .assertTrue( + project + .filter(r -> r.getSource().equals("20|doajarticles::1cae0b82b56ccd97c2db1f698def7074")) + .map(r -> r.getTarget()) + .collect() + .contains("40|doajarticles::1cae0b82b56ccd97c2db1f698def7074")); } @Test public void singleIterationExecution() throws Exception { final String graphPath = getClass() - .getResource("/eu/dnetlib/dhp/entitytoorganizationfromsemrel/execstep/graph") - .getPath(); + .getResource("/eu/dnetlib/dhp/entitytoorganizationfromsemrel/execstep/graph") + .getPath(); final String leavesPath = getClass() - .getResource( - "/eu/dnetlib/dhp/entitytoorganizationfromsemrel/execstep/currentIteration/") - .getPath(); + .getResource( + "/eu/dnetlib/dhp/entitytoorganizationfromsemrel/execstep/currentIteration/") + .getPath(); final String childParentPath = getClass() - .getResource( - "/eu/dnetlib/dhp/entitytoorganizationfromsemrel/execstep/childParentOrg/") - .getPath(); + .getResource( + "/eu/dnetlib/dhp/entitytoorganizationfromsemrel/execstep/childParentOrg/") + .getPath(); final String resultOrgPath = getClass() - .getResource( - "/eu/dnetlib/dhp/entitytoorganizationfromsemrel/execstep/resultOrganization/") - .getPath(); + .getResource( + "/eu/dnetlib/dhp/entitytoorganizationfromsemrel/execstep/resultOrganization/") + .getPath(); final String projectOrgPath = getClass() - .getResource( - "/eu/dnetlib/dhp/entitytoorganizationfromsemrel/execstep/projectOrganization/") - .getPath(); + .getResource( + "/eu/dnetlib/dhp/entitytoorganizationfromsemrel/execstep/projectOrganization/") + .getPath(); readPath(spark, leavesPath, Leaves.class) - .write() - .option("compression", "gzip") - .json(workingDir.toString() + "/leavesInput"); + .write() + .option("compression", "gzip") + .json(workingDir.toString() + "/leavesInput"); readPath(spark, resultOrgPath, KeyValueSet.class) - .write() - .option("compression", "gzip") - .json(workingDir.toString() + "/orgsInput"); + .write() + .option("compression", "gzip") + .json(workingDir.toString() + "/orgsInput"); readPath(spark, projectOrgPath, KeyValueSet.class) - .write() - .option("compression", "gzip") - .json(workingDir.toString() + "/projectInput"); + .write() + .option("compression", "gzip") + .json(workingDir.toString() + "/projectInput"); SparkResultToOrganizationFromSemRel - .main( - new String[] { - "-isSparkSessionManaged", Boolean.FALSE.toString(), - "-relationPath", graphPath, - "-hive_metastore_uris", "", - "-outputPath", workingDir.toString() + "/finalrelation", - "-leavesPath", workingDir.toString() + "/leavesInput", - "-resultOrgPath", workingDir.toString() + "/orgsInput", - "-projectOrganizationPath", workingDir.toString() + "/projectInput", - "-childParentPath", childParentPath, - "-workingDir", workingDir.toString(), - "-iterations", "1" - }); + .main( + new String[] { + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-relationPath", graphPath, + "-hive_metastore_uris", "", + "-outputPath", workingDir.toString() + "/finalrelation", + "-leavesPath", workingDir.toString() + "/leavesInput", + "-resultOrgPath", workingDir.toString() + "/orgsInput", + "-projectOrganizationPath", workingDir.toString() + "/projectInput", + "-childParentPath", childParentPath, + "-workingDir", workingDir.toString(), + "-iterations", 
"1" + }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaRDD temp = sc - .textFile(workingDir.toString() + "/finalrelation") - .map(item -> OBJECT_MAPPER.readValue(item, Relation.class)); + .textFile(workingDir.toString() + "/finalrelation") + .map(item -> OBJECT_MAPPER.readValue(item, Relation.class)); Assertions.assertEquals(16, temp.count()); diff --git a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/entitytoorganizationfromsemrel/StepActionsTest.java b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/entitytoorganizationfromsemrel/StepActionsTest.java index 7a71240b2..64339e3b7 100644 --- a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/entitytoorganizationfromsemrel/StepActionsTest.java +++ b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/entitytoorganizationfromsemrel/StepActionsTest.java @@ -87,7 +87,8 @@ public class StepActionsTest { getClass() .getResource( "/eu/dnetlib/dhp/entitytoorganizationfromsemrel/execstep/resultOrganization/") - .getPath(), ModelConstants.HAS_AUTHOR_INSTITUTION); + .getPath(), + ModelConstants.HAS_AUTHOR_INSTITUTION); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());