diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java index 98915bdc5..03b8fa73a 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java @@ -44,6 +44,8 @@ public class PrepareAffiliationRelations implements Serializable { public static final String BIP_AFFILIATIONS_CLASSID = "result:organization:openaireinference"; public static final String BIP_AFFILIATIONS_CLASSNAME = "Affiliation relation inferred by OpenAIRE"; public static final String BIP_INFERENCE_PROVENANCE = "openaire:affiliation"; + public static final String OPENAIRE_DATASOURCE_ID = "10|infrastruct_::f66f1bd369679b5b077dcdf006089556"; + public static final String OPENAIRE_DATASOURCE_NAME = "OpenAIRE"; public static void main(String[] args) throws Exception { @@ -74,6 +76,9 @@ public class PrepareAffiliationRelations implements Serializable { final String webcrawlInputPath = parser.get("webCrawlInputPath"); log.info("webcrawlInputPath: {}", webcrawlInputPath); + final String publisherlInputPath = parser.get("publisherlInputPath"); + log.info("publisherlInputPath: {}", publisherlInputPath); + final String outputPath = parser.get("outputPath"); log.info("outputPath: {}", outputPath); @@ -106,21 +111,40 @@ public class PrepareAffiliationRelations implements Serializable { spark, dataciteInputPath, collectedFromDatacite); List collectedFromWebCrawl = OafMapperUtils - .listKeyValues(Constants.WEB_CRAWL_ID, Constants.WEB_CRAWL_NAME); + .listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME); JavaPairRDD webCrawlRelations = prepareAffiliationRelations( spark, webcrawlInputPath, collectedFromWebCrawl); + List collectedfromPublisher = OafMapperUtils + .listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME); + JavaPairRDD publisherRelations = prepareAffiliationRelationFromPublisher( + spark, publisherlInputPath, collectedfromPublisher); + crossrefRelations .union(pubmedRelations) .union(openAPCRelations) .union(dataciteRelations) .union(webCrawlRelations) + .union(publisherRelations) .saveAsHadoopFile( outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, BZip2Codec.class); }); } + private static JavaPairRDD prepareAffiliationRelationFromPublisher(SparkSession spark, String inputPath, + List collectedfrom){ + Dataset df = spark + .read() + .schema("`DOI` STRING, `Organizations` ARRAY>") + .json(inputPath) + .where("DOI is not null"); + + return getTextTextJavaPairRDD(collectedfrom, df.selectExpr("DOI", "Organizations as Matchings")); + + + } + private static JavaPairRDD prepareAffiliationRelations(SparkSession spark, String inputPath, List collectedfrom) { @@ -132,6 +156,10 @@ public class PrepareAffiliationRelations implements Serializable { .json(inputPath) .where("DOI is not null"); + return getTextTextJavaPairRDD(collectedfrom, df); + } + + private static JavaPairRDD getTextTextJavaPairRDD(List collectedfrom, Dataset df) { // unroll nested arrays df = df .withColumn("matching", functions.explode(new Column("Matchings"))) @@ -142,41 +170,41 @@ public class PrepareAffiliationRelations implements Serializable { // prepare action sets for affiliation relations return df - .toJavaRDD() - .flatMap((FlatMapFunction) row -> { + .toJavaRDD() + .flatMap((FlatMapFunction) row -> { - // DOI to OpenAIRE id - final String paperId = ID_PREFIX - + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", row.getAs("doi"))); + // DOI to OpenAIRE id + final String paperId = ID_PREFIX + + IdentifierFactory.md5(CleaningFunctions.normalizePidValue("doi", row.getAs("doi"))); - // ROR id to OpenAIRE id - final String affId = GenerateRorActionSetJob.calculateOpenaireId(row.getAs("rorid")); + // ROR id to OpenAIRE id + final String affId = GenerateRorActionSetJob.calculateOpenaireId(row.getAs("rorid")); - Qualifier qualifier = OafMapperUtils - .qualifier( - BIP_AFFILIATIONS_CLASSID, - BIP_AFFILIATIONS_CLASSNAME, - ModelConstants.DNET_PROVENANCE_ACTIONS, - ModelConstants.DNET_PROVENANCE_ACTIONS); + Qualifier qualifier = OafMapperUtils + .qualifier( + BIP_AFFILIATIONS_CLASSID, + BIP_AFFILIATIONS_CLASSNAME, + ModelConstants.DNET_PROVENANCE_ACTIONS, + ModelConstants.DNET_PROVENANCE_ACTIONS); - // format data info; setting `confidence` into relation's `trust` - DataInfo dataInfo = OafMapperUtils - .dataInfo( - false, - BIP_INFERENCE_PROVENANCE, - true, - false, - qualifier, - Double.toString(row.getAs("confidence"))); + // format data info; setting `confidence` into relation's `trust` + DataInfo dataInfo = OafMapperUtils + .dataInfo( + false, + BIP_INFERENCE_PROVENANCE, + true, + false, + qualifier, + Double.toString(row.getAs("confidence"))); - // return bi-directional relations - return getAffiliationRelationPair(paperId, affId, collectedfrom, dataInfo).iterator(); + // return bi-directional relations + return getAffiliationRelationPair(paperId, affId, collectedfrom, dataInfo).iterator(); - }) - .map(p -> new AtomicAction(Relation.class, p)) - .mapToPair( - aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), - new Text(OBJECT_MAPPER.writeValueAsString(aa)))); + }) + .map(p -> new AtomicAction(Relation.class, p)) + .mapToPair( + aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), + new Text(OBJECT_MAPPER.writeValueAsString(aa)))); } private static List getAffiliationRelationPair(String paperId, String affId, List collectedfrom,