diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfo.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfo.java index d853f3858..707462f24 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfo.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfo.java @@ -73,6 +73,9 @@ public class PrepareInfo implements Serializable { final String resultOrganizationPath = parser.get("resultOrgPath"); log.info("resultOrganizationPath: {}", resultOrganizationPath); + final String relationPath = parser.get("relationPath"); + log.info("relationPath: {}", relationPath); + SparkConf conf = new SparkConf(); conf.set("hive.metastore.uris", parser.get("hive_metastore_uris")); @@ -84,11 +87,12 @@ public class PrepareInfo implements Serializable { graphPath, childParentPath, leavesPath, - resultOrganizationPath)); + resultOrganizationPath, + relationPath)); } private static void prepareInfo(SparkSession spark, String inputPath, String childParentOrganizationPath, - String currentIterationPath, String resultOrganizationPath) { + String currentIterationPath, String resultOrganizationPath, String relationPath) { Dataset relation = readPath(spark, inputPath + "/relation", Relation.class); relation.createOrReplaceTempView("relation"); @@ -108,6 +112,15 @@ public class PrepareInfo implements Serializable { .option("compression", "gzip") .json(resultOrganizationPath); + relation + .filter( + (FilterFunction) r -> !r.getDataInfo().getDeletedbyinference() && + r.getRelClass().equals(ModelConstants.HAS_AUTHOR_INSTITUTION)) + .write() + .mode(SaveMode.Overwrite) + .option("compression","gzip") + .json(relationPath); + Dataset children = spark .sql( "Select distinct target as child from relation where " + diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java index f5d8361d7..e26276f53 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkResultToOrganizationFromSemRel.java @@ -49,8 +49,8 @@ public class SparkResultToOrganizationFromSemRel implements Serializable { Boolean isSparkSessionManaged = isSparkSessionManaged(parser); log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - String graphPath = parser.get("graphPath"); - log.info("graphPath: {}", graphPath); + String relationPath = parser.get("relationPath"); + log.info("relationPath: {}", relationPath); final String outputPath = parser.get("outputPath"); log.info("outputPath: {}", outputPath); @@ -78,7 +78,7 @@ public class SparkResultToOrganizationFromSemRel implements Serializable { leavesPath, childParentPath, resultOrganizationPath, - graphPath, + relationPath, workingPath, outputPath)); } diff --git a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/StepActions.java b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/StepActions.java index d190e5342..02444cb15 100644 --- a/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/StepActions.java +++ b/dhp-workflows/dhp-enrichment/src/main/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/StepActions.java @@ -26,16 +26,11 @@ import scala.Tuple2; public class StepActions implements Serializable { - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - public static void execStep(SparkSession spark, String graphPath, String newRelationPath, String leavesPath, String chldParentOrgPath, String resultOrgPath) { - Dataset relationGraph = readPath(spark, graphPath + "/relation", Relation.class) - .filter( - (FilterFunction) r -> !r.getDataInfo().getDeletedbyinference() && - r.getRelClass().equals(ModelConstants.HAS_AUTHOR_INSTITUTION)); + Dataset relationGraph = readPath(spark, graphPath, Relation.class); // select only the relation source target among those proposed by propagation that are not already existent getNewRels( newRelationPath, relationGraph, @@ -80,8 +75,8 @@ public class StepActions implements Serializable { ret.setValueSet(orgs); return ret; }, Encoders.bean(KeyValueSet.class)) - .write() - .mode(SaveMode.Overwrite) + .write() + .mode(SaveMode.Overwrite) .option("compression", "gzip") .json(outputPath); } @@ -116,7 +111,7 @@ public class StepActions implements Serializable { // union of new propagation relations to the relation set // grouping from sourcetarget (we are sure the only relations are those from result to organization by // construction of the set) - // if at least one relation in the set was harvested no new relation will be returned + // if at least one relation in the set was not produced by propagation no new relation will be returned relationDataset .union(newRels) diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/input_preparation_parameter.json b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/input_preparation_parameter.json index baa8ba333..c79bfe05d 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/input_preparation_parameter.json +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/input_preparation_parameter.json @@ -34,5 +34,11 @@ "paramLongName": "isSparkSessionManaged", "paramDescription": "the path where prepared info have been stored", "paramRequired": false + }, + { + "paramName": "rep", + "paramLongName": "relationPath", + "paramDescription": "the path where to store the selected subset of relations", + "paramRequired": false } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/input_propagation_parameter.json b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/input_propagation_parameter.json index bd7bb50f9..f73cc221e 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/input_propagation_parameter.json +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/input_propagation_parameter.json @@ -1,7 +1,7 @@ [ { - "paramName":"gp", - "paramLongName":"graphPath", + "paramName":"rep", + "paramLongName":"relationPath", "paramDescription": "the path of the sequencial file to read", "paramRequired": true }, diff --git a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/oozie_app/workflow.xml b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/oozie_app/workflow.xml index e62ce0f5a..17502abea 100644 --- a/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-enrichment/src/main/resources/eu/dnetlib/dhp/resulttoorganizationfromsemrel/oozie_app/workflow.xml @@ -150,6 +150,7 @@ --leavesPath${workingDir}/preparedInfo/leavesPath --childParentPath${workingDir}/preparedInfo/childParentPath --resultOrgPath${workingDir}/preparedInfo/resultOrgPath + --relationPath${workingDir}/preparedInfo/relation @@ -173,7 +174,7 @@ --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors} - --graphPath${sourcePath} + --relationPath${workingDir}/preparedInfo/relation --outputPath${outputPath}/relation --leavesPath${workingDir}/preparedInfo/leavesPath --childParentPath${workingDir}/preparedInfo/childParentPath diff --git a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfoJobTest.java b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfoJobTest.java index 973c663b9..21d99321b 100644 --- a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfoJobTest.java +++ b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/PrepareInfoJobTest.java @@ -84,6 +84,7 @@ public class PrepareInfoJobTest { "-leavesPath", workingDir.toString() + "/currentIteration/", "-resultOrgPath", workingDir.toString() + "/resultOrganization/", "-childParentPath", workingDir.toString() + "/childParentOrg/", + "-relationPath", workingDir.toString() + "/relation" }); @@ -228,6 +229,7 @@ public class PrepareInfoJobTest { "-leavesPath", workingDir.toString() + "/currentIteration/", "-resultOrgPath", workingDir.toString() + "/resultOrganization/", "-childParentPath", workingDir.toString() + "/childParentOrg/", + "-relationPath", workingDir.toString() + "/relation" }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); @@ -332,6 +334,35 @@ public class PrepareInfoJobTest { } + @Test + public void relationTest()throws Exception { + + PrepareInfo + .main( + new String[] { + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-graphPath", getClass() + .getResource( + "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/resultorganizationtest") + .getPath(), + "-hive_metastore_uris", "", + "-leavesPath", workingDir.toString() + "/currentIteration/", + "-resultOrgPath", workingDir.toString() + "/resultOrganization/", + "-childParentPath", workingDir.toString() + "/childParentOrg/", + "-relationPath", workingDir.toString() + "/relation" + + }); + final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + JavaRDD tmp = sc + .textFile(workingDir.toString() + "/relation") + .map(item -> OBJECT_MAPPER.readValue(item, Relation.class)); + + Dataset verificationDs = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class)); + + Assertions.assertEquals(7, verificationDs.count()); + + } @Test public void resultOrganizationTest1() throws Exception { @@ -347,6 +378,7 @@ public class PrepareInfoJobTest { "-leavesPath", workingDir.toString() + "/currentIteration/", "-resultOrgPath", workingDir.toString() + "/resultOrganization/", "-childParentPath", workingDir.toString() + "/childParentOrg/", + "-relationPath", workingDir.toString() + "/relation" }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); @@ -467,10 +499,6 @@ public class PrepareInfoJobTest { @Test public void foundLeavesTest1() throws Exception { -// PrepareInfo.prepareInfo(spark, getClass() -// .getResource( -// "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/resultorganizationtest") -// .getPath(), workingDir.toString() + "/childParentOrg/", workingDir.toString() + "/currentIteration/",workingDir.toString() + "/resultOrganization/"); PrepareInfo .main( @@ -484,6 +512,7 @@ public class PrepareInfoJobTest { "-leavesPath", workingDir.toString() + "/currentIteration/", "-resultOrgPath", workingDir.toString() + "/resultOrganization/", "-childParentPath", workingDir.toString() + "/childParentOrg/", + "-relationPath", workingDir.toString() + "/relation" }); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); @@ -510,12 +539,9 @@ public class PrepareInfoJobTest { "-leavesPath", workingDir.toString() + "/currentIteration/", "-resultOrgPath", workingDir.toString() + "/resultOrganization/", "-childParentPath", workingDir.toString() + "/childParentOrg/", + "-relationPath", workingDir.toString() + "/relation" }); -// PrepareInfo.prepareInfo(spark, getClass() -// .getResource( -// "/eu/dnetlib/dhp/resulttoorganizationfromsemrel/childparenttest1") -// .getPath(), workingDir.toString() + "/childParentOrg/", workingDir.toString() + "/currentIteration/",workingDir.toString() + "/resultOrganization/"); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); diff --git a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkJobTest.java b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkJobTest.java index 5a4b19a24..7dd575b66 100644 --- a/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkJobTest.java +++ b/dhp-workflows/dhp-enrichment/src/test/java/eu/dnetlib/dhp/resulttoorganizationfromsemrel/SparkJobTest.java @@ -101,9 +101,9 @@ public class SparkJobTest { .main( new String[] { "-isSparkSessionManaged", Boolean.FALSE.toString(), - "-graphPath", graphPath, + "-relationPath", graphPath, "-hive_metastore_uris", "", - "-outputPath", workingDir.toString() + "/relation", + "-outputPath", workingDir.toString() + "/finalrelation", "-leavesPath", workingDir.toString() + "/leavesInput", "-resultOrgPath", workingDir.toString() + "/orgsInput", "-childParentPath", childParentPath, @@ -113,9 +113,11 @@ public class SparkJobTest { final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); JavaRDD tmp = sc - .textFile(workingDir.toString() + "/relation") + .textFile(workingDir.toString() + "/finalrelation") .map(item -> OBJECT_MAPPER.readValue(item, Relation.class)); + tmp.foreach(r -> System.out.println(OBJECT_MAPPER.writeValueAsString(r))); + Assertions.assertEquals(18, tmp.count()); tmp.foreach(r -> Assertions.assertEquals(ModelConstants.AFFILIATION, r.getSubRelType())); tmp.foreach(r -> Assertions.assertEquals(ModelConstants.RESULT_ORGANIZATION, r.getRelType()));