From ee04cf92bf4030f9be3b4a34703198c3dd5ce424 Mon Sep 17 00:00:00 2001 From: Serafeim Chatzopoulos Date: Wed, 26 Apr 2023 20:23:46 +0300 Subject: [PATCH] Add actionsets for project impact indicators --- .../bipfinder/SparkAtomicActionScoreJob.java | 63 +++++++---- .../score/deserializers/BipProjectModel.java | 69 ++++++++++++ .../deserializers/BipResultModel.java} | 8 +- .../PrepareBipFinder.java | 6 +- .../bipfinder/input_actionset_parameter.json | 6 ++ .../SparkAtomicActionScoreJobTest.java | 102 ++++++++++++++---- .../bipfinder/project_bip_scores.json | 4 + ...scores_oid.json => result_bip_scores.json} | 0 .../oozie_app/projects_impact.py | 13 ++- 9 files changed, 218 insertions(+), 53 deletions(-) create mode 100644 dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipProjectModel.java rename dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/{BipDeserialize.java => score/deserializers/BipResultModel.java} (65%) create mode 100644 dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/bipfinder/project_bip_scores.json rename dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/bipfinder/{bip_scores_oid.json => result_bip_scores.json} (100%) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java index ddf5f4adf..13ce1440a 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java @@ -9,6 +9,7 @@ import java.util.List; import java.util.Optional; import java.util.stream.Collectors; +import eu.dnetlib.dhp.actionmanager.bipmodel.score.deserializers.BipProjectModel; import org.apache.commons.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.SequenceFileOutputFormat; @@ -24,7 +25,7 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; -import eu.dnetlib.dhp.actionmanager.bipmodel.BipDeserialize; +import eu.dnetlib.dhp.actionmanager.bipmodel.score.deserializers.BipResultModel; import eu.dnetlib.dhp.actionmanager.bipmodel.BipScore; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; @@ -56,18 +57,17 @@ public class SparkAtomicActionScoreJob implements Serializable { parser.parseArgument(args); - Boolean isSparkSessionManaged = Optional - .ofNullable(parser.get("isSparkSessionManaged")) - .map(Boolean::valueOf) - .orElse(Boolean.TRUE); - + Boolean isSparkSessionManaged = isSparkSessionManaged(parser); log.info("isSparkSessionManaged: {}", isSparkSessionManaged); final String inputPath = parser.get("inputPath"); - log.info("inputPath {}: ", inputPath); + log.info("inputPath: {}", inputPath); final String outputPath = parser.get("outputPath"); - log.info("outputPath {}: ", outputPath); + log.info("outputPath: {}", outputPath); + + final String targetEntity = parser.get("targetEntity"); + log.info("targetEntity: {}", targetEntity); SparkConf conf = new SparkConf(); @@ -76,17 +76,48 @@ public class SparkAtomicActionScoreJob implements Serializable { isSparkSessionManaged, spark -> { removeOutputDir(spark, outputPath); - prepareResults(spark, inputPath, outputPath); - }); + + // follow 
different procedures for different target entities
+				switch (targetEntity) {
+					case "result":
+						prepareResults(spark, inputPath, outputPath);
+						break;
+					case "project":
+						prepareProjects(spark, inputPath, outputPath);
+						break;
+					default:
+						throw new RuntimeException("Unknown target entity: " + targetEntity);
+				}
+			}
+		);
+	}
+
+	private static void prepareProjects(SparkSession spark, String inputPath, String outputPath) {
+
+		// read input bip project scores
+		Dataset<BipProjectModel> projectScores = readPath(spark, inputPath, BipProjectModel.class);
+
+		projectScores.map((MapFunction<BipProjectModel, Project>) bipProjectScores -> {
+			Project project = new Project();
+			project.setId(bipProjectScores.getProjectId());
+			project.setMeasures(bipProjectScores.toMeasures());
+			return project;
+		}, Encoders.bean(Project.class))
+		.toJavaRDD()
+		.map(p -> new AtomicAction<>(Project.class, p))
+		.mapToPair(aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
+			new Text(OBJECT_MAPPER.writeValueAsString(aa))))
+		.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
+	}
 
 	private static void prepareResults(SparkSession spark, String bipScorePath, String outputPath) {
 
 		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 
-		JavaRDD<BipDeserialize> bipDeserializeJavaRDD = sc
+		JavaRDD<BipResultModel> bipDeserializeJavaRDD = sc
 			.textFile(bipScorePath)
-			.map(item -> OBJECT_MAPPER.readValue(item, BipDeserialize.class));
+			.map(item -> OBJECT_MAPPER.readValue(item, BipResultModel.class));
 
 		Dataset<BipScore> bipScores = spark
 			.createDataset(bipDeserializeJavaRDD.flatMap(entry -> entry.keySet().stream().map(key -> {
@@ -159,12 +190,4 @@
 		HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
 	}
 
-	public static <R> Dataset<R> readPath(
-		SparkSession spark, String inputPath, Class<R> clazz) {
-		return spark
-			.read()
-			.textFile(inputPath)
-			.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
-	}
-
 }
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipProjectModel.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipProjectModel.java
new file mode 100644
index 000000000..77c1567a8
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipProjectModel.java
@@ -0,0 +1,69 @@
+package eu.dnetlib.dhp.actionmanager.bipmodel.score.deserializers;
+
+import com.opencsv.bean.CsvBindByPosition;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.oaf.KeyValue;
+import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import eu.dnetlib.dhp.schema.oaf.Measure;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static eu.dnetlib.dhp.actionmanager.Constants.*;
+
+@NoArgsConstructor
+@AllArgsConstructor
+@Getter
+@Setter
+public class BipProjectModel {
+	String projectId;
+
+	String numOfInfluentialResults;
+
+	String numOfPopularResults;
+
+	String totalImpulse;
+
+	String totalCitationCount;
+
+	// each project bip measure has exactly one value, hence one key-value pair
+	private Measure createMeasure(String measureId, String measureValue) {
+
+		KeyValue kv = new KeyValue();
+		kv.setKey("score");
+		kv.setValue(measureValue);
+		kv.setDataInfo(
+			OafMapperUtils.dataInfo(
+				false,
+				UPDATE_DATA_INFO_TYPE,
+				true,
+				false,
+				OafMapperUtils.qualifier(
+					UPDATE_MEASURE_BIP_CLASS_ID,
+					UPDATE_CLASS_NAME,
+					ModelConstants.DNET_PROVENANCE_ACTIONS,
+					ModelConstants.DNET_PROVENANCE_ACTIONS),
+				"")
+		);
+
+		Measure measure = new Measure();
+		measure.setId(measureId);
+		measure.setUnit(Collections.singletonList(kv));
+		return measure;
+	}
+	public List<Measure> toMeasures() {
+		return Arrays.asList(
+			createMeasure("numOfInfluentialResults", numOfInfluentialResults),
+			createMeasure("numOfPopularResults", numOfPopularResults),
+			createMeasure("totalImpulse", totalImpulse),
+			createMeasure("totalCitationCount", totalCitationCount)
+		);
+	}
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/BipDeserialize.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipResultModel.java
similarity index 65%
rename from dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/BipDeserialize.java
rename to dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipResultModel.java
index a70bca618..06a173413 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/BipDeserialize.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipmodel/score/deserializers/BipResultModel.java
@@ -1,5 +1,7 @@
-package eu.dnetlib.dhp.actionmanager.bipmodel;
+package eu.dnetlib.dhp.actionmanager.bipmodel.score.deserializers;
+
+import eu.dnetlib.dhp.actionmanager.bipmodel.Score;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -11,9 +13,9 @@ import java.util.List;
  * Only needed for deserialization purposes
  */
 
-public class BipDeserialize extends HashMap<String, List<Score>> implements Serializable {
+public class BipResultModel extends HashMap<String, List<Score>> implements Serializable {
 
-	public BipDeserialize() {
+	public BipResultModel() {
 		super();
 	}
 
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java
index 80573c71a..efcb96a85 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/createunresolvedentities/PrepareBipFinder.java
@@ -24,7 +24,7 @@ import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
-import eu.dnetlib.dhp.actionmanager.bipmodel.BipDeserialize;
+import eu.dnetlib.dhp.actionmanager.bipmodel.score.deserializers.BipResultModel;
 import eu.dnetlib.dhp.actionmanager.bipmodel.BipScore;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.common.HdfsSupport;
@@ -82,9 +82,9 @@ public class PrepareBipFinder implements Serializable {
 
 		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 
-		JavaRDD<BipDeserialize> bipDeserializeJavaRDD = sc
+		JavaRDD<BipResultModel> bipDeserializeJavaRDD = sc
 			.textFile(inputPath)
-			.map(item -> OBJECT_MAPPER.readValue(item, BipDeserialize.class));
+			.map(item -> OBJECT_MAPPER.readValue(item, BipResultModel.class));
 
 		spark
 			.createDataset(bipDeserializeJavaRDD.flatMap(entry -> entry.keySet().stream().map(key -> {
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipfinder/input_actionset_parameter.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipfinder/input_actionset_parameter.json
index 7663a454b..d6b93c5af 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipfinder/input_actionset_parameter.json
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipfinder/input_actionset_parameter.json
@@ -16,5 +16,11 @@
 		"paramLongName": "outputPath",
 		"paramDescription": "the path of the new ActionSet",
 		"paramRequired": true
+	},
+	{
+		"paramName": "te",
+		"paramLongName": "targetEntity",
+		"paramDescription": "the type of target entity to be enriched; currently one of { 'result', 'project' }",
+		"paramRequired": true
 	}
 ]
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJobTest.java
index be82b9fc3..aa5a19f11 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJobTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJobTest.java
@@ -6,8 +6,9 @@ import static org.junit.jupiter.api.Assertions.*;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.List;
 
+import eu.dnetlib.dhp.schema.oaf.KeyValue;
+import eu.dnetlib.dhp.schema.oaf.Project;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.spark.SparkConf;
@@ -27,7 +28,6 @@ import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import eu.dnetlib.dhp.schema.action.AtomicAction;
-import eu.dnetlib.dhp.schema.oaf.Publication;
 import eu.dnetlib.dhp.schema.oaf.Result;
 
 public class SparkAtomicActionScoreJobTest {
@@ -37,8 +37,11 @@ public class SparkAtomicActionScoreJobTest {
 	private static SparkSession spark;
 	private static Path workingDir;
-	private static final Logger log = LoggerFactory
-		.getLogger(SparkAtomicActionScoreJobTest.class);
+
+	private static final String RESULT = "result";
+	private static final String PROJECT = "project";
+
+	private static final Logger log = LoggerFactory.getLogger(SparkAtomicActionScoreJobTest.class);
 
 	@BeforeAll
 	public static void beforeAll() throws IOException {
@@ -69,29 +72,31 @@ public class SparkAtomicActionScoreJobTest {
 		spark.stop();
 	}
 
+	private void runJob(String inputPath, String outputPath, String targetEntity) throws Exception {
+		SparkAtomicActionScoreJob.main(
+			new String[] {
+				"-isSparkSessionManaged", Boolean.FALSE.toString(),
+				"-inputPath", inputPath,
+				"-outputPath", outputPath,
+				"-targetEntity", targetEntity,
+			}
+		);
+	}
+
 	@Test
-	void testMatch() throws Exception {
-		String bipScoresPath = getClass()
-			.getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/bip_scores_oid.json")
+	void testResultScores() throws Exception {
+		final String targetEntity = RESULT;
+		String inputResultScores = getClass()
+			.getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/result_bip_scores.json")
 			.getPath();
+		String outputPath = workingDir.toString() + "/" + targetEntity + "/actionSet";
 
-		SparkAtomicActionScoreJob
-			.main(
-				new String[] {
-					"-isSparkSessionManaged",
-					Boolean.FALSE.toString(),
-					"-inputPath",
-
-					bipScoresPath,
-
-					"-outputPath",
-					workingDir.toString() + "/actionSet"
-				});
+		// execute the job to generate the action sets for result scores
+		runJob(inputResultScores, outputPath, targetEntity);
 
 		final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
 
 		JavaRDD<Result> tmp = sc
-			.sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class)
+			.sequenceFile(outputPath, Text.class, Text.class)
 			.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
 			.map(aa -> ((Result) aa.getPayload()));
 
@@ -140,4 +145,61 @@
 	}
 
+	@Test
+	void testProjectScores() throws Exception {
+		final String targetEntity = PROJECT;
+		String inputProjectScores = getClass()
+			.getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/project_bip_scores.json")
+			.getPath();
+		String outputPath = workingDir.toString() + "/" + targetEntity + "/actionSet";
+
+		// execute the job to generate the action sets for project scores
+		runJob(inputProjectScores, outputPath, targetEntity);
+
+		final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+
+		JavaRDD<Project> projects = sc
+			.sequenceFile(outputPath, Text.class, Text.class)
+			.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
+			.map(aa -> ((Project) aa.getPayload()));
+
+		// test the number of projects
+		assertEquals(4, projects.count());
+
+		String testProjectId = "40|nih_________::c02a8233e9b60f05bb418f0c9b714833";
+
+		// verify that the project with id testProjectId is present
+		assertEquals(1, projects.filter(row -> row.getId().equals(testProjectId)).count());
+
+		projects.filter(row -> row.getId().equals(testProjectId))
+			.flatMap(r -> r.getMeasures().iterator())
+			.foreach(m -> {
+				log.info(m.getId() + " " + m.getUnit());
+
+				// ensure that only one score is present for each bip impact measure
+				assertEquals(1, m.getUnit().size());
+
+				KeyValue kv = m.getUnit().get(0);
+
+				// ensure that the correct key is provided, i.e. score
+				assertEquals("score", kv.getKey());
+
+				switch (m.getId()) {
+					case "numOfInfluentialResults":
+						assertEquals("0", kv.getValue());
+						break;
+					case "numOfPopularResults":
+						assertEquals("1", kv.getValue());
+						break;
+					case "totalImpulse":
+						assertEquals("25", kv.getValue());
+						break;
+					case "totalCitationCount":
+						assertEquals("43", kv.getValue());
+						break;
+					default:
+						fail("Unknown measure id in the context of projects");
+				}
+			});
+	}
 }
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/bipfinder/project_bip_scores.json b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/bipfinder/project_bip_scores.json
new file mode 100644
index 000000000..096268287
--- /dev/null
+++ b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/bipfinder/project_bip_scores.json
@@ -0,0 +1,4 @@
+{"projectId":"40|nsf_________::d93e50d22374a1cf59f6a232413ea027","numOfInfluentialResults":0,"numOfPopularResults":10,"totalImpulse":181,"totalCitationCount":235}
+{"projectId":"40|nih_________::1c93debc7085e440f245fbe70b2e8b21","numOfInfluentialResults":14,"numOfPopularResults":17,"totalImpulse":1558,"totalCitationCount":4226}
+{"projectId":"40|nih_________::c02a8233e9b60f05bb418f0c9b714833","numOfInfluentialResults":0,"numOfPopularResults":1,"totalImpulse":25,"totalCitationCount":43}
+{"projectId":"40|corda_______::d91dcf3a87dd7f72248fab0b8a4ba273","numOfInfluentialResults":2,"numOfPopularResults":3,"totalImpulse":78,"totalCitationCount":178}
\ No newline at end of file
diff --git a/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/bipfinder/bip_scores_oid.json b/dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/bipfinder/result_bip_scores.json
similarity index 100%
rename from dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/bipfinder/bip_scores_oid.json
rename to dhp-workflows/dhp-aggregation/src/test/resources/eu/dnetlib/dhp/actionmanager/bipfinder/result_bip_scores.json
diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/projects_impact.py b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/projects_impact.py
index f01c92a0d..d60f86e88 100644
--- a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/projects_impact.py
+++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/projects_impact.py
@@ -96,14 +96,13 @@ for indicator_name, fd, column_name in impact_indicators:
 #	print("Counting non null values for {}".format(indicator_name))
 #	print(relations.filter(F.col(indicator_name).isNotNull()).count())
 
-sum the impact indicator values for each project
+# sum the impact indicator values for each project
 relations.groupBy('projectId')\
 	.agg(\
-		F.sum('influence').alias('influence'),\
-		F.sum('popularity').alias('popularity'),\
-		F.sum('impulse').alias('impulse'),\
-		F.sum('citation_count').alias('citation_count')\
+		F.sum('influence').alias('numOfInfluentialResults'),\
+		F.sum('popularity').alias('numOfPopularResults'),\
+		F.sum('impulse').alias('totalImpulse'),\
+		F.sum('citation_count').alias('totalCitationCount')\
 	)\
 	.write.mode("overwrite")\
-	.option("delimiter", "\t")\
-	.csv(output_dir, compression="gzip")
\ No newline at end of file
+	.json(output_dir, compression="gzip")
\ No newline at end of file
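
---

For reference, and not part of the diff itself: a minimal sketch of how the updated job is driven for the new project case. It mirrors runJob() in SparkAtomicActionScoreJobTest; the two paths below are hypothetical placeholders, while the argument names come from input_actionset_parameter.json:

	// sketch only; call from a context that declares "throws Exception"
	SparkAtomicActionScoreJob.main(new String[] {
		"-isSparkSessionManaged", Boolean.FALSE.toString(),
		"-inputPath", "/tmp/bip/project_bip_scores",   // hypothetical input: one BipProjectModel JSON object per line
		"-outputPath", "/tmp/bip/project/actionSet",   // hypothetical output location for the SequenceFile
		"-targetEntity", "project"
	});

The resulting SequenceFile holds Text pairs: the key is the payload's canonical class name (eu.dnetlib.dhp.schema.oaf.Project) and the value is the JSON-serialized AtomicAction, whose four BiP! measures each carry a single "score" key-value pair, as asserted in testProjectScores().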