From 0e5dd14538fc8b2ba2bc08f3af93793e3b9e19b7 Mon Sep 17 00:00:00 2001 From: Miriam Baglioni Date: Fri, 18 Oct 2024 16:22:21 +0200 Subject: [PATCH 1/5] [createASfromAffRo] adding the provenance datasource used to get the relation (no datasource can be webcrawl = publisher, rawaff means oalex) --- .../PrepareAffiliationRelations.java | 47 ++++++++++--------- .../PrepareAffiliationRelationsTest.java | 16 +++---- 2 files changed, 34 insertions(+), 29 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java index 028fa47dc4..61a018a414 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java @@ -104,22 +104,22 @@ public class PrepareAffiliationRelations implements Serializable { .listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME); JavaPairRDD crossrefRelations = prepareAffiliationRelationsNewModel( - spark, crossrefInputPath, collectedfromOpenAIRE); + spark, crossrefInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::crossref"); JavaPairRDD pubmedRelations = prepareAffiliationRelations( - spark, pubmedInputPath, collectedfromOpenAIRE); + spark, pubmedInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::pubmed"); JavaPairRDD openAPCRelations = prepareAffiliationRelationsNewModel( - spark, openapcInputPath, collectedfromOpenAIRE); + spark, openapcInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::openapc"); - JavaPairRDD dataciteRelations = prepareAffiliationRelations( - spark, dataciteInputPath, collectedfromOpenAIRE); + JavaPairRDD dataciteRelations = prepareAffiliationRelationsNewModel( + spark, dataciteInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::datacite"); - JavaPairRDD webCrawlRelations = prepareAffiliationRelations( - spark, webcrawlInputPath, collectedfromOpenAIRE); + JavaPairRDD webCrawlRelations = prepareAffiliationRelationsNewModel( + spark, webcrawlInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::rawaff"); - JavaPairRDD publisherRelations = prepareAffiliationRelationFromPublisher( - spark, publisherlInputPath, collectedfromOpenAIRE); + JavaPairRDD publisherRelations = prepareAffiliationRelationFromPublisherNewModel( + spark, publisherlInputPath, collectedfromOpenAIRE, BIP_INFERENCE_PROVENANCE + "::webcrawl"); crossrefRelations .union(pubmedRelations) @@ -133,7 +133,8 @@ public class PrepareAffiliationRelations implements Serializable { private static JavaPairRDD prepareAffiliationRelationFromPublisherNewModel(SparkSession spark, String inputPath, - List collectedfrom) { + List collectedfrom, + String dataprovenance) { Dataset df = spark .read() @@ -142,12 +143,13 @@ public class PrepareAffiliationRelations implements Serializable { .json(inputPath) .where("DOI is not null"); - return getTextTextJavaPairRDD(collectedfrom, df.selectExpr("DOI", "Organizations as Matchings")); + return getTextTextJavaPairRDDNew( + collectedfrom, df.selectExpr("DOI", "Organizations as Matchings"), dataprovenance); } private static JavaPairRDD prepareAffiliationRelationFromPublisher(SparkSession spark, String inputPath, - List collectedfrom) { + List collectedfrom, String dataprovenance) { Dataset df = spark .read() @@ -155,13 +157,14 @@ public class PrepareAffiliationRelations implements Serializable { .json(inputPath) .where("DOI is not null"); - return getTextTextJavaPairRDD(collectedfrom, df.selectExpr("DOI", "Organizations as Matchings")); + return getTextTextJavaPairRDD( + collectedfrom, df.selectExpr("DOI", "Organizations as Matchings"), dataprovenance); } private static JavaPairRDD prepareAffiliationRelations(SparkSession spark, String inputPath, - List collectedfrom) { + List collectedfrom, String dataprovenance) { // load and parse affiliation relations from HDFS Dataset df = spark @@ -170,12 +173,12 @@ public class PrepareAffiliationRelations implements Serializable { .json(inputPath) .where("DOI is not null"); - return getTextTextJavaPairRDD(collectedfrom, df); + return getTextTextJavaPairRDD(collectedfrom, df, dataprovenance); } private static JavaPairRDD prepareAffiliationRelationsNewModel(SparkSession spark, String inputPath, - List collectedfrom) { + List collectedfrom, String dataprovenance) { // load and parse affiliation relations from HDFS Dataset df = spark .read() @@ -184,10 +187,11 @@ public class PrepareAffiliationRelations implements Serializable { .json(inputPath) .where("DOI is not null"); - return getTextTextJavaPairRDDNew(collectedfrom, df); + return getTextTextJavaPairRDDNew(collectedfrom, df, dataprovenance); } - private static JavaPairRDD getTextTextJavaPairRDD(List collectedfrom, Dataset df) { + private static JavaPairRDD getTextTextJavaPairRDD(List collectedfrom, Dataset df, + String dataprovenance) { // unroll nested arrays df = df .withColumn("matching", functions.explode(new Column("Matchings"))) @@ -219,7 +223,7 @@ public class PrepareAffiliationRelations implements Serializable { DataInfo dataInfo = OafMapperUtils .dataInfo( false, - BIP_INFERENCE_PROVENANCE, + dataprovenance, true, false, qualifier, @@ -235,7 +239,8 @@ public class PrepareAffiliationRelations implements Serializable { new Text(OBJECT_MAPPER.writeValueAsString(aa)))); } - private static JavaPairRDD getTextTextJavaPairRDDNew(List collectedfrom, Dataset df) { + private static JavaPairRDD getTextTextJavaPairRDDNew(List collectedfrom, Dataset df, + String dataprovenance) { // unroll nested arrays df = df .withColumn("matching", functions.explode(new Column("Matchings"))) @@ -276,7 +281,7 @@ public class PrepareAffiliationRelations implements Serializable { DataInfo dataInfo = OafMapperUtils .dataInfo( false, - BIP_INFERENCE_PROVENANCE, + dataprovenance, true, false, qualifier, diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java index 179cbecb5f..c704bb99b4 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java @@ -98,9 +98,9 @@ public class PrepareAffiliationRelationsTest { "-crossrefInputPath", crossrefAffiliationRelationPathNew, "-pubmedInputPath", crossrefAffiliationRelationPath, "-openapcInputPath", crossrefAffiliationRelationPathNew, - "-dataciteInputPath", crossrefAffiliationRelationPath, - "-webCrawlInputPath", crossrefAffiliationRelationPath, - "-publisherInputPath", publisherAffiliationRelationOldPath, + "-dataciteInputPath", crossrefAffiliationRelationPathNew, + "-webCrawlInputPath", crossrefAffiliationRelationPathNew, + "-publisherInputPath", publisherAffiliationRelationPath, "-outputPath", outputPath }); @@ -112,7 +112,7 @@ public class PrepareAffiliationRelationsTest { .map(aa -> ((Relation) aa.getPayload())); // count the number of relations - assertEquals(150, tmp.count());// 18 + 24 *3 + 30 * 2 = + assertEquals(162, tmp.count());// 18 + 24 + 30 * 4 = Dataset dataset = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class)); dataset.createOrReplaceTempView("result"); @@ -123,7 +123,7 @@ public class PrepareAffiliationRelationsTest { // verify that we have equal number of bi-directional relations Assertions .assertEquals( - 75, execVerification + 81, execVerification .filter( "relClass='" + ModelConstants.HAS_AUTHOR_INSTITUTION + "'") .collectAsList() @@ -131,7 +131,7 @@ public class PrepareAffiliationRelationsTest { Assertions .assertEquals( - 75, execVerification + 81, execVerification .filter( "relClass='" + ModelConstants.IS_AUTHOR_INSTITUTION_OF + "'") .collectAsList() @@ -158,7 +158,7 @@ public class PrepareAffiliationRelationsTest { Assertions .assertEquals( - 2, execVerification.filter("source = '" + publisherid + "' and target = '" + rorId + "'").count()); + 4, execVerification.filter("source = '" + publisherid + "' and target = '" + rorId + "'").count()); Assertions .assertEquals( @@ -173,7 +173,7 @@ public class PrepareAffiliationRelationsTest { Assertions .assertEquals( - 3, execVerification + 1, execVerification .filter( "source = '" + ID_PREFIX + IdentifierFactory From 2b27afaec8d3a6bd7bdaf2ff83040f88c68660e2 Mon Sep 17 00:00:00 2001 From: Miriam Baglioni Date: Fri, 18 Oct 2024 16:22:51 +0200 Subject: [PATCH 2/5] [createASfromAffRo] refactoring after compilation --- .../dhp/oa/graph/hive/GraphHiveTableImporterJob.java | 6 +++--- .../dnetlib/dhp/enrich/orcid/ORCIDAuthorMatchersTest.scala | 1 + 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hive/GraphHiveTableImporterJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hive/GraphHiveTableImporterJob.java index 73243dbc5f..d4fec3f523 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hive/GraphHiveTableImporterJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hive/GraphHiveTableImporterJob.java @@ -72,9 +72,9 @@ public class GraphHiveTableImporterJob { final Encoder clazzEncoder = Encoders.bean(clazz); Dataset dataset = spark - .read() - .schema(clazzEncoder.schema()) - .json(inputPath); + .read() + .schema(clazzEncoder.schema()) + .json(inputPath); if (numPartitions > 0) { log.info("repartitioning {} to {} partitions", clazz.getSimpleName(), numPartitions); diff --git a/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/enrich/orcid/ORCIDAuthorMatchersTest.scala b/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/enrich/orcid/ORCIDAuthorMatchersTest.scala index 4e5ad5365a..eece56b746 100644 --- a/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/enrich/orcid/ORCIDAuthorMatchersTest.scala +++ b/dhp-workflows/dhp-graph-mapper/src/test/scala/eu/dnetlib/dhp/enrich/orcid/ORCIDAuthorMatchersTest.scala @@ -31,6 +31,7 @@ class ORCIDAuthorMatchersTest { assertTrue(matchOrderedTokenAndAbbreviations("孙林 Sun Lin", "Sun Lin")) // assertTrue(AuthorsMatchRevised.compare("孙林 Sun Lin", "孙林")); // not yet implemented } + @Test def testDocumentationNames(): Unit = { assertTrue(matchOrderedTokenAndAbbreviations("James C. A. Miller-Jones", "James Antony Miller-Jones")) } From c93bf824875fb3ffdb2220ac62fc52495e17fa01 Mon Sep 17 00:00:00 2001 From: Miriam Baglioni Date: Thu, 24 Oct 2024 17:34:34 +0200 Subject: [PATCH 3/5] [affroNewModel] extended wf definition --- .../actionmanager/bipaffiliations/oozie_app/workflow.xml | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml index 2e89c07fd4..88ff42dc29 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml @@ -21,6 +21,10 @@ webCrawlInputPath the path where to find the inferred affiliation relations from webCrawl + + publisherInputPath + the path where to find the inferred affiliation relations from publisher websites + outputPath the path where to store the actionset @@ -99,7 +103,7 @@ yarn cluster - Produces the atomic action with the inferred by BIP! affiliation relations (from Crossref and Pubmed) + Produces the atomic action with the inferred by OpenAIRE affiliation relations eu.dnetlib.dhp.actionmanager.bipaffiliations.PrepareAffiliationRelations dhp-aggregation-${projectVersion}.jar @@ -117,6 +121,7 @@ --openapcInputPath${openapcInputPath} --dataciteInputPath${dataciteInputPath} --webCrawlInputPath${webCrawlInputPath} + --publisherInputPath${publisherInputPath} --outputPath${outputPath} From cab8f1135fd82d2721cb8051d44550ac24b0b3eb Mon Sep 17 00:00:00 2001 From: Miriam Baglioni Date: Thu, 24 Oct 2024 17:44:33 +0200 Subject: [PATCH 4/5] [affroNewModel] - --- .../bipaffiliations/PrepareAffiliationRelations.java | 2 +- .../dhp/actionmanager/bipaffiliations/job.properties | 6 ++++-- .../bipaffiliations/PrepareAffiliationRelationsTest.java | 2 +- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java index 61a018a414..15c1cc3760 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java @@ -34,7 +34,7 @@ import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; import scala.Tuple2; /** - * Creates action sets for Crossref affiliation relations inferred by BIP! + * Creates action sets for Crossref affiliation relations inferred by OpenAIRE */ public class PrepareAffiliationRelations implements Serializable { diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/job.properties b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/job.properties index ded4fe4097..58124c9d1d 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/job.properties +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/job.properties @@ -32,8 +32,10 @@ spark2SqlQueryExecutionListeners=com.cloudera.spark.lineage.NavigatorQueryListen oozie.wf.application.path=${oozieTopWfApplicationPath} crossrefInputPath=/data/bip-affiliations/crossref-data.json -pubmedInputPath=/data/bip-affiliations/pubmed-data.json +pubmedInputPath=/data/bip-affiliations/pubmed-data-v4.json openapcInputPath=/data/bip-affiliations/openapc-data.json dataciteInputPath=/data/bip-affiliations/datacite-data.json +webCrawlInputPath=/data/bip-affiliations/webCrawl +publisherInputPath=/data/bip-affiliations/publishers -outputPath=/tmp/crossref-affiliations-output-v5 +outputPath=/tmp/affRoAS diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java index c704bb99b4..16d60f7daa 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java @@ -112,7 +112,7 @@ public class PrepareAffiliationRelationsTest { .map(aa -> ((Relation) aa.getPayload())); // count the number of relations - assertEquals(162, tmp.count());// 18 + 24 + 30 * 4 = + assertEquals(162, tmp.count());// 18 + 24 + 30 * 4 = Dataset dataset = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class)); dataset.createOrReplaceTempView("result"); From 842cc75dae0b11bcc0f4974cf6fe199813f7696d Mon Sep 17 00:00:00 2001 From: Miriam Baglioni Date: Fri, 25 Oct 2024 09:42:52 +0200 Subject: [PATCH 5/5] [AffRo] fix name --- .../dhp/actionmanager/bipaffiliations/job.properties | 12 ++++++------ .../bipaffiliations/oozie_app/workflow.xml | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/job.properties b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/job.properties index 58124c9d1d..c61830cba4 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/job.properties +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/job.properties @@ -31,11 +31,11 @@ spark2SqlQueryExecutionListeners=com.cloudera.spark.lineage.NavigatorQueryListen # The following is needed as a property of a workflow oozie.wf.application.path=${oozieTopWfApplicationPath} -crossrefInputPath=/data/bip-affiliations/crossref-data.json -pubmedInputPath=/data/bip-affiliations/pubmed-data-v4.json -openapcInputPath=/data/bip-affiliations/openapc-data.json -dataciteInputPath=/data/bip-affiliations/datacite-data.json -webCrawlInputPath=/data/bip-affiliations/webCrawl -publisherInputPath=/data/bip-affiliations/publishers +crossrefInputPath=/data/openaire-affiliations/crossref-data.json +pubmedInputPath=/data/openaire-affiliations/pubmed-data-v4.json +openapcInputPath=/data/openaire-affiliations/openapc-data.json +dataciteInputPath=/data/openaire-affiliations/datacite-data.json +webCrawlInputPath=/data/openaire-affiliations/webCrawl +publisherInputPath=/data/openaire-affiliations/publishers outputPath=/tmp/affRoAS diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml index 88ff42dc29..2e65aaa5e9 100644 --- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipaffiliations/oozie_app/workflow.xml @@ -1,4 +1,4 @@ - +