1
0
Fork 0

changed the implementation of Atomic Actions creation by exploiting the topic information get from the cordis excel file

This commit is contained in:
Miriam Baglioni 2020-09-28 12:16:34 +02:00
parent d930b8d3fc
commit c2abde4d9f
1 changed files with 49 additions and 30 deletions

View File

@ -23,8 +23,9 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.project.csvutils.CSVProgramme; import eu.dnetlib.dhp.actionmanager.project.utils.CSVProgramme;
import eu.dnetlib.dhp.actionmanager.project.csvutils.CSVProject; import eu.dnetlib.dhp.actionmanager.project.utils.CSVProject;
import eu.dnetlib.dhp.actionmanager.project.utils.EXCELTopic;
import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.action.AtomicAction; import eu.dnetlib.dhp.schema.action.AtomicAction;
@ -68,6 +69,9 @@ public class SparkAtomicActionJob {
final String programmePath = parser.get("programmePath"); final String programmePath = parser.get("programmePath");
log.info("programmePath {}: ", programmePath); log.info("programmePath {}: ", programmePath);
final String topicPath = parser.get("topicPath");
log.info("topic path {}: ", topicPath);
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
runWithSparkSession( runWithSparkSession(
@ -79,6 +83,7 @@ public class SparkAtomicActionJob {
spark, spark,
projectPath, projectPath,
programmePath, programmePath,
topicPath,
outputPath); outputPath);
}); });
} }
@ -89,29 +94,39 @@ public class SparkAtomicActionJob {
private static void getAtomicActions(SparkSession spark, String projectPatH, private static void getAtomicActions(SparkSession spark, String projectPatH,
String programmePath, String programmePath,
String topicPath,
String outputPath) { String outputPath) {
Dataset<CSVProject> project = readPath(spark, projectPatH, CSVProject.class); Dataset<CSVProject> project = readPath(spark, projectPatH, CSVProject.class);
Dataset<CSVProgramme> programme = readPath(spark, programmePath, CSVProgramme.class); Dataset<CSVProgramme> programme = readPath(spark, programmePath, CSVProgramme.class);
Dataset<EXCELTopic> topic = readPath(spark, topicPath, EXCELTopic.class);
project project
.joinWith(programme, project.col("programme").equalTo(programme.col("code")), "left") .joinWith(programme, project.col("programme").equalTo(programme.col("code")), "left")
.map((MapFunction<Tuple2<CSVProject, CSVProgramme>, Project>) c -> { .joinWith(topic, project.col("topics").equalTo(topic.col("code")), "left")
CSVProject csvProject = c._1(); .map((MapFunction<Tuple2<Tuple2<CSVProject, CSVProgramme>, EXCELTopic>, Project>) c -> {
Optional<CSVProgramme> ocsvProgramme = Optional.ofNullable(c._2()); Tuple2<CSVProject, CSVProgramme> projectprogramme = c._1();
if (ocsvProgramme.isPresent()) { CSVProject csvProject = projectprogramme._1();
Project p = new Project(); Optional<CSVProgramme> ocsvProgramme = Optional.ofNullable(projectprogramme._2());
p
String topicdescription = Optional
.ofNullable(c._2())
.map(t -> t.getTitle())
.orElse(null);
Project p = Optional
.ofNullable(projectprogramme._2())
.map(csvProgramme -> {
Project pp = new Project();
pp
.setId( .setId(
createOpenaireId( createOpenaireId(
ModelSupport.entityIdPrefix.get("project"), ModelSupport.entityIdPrefix.get("project"),
"corda__h2020", csvProject.getId())); "corda__h2020", csvProject.getId()));
p.setH2020topiccode(csvProject.getTopics()); pp.setH2020topiccode(csvProject.getTopics());
p.setH2020topicdescription(csvProject.getTopicdescription());
H2020Programme pm = new H2020Programme(); H2020Programme pm = new H2020Programme();
H2020Classification h2020classification = new H2020Classification(); H2020Classification h2020classification = new H2020Classification();
pm.setCode(csvProject.getProgramme()); pm.setCode(csvProject.getProgramme());
CSVProgramme csvProgramme = ocsvProgramme.get();
if (StringUtils.isNotEmpty(csvProgramme.getShortTitle())) { if (StringUtils.isNotEmpty(csvProgramme.getShortTitle())) {
pm.setDescription(csvProgramme.getShortTitle()); pm.setDescription(csvProgramme.getShortTitle());
} else { } else {
@ -120,11 +135,15 @@ public class SparkAtomicActionJob {
h2020classification.setClassification(ocsvProgramme.get().getClassification()); h2020classification.setClassification(ocsvProgramme.get().getClassification());
setLevels(h2020classification, ocsvProgramme.get().getClassification()); setLevels(h2020classification, ocsvProgramme.get().getClassification());
h2020classification.setH2020Programme(pm); h2020classification.setH2020Programme(pm);
p.setH2020classification(Arrays.asList(h2020classification)); pp.setH2020classification(Arrays.asList(h2020classification));
return p; if (topicdescription != null) {
pp.setH2020topicdescription(topicdescription);
} }
return pp;
})
.orElse(null);
return null; return p;
}, Encoders.bean(Project.class)) }, Encoders.bean(Project.class))
.filter(Objects::nonNull) .filter(Objects::nonNull)
.groupByKey( .groupByKey(