diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java
index ddf5f4adf..8b8e05723 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java
@@ -9,6 +9,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import eu.dnetlib.dhp.actionmanager.bipmodel.score.deserializers.BipProjectModel;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
@@ -24,7 +25,7 @@ import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
-import eu.dnetlib.dhp.actionmanager.bipmodel.BipDeserialize;
+import eu.dnetlib.dhp.actionmanager.bipmodel.score.deserializers.BipResultModel;
 import eu.dnetlib.dhp.actionmanager.bipmodel.BipScore;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.common.HdfsSupport;
@@ -40,7 +41,8 @@ import scala.Tuple2;
  */
 public class SparkAtomicActionScoreJob implements Serializable {
 
-    private static final String DOI = "doi";
+    private static final String RESULT = "result";
+    private static final String PROJECT = "project";
 
     private static final Logger log = LoggerFactory.getLogger(SparkAtomicActionScoreJob.class);
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@@ -56,18 +58,17 @@ public class SparkAtomicActionScoreJob implements Serializable {
 
         parser.parseArgument(args);
 
-        Boolean isSparkSessionManaged = Optional
-            .ofNullable(parser.get("isSparkSessionManaged"))
-            .map(Boolean::valueOf)
-            .orElse(Boolean.TRUE);
-
+        Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
         log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
 
         final String inputPath = parser.get("inputPath");
-        log.info("inputPath {}: ", inputPath);
+        log.info("inputPath: {}", inputPath);
 
         final String outputPath = parser.get("outputPath");
-        log.info("outputPath {}: ", outputPath);
+        log.info("outputPath: {}", outputPath);
+
+        final String targetEntity = parser.get("targetEntity");
+        log.info("targetEntity: {}", targetEntity);
 
         SparkConf conf = new SparkConf();
@@ -76,17 +77,48 @@ public class SparkAtomicActionScoreJob implements Serializable {
             isSparkSessionManaged,
             spark -> {
                 removeOutputDir(spark, outputPath);
-                prepareResults(spark, inputPath, outputPath);
-            });
+
+                // follow different procedures for different target entities
+                switch (targetEntity) {
+                    case RESULT:
+                        prepareResults(spark, inputPath, outputPath);
+                        break;
+                    case PROJECT:
+                        prepareProjects(spark, inputPath, outputPath);
+                        break;
+                    default:
+                        throw new RuntimeException("Unknown target entity: " + targetEntity);
+                }
+            });
+    }
+
+    private static void prepareProjects(SparkSession spark, String inputPath, String outputPath) {
+
+        // read input bip project scores
+        Dataset<BipProjectModel> projectScores = readPath(spark, inputPath, BipProjectModel.class);
+
+        projectScores.map((MapFunction<BipProjectModel, Project>) bipProjectScores -> {
+            Project project = new Project();
+            project.setId(bipProjectScores.getProjectId());
+            project.setMeasures(bipProjectScores.toMeasures());
+            return project;
+        }, Encoders.bean(Project.class))
+            .toJavaRDD()
+            .map(p -> new AtomicAction<>(Project.class, p))
+            .mapToPair(
+                aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
+                    new Text(OBJECT_MAPPER.writeValueAsString(aa))))
+            .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
+    }
 
     private static void prepareResults(SparkSession spark, String bipScorePath, String outputPath) {
 
         final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 
-        JavaRDD<BipDeserialize> bipDeserializeJavaRDD = sc
+        JavaRDD<BipResultModel> bipDeserializeJavaRDD = sc
             .textFile(bipScorePath)
-            .map(item -> OBJECT_MAPPER.readValue(item, BipDeserialize.class));
+            .map(item -> OBJECT_MAPPER.readValue(item, BipResultModel.class));
 
         Dataset<BipScore> bipScores = spark
             .createDataset(bipDeserializeJavaRDD.flatMap(entry -> entry.keySet().stream().map(key -> {
@@ -159,12 +191,4 @@ public class SparkAtomicActionScoreJob implements Serializable {
         HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
     }
 
-    public static <R> Dataset<R> readPath(
-        SparkSession spark, String inputPath, Class<R> clazz) {
-        return spark
-            .read()
-            .textFile(inputPath)
-            .map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
-    }
-
 }
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipProjectModel.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipProjectModel.java
new file mode 100644
index 000000000..77c1567a8
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipProjectModel.java
@@ -0,0 +1,69 @@
+package eu.dnetlib.dhp.actionmanager.bipmodel.score.deserializers;
+
+import com.opencsv.bean.CsvBindByPosition;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.oaf.KeyValue;
+import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import eu.dnetlib.dhp.schema.oaf.Measure;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static eu.dnetlib.dhp.actionmanager.Constants.*;
+
+@NoArgsConstructor
+@AllArgsConstructor
+@Getter
+@Setter
+public class BipProjectModel {
+
+    String projectId;
+
+    String numOfInfluentialResults;
+
+    String numOfPopularResults;
+
+    String totalImpulse;
+
+    String totalCitationCount;
+
+    // each project bip measure has exactly one value, hence one key-value pair
+    private Measure createMeasure(String measureId, String measureValue) {
+
+        KeyValue kv = new KeyValue();
+        kv.setKey("score");
+        kv.setValue(measureValue);
+        kv.setDataInfo(
+            OafMapperUtils.dataInfo(
+                false,
+                UPDATE_DATA_INFO_TYPE,
+                true,
+                false,
+                OafMapperUtils.qualifier(
+                    UPDATE_MEASURE_BIP_CLASS_ID,
+                    UPDATE_CLASS_NAME,
+                    ModelConstants.DNET_PROVENANCE_ACTIONS,
+                    ModelConstants.DNET_PROVENANCE_ACTIONS),
+                "")
+        );
+
+        Measure measure = new Measure();
+        measure.setId(measureId);
+        measure.setUnit(Collections.singletonList(kv));
+        return measure;
+    }
+
+    public List<Measure> toMeasures() {
+        return Arrays.asList(
+            createMeasure("numOfInfluentialResults", numOfInfluentialResults),
+            createMeasure("numOfPopularResults", numOfPopularResults),
+            createMeasure("totalImpulse", totalImpulse),
+            createMeasure("totalCitationCount", totalCitationCount)
+        );
+    }
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/BipDeserialize.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipResultModel.java
similarity index 65%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/BipDeserialize.java
rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipResultModel.java
index a70bca618..06a173413 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/BipDeserialize.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipResultModel.java
@@ -1,5 +1,7 @@
 
-package eu.dnetlib.dhp.actionmanager.bipmodel;
+package eu.dnetlib.dhp.actionmanager.bipmodel.score.deserializers;
+
+import eu.dnetlib.dhp.actionmanager.bipmodel.Score;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -11,9 +13,9 @@ import java.util.List;
  * Only needed for deserialization purposes
  */
 
-public class BipDeserialize extends HashMap<String, List<Score>> implements Serializable {
+public class BipResultModel extends HashMap<String, List<Score>> implements Serializable {
 
-    public BipDeserialize() {
+    public BipResultModel() {
         super();
     }
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java
index 80573c71a..efcb96a85 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java
@@ -24,7 +24,7 @@ import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
-import eu.dnetlib.dhp.actionmanager.bipmodel.BipDeserialize;
+import eu.dnetlib.dhp.actionmanager.bipmodel.score.deserializers.BipResultModel;
 import eu.dnetlib.dhp.actionmanager.bipmodel.BipScore;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.common.HdfsSupport;
@@ -82,9 +82,9 @@ public class PrepareBipFinder implements Serializable {
 
         final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 
-        JavaRDD<BipDeserialize> bipDeserializeJavaRDD = sc
+        JavaRDD<BipResultModel> bipDeserializeJavaRDD = sc
             .textFile(inputPath)
-            .map(item -> OBJECT_MAPPER.readValue(item, BipDeserialize.class));
+            .map(item -> OBJECT_MAPPER.readValue(item, BipResultModel.class));
 
         spark
             .createDataset(bipDeserializeJavaRDD.flatMap(entry -> entry.keySet().stream().map(key -> {
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipfinder/input_actionset_parameter.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipfinder/input_actionset_parameter.json
index 7663a454b..d6b93c5af 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipfinder/input_actionset_parameter.json
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipfinder/input_actionset_parameter.json
@@ -16,5 +16,11 @@
     "paramLongName": "outputPath",
     "paramDescription": "the path of the new ActionSet",
     "paramRequired": true
+  },
+  {
+    "paramName": "te",
+    "paramLongName": "targetEntity",
+    "paramDescription": "the type of target entity to be enriched; currently supported one of { 'result', 'project' }",
+    "paramRequired": true
   }
 ]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJobTest.java
index be82b9fc3..aa5a19f11 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJobTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJobTest.java
@@ -6,8 +6,9 @@ import static org.junit.jupiter.api.Assertions.*;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.List;
 
+import eu.dnetlib.dhp.schema.oaf.KeyValue;
+import eu.dnetlib.dhp.schema.oaf.Project;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.spark.SparkConf;
@@ -27,7 +28,6 @@ import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import eu.dnetlib.dhp.schema.action.AtomicAction;
-import eu.dnetlib.dhp.schema.oaf.Publication;
 import eu.dnetlib.dhp.schema.oaf.Result;
 
 public class SparkAtomicActionScoreJobTest {
@@ -37,8 +37,11 @@ public class SparkAtomicActionScoreJobTest {
 
     private static SparkSession spark;
     private static Path workingDir;
-    private static final Logger log = LoggerFactory
-        .getLogger(SparkAtomicActionScoreJobTest.class);
+
+    private final static String RESULT = "result";
+    private final static String PROJECT = "project";
+
+    private static final Logger log = LoggerFactory.getLogger(SparkAtomicActionScoreJobTest.class);
 
     @BeforeAll
     public static void beforeAll() throws IOException {
@@ -69,29 +72,31 @@ public class SparkAtomicActionScoreJobTest {
         spark.stop();
     }
 
+    private void runJob(String inputPath, String outputPath, String targetEntity) throws Exception {
+        SparkAtomicActionScoreJob.main(
+            new String[] {
+                "-isSparkSessionManaged", Boolean.FALSE.toString(),
+                "-inputPath", inputPath,
+                "-outputPath", outputPath,
+                "-targetEntity", targetEntity,
+            }
+        );
+    }
+
     @Test
-    void testMatch() throws Exception {
-        String bipScoresPath = getClass()
-            .getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/bip_scores_oid.json")
+    void testResultScores() throws Exception {
+        final String targetEntity = RESULT;
+        String inputResultScores = getClass()
+            .getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/result_bip_scores.json")
             .getPath();
+        String outputPath = workingDir.toString() + "/" + targetEntity + "/actionSet";
 
-        SparkAtomicActionScoreJob
-            .main(
-                new String[] {
-                    "-isSparkSessionManaged",
-                    Boolean.FALSE.toString(),
-                    "-inputPath",
-
-                    bipScoresPath,
-
-                    "-outputPath",
-                    workingDir.toString() + "/actionSet"
-                });
+        // execute the job to generate the action sets for result scores
+        runJob(inputResultScores, outputPath, targetEntity);
 
         final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
 
         JavaRDD<Result> tmp = sc
-            .sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class)
+            .sequenceFile(outputPath, Text.class, Text.class)
             .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
             .map(aa -> ((Result) aa.getPayload()));
@@ -140,4 +145,61 @@ public class SparkAtomicActionScoreJobTest {
 
     }
 
+    @Test
+    void testProjectScores() throws Exception {
+        String targetEntity = PROJECT;
+        String inputResultScores = getClass()
+            .getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/project_bip_scores.json")
+            .getPath();
+        String outputPath = workingDir.toString() + "/" + targetEntity + "/actionSet";
+
+        // execute the job to generate the action sets for project scores
+        runJob(inputResultScores, outputPath, PROJECT);
+
+        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+
+        JavaRDD<Project> projects = sc
+            .sequenceFile(outputPath, Text.class, Text.class)
+            .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
+            .map(aa -> ((Project) aa.getPayload()));
+
+        // test the number of projects
+        assertEquals(4, projects.count());
+
+        String testProjectId = "40|nih_________::c02a8233e9b60f05bb418f0c9b714833";
+
+        // count that the project with id testProjectId is present
+        assertEquals(1, projects.filter(row -> row.getId().equals(testProjectId)).count());
+
+        projects.filter(row -> row.getId().equals(testProjectId))
+            .flatMap(r -> r.getMeasures().iterator())
+            .foreach(m -> {
+                log.info(m.getId() + " " + m.getUnit());
+
+                // ensure that only one score is present for each bip impact measure
+                assertEquals(1, m.getUnit().size());
+
+                KeyValue kv = m.getUnit().get(0);
+
+                // ensure that the correct key is provided, i.e. score
+                assertEquals("score", kv.getKey());
+
+                switch (m.getId()) {
+                    case "numOfInfluentialResults":
+                        assertEquals("0", kv.getValue());
+                        break;
+                    case "numOfPopularResults":
+                        assertEquals("1", kv.getValue());
+                        break;
+                    case "totalImpulse":
+                        assertEquals("25", kv.getValue());
+                        break;
+                    case "totalCitationCount":
+                        assertEquals("43", kv.getValue());
+                        break;
+                    default:
+                        fail("Unknown measure id in the context of projects");
+                }
+            });
+    }
 }
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/bipfinder/project_bip_scores.json b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/bipfinder/project_bip_scores.json
new file mode 100644
index 000000000..096268287
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/bipfinder/project_bip_scores.json
@@ -0,0 +1,4 @@
+{"projectId":"40|nsf_________::d93e50d22374a1cf59f6a232413ea027","numOfInfluentialResults":0,"numOfPopularResults":10,"totalImpulse":181,"totalCitationCount":235}
+{"projectId":"40|nih_________::1c93debc7085e440f245fbe70b2e8b21","numOfInfluentialResults":14,"numOfPopularResults":17,"totalImpulse":1558,"totalCitationCount":4226}
+{"projectId":"40|nih_________::c02a8233e9b60f05bb418f0c9b714833","numOfInfluentialResults":0,"numOfPopularResults":1,"totalImpulse":25,"totalCitationCount":43}
+{"projectId":"40|corda_______::d91dcf3a87dd7f72248fab0b8a4ba273","numOfInfluentialResults":2,"numOfPopularResults":3,"totalImpulse":78,"totalCitationCount":178}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/bipfinder/bip_scores_oid.json b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/bipfinder/result_bip_scores.json
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/bipfinder/bip_scores_oid.json
rename to dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/bipfinder/result_bip_scores.json
diff --git a/dhp-workflows/dhp-impact-indicators/README.md b/dhp-workflows/dhp-impact-indicators/README.md
index 14f489da3..45a4701e7 100644
--- a/dhp-workflows/dhp-impact-indicators/README.md
+++ b/dhp-workflows/dhp-impact-indicators/README.md
@@ -1,4 +1,4 @@
-# Ranking Workflow for Openaire Publications
+# Ranking Workflow for OpenAIRE Publications
 
 This project contains the files for running a paper ranking workflow on the openaire graph using apache oozie.
 All scripts are written in python and the project setup follows the typical oozie workflow structure:
@@ -7,17 +7,15 @@ All scripts are written in python and the project setup follows the typical oozi
 - a job.properties file specifying parameter values for the parameters used by the workflow
 - a set of python scripts used by the workflow
 
-**NOTE**: the workflow depends on the external library of ranking scripts called BiP! Ranker.
+**NOTE**: the workflow depends on the external library of ranking scripts called [BiP! Ranker](https://github.com/athenarc/Bip-Ranker).
 You can check out a specific tag/release of BIP! Ranker using maven, as described in the following section.
 
-## Check out a specific tag/release of BIP-Ranker
+## Build and deploy
 
-* Edit the `scmVersion` of the maven-scm-plugin in the pom.xml to point to the tag/release version you want to check out.
-
-* Then, use maven to perform the checkout:
+Use the following command for packaging:
 
 ```
-mvn scm:checkout
+mvn package -Poozie-package -Dworkflow.source.dir=eu/dnetlib/dhp/oa/graph/impact_indicators -DskipTests
 ```
 
-* The code should be visible under `src/main/bip-ranker` folder.
\ No newline at end of file
+Note: edit the property `bip.ranker.tag` of the `pom.xml` file to specify the tag of [BIP-Ranker](https://github.com/athenarc/Bip-Ranker) that you want to use.
diff --git a/dhp-workflows/dhp-impact-indicators/pom.xml b/dhp-workflows/dhp-impact-indicators/pom.xml
index b510635a6..a9eb0a4a1 100644
--- a/dhp-workflows/dhp-impact-indicators/pom.xml
+++ b/dhp-workflows/dhp-impact-indicators/pom.xml
@@ -5,9 +5,8 @@
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>eu.dnetlib.dhp</groupId>
-        <artifactId>dhp</artifactId>
+        <artifactId>dhp-workflows</artifactId>
         <version>1.2.5-SNAPSHOT</version>
-        <relativePath>../pom.xml</relativePath>
     </parent>
 
     <artifactId>dhp-impact-indicators</artifactId>
@@ -16,6 +15,9 @@
         <maven.compiler.source>8</maven.compiler.source>
         <maven.compiler.target>8</maven.compiler.target>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+
+        <!-- BIP! Ranker tag -->
+        <bip.ranker.tag>v1.0.0</bip.ranker.tag>
     </properties>
@@ -32,10 +34,29 @@
                     <connectionType>connection</connectionType>
                     <scmVersionType>tag</scmVersionType>
-                    <scmVersion>v1.0.0</scmVersion>
-                    <checkoutDirectory>${project.build.directory}/../src/main/bip-ranker</checkoutDirectory>
+                    <scmVersion>${bip.ranker.tag}</scmVersion>
+                    <checkoutDirectory>${project.build.directory}/${oozie.package.file.name}/${oozieAppDir}/bip-ranker</checkoutDirectory>
                 </configuration>
+                <executions>
+                    <execution>
+                        <id>checkout-bip-ranker</id>
+                        <phase>prepare-package</phase>
+                        <goals>
+                            <goal>checkout</goal>
+                        </goals>
+                    </execution>
+                </executions>
             </plugin>
         </plugins>
     </build>
+
+    <dependencies>
+        <dependency>
+            <groupId>eu.dnetlib.dhp</groupId>
+            <artifactId>dhp-aggregation</artifactId>
+            <version>${projectVersion}</version>
+            <scope>compile</scope>
+        </dependency>
+    </dependencies>
+
 </project>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/create_openaire_ranking_graph.py b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/create_openaire_ranking_graph.py
similarity index 100%
rename from dhp-workflows/dhp-impact-indicators/src/main/resources/create_openaire_ranking_graph.py
rename to dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/create_openaire_ranking_graph.py
diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/format_ranking_results.py b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/format_ranking_results.py
similarity index 100%
rename from dhp-workflows/dhp-impact-indicators/src/main/resources/format_ranking_results.py
rename to dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/format_ranking_results.py
diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/get_ranking_files.sh b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/get_ranking_files.sh
similarity index 100%
rename from dhp-workflows/dhp-impact-indicators/src/main/resources/get_ranking_files.sh
rename to dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/get_ranking_files.sh
diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/job.properties b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/job.properties
similarity index 96%
rename from dhp-workflows/dhp-impact-indicators/src/main/resources/job.properties
rename to dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/job.properties
index a902c413f..f9f5519cc 100644
--- a/dhp-workflows/dhp-impact-indicators/src/main/resources/job.properties
+++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/job.properties
@@ -90,3 +90,6 @@ oozie.wf.application.path=${wfAppPath}
 
 # Path where the final output should be?
 actionSetOutputPath=${workflowDataDir}/bip_actionsets/
+
+# The directory to store project impact indicators
+projectImpactIndicatorsOutput=${workflowDataDir}/project_indicators
diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/map_openaire_ids_to_dois.py b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/map_openaire_ids_to_dois.py
similarity index 100%
rename from dhp-workflows/dhp-impact-indicators/src/main/resources/map_openaire_ids_to_dois.py
rename to dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/map_openaire_ids_to_dois.py
diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/map_scores_to_dois.py b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/map_scores_to_dois.py
similarity index 100%
rename from dhp-workflows/dhp-impact-indicators/src/main/resources/map_scores_to_dois.py
rename to dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/map_scores_to_dois.py
diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/projects_impact.py b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/projects_impact.py
new file mode 100644
index 000000000..d60f86e88
--- /dev/null
+++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/projects_impact.py
@@ -0,0 +1,108 @@
+import sys
+from pyspark.sql import SparkSession
+from pyspark import SparkConf, SparkContext
+import pyspark.sql.functions as F
+from pyspark.sql.types import StringType, IntegerType, StructType, StructField
+
+if len(sys.argv) < 8:
+    print("Usage: projects_impact.py <relations_folder> <influence_file> <popularity_file> <cc_file> <impulse_file> <num_partitions> <output_dir>")
+    sys.exit(-1)
+
+appName = 'Project Impact Indicators'
+conf = SparkConf().setAppName(appName)
+sc = SparkContext(conf = conf)
+spark = SparkSession.builder.appName(appName).getOrCreate()
+sc.setLogLevel('OFF')
+
+# input parameters
+relations_fd = sys.argv[1]
+influence_fd = sys.argv[2]
+popularity_fd = sys.argv[3]
+cc_fd = sys.argv[4]
+impulse_fd = sys.argv[5]
+num_partitions = int(sys.argv[6])
+output_dir = sys.argv[7]
+
+# schema for impact indicator files
+impact_files_schema = StructType([
+    StructField('resultId', StringType(), False),
+    StructField('score', IntegerType(), False),
+    StructField('class', StringType(), False),
+])
+
+# list of impact indicators
+impact_indicators = [
+    ('influence', influence_fd, 'class'),
+    ('popularity', popularity_fd, 'class'),
+    ('impulse', impulse_fd, 'score'),
+    ('citation_count', cc_fd, 'score')
+]
+
+'''
+ * Read impact indicator file and return a dataframe with the following schema:
+ * resultId: String
+ * indicator_name: Integer
+'''
+def read_df(fd, indicator_name, column_name):
+    return spark.read.schema(impact_files_schema)\
+        .option('delimiter', '\t')\
+        .option('header', False)\
+        .csv(fd)\
+        .select('resultId', F.col(column_name).alias(indicator_name))\
+        .repartition(num_partitions, 'resultId')
+
+# Print dataframe schema, first 5 rows, and count
+def print_df(df):
+    df.show(50)
+    df.printSchema()
+    print(df.count())
+
+# Sets a null value to the column if the value is equal to the given value
+def set_class_value_to_null(column, value):
+    return F.when(column != value, column).otherwise(F.lit(None))
+
+# load and filter Project-to-Result relations
+print("Reading relations")
+relations = spark.read.json(relations_fd)\
+    .select(F.col('source').alias('projectId'), F.col('target').alias('resultId'), 'relClass', 'dataInfo.deletedbyinference', 'dataInfo.invisible')\
+    .where( (F.col('relClass') == 'produces') \
+        & (F.col('deletedbyinference') == "false")\
+        & (F.col('invisible') == "false"))\
+    .drop('deletedbyinference')\
+    .drop('invisible')\
+    .drop('relClass')\
+    .repartition(num_partitions, 'resultId')
+
+for indicator_name, fd, column_name in impact_indicators:
+
+    print("Reading {} '{}' field from file".format(indicator_name, column_name))
+    df = read_df(fd, indicator_name, column_name)
+
+    # sets a zero value to the indicator column if the value is C5
+    if (column_name == 'class'):
+        df = df.withColumn(indicator_name, F.when(F.col(indicator_name).isin("C5"), 0).otherwise(1))
+
+    # print_df(df)
+
+    print("Joining {} to relations".format(indicator_name))
+
+    # NOTE: we use inner join because we want to keep only the results that have an impact score
+    # also note that all impact scores have the same set of results
+    relations = relations.join(df, 'resultId', 'inner')\
+        .repartition(num_partitions, 'resultId')
+
+# uncomment to print non-null values count for each indicator
+# for indicator_name, fd, column_name in impact_indicators:
+#     print("Counting non null values for {}".format(indicator_name))
+#     print(relations.filter(F.col(indicator_name).isNotNull()).count())
+
+# sum the impact indicator values for each project
+relations.groupBy('projectId')\
+    .agg(\
+        F.sum('influence').alias('numOfInfluentialResults'),\
+        F.sum('popularity').alias('numOfPopularResults'),\
+        F.sum('impulse').alias('totalImpulse'),\
+        F.sum('citation_count').alias('totalCitationCount')\
+    )\
+    .write.mode("overwrite")\
+    .json(output_dir, compression="gzip")
\ No newline at end of file
diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/workflow.xml b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml
similarity index 83%
rename from dhp-workflows/dhp-impact-indicators/src/main/resources/workflow.xml
rename to dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml
index a957f6c10..815096665 100644
--- a/dhp-workflows/dhp-impact-indicators/src/main/resources/workflow.xml
+++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml
@@ -15,6 +15,8 @@
             <case to="map-ids">${resume eq "map-ids"}</case>
             <case to="map-scores">${resume eq "map-scores"}</case>
             <case to="create-openaire-ranking-graph">${resume eq "start"}</case>
+            <case to="project-impact-indicators">${resume eq "projects-impact"}</case>
+
@@ -32,7 +34,6 @@
-
             <master>yarn-cluster</master>
@@ -88,9 +89,8 @@
             <name-node>${nameNode}</name-node>
-
-
-            <master>yarn-cluster</master>
+
+            <master>yarn-cluster</master>
             <mode>cluster</mode>
@@ -129,7 +129,6 @@
             <job-tracker>${jobTracker}</job-tracker>
             <name-node>${nameNode}</name-node>
-
             <master>yarn-cluster</master>
@@ -179,9 +178,8 @@
             <name-node>${nameNode}</name-node>
-
-
-            <master>yarn-cluster</master>
+
+            <master>yarn-cluster</master>
             <mode>cluster</mode>
@@ -233,7 +231,7 @@
 
-
+
             <master>yarn-cluster</master>
@@ -334,12 +332,12 @@
             <name-node>${nameNode}</name-node>
-
-            <exec>/usr/bin/bash</exec>
-
-            <argument>get_ranking_files.sh</argument>
-
-            <argument>/${workflowDataDir}</argument>
+
+            <exec>/usr/bin/bash</exec>
+
+            <argument>get_ranking_files.sh</argument>
+
+            <argument>/${workflowDataDir}</argument>
 
             <file>${wfAppPath}/get_ranking_files.sh#get_ranking_files.sh</file>
@@ -372,8 +370,8 @@
             <name-node>${nameNode}</name-node>
-
-            <master>yarn-cluster</master>
+
+            <master>yarn-cluster</master>
             <mode>cluster</mode>
@@ -420,8 +418,8 @@
             <name-node>${nameNode}</name-node>
-
-            <master>yarn-cluster</master>
+
+            <master>yarn-cluster</master>
             <mode>cluster</mode>
@@ -474,7 +472,6 @@
 
-
             <master>yarn-cluster</master>
@@ -518,7 +515,6 @@
             <name-node>${nameNode}</name-node>
-
             <master>yarn-cluster</master>
             <mode>cluster</mode>
@@ -558,21 +554,19 @@
-
+
-
-
-
-
-
+
+
+
+
+
+
+
-
-
+
             <master>yarn</master>
             <mode>cluster</mode>
@@ -590,13 +584,90 @@
                 --executor-memory=${sparkExecutorMemory}
                 --executor-cores=${sparkExecutorCores}
                 --driver-memory=${sparkDriverMemory}
                 --conf spark.extraListeners=${spark2ExtraListeners}
                 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                 --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
             </spark-opts>
             <arg>--inputPath</arg><arg>${bipScorePath}</arg>
-            <arg>--outputPath</arg><arg>${actionSetOutputPath}</arg>
-        </spark>
-        <ok to="end"/>
+            <arg>--outputPath</arg><arg>${actionSetOutputPath}/results/</arg>
+            <arg>--targetEntity</arg><arg>result</arg>
+        </spark>
+        <ok to="project-impact-indicators"/>
         <error to="actionset-creation-fail"/>
     </action>
+
+    <action name="project-impact-indicators">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+
+            <job-tracker>${jobTracker}</job-tracker>
+
+            <name-node>${nameNode}</name-node>
+
+            <master>yarn-cluster</master>
+            <mode>cluster</mode>
+
+            <name>Project Impact Indicators</name>
+
+            <jar>projects_impact.py</jar>
+
+            <spark-opts>
+                --executor-memory 18G --executor-cores 4 --driver-memory 10G
+                --master yarn
+                --deploy-mode cluster
+                --conf spark.sql.shuffle.partitions=7680
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+            </spark-opts>
+
+            <arg>${openaireDataInput}/relations</arg>
+
+            <arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['pr_file']}</arg>
+            <arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['attrank_file']}</arg>
+            <arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['cc_file']}</arg>
+            <arg>${nameNode}/${workflowDataDir}/${wf:actionData('get-file-names')['impulse_file']}</arg>
+
+            <arg>7680</arg>
+
+            <arg>${projectImpactIndicatorsOutput}</arg>
+
+            <file>${wfAppPath}/projects_impact.py#projects_impact.py</file>
+        </spark>
+
+        <ok to="project-actionset"/>
+
+        <error to="project-impact-indicators-fail"/>
+    </action>
+
+    <action name="project-actionset">
+        <spark xmlns="uri:oozie:spark-action:0.2">
+            <master>yarn</master>
+            <mode>cluster</mode>
+            <name>Produces the atomic action with the bip finder scores for projects</name>
+            <class>eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob</class>
+            <jar>dhp-aggregation-${projectVersion}.jar</jar>
+            <spark-opts>
+                --executor-memory=${sparkExecutorMemory}
+                --executor-cores=${sparkExecutorCores}
+                --driver-memory=${sparkDriverMemory}
+                --conf spark.extraListeners=${spark2ExtraListeners}
+                --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+                --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+                --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+                --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
+            </spark-opts>
+            <arg>--inputPath</arg><arg>${projectImpactIndicatorsOutput}</arg>
+            <arg>--outputPath</arg><arg>${actionSetOutputPath}/projects/</arg>
+            <arg>--targetEntity</arg><arg>project</arg>
+        </spark>
+        <ok to="end"/>
+        <error to="actionset-project-creation-fail"/>
+    </action>
@@ -641,7 +712,14 @@
 
-    <kill name="actionset-creation-fail">
-        <message>ActionSet creation failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
-    </kill>
+    <kill name="actionset-creation-fail">
+        <message>ActionSet creation for results failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+
+    <kill name="project-impact-indicators-fail">
+        <message>Calculating project impact indicators failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
+
+    <kill name="actionset-project-creation-fail">
+        <message>ActionSet creation for projects failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
+    </kill>
diff --git a/dhp-workflows/pom.xml b/dhp-workflows/pom.xml
index 541d59007..d054ba39b 100644
--- a/dhp-workflows/pom.xml
+++ b/dhp-workflows/pom.xml
@@ -38,6 +38,7 @@
         <module>dhp-usage-raw-data-update</module>
         <module>dhp-broker-events</module>
         <module>dhp-doiboost</module>
+        <module>dhp-impact-indicators</module>