From 675de0713838edf19bbe86d9d99cde6a7570c1b0 Mon Sep 17 00:00:00 2001 From: Miriam Baglioni Date: Tue, 6 Aug 2024 15:34:37 +0200 Subject: [PATCH] [AffiliationFromPublisher]Extend the action set to include also the affiliation extracted from publishers. Rename to openaire datasource the collected from of the relations generated by affro on the raw aff string got from OA. The new relations taken from publishers have the same collected from (OpenAIRE) --- .../PrepareAffiliationRelations.java | 88 ++++++++++++------- 1 file changed, 58 insertions(+), 30 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java index 98915bdc5..03b8fa73a 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java @@ -44,6 +44,8 @@ public class PrepareAffiliationRelations implements Serializable { public static final String BIP_AFFILIATIONS_CLASSID = "result:organization:openaireinference"; public static final String BIP_AFFILIATIONS_CLASSNAME = "Affiliation relation inferred by OpenAIRE"; public static final String BIP_INFERENCE_PROVENANCE = "openaire:affiliation"; + public static final String OPENAIRE_DATASOURCE_ID = "10|infrastruct_::f66f1bd369679b5b077dcdf006089556"; + public static final String OPENAIRE_DATASOURCE_NAME = "OpenAIRE"; public static void main(String[] args) throws Exception { @@ -74,6 +76,9 @@ public class PrepareAffiliationRelations implements Serializable { final String webcrawlInputPath = parser.get("webCrawlInputPath"); log.info("webcrawlInputPath: {}", webcrawlInputPath); + final String publisherlInputPath = parser.get("publisherlInputPath"); + log.info("publisherlInputPath: {}", publisherlInputPath); + final String outputPath = parser.get("outputPath"); log.info("outputPath: {}", outputPath); @@ -106,21 +111,40 @@ public class PrepareAffiliationRelations implements Serializable { spark, dataciteInputPath, collectedFromDatacite); List collectedFromWebCrawl = OafMapperUtils - .listKeyValues(Constants.WEB_CRAWL_ID, Constants.WEB_CRAWL_NAME); + .listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME); JavaPairRDD webCrawlRelations = prepareAffiliationRelations( spark, webcrawlInputPath, collectedFromWebCrawl); + List collectedfromPublisher = OafMapperUtils + .listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME); + JavaPairRDD publisherRelations = prepareAffiliationRelationFromPublisher( + spark, publisherlInputPath, collectedfromPublisher); + crossrefRelations .union(pubmedRelations) .union(openAPCRelations) .union(dataciteRelations) .union(webCrawlRelations) + .union(publisherRelations) .saveAsHadoopFile( outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, BZip2Codec.class); }); } + private static JavaPairRDD prepareAffiliationRelationFromPublisher(SparkSession spark, String inputPath, + List collectedfrom){ + Dataset df = spark + .read() + .schema("`DOI` STRING, `Organizations` ARRAY>") + .json(inputPath) + .where("DOI is not null"); + + return getTextTextJavaPairRDD(collectedfrom, df.selectExpr("DOI", "Organizations as Matchings")); + + + } + private static JavaPairRDD prepareAffiliationRelations(SparkSession spark, String inputPath, List collectedfrom) { @@ -132,6 +156,10 @@ public class PrepareAffiliationRelations implements Serializable { .json(inputPath) .where("DOI is not null"); + return getTextTextJavaPairRDD(collectedfrom, df); + } + + private static JavaPairRDD getTextTextJavaPairRDD(List collectedfrom, Dataset df) { // unroll nested arrays df = df .withColumn("matching", functions.explode(new Column("Matchings"))) @@ -142,41 +170,41 @@ public class PrepareAffiliationRelations implements Serializable { // prepare action sets for affiliation relations return df - .toJavaRDD() - .flatMap((FlatMapFunction) row -> { + .toJavaRDD() + .flatMap((FlatMapFunction) row -> { - // DOI to OpenAIRE id - final String paperId = ID_PREFIX - + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", row.getAs("doi"))); + // DOI to OpenAIRE id + final String paperId = ID_PREFIX + + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", row.getAs("doi"))); - // ROR id to OpenAIRE id - final String affId = GenerateRorActionSetJob.calculateOpenaireId(row.getAs("rorid")); + // ROR id to OpenAIRE id + final String affId = GenerateRorActionSetJob.calculateOpenaireId(row.getAs("rorid")); - Qualifier qualifier = OafMapperUtils - .qualifier( - BIP_AFFILIATIONS_CLASSID, - BIP_AFFILIATIONS_CLASSNAME, - ModelConstants.DNET_PROVENANCE_ACTIONS, - ModelConstants.DNET_PROVENANCE_ACTIONS); + Qualifier qualifier = OafMapperUtils + .qualifier( + BIP_AFFILIATIONS_CLASSID, + BIP_AFFILIATIONS_CLASSNAME, + ModelConstants.DNET_PROVENANCE_ACTIONS, + ModelConstants.DNET_PROVENANCE_ACTIONS); - // format data info; setting `confidence` into relation's `trust` - DataInfo dataInfo = OafMapperUtils - .dataInfo( - false, - BIP_INFERENCE_PROVENANCE, - true, - false, - qualifier, - Double.toString(row.getAs("confidence"))); + // format data info; setting `confidence` into relation's `trust` + DataInfo dataInfo = OafMapperUtils + .dataInfo( + false, + BIP_INFERENCE_PROVENANCE, + true, + false, + qualifier, + Double.toString(row.getAs("confidence"))); - // return bi-directional relations - return getAffiliationRelationPair(paperId, affId, collectedfrom, dataInfo).iterator(); + // return bi-directional relations + return getAffiliationRelationPair(paperId, affId, collectedfrom, dataInfo).iterator(); - }) - .map(p -> new AtomicAction(Relation.class, p)) - .mapToPair( - aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), - new Text(OBJECT_MAPPER.writeValueAsString(aa)))); + }) + .map(p -> new AtomicAction(Relation.class, p)) + .mapToPair( + aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), + new Text(OBJECT_MAPPER.writeValueAsString(aa)))); } private static List getAffiliationRelationPair(String paperId, String affId, List collectedfrom,