From 97c1ba89187b5c57b6cac3263dd8c9d855c586d8 Mon Sep 17 00:00:00 2001
From: Serafeim Chatzopoulos
Date: Fri, 11 Aug 2023 15:56:53 +0300
Subject: [PATCH] Merge actionsets of results and projects

---
 .../bipfinder/SparkAtomicActionScoreJob.java  |  62 ++++---
 .../bipfinder/input_actionset_parameter.json  |  18 +--
 .../SparkAtomicActionScoreJobTest.java        | 152 +++++++++---------
 .../impact_indicators/oozie_app/workflow.xml  |  76 +++------
 4 files changed, 130 insertions(+), 178 deletions(-)

diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java
index fb11e829f..040c89782 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJob.java
@@ -6,13 +6,14 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 
 import java.io.Serializable;
 import java.util.List;
-import java.util.Optional;
 import java.util.stream.Collectors;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.MapFunction;
@@ -41,8 +42,6 @@ import scala.Tuple2;
  */
 public class SparkAtomicActionScoreJob implements Serializable {
 
-	private static final String RESULT = "result";
-	private static final String PROJECT = "project";
 	private static final Logger log = LoggerFactory.getLogger(SparkAtomicActionScoreJob.class);
 	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
@@ -61,15 +60,15 @@ public class SparkAtomicActionScoreJob implements Serializable {
 		Boolean isSparkSessionManaged = isSparkSessionManaged(parser);
 		log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
 
-		final String inputPath = parser.get("inputPath");
-		log.info("inputPath: {}", inputPath);
+		final String resultsInputPath = parser.get("resultsInputPath");
+		log.info("resultsInputPath: {}", resultsInputPath);
+
+		final String projectsInputPath = parser.get("projectsInputPath");
+		log.info("projectsInputPath: {}", projectsInputPath);
 
 		final String outputPath = parser.get("outputPath");
 		log.info("outputPath: {}", outputPath);
 
-		final String targetEntity = parser.get("targetEntity");
-		log.info("targetEntity: {}", targetEntity);
-
 		SparkConf conf = new SparkConf();
 
 		runWithSparkSession(
@@ -78,26 +77,23 @@ public class SparkAtomicActionScoreJob implements Serializable {
 			spark -> {
 				removeOutputDir(spark, outputPath);
 
-				// follow different procedures for different target entities
-				switch (targetEntity) {
-					case RESULT:
-						prepareResults(spark, inputPath, outputPath);
-						break;
-					case PROJECT:
-						prepareProjects(spark, inputPath, outputPath);
-						break;
-					default:
-						throw new RuntimeException("Unknown target entity: " + targetEntity);
-				}
+				JavaPairRDD<Text, Text> resultsRDD = prepareResults(spark, resultsInputPath, outputPath);
+				JavaPairRDD<Text, Text> projectsRDD = prepareProjects(spark, projectsInputPath, outputPath);
+
+				resultsRDD
+					.union(projectsRDD)
+					.saveAsHadoopFile(
+						outputPath, Text.class, Text.class, SequenceFileOutputFormat.class,
+						GzipCodec.class);
 			});
 	}
 
-	private static void prepareProjects(SparkSession spark, String inputPath, String outputPath) {
+	private static JavaPairRDD<Text, Text> prepareProjects(SparkSession spark, String inputPath,
+		String outputPath) {
 
 		// read input bip project scores
 		Dataset<BipProjectModel> projectScores = readPath(spark, inputPath, BipProjectModel.class);
 
-		projectScores.map((MapFunction<BipProjectModel, Project>) bipProjectScores -> {
+		return projectScores.map((MapFunction<BipProjectModel, Project>) bipProjectScores -> {
 			Project project = new Project();
 			project.setId(bipProjectScores.getProjectId());
 			project.setMeasures(bipProjectScores.toMeasures());
@@ -107,12 +103,12 @@ public class SparkAtomicActionScoreJob implements Serializable {
 			.map(p -> new AtomicAction<>(Project.class, p))
 			.mapToPair(
 				aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
-					new Text(OBJECT_MAPPER.writeValueAsString(aa))))
-			.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
+					new Text(OBJECT_MAPPER.writeValueAsString(aa))));
 	}
 
-	private static void prepareResults(SparkSession spark, String bipScorePath, String outputPath) {
+	private static JavaPairRDD<Text, Text> prepareResults(SparkSession spark, String bipScorePath,
+		String outputPath) {
 
 		final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 
@@ -128,24 +124,20 @@ public class SparkAtomicActionScoreJob implements Serializable {
 				return bs;
 			}).collect(Collectors.toList()).iterator()).rdd(), Encoders.bean(BipScore.class));
 
-		bipScores
+		return bipScores.map((MapFunction<BipScore, Result>) bs -> {
+			Result ret = new Result();
 
-			.map((MapFunction<BipScore, Result>) bs -> {
-				Result ret = new Result();
+			ret.setId(bs.getId());
 
-				ret.setId(bs.getId());
+			ret.setMeasures(getMeasure(bs));
 
-				ret.setMeasures(getMeasure(bs));
-
-				return ret;
-			}, Encoders.bean(Result.class))
+			return ret;
+		}, Encoders.bean(Result.class))
 			.toJavaRDD()
 			.map(p -> new AtomicAction<>(Result.class, p))
 			.mapToPair(
 				aa -> new Tuple2<>(new Text(aa.getClazz().getCanonicalName()),
-					new Text(OBJECT_MAPPER.writeValueAsString(aa))))
-			.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
-
+					new Text(OBJECT_MAPPER.writeValueAsString(aa))));
 	}
 
 	private static List<Measure> getMeasure(BipScore value) {
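
Note on the change above: prepareResults and prepareProjects now both return a JavaPairRDD<Text, Text> keyed by the payload class name, and the two are unioned and written once as a gzip-compressed sequence file. A self-contained sketch of that write path; the sample payloads, app name, and local output path are illustrative, not part of this patch:

    import java.util.Arrays;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapred.SequenceFileOutputFormat;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.SparkSession;
    import scala.Tuple2;

    public class MergedActionSetSketch {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                .master("local[*]").appName("merged-actionset").getOrCreate();
            JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

            // stand-ins for the pair RDDs returned by prepareResults/prepareProjects:
            // key = payload class name, value = serialized AtomicAction
            JavaPairRDD<Text, Text> results = sc
                .parallelize(Arrays.asList("{\"id\":\"r1\"}"))
                .mapToPair(json -> new Tuple2<>(new Text("eu.dnetlib.dhp.schema.oaf.Result"), new Text(json)));
            JavaPairRDD<Text, Text> projects = sc
                .parallelize(Arrays.asList("{\"id\":\"p1\"}"))
                .mapToPair(json -> new Tuple2<>(new Text("eu.dnetlib.dhp.schema.oaf.Project"), new Text(json)));

            // one union, one output: both entity types land in the same sequence file,
            // compressed with the GzipCodec introduced by this patch
            results
                .union(projects)
                .saveAsHadoopFile("/tmp/actionSet", Text.class, Text.class,
                    SequenceFileOutputFormat.class, GzipCodec.class);

            spark.stop();
        }
    }

The codec argument is the only new element of the write relative to the two per-entity saveAsHadoopFile calls removed above.
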
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipfinder/input_actionset_parameter.json b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipfinder/input_actionset_parameter.json
index d6b93c5af..c472eb5e6 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipfinder/input_actionset_parameter.json
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/actionmanager/bipfinder/input_actionset_parameter.json
@@ -6,9 +6,15 @@
     "paramRequired": false
   },
   {
-    "paramName": "ip",
-    "paramLongName": "inputPath",
-    "paramDescription": "the URL from where to get the programme file",
+    "paramName": "rip",
+    "paramLongName": "resultsInputPath",
+    "paramDescription": "the URL from where to get the input file for results",
+    "paramRequired": true
+  },
+  {
+    "paramName": "pip",
+    "paramLongName": "projectsInputPath",
+    "paramDescription": "the URL from where to get the input file for projects",
     "paramRequired": true
   },
   {
@@ -16,11 +22,5 @@
     "paramLongName": "outputPath",
     "paramDescription": "the path of the new ActionSet",
     "paramRequired": true
-  },
-  {
-    "paramName": "te",
-    "paramLongName": "targetEntity",
-    "paramDescription": "the type of target entity to be enriched; currently supported one of { 'result', 'project' }",
-    "paramRequired": true
   }
 ]
\ No newline at end of file
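
With the targetEntity switch gone from the parameter spec, one invocation covers both entity types. A minimal driver call under the new contract; the paths are placeholders, and this mirrors the runJob helper in the test below:

    // single run, merged output; the caller must declare throws Exception, as main does
    SparkAtomicActionScoreJob.main(new String[] {
        "-isSparkSessionManaged", Boolean.FALSE.toString(),
        "-resultsInputPath", "/path/to/result_bip_scores.json",
        "-projectsInputPath", "/path/to/project_bip_scores.json",
        "-outputPath", "/path/to/actionSet"
    });
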
- "paramRequired": true } ] \ No newline at end of file diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJobTest.java index 7752fbc27..542354836 100644 --- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJobTest.java +++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/actionmanager/bipfinder/SparkAtomicActionScoreJobTest.java @@ -7,6 +7,8 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import javax.xml.crypto.Data; + import org.apache.commons.io.FileUtils; import org.apache.hadoop.io.Text; import org.apache.spark.SparkConf; @@ -27,6 +29,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.schema.action.AtomicAction; import eu.dnetlib.dhp.schema.oaf.KeyValue; +import eu.dnetlib.dhp.schema.oaf.OafEntity; import eu.dnetlib.dhp.schema.oaf.Project; import eu.dnetlib.dhp.schema.oaf.Result; @@ -38,9 +41,6 @@ public class SparkAtomicActionScoreJobTest { private static Path workingDir; - private final static String RESULT = "result"; - private final static String PROJECT = "project"; - private static final Logger log = LoggerFactory.getLogger(SparkAtomicActionScoreJobTest.class); @BeforeAll @@ -72,50 +72,64 @@ public class SparkAtomicActionScoreJobTest { spark.stop(); } - private void runJob(String inputPath, String outputPath, String targetEntity) throws Exception { + private void runJob(String resultsInputPath, String projectsInputPath, String outputPath) throws Exception { SparkAtomicActionScoreJob .main( new String[] { "-isSparkSessionManaged", Boolean.FALSE.toString(), - "-inputPath", inputPath, + "-resultsInputPath", resultsInputPath, + "-projectsInputPath", projectsInputPath, "-outputPath", outputPath, - "-targetEntity", targetEntity, }); } @Test - void testResultScores() throws Exception { - final String targetEntity = RESULT; - String inputResultScores = getClass() + void testScores() throws Exception { + + String resultsInputPath = getClass() .getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/result_bip_scores.json") .getPath(); - String outputPath = workingDir.toString() + "/" + targetEntity + "/actionSet"; + + String projectsInputPath = getClass() + .getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/project_bip_scores.json") + .getPath(); + + String outputPath = workingDir.toString() + "/actionSet"; // execute the job to generate the action sets for result scores - runJob(inputResultScores, outputPath, targetEntity); + runJob(resultsInputPath, projectsInputPath, outputPath); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - JavaRDD tmp = sc + JavaRDD tmp = sc .sequenceFile(outputPath, Text.class, Text.class) .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) - .map(aa -> ((Result) aa.getPayload())); + .map(aa -> ((OafEntity) aa.getPayload())); - assertEquals(4, tmp.count()); + assertEquals(8, tmp.count()); - Dataset verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(Result.class)); + Dataset verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(OafEntity.class)); verificationDataset.createOrReplaceTempView("result"); - Dataset execVerification = spark + Dataset testDataset = spark .sql( "Select p.id oaid, mes.id, mUnit.value from result p " + "lateral view explode(measures) m as 
mes " + "lateral view explode(mes.unit) u as mUnit "); - Assertions.assertEquals(12, execVerification.count()); +// execVerification.show(); + + Assertions.assertEquals(28, testDataset.count()); + + assertResultImpactScores(testDataset); + assertProjectImpactScores(testDataset); + + } + + void assertResultImpactScores(Dataset testDataset) { Assertions .assertEquals( - "6.63451994567e-09", execVerification + "6.63451994567e-09", testDataset .filter( "oaid='50|arXiv_dedup_::4a2d5fd8d71daec016c176ec71d957b1' " + "and id = 'influence'") @@ -125,7 +139,7 @@ public class SparkAtomicActionScoreJobTest { .getString(0)); Assertions .assertEquals( - "0.348694533145", execVerification + "0.348694533145", testDataset .filter( "oaid='50|arXiv_dedup_::4a2d5fd8d71daec016c176ec71d957b1' " + "and id = 'popularity_alt'") @@ -135,7 +149,7 @@ public class SparkAtomicActionScoreJobTest { .getString(0)); Assertions .assertEquals( - "2.16094680115e-09", execVerification + "2.16094680115e-09", testDataset .filter( "oaid='50|arXiv_dedup_::4a2d5fd8d71daec016c176ec71d957b1' " + "and id = 'popularity'") @@ -143,65 +157,49 @@ public class SparkAtomicActionScoreJobTest { .collectAsList() .get(0) .getString(0)); - } - @Test - void testProjectScores() throws Exception { - String targetEntity = PROJECT; - String inputResultScores = getClass() - .getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/project_bip_scores.json") - .getPath(); - String outputPath = workingDir.toString() + "/" + targetEntity + "/actionSet"; + void assertProjectImpactScores(Dataset testDataset) throws Exception { - // execute the job to generate the action sets for project scores - runJob(inputResultScores, outputPath, PROJECT); - - final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); - - JavaRDD projects = sc - .sequenceFile(outputPath, Text.class, Text.class) - .map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class)) - .map(aa -> ((Project) aa.getPayload())); - - // test the number of projects - assertEquals(4, projects.count()); - - String testProjectId = "40|nih_________::c02a8233e9b60f05bb418f0c9b714833"; - - // count that the project with id testProjectId is present - assertEquals(1, projects.filter(row -> row.getId().equals(testProjectId)).count()); - - projects - .filter(row -> row.getId().equals(testProjectId)) - .flatMap(r -> r.getMeasures().iterator()) - .foreach(m -> { - log.info(m.getId() + " " + m.getUnit()); - - // ensure that only one score is present for each bip impact measure - assertEquals(1, m.getUnit().size()); - - KeyValue kv = m.getUnit().get(0); - - // ensure that the correct key is provided, i.e. 
score - assertEquals("score", kv.getKey()); - - switch (m.getId()) { - case "numOfInfluentialResults": - assertEquals("0", kv.getValue()); - break; - case "numOfPopularResults": - assertEquals("1", kv.getValue()); - break; - case "totalImpulse": - assertEquals("25", kv.getValue()); - break; - case "totalCitationCount": - assertEquals("43", kv.getValue()); - break; - default: - fail("Unknown measure id in the context of projects"); - } - }); + Assertions + .assertEquals( + "0", testDataset + .filter( + "oaid='40|nih_________::c02a8233e9b60f05bb418f0c9b714833' " + + "and id = 'numOfInfluentialResults'") + .select("value") + .collectAsList() + .get(0) + .getString(0)); + Assertions + .assertEquals( + "1", testDataset + .filter( + "oaid='40|nih_________::c02a8233e9b60f05bb418f0c9b714833' " + + "and id = 'numOfPopularResults'") + .select("value") + .collectAsList() + .get(0) + .getString(0)); + Assertions + .assertEquals( + "25", testDataset + .filter( + "oaid='40|nih_________::c02a8233e9b60f05bb418f0c9b714833' " + + "and id = 'totalImpulse'") + .select("value") + .collectAsList() + .get(0) + .getString(0)); + Assertions + .assertEquals( + "43", testDataset + .filter( + "oaid='40|nih_________::c02a8233e9b60f05bb418f0c9b714833' " + + "and id = 'totalCitationCount'") + .select("value") + .collectAsList() + .get(0) + .getString(0)); } } diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml index 349e054d8..c225fa3e1 100644 --- a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml @@ -31,7 +31,7 @@ ${wf:conf('resume') eq "projects-impact"} - ${wf:conf('resume') eq "projects-impact-actionsets"} + ${wf:conf('resume') eq "create-actionset"} @@ -455,53 +455,11 @@ ${wfAppPath}/map_scores_to_dois.py#map_scores_to_dois.py - + - - - - - - - - - - - - - - - - - yarn-cluster - cluster - Produces the atomic action with the bip finder scores for publications - eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob - dhp-aggregation-${projectVersion}.jar - - - --executor-memory=${sparkNormalExecutorMemory} - --executor-cores=${sparkExecutorCores} - --driver-memory=${sparkNormalDriverMemory} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} - - --inputPath${bipScorePath} - --outputPath${actionSetOutputPath}/results/ - --targetEntityresult - - - - - - - @@ -538,17 +496,26 @@ ${wfAppPath}/projects_impact.py#projects_impact.py - + - - + + + + + + + + + + + yarn-cluster cluster - Produces the atomic action with the bip finder scores for projects + Produces the atomic action with the bip finder scores eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob dhp-aggregation-${projectVersion}.jar @@ -563,14 +530,13 @@ --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir} - --inputPath${projectImpactIndicatorsOutput} - --outputPath${actionSetOutputPath}/projects/ - --targetEntityproject + --resultsInputPath${bipScorePath} + 
@@ -538,17 +496,26 @@
             <file>${wfAppPath}/projects_impact.py#projects_impact.py</file>
         </spark>
 
-        <ok to="project-actionsets" />
+        <ok to="delete-output-path-actionset" />
         <error to="project-impact-indicators-fail" />
     </action>
 
-    <action name="project-actionsets">
-        <spark xmlns="uri:oozie:spark-action:0.2">
+    <action name="delete-output-path-actionset">
+        <fs>
+            <delete path="${actionSetOutputPath}" />
+            <mkdir path="${actionSetOutputPath}" />
+        </fs>
+        <ok to="create-actionset" />
+        <error to="actionset-delete-fail" />
+    </action>
+
+    <action name="create-actionset">
+        <spark xmlns="uri:oozie:spark-action:0.2">
             <master>yarn-cluster</master>
             <mode>cluster</mode>
-            <name>Produces the atomic action with the bip finder scores for projects</name>
+            <name>Produces the atomic action with the bip finder scores</name>
             <class>eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob</class>
            <jar>dhp-aggregation-${projectVersion}.jar</jar>
 
             <spark-opts>
                 --executor-memory=${sparkNormalExecutorMemory}
                 --executor-cores=${sparkExecutorCores}
                 --driver-memory=${sparkNormalDriverMemory}
                 --conf spark.extraListeners=${spark2ExtraListeners}
                 --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
                 --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
                 --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
                 --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
             </spark-opts>
 
-            <arg>--inputPath</arg><arg>${projectImpactIndicatorsOutput}</arg>
-            <arg>--outputPath</arg><arg>${actionSetOutputPath}/projects/</arg>
-            <arg>--targetEntity</arg><arg>project</arg>
+            <arg>--resultsInputPath</arg><arg>${bipScorePath}</arg>
+            <arg>--projectsInputPath</arg><arg>${projectImpactIndicatorsOutput}</arg>
+            <arg>--outputPath</arg><arg>${actionSetOutputPath}</arg>
         </spark>
 
         <ok to="end" />
-        <error to="actionset-project-creation-fail" />
+        <error to="actionset-creation-fail" />
     </action>
 
@@ -630,10 +596,6 @@
     <kill name="project-impact-indicators-fail">
         <message>Calculating project impact indicators failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
     </kill>
 
-    <kill name="actionset-project-creation-fail">
-        <message>ActionSet creation for projects failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
-    </kill>
-
     <end name="end" />
 </workflow-app>
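
With a single actionset step, a partial re-run can jump straight to it through the resume switch at the top of this workflow. An illustrative job.properties fragment; the property names come from this workflow definition, while the path value is a placeholder:

    # resume directly at the merged actionset creation
    resume=create-actionset
    # consumed by --outputPath above; no per-entity /results/ or /projects/ suffix anymore
    actionSetOutputPath=/data/bip/actionSet
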