Add actionsets for project impact indicators

This commit is contained in:
Serafeim Chatzopoulos 2023-04-26 20:23:46 +03:00
parent 23f58a86f1
commit ee04cf92bf
9 changed files with 218 additions and 53 deletions

View File

@ -9,6 +9,7 @@ import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import eu.dnetlib.dhp.actionmanager.bipmodel.score.deserializers.BipProjectModel;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.hadoop.mapred.SequenceFileOutputFormat;
@ -24,7 +25,7 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.bipmodel.BipDeserialize; import eu.dnetlib.dhp.actionmanager.bipmodel.score.deserializers.BipResultModel;
import eu.dnetlib.dhp.actionmanager.bipmodel.BipScore; import eu.dnetlib.dhp.actionmanager.bipmodel.BipScore;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.HdfsSupport;
@ -56,18 +57,17 @@ public class SparkAtomicActionScoreJob implements Serializable {
parser.parseArgument(args); parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged); log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
final String inputPath = parser.get("inputPath"); final String inputPath = parser.get("inputPath");
log.info("inputPath {}: ", inputPath); log.info("inputPath: {}", inputPath);
final String outputPath = parser.get("outputPath"); final String outputPath = parser.get("outputPath");
log.info("outputPath {}: ", outputPath); log.info("outputPath: {}", outputPath);
final String targetEntity = parser.get("targetEntity");
log.info("targetEntity: {}", targetEntity);
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
@ -76,17 +76,48 @@ public class SparkAtomicActionScoreJob implements Serializable {
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> {
removeOutputDir(spark, outputPath); removeOutputDir(spark, outputPath);
// follow different procedures for different target entities
switch (targetEntity) {
case "result":
prepareResults(spark, inputPath, outputPath); prepareResults(spark, inputPath, outputPath);
}); break;
case "project":
prepareProjects(spark, inputPath, outputPath);
break;
default:
throw new RuntimeException("Unknown target entity: " + targetEntity);
}
}
);
}
private static <I extends Project> void prepareProjects(SparkSession spark, String inputPath, String outputPath) {
// read input bip project scores
Dataset<BipProjectModel> projectScores = readPath(spark, inputPath, BipProjectModel.class);
projectScores.map( (MapFunction<BipProjectModel, Project>) bipProjectScores -> {
Project project = new Project();
project.setId(bipProjectScores.getProjectId());
project.setMeasures(bipProjectScores.toMeasures());
return project;
}, Encoders.bean(Project.class))
.toJavaRDD()
.map(p -> new AtomicAction(Project.class, p))
.mapToPair( aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
new Text(OBJECT_MAPPER.writeValueAsString(aa))))
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
} }
private static <I extends Result> void prepareResults(SparkSession spark, String bipScorePath, String outputPath) { private static <I extends Result> void prepareResults(SparkSession spark, String bipScorePath, String outputPath) {
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<BipDeserialize> bipDeserializeJavaRDD = sc JavaRDD<BipResultModel> bipDeserializeJavaRDD = sc
.textFile(bipScorePath) .textFile(bipScorePath)
.map(item -> OBJECT_MAPPER.readValue(item, BipDeserialize.class)); .map(item -> OBJECT_MAPPER.readValue(item, BipResultModel.class));
Dataset<BipScore> bipScores = spark Dataset<BipScore> bipScores = spark
.createDataset(bipDeserializeJavaRDD.flatMap(entry -> entry.keySet().stream().map(key -> { .createDataset(bipDeserializeJavaRDD.flatMap(entry -> entry.keySet().stream().map(key -> {
@ -159,12 +190,4 @@ public class SparkAtomicActionScoreJob implements Serializable {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
} }
public static <R> Dataset<R> readPath(
SparkSession spark, String inputPath, Class<R> clazz) {
return spark
.read()
.textFile(inputPath)
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
}
} }

View File

@ -0,0 +1,69 @@
package eu.dnetlib.dhp.actionmanager.bipmodel.score.deserializers;
import com.opencsv.bean.CsvBindByPosition;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import eu.dnetlib.dhp.schema.oaf.Measure;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static eu.dnetlib.dhp.actionmanager.Constants.*;
@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
public class BipProjectModel {
String projectId;
String numOfInfluentialResults;
String numOfPopularResults;
String totalImpulse;
String totalCitationCount;
// each project bip measure has exactly one value, hence one key-value pair
private Measure createMeasure(String measureId, String measureValue) {
KeyValue kv = new KeyValue();
kv.setKey("score");
kv.setValue(measureValue);
kv.setDataInfo(
OafMapperUtils.dataInfo(
false,
UPDATE_DATA_INFO_TYPE,
true,
false,
OafMapperUtils.qualifier(
UPDATE_MEASURE_BIP_CLASS_ID,
UPDATE_CLASS_NAME,
ModelConstants.DNET_PROVENANCE_ACTIONS,
ModelConstants.DNET_PROVENANCE_ACTIONS),
"")
);
Measure measure = new Measure();
measure.setId(measureId);
measure.setUnit(Collections.singletonList(kv));
return measure;
}
public List<Measure> toMeasures() {
return Arrays.asList(
createMeasure("numOfInfluentialResults", numOfInfluentialResults),
createMeasure("numOfPopularResults", numOfPopularResults),
createMeasure("totalImpulse", totalImpulse),
createMeasure("totalCitationCount", totalCitationCount)
);
}
}

View File

@ -1,5 +1,7 @@
package eu.dnetlib.dhp.actionmanager.bipmodel; package eu.dnetlib.dhp.actionmanager.bipmodel.score.deserializers;
import eu.dnetlib.dhp.actionmanager.bipmodel.Score;
import java.io.Serializable; import java.io.Serializable;
import java.util.ArrayList; import java.util.ArrayList;
@ -11,9 +13,9 @@ import java.util.List;
* Only needed for deserialization purposes * Only needed for deserialization purposes
*/ */
public class BipDeserialize extends HashMap<String, List<Score>> implements Serializable { public class BipResultModel extends HashMap<String, List<Score>> implements Serializable {
public BipDeserialize() { public BipResultModel() {
super(); super();
} }

View File

@ -24,7 +24,7 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.bipmodel.BipDeserialize; import eu.dnetlib.dhp.actionmanager.bipmodel.score.deserializers.BipResultModel;
import eu.dnetlib.dhp.actionmanager.bipmodel.BipScore; import eu.dnetlib.dhp.actionmanager.bipmodel.BipScore;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.HdfsSupport;
@ -82,9 +82,9 @@ public class PrepareBipFinder implements Serializable {
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaRDD<BipDeserialize> bipDeserializeJavaRDD = sc JavaRDD<BipResultModel> bipDeserializeJavaRDD = sc
.textFile(inputPath) .textFile(inputPath)
.map(item -> OBJECT_MAPPER.readValue(item, BipDeserialize.class)); .map(item -> OBJECT_MAPPER.readValue(item, BipResultModel.class));
spark spark
.createDataset(bipDeserializeJavaRDD.flatMap(entry -> entry.keySet().stream().map(key -> { .createDataset(bipDeserializeJavaRDD.flatMap(entry -> entry.keySet().stream().map(key -> {

View File

@ -16,5 +16,11 @@
"paramLongName": "outputPath", "paramLongName": "outputPath",
"paramDescription": "the path of the new ActionSet", "paramDescription": "the path of the new ActionSet",
"paramRequired": true "paramRequired": true
},
{
"paramName": "te",
"paramLongName": "targetEntity",
"paramDescription": "the type of target entity to be enriched; currently supported one of { 'result', 'project' }",
"paramRequired": true
} }
] ]

View File

@ -6,8 +6,9 @@ import static org.junit.jupiter.api.Assertions.*;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.List;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Project;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
@ -27,7 +28,6 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.action.AtomicAction; import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Result; import eu.dnetlib.dhp.schema.oaf.Result;
public class SparkAtomicActionScoreJobTest { public class SparkAtomicActionScoreJobTest {
@ -37,8 +37,11 @@ public class SparkAtomicActionScoreJobTest {
private static SparkSession spark; private static SparkSession spark;
private static Path workingDir; private static Path workingDir;
private static final Logger log = LoggerFactory
.getLogger(SparkAtomicActionScoreJobTest.class); private final static String RESULT = "result";
private final static String PROJECT = "project";
private static final Logger log = LoggerFactory.getLogger(SparkAtomicActionScoreJobTest.class);
@BeforeAll @BeforeAll
public static void beforeAll() throws IOException { public static void beforeAll() throws IOException {
@ -69,29 +72,31 @@ public class SparkAtomicActionScoreJobTest {
spark.stop(); spark.stop();
} }
@Test private void runJob(String inputPath, String outputPath, String targetEntity) throws Exception {
void testMatch() throws Exception { SparkAtomicActionScoreJob.main(
String bipScoresPath = getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/bip_scores_oid.json")
.getPath();
SparkAtomicActionScoreJob
.main(
new String[] { new String[] {
"-isSparkSessionManaged", "-isSparkSessionManaged", Boolean.FALSE.toString(),
Boolean.FALSE.toString(), "-inputPath", inputPath,
"-inputPath", "-outputPath", outputPath,
"-targetEntity", targetEntity,
}
);
}
@Test
void testResultScores() throws Exception {
final String targetEntity = RESULT;
String inputResultScores = getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/result_bip_scores.json")
.getPath();
String outputPath = workingDir.toString() + "/" + targetEntity + "/actionSet";
bipScoresPath, // execute the job to generate the action sets for result scores
runJob(inputResultScores, outputPath, targetEntity);
"-outputPath",
workingDir.toString() + "/actionSet"
});
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<Result> tmp = sc JavaRDD<Result> tmp = sc
.sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class) .sequenceFile(outputPath, Text.class, Text.class)
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
.map(aa -> ((Result) aa.getPayload())); .map(aa -> ((Result) aa.getPayload()));
@ -140,4 +145,61 @@ public class SparkAtomicActionScoreJobTest {
} }
@Test
void testProjectScores() throws Exception {
String targetEntity = PROJECT;
String inputResultScores = getClass()
.getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/project_bip_scores.json")
.getPath();
String outputPath = workingDir.toString() + "/" + targetEntity + "/actionSet";
// execute the job to generate the action sets for project scores
runJob(inputResultScores, outputPath, PROJECT);
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<Project> projects = sc
.sequenceFile(outputPath, Text.class, Text.class)
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
.map(aa -> ((Project) aa.getPayload()));
// test the number of projects
assertEquals(4, projects.count());
String testProjectId = "40|nih_________::c02a8233e9b60f05bb418f0c9b714833";
// count that the project with id testProjectId is present
assertEquals(1, projects.filter(row -> row.getId().equals(testProjectId)).count());
projects.filter(row -> row.getId().equals(testProjectId))
.flatMap(r -> r.getMeasures().iterator())
.foreach(m -> {
log.info(m.getId() + " " + m.getUnit());
// ensure that only one score is present for each bip impact measure
assertEquals(1, m.getUnit().size());
KeyValue kv = m.getUnit().get(0);
// ensure that the correct key is provided, i.e. score
assertEquals("score", kv.getKey());
switch(m.getId()) {
case "numOfInfluentialResults":
assertEquals("0", kv.getValue());
break;
case "numOfPopularResults":
assertEquals("1", kv.getValue());
break;
case "totalImpulse":
assertEquals("25", kv.getValue());
break;
case "totalCitationCount":
assertEquals("43", kv.getValue());
break;
default:
fail("Unknown measure id in the context of projects");
}
});
}
} }

View File

@ -0,0 +1,4 @@
{"projectId":"40|nsf_________::d93e50d22374a1cf59f6a232413ea027","numOfInfluentialResults":0,"numOfPopularResults":10,"totalImpulse":181,"totalCitationCount":235}
{"projectId":"40|nih_________::1c93debc7085e440f245fbe70b2e8b21","numOfInfluentialResults":14,"numOfPopularResults":17,"totalImpulse":1558,"totalCitationCount":4226}
{"projectId":"40|nih_________::c02a8233e9b60f05bb418f0c9b714833","numOfInfluentialResults":0,"numOfPopularResults":1,"totalImpulse":25,"totalCitationCount":43}
{"projectId":"40|corda_______::d91dcf3a87dd7f72248fab0b8a4ba273","numOfInfluentialResults":2,"numOfPopularResults":3,"totalImpulse":78,"totalCitationCount":178}

View File

@ -96,14 +96,13 @@ for indicator_name, fd, column_name in impact_indicators:
# print("Counting non null values for {}".format(indicator_name)) # print("Counting non null values for {}".format(indicator_name))
# print(relations.filter(F.col(indicator_name).isNotNull()).count()) # print(relations.filter(F.col(indicator_name).isNotNull()).count())
sum the impact indicator values for each project # sum the impact indicator values for each project
relations.groupBy('projectId')\ relations.groupBy('projectId')\
.agg(\ .agg(\
F.sum('influence').alias('influence'),\ F.sum('influence').alias('numOfInfluentialResults'),\
F.sum('popularity').alias('popularity'),\ F.sum('popularity').alias('numOfPopularResults'),\
F.sum('impulse').alias('impulse'),\ F.sum('impulse').alias('totalImpulse'),\
F.sum('citation_count').alias('citation_count')\ F.sum('citation_count').alias('totalCitationCount')\
)\ )\
.write.mode("overwrite")\ .write.mode("overwrite")\
.option("delimiter", "\t")\ .json(output_dir, compression="gzip")
.csv(output_dir, compression="gzip")