From 468f2aa5a58ca1aee198ea96b714060a9931b313 Mon Sep 17 00:00:00 2001 From: Miriam Baglioni Date: Mon, 12 Aug 2024 18:10:46 +0200 Subject: [PATCH] [AffiliationAffRo]align beta with new affiliation from publisher webpage introduced in production. AffRo collectedfrom OpenAIRE to discriminate against WebCrawl --- .../PrepareAffiliationRelations.java | 102 ++++++++++++------ .../PrepareAffiliationRelationsTest.java | 18 +++- 2 files changed, 83 insertions(+), 37 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java index 98915bdc5..70ca1576c 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java @@ -44,6 +44,8 @@ public class PrepareAffiliationRelations implements Serializable { public static final String BIP_AFFILIATIONS_CLASSID = "result:organization:openaireinference"; public static final String BIP_AFFILIATIONS_CLASSNAME = "Affiliation relation inferred by OpenAIRE"; public static final String BIP_INFERENCE_PROVENANCE = "openaire:affiliation"; + public static final String OPENAIRE_DATASOURCE_ID = "10|infrastruct_::f66f1bd369679b5b077dcdf006089556"; + public static final String OPENAIRE_DATASOURCE_NAME = "OpenAIRE"; public static void main(String[] args) throws Exception { @@ -74,6 +76,9 @@ public class PrepareAffiliationRelations implements Serializable { final String webcrawlInputPath = parser.get("webCrawlInputPath"); log.info("webcrawlInputPath: {}", webcrawlInputPath); + final String publisherInputPath = parser.get("publisherInputPath"); + log.info("publisherInputPath: {}", publisherInputPath); + final String outputPath = parser.get("outputPath"); log.info("outputPath: {}", outputPath); @@ -84,43 +89,68 @@ public class PrepareAffiliationRelations implements Serializable { isSparkSessionManaged, spark -> { Constants.removeOutputDir(spark, outputPath); - - List collectedFromCrossref = OafMapperUtils - .listKeyValues(ModelConstants.CROSSREF_ID, "Crossref"); - JavaPairRDD crossrefRelations = prepareAffiliationRelations( - spark, crossrefInputPath, collectedFromCrossref); - - List collectedFromPubmed = OafMapperUtils - .listKeyValues(ModelConstants.PUBMED_CENTRAL_ID, "Pubmed"); - JavaPairRDD pubmedRelations = prepareAffiliationRelations( - spark, pubmedInputPath, collectedFromPubmed); - - List collectedFromOpenAPC = OafMapperUtils - .listKeyValues(ModelConstants.OPEN_APC_ID, "OpenAPC"); - JavaPairRDD openAPCRelations = prepareAffiliationRelations( - spark, openapcInputPath, collectedFromOpenAPC); - - List collectedFromDatacite = OafMapperUtils - .listKeyValues(ModelConstants.DATACITE_ID, "Datacite"); - JavaPairRDD dataciteRelations = prepareAffiliationRelations( - spark, dataciteInputPath, collectedFromDatacite); - - List collectedFromWebCrawl = OafMapperUtils - .listKeyValues(Constants.WEB_CRAWL_ID, Constants.WEB_CRAWL_NAME); - JavaPairRDD webCrawlRelations = prepareAffiliationRelations( - spark, webcrawlInputPath, collectedFromWebCrawl); - - crossrefRelations - .union(pubmedRelations) - .union(openAPCRelations) - .union(dataciteRelations) - .union(webCrawlRelations) - .saveAsHadoopFile( - outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, BZip2Codec.class); - + createActionSet( + spark, crossrefInputPath, pubmedInputPath, openapcInputPath, dataciteInputPath, webcrawlInputPath, + publisherInputPath, outputPath); }); } + private static void createActionSet(SparkSession spark, String crossrefInputPath, String pubmedInputPath, + String openapcInputPath, String dataciteInputPath, String webcrawlInputPath, String publisherlInputPath, + String outputPath) { + List collectedFromCrossref = OafMapperUtils + .listKeyValues(ModelConstants.CROSSREF_ID, "Crossref"); + JavaPairRDD crossrefRelations = prepareAffiliationRelations( + spark, crossrefInputPath, collectedFromCrossref); + + List collectedFromPubmed = OafMapperUtils + .listKeyValues(ModelConstants.PUBMED_CENTRAL_ID, "Pubmed"); + JavaPairRDD pubmedRelations = prepareAffiliationRelations( + spark, pubmedInputPath, collectedFromPubmed); + + List collectedFromOpenAPC = OafMapperUtils + .listKeyValues(ModelConstants.OPEN_APC_ID, "OpenAPC"); + JavaPairRDD openAPCRelations = prepareAffiliationRelations( + spark, openapcInputPath, collectedFromOpenAPC); + + List collectedFromDatacite = OafMapperUtils + .listKeyValues(ModelConstants.DATACITE_ID, "Datacite"); + JavaPairRDD dataciteRelations = prepareAffiliationRelations( + spark, dataciteInputPath, collectedFromDatacite); + + List collectedFromWebCrawl = OafMapperUtils + .listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME); + JavaPairRDD webCrawlRelations = prepareAffiliationRelations( + spark, webcrawlInputPath, collectedFromWebCrawl); + + List collectedfromPublisher = OafMapperUtils + .listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME); + JavaPairRDD publisherRelations = prepareAffiliationRelationFromPublisher( + spark, publisherlInputPath, collectedfromPublisher); + + crossrefRelations + .union(pubmedRelations) + .union(openAPCRelations) + .union(dataciteRelations) + .union(webCrawlRelations) + .union(publisherRelations) + .saveAsHadoopFile( + outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, BZip2Codec.class); + } + + private static JavaPairRDD prepareAffiliationRelationFromPublisher(SparkSession spark, String inputPath, + List collectedfrom) { + + Dataset df = spark + .read() + .schema("`DOI` STRING, `Organizations` ARRAY>") + .json(inputPath) + .where("DOI is not null"); + + return getTextTextJavaPairRDD(collectedfrom, df.selectExpr("DOI", "Organizations as Matchings")); + + } + private static JavaPairRDD prepareAffiliationRelations(SparkSession spark, String inputPath, List collectedfrom) { @@ -132,6 +162,10 @@ public class PrepareAffiliationRelations implements Serializable { .json(inputPath) .where("DOI is not null"); + return getTextTextJavaPairRDD(collectedfrom, df); + } + + private static JavaPairRDD getTextTextJavaPairRDD(List collectedfrom, Dataset df) { // unroll nested arrays df = df .withColumn("matching", functions.explode(new Column("Matchings"))) diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java index bffe41ac7..ac9977a7e 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelationsTest.java @@ -78,6 +78,10 @@ public class PrepareAffiliationRelationsTest { .getResource("/eu/dnetlib/dhp/actionmanager/bipaffiliations/doi_to_ror.json") .getPath(); + String publisherAffiliationRelationPath = getClass() + .getResource("/eu/dnetlib/dhp/actionmanager/bipaffiliations/publishers") + .getPath(); + String outputPath = workingDir.toString() + "/actionSet"; PrepareAffiliationRelations @@ -89,6 +93,7 @@ public class PrepareAffiliationRelationsTest { "-openapcInputPath", crossrefAffiliationRelationPath, "-dataciteInputPath", crossrefAffiliationRelationPath, "-webCrawlInputPath", crossrefAffiliationRelationPath, + "-publisherInputPath", publisherAffiliationRelationPath, "-outputPath", outputPath }); @@ -105,7 +110,7 @@ public class PrepareAffiliationRelationsTest { // ); // } // count the number of relations - assertEquals(120, tmp.count()); + assertEquals(138, tmp.count()); Dataset dataset = spark.createDataset(tmp.rdd(), Encoders.bean(Relation.class)); dataset.createOrReplaceTempView("result"); @@ -116,7 +121,7 @@ public class PrepareAffiliationRelationsTest { // verify that we have equal number of bi-directional relations Assertions .assertEquals( - 60, execVerification + 69, execVerification .filter( "relClass='" + ModelConstants.HAS_AUTHOR_INSTITUTION + "'") .collectAsList() @@ -124,7 +129,7 @@ public class PrepareAffiliationRelationsTest { Assertions .assertEquals( - 60, execVerification + 69, execVerification .filter( "relClass='" + ModelConstants.IS_AUTHOR_INSTITUTION_OF + "'") .collectAsList() @@ -145,5 +150,12 @@ public class PrepareAffiliationRelationsTest { .get(0) .getString(4)); + final String publisherid = ID_PREFIX + + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", "10.1007/s00217-010-1268-9")); + final String rorId = "20|ror_________::" + IdentifierFactory.md5("https://ror.org/03265fv13"); + + Assertions + .assertEquals( + 1, execVerification.filter("source = '" + publisherid + "' and target = '" + rorId + "'").count()); } }