From 2cc5b1a39b36f6c0bc35a23cd2c76b7e04609eaf Mon Sep 17 00:00:00 2001 From: Serafeim Chatzopoulos Date: Fri, 21 Jul 2023 15:26:50 +0300 Subject: [PATCH] Fixes in workflow.xml --- .../bipfinder/SparkAtomicActionScoreJob.java | 20 ++-- .../score/deserializers/BipProjectModel.java | 97 ++++++++++--------- .../score/deserializers/BipResultModel.java | 4 +- .../PrepareBipFinder.java | 2 +- .../SparkAtomicActionScoreJobTest.java | 28 +++--- .../project/PrepareH2020ProgrammeTest.java | 2 +- .../project/ReadProjectsTest.java | 2 +- .../actionmanager/project/ReadTopicTest.java | 2 +- .../oa/graph/impact_indicators/job.properties | 2 +- .../impact_indicators/oozie_app/workflow.xml | 33 ++++--- 10 files changed, 100 insertions(+), 92 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java index 8b8e05723..fb11e829f 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java @@ -9,7 +9,6 @@ import java.util.List; import java.util.Optional; import java.util.stream.Collectors; -import eu.dnetlib.dhp.actionmanager.bipmodel.score.deserializers.BipProjectModel; import org.apache.commons.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.SequenceFileOutputFormat; @@ -25,8 +24,9 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.actionmanager.bipmodel.score.deserializers.BipResultModel; import eu.dnetlib.dhp.actionmanager.bipmodel.BipScore; +import eu.dnetlib.dhp.actionmanager.bipmodel.score.deserializers.BipProjectModel; +import eu.dnetlib.dhp.actionmanager.bipmodel.score.deserializers.BipResultModel; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.schema.action.AtomicAction; @@ -89,8 +89,7 @@ public class SparkAtomicActionScoreJob implements Serializable { default: throw new RuntimeException("Unknown target entity: " + targetEntity); } - } - ); + }); } private static void prepareProjects(SparkSession spark, String inputPath, String outputPath) { @@ -98,17 +97,18 @@ public class SparkAtomicActionScoreJob implements Serializable { // read input bip project scores Dataset projectScores = readPath(spark, inputPath, BipProjectModel.class); - projectScores.map( (MapFunction) bipProjectScores -> { + projectScores.map((MapFunction) bipProjectScores -> { Project project = new Project(); project.setId(bipProjectScores.getProjectId()); project.setMeasures(bipProjectScores.toMeasures()); return project; }, Encoders.bean(Project.class)) - .toJavaRDD() - .map(p -> new AtomicAction(Project.class, p)) - .mapToPair( aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), - new Text(OBJECT_MAPPER.writeValueAsString(aa)))) - .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); + .toJavaRDD() + .map(p -> new AtomicAction(Project.class, p)) + .mapToPair( + aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()), + new Text(OBJECT_MAPPER.writeValueAsString(aa)))) + .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class); } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipProjectModel.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipProjectModel.java index 77c1567a8..680e12504 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipProjectModel.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipProjectModel.java @@ -1,69 +1,74 @@ + package eu.dnetlib.dhp.actionmanager.bipmodel.score.deserializers; -import com.opencsv.bean.CsvBindByPosition; -import eu.dnetlib.dhp.schema.common.ModelConstants; -import eu.dnetlib.dhp.schema.oaf.KeyValue; -import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.NoArgsConstructor; -import lombok.Setter; -import eu.dnetlib.dhp.schema.oaf.Measure; +import static eu.dnetlib.dhp.actionmanager.Constants.*; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; -import static eu.dnetlib.dhp.actionmanager.Constants.*; +import com.opencsv.bean.CsvBindByPosition; + +import eu.dnetlib.dhp.schema.common.ModelConstants; +import eu.dnetlib.dhp.schema.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.Measure; +import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; @NoArgsConstructor @AllArgsConstructor @Getter @Setter public class BipProjectModel { - String projectId; + String projectId; - String numOfInfluentialResults; + String numOfInfluentialResults; - String numOfPopularResults; + String numOfPopularResults; - String totalImpulse; + String totalImpulse; - String totalCitationCount; + String totalCitationCount; - // each project bip measure has exactly one value, hence one key-value pair - private Measure createMeasure(String measureId, String measureValue) { + // each project bip measure has exactly one value, hence one key-value pair + private Measure createMeasure(String measureId, String measureValue) { - KeyValue kv = new KeyValue(); - kv.setKey("score"); - kv.setValue(measureValue); - kv.setDataInfo( - OafMapperUtils.dataInfo( - false, - UPDATE_DATA_INFO_TYPE, - true, - false, - OafMapperUtils.qualifier( - UPDATE_MEASURE_BIP_CLASS_ID, - UPDATE_CLASS_NAME, - ModelConstants.DNET_PROVENANCE_ACTIONS, - ModelConstants.DNET_PROVENANCE_ACTIONS), - "") - ); + KeyValue kv = new KeyValue(); + kv.setKey("score"); + kv.setValue(measureValue); + kv + .setDataInfo( + OafMapperUtils + .dataInfo( + false, + UPDATE_DATA_INFO_TYPE, + true, + false, + OafMapperUtils + .qualifier( + UPDATE_MEASURE_BIP_CLASS_ID, + UPDATE_CLASS_NAME, + ModelConstants.DNET_PROVENANCE_ACTIONS, + ModelConstants.DNET_PROVENANCE_ACTIONS), + "")); - Measure measure = new Measure(); - measure.setId(measureId); - measure.setUnit(Collections.singletonList(kv)); - return measure; - } - public List toMeasures() { - return Arrays.asList( - createMeasure("numOfInfluentialResults", numOfInfluentialResults), - createMeasure("numOfPopularResults", numOfPopularResults), - createMeasure("totalImpulse", totalImpulse), - createMeasure("totalCitationCount", totalCitationCount) - ); - } + Measure measure = new Measure(); + measure.setId(measureId); + measure.setUnit(Collections.singletonList(kv)); + return measure; + } + + public List toMeasures() { + return Arrays + .asList( + createMeasure("numOfInfluentialResults", numOfInfluentialResults), + createMeasure("numOfPopularResults", numOfPopularResults), + createMeasure("totalImpulse", totalImpulse), + createMeasure("totalCitationCount", totalCitationCount)); + } } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipResultModel.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipResultModel.java index 06a173413..f992dc59f 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipResultModel.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipResultModel.java @@ -1,13 +1,13 @@ package eu.dnetlib.dhp.actionmanager.bipmodel.score.deserializers; -import eu.dnetlib.dhp.actionmanager.bipmodel.Score; - import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import eu.dnetlib.dhp.actionmanager.bipmodel.Score; + /** * Class that maps the model of the bipFinder! input data. * Only needed for deserialization purposes diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java index efcb96a85..0507f90e5 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java @@ -24,8 +24,8 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.actionmanager.bipmodel.score.deserializers.BipResultModel; import eu.dnetlib.dhp.actionmanager.bipmodel.BipScore; +import eu.dnetlib.dhp.actionmanager.bipmodel.score.deserializers.BipResultModel; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.schema.common.ModelConstants; diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJobTest.java index aa5a19f11..7752fbc27 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJobTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJobTest.java @@ -7,8 +7,6 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; -import eu.dnetlib.dhp.schema.oaf.KeyValue; -import eu.dnetlib.dhp.schema.oaf.Project; import org.apache.commons.io.FileUtils; import org.apache.hadoop.io.Text; import org.apache.spark.SparkConf; @@ -28,6 +26,8 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.schema.action.AtomicAction; +import eu.dnetlib.dhp.schema.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.Project; import eu.dnetlib.dhp.schema.oaf.Result; public class SparkAtomicActionScoreJobTest { @@ -73,15 +73,16 @@ public class SparkAtomicActionScoreJobTest { } private void runJob(String inputPath, String outputPath, String targetEntity) throws Exception { - SparkAtomicActionScoreJob.main( - new String[] { + SparkAtomicActionScoreJob + .main( + new String[] { "-isSparkSessionManaged", Boolean.FALSE.toString(), "-inputPath", inputPath, "-outputPath", outputPath, "-targetEntity", targetEntity, - } - ); + }); } + @Test void testResultScores() throws Exception { final String targetEntity = RESULT; @@ -149,8 +150,8 @@ public class SparkAtomicActionScoreJobTest { void testProjectScores() throws Exception { String targetEntity = PROJECT; String inputResultScores = getClass() - .getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/project_bip_scores.json") - .getPath(); + .getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/project_bip_scores.json") + .getPath(); String outputPath = workingDir.toString() + "/" + targetEntity + "/actionSet"; // execute the job to generate the action sets for project scores @@ -159,9 +160,9 @@ public class SparkAtomicActionScoreJobTest { final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); JavaRDD projects = sc - .sequenceFile(outputPath, Text.class, Text.class) - .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) - .map(aa -> ((Project) aa.getPayload())); + .sequenceFile(outputPath, Text.class, Text.class) + .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) + .map(aa -> ((Project) aa.getPayload())); // test the number of projects assertEquals(4, projects.count()); @@ -171,7 +172,8 @@ public class SparkAtomicActionScoreJobTest { // count that the project with id testProjectId is present assertEquals(1, projects.filter(row -> row.getId().equals(testProjectId)).count()); - projects.filter(row -> row.getId().equals(testProjectId)) + projects + .filter(row -> row.getId().equals(testProjectId)) .flatMap(r -> r.getMeasures().iterator()) .foreach(m -> { log.info(m.getId() + " " + m.getUnit()); @@ -184,7 +186,7 @@ public class SparkAtomicActionScoreJobTest { // ensure that the correct key is provided, i.e. score assertEquals("score", kv.getKey()); - switch(m.getId()) { + switch (m.getId()) { case "numOfInfluentialResults": assertEquals("0", kv.getValue()); break; diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/PrepareH2020ProgrammeTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/PrepareH2020ProgrammeTest.java index c68bfa13a..b30658feb 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/PrepareH2020ProgrammeTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/PrepareH2020ProgrammeTest.java @@ -92,7 +92,7 @@ public class PrepareH2020ProgrammeTest { Assertions.assertEquals(0, verificationDataset.filter("classification = ''").count()); - //tmp.foreach(csvProgramme -> System.out.println(OBJECT_MAPPER.writeValueAsString(csvProgramme))); + // tmp.foreach(csvProgramme -> System.out.println(OBJECT_MAPPER.writeValueAsString(csvProgramme))); Assertions .assertEquals( diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/ReadProjectsTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/ReadProjectsTest.java index 4be09c4b7..0d92c48a8 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/ReadProjectsTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/ReadProjectsTest.java @@ -98,7 +98,7 @@ public class ReadProjectsTest { Assertions.assertEquals("H2020-EU.1.3.", project.getLegalBasis()); Assertions.assertEquals("MSCA-IF-2019", project.getTopics()); - //tmp.foreach(p -> System.out.println(OBJECT_MAPPER.writeValueAsString(p))); + // tmp.foreach(p -> System.out.println(OBJECT_MAPPER.writeValueAsString(p))); } } diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/ReadTopicTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/ReadTopicTest.java index bdb0cc3a1..82a9e6aed 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/ReadTopicTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/project/ReadTopicTest.java @@ -93,7 +93,7 @@ public class ReadTopicTest { Assertions.assertEquals("Individual Fellowships", topic.getTitle()); Assertions.assertEquals("MSCA-IF-2019", topic.getTopic()); - //tmp.foreach(p -> System.out.println(OBJECT_MAPPER.writeValueAsString(p))); + // tmp.foreach(p -> System.out.println(OBJECT_MAPPER.writeValueAsString(p))); } } diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/job.properties b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/job.properties index 9d6c94ca9..b1598910d 100644 --- a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/job.properties +++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/job.properties @@ -97,7 +97,7 @@ oozie.wf.application.path=${oozieTopWfApplicationPath} # Path where the final output should be? -actionSetOutputPath=${workingDir}/bip_actionsets/ +actionSetOutputPath=${workingDir}/bip_actionsets # The directory to store project impact indicators projectImpactIndicatorsOutput=${workingDir}/project_indicators diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml index 6eb783941..65067dace 100644 --- a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml @@ -24,20 +24,21 @@ ${wf:conf('resume') eq "impulse"} ${wf:conf('resume') eq "pagerank"} ${wf:conf('resume') eq "attrank"} - ${wf:conf('resume') eq "format-results"} ${wf:conf('resume') eq "map-ids"} ${wf:conf('resume') eq "map-scores"} ${wf:conf('resume') eq "start"} - ${wf:conf('resume') eq "projects-impact"} - + + ${wf:conf('resume') eq "projects-impact"} + ${wf:conf('resume') eq "projects-impact-actionsets"} + - + @@ -479,7 +480,7 @@ - + @@ -526,7 +527,7 @@ - + @@ -568,14 +569,14 @@ - - + + - + @@ -583,13 +584,13 @@ - + - + - yarn + yarn-cluster cluster Produces the atomic action with the bip finder scores for publications eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob @@ -640,7 +641,7 @@ - ${openaireDataInput}/relations + ${openaireDataInput}/relation ${nameNode}/${workingDir}/${wf:actionData('get-file-names')['pr_file']} @@ -658,16 +659,16 @@ - + - + - yarn + yarn-cluster cluster Produces the atomic action with the bip finder scores for projects eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob