From 2cba3cb484cfd823b1bc99b52e227b336c5acb49 Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Wed, 23 Sep 2020 17:31:15 +0200 Subject: [PATCH] modification to the classes building the actionset to consider the h2020classification --- .../project/PrepareProgramme.java | 192 +++++++++++++++--- .../project/SparkAtomicActionJob.java | 48 +++-- .../project/csvutils/CSVProgramme.java | 9 + 3 files changed, 203 insertions(+), 46 deletions(-) diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProgramme.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProgramme.java index c6dab13a06..324b34f4a9 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProgramme.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/PrepareProgramme.java @@ -4,11 +4,14 @@ package eu.dnetlib.dhp.actionmanager.project; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Optional; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.*; import org.slf4j.Logger; @@ -66,49 +69,182 @@ public class PrepareProgramme { HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); } +// private static void exec(SparkSession spark, String programmePath, String outputPath) { +// Dataset programme = readPath(spark, programmePath, CSVProgramme.class); +// +// programme +// .toJavaRDD() +// .filter(p -> !p.getCode().contains("FP7")) +// .mapToPair(csvProgramme -> new Tuple2<>(csvProgramme.getCode(), csvProgramme)) +// .reduceByKey((a, b) -> { +// if (StringUtils.isEmpty(a.getShortTitle())) { +// if (StringUtils.isEmpty(b.getShortTitle())) { +// if (StringUtils.isEmpty(a.getTitle())) { +// if (StringUtils.isNotEmpty(b.getTitle())) { +// a.setShortTitle(b.getTitle()); +// a.setLanguage(b.getLanguage()); +// } +// } else {// notIsEmpty a.getTitle +// if (StringUtils.isEmpty(b.getTitle())) { +// a.setShortTitle(a.getTitle()); +// } else { +// if (b.getLanguage().equalsIgnoreCase("en")) { +// a.setShortTitle(b.getTitle()); +// a.setLanguage(b.getLanguage()); +// } else { +// a.setShortTitle(a.getTitle()); +// } +// } +// } +// } else {// not isEmpty b.getShortTitle +// a.setShortTitle(b.getShortTitle()); +// // a.setLanguage(b.getLanguage()); +// } +// } +// return a; +// +// }) +// .map(p -> { +// CSVProgramme csvProgramme = p._2(); +// if (StringUtils.isEmpty(csvProgramme.getShortTitle())) { +// csvProgramme.setShortTitle(csvProgramme.getTitle()); +// } +// return OBJECT_MAPPER.writeValueAsString(csvProgramme); +// }) +// .saveAsTextFile(outputPath); +// +// } + private static void exec(SparkSession spark, String programmePath, String outputPath) { Dataset programme = readPath(spark, programmePath, CSVProgramme.class); - programme + JavaRDD h2020Programmes = programme .toJavaRDD() .filter(p -> !p.getCode().contains("FP7")) .mapToPair(csvProgramme -> new Tuple2<>(csvProgramme.getCode(), csvProgramme)) .reduceByKey((a, b) -> { - if (StringUtils.isEmpty(a.getShortTitle())) { - if (StringUtils.isEmpty(b.getShortTitle())) { - if (StringUtils.isEmpty(a.getTitle())) { - if (StringUtils.isNotEmpty(b.getTitle())) { - a.setShortTitle(b.getTitle()); - a.setLanguage(b.getLanguage()); - } - } else {// notIsEmpty a.getTitle - if (StringUtils.isEmpty(b.getTitle())) { - a.setShortTitle(a.getTitle()); - } else { - if (b.getLanguage().equalsIgnoreCase("en")) { - a.setShortTitle(b.getTitle()); - a.setLanguage(b.getLanguage()); - } else { - a.setShortTitle(a.getTitle()); - } - } - } - } else {// not isEmpty b.getShortTitle - a.setShortTitle(b.getShortTitle()); - // a.setLanguage(b.getLanguage()); + if (!a.getLanguage().equals("en")) { + if (b.getLanguage().equalsIgnoreCase("en")) { + a.setTitle(b.getTitle()); + a.setLanguage(b.getLanguage()); } } + if (StringUtils.isEmpty(a.getShortTitle())) { + if (!StringUtils.isEmpty(b.getShortTitle())) { + a.setShortTitle(b.getShortTitle()); + } + } + return a; }) .map(p -> { CSVProgramme csvProgramme = p._2(); - if (StringUtils.isEmpty(csvProgramme.getShortTitle())) { - csvProgramme.setShortTitle(csvProgramme.getTitle()); + String programmeTitle = csvProgramme.getTitle().trim(); + if (programmeTitle.length() > 8 && programmeTitle.substring(0, 8).equalsIgnoreCase("PRIORITY")) { + programmeTitle = programmeTitle.substring(9); + if (programmeTitle.charAt(0) == '\'') { + programmeTitle = programmeTitle.substring(1); + } + if (programmeTitle.charAt(programmeTitle.length() - 1) == '\'') { + programmeTitle = programmeTitle.substring(0, programmeTitle.length() - 1); + } + csvProgramme.setTitle(programmeTitle); } - return OBJECT_MAPPER.writeValueAsString(csvProgramme); - }) - .saveAsTextFile(outputPath); + return csvProgramme; + }); + + Object[] codedescription = h2020Programmes + .map(value -> new Tuple2<>(value.getCode(), value.getTitle())) + .collect() + .toArray(); + + for (int i = 0; i < codedescription.length - 1; i++) { + for (int j = i + 1; j < codedescription.length; j++) { + Tuple2 t2i = (Tuple2) codedescription[i]; + Tuple2 t2j = (Tuple2) codedescription[j]; + if (t2i._1().compareTo(t2j._1()) > 0) { + Tuple2 temp = t2i; + codedescription[i] = t2j; + codedescription[j] = temp; + } + } + } + + Map map = new HashMap<>(); + for (int j = 0; j < codedescription.length; j++) { + Tuple2 entry = (Tuple2) codedescription[j]; + String ent = entry._1(); + if (ent.contains("Euratom-")) { + ent = ent.replace("-Euratom-", ".Euratom."); + } + String[] tmp = ent.split("\\."); + if (tmp.length <= 2) { + map.put(entry._1(), entry._2()); + + } else { + if (ent.endsWith(".")) { + ent = ent.substring(0, ent.length() - 1); + } + String key = ent.substring(0, ent.lastIndexOf(".") + 1); + if (key.contains("Euratom")) { + key = key.replace(".Euratom.", "-Euratom-"); + ent = ent.replace(".Euratom.", "-Euratom-"); + if (key.endsWith("-")) { + key = key.substring(0, key.length() - 1); + } + } + String current = entry._2(); + if (!ent.contains("Euratom")) { + + String parent; + String tmp_key = tmp[0] + "."; + for (int i = 1; i < tmp.length - 1; i++) { + tmp_key += tmp[i] + "."; + parent = map.get(tmp_key).toLowerCase().trim(); + if (parent.contains("|")) { + parent = parent.substring(parent.lastIndexOf("|") + 1).trim(); + } + if (current.trim().length() > parent.length() + && current.toLowerCase().trim().substring(0, parent.length()).equals(parent)) { + current = current.substring(parent.length() + 1); + if (current.trim().charAt(0) == '-') { + current = current.trim().substring(1).trim(); + } + + } + } + + } + map.put(ent + ".", map.get(key) + " | " + current); +// String current = entry._2(); +// String parent; +// String tmp_key = tmp[0] + "."; +// for (int i = 1; i< tmp.length -1; i++){ +// tmp_key += tmp[i] + "."; +// parent = map.get(tmp_key).toLowerCase().trim(); +// if (current.trim().length() > parent.length() && current.toLowerCase().trim().substring(0, parent.length()).equals(parent)){ +// current = current.substring(parent.length()+1); +// if(current.trim().charAt(0) == '-'){ +// current = current.trim().substring(1).trim(); +// } +// +// } +// } +// +// map.put(ent + ".", map.get(key) + " $ " + current); + } + + } + + h2020Programmes.map(csvProgramme -> { + if (!csvProgramme.getCode().endsWith(".") && !csvProgramme.getCode().contains("Euratom") + && !csvProgramme.getCode().equals("H2020-EC")) + csvProgramme.setClassification(map.get(csvProgramme.getCode() + ".")); + else + csvProgramme.setClassification(map.get(csvProgramme.getCode())); + return OBJECT_MAPPER.writeValueAsString(csvProgramme); + }).saveAsTextFile(outputPath); } diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java index 1023e2d19d..550621cde4 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/SparkAtomicActionJob.java @@ -3,28 +3,20 @@ package eu.dnetlib.dhp.actionmanager.project; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; -import java.io.IOException; import java.util.Arrays; import java.util.HashMap; import java.util.Objects; import java.util.Optional; -import java.util.function.Consumer; import org.apache.commons.io.IOUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.SequenceFile; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.SequenceFileOutputFormat; -import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapGroupsFunction; -import org.apache.spark.rdd.SequenceFileRDDFunctions; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,12 +29,11 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.schema.action.AtomicAction; import eu.dnetlib.dhp.schema.common.ModelSupport; -import eu.dnetlib.dhp.schema.oaf.Programme; +import eu.dnetlib.dhp.schema.oaf.H2020Classification; +import eu.dnetlib.dhp.schema.oaf.H2020Programme; import eu.dnetlib.dhp.schema.oaf.Project; import eu.dnetlib.dhp.utils.DHPUtils; -import scala.Function1; import scala.Tuple2; -import scala.runtime.BoxedUnit; public class SparkAtomicActionJob { private static final Logger log = LoggerFactory.getLogger(SparkAtomicActionJob.class); @@ -105,20 +96,30 @@ public class SparkAtomicActionJob { project .joinWith(programme, project.col("programme").equalTo(programme.col("code")), "left") - .map(c -> { + .map((MapFunction, Project>) c -> { CSVProject csvProject = c._1(); - Optional csvProgramme = Optional.ofNullable(c._2()); - if (csvProgramme.isPresent()) { + Optional ocsvProgramme = Optional.ofNullable(c._2()); + if (ocsvProgramme.isPresent()) { Project p = new Project(); p .setId( createOpenaireId( ModelSupport.entityIdPrefix.get("project"), "corda__h2020", csvProject.getId())); - Programme pm = new Programme(); + p.setH2020topiccode(csvProject.getTopics()); + H2020Programme pm = new H2020Programme(); + H2020Classification h2020classification = new H2020Classification(); pm.setCode(csvProject.getProgramme()); - pm.setDescription(csvProgramme.get().getShortTitle()); - p.setProgramme(Arrays.asList(pm)); + CSVProgramme csvProgramme = ocsvProgramme.get(); + if (StringUtils.isNotEmpty(csvProgramme.getShortTitle())) { + pm.setDescription(csvProgramme.getShortTitle()); + } else { + pm.setDescription(csvProgramme.getTitle()); + } + h2020classification.setClassification(ocsvProgramme.get().getClassification()); + setLevels(h2020classification, ocsvProgramme.get().getClassification()); + h2020classification.setH2020Programme(pm); + p.setH2020classification(Arrays.asList(h2020classification)); return p; } @@ -144,6 +145,17 @@ public class SparkAtomicActionJob { } + private static void setLevels(H2020Classification h2020Classification, String classification) { + String[] tmp = classification.split(" | "); + h2020Classification.setLevel1(tmp[0]); + if (tmp.length > 1) { + h2020Classification.setLevel2(tmp[1]); + } + if (tmp.length > 2) { + h2020Classification.setLevel3(tmp[2]); + } + } + public static Dataset readPath( SparkSession spark, String inputPath, Class clazz) { return spark diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/CSVProgramme.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/CSVProgramme.java index a9069e5104..a9c3156516 100644 --- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/CSVProgramme.java +++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/actionmanager/project/csvutils/CSVProgramme.java @@ -9,6 +9,15 @@ public class CSVProgramme implements Serializable { private String title; private String shortTitle; private String language; + private String classification; + + public String getClassification() { + return classification; + } + + public void setClassification(String classification) { + this.classification = classification; + } public String getRcn() { return rcn;