diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java index 1964edce1..04073cdee 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java @@ -17,6 +17,7 @@ import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.sql.*; import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.types.StructType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,8 +77,8 @@ public class PrepareAffiliationRelations implements Serializable { final String webcrawlInputPath = parser.get("webCrawlInputPath"); log.info("webcrawlInputPath: {}", webcrawlInputPath); - final String publisherlInputPath = parser.get("publisherlInputPath"); - log.info("publisherlInputPath: {}", publisherlInputPath); + final String publisherInputPath = parser.get("publisherInputPath"); + log.info("publisherInputPath: {}", publisherInputPath); final String outputPath = parser.get("outputPath"); log.info("outputPath: {}", outputPath); @@ -89,7 +90,7 @@ public class PrepareAffiliationRelations implements Serializable { isSparkSessionManaged, spark -> { Constants.removeOutputDir(spark, outputPath); - createActionSet(spark, crossrefInputPath, pubmedInputPath, openapcInputPath, dataciteInputPath, webcrawlInputPath, publisherlInputPath, outputPath); + createActionSet(spark, crossrefInputPath, pubmedInputPath, openapcInputPath, dataciteInputPath, webcrawlInputPath, publisherInputPath, outputPath); }); } @@ -136,9 +137,11 @@ public class PrepareAffiliationRelations implements Serializable { private static JavaPairRDD prepareAffiliationRelationFromPublisher(SparkSession spark, String inputPath, List collectedfrom){ + + Dataset df = spark .read() - .schema("`DOI` STRING, `Organizations` ARRAY>") + .schema("`DOI` STRING, `Organizations` ARRAY>") .json(inputPath) .where("DOI is not null"); diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/input_actionset_parameter.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/input_actionset_parameter.json index 4d85cf26b..941f84525 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/input_actionset_parameter.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/input_actionset_parameter.json @@ -33,6 +33,11 @@ "paramLongName": "webCrawlInputPath", "paramDescription": "the path to get the input data from Web Crawl", "paramRequired": true +},{ + "paramName": "pip", + "paramLongName": "publisherInputPath", + "paramDescription": "the path to get the input data from publishers", + "paramRequired": true } , { diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java index bffe41ac7..15fd777a2 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java @@ -78,6 +78,10 @@ public class PrepareAffiliationRelationsTest { .getResource("/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json") .getPath(); + String publisherAffiliationRelationPath = getClass() + .getResource("/eu/dnetlib/dhp/actionmanager/bipaffiliations/publishers") + .getPath(); + String outputPath = workingDir.toString() + "/actionSet"; PrepareAffiliationRelations @@ -89,6 +93,7 @@ public class PrepareAffiliationRelationsTest { "-openapcInputPath", crossrefAffiliationRelationPath, "-dataciteInputPath", crossrefAffiliationRelationPath, "-webCrawlInputPath", crossrefAffiliationRelationPath, + "-publisherInputPath", publisherAffiliationRelationPath, "-outputPath", outputPath }); @@ -105,7 +110,7 @@ public class PrepareAffiliationRelationsTest { // ); // } // count the number of relations - assertEquals(120, tmp.count()); + assertEquals(138, tmp.count()); Dataset dataset = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class)); dataset.createOrReplaceTempView("result"); @@ -116,7 +121,7 @@ public class PrepareAffiliationRelationsTest { // verify that we have equal number of bi-directional relations Assertions .assertEquals( - 60, execVerification + 69, execVerification .filter( "relClass='" + ModelConstants.HAS_AUTHOR_INSTITUTION + "'") .collectAsList() @@ -124,7 +129,7 @@ public class PrepareAffiliationRelationsTest { Assertions .assertEquals( - 60, execVerification + 69, execVerification .filter( "relClass='" + ModelConstants.IS_AUTHOR_INSTITUTION_OF + "'") .collectAsList()