From 6b19dcee80cdc19b6acdd95e6b05e5a4093b3ba7 Mon Sep 17 00:00:00 2001 From: Serafeim Chatzopoulos Date: Thu, 19 Oct 2023 19:58:25 +0300 Subject: [PATCH 1/4] Add actionset creation for pubmed affiliations --- .../PrepareAffiliationRelations.java | 32 ++++++++++++------- .../input_actionset_parameter.json | 8 ++++- .../bipaffiliations/job.properties | 1 + .../bipaffiliations/oozie_app/workflow.xml | 7 +++- 4 files changed, 35 insertions(+), 13 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java index 603ad6339f..cbfba30c55 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java @@ -12,6 +12,7 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.sql.*; import org.apache.spark.sql.Dataset; @@ -58,10 +59,13 @@ public class PrepareAffiliationRelations implements Serializable { log.info("isSparkSessionManaged: {}", isSparkSessionManaged); final String inputPath = parser.get("inputPath"); - log.info("inputPath {}: ", inputPath); + log.info("inputPath: {}", inputPath); + + final String pubmedInputPath = parser.get("pubmedInputPath"); + log.info("pubmedInputPath: {}", pubmedInputPath); final String outputPath = parser.get("outputPath"); - log.info("outputPath {}: ", outputPath); + log.info("outputPath: {}", outputPath); SparkConf conf = new SparkConf(); @@ -70,12 +74,22 @@ public class PrepareAffiliationRelations implements Serializable { isSparkSessionManaged, spark -> { Constants.removeOutputDir(spark, outputPath); - prepareAffiliationRelations(spark, inputPath, outputPath); + + List collectedFromCrossref = OafMapperUtils.listKeyValues(ModelConstants.CROSSREF_ID, "Crossref"); + JavaPairRDD crossrefRelations = prepareAffiliationRelations(spark, inputPath, collectedFromCrossref); + + List collectedFromPubmed = OafMapperUtils.listKeyValues(ModelConstants.PUBMED_CENTRAL_ID, "Pubmed"); + JavaPairRDD pubmedRelations = prepareAffiliationRelations(spark, inputPath, collectedFromPubmed); + + crossrefRelations + .union(pubmedRelations) + .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); + }); } - private static void prepareAffiliationRelations(SparkSession spark, String inputPath, - String outputPath) { + private static JavaPairRDD prepareAffiliationRelations(SparkSession spark, String inputPath, + List collectedfrom) { // load and parse affiliation relations from HDFS Dataset df = spark @@ -92,7 +106,7 @@ public class PrepareAffiliationRelations implements Serializable { new Column("matching.Confidence").as("confidence")); // prepare action sets for affiliation relations - df + return df .toJavaRDD() .flatMap((FlatMapFunction) row -> { @@ -120,8 +134,6 @@ public class PrepareAffiliationRelations implements Serializable { qualifier, Double.toString(row.getAs("confidence"))); - List collectedfrom = OafMapperUtils.listKeyValues(ModelConstants.CROSSREF_ID, "Crossref"); - // return bi-directional relations return getAffiliationRelationPair(paperId, affId, collectedfrom, dataInfo).iterator(); @@ -129,9 +141,7 @@ public class PrepareAffiliationRelations implements Serializable { .map(p -> new AtomicAction(Relation.class, p)) .mapToPair( aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), - new Text(OBJECT_MAPPER.writeValueAsString(aa)))) - .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); - + new Text(OBJECT_MAPPER.writeValueAsString(aa)))); } private static List getAffiliationRelationPair(String paperId, String affId, List collectedfrom, diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/input_actionset_parameter.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/input_actionset_parameter.json index 7663a454b9..96dcc3b327 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/input_actionset_parameter.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/input_actionset_parameter.json @@ -8,7 +8,13 @@ { "paramName": "ip", "paramLongName": "inputPath", - "paramDescription": "the URL from where to get the programme file", + "paramDescription": "the path to get the input data from Crossref", + "paramRequired": true + }, + { + "paramName": "pip", + "paramLongName": "pubmedInputPath", + "paramDescription": "the path to get the input data from Pubmed", "paramRequired": true }, { diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/job.properties b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/job.properties index d942e67723..fe3cbb633b 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/job.properties +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/job.properties @@ -32,4 +32,5 @@ spark2SqlQueryExecutionListeners=com.cloudera.spark.lineage.NavigatorQueryListen oozie.wf.application.path=${oozieTopWfApplicationPath} inputPath=/data/bip-affiliations/data.json +pubmedInputPath=/data/bip-affiiations/pubmed-data.json outputPath=/tmp/crossref-affiliations-output-v5 diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml index 9930cfe173..c0a6bfc528 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml @@ -3,7 +3,11 @@ inputPath - the path where to find the inferred affiliation relations + the path where to find the inferred affiliation relations from Crossref + + + pubmedInputPath + the path where to find the inferred affiliation relations from Pubmed outputPath @@ -97,6 +101,7 @@ --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} --inputPath${inputPath} + --pubmedInputPath${pubmedInputPath} --outputPath${outputPath} From aad5982bf171d4504de660afa1a4350bd7e761f9 Mon Sep 17 00:00:00 2001 From: Serafeim Chatzopoulos Date: Fri, 20 Oct 2023 12:48:21 +0300 Subject: [PATCH 2/4] Change the description of the workflow --- .../PrepareAffiliationRelations.java | 22 ++++++++++++------- .../bipaffiliations/oozie_app/workflow.xml | 2 +- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java index cbfba30c55..18d98be543 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java @@ -75,21 +75,27 @@ public class PrepareAffiliationRelations implements Serializable { spark -> { Constants.removeOutputDir(spark, outputPath); - List collectedFromCrossref = OafMapperUtils.listKeyValues(ModelConstants.CROSSREF_ID, "Crossref"); - JavaPairRDD crossrefRelations = prepareAffiliationRelations(spark, inputPath, collectedFromCrossref); + List collectedFromCrossref = OafMapperUtils + .listKeyValues(ModelConstants.CROSSREF_ID, "Crossref"); + JavaPairRDD crossrefRelations = prepareAffiliationRelations( + spark, inputPath, collectedFromCrossref); - List collectedFromPubmed = OafMapperUtils.listKeyValues(ModelConstants.PUBMED_CENTRAL_ID, "Pubmed"); - JavaPairRDD pubmedRelations = prepareAffiliationRelations(spark, inputPath, collectedFromPubmed); + List collectedFromPubmed = OafMapperUtils + .listKeyValues(ModelConstants.PUBMED_CENTRAL_ID, "Pubmed"); + JavaPairRDD pubmedRelations = prepareAffiliationRelations( + spark, inputPath, collectedFromPubmed); crossrefRelations - .union(pubmedRelations) - .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); + .union(pubmedRelations) + .saveAsHadoopFile( + outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); }); } - private static JavaPairRDD prepareAffiliationRelations(SparkSession spark, String inputPath, - List collectedfrom) { + private static JavaPairRDD prepareAffiliationRelations(SparkSession spark, + String inputPath, + List collectedfrom) { // load and parse affiliation relations from HDFS Dataset df = spark diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml index c0a6bfc528..e3fdddfd6d 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml @@ -87,7 +87,7 @@ yarn cluster - Produces the atomic action with the inferred by BIP! affiliation relations from Crossref + Produces the atomic action with the inferred by BIP! affiliation relations (from Crossref and Pubmed) eu.dnetlib.dhp.actionmanager.bipaffiliations.PrepareAffiliationRelations dhp-aggregation-${projectVersion}.jar From a82aaf57b219faa880817cf18213b22944cb730b Mon Sep 17 00:00:00 2001 From: Serafeim Chatzopoulos Date: Wed, 25 Oct 2023 12:05:02 -0700 Subject: [PATCH 3/4] Renaming input param for crossref input path --- .../bipaffiliations/PrepareAffiliationRelations.java | 8 ++++---- .../bipaffiliations/input_actionset_parameter.json | 4 ++-- .../dhp/actionmanager/bipaffiliations/job.properties | 4 ++-- .../actionmanager/bipaffiliations/oozie_app/workflow.xml | 4 ++-- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java index 18d98be543..1bdb06ecc0 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java @@ -58,8 +58,8 @@ public class PrepareAffiliationRelations implements Serializable { Boolean isSparkSessionManaged = Constants.isSparkSessionManaged(parser); log.info("isSparkSessionManaged: {}", isSparkSessionManaged); - final String inputPath = parser.get("inputPath"); - log.info("inputPath: {}", inputPath); + final String crossrefInputPath = parser.get("crossrefInputPath"); + log.info("crossrefInputPath: {}", crossrefInputPath); final String pubmedInputPath = parser.get("pubmedInputPath"); log.info("pubmedInputPath: {}", pubmedInputPath); @@ -78,12 +78,12 @@ public class PrepareAffiliationRelations implements Serializable { List collectedFromCrossref = OafMapperUtils .listKeyValues(ModelConstants.CROSSREF_ID, "Crossref"); JavaPairRDD crossrefRelations = prepareAffiliationRelations( - spark, inputPath, collectedFromCrossref); + spark, crossrefInputPath, collectedFromCrossref); List collectedFromPubmed = OafMapperUtils .listKeyValues(ModelConstants.PUBMED_CENTRAL_ID, "Pubmed"); JavaPairRDD pubmedRelations = prepareAffiliationRelations( - spark, inputPath, collectedFromPubmed); + spark, pubmedInputPath, collectedFromPubmed); crossrefRelations .union(pubmedRelations) diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/input_actionset_parameter.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/input_actionset_parameter.json index 96dcc3b327..c6f9051996 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/input_actionset_parameter.json +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/input_actionset_parameter.json @@ -6,8 +6,8 @@ "paramRequired": false }, { - "paramName": "ip", - "paramLongName": "inputPath", + "paramName": "cip", + "paramLongName": "crossrefInputPath", "paramDescription": "the path to get the input data from Crossref", "paramRequired": true }, diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/job.properties b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/job.properties index fe3cbb633b..a3d55ff0c1 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/job.properties +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/job.properties @@ -31,6 +31,6 @@ spark2SqlQueryExecutionListeners=com.cloudera.spark.lineage.NavigatorQueryListen # The following is needed as a property of a workflow oozie.wf.application.path=${oozieTopWfApplicationPath} -inputPath=/data/bip-affiliations/data.json -pubmedInputPath=/data/bip-affiiations/pubmed-data.json +crossrefInputPath=/data/bip-affiliations/data.json +pubmedInputPath=/data/bip-affiliations/pubmed-data.json outputPath=/tmp/crossref-affiliations-output-v5 diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml index e3fdddfd6d..c5ac6f8843 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml @@ -2,7 +2,7 @@ - inputPath + crossrefInputPath the path where to find the inferred affiliation relations from Crossref @@ -100,7 +100,7 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} - --inputPath${inputPath} + --crossrefInputPath${crossrefInputPath} --pubmedInputPath${pubmedInputPath} --outputPath${outputPath} From 2090003ea96dee43c3cd6c90f50f6d4a2b39a869 Mon Sep 17 00:00:00 2001 From: Serafeim Chatzopoulos Date: Thu, 26 Oct 2023 13:47:06 -0700 Subject: [PATCH 4/4] Adjust tests to new WF input params --- .../PrepareAffiliationRelationsTest.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java index ed8e5fe0df..e2639996cb 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java @@ -74,17 +74,22 @@ public class PrepareAffiliationRelationsTest { @Test void testMatch() throws Exception { - String affiliationRelationsPath = getClass() + String crossrefAffiliationRelationPath = getClass() .getResource("/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json") .getPath(); + String pubmedAffiliationRelationsPath = getClass() + .getResource("/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json") + .getPath(); + String outputPath = workingDir.toString() + "/actionSet"; PrepareAffiliationRelations .main( new String[] { "-isSparkSessionManaged", Boolean.FALSE.toString(), - "-inputPath", affiliationRelationsPath, + "-crossrefInputPath", crossrefAffiliationRelationPath, + "-pubmedInputPath", pubmedAffiliationRelationsPath, "-outputPath", outputPath }); @@ -101,7 +106,7 @@ public class PrepareAffiliationRelationsTest { // ); // } // count the number of relations - assertEquals(20, tmp.count()); + assertEquals(40, tmp.count()); Dataset dataset = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class)); dataset.createOrReplaceTempView("result"); @@ -112,7 +117,7 @@ public class PrepareAffiliationRelationsTest { // verify that we have equal number of bi-directional relations Assertions .assertEquals( - 10, execVerification + 20, execVerification .filter( "relClass='" + ModelConstants.HAS_AUTHOR_INSTITUTION + "'") .collectAsList() @@ -120,7 +125,7 @@ public class PrepareAffiliationRelationsTest { Assertions .assertEquals( - 10, execVerification + 20, execVerification .filter( "relClass='" + ModelConstants.IS_AUTHOR_INSTITUTION_OF + "'") .collectAsList()