diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java
index 603ad6339..1bdb06ecc 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java
@@ -12,6 +12,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.sql.*;
 import org.apache.spark.sql.Dataset;
@@ -57,11 +58,14 @@ public class PrepareAffiliationRelations implements Serializable {
         Boolean isSparkSessionManaged = Constants.isSparkSessionManaged(parser);
         log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
 
-        final String inputPath = parser.get("inputPath");
-        log.info("inputPath {}: ", inputPath);
+        final String crossrefInputPath = parser.get("crossrefInputPath");
+        log.info("crossrefInputPath: {}", crossrefInputPath);
+
+        final String pubmedInputPath = parser.get("pubmedInputPath");
+        log.info("pubmedInputPath: {}", pubmedInputPath);
 
         final String outputPath = parser.get("outputPath");
-        log.info("outputPath {}: ", outputPath);
+        log.info("outputPath: {}", outputPath);
 
         SparkConf conf = new SparkConf();
 
@@ -70,12 +74,28 @@ public class PrepareAffiliationRelations implements Serializable {
             isSparkSessionManaged,
             spark -> {
                 Constants.removeOutputDir(spark, outputPath);
-                prepareAffiliationRelations(spark, inputPath, outputPath);
+
+                List<KeyValue> collectedFromCrossref = OafMapperUtils
+                    .listKeyValues(ModelConstants.CROSSREF_ID, "Crossref");
+                JavaPairRDD<Text, Text> crossrefRelations = prepareAffiliationRelations(
+                    spark, crossrefInputPath, collectedFromCrossref);
+
+                List<KeyValue> collectedFromPubmed = OafMapperUtils
+                    .listKeyValues(ModelConstants.PUBMED_CENTRAL_ID, "Pubmed");
+                JavaPairRDD<Text, Text> pubmedRelations = prepareAffiliationRelations(
+                    spark, pubmedInputPath, collectedFromPubmed);
+
+                crossrefRelations
+                    .union(pubmedRelations)
+                    .saveAsHadoopFile(
+                        outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
+
             });
     }
 
-    private static void prepareAffiliationRelations(SparkSession spark, String inputPath,
-        String outputPath) {
+    private static JavaPairRDD<Text, Text> prepareAffiliationRelations(SparkSession spark,
+        String inputPath,
+        List<KeyValue> collectedfrom) {
 
         // load and parse affiliation relations from HDFS
         Dataset<Row> df = spark
@@ -92,7 +112,7 @@ public class PrepareAffiliationRelations implements Serializable {
                 new Column("matching.Confidence").as("confidence"));
 
         // prepare action sets for affiliation relations
-        df
+        return df
             .toJavaRDD()
             .flatMap((FlatMapFunction<Row, Relation>) row -> {
 
@@ -120,8 +140,6 @@ public class PrepareAffiliationRelations implements Serializable {
                     qualifier,
                     Double.toString(row.getAs("confidence")));
 
-                List<KeyValue> collectedfrom = OafMapperUtils.listKeyValues(ModelConstants.CROSSREF_ID, "Crossref");
-
                 // return bi-directional relations
                 return getAffiliationRelationPair(paperId, affId, collectedfrom, dataInfo).iterator();
 
@@ -129,9 +147,7 @@ public class PrepareAffiliationRelations implements Serializable {
             .map(p -> new AtomicAction(Relation.class, p))
             .mapToPair(
                 aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
-                    new Text(OBJECT_MAPPER.writeValueAsString(aa))))
-            .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
-
+                    new Text(OBJECT_MAPPER.writeValueAsString(aa))));
     }
 
     private static List<Relation> getAffiliationRelationPair(String paperId, String affId, List<KeyValue> collectedfrom,
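Note: the change above turns prepareAffiliationRelations into a per-source function that returns a JavaPairRDD<Text, Text>, so that main() can union the Crossref and Pubmed pair RDDs and write a single gzipped SequenceFile action set. A minimal, self-contained sketch of that union-then-save pattern; the class name, output path and JSON payloads below are illustrative stand-ins, not the real OpenAIRE Relation/AtomicAction serialization:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapred.SequenceFileOutputFormat;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.SparkSession;

    import scala.Tuple2;

    public class UnionThenSaveSketch {

        // Stand-in for prepareAffiliationRelations(spark, inputPath, collectedfrom):
        // builds a (class name, JSON payload) pair RDD for one provenance source.
        private static JavaPairRDD<Text, Text> relationsFor(JavaSparkContext sc, String source) {
            List<String> payloads = Arrays
                .asList(
                    "{\"collectedfrom\":\"" + source + "\",\"relClass\":\"hasAuthorInstitution\"}",
                    "{\"collectedfrom\":\"" + source + "\",\"relClass\":\"isAuthorInstitutionOf\"}");
            return sc
                .parallelize(payloads)
                .mapToPair(json -> new Tuple2<>(new Text("eu.dnetlib.dhp.schema.oaf.Relation"), new Text(json)));
        }

        public static void main(String[] args) {
            SparkSession spark = SparkSession
                .builder()
                .appName("union-then-save-sketch")
                .master("local[*]")
                .getOrCreate();
            JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

            JavaPairRDD<Text, Text> crossrefRelations = relationsFor(sc, "Crossref");
            JavaPairRDD<Text, Text> pubmedRelations = relationsFor(sc, "Pubmed");

            // one union, one write: both sources land in the same gzipped SequenceFile action set
            crossrefRelations
                .union(pubmedRelations)
                .saveAsHadoopFile(
                    "/tmp/affiliations-action-set-sketch", Text.class, Text.class,
                    SequenceFileOutputFormat.class, GzipCodec.class);

            spark.stop();
        }
    }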
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/input_actionset_parameter.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/input_actionset_parameter.json
index 7663a454b..c6f905199 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/input_actionset_parameter.json
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/input_actionset_parameter.json
@@ -6,9 +6,15 @@
     "paramRequired": false
   },
   {
-    "paramName": "ip",
-    "paramLongName": "inputPath",
-    "paramDescription": "the URL from where to get the programme file",
+    "paramName": "cip",
+    "paramLongName": "crossrefInputPath",
+    "paramDescription": "the path to get the input data from Crossref",
+    "paramRequired": true
+  },
+  {
+    "paramName": "pip",
+    "paramLongName": "pubmedInputPath",
+    "paramDescription": "the path to get the input data from Pubmed",
     "paramRequired": true
   },
   {
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/job.properties b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/job.properties
index d942e6772..a3d55ff0c 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/job.properties
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/job.properties
@@ -31,5 +31,6 @@ spark2SqlQueryExecutionListeners=com.cloudera.spark.lineage.NavigatorQueryListener
 # The following is needed as a property of a workflow
 oozie.wf.application.path=${oozieTopWfApplicationPath}
 
-inputPath=/data/bip-affiliations/data.json
+crossrefInputPath=/data/bip-affiliations/data.json
+pubmedInputPath=/data/bip-affiliations/pubmed-data.json
 outputPath=/tmp/crossref-affiliations-output-v5
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml
index 9930cfe17..c5ac6f884 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml
@@ -2,8 +2,12 @@
 
     <parameters>
         <property>
-            <name>inputPath</name>
-            <description>the path where to find the inferred affiliation relations</description>
+            <name>crossrefInputPath</name>
+            <description>the path where to find the inferred affiliation relations from Crossref</description>
+        </property>
+        <property>
+            <name>pubmedInputPath</name>
+            <description>the path where to find the inferred affiliation relations from Pubmed</description>
         </property>
         <property>
             <name>outputPath</name>
@@ -83,7 +87,7 @@
        <spark xmlns="uri:oozie:spark-action:0.2">
            <master>yarn</master>
            <mode>cluster</mode>
-           <name>Produces the atomic action with the inferred by BIP! affiliation relations from Crossref</name>
+           <name>Produces the atomic action with the inferred by BIP! affiliation relations (from Crossref and Pubmed)</name>
            <class>eu.dnetlib.dhp.actionmanager.bipaffiliations.PrepareAffiliationRelations</class>
            <jar>dhp-aggregation-${projectVersion}.jar</jar>
@@ -96,7 +100,8 @@
                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
            </spark-opts>
-           <arg>--inputPath</arg><arg>${inputPath}</arg>
+           <arg>--crossrefInputPath</arg><arg>${crossrefInputPath}</arg>
+           <arg>--pubmedInputPath</arg><arg>${pubmedInputPath}</arg>
            <arg>--outputPath</arg><arg>${outputPath}</arg>
        </spark>
       <ok to="End"/>
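Note: the renamed parameters stay aligned across input_actionset_parameter.json, job.properties and the workflow <arg> list; the parser matches them by their long names. A sketch of an equivalent local driver invocation using the same switches and the paths from job.properties; the wrapper class is hypothetical, and it assumes a Spark master is supplied externally (for example spark.master=local[*] as a system property), whereas the production run goes through the Oozie spark action above:

    import eu.dnetlib.dhp.actionmanager.bipaffiliations.PrepareAffiliationRelations;

    public class RunPrepareAffiliationRelationsLocally {

        public static void main(String[] args) throws Exception {
            // one switch per parameter declared in input_actionset_parameter.json,
            // with the same input/output paths configured in job.properties
            PrepareAffiliationRelations
                .main(
                    new String[] {
                        "-isSparkSessionManaged", Boolean.FALSE.toString(),
                        "-crossrefInputPath", "/data/bip-affiliations/data.json",
                        "-pubmedInputPath", "/data/bip-affiliations/pubmed-data.json",
                        "-outputPath", "/tmp/crossref-affiliations-output-v5"
                    });
        }
    }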
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java
index ed8e5fe0d..e2639996c 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java
@@ -74,17 +74,22 @@ public class PrepareAffiliationRelationsTest {
     @Test
     void testMatch() throws Exception {
 
-        String affiliationRelationsPath = getClass()
+        String crossrefAffiliationRelationPath = getClass()
             .getResource("/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json")
             .getPath();
 
+        String pubmedAffiliationRelationsPath = getClass()
+            .getResource("/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json")
+            .getPath();
+
         String outputPath = workingDir.toString() + "/actionSet";
 
         PrepareAffiliationRelations
             .main(
                 new String[] {
                     "-isSparkSessionManaged", Boolean.FALSE.toString(),
-                    "-inputPath", affiliationRelationsPath,
+                    "-crossrefInputPath", crossrefAffiliationRelationPath,
+                    "-pubmedInputPath", pubmedAffiliationRelationsPath,
                     "-outputPath", outputPath
                 });
 
@@ -101,7 +106,7 @@ public class PrepareAffiliationRelationsTest {
 //            );
 //        }
         // count the number of relations
-        assertEquals(20, tmp.count());
+        assertEquals(40, tmp.count());
 
         Dataset<Relation> dataset = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class));
         dataset.createOrReplaceTempView("result");
@@ -112,7 +117,7 @@ public class PrepareAffiliationRelationsTest {
         // verify that we have equal number of bi-directional relations
         Assertions
             .assertEquals(
-                10, execVerification
+                20, execVerification
                     .filter(
                         "relClass='" + ModelConstants.HAS_AUTHOR_INSTITUTION + "'")
                     .collectAsList()
@@ -120,7 +125,7 @@ public class PrepareAffiliationRelationsTest {
 
         Assertions
             .assertEquals(
-                10, execVerification
+                20, execVerification
                     .filter(
                         "relClass='" + ModelConstants.IS_AUTHOR_INSTITUTION_OF + "'")
                     .collectAsList()
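Note: the test feeds the same doi_to_ror.json fixture to both the Crossref and the Pubmed input, so the action set is expected to double in size (20 to 40 relations overall, 10 to 20 per relation direction). A sketch of how the written action set can be read back and unwrapped for such checks, assuming the dhp-schemas AtomicAction/Relation JSON layout produced by the job; the output path and wrapper class are illustrative:

    import org.apache.hadoop.io.Text;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.SparkSession;

    import com.fasterxml.jackson.databind.ObjectMapper;

    import eu.dnetlib.dhp.schema.action.AtomicAction;
    import eu.dnetlib.dhp.schema.oaf.Relation;

    public class ActionSetReadBackSketch {

        private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

        public static void main(String[] args) {
            SparkSession spark = SparkSession
                .builder()
                .appName("action-set-read-back-sketch")
                .master("local[*]")
                .getOrCreate();
            JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

            // read the (class name, JSON) pairs written by the job and unwrap the Relation payloads
            JavaRDD<Relation> relations = sc
                .sequenceFile("/tmp/crossref-affiliations-output-v5", Text.class, Text.class)
                .map(pair -> OBJECT_MAPPER.readValue(pair._2().toString(), AtomicAction.class))
                .map(aa -> (Relation) aa.getPayload());

            // with both Crossref and Pubmed inputs present, the count covers both provenances
            System.out.println("relations in action set: " + relations.count());

            spark.stop();
        }
    }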