Merge pull request 'h2020classification' (#49) from miriam.baglioni/dnet-hadoop:h2020classification into master

LGTM
This commit is contained in:
Claudio Atzori 2020-10-30 17:10:05 +01:00
commit c5dda3a00c
10 changed files with 79 additions and 33 deletions

View File

@ -4,6 +4,7 @@ package eu.dnetlib.dhp.actionmanager.project;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
@ -11,6 +12,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*; import org.apache.spark.sql.*;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -175,43 +177,54 @@ public class PrepareProgramme {
return csvProgramme; return csvProgramme;
}); });
prepareClassification(h2020Programmes); // prepareClassification(h2020Programmes);
h2020Programmes JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
.map(csvProgramme -> OBJECT_MAPPER.writeValueAsString(csvProgramme))
JavaRDD<CSVProgramme> rdd = jsc.parallelize(prepareClassification(h2020Programmes), 1);
rdd
.map(csvProgramme -> {
String tmp = OBJECT_MAPPER.writeValueAsString(csvProgramme);
return tmp;
})
.saveAsTextFile(outputPath); .saveAsTextFile(outputPath);
} }
private static void prepareClassification(JavaRDD<CSVProgramme> h2020Programmes) { private static List<CSVProgramme> prepareClassification(JavaRDD<CSVProgramme> h2020Programmes) {
Object[] codedescription = h2020Programmes Object[] codedescription = h2020Programmes
.map(value -> new Tuple2<>(value.getCode(), value.getTitle())) .map(
value -> new Tuple2<>(value.getCode(),
new Tuple2<String, String>(value.getTitle(), value.getShortTitle())))
.collect() .collect()
.toArray(); .toArray();
for (int i = 0; i < codedescription.length - 1; i++) { for (int i = 0; i < codedescription.length - 1; i++) {
for (int j = i + 1; j < codedescription.length; j++) { for (int j = i + 1; j < codedescription.length; j++) {
Tuple2<String, String> t2i = (Tuple2<String, String>) codedescription[i]; Tuple2<String, Tuple2<String, String>> t2i = (Tuple2<String, Tuple2<String, String>>) codedescription[i];
Tuple2<String, String> t2j = (Tuple2<String, String>) codedescription[j]; Tuple2<String, Tuple2<String, String>> t2j = (Tuple2<String, Tuple2<String, String>>) codedescription[j];
if (t2i._1().compareTo(t2j._1()) > 0) { if (t2i._1().compareTo(t2j._1()) > 0) {
Tuple2<String, String> temp = t2i; Tuple2<String, Tuple2<String, String>> temp = t2i;
codedescription[i] = t2j; codedescription[i] = t2j;
codedescription[j] = temp; codedescription[j] = temp;
} }
} }
} }
Map<String, String> map = new HashMap<>(); Map<String, Tuple2<String, String>> map = new HashMap<>();
for (int j = 0; j < codedescription.length; j++) { for (int j = 0; j < codedescription.length; j++) {
Tuple2<String, String> entry = (Tuple2<String, String>) codedescription[j]; Tuple2<String, Tuple2<String, String>> entry = (Tuple2<String, Tuple2<String, String>>) codedescription[j];
String ent = entry._1(); String ent = entry._1();
if (ent.contains("Euratom-")) { if (ent.contains("Euratom-")) {
ent = ent.replace("-Euratom-", ".Euratom."); ent = ent.replace("-Euratom-", ".Euratom.");
} }
String[] tmp = ent.split("\\."); String[] tmp = ent.split("\\.");
if (tmp.length <= 2) { if (tmp.length <= 2) {
map.put(entry._1(), entry._2()); if (StringUtils.isEmpty(entry._2()._2())) {
map.put(entry._1(), new Tuple2<String, String>(entry._2()._1(), entry._2()._1()));
} else {
map.put(entry._1(), entry._2());
}
} else { } else {
if (ent.endsWith(".")) { if (ent.endsWith(".")) {
ent = ent.substring(0, ent.length() - 1); ent = ent.substring(0, ent.length() - 1);
@ -224,14 +237,14 @@ public class PrepareProgramme {
key = key.substring(0, key.length() - 1); key = key.substring(0, key.length() - 1);
} }
} }
String current = entry._2(); String current = entry._2()._1();
if (!ent.contains("Euratom")) { if (!ent.contains("Euratom")) {
String parent; String parent;
String tmp_key = tmp[0] + "."; String tmp_key = tmp[0] + ".";
for (int i = 1; i < tmp.length - 1; i++) { for (int i = 1; i < tmp.length - 1; i++) {
tmp_key += tmp[i] + "."; tmp_key += tmp[i] + ".";
parent = map.get(tmp_key).toLowerCase().trim(); parent = map.get(tmp_key)._1().toLowerCase().trim();
if (parent.contains("|")) { if (parent.contains("|")) {
parent = parent.substring(parent.lastIndexOf("|") + 1).trim(); parent = parent.substring(parent.lastIndexOf("|") + 1).trim();
} }
@ -246,18 +259,29 @@ public class PrepareProgramme {
} }
} }
map.put(ent + ".", map.get(key) + " | " + current); String shortTitle = entry._2()._2();
if (StringUtils.isEmpty(shortTitle)) {
shortTitle = current;
}
Tuple2<String, String> newEntry = new Tuple2<>(map.get(key)._1() + " | " + current,
map.get(key)._2() + " | " + shortTitle);
map.put(ent + ".", newEntry);
} }
} }
h2020Programmes.foreach(csvProgramme -> { return h2020Programmes.map(csvProgramme -> {
if (!csvProgramme.getCode().endsWith(".") && !csvProgramme.getCode().contains("Euratom")
&& !csvProgramme.getCode().equals("H2020-EC")) String code = csvProgramme.getCode();
csvProgramme.setClassification(map.get(csvProgramme.getCode() + ".")); if (!code.endsWith(".") && !code.contains("Euratom")
else && !code.equals("H2020-EC"))
csvProgramme.setClassification(map.get(csvProgramme.getCode())); code += ".";
});
csvProgramme.setClassification(map.get(code)._1());
csvProgramme.setClassification_short(map.get(code)._2());
return csvProgramme;
}).collect();
} }
public static <R> Dataset<R> readPath( public static <R> Dataset<R> readPath(

View File

@ -9,7 +9,6 @@ import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf; import org.apache.spark.SparkConf;
@ -138,7 +137,8 @@ public class SparkAtomicActionJob {
pm.setCode(csvProject.getProgramme()); pm.setCode(csvProject.getProgramme());
h2020classification.setClassification(ocsvProgramme.get().getClassification()); h2020classification.setClassification(ocsvProgramme.get().getClassification());
h2020classification.setH2020Programme(pm); h2020classification.setH2020Programme(pm);
setLevelsAndProgramme(h2020classification, ocsvProgramme.get().getClassification()); setLevelsandProgramme(h2020classification, ocsvProgramme.get().getClassification_short());
// setProgramme(h2020classification, ocsvProgramme.get().getClassification());
pp.setH2020classification(Arrays.asList(h2020classification)); pp.setH2020classification(Arrays.asList(h2020classification));
return pp; return pp;
@ -177,8 +177,8 @@ public class SparkAtomicActionJob {
} }
private static void setLevelsAndProgramme(H2020Classification h2020Classification, String classification) { private static void setLevelsandProgramme(H2020Classification h2020Classification, String classification_short) {
String[] tmp = classification.split(" \\| "); String[] tmp = classification_short.split(" \\| ");
h2020Classification.setLevel1(tmp[0]); h2020Classification.setLevel1(tmp[0]);
if (tmp.length > 1) { if (tmp.length > 1) {
h2020Classification.setLevel2(tmp[1]); h2020Classification.setLevel2(tmp[1]);
@ -189,6 +189,12 @@ public class SparkAtomicActionJob {
h2020Classification.getH2020Programme().setDescription(tmp[tmp.length - 1]); h2020Classification.getH2020Programme().setDescription(tmp[tmp.length - 1]);
} }
// private static void setProgramme(H2020Classification h2020Classification, String classification) {
// String[] tmp = classification.split(" \\| ");
//
// h2020Classification.getH2020Programme().setDescription(tmp[tmp.length - 1]);
// }
public static <R> Dataset<R> readPath( public static <R> Dataset<R> readPath(
SparkSession spark, String inputPath, Class<R> clazz) { SparkSession spark, String inputPath, Class<R> clazz) {
return spark return spark

View File

@ -22,6 +22,15 @@ public class CSVProgramme implements Serializable {
private String shortTitle; private String shortTitle;
private String language; private String language;
private String classification; private String classification;
private String classification_short;
public String getClassification_short() {
return classification_short;
}
public void setClassification_short(String classification_short) {
this.classification_short = classification_short;
}
public String getClassification() { public String getClassification() {
return classification; return classification;

View File

@ -9,12 +9,14 @@ import java.util.List;
import org.apache.poi.openxml4j.exceptions.InvalidFormatException; import org.apache.poi.openxml4j.exceptions.InvalidFormatException;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import eu.dnetlib.dhp.actionmanager.project.httpconnector.CollectorServiceException; import eu.dnetlib.dhp.actionmanager.project.httpconnector.CollectorServiceException;
import eu.dnetlib.dhp.actionmanager.project.httpconnector.HttpConnector; import eu.dnetlib.dhp.actionmanager.project.httpconnector.HttpConnector;
import eu.dnetlib.dhp.actionmanager.project.utils.EXCELParser; import eu.dnetlib.dhp.actionmanager.project.utils.EXCELParser;
@Disabled
public class EXCELParserTest { public class EXCELParserTest {
private static Path workingDir; private static Path workingDir;

View File

@ -92,6 +92,8 @@ public class PrepareH2020ProgrammeTest {
Assertions.assertEquals(0, verificationDataset.filter("classification = ''").count()); Assertions.assertEquals(0, verificationDataset.filter("classification = ''").count());
// tmp.foreach(csvProgramme -> System.out.println(OBJECT_MAPPER.writeValueAsString(csvProgramme)));
Assertions Assertions
.assertEquals( .assertEquals(
"Societal challenges | Smart, Green And Integrated Transport | CLEANSKY2 | IADP Fast Rotorcraft", "Societal challenges | Smart, Green And Integrated Transport | CLEANSKY2 | IADP Fast Rotorcraft",

View File

@ -78,7 +78,7 @@ public class SparkUpdateProjectTest {
"-programmePath", "-programmePath",
getClass() getClass()
.getResource( .getResource(
"/eu/dnetlib/dhp/actionmanager/project/preparedProgramme_classification_whole.json.gz") "/eu/dnetlib/dhp/actionmanager/project/preparedProgramme_whole.json.gz")
.getPath(), .getPath(),
"-projectPath", "-projectPath",
getClass().getResource("/eu/dnetlib/dhp/actionmanager/project/prepared_projects.json").getPath(), getClass().getResource("/eu/dnetlib/dhp/actionmanager/project/prepared_projects.json").getPath(),
@ -124,7 +124,7 @@ public class SparkUpdateProjectTest {
.getString(0)); .getString(0));
Assertions Assertions
.assertEquals( .assertEquals(
"Societal challenges", "Societal Challenges",
execverification execverification
.filter("id = '40|corda__h2020::2c7298913008865ba784e5c1350a0aa5'") .filter("id = '40|corda__h2020::2c7298913008865ba784e5c1350a0aa5'")
.select("classification.level1") .select("classification.level1")
@ -133,7 +133,7 @@ public class SparkUpdateProjectTest {
.getString(0)); .getString(0));
Assertions Assertions
.assertEquals( .assertEquals(
"Smart, Green And Integrated Transport", "Transport",
execverification execverification
.filter("id = '40|corda__h2020::2c7298913008865ba784e5c1350a0aa5'") .filter("id = '40|corda__h2020::2c7298913008865ba784e5c1350a0aa5'")
.select("classification.level2") .select("classification.level2")
@ -188,7 +188,7 @@ public class SparkUpdateProjectTest {
.getString(0)); .getString(0));
Assertions Assertions
.assertEquals( .assertEquals(
"Nurturing excellence by means of cross-border and cross-sector mobility", "MSCA Mobility",
execverification execverification
.filter("id = '40|corda__h2020::1a1f235fdd06ef14790baec159aa1202'") .filter("id = '40|corda__h2020::1a1f235fdd06ef14790baec159aa1202'")
.select("classification.h2020Programme.description") .select("classification.h2020Programme.description")
@ -197,7 +197,7 @@ public class SparkUpdateProjectTest {
.getString(0)); .getString(0));
Assertions Assertions
.assertEquals( .assertEquals(
"Excellent science", "Excellent Science",
execverification execverification
.filter("id = '40|corda__h2020::1a1f235fdd06ef14790baec159aa1202'") .filter("id = '40|corda__h2020::1a1f235fdd06ef14790baec159aa1202'")
.select("classification.level1") .select("classification.level1")
@ -206,7 +206,7 @@ public class SparkUpdateProjectTest {
.getString(0)); .getString(0));
Assertions Assertions
.assertEquals( .assertEquals(
"Marie Skłodowska-Curie Actions", "Marie-Sklodowska-Curie Actions",
execverification execverification
.filter("id = '40|corda__h2020::1a1f235fdd06ef14790baec159aa1202'") .filter("id = '40|corda__h2020::1a1f235fdd06ef14790baec159aa1202'")
.select("classification.level2") .select("classification.level2")
@ -215,7 +215,7 @@ public class SparkUpdateProjectTest {
.getString(0)); .getString(0));
Assertions Assertions
.assertEquals( .assertEquals(
"Nurturing excellence by means of cross-border and cross-sector mobility", "MSCA Mobility",
execverification execverification
.filter("id = '40|corda__h2020::1a1f235fdd06ef14790baec159aa1202'") .filter("id = '40|corda__h2020::1a1f235fdd06ef14790baec159aa1202'")
.select("classification.level3") .select("classification.level3")

View File

@ -6,8 +6,11 @@ import org.apache.commons.logging.LogFactory;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory; import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.ssl.SSLContextBuilder; import org.apache.http.ssl.SSLContextBuilder;
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@Disabled
public class HttpConnectorTest { public class HttpConnectorTest {
private static final Log log = LogFactory.getLog(HttpConnectorTest.class); private static final Log log = LogFactory.getLog(HttpConnectorTest.class);