From c2abde4d9ff493c8015f3c02d29131785849ce1f Mon Sep 17 00:00:00 2001
From: "miriam.baglioni"
Date: Mon, 28 Sep 2020 12:16:34 +0200
Subject: [PATCH] changed the implementation of Atomic Actions creation by
 exploiting the topic information taken from the CORDIS Excel file

---
 .../project/SparkAtomicActionJob.java         | 79 ++++++++++++-------
 1 file changed, 49 insertions(+), 30 deletions(-)

diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java
index 222e623fd..68b17c31f 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java
@@ -23,8 +23,9 @@ import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
-import eu.dnetlib.dhp.actionmanager.project.csvutils.CSVProgramme;
-import eu.dnetlib.dhp.actionmanager.project.csvutils.CSVProject;
+import eu.dnetlib.dhp.actionmanager.project.utils.CSVProgramme;
+import eu.dnetlib.dhp.actionmanager.project.utils.CSVProject;
+import eu.dnetlib.dhp.actionmanager.project.utils.EXCELTopic;
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.common.HdfsSupport;
 import eu.dnetlib.dhp.schema.action.AtomicAction;
@@ -68,6 +69,9 @@ public class SparkAtomicActionJob {
 		final String programmePath = parser.get("programmePath");
 		log.info("programmePath {}: ", programmePath);
 
+		final String topicPath = parser.get("topicPath");
+		log.info("topic path {}: ", topicPath);
+
 		SparkConf conf = new SparkConf();
 
 		runWithSparkSession(
@@ -79,6 +83,7 @@
 					spark,
 					projectPath,
 					programmePath,
+					topicPath,
 					outputPath);
 			});
 	}
@@ -89,42 +94,56 @@ public class SparkAtomicActionJob {
 	private static void getAtomicActions(SparkSession spark, String projectPatH,
 		String programmePath,
+		String topicPath,
 		String outputPath) {
 		Dataset<CSVProject> project = readPath(spark, projectPatH, CSVProject.class);
 		Dataset<CSVProgramme> programme = readPath(spark, programmePath, CSVProgramme.class);
+		Dataset<EXCELTopic> topic = readPath(spark, topicPath, EXCELTopic.class);
 
 		project
 			.joinWith(programme, project.col("programme").equalTo(programme.col("code")), "left")
-			.map((MapFunction<Tuple2<CSVProject, CSVProgramme>, Project>) c -> {
-				CSVProject csvProject = c._1();
-				Optional<CSVProgramme> ocsvProgramme = Optional.ofNullable(c._2());
-				if (ocsvProgramme.isPresent()) {
-					Project p = new Project();
-					p
-						.setId(
-							createOpenaireId(
-								ModelSupport.entityIdPrefix.get("project"),
-								"corda__h2020", csvProject.getId()));
-					p.setH2020topiccode(csvProject.getTopics());
-					p.setH2020topicdescription(csvProject.getTopicdescription());
-					H2020Programme pm = new H2020Programme();
-					H2020Classification h2020classification = new H2020Classification();
-					pm.setCode(csvProject.getProgramme());
-					CSVProgramme csvProgramme = ocsvProgramme.get();
-					if (StringUtils.isNotEmpty(csvProgramme.getShortTitle())) {
-						pm.setDescription(csvProgramme.getShortTitle());
-					} else {
-						pm.setDescription(csvProgramme.getTitle());
-					}
-					h2020classification.setClassification(ocsvProgramme.get().getClassification());
-					setLevels(h2020classification, ocsvProgramme.get().getClassification());
-					h2020classification.setH2020Programme(pm);
-					p.setH2020classification(Arrays.asList(h2020classification));
-					return p;
-				}
+			.joinWith(topic, project.col("topics").equalTo(topic.col("code")), "left")
+			.map((MapFunction<Tuple2<Tuple2<CSVProject, CSVProgramme>, EXCELTopic>, Project>) c -> {
+				Tuple2<CSVProject, CSVProgramme> projectprogramme = c._1();
+				CSVProject csvProject = projectprogramme._1();
+				Optional<CSVProgramme> ocsvProgramme = Optional.ofNullable(projectprogramme._2());
 
-				return null;
+				String topicdescription = Optional
+					.ofNullable(c._2())
+					.map(t -> t.getTitle())
+					.orElse(null);
+
+				Project p = Optional
+					.ofNullable(projectprogramme._2())
+					.map(csvProgramme -> {
+						Project pp = new Project();
+						pp
+							.setId(
+								createOpenaireId(
+									ModelSupport.entityIdPrefix.get("project"),
+									"corda__h2020", csvProject.getId()));
+						pp.setH2020topiccode(csvProject.getTopics());
+						H2020Programme pm = new H2020Programme();
+						H2020Classification h2020classification = new H2020Classification();
+						pm.setCode(csvProject.getProgramme());
+						if (StringUtils.isNotEmpty(csvProgramme.getShortTitle())) {
+							pm.setDescription(csvProgramme.getShortTitle());
+						} else {
+							pm.setDescription(csvProgramme.getTitle());
+						}
+						h2020classification.setClassification(ocsvProgramme.get().getClassification());
+						setLevels(h2020classification, ocsvProgramme.get().getClassification());
+						h2020classification.setH2020Programme(pm);
+						pp.setH2020classification(Arrays.asList(h2020classification));
+						if (topicdescription != null) {
+							pp.setH2020topicdescription(topicdescription);
+						}
+						return pp;
+					})
+					.orElse(null);
+
+				return p;
 			}, Encoders.bean(Project.class))
 			.filter(Objects::nonNull)
 			.groupByKey(
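
Note (editor's sketch, not part of the patch): the two chained "left" joinWith calls above yield a Dataset of Tuple2<Tuple2<CSVProject, CSVProgramme>, EXCELTopic>, and both right-hand sides can be null, which is why the map function wraps them in Optional. Below is a minimal, self-contained sketch of that pattern; the Proj and CodeTitle beans are hypothetical stand-ins for CSVProject, CSVProgramme and EXCELTopic:

import java.io.Serializable;
import java.util.Arrays;
import java.util.Optional;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;

public class ChainedLeftJoinSketch {

	// Hypothetical stand-in for CSVProject.
	public static class Proj implements Serializable {
		private String programme;
		private String topics;

		public String getProgramme() { return programme; }
		public void setProgramme(String programme) { this.programme = programme; }
		public String getTopics() { return topics; }
		public void setTopics(String topics) { this.topics = topics; }
	}

	// Hypothetical stand-in for both CSVProgramme and EXCELTopic.
	public static class CodeTitle implements Serializable {
		private String code;
		private String title;

		public String getCode() { return code; }
		public void setCode(String code) { this.code = code; }
		public String getTitle() { return title; }
		public void setTitle(String title) { this.title = title; }
	}

	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate();

		Proj proj = new Proj();
		proj.setProgramme("H2020-EU.1.1.");
		proj.setTopics("ERC-2017-STG");

		CodeTitle programme = new CodeTitle();
		programme.setCode("H2020-EU.1.1.");
		programme.setTitle("Excellent Science");

		Dataset<Proj> projects = spark.createDataset(Arrays.asList(proj), Encoders.bean(Proj.class));
		Dataset<CodeTitle> programmes = spark
			.createDataset(Arrays.asList(programme), Encoders.bean(CodeTitle.class));
		// Deliberately empty: the second left join then produces null on its right side.
		Dataset<CodeTitle> topics = spark
			.createDataset(Arrays.<CodeTitle>asList(), Encoders.bean(CodeTitle.class));

		projects
			.joinWith(programmes, projects.col("programme").equalTo(programmes.col("code")), "left")
			.joinWith(topics, projects.col("topics").equalTo(topics.col("code")), "left")
			.map((MapFunction<Tuple2<Tuple2<Proj, CodeTitle>, CodeTitle>, String>) c -> {
				Proj p = c._1()._1(); // the left side is always present
				String programmeTitle = Optional.ofNullable(c._1()._2()).map(CodeTitle::getTitle).orElse("n/a");
				String topicTitle = Optional.ofNullable(c._2()).map(CodeTitle::getTitle).orElse("n/a");
				return p.getTopics() + " / " + programmeTitle + " / " + topicTitle;
			}, Encoders.STRING())
			.show(false);

		spark.stop();
	}
}

As in the patch, a project without a matching programme can be mapped to null and dropped afterwards (the job does this with filter(Objects::nonNull)), while a missing topic merely leaves h2020topicdescription unset.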
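
Note (editor's sketch, not part of the patch): the new join and map rely only on EXCELTopic exposing a code property (matched against CSVProject.topics) and a getTitle() accessor (used for h2020topicdescription). A minimal bean honouring that contract might look as follows; the actual class in eu.dnetlib.dhp.actionmanager.project.utils presumably defines further columns read from the CORDIS Excel file:

import java.io.Serializable;

// Minimal sketch of the contract SparkAtomicActionJob needs from EXCELTopic;
// any fields beyond "code" and "title" are unknown here and omitted.
public class EXCELTopic implements Serializable {

	private String code; // joined against CSVProject.topics
	private String title; // becomes Project.h2020topicdescription

	public String getCode() {
		return code;
	}

	public void setCode(String code) {
		this.code = code;
	}

	public String getTitle() {
		return title;
	}

	public void setTitle(String title) {
		this.title = title;
	}
}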