Merge actionsets of results and projects
This commit is contained in:
parent
3a0f09774a
commit
97c1ba8918
|
@ -6,13 +6,14 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
|||
|
||||
import java.io.Serializable;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.compress.GzipCodec;
|
||||
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaPairRDD;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
|
@ -41,8 +42,6 @@ import scala.Tuple2;
|
|||
*/
|
||||
public class SparkAtomicActionScoreJob implements Serializable {
|
||||
|
||||
private static final String RESULT = "result";
|
||||
private static final String PROJECT = "project";
|
||||
private static final Logger log = LoggerFactory.getLogger(SparkAtomicActionScoreJob.class);
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
|
@ -61,15 +60,15 @@ public class SparkAtomicActionScoreJob implements Serializable {
|
|||
Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
|
||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||
|
||||
final String inputPath = parser.get("inputPath");
|
||||
log.info("inputPath: {}", inputPath);
|
||||
final String resultsInputPath = parser.get("resultsInputPath");
|
||||
log.info("resultsInputPath: {}", resultsInputPath);
|
||||
|
||||
final String projectsInputPath = parser.get("projectsInputPath");
|
||||
log.info("projectsInputPath: {}", projectsInputPath);
|
||||
|
||||
final String outputPath = parser.get("outputPath");
|
||||
log.info("outputPath: {}", outputPath);
|
||||
|
||||
final String targetEntity = parser.get("targetEntity");
|
||||
log.info("targetEntity: {}", targetEntity);
|
||||
|
||||
SparkConf conf = new SparkConf();
|
||||
|
||||
runWithSparkSession(
|
||||
|
@ -78,26 +77,23 @@ public class SparkAtomicActionScoreJob implements Serializable {
|
|||
spark -> {
|
||||
removeOutputDir(spark, outputPath);
|
||||
|
||||
// follow different procedures for different target entities
|
||||
switch (targetEntity) {
|
||||
case RESULT:
|
||||
prepareResults(spark, inputPath, outputPath);
|
||||
break;
|
||||
case PROJECT:
|
||||
prepareProjects(spark, inputPath, outputPath);
|
||||
break;
|
||||
default:
|
||||
throw new RuntimeException("Unknown target entity: " + targetEntity);
|
||||
}
|
||||
JavaPairRDD<Text, Text> resultsRDD = prepareResults(spark, resultsInputPath, outputPath);
|
||||
JavaPairRDD<Text, Text> projectsRDD = prepareProjects(spark, projectsInputPath, outputPath);
|
||||
|
||||
resultsRDD
|
||||
.union(projectsRDD)
|
||||
.saveAsHadoopFile(
|
||||
outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
|
||||
});
|
||||
}
|
||||
|
||||
private static <I extends Project> void prepareProjects(SparkSession spark, String inputPath, String outputPath) {
|
||||
private static <I extends Project> JavaPairRDD<Text, Text> prepareProjects(SparkSession spark, String inputPath,
|
||||
String outputPath) {
|
||||
|
||||
// read input bip project scores
|
||||
Dataset<BipProjectModel> projectScores = readPath(spark, inputPath, BipProjectModel.class);
|
||||
|
||||
projectScores.map((MapFunction<BipProjectModel, Project>) bipProjectScores -> {
|
||||
return projectScores.map((MapFunction<BipProjectModel, Project>) bipProjectScores -> {
|
||||
Project project = new Project();
|
||||
project.setId(bipProjectScores.getProjectId());
|
||||
project.setMeasures(bipProjectScores.toMeasures());
|
||||
|
@ -107,12 +103,12 @@ public class SparkAtomicActionScoreJob implements Serializable {
|
|||
.map(p -> new AtomicAction(Project.class, p))
|
||||
.mapToPair(
|
||||
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
|
||||
new Text(OBJECT_MAPPER.writeValueAsString(aa))))
|
||||
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
|
||||
new Text(OBJECT_MAPPER.writeValueAsString(aa))));
|
||||
|
||||
}
|
||||
|
||||
private static <I extends Result> void prepareResults(SparkSession spark, String bipScorePath, String outputPath) {
|
||||
private static <I extends Result> JavaPairRDD<Text, Text> prepareResults(SparkSession spark, String bipScorePath,
|
||||
String outputPath) {
|
||||
|
||||
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
||||
|
||||
|
@ -128,9 +124,7 @@ public class SparkAtomicActionScoreJob implements Serializable {
|
|||
return bs;
|
||||
}).collect(Collectors.toList()).iterator()).rdd(), Encoders.bean(BipScore.class));
|
||||
|
||||
bipScores
|
||||
|
||||
.map((MapFunction<BipScore, Result>) bs -> {
|
||||
return bipScores.map((MapFunction<BipScore, Result>) bs -> {
|
||||
Result ret = new Result();
|
||||
|
||||
ret.setId(bs.getId());
|
||||
|
@ -143,9 +137,7 @@ public class SparkAtomicActionScoreJob implements Serializable {
|
|||
.map(p -> new AtomicAction(Result.class, p))
|
||||
.mapToPair(
|
||||
aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
|
||||
new Text(OBJECT_MAPPER.writeValueAsString(aa))))
|
||||
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
|
||||
|
||||
new Text(OBJECT_MAPPER.writeValueAsString(aa))));
|
||||
}
|
||||
|
||||
private static List<Measure> getMeasure(BipScore value) {
|
||||
|
|
|
@ -6,9 +6,15 @@
|
|||
"paramRequired": false
|
||||
},
|
||||
{
|
||||
"paramName": "ip",
|
||||
"paramLongName": "inputPath",
|
||||
"paramDescription": "the URL from where to get the programme file",
|
||||
"paramName": "rip",
|
||||
"paramLongName": "resultsInputPath",
|
||||
"paramDescription": "the URL from where to get the input file for results",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "pip",
|
||||
"paramLongName": "projectsInputPath",
|
||||
"paramDescription": "the URL from where to get the input file for projects",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
|
@ -16,11 +22,5 @@
|
|||
"paramLongName": "outputPath",
|
||||
"paramDescription": "the path of the new ActionSet",
|
||||
"paramRequired": true
|
||||
},
|
||||
{
|
||||
"paramName": "te",
|
||||
"paramLongName": "targetEntity",
|
||||
"paramDescription": "the type of target entity to be enriched; currently supported one of { 'result', 'project' }",
|
||||
"paramRequired": true
|
||||
}
|
||||
]
|
|
@ -7,6 +7,8 @@ import java.io.IOException;
|
|||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
|
||||
import javax.xml.crypto.Data;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.spark.SparkConf;
|
||||
|
@ -27,6 +29,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||
|
||||
import eu.dnetlib.dhp.schema.action.AtomicAction;
|
||||
import eu.dnetlib.dhp.schema.oaf.KeyValue;
|
||||
import eu.dnetlib.dhp.schema.oaf.OafEntity;
|
||||
import eu.dnetlib.dhp.schema.oaf.Project;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
|
||||
|
@ -38,9 +41,6 @@ public class SparkAtomicActionScoreJobTest {
|
|||
|
||||
private static Path workingDir;
|
||||
|
||||
private final static String RESULT = "result";
|
||||
private final static String PROJECT = "project";
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(SparkAtomicActionScoreJobTest.class);
|
||||
|
||||
@BeforeAll
|
||||
|
@ -72,50 +72,64 @@ public class SparkAtomicActionScoreJobTest {
|
|||
spark.stop();
|
||||
}
|
||||
|
||||
private void runJob(String inputPath, String outputPath, String targetEntity) throws Exception {
|
||||
private void runJob(String resultsInputPath, String projectsInputPath, String outputPath) throws Exception {
|
||||
SparkAtomicActionScoreJob
|
||||
.main(
|
||||
new String[] {
|
||||
"-isSparkSessionManaged", Boolean.FALSE.toString(),
|
||||
"-inputPath", inputPath,
|
||||
"-resultsInputPath", resultsInputPath,
|
||||
"-projectsInputPath", projectsInputPath,
|
||||
"-outputPath", outputPath,
|
||||
"-targetEntity", targetEntity,
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void testResultScores() throws Exception {
|
||||
final String targetEntity = RESULT;
|
||||
String inputResultScores = getClass()
|
||||
void testScores() throws Exception {
|
||||
|
||||
String resultsInputPath = getClass()
|
||||
.getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/result_bip_scores.json")
|
||||
.getPath();
|
||||
String outputPath = workingDir.toString() + "/" + targetEntity + "/actionSet";
|
||||
|
||||
String projectsInputPath = getClass()
|
||||
.getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/project_bip_scores.json")
|
||||
.getPath();
|
||||
|
||||
String outputPath = workingDir.toString() + "/actionSet";
|
||||
|
||||
// execute the job to generate the action sets for result scores
|
||||
runJob(inputResultScores, outputPath, targetEntity);
|
||||
runJob(resultsInputPath, projectsInputPath, outputPath);
|
||||
|
||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||
|
||||
JavaRDD<Result> tmp = sc
|
||||
JavaRDD<OafEntity> tmp = sc
|
||||
.sequenceFile(outputPath, Text.class, Text.class)
|
||||
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
|
||||
.map(aa -> ((Result) aa.getPayload()));
|
||||
.map(aa -> ((OafEntity) aa.getPayload()));
|
||||
|
||||
assertEquals(4, tmp.count());
|
||||
assertEquals(8, tmp.count());
|
||||
|
||||
Dataset<Result> verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(Result.class));
|
||||
Dataset<OafEntity> verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(OafEntity.class));
|
||||
verificationDataset.createOrReplaceTempView("result");
|
||||
|
||||
Dataset<Row> execVerification = spark
|
||||
Dataset<Row> testDataset = spark
|
||||
.sql(
|
||||
"Select p.id oaid, mes.id, mUnit.value from result p " +
|
||||
"lateral view explode(measures) m as mes " +
|
||||
"lateral view explode(mes.unit) u as mUnit ");
|
||||
|
||||
Assertions.assertEquals(12, execVerification.count());
|
||||
// execVerification.show();
|
||||
|
||||
Assertions.assertEquals(28, testDataset.count());
|
||||
|
||||
assertResultImpactScores(testDataset);
|
||||
assertProjectImpactScores(testDataset);
|
||||
|
||||
}
|
||||
|
||||
void assertResultImpactScores(Dataset<Row> testDataset) {
|
||||
Assertions
|
||||
.assertEquals(
|
||||
"6.63451994567e-09", execVerification
|
||||
"6.63451994567e-09", testDataset
|
||||
.filter(
|
||||
"oaid='50|arXiv_dedup_::4a2d5fd8d71daec016c176ec71d957b1' " +
|
||||
"and id = 'influence'")
|
||||
|
@ -125,7 +139,7 @@ public class SparkAtomicActionScoreJobTest {
|
|||
.getString(0));
|
||||
Assertions
|
||||
.assertEquals(
|
||||
"0.348694533145", execVerification
|
||||
"0.348694533145", testDataset
|
||||
.filter(
|
||||
"oaid='50|arXiv_dedup_::4a2d5fd8d71daec016c176ec71d957b1' " +
|
||||
"and id = 'popularity_alt'")
|
||||
|
@ -135,7 +149,7 @@ public class SparkAtomicActionScoreJobTest {
|
|||
.getString(0));
|
||||
Assertions
|
||||
.assertEquals(
|
||||
"2.16094680115e-09", execVerification
|
||||
"2.16094680115e-09", testDataset
|
||||
.filter(
|
||||
"oaid='50|arXiv_dedup_::4a2d5fd8d71daec016c176ec71d957b1' " +
|
||||
"and id = 'popularity'")
|
||||
|
@ -143,65 +157,49 @@ public class SparkAtomicActionScoreJobTest {
|
|||
.collectAsList()
|
||||
.get(0)
|
||||
.getString(0));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
void testProjectScores() throws Exception {
|
||||
String targetEntity = PROJECT;
|
||||
String inputResultScores = getClass()
|
||||
.getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/project_bip_scores.json")
|
||||
.getPath();
|
||||
String outputPath = workingDir.toString() + "/" + targetEntity + "/actionSet";
|
||||
void assertProjectImpactScores(Dataset<Row> testDataset) throws Exception {
|
||||
|
||||
// execute the job to generate the action sets for project scores
|
||||
runJob(inputResultScores, outputPath, PROJECT);
|
||||
|
||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
||||
|
||||
JavaRDD<Project> projects = sc
|
||||
.sequenceFile(outputPath, Text.class, Text.class)
|
||||
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
|
||||
.map(aa -> ((Project) aa.getPayload()));
|
||||
|
||||
// test the number of projects
|
||||
assertEquals(4, projects.count());
|
||||
|
||||
String testProjectId = "40|nih_________::c02a8233e9b60f05bb418f0c9b714833";
|
||||
|
||||
// count that the project with id testProjectId is present
|
||||
assertEquals(1, projects.filter(row -> row.getId().equals(testProjectId)).count());
|
||||
|
||||
projects
|
||||
.filter(row -> row.getId().equals(testProjectId))
|
||||
.flatMap(r -> r.getMeasures().iterator())
|
||||
.foreach(m -> {
|
||||
log.info(m.getId() + " " + m.getUnit());
|
||||
|
||||
// ensure that only one score is present for each bip impact measure
|
||||
assertEquals(1, m.getUnit().size());
|
||||
|
||||
KeyValue kv = m.getUnit().get(0);
|
||||
|
||||
// ensure that the correct key is provided, i.e. score
|
||||
assertEquals("score", kv.getKey());
|
||||
|
||||
switch (m.getId()) {
|
||||
case "numOfInfluentialResults":
|
||||
assertEquals("0", kv.getValue());
|
||||
break;
|
||||
case "numOfPopularResults":
|
||||
assertEquals("1", kv.getValue());
|
||||
break;
|
||||
case "totalImpulse":
|
||||
assertEquals("25", kv.getValue());
|
||||
break;
|
||||
case "totalCitationCount":
|
||||
assertEquals("43", kv.getValue());
|
||||
break;
|
||||
default:
|
||||
fail("Unknown measure id in the context of projects");
|
||||
}
|
||||
});
|
||||
Assertions
|
||||
.assertEquals(
|
||||
"0", testDataset
|
||||
.filter(
|
||||
"oaid='40|nih_________::c02a8233e9b60f05bb418f0c9b714833' " +
|
||||
"and id = 'numOfInfluentialResults'")
|
||||
.select("value")
|
||||
.collectAsList()
|
||||
.get(0)
|
||||
.getString(0));
|
||||
Assertions
|
||||
.assertEquals(
|
||||
"1", testDataset
|
||||
.filter(
|
||||
"oaid='40|nih_________::c02a8233e9b60f05bb418f0c9b714833' " +
|
||||
"and id = 'numOfPopularResults'")
|
||||
.select("value")
|
||||
.collectAsList()
|
||||
.get(0)
|
||||
.getString(0));
|
||||
Assertions
|
||||
.assertEquals(
|
||||
"25", testDataset
|
||||
.filter(
|
||||
"oaid='40|nih_________::c02a8233e9b60f05bb418f0c9b714833' " +
|
||||
"and id = 'totalImpulse'")
|
||||
.select("value")
|
||||
.collectAsList()
|
||||
.get(0)
|
||||
.getString(0));
|
||||
Assertions
|
||||
.assertEquals(
|
||||
"43", testDataset
|
||||
.filter(
|
||||
"oaid='40|nih_________::c02a8233e9b60f05bb418f0c9b714833' " +
|
||||
"and id = 'totalCitationCount'")
|
||||
.select("value")
|
||||
.collectAsList()
|
||||
.get(0)
|
||||
.getString(0));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,7 +31,7 @@
|
|||
|
||||
<!-- Aggregation of impact scores on the project level -->
|
||||
<case to="project-impact-indicators">${wf:conf('resume') eq "projects-impact"}</case>
|
||||
<case to="create-actionset-for-projects">${wf:conf('resume') eq "projects-impact-actionsets"}</case>
|
||||
<case to="create-actionset">${wf:conf('resume') eq "create-actionset"}</case>
|
||||
|
||||
<default to="create-openaire-ranking-graph" />
|
||||
</switch>
|
||||
|
@ -455,50 +455,8 @@
|
|||
<file>${wfAppPath}/map_scores_to_dois.py#map_scores_to_dois.py</file>
|
||||
</spark>
|
||||
|
||||
<ok to="delete-output-path-for-actionset" />
|
||||
<error to="map-scores-fail" />
|
||||
|
||||
</action>
|
||||
|
||||
<!-- Re-create folder for result and project actionsets -->
|
||||
<action name="delete-output-path-for-actionset">
|
||||
<fs>
|
||||
<delete path="${actionSetOutputPath}/results/"/>
|
||||
<delete path="${actionSetOutputPath}/projects/"/>
|
||||
|
||||
<mkdir path="${actionSetOutputPath}/results/"/>
|
||||
<mkdir path="${actionSetOutputPath}/projects/"/>
|
||||
</fs>
|
||||
<ok to="create-actionset-for-results"/>
|
||||
<error to="actionset-delete-fail"/>
|
||||
</action>
|
||||
|
||||
<action name="create-actionset-for-results">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Produces the atomic action with the bip finder scores for publications</name>
|
||||
<class>eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
|
||||
<spark-opts>
|
||||
--executor-memory=${sparkNormalExecutorMemory}
|
||||
--executor-cores=${sparkExecutorCores}
|
||||
--driver-memory=${sparkNormalDriverMemory}
|
||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||
</spark-opts>
|
||||
<arg>--inputPath</arg><arg>${bipScorePath}</arg>
|
||||
<arg>--outputPath</arg><arg>${actionSetOutputPath}/results/</arg>
|
||||
<arg>--targetEntity</arg><arg>result</arg>
|
||||
</spark>
|
||||
|
||||
<ok to="project-impact-indicators" />
|
||||
<error to="actionset-creation-fail"/>
|
||||
<error to="map-scores-fail" />
|
||||
|
||||
</action>
|
||||
|
||||
|
@ -538,17 +496,26 @@
|
|||
<file>${wfAppPath}/projects_impact.py#projects_impact.py</file>
|
||||
</spark>
|
||||
|
||||
<ok to="create-actionset-for-projects" />
|
||||
<ok to="delete-output-path-for-actionset" />
|
||||
<error to="project-impact-indicators-fail" />
|
||||
|
||||
</action>
|
||||
|
||||
<action name="create-actionset-for-projects">
|
||||
<!-- Re-create folder for actionsets -->
|
||||
<action name="delete-output-path-for-actionset">
|
||||
<fs>
|
||||
<delete path="${actionSetOutputPath}"/>
|
||||
<mkdir path="${actionSetOutputPath}"/>
|
||||
</fs>
|
||||
<ok to="create-actionset"/>
|
||||
<error to="actionset-delete-fail"/>
|
||||
</action>
|
||||
|
||||
<action name="create-actionset">
|
||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||
|
||||
<master>yarn-cluster</master>
|
||||
<mode>cluster</mode>
|
||||
<name>Produces the atomic action with the bip finder scores for projects</name>
|
||||
<name>Produces the atomic action with the bip finder scores</name>
|
||||
<class>eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob</class>
|
||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
||||
|
||||
|
@ -563,14 +530,13 @@
|
|||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
||||
</spark-opts>
|
||||
|
||||
<arg>--inputPath</arg><arg>${projectImpactIndicatorsOutput}</arg>
|
||||
<arg>--outputPath</arg><arg>${actionSetOutputPath}/projects/</arg>
|
||||
<arg>--targetEntity</arg><arg>project</arg>
|
||||
<arg>--resultsInputPath</arg><arg>${bipScorePath}</arg>
|
||||
<arg>--projectsInputPath</arg><arg>${projectImpactIndicatorsOutput}</arg>
|
||||
<arg>--outputPath</arg><arg>${actionSetOutputPath}</arg>
|
||||
</spark>
|
||||
|
||||
<ok to="end"/>
|
||||
<error to="actionset-project-creation-fail"/>
|
||||
|
||||
<error to="actionset-creation-fail"/>
|
||||
</action>
|
||||
|
||||
<!-- Definitions of failure messages -->
|
||||
|
@ -630,10 +596,6 @@
|
|||
<message>Calculating project impact indicators failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
</kill>
|
||||
|
||||
<kill name="actionset-project-creation-fail">
|
||||
<message>ActionSet creation for projects failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||
</kill>
|
||||
|
||||
<!-- Define ending node -->
|
||||
<end name="end" />
|
||||
|
||||
|
|
Loading…
Reference in New Issue