diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java index 8b8e05723..fb11e829f 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java @@ -9,7 +9,6 @@ import java.util.List; import java.util.Optional; import java.util.stream.Collectors; -import eu.dnetlib.dhp.actionmanager.bipmodel.score.deserializers.BipProjectModel; import org.apache.commons.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.SequenceFileOutputFormat; @@ -25,8 +24,9 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.actionmanager.bipmodel.score.deserializers.BipResultModel; import eu.dnetlib.dhp.actionmanager.bipmodel.BipScore; +import eu.dnetlib.dhp.actionmanager.bipmodel.score.deserializers.BipProjectModel; +import eu.dnetlib.dhp.actionmanager.bipmodel.score.deserializers.BipResultModel; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.schema.action.AtomicAction; @@ -89,8 +89,7 @@ public class SparkAtomicActionScoreJob implements Serializable { default: throw new RuntimeException("Unknown target entity: " + targetEntity); } - } - ); + }); } private static void prepareProjects(SparkSession spark, String inputPath, String outputPath) { @@ -98,17 +97,18 @@ public class SparkAtomicActionScoreJob implements Serializable { // read input bip project scores Dataset projectScores = readPath(spark, inputPath, BipProjectModel.class); - projectScores.map( (MapFunction) bipProjectScores -> { + projectScores.map((MapFunction) bipProjectScores -> { Project project = new Project(); project.setId(bipProjectScores.getProjectId()); project.setMeasures(bipProjectScores.toMeasures()); return project; }, Encoders.bean(Project.class)) - .toJavaRDD() - .map(p -> new AtomicAction(Project.class, p)) - .mapToPair( aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), - new Text(OBJECT_MAPPER.writeValueAsString(aa)))) - .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); + .toJavaRDD() + .map(p -> new AtomicAction(Project.class, p)) + .mapToPair( + aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), + new Text(OBJECT_MAPPER.writeValueAsString(aa)))) + .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipProjectModel.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipProjectModel.java index 77c1567a8..680e12504 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipProjectModel.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipProjectModel.java @@ -1,69 +1,74 @@ + package eu.dnetlib.dhp.actionmanager.bipmodel.score.deserializers; -import com.opencsv.bean.CsvBindByPosition; -import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.oaf.KeyValue; -import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.NoArgsConstructor; -import lombok.Setter; -import eu.dnetlib.dhp.schema.oaf.Measure; +import static eu.dnetlib.dhp.actionmanager.Constants.*; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; -import static eu.dnetlib.dhp.actionmanager.Constants.*; +import com.opencsv.bean.CsvBindByPosition; + +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.Measure; +import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; @NoArgsConstructor @AllArgsConstructor @Getter @Setter public class BipProjectModel { - String projectId; + String projectId; - String numOfInfluentialResults; + String numOfInfluentialResults; - String numOfPopularResults; + String numOfPopularResults; - String totalImpulse; + String totalImpulse; - String totalCitationCount; + String totalCitationCount; - // each project bip measure has exactly one value, hence one key-value pair - private Measure createMeasure(String measureId, String measureValue) { + // each project bip measure has exactly one value, hence one key-value pair + private Measure createMeasure(String measureId, String measureValue) { - KeyValue kv = new KeyValue(); - kv.setKey("score"); - kv.setValue(measureValue); - kv.setDataInfo( - OafMapperUtils.dataInfo( - false, - UPDATE_DATA_INFO_TYPE, - true, - false, - OafMapperUtils.qualifier( - UPDATE_MEASURE_BIP_CLASS_ID, - UPDATE_CLASS_NAME, - ModelConstants.DNET_PROVENANCE_ACTIONS, - ModelConstants.DNET_PROVENANCE_ACTIONS), - "") - ); + KeyValue kv = new KeyValue(); + kv.setKey("score"); + kv.setValue(measureValue); + kv + .setDataInfo( + OafMapperUtils + .dataInfo( + false, + UPDATE_DATA_INFO_TYPE, + true, + false, + OafMapperUtils + .qualifier( + UPDATE_MEASURE_BIP_CLASS_ID, + UPDATE_CLASS_NAME, + ModelConstants.DNET_PROVENANCE_ACTIONS, + ModelConstants.DNET_PROVENANCE_ACTIONS), + "")); - Measure measure = new Measure(); - measure.setId(measureId); - measure.setUnit(Collections.singletonList(kv)); - return measure; - } - public List toMeasures() { - return Arrays.asList( - createMeasure("numOfInfluentialResults", numOfInfluentialResults), - createMeasure("numOfPopularResults", numOfPopularResults), - createMeasure("totalImpulse", totalImpulse), - createMeasure("totalCitationCount", totalCitationCount) - ); - } + Measure measure = new Measure(); + measure.setId(measureId); + measure.setUnit(Collections.singletonList(kv)); + return measure; + } + + public List toMeasures() { + return Arrays + .asList( + createMeasure("numOfInfluentialResults", numOfInfluentialResults), + createMeasure("numOfPopularResults", numOfPopularResults), + createMeasure("totalImpulse", totalImpulse), + createMeasure("totalCitationCount", totalCitationCount)); + } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipResultModel.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipResultModel.java index 06a173413..f992dc59f 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipResultModel.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipResultModel.java @@ -1,13 +1,13 @@ package eu.dnetlib.dhp.actionmanager.bipmodel.score.deserializers; -import eu.dnetlib.dhp.actionmanager.bipmodel.Score; - import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import eu.dnetlib.dhp.actionmanager.bipmodel.Score; + /** * Class that maps the model of the bipFinder! input data. * Only needed for deserialization purposes diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java index efcb96a85..0507f90e5 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java @@ -24,8 +24,8 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.actionmanager.bipmodel.score.deserializers.BipResultModel; import eu.dnetlib.dhp.actionmanager.bipmodel.BipScore; +import eu.dnetlib.dhp.actionmanager.bipmodel.score.deserializers.BipResultModel; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.schema.common.ModelConstants; diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJobTest.java index aa5a19f11..7752fbc27 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJobTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJobTest.java @@ -7,8 +7,6 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; -import eu.dnetlib.dhp.schema.oaf.KeyValue; -import eu.dnetlib.dhp.schema.oaf.Project; import org.apache.commons.io.FileUtils; import org.apache.hadoop.io.Text; import org.apache.spark.SparkConf; @@ -28,6 +26,8 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.schema.action.AtomicAction; +import eu.dnetlib.dhp.schema.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.Project; import eu.dnetlib.dhp.schema.oaf.Result; public class SparkAtomicActionScoreJobTest { @@ -73,15 +73,16 @@ public class SparkAtomicActionScoreJobTest { } private void runJob(String inputPath, String outputPath, String targetEntity) throws Exception { - SparkAtomicActionScoreJob.main( - new String[] { + SparkAtomicActionScoreJob + .main( + new String[] { "-isSparkSessionManaged", Boolean.FALSE.toString(), "-inputPath", inputPath, "-outputPath", outputPath, "-targetEntity", targetEntity, - } - ); + }); } + @Test void testResultScores() throws Exception { final String targetEntity = RESULT; @@ -149,8 +150,8 @@ public class SparkAtomicActionScoreJobTest { void testProjectScores() throws Exception { String targetEntity = PROJECT; String inputResultScores = getClass() - .getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/project_bip_scores.json") - .getPath(); + .getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/project_bip_scores.json") + .getPath(); String outputPath = workingDir.toString() + "/" + targetEntity + "/actionSet"; // execute the job to generate the action sets for project scores @@ -159,9 +160,9 @@ public class SparkAtomicActionScoreJobTest { final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); JavaRDD projects = sc - .sequenceFile(outputPath, Text.class, Text.class) - .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) - .map(aa -> ((Project) aa.getPayload())); + .sequenceFile(outputPath, Text.class, Text.class) + .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) + .map(aa -> ((Project) aa.getPayload())); // test the number of projects assertEquals(4, projects.count()); @@ -171,7 +172,8 @@ public class SparkAtomicActionScoreJobTest { // count that the project with id testProjectId is present assertEquals(1, projects.filter(row -> row.getId().equals(testProjectId)).count()); - projects.filter(row -> row.getId().equals(testProjectId)) + projects + .filter(row -> row.getId().equals(testProjectId)) .flatMap(r -> r.getMeasures().iterator()) .foreach(m -> { log.info(m.getId() + " " + m.getUnit()); @@ -184,7 +186,7 @@ public class SparkAtomicActionScoreJobTest { // ensure that the correct key is provided, i.e. score assertEquals("score", kv.getKey()); - switch(m.getId()) { + switch (m.getId()) { case "numOfInfluentialResults": assertEquals("0", kv.getValue()); break; diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/PrepareH2020ProgrammeTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/PrepareH2020ProgrammeTest.java index c68bfa13a..b30658feb 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/PrepareH2020ProgrammeTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/PrepareH2020ProgrammeTest.java @@ -92,7 +92,7 @@ public class PrepareH2020ProgrammeTest { Assertions.assertEquals(0, verificationDataset.filter("classification = ''").count()); - //tmp.foreach(csvProgramme -> System.out.println(OBJECT_MAPPER.writeValueAsString(csvProgramme))); + // tmp.foreach(csvProgramme -> System.out.println(OBJECT_MAPPER.writeValueAsString(csvProgramme))); Assertions .assertEquals( diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/ReadProjectsTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/ReadProjectsTest.java index 4be09c4b7..0d92c48a8 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/ReadProjectsTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/ReadProjectsTest.java @@ -98,7 +98,7 @@ public class ReadProjectsTest { Assertions.assertEquals("H2020-EU.1.3.", project.getLegalBasis()); Assertions.assertEquals("MSCA-IF-2019", project.getTopics()); - //tmp.foreach(p -> System.out.println(OBJECT_MAPPER.writeValueAsString(p))); + // tmp.foreach(p -> System.out.println(OBJECT_MAPPER.writeValueAsString(p))); } } diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/ReadTopicTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/ReadTopicTest.java index bdb0cc3a1..82a9e6aed 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/ReadTopicTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/ReadTopicTest.java @@ -93,7 +93,7 @@ public class ReadTopicTest { Assertions.assertEquals("Individual Fellowships", topic.getTitle()); Assertions.assertEquals("MSCA-IF-2019", topic.getTopic()); - //tmp.foreach(p -> System.out.println(OBJECT_MAPPER.writeValueAsString(p))); + // tmp.foreach(p -> System.out.println(OBJECT_MAPPER.writeValueAsString(p))); } } diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/job.properties b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/job.properties index 9d6c94ca9..b1598910d 100644 --- a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/job.properties +++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/job.properties @@ -97,7 +97,7 @@ oozie.wf.application.path=${oozieTopWfApplicationPath} # Path where the final output should be? -actionSetOutputPath=${workingDir}/bip_actionsets/ +actionSetOutputPath=${workingDir}/bip_actionsets # The directory to store project impact indicators projectImpactIndicatorsOutput=${workingDir}/project_indicators diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml index 6eb783941..349e054d8 100644 --- a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml @@ -13,7 +13,6 @@ - @@ -24,40 +23,28 @@ ${wf:conf('resume') eq "impulse"} ${wf:conf('resume') eq "pagerank"} ${wf:conf('resume') eq "attrank"} - ${wf:conf('resume') eq "format-results"} ${wf:conf('resume') eq "map-ids"} ${wf:conf('resume') eq "map-scores"} ${wf:conf('resume') eq "start"} - ${wf:conf('resume') eq "projects-impact"} - + + ${wf:conf('resume') eq "projects-impact"} + ${wf:conf('resume') eq "projects-impact-actionsets"} + - + - - - - - yarn-cluster cluster - - - Openaire Ranking Graph Creation - + OpenAIRE Ranking Graph Creation create_openaire_ranking_graph.py - --executor-memory=${sparkHighExecutorMemory} @@ -79,39 +66,30 @@ ${sparkShufflePartitions} ${openaireGraphInputPath} - + ${wfAppPath}/create_openaire_ranking_graph.py#create_openaire_ranking_graph.py - - - - + - - yarn-cluster cluster - - - Spark CC - + Citation Count calculation CC.py - --executor-memory=${sparkHighExecutorMemory} @@ -128,31 +106,23 @@ ${openaireGraphInputPath} ${sparkShufflePartitions} - + ${wfAppPath}/bip-ranker/CC.py#CC.py - - - + - - yarn-cluster cluster - - - Spark RAM - + RAM calculation TAR.py - --executor-memory=${sparkHighExecutorMemory} @@ -170,37 +140,27 @@ ${ramGamma} ${currentYear} RAM - ${sparkShufflePartitions} ${checkpointDir} - + ${wfAppPath}/bip-ranker/TAR.py#TAR.py - - - + - - - yarn-cluster cluster - - - Spark Impulse - + Impulse calculation CC.py - --executor-memory=${sparkHighExecutorMemory} @@ -218,47 +178,22 @@ ${sparkShufflePartitions} 3 - + ${wfAppPath}/bip-ranker/CC.py#CC.py - - - - - - - - - - - - - - - - yarn-cluster cluster - - - Spark Pagerank - + Pagerank calculation PageRank.py - --executor-memory=${sparkHighExecutorMemory} @@ -279,31 +214,22 @@ ${sparkShufflePartitions} dfs - + ${wfAppPath}/bip-ranker/PageRank.py#PageRank.py - - - - - yarn-cluster cluster - - - Spark AttRank - + AttRank calculation AttRank.py - --executor-memory=${sparkHighExecutorMemory} @@ -329,27 +255,16 @@ ${sparkShufflePartitions} dfs - + ${wfAppPath}/bip-ranker/AttRank.py#AttRank.py - - - - - - - @@ -359,15 +274,12 @@ ${workingDir} - ${wfAppPath}/get_ranking_files.sh#get_ranking_files.sh - - @@ -382,18 +294,12 @@ - - yarn-cluster cluster - - Format Ranking Results JSON - format_ranking_results.py - --executor-memory=${sparkNormalExecutorMemory} @@ -418,13 +324,11 @@ ${sparkShufflePartitions} openaire - + ${wfAppPath}/format_ranking_results.py#format_ranking_results.py - - @@ -470,18 +374,15 @@ ${wfAppPath}/format_ranking_results.py#format_ranking_results.py - - - + - + - @@ -489,15 +390,10 @@ - yarn-cluster cluster - - Openaire-DOI synonym collection - map_openaire_ids_to_dois.py - --executor-memory=${sparkHighExecutorMemory} @@ -514,19 +410,16 @@ ${openaireDataInput}/ ${synonymFolder} - + ${wfAppPath}/map_openaire_ids_to_dois.py#map_openaire_ids_to_dois.py - - - - + @@ -534,12 +427,8 @@ yarn-cluster cluster - - Mapping Openaire Scores to DOIs - map_scores_to_dois.py - --executor-memory=${sparkHighExecutorMemory} @@ -563,19 +452,16 @@ ${nameNode}/${workingDir}/${wf:actionData('get-file-names')['impulse_file']} ${nameNode}/${workingDir}/${wf:actionData('get-file-names')['ram_file']} - ${wfAppPath}/map_scores_to_dois.py#map_scores_to_dois.py - - - - + - + + @@ -583,17 +469,19 @@ - + - + - yarn + + yarn-cluster cluster Produces the atomic action with the bip finder scores for publications eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob dhp-aggregation-${projectVersion}.jar + --executor-memory=${sparkNormalExecutorMemory} --executor-cores=${sparkExecutorCores} @@ -608,23 +496,19 @@ --outputPath${actionSetOutputPath}/results/ --targetEntityresult + + - - yarn-cluster cluster - - - Project Impact Indicators - + Project Impact Indicators calculation projects_impact.py - --executor-memory=${sparkHighExecutorMemory} @@ -638,9 +522,8 @@ - - ${openaireDataInput}/relations + ${openaireDataInput}/relation ${nameNode}/${workingDir}/${wf:actionData('get-file-names')['pr_file']} @@ -652,26 +535,23 @@ ${sparkShufflePartitions} ${projectImpactIndicatorsOutput} - - ${wfAppPath}/projects_impact.py#projects_impact.py - - - - + - + - yarn + + yarn-cluster cluster Produces the atomic action with the bip finder scores for projects eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob dhp-aggregation-${projectVersion}.jar + --executor-memory=${sparkNormalExecutorMemory} --executor-cores=${sparkExecutorCores} @@ -682,12 +562,15 @@ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} + --inputPath${projectImpactIndicatorsOutput} --outputPath${actionSetOutputPath}/projects/ --targetEntityproject + +