From 7df492fa346bb20f614a2d06e7dbeb5085338f49 Mon Sep 17 00:00:00 2001 From: Miriam Baglioni Date: Tue, 6 Aug 2024 16:27:35 +0200 Subject: [PATCH] [AffiliationFromPublisher]refactoring --- .../PrepareAffiliationRelations.java | 82 ++++++++++--------- 1 file changed, 42 insertions(+), 40 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java index 03b8fa73a..1964edce1 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipaffiliations/PrepareAffiliationRelations.java @@ -89,49 +89,51 @@ public class PrepareAffiliationRelations implements Serializable { isSparkSessionManaged, spark -> { Constants.removeOutputDir(spark, outputPath); - - List collectedFromCrossref = OafMapperUtils - .listKeyValues(ModelConstants.CROSSREF_ID, "Crossref"); - JavaPairRDD crossrefRelations = prepareAffiliationRelations( - spark, crossrefInputPath, collectedFromCrossref); - - List collectedFromPubmed = OafMapperUtils - .listKeyValues(ModelConstants.PUBMED_CENTRAL_ID, "Pubmed"); - JavaPairRDD pubmedRelations = prepareAffiliationRelations( - spark, pubmedInputPath, collectedFromPubmed); - - List collectedFromOpenAPC = OafMapperUtils - .listKeyValues(ModelConstants.OPEN_APC_ID, "OpenAPC"); - JavaPairRDD openAPCRelations = prepareAffiliationRelations( - spark, openapcInputPath, collectedFromOpenAPC); - - List collectedFromDatacite = OafMapperUtils - .listKeyValues(ModelConstants.DATACITE_ID, "Datacite"); - JavaPairRDD dataciteRelations = prepareAffiliationRelations( - spark, dataciteInputPath, collectedFromDatacite); - - List collectedFromWebCrawl = OafMapperUtils - .listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME); - JavaPairRDD webCrawlRelations = prepareAffiliationRelations( - spark, webcrawlInputPath, collectedFromWebCrawl); - - List collectedfromPublisher = OafMapperUtils - .listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME); - JavaPairRDD publisherRelations = prepareAffiliationRelationFromPublisher( - spark, publisherlInputPath, collectedfromPublisher); - - crossrefRelations - .union(pubmedRelations) - .union(openAPCRelations) - .union(dataciteRelations) - .union(webCrawlRelations) - .union(publisherRelations) - .saveAsHadoopFile( - outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, BZip2Codec.class); - + createActionSet(spark, crossrefInputPath, pubmedInputPath, openapcInputPath, dataciteInputPath, webcrawlInputPath, publisherlInputPath, outputPath); }); } + private static void createActionSet(SparkSession spark, String crossrefInputPath, String pubmedInputPath, String openapcInputPath, String dataciteInputPath, String webcrawlInputPath, String publisherlInputPath, String outputPath) { + List collectedFromCrossref = OafMapperUtils + .listKeyValues(ModelConstants.CROSSREF_ID, "Crossref"); + JavaPairRDD crossrefRelations = prepareAffiliationRelations( + spark, crossrefInputPath, collectedFromCrossref); + + List collectedFromPubmed = OafMapperUtils + .listKeyValues(ModelConstants.PUBMED_CENTRAL_ID, "Pubmed"); + JavaPairRDD pubmedRelations = prepareAffiliationRelations( + spark, pubmedInputPath, collectedFromPubmed); + + List collectedFromOpenAPC = OafMapperUtils + .listKeyValues(ModelConstants.OPEN_APC_ID, "OpenAPC"); + JavaPairRDD openAPCRelations = prepareAffiliationRelations( + spark, openapcInputPath, collectedFromOpenAPC); + + List collectedFromDatacite = OafMapperUtils + .listKeyValues(ModelConstants.DATACITE_ID, "Datacite"); + JavaPairRDD dataciteRelations = prepareAffiliationRelations( + spark, dataciteInputPath, collectedFromDatacite); + + List collectedFromWebCrawl = OafMapperUtils + .listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME); + JavaPairRDD webCrawlRelations = prepareAffiliationRelations( + spark, webcrawlInputPath, collectedFromWebCrawl); + + List collectedfromPublisher = OafMapperUtils + .listKeyValues(OPENAIRE_DATASOURCE_ID, OPENAIRE_DATASOURCE_NAME); + JavaPairRDD publisherRelations = prepareAffiliationRelationFromPublisher( + spark, publisherlInputPath, collectedfromPublisher); + + crossrefRelations + .union(pubmedRelations) + .union(openAPCRelations) + .union(dataciteRelations) + .union(webCrawlRelations) + .union(publisherRelations) + .saveAsHadoopFile( + outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, BZip2Codec.class); + } + private static JavaPairRDD prepareAffiliationRelationFromPublisher(SparkSession spark, String inputPath, List collectedfrom){ Dataset df = spark