From 8d755cca808062f2f55e70c1c927eca7c84131cb Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Mon, 13 Dec 2021 15:01:40 +0100
Subject: [PATCH] -

---
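Notes (kept below the "---" so they stay out of the commit message): besides
formatting, the only functional change is in SparkDumpEntitiesJob, which now
loads /eu/dnetlib/dhp/oa/graph/dump/input_parameters.json instead of the wf/
copy of that path. The reformatted hunks in SparkDumpRelationJob touch the
optional removeSet argument: a semicolon-separated list of relClass values to
drop from the dumped relations. A minimal sketch of those semantics, with a
hypothetical argument value (types and helpers as in the hunks below):

    // e.g. parser.get("removeSet") returning "isParticipant;isAuthorInstitutionOf" (hypothetical)
    Optional<String> rs = Optional.ofNullable(parser.get("removeSet"));
    final Set<String> removeSet = new HashSet<>();
    if (rs.isPresent()) {
        Collections.addAll(removeSet, rs.get().split(";"));
    }
    // relations whose relClass is listed in removeSet are excluded from the dump
    relations.filter((FilterFunction<Relation>) r -> !removeSet.contains(r.getRelClass()));
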
 .../dump/complete/CreateContextEntities.java  |   2 +-
 .../dump/complete/CreateContextRelation.java  |   2 +-
 .../dump/complete/SparkCollectAndSave.java    |   2 +-
 .../dump/complete/SparkDumpEntitiesJob.java   |   2 +-
 .../dump/complete/SparkDumpRelationJob.java   |   9 +-
 .../complete/SparkOrganizationRelation.java   |   2 +-
 .../SparkSelectValidRelationsJob.java         |   2 +-
 .../dhp/oa/graph/dump/DumpJobTest.java        |  49 +++++++--
 .../graph/dump/complete/DumpRelationTest.java | 100 ++++++++----------
 9 files changed, 94 insertions(+), 76 deletions(-)

diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/CreateContextEntities.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/CreateContextEntities.java
index aa031203d..c422c3b40 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/CreateContextEntities.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/CreateContextEntities.java
@@ -41,7 +41,7 @@ public class CreateContextEntities implements Serializable {
 			.toString(
 				CreateContextEntities.class
 					.getResourceAsStream(
-						"/eu/dnetlib/dhp/oa/graph/dump/input_entity_parameter.json"));
+						"/eu/dnetlib/dhp/oa/graph/dump/input_entity_parameter.json"));
 
 		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
 		parser.parseArgument(args);
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/CreateContextRelation.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/CreateContextRelation.java
index 3c71fcaa5..03f53de64 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/CreateContextRelation.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/CreateContextRelation.java
@@ -48,7 +48,7 @@ public class CreateContextRelation implements Serializable {
 			.requireNonNull(
 				CreateContextRelation.class
 					.getResourceAsStream(
-						"/eu/dnetlib/dhp/oa/graph/dump/input_entity_parameter.json")));
+						"/eu/dnetlib/dhp/oa/graph/dump/input_entity_parameter.json")));
 
 		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
 		parser.parseArgument(args);
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkCollectAndSave.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkCollectAndSave.java
index e902a02c8..1c98ea6cb 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkCollectAndSave.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkCollectAndSave.java
@@ -31,7 +31,7 @@ public class SparkCollectAndSave implements Serializable {
 			.toString(
 				SparkCollectAndSave.class
 					.getResourceAsStream(
-						"/eu/dnetlib/dhp/oa/graph/dump/input_collect_and_save.json"));
+						"/eu/dnetlib/dhp/oa/graph/dump/input_collect_and_save.json"));
 
 		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
 		parser.parseArgument(args);
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkDumpEntitiesJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkDumpEntitiesJob.java
index f239ffca7..7cc05a821 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkDumpEntitiesJob.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkDumpEntitiesJob.java
@@ -22,7 +22,7 @@ public class SparkDumpEntitiesJob implements Serializable {
 			.toString(
 				SparkDumpEntitiesJob.class
 					.getResourceAsStream(
-						"/eu/dnetlib/dhp/oa/graph/dump/wf/input_parameters.json"));
+						"/eu/dnetlib/dhp/oa/graph/dump/input_parameters.json"));
 
 		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
 		parser.parseArgument(args);
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkDumpRelationJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkDumpRelationJob.java
index 05e3f8835..0ac97dcb6 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkDumpRelationJob.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkDumpRelationJob.java
@@ -38,7 +38,7 @@ public class SparkDumpRelationJob implements Serializable {
 			.toString(
 				SparkDumpRelationJob.class
 					.getResourceAsStream(
-						"/eu/dnetlib/dhp/oa/graph/dump/input_relationdump_parameters.json"));
+						"/eu/dnetlib/dhp/oa/graph/dump/input_relationdump_parameters.json"));
 
 		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
 		parser.parseArgument(args);
@@ -55,13 +55,12 @@ public class SparkDumpRelationJob implements Serializable {
 
 		final String outputPath = parser.get("outputPath");
 		log.info("outputPath: {}", outputPath);
 
-		Optional<String> rs = Optional.ofNullable(parser.get("removeSet"));
+		Optional<String> rs = Optional.ofNullable(parser.get("removeSet"));
 		final Set<String> removeSet = new HashSet<>();
-		if(rs.isPresent()){
+		if (rs.isPresent()) {
 			Collections.addAll(removeSet, rs.get().split(";"));
 		}
-
 		SparkConf conf = new SparkConf();
 
 		runWithSparkSession(
@@ -78,7 +77,7 @@ public class SparkDumpRelationJob implements Serializable {
 	private static void dumpRelation(SparkSession spark, String inputPath, String outputPath, Set<String> removeSet) {
 		Dataset<Relation> relations = Utils.readPath(spark, inputPath, Relation.class);
 		relations
-			.filter((FilterFunction<Relation>)r -> !removeSet.contains(r.getRelClass()))
+			.filter((FilterFunction<Relation>) r -> !removeSet.contains(r.getRelClass()))
 			.map((MapFunction<Relation, eu.dnetlib.dhp.schema.dump.oaf.graph.Relation>) relation -> {
 				eu.dnetlib.dhp.schema.dump.oaf.graph.Relation relNew = new eu.dnetlib.dhp.schema.dump.oaf.graph.Relation();
 				relNew
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkOrganizationRelation.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkOrganizationRelation.java
index 4475232de..dff1a1066 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkOrganizationRelation.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkOrganizationRelation.java
@@ -39,7 +39,7 @@ public class SparkOrganizationRelation implements Serializable {
 			.toString(
 				SparkOrganizationRelation.class
 					.getResourceAsStream(
-						"/eu/dnetlib/dhp/oa/graph/dump/input_organization_parameters.json"));
+						"/eu/dnetlib/dhp/oa/graph/dump/input_organization_parameters.json"));
 
 		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
 		parser.parseArgument(args);
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkSelectValidRelationsJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkSelectValidRelationsJob.java
index d1bca6a7c..1e5675e5f 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkSelectValidRelationsJob.java
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/dump/complete/SparkSelectValidRelationsJob.java
@@ -35,7 +35,7 @@ public class SparkSelectValidRelationsJob implements Serializable {
 			.toString(
 				SparkSelectValidRelationsJob.class
 					.getResourceAsStream(
-						"/eu/dnetlib/dhp/oa/graph/dump/input_relationdump_parameters.json"));
+						"/eu/dnetlib/dhp/oa/graph/dump/input_relationdump_parameters.json"));
 
 		final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
 		parser.parseArgument(args);
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/DumpJobTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/DumpJobTest.java
index 6dc56c846..30e2f93ef 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/DumpJobTest.java
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/DumpJobTest.java
@@ -917,7 +917,7 @@ public class DumpJobTest {
 		DumpProducts dump = new DumpProducts();
 		dump
 			.run(
-				false, sourcePath, workingDir.toString() + "/result", communityMapPath, Publication.class,
+				false, sourcePath, workingDir.toString() + "/result", communityMapPath, Publication.class,
 				GraphResult.class, Constants.DUMPTYPE.COMPLETE.getType());
 
 		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@@ -949,18 +949,45 @@ public class DumpJobTest {
 		Assertions.assertTrue(temp.filter("id = '50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8'").count() == 1);
 
 		temp = spark
-			.sql(
-				"select id, inst.articleprocessingcharge.amount, inst.articleprocessingcharge.currency " +
-					"from check " +
-					"lateral view explode (instance) i as inst " +
-					"where inst.articleprocessingcharge is not null");
+			.sql(
+				"select id, inst.articleprocessingcharge.amount, inst.articleprocessingcharge.currency " +
+					"from check " +
+					"lateral view explode (instance) i as inst " +
+					"where inst.articleprocessingcharge is not null");
+
+		Assertions
+			.assertEquals(
+				"3131.64",
+				temp
+					.filter("id = '50|datacite____::05c611fdfc93d7a2a703d1324e28104a'")
+					.collectAsList()
+					.get(0)
+					.getString(1));
+		Assertions
+			.assertEquals(
+				"EUR",
+				temp
+					.filter("id = '50|datacite____::05c611fdfc93d7a2a703d1324e28104a'")
+					.collectAsList()
+					.get(0)
+					.getString(2));
 
-		Assertions.assertEquals("3131.64", temp.filter("id = '50|datacite____::05c611fdfc93d7a2a703d1324e28104a'").collectAsList().get(0).getString(1));
-		Assertions.assertEquals("EUR", temp.filter("id = '50|datacite____::05c611fdfc93d7a2a703d1324e28104a'").collectAsList().get(0).getString(2));
-
-		Assertions.assertEquals("2578.35", temp.filter("id = '50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8'").collectAsList().get(0).getString(1));
-		Assertions.assertEquals("EUR", temp.filter("id = '50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8'").collectAsList().get(0).getString(2));
-
+		Assertions
+			.assertEquals(
+				"2578.35",
+				temp
+					.filter("id = '50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8'")
+					.collectAsList()
+					.get(0)
+					.getString(1));
+		Assertions
+			.assertEquals(
+				"EUR",
+				temp
+					.filter("id = '50|dedup_wf_001::01e6a28565ca01376b7548e530c6f6e8'")
+					.collectAsList()
+					.get(0)
+					.getString(2));
 
 	}
 }
diff --git a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/complete/DumpRelationTest.java b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/complete/DumpRelationTest.java
index 8a2eb585e..763318ae4 100644
--- a/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/complete/DumpRelationTest.java
+++ b/dhp-workflows/dhp-graph-mapper/src/test/java/eu/dnetlib/dhp/oa/graph/dump/complete/DumpRelationTest.java
@@ -4,10 +4,8 @@ package eu.dnetlib.dhp.oa.graph.dump.complete;
 
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
-
 import java.util.HashMap;
-
 import org.apache.commons.io.FileUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
@@ -83,7 +81,6 @@ public class DumpRelationTest {
 				"-sourcePath", sourcePath
 			});
 
-
 		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 
 		JavaRDD<Relation> tmp = sc
@@ -145,7 +142,6 @@ public class DumpRelationTest {
 				"-sourcePath", sourcePath
 			});
 
-
 		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 
 		JavaRDD<Relation> tmp = sc
@@ -207,105 +203,101 @@ public class DumpRelationTest {
 
 	@Test
 	public void test3() throws Exception {//
 		final String sourcePath = getClass()
-			.getResource("/eu/dnetlib/dhp/oa/graph/dump/relation/relation")
-			.getPath();
+			.getResource("/eu/dnetlib/dhp/oa/graph/dump/relation/relation")
+			.getPath();
 
 		SparkDumpRelationJob.main(new String[] {
-			"-isSparkSessionManaged", Boolean.FALSE.toString(),
-			"-outputPath", workingDir.toString() + "/relation",
-			"-sourcePath", sourcePath,
-			"-removeSet", "isParticipant"
+			"-isSparkSessionManaged", Boolean.FALSE.toString(),
+			"-outputPath", workingDir.toString() + "/relation",
+			"-sourcePath", sourcePath,
+			"-removeSet", "isParticipant"
 		});
 
-
 		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 
 		JavaRDD<Relation> tmp = sc
-			.textFile(workingDir.toString() + "/relation")
-			.map(item -> OBJECT_MAPPER.readValue(item, Relation.class));
+			.textFile(workingDir.toString() + "/relation")
+			.map(item -> OBJECT_MAPPER.readValue(item, Relation.class));
 
 		org.apache.spark.sql.Dataset<Relation> verificationDataset = spark
-			.createDataset(tmp.rdd(), Encoders.bean(Relation.class));
+			.createDataset(tmp.rdd(), Encoders.bean(Relation.class));
 
 		verificationDataset.createOrReplaceTempView("table");
 
 		verificationDataset
-			.foreach((ForeachFunction<Relation>) r -> System.out.println(new ObjectMapper().writeValueAsString(r)));
+			.foreach((ForeachFunction<Relation>) r -> System.out.println(new ObjectMapper().writeValueAsString(r)));
 
 		Dataset<Row> check = spark
-			.sql(
-				"SELECT reltype.name, source.id source, source.type stype, target.id target,target.type ttype, provenance.provenance "
-					+
-					"from table ");
+			.sql(
+				"SELECT reltype.name, source.id source, source.type stype, target.id target,target.type ttype, provenance.provenance "
+					+
+					"from table ");
 
 		Assertions.assertEquals(22, check.filter("name = 'isProvidedBy'").count());
 		Assertions
-			.assertEquals(
-				22, check
-					.filter(
-						"name = 'isProvidedBy' and stype = 'datasource' and ttype = 'organization' and " +
-							"provenance = 'Harvested'")
-					.count());
+			.assertEquals(
+				22, check
+					.filter(
+						"name = 'isProvidedBy' and stype = 'datasource' and ttype = 'organization' and " +
+							"provenance = 'Harvested'")
+					.count());
 
 		Assertions.assertEquals(0, check.filter("name = 'isParticipant'").count());
-
 		Assertions.assertEquals(1, check.filter("name = 'isAuthorInstitutionOf'").count());
 		Assertions
-			.assertEquals(
-				1, check
-					.filter(
-						"name = 'isAuthorInstitutionOf' and stype = 'organization' and ttype = 'result' " +
-							"and provenance = 'Inferred by OpenAIRE'")
-					.count());
+			.assertEquals(
+				1, check
+					.filter(
+						"name = 'isAuthorInstitutionOf' and stype = 'organization' and ttype = 'result' " +
+							"and provenance = 'Inferred by OpenAIRE'")
+					.count());
 	}
 
 	@Test
 	public void test4() throws Exception {//
 		final String sourcePath = getClass()
-			.getResource("/eu/dnetlib/dhp/oa/graph/dump/relation/relation")
-			.getPath();
+			.getResource("/eu/dnetlib/dhp/oa/graph/dump/relation/relation")
+			.getPath();
 
 		SparkDumpRelationJob.main(new String[] {
-			"-isSparkSessionManaged", Boolean.FALSE.toString(),
-			"-outputPath", workingDir.toString() + "/relation",
-			"-sourcePath", sourcePath,
-			"-removeSet", "isParticipant;isAuthorInstitutionOf"
+			"-isSparkSessionManaged", Boolean.FALSE.toString(),
+			"-outputPath", workingDir.toString() + "/relation",
+			"-sourcePath", sourcePath,
+			"-removeSet", "isParticipant;isAuthorInstitutionOf"
 		});
 
-
 		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 
 		JavaRDD<Relation> tmp = sc
-			.textFile(workingDir.toString() + "/relation")
-			.map(item -> OBJECT_MAPPER.readValue(item, Relation.class));
+			.textFile(workingDir.toString() + "/relation")
+			.map(item -> OBJECT_MAPPER.readValue(item, Relation.class));
 
 		org.apache.spark.sql.Dataset<Relation> verificationDataset = spark
-			.createDataset(tmp.rdd(), Encoders.bean(Relation.class));
+			.createDataset(tmp.rdd(), Encoders.bean(Relation.class));
 
 		verificationDataset.createOrReplaceTempView("table");
 
 		verificationDataset
-			.foreach((ForeachFunction<Relation>) r -> System.out.println(new ObjectMapper().writeValueAsString(r)));
+			.foreach((ForeachFunction<Relation>) r -> System.out.println(new ObjectMapper().writeValueAsString(r)));
 
 		Dataset<Row> check = spark
-			.sql(
-				"SELECT reltype.name, source.id source, source.type stype, target.id target,target.type ttype, provenance.provenance "
-					+
-					"from table ");
+			.sql(
+				"SELECT reltype.name, source.id source, source.type stype, target.id target,target.type ttype, provenance.provenance "
+					+
+					"from table ");
 
 		Assertions.assertEquals(22, check.filter("name = 'isProvidedBy'").count());
 		Assertions
-			.assertEquals(
-				22, check
-					.filter(
-						"name = 'isProvidedBy' and stype = 'datasource' and ttype = 'organization' and " +
-							"provenance = 'Harvested'")
-					.count());
+			.assertEquals(
+				22, check
+					.filter(
+						"name = 'isProvidedBy' and stype = 'datasource' and ttype = 'organization' and " +
+							"provenance = 'Harvested'")
+					.count());
 
 		Assertions.assertEquals(0, check.filter("name = 'isParticipant'").count());
-
 		Assertions.assertEquals(0, check.filter("name = 'isAuthorInstitutionOf'").count());
 	}
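
Reviewer note: test3 and test4 above exercise the removeSet flag end to end,
first dropping isParticipant only, then isParticipant together with
isAuthorInstitutionOf. For reference, an invocation equivalent to test4 outside
the test harness; only the two path values are placeholders, the flag names are
the ones the tests themselves pass:

    SparkDumpRelationJob.main(new String[] {
        "-isSparkSessionManaged", Boolean.FALSE.toString(),
        "-sourcePath", "/tmp/graph/relation", // placeholder input path
        "-outputPath", "/tmp/dump/relation", // placeholder output path
        "-removeSet", "isParticipant;isAuthorInstitutionOf"
    });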