forked from D-Net/dnet-hadoop
merge upstream
This commit is contained in:
commit
1124ac29fc
|
@ -39,15 +39,15 @@ public class ModelConstants {
|
|||
public static final String IS_SUPPLEMENT_TO = "isSupplementTo";
|
||||
public static final String IS_SUPPLEMENTED_BY = "isSupplementedBy";
|
||||
public static final String PART = "part";
|
||||
public static final String IS_PART_OF = "IsPartOf";
|
||||
public static final String HAS_PARTS = "HasParts";
|
||||
public static final String IS_PART_OF = "isPartOf";
|
||||
public static final String HAS_PARTS = "hasParts";
|
||||
public static final String RELATIONSHIP = "relationship";
|
||||
public static final String CITATION = "citation";
|
||||
public static final String CITES = "cites";
|
||||
public static final String IS_CITED_BY = "IsCitedBy";
|
||||
public static final String IS_CITED_BY = "isCitedBy";
|
||||
public static final String REVIEW = "review";
|
||||
public static final String REVIEWS = "reviews";
|
||||
public static final String IS_REVIEWED_BY = "IsReviewedBy";
|
||||
public static final String IS_REVIEWED_BY = "isReviewedBy";
|
||||
|
||||
public static final String RESULT_PROJECT = "resultProject";
|
||||
public static final String OUTCOME = "outcome";
|
||||
|
|
|
@ -4,6 +4,7 @@ package eu.dnetlib.dhp.actionmanager.project;
|
|||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
|
@ -11,6 +12,7 @@ import org.apache.commons.io.IOUtils;
|
|||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.sql.*;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -175,43 +177,54 @@ public class PrepareProgramme {
|
|||
return csvProgramme;
|
||||
});
|
||||
|
||||
prepareClassification(h2020Programmes);
|
||||
// prepareClassification(h2020Programmes);
|
||||
|
||||
h2020Programmes
|
||||
.map(csvProgramme -> OBJECT_MAPPER.writeValueAsString(csvProgramme))
|
||||
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
|
||||
|
||||
JavaRDD<CSVProgramme> rdd = jsc.parallelize(prepareClassification(h2020Programmes), 1);
|
||||
rdd
|
||||
.map(csvProgramme -> {
|
||||
String tmp = OBJECT_MAPPER.writeValueAsString(csvProgramme);
|
||||
return tmp;
|
||||
})
|
||||
.saveAsTextFile(outputPath);
|
||||
|
||||
}
|
||||
|
||||
private static void prepareClassification(JavaRDD<CSVProgramme> h2020Programmes) {
|
||||
private static List<CSVProgramme> prepareClassification(JavaRDD<CSVProgramme> h2020Programmes) {
|
||||
Object[] codedescription = h2020Programmes
|
||||
.map(value -> new Tuple2<>(value.getCode(), value.getTitle()))
|
||||
.map(
|
||||
value -> new Tuple2<>(value.getCode(),
|
||||
new Tuple2<String, String>(value.getTitle(), value.getShortTitle())))
|
||||
.collect()
|
||||
.toArray();
|
||||
|
||||
for (int i = 0; i < codedescription.length - 1; i++) {
|
||||
for (int j = i + 1; j < codedescription.length; j++) {
|
||||
Tuple2<String, String> t2i = (Tuple2<String, String>) codedescription[i];
|
||||
Tuple2<String, String> t2j = (Tuple2<String, String>) codedescription[j];
|
||||
Tuple2<String, Tuple2<String, String>> t2i = (Tuple2<String, Tuple2<String, String>>) codedescription[i];
|
||||
Tuple2<String, Tuple2<String, String>> t2j = (Tuple2<String, Tuple2<String, String>>) codedescription[j];
|
||||
if (t2i._1().compareTo(t2j._1()) > 0) {
|
||||
Tuple2<String, String> temp = t2i;
|
||||
Tuple2<String, Tuple2<String, String>> temp = t2i;
|
||||
codedescription[i] = t2j;
|
||||
codedescription[j] = temp;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Map<String, String> map = new HashMap<>();
|
||||
Map<String, Tuple2<String, String>> map = new HashMap<>();
|
||||
for (int j = 0; j < codedescription.length; j++) {
|
||||
Tuple2<String, String> entry = (Tuple2<String, String>) codedescription[j];
|
||||
Tuple2<String, Tuple2<String, String>> entry = (Tuple2<String, Tuple2<String, String>>) codedescription[j];
|
||||
String ent = entry._1();
|
||||
if (ent.contains("Euratom-")) {
|
||||
ent = ent.replace("-Euratom-", ".Euratom.");
|
||||
}
|
||||
String[] tmp = ent.split("\\.");
|
||||
if (tmp.length <= 2) {
|
||||
map.put(entry._1(), entry._2());
|
||||
|
||||
if (StringUtils.isEmpty(entry._2()._2())) {
|
||||
map.put(entry._1(), new Tuple2<String, String>(entry._2()._1(), entry._2()._1()));
|
||||
} else {
|
||||
map.put(entry._1(), entry._2());
|
||||
}
|
||||
} else {
|
||||
if (ent.endsWith(".")) {
|
||||
ent = ent.substring(0, ent.length() - 1);
|
||||
|
@ -224,14 +237,14 @@ public class PrepareProgramme {
|
|||
key = key.substring(0, key.length() - 1);
|
||||
}
|
||||
}
|
||||
String current = entry._2();
|
||||
String current = entry._2()._1();
|
||||
if (!ent.contains("Euratom")) {
|
||||
|
||||
String parent;
|
||||
String tmp_key = tmp[0] + ".";
|
||||
for (int i = 1; i < tmp.length - 1; i++) {
|
||||
tmp_key += tmp[i] + ".";
|
||||
parent = map.get(tmp_key).toLowerCase().trim();
|
||||
parent = map.get(tmp_key)._1().toLowerCase().trim();
|
||||
if (parent.contains("|")) {
|
||||
parent = parent.substring(parent.lastIndexOf("|") + 1).trim();
|
||||
}
|
||||
|
@ -246,18 +259,29 @@ public class PrepareProgramme {
|
|||
}
|
||||
|
||||
}
|
||||
map.put(ent + ".", map.get(key) + " | " + current);
|
||||
String shortTitle = entry._2()._2();
|
||||
if (StringUtils.isEmpty(shortTitle)) {
|
||||
shortTitle = current;
|
||||
}
|
||||
Tuple2<String, String> newEntry = new Tuple2<>(map.get(key)._1() + " | " + current,
|
||||
map.get(key)._2() + " | " + shortTitle);
|
||||
map.put(ent + ".", newEntry);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
h2020Programmes.foreach(csvProgramme -> {
|
||||
if (!csvProgramme.getCode().endsWith(".") && !csvProgramme.getCode().contains("Euratom")
|
||||
&& !csvProgramme.getCode().equals("H2020-EC"))
|
||||
csvProgramme.setClassification(map.get(csvProgramme.getCode() + "."));
|
||||
else
|
||||
csvProgramme.setClassification(map.get(csvProgramme.getCode()));
|
||||
});
|
||||
return h2020Programmes.map(csvProgramme -> {
|
||||
|
||||
String code = csvProgramme.getCode();
|
||||
if (!code.endsWith(".") && !code.contains("Euratom")
|
||||
&& !code.equals("H2020-EC"))
|
||||
code += ".";
|
||||
|
||||
csvProgramme.setClassification(map.get(code)._1());
|
||||
csvProgramme.setClassification_short(map.get(code)._2());
|
||||
|
||||
return csvProgramme;
|
||||
}).collect();
|
||||
}
|
||||
|
||||
public static <R> Dataset<R> readPath(
|
||||
|
|
|
@ -9,7 +9,6 @@ import java.util.Objects;
|
|||
import java.util.Optional;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
|
||||
import org.apache.spark.SparkConf;
|
||||
|
@ -138,7 +137,8 @@ public class SparkAtomicActionJob {
|
|||
pm.setCode(csvProject.getProgramme());
|
||||
h2020classification.setClassification(ocsvProgramme.get().getClassification());
|
||||
h2020classification.setH2020Programme(pm);
|
||||
setLevelsAndProgramme(h2020classification, ocsvProgramme.get().getClassification());
|
||||
setLevelsandProgramme(h2020classification, ocsvProgramme.get().getClassification_short());
|
||||
// setProgramme(h2020classification, ocsvProgramme.get().getClassification());
|
||||
pp.setH2020classification(Arrays.asList(h2020classification));
|
||||
|
||||
return pp;
|
||||
|
@ -177,8 +177,8 @@ public class SparkAtomicActionJob {
|
|||
|
||||
}
|
||||
|
||||
private static void setLevelsAndProgramme(H2020Classification h2020Classification, String classification) {
|
||||
String[] tmp = classification.split(" \\| ");
|
||||
private static void setLevelsandProgramme(H2020Classification h2020Classification, String classification_short) {
|
||||
String[] tmp = classification_short.split(" \\| ");
|
||||
h2020Classification.setLevel1(tmp[0]);
|
||||
if (tmp.length > 1) {
|
||||
h2020Classification.setLevel2(tmp[1]);
|
||||
|
@ -189,6 +189,12 @@ public class SparkAtomicActionJob {
|
|||
h2020Classification.getH2020Programme().setDescription(tmp[tmp.length - 1]);
|
||||
}
|
||||
|
||||
// private static void setProgramme(H2020Classification h2020Classification, String classification) {
|
||||
// String[] tmp = classification.split(" \\| ");
|
||||
//
|
||||
// h2020Classification.getH2020Programme().setDescription(tmp[tmp.length - 1]);
|
||||
// }
|
||||
|
||||
public static <R> Dataset<R> readPath(
|
||||
SparkSession spark, String inputPath, Class<R> clazz) {
|
||||
return spark
|
||||
|
|
|
@ -22,6 +22,15 @@ public class CSVProgramme implements Serializable {
|
|||
private String shortTitle;
|
||||
private String language;
|
||||
private String classification;
|
||||
private String classification_short;
|
||||
|
||||
public String getClassification_short() {
|
||||
return classification_short;
|
||||
}
|
||||
|
||||
public void setClassification_short(String classification_short) {
|
||||
this.classification_short = classification_short;
|
||||
}
|
||||
|
||||
public String getClassification() {
|
||||
return classification;
|
||||
|
|
|
@ -9,12 +9,14 @@ import java.util.List;
|
|||
import org.apache.poi.openxml4j.exceptions.InvalidFormatException;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import eu.dnetlib.dhp.actionmanager.project.httpconnector.CollectorServiceException;
|
||||
import eu.dnetlib.dhp.actionmanager.project.httpconnector.HttpConnector;
|
||||
import eu.dnetlib.dhp.actionmanager.project.utils.EXCELParser;
|
||||
|
||||
@Disabled
|
||||
public class EXCELParserTest {
|
||||
|
||||
private static Path workingDir;
|
||||
|
|
|
@ -92,6 +92,8 @@ public class PrepareH2020ProgrammeTest {
|
|||
|
||||
Assertions.assertEquals(0, verificationDataset.filter("classification = ''").count());
|
||||
|
||||
// tmp.foreach(csvProgramme -> System.out.println(OBJECT_MAPPER.writeValueAsString(csvProgramme)));
|
||||
|
||||
Assertions
|
||||
.assertEquals(
|
||||
"Societal challenges | Smart, Green And Integrated Transport | CLEANSKY2 | IADP Fast Rotorcraft",
|
||||
|
|
|
@ -78,7 +78,7 @@ public class SparkUpdateProjectTest {
|
|||
"-programmePath",
|
||||
getClass()
|
||||
.getResource(
|
||||
"/eu/dnetlib/dhp/actionmanager/project/preparedProgramme_classification_whole.json.gz")
|
||||
"/eu/dnetlib/dhp/actionmanager/project/preparedProgramme_whole.json.gz")
|
||||
.getPath(),
|
||||
"-projectPath",
|
||||
getClass().getResource("/eu/dnetlib/dhp/actionmanager/project/prepared_projects.json").getPath(),
|
||||
|
@ -124,7 +124,7 @@ public class SparkUpdateProjectTest {
|
|||
.getString(0));
|
||||
Assertions
|
||||
.assertEquals(
|
||||
"Societal challenges",
|
||||
"Societal Challenges",
|
||||
execverification
|
||||
.filter("id = '40|corda__h2020::2c7298913008865ba784e5c1350a0aa5'")
|
||||
.select("classification.level1")
|
||||
|
@ -133,7 +133,7 @@ public class SparkUpdateProjectTest {
|
|||
.getString(0));
|
||||
Assertions
|
||||
.assertEquals(
|
||||
"Smart, Green And Integrated Transport",
|
||||
"Transport",
|
||||
execverification
|
||||
.filter("id = '40|corda__h2020::2c7298913008865ba784e5c1350a0aa5'")
|
||||
.select("classification.level2")
|
||||
|
@ -188,7 +188,7 @@ public class SparkUpdateProjectTest {
|
|||
.getString(0));
|
||||
Assertions
|
||||
.assertEquals(
|
||||
"Nurturing excellence by means of cross-border and cross-sector mobility",
|
||||
"MSCA Mobility",
|
||||
execverification
|
||||
.filter("id = '40|corda__h2020::1a1f235fdd06ef14790baec159aa1202'")
|
||||
.select("classification.h2020Programme.description")
|
||||
|
@ -197,7 +197,7 @@ public class SparkUpdateProjectTest {
|
|||
.getString(0));
|
||||
Assertions
|
||||
.assertEquals(
|
||||
"Excellent science",
|
||||
"Excellent Science",
|
||||
execverification
|
||||
.filter("id = '40|corda__h2020::1a1f235fdd06ef14790baec159aa1202'")
|
||||
.select("classification.level1")
|
||||
|
@ -206,7 +206,7 @@ public class SparkUpdateProjectTest {
|
|||
.getString(0));
|
||||
Assertions
|
||||
.assertEquals(
|
||||
"Marie Skłodowska-Curie Actions",
|
||||
"Marie-Sklodowska-Curie Actions",
|
||||
execverification
|
||||
.filter("id = '40|corda__h2020::1a1f235fdd06ef14790baec159aa1202'")
|
||||
.select("classification.level2")
|
||||
|
@ -215,7 +215,7 @@ public class SparkUpdateProjectTest {
|
|||
.getString(0));
|
||||
Assertions
|
||||
.assertEquals(
|
||||
"Nurturing excellence by means of cross-border and cross-sector mobility",
|
||||
"MSCA Mobility",
|
||||
execverification
|
||||
.filter("id = '40|corda__h2020::1a1f235fdd06ef14790baec159aa1202'")
|
||||
.select("classification.level3")
|
||||
|
|
|
@ -6,8 +6,11 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
|
||||
import org.apache.http.ssl.SSLContextBuilder;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
|
||||
@Disabled
|
||||
public class HttpConnectorTest {
|
||||
|
||||
private static final Log log = LogFactory.getLog(HttpConnectorTest.class);
|
||||
|
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
@ -38,6 +38,8 @@ class QueryTest {
|
|||
def myQuery(spark:SparkSession, sc:SparkContext): Unit = {
|
||||
implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication]
|
||||
|
||||
|
||||
|
||||
val mapper = new ObjectMapper()
|
||||
mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT)
|
||||
|
||||
|
|
|
@ -30,7 +30,7 @@ class SparkScholexplorerAggregationTest {
|
|||
|
||||
|
||||
implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo[DLIPublication]
|
||||
val spark: SparkSession = SparkSession.builder().appName("Test").master("local[*]").getOrCreate()
|
||||
val spark: SparkSession = SparkSession.builder().appName("Test").master("local[*]").config("spark.driver.bindAddress", "127.0.0.1").getOrCreate()
|
||||
|
||||
|
||||
val ds: Dataset[DLIPublication] = spark.createDataset(spark.sparkContext.parallelize(s)).as[DLIPublication]
|
||||
|
|
|
@ -3,14 +3,15 @@ package eu.dnetlib.dhp.export
|
|||
import java.time.LocalDateTime
|
||||
import java.time.format.DateTimeFormatter
|
||||
|
||||
import eu.dnetlib.dhp.provision.scholix.Scholix
|
||||
import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation
|
||||
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication}
|
||||
|
||||
import org.codehaus.jackson.map.{ObjectMapper, SerializationConfig}
|
||||
import org.junit.jupiter.api.Test
|
||||
|
||||
import scala.io.Source
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
class ExportDLITOOAFTest {
|
||||
|
||||
val mapper = new ObjectMapper()
|
||||
|
@ -22,12 +23,27 @@ class ExportDLITOOAFTest {
|
|||
}
|
||||
|
||||
|
||||
def extractDatasources(s:Scholix):List[String]= {
|
||||
s.getTarget.getCollectedFrom.asScala.map(c => c.getProvider.getName)(collection.breakOut)
|
||||
}
|
||||
|
||||
|
||||
def extractDatasources(s:ScholixSummary):List[String] = {
|
||||
|
||||
s.getDatasources.asScala.map(c => c.getDatasourceName)(collection.breakOut)
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
def testMappingRele():Unit = {
|
||||
|
||||
val r:Relation = new Relation
|
||||
r.setSource("60|fbff1d424e045eecf24151a5fe3aa738")
|
||||
r.setTarget("50|dedup_wf_001::ec409f09e63347d4e834087fe1483877")
|
||||
r.setRelType("IsReferencedBy")
|
||||
|
||||
|
||||
val r1 =DLIToOAF.convertDLIRelation(r)
|
||||
println(r1.getSource, r1.getTarget)
|
||||
|
|
Loading…
Reference in New Issue