forked from D-Net/dnet-hadoop
Merge branch 'master' into stable_ids
This commit is contained in:
commit
09e44dabff
|
@ -94,7 +94,13 @@ public class AuthorMerger {
|
||||||
if (r.getPid() == null) {
|
if (r.getPid() == null) {
|
||||||
r.setPid(new ArrayList<>());
|
r.setPid(new ArrayList<>());
|
||||||
}
|
}
|
||||||
r.getPid().add(a._1());
|
|
||||||
|
// TERRIBLE HACK but for some reason when we create and Array with Arrays.asList,
|
||||||
|
// it creates of fixed size, and the add method raise UnsupportedOperationException at
|
||||||
|
// java.util.AbstractList.add
|
||||||
|
final List<StructuredProperty> tmp = new ArrayList<>(r.getPid());
|
||||||
|
tmp.add(a._1());
|
||||||
|
r.setPid(tmp);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -39,15 +39,15 @@ public class ModelConstants {
|
||||||
public static final String IS_SUPPLEMENT_TO = "isSupplementTo";
|
public static final String IS_SUPPLEMENT_TO = "isSupplementTo";
|
||||||
public static final String IS_SUPPLEMENTED_BY = "isSupplementedBy";
|
public static final String IS_SUPPLEMENTED_BY = "isSupplementedBy";
|
||||||
public static final String PART = "part";
|
public static final String PART = "part";
|
||||||
public static final String IS_PART_OF = "IsPartOf";
|
public static final String IS_PART_OF = "isPartOf";
|
||||||
public static final String HAS_PARTS = "HasParts";
|
public static final String HAS_PARTS = "hasParts";
|
||||||
public static final String RELATIONSHIP = "relationship";
|
public static final String RELATIONSHIP = "relationship";
|
||||||
public static final String CITATION = "citation";
|
public static final String CITATION = "citation";
|
||||||
public static final String CITES = "cites";
|
public static final String CITES = "cites";
|
||||||
public static final String IS_CITED_BY = "IsCitedBy";
|
public static final String IS_CITED_BY = "isCitedBy";
|
||||||
public static final String REVIEW = "review";
|
public static final String REVIEW = "review";
|
||||||
public static final String REVIEWS = "reviews";
|
public static final String REVIEWS = "reviews";
|
||||||
public static final String IS_REVIEWED_BY = "IsReviewedBy";
|
public static final String IS_REVIEWED_BY = "isReviewedBy";
|
||||||
|
|
||||||
public static final String RESULT_PROJECT = "resultProject";
|
public static final String RESULT_PROJECT = "resultProject";
|
||||||
public static final String OUTCOME = "outcome";
|
public static final String OUTCOME = "outcome";
|
||||||
|
|
|
@ -4,6 +4,7 @@ package eu.dnetlib.dhp.actionmanager.project;
|
||||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
|
||||||
|
@ -11,6 +12,7 @@ import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
import org.apache.spark.api.java.JavaRDD;
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
import org.apache.spark.api.java.function.MapFunction;
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
import org.apache.spark.sql.*;
|
import org.apache.spark.sql.*;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -175,43 +177,54 @@ public class PrepareProgramme {
|
||||||
return csvProgramme;
|
return csvProgramme;
|
||||||
});
|
});
|
||||||
|
|
||||||
prepareClassification(h2020Programmes);
|
// prepareClassification(h2020Programmes);
|
||||||
|
|
||||||
h2020Programmes
|
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
|
||||||
.map(csvProgramme -> OBJECT_MAPPER.writeValueAsString(csvProgramme))
|
|
||||||
|
JavaRDD<CSVProgramme> rdd = jsc.parallelize(prepareClassification(h2020Programmes), 1);
|
||||||
|
rdd
|
||||||
|
.map(csvProgramme -> {
|
||||||
|
String tmp = OBJECT_MAPPER.writeValueAsString(csvProgramme);
|
||||||
|
return tmp;
|
||||||
|
})
|
||||||
.saveAsTextFile(outputPath);
|
.saveAsTextFile(outputPath);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void prepareClassification(JavaRDD<CSVProgramme> h2020Programmes) {
|
private static List<CSVProgramme> prepareClassification(JavaRDD<CSVProgramme> h2020Programmes) {
|
||||||
Object[] codedescription = h2020Programmes
|
Object[] codedescription = h2020Programmes
|
||||||
.map(value -> new Tuple2<>(value.getCode(), value.getTitle()))
|
.map(
|
||||||
|
value -> new Tuple2<>(value.getCode(),
|
||||||
|
new Tuple2<String, String>(value.getTitle(), value.getShortTitle())))
|
||||||
.collect()
|
.collect()
|
||||||
.toArray();
|
.toArray();
|
||||||
|
|
||||||
for (int i = 0; i < codedescription.length - 1; i++) {
|
for (int i = 0; i < codedescription.length - 1; i++) {
|
||||||
for (int j = i + 1; j < codedescription.length; j++) {
|
for (int j = i + 1; j < codedescription.length; j++) {
|
||||||
Tuple2<String, String> t2i = (Tuple2<String, String>) codedescription[i];
|
Tuple2<String, Tuple2<String, String>> t2i = (Tuple2<String, Tuple2<String, String>>) codedescription[i];
|
||||||
Tuple2<String, String> t2j = (Tuple2<String, String>) codedescription[j];
|
Tuple2<String, Tuple2<String, String>> t2j = (Tuple2<String, Tuple2<String, String>>) codedescription[j];
|
||||||
if (t2i._1().compareTo(t2j._1()) > 0) {
|
if (t2i._1().compareTo(t2j._1()) > 0) {
|
||||||
Tuple2<String, String> temp = t2i;
|
Tuple2<String, Tuple2<String, String>> temp = t2i;
|
||||||
codedescription[i] = t2j;
|
codedescription[i] = t2j;
|
||||||
codedescription[j] = temp;
|
codedescription[j] = temp;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Map<String, String> map = new HashMap<>();
|
Map<String, Tuple2<String, String>> map = new HashMap<>();
|
||||||
for (int j = 0; j < codedescription.length; j++) {
|
for (int j = 0; j < codedescription.length; j++) {
|
||||||
Tuple2<String, String> entry = (Tuple2<String, String>) codedescription[j];
|
Tuple2<String, Tuple2<String, String>> entry = (Tuple2<String, Tuple2<String, String>>) codedescription[j];
|
||||||
String ent = entry._1();
|
String ent = entry._1();
|
||||||
if (ent.contains("Euratom-")) {
|
if (ent.contains("Euratom-")) {
|
||||||
ent = ent.replace("-Euratom-", ".Euratom.");
|
ent = ent.replace("-Euratom-", ".Euratom.");
|
||||||
}
|
}
|
||||||
String[] tmp = ent.split("\\.");
|
String[] tmp = ent.split("\\.");
|
||||||
if (tmp.length <= 2) {
|
if (tmp.length <= 2) {
|
||||||
|
if (StringUtils.isEmpty(entry._2()._2())) {
|
||||||
|
map.put(entry._1(), new Tuple2<String, String>(entry._2()._1(), entry._2()._1()));
|
||||||
|
} else {
|
||||||
map.put(entry._1(), entry._2());
|
map.put(entry._1(), entry._2());
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
if (ent.endsWith(".")) {
|
if (ent.endsWith(".")) {
|
||||||
ent = ent.substring(0, ent.length() - 1);
|
ent = ent.substring(0, ent.length() - 1);
|
||||||
|
@ -224,14 +237,14 @@ public class PrepareProgramme {
|
||||||
key = key.substring(0, key.length() - 1);
|
key = key.substring(0, key.length() - 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
String current = entry._2();
|
String current = entry._2()._1();
|
||||||
if (!ent.contains("Euratom")) {
|
if (!ent.contains("Euratom")) {
|
||||||
|
|
||||||
String parent;
|
String parent;
|
||||||
String tmp_key = tmp[0] + ".";
|
String tmp_key = tmp[0] + ".";
|
||||||
for (int i = 1; i < tmp.length - 1; i++) {
|
for (int i = 1; i < tmp.length - 1; i++) {
|
||||||
tmp_key += tmp[i] + ".";
|
tmp_key += tmp[i] + ".";
|
||||||
parent = map.get(tmp_key).toLowerCase().trim();
|
parent = map.get(tmp_key)._1().toLowerCase().trim();
|
||||||
if (parent.contains("|")) {
|
if (parent.contains("|")) {
|
||||||
parent = parent.substring(parent.lastIndexOf("|") + 1).trim();
|
parent = parent.substring(parent.lastIndexOf("|") + 1).trim();
|
||||||
}
|
}
|
||||||
|
@ -246,18 +259,29 @@ public class PrepareProgramme {
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
map.put(ent + ".", map.get(key) + " | " + current);
|
String shortTitle = entry._2()._2();
|
||||||
|
if (StringUtils.isEmpty(shortTitle)) {
|
||||||
|
shortTitle = current;
|
||||||
|
}
|
||||||
|
Tuple2<String, String> newEntry = new Tuple2<>(map.get(key)._1() + " | " + current,
|
||||||
|
map.get(key)._2() + " | " + shortTitle);
|
||||||
|
map.put(ent + ".", newEntry);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
h2020Programmes.foreach(csvProgramme -> {
|
return h2020Programmes.map(csvProgramme -> {
|
||||||
if (!csvProgramme.getCode().endsWith(".") && !csvProgramme.getCode().contains("Euratom")
|
|
||||||
&& !csvProgramme.getCode().equals("H2020-EC"))
|
String code = csvProgramme.getCode();
|
||||||
csvProgramme.setClassification(map.get(csvProgramme.getCode() + "."));
|
if (!code.endsWith(".") && !code.contains("Euratom")
|
||||||
else
|
&& !code.equals("H2020-EC"))
|
||||||
csvProgramme.setClassification(map.get(csvProgramme.getCode()));
|
code += ".";
|
||||||
});
|
|
||||||
|
csvProgramme.setClassification(map.get(code)._1());
|
||||||
|
csvProgramme.setClassification_short(map.get(code)._2());
|
||||||
|
|
||||||
|
return csvProgramme;
|
||||||
|
}).collect();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static <R> Dataset<R> readPath(
|
public static <R> Dataset<R> readPath(
|
||||||
|
|
|
@ -9,7 +9,6 @@ import java.util.Objects;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
|
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
|
@ -138,7 +137,8 @@ public class SparkAtomicActionJob {
|
||||||
pm.setCode(csvProject.getProgramme());
|
pm.setCode(csvProject.getProgramme());
|
||||||
h2020classification.setClassification(ocsvProgramme.get().getClassification());
|
h2020classification.setClassification(ocsvProgramme.get().getClassification());
|
||||||
h2020classification.setH2020Programme(pm);
|
h2020classification.setH2020Programme(pm);
|
||||||
setLevelsAndProgramme(h2020classification, ocsvProgramme.get().getClassification());
|
setLevelsandProgramme(h2020classification, ocsvProgramme.get().getClassification_short());
|
||||||
|
// setProgramme(h2020classification, ocsvProgramme.get().getClassification());
|
||||||
pp.setH2020classification(Arrays.asList(h2020classification));
|
pp.setH2020classification(Arrays.asList(h2020classification));
|
||||||
|
|
||||||
return pp;
|
return pp;
|
||||||
|
@ -177,8 +177,8 @@ public class SparkAtomicActionJob {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void setLevelsAndProgramme(H2020Classification h2020Classification, String classification) {
|
private static void setLevelsandProgramme(H2020Classification h2020Classification, String classification_short) {
|
||||||
String[] tmp = classification.split(" \\| ");
|
String[] tmp = classification_short.split(" \\| ");
|
||||||
h2020Classification.setLevel1(tmp[0]);
|
h2020Classification.setLevel1(tmp[0]);
|
||||||
if (tmp.length > 1) {
|
if (tmp.length > 1) {
|
||||||
h2020Classification.setLevel2(tmp[1]);
|
h2020Classification.setLevel2(tmp[1]);
|
||||||
|
@ -189,6 +189,12 @@ public class SparkAtomicActionJob {
|
||||||
h2020Classification.getH2020Programme().setDescription(tmp[tmp.length - 1]);
|
h2020Classification.getH2020Programme().setDescription(tmp[tmp.length - 1]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// private static void setProgramme(H2020Classification h2020Classification, String classification) {
|
||||||
|
// String[] tmp = classification.split(" \\| ");
|
||||||
|
//
|
||||||
|
// h2020Classification.getH2020Programme().setDescription(tmp[tmp.length - 1]);
|
||||||
|
// }
|
||||||
|
|
||||||
public static <R> Dataset<R> readPath(
|
public static <R> Dataset<R> readPath(
|
||||||
SparkSession spark, String inputPath, Class<R> clazz) {
|
SparkSession spark, String inputPath, Class<R> clazz) {
|
||||||
return spark
|
return spark
|
||||||
|
|
|
@ -22,6 +22,15 @@ public class CSVProgramme implements Serializable {
|
||||||
private String shortTitle;
|
private String shortTitle;
|
||||||
private String language;
|
private String language;
|
||||||
private String classification;
|
private String classification;
|
||||||
|
private String classification_short;
|
||||||
|
|
||||||
|
public String getClassification_short() {
|
||||||
|
return classification_short;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setClassification_short(String classification_short) {
|
||||||
|
this.classification_short = classification_short;
|
||||||
|
}
|
||||||
|
|
||||||
public String getClassification() {
|
public String getClassification() {
|
||||||
return classification;
|
return classification;
|
||||||
|
|
|
@ -9,12 +9,14 @@ import java.util.List;
|
||||||
import org.apache.poi.openxml4j.exceptions.InvalidFormatException;
|
import org.apache.poi.openxml4j.exceptions.InvalidFormatException;
|
||||||
import org.junit.jupiter.api.Assertions;
|
import org.junit.jupiter.api.Assertions;
|
||||||
import org.junit.jupiter.api.BeforeAll;
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
|
import org.junit.jupiter.api.Disabled;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.actionmanager.project.httpconnector.CollectorServiceException;
|
import eu.dnetlib.dhp.actionmanager.project.httpconnector.CollectorServiceException;
|
||||||
import eu.dnetlib.dhp.actionmanager.project.httpconnector.HttpConnector;
|
import eu.dnetlib.dhp.actionmanager.project.httpconnector.HttpConnector;
|
||||||
import eu.dnetlib.dhp.actionmanager.project.utils.EXCELParser;
|
import eu.dnetlib.dhp.actionmanager.project.utils.EXCELParser;
|
||||||
|
|
||||||
|
@Disabled
|
||||||
public class EXCELParserTest {
|
public class EXCELParserTest {
|
||||||
|
|
||||||
private static Path workingDir;
|
private static Path workingDir;
|
||||||
|
|
|
@ -92,6 +92,8 @@ public class PrepareH2020ProgrammeTest {
|
||||||
|
|
||||||
Assertions.assertEquals(0, verificationDataset.filter("classification = ''").count());
|
Assertions.assertEquals(0, verificationDataset.filter("classification = ''").count());
|
||||||
|
|
||||||
|
// tmp.foreach(csvProgramme -> System.out.println(OBJECT_MAPPER.writeValueAsString(csvProgramme)));
|
||||||
|
|
||||||
Assertions
|
Assertions
|
||||||
.assertEquals(
|
.assertEquals(
|
||||||
"Societal challenges | Smart, Green And Integrated Transport | CLEANSKY2 | IADP Fast Rotorcraft",
|
"Societal challenges | Smart, Green And Integrated Transport | CLEANSKY2 | IADP Fast Rotorcraft",
|
||||||
|
|
|
@ -78,7 +78,7 @@ public class SparkUpdateProjectTest {
|
||||||
"-programmePath",
|
"-programmePath",
|
||||||
getClass()
|
getClass()
|
||||||
.getResource(
|
.getResource(
|
||||||
"/eu/dnetlib/dhp/actionmanager/project/preparedProgramme_classification_whole.json.gz")
|
"/eu/dnetlib/dhp/actionmanager/project/preparedProgramme_whole.json.gz")
|
||||||
.getPath(),
|
.getPath(),
|
||||||
"-projectPath",
|
"-projectPath",
|
||||||
getClass().getResource("/eu/dnetlib/dhp/actionmanager/project/prepared_projects.json").getPath(),
|
getClass().getResource("/eu/dnetlib/dhp/actionmanager/project/prepared_projects.json").getPath(),
|
||||||
|
@ -124,7 +124,7 @@ public class SparkUpdateProjectTest {
|
||||||
.getString(0));
|
.getString(0));
|
||||||
Assertions
|
Assertions
|
||||||
.assertEquals(
|
.assertEquals(
|
||||||
"Societal challenges",
|
"Societal Challenges",
|
||||||
execverification
|
execverification
|
||||||
.filter("id = '40|corda__h2020::2c7298913008865ba784e5c1350a0aa5'")
|
.filter("id = '40|corda__h2020::2c7298913008865ba784e5c1350a0aa5'")
|
||||||
.select("classification.level1")
|
.select("classification.level1")
|
||||||
|
@ -133,7 +133,7 @@ public class SparkUpdateProjectTest {
|
||||||
.getString(0));
|
.getString(0));
|
||||||
Assertions
|
Assertions
|
||||||
.assertEquals(
|
.assertEquals(
|
||||||
"Smart, Green And Integrated Transport",
|
"Transport",
|
||||||
execverification
|
execverification
|
||||||
.filter("id = '40|corda__h2020::2c7298913008865ba784e5c1350a0aa5'")
|
.filter("id = '40|corda__h2020::2c7298913008865ba784e5c1350a0aa5'")
|
||||||
.select("classification.level2")
|
.select("classification.level2")
|
||||||
|
@ -188,7 +188,7 @@ public class SparkUpdateProjectTest {
|
||||||
.getString(0));
|
.getString(0));
|
||||||
Assertions
|
Assertions
|
||||||
.assertEquals(
|
.assertEquals(
|
||||||
"Nurturing excellence by means of cross-border and cross-sector mobility",
|
"MSCA Mobility",
|
||||||
execverification
|
execverification
|
||||||
.filter("id = '40|corda__h2020::1a1f235fdd06ef14790baec159aa1202'")
|
.filter("id = '40|corda__h2020::1a1f235fdd06ef14790baec159aa1202'")
|
||||||
.select("classification.h2020Programme.description")
|
.select("classification.h2020Programme.description")
|
||||||
|
@ -197,7 +197,7 @@ public class SparkUpdateProjectTest {
|
||||||
.getString(0));
|
.getString(0));
|
||||||
Assertions
|
Assertions
|
||||||
.assertEquals(
|
.assertEquals(
|
||||||
"Excellent science",
|
"Excellent Science",
|
||||||
execverification
|
execverification
|
||||||
.filter("id = '40|corda__h2020::1a1f235fdd06ef14790baec159aa1202'")
|
.filter("id = '40|corda__h2020::1a1f235fdd06ef14790baec159aa1202'")
|
||||||
.select("classification.level1")
|
.select("classification.level1")
|
||||||
|
@ -206,7 +206,7 @@ public class SparkUpdateProjectTest {
|
||||||
.getString(0));
|
.getString(0));
|
||||||
Assertions
|
Assertions
|
||||||
.assertEquals(
|
.assertEquals(
|
||||||
"Marie Skłodowska-Curie Actions",
|
"Marie-Sklodowska-Curie Actions",
|
||||||
execverification
|
execverification
|
||||||
.filter("id = '40|corda__h2020::1a1f235fdd06ef14790baec159aa1202'")
|
.filter("id = '40|corda__h2020::1a1f235fdd06ef14790baec159aa1202'")
|
||||||
.select("classification.level2")
|
.select("classification.level2")
|
||||||
|
@ -215,7 +215,7 @@ public class SparkUpdateProjectTest {
|
||||||
.getString(0));
|
.getString(0));
|
||||||
Assertions
|
Assertions
|
||||||
.assertEquals(
|
.assertEquals(
|
||||||
"Nurturing excellence by means of cross-border and cross-sector mobility",
|
"MSCA Mobility",
|
||||||
execverification
|
execverification
|
||||||
.filter("id = '40|corda__h2020::1a1f235fdd06ef14790baec159aa1202'")
|
.filter("id = '40|corda__h2020::1a1f235fdd06ef14790baec159aa1202'")
|
||||||
.select("classification.level3")
|
.select("classification.level3")
|
||||||
|
|
|
@ -6,8 +6,11 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
|
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
|
||||||
import org.apache.http.ssl.SSLContextBuilder;
|
import org.apache.http.ssl.SSLContextBuilder;
|
||||||
import org.junit.jupiter.api.BeforeAll;
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
|
import org.junit.jupiter.api.Disabled;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
|
||||||
|
@Disabled
|
||||||
public class HttpConnectorTest {
|
public class HttpConnectorTest {
|
||||||
|
|
||||||
private static final Log log = LogFactory.getLog(HttpConnectorTest.class);
|
private static final Log log = LogFactory.getLog(HttpConnectorTest.class);
|
||||||
|
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
@ -1,13 +1,15 @@
|
||||||
package eu.dnetlib.doiboost
|
package eu.dnetlib.doiboost
|
||||||
|
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
||||||
import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset, Organization}
|
import eu.dnetlib.dhp.oa.merge.AuthorMerger
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.{Organization, Publication, Relation, Dataset => OafDataset}
|
||||||
import eu.dnetlib.doiboost.mag.ConversionUtil
|
import eu.dnetlib.doiboost.mag.ConversionUtil
|
||||||
import org.apache.commons.io.IOUtils
|
import org.apache.commons.io.IOUtils
|
||||||
import org.apache.spark.SparkConf
|
import org.apache.spark.SparkConf
|
||||||
import org.apache.spark.sql.functions.col
|
import org.apache.spark.sql.functions.col
|
||||||
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
|
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
|
||||||
import org.slf4j.{Logger, LoggerFactory}
|
import org.slf4j.{Logger, LoggerFactory}
|
||||||
|
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
|
|
||||||
object SparkGenerateDoiBoost {
|
object SparkGenerateDoiBoost {
|
||||||
|
@ -49,6 +51,7 @@ object SparkGenerateDoiBoost {
|
||||||
val otherPub = item._2._2
|
val otherPub = item._2._2
|
||||||
if (otherPub != null) {
|
if (otherPub != null) {
|
||||||
crossrefPub.mergeFrom(otherPub)
|
crossrefPub.mergeFrom(otherPub)
|
||||||
|
crossrefPub.setAuthor(AuthorMerger.mergeAuthor(crossrefPub.getAuthor, otherPub.getAuthor))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
crossrefPub
|
crossrefPub
|
||||||
|
|
|
@ -38,6 +38,8 @@ class QueryTest {
|
||||||
def myQuery(spark:SparkSession, sc:SparkContext): Unit = {
|
def myQuery(spark:SparkSession, sc:SparkContext): Unit = {
|
||||||
implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication]
|
implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication]
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
val mapper = new ObjectMapper()
|
val mapper = new ObjectMapper()
|
||||||
mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT)
|
mapper.getSerializationConfig.enable(SerializationConfig.Feature.INDENT_OUTPUT)
|
||||||
|
|
||||||
|
|
|
@ -4,7 +4,6 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
||||||
import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation}
|
import eu.dnetlib.dhp.schema.oaf.{Oaf, Relation}
|
||||||
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIUnknown}
|
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication, DLIUnknown}
|
||||||
import eu.dnetlib.dhp.sx.ebi.EBIAggregator
|
import eu.dnetlib.dhp.sx.ebi.EBIAggregator
|
||||||
import eu.dnetlib.dhp.sx.ebi.model.{PMArticle, PMAuthor, PMJournal}
|
|
||||||
import org.apache.commons.io.IOUtils
|
import org.apache.commons.io.IOUtils
|
||||||
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
|
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
|
||||||
import org.slf4j.LoggerFactory
|
import org.slf4j.LoggerFactory
|
||||||
|
@ -18,38 +17,38 @@ object SparkSplitOafTODLIEntities {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def main(args: Array[String]): Unit = {
|
|
||||||
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkSplitOafTODLIEntities.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json")))
|
|
||||||
val logger = LoggerFactory.getLogger(SparkSplitOafTODLIEntities.getClass)
|
|
||||||
parser.parseArgument(args)
|
|
||||||
|
|
||||||
val workingPath: String = parser.get("workingPath")
|
def extract_dataset(spark:SparkSession, workingPath:String) :Unit = {
|
||||||
logger.info(s"Working dir path = $workingPath")
|
|
||||||
|
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
|
||||||
|
implicit val datEncoder: Encoder[DLIDataset] = Encoders.kryo[DLIDataset]
|
||||||
|
|
||||||
|
val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/input/OAFDataset").as[Oaf].repartition(4000)
|
||||||
|
|
||||||
|
val ebi_dataset:Dataset[DLIDataset] = spark.read.load(s"$workingPath/ebi/baseline_dataset_ebi").as[DLIDataset].repartition(1000)
|
||||||
|
|
||||||
|
|
||||||
|
OAFDataset
|
||||||
|
.filter(s => s != null && s.isInstanceOf[DLIDataset])
|
||||||
|
.map(s =>s.asInstanceOf[DLIDataset])
|
||||||
|
.union(ebi_dataset)
|
||||||
|
.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, datEncoder))
|
||||||
|
.groupByKey(_._1)(Encoders.STRING)
|
||||||
|
.agg(EBIAggregator.getDLIDatasetAggregator().toColumn)
|
||||||
|
.map(p => p._2)
|
||||||
|
.repartition(2000)
|
||||||
|
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/dataset")
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
def extract_publication(spark:SparkSession, workingPath:String) :Unit = {
|
||||||
|
|
||||||
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
|
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
|
||||||
implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo[DLIPublication]
|
implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo[DLIPublication]
|
||||||
implicit val datEncoder: Encoder[DLIDataset] = Encoders.kryo[DLIDataset]
|
|
||||||
implicit val unkEncoder: Encoder[DLIUnknown] = Encoders.kryo[DLIUnknown]
|
|
||||||
implicit val relEncoder: Encoder[Relation] = Encoders.kryo[Relation]
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
val spark:SparkSession = SparkSession
|
|
||||||
.builder()
|
|
||||||
.appName(SparkSplitOafTODLIEntities.getClass.getSimpleName)
|
|
||||||
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
|
|
||||||
.master(parser.get("master"))
|
|
||||||
.getOrCreate()
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/input/OAFDataset").as[Oaf]
|
val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/input/OAFDataset").as[Oaf]
|
||||||
|
|
||||||
val ebi_dataset:Dataset[DLIDataset] = spark.read.load(s"$workingPath/ebi/baseline_dataset_ebi").as[DLIDataset]
|
val ebi_publication:Dataset[DLIPublication] = spark.read.load(s"$workingPath/ebi/baseline_publication_ebi").as[DLIPublication].repartition(1000)
|
||||||
val ebi_publication:Dataset[DLIPublication] = spark.read.load(s"$workingPath/ebi/baseline_publication_ebi").as[DLIPublication]
|
|
||||||
val ebi_relation:Dataset[Relation] = spark.read.load(s"$workingPath/ebi/baseline_relation_ebi").as[Relation]
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
OAFDataset
|
OAFDataset
|
||||||
|
@ -60,20 +59,17 @@ object SparkSplitOafTODLIEntities {
|
||||||
.groupByKey(_._1)(Encoders.STRING)
|
.groupByKey(_._1)(Encoders.STRING)
|
||||||
.agg(EBIAggregator.getDLIPublicationAggregator().toColumn)
|
.agg(EBIAggregator.getDLIPublicationAggregator().toColumn)
|
||||||
.map(p => p._2)
|
.map(p => p._2)
|
||||||
.repartition(1000)
|
.repartition(2000)
|
||||||
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/publication")
|
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/publication")
|
||||||
|
|
||||||
OAFDataset
|
}
|
||||||
.filter(s => s != null && s.isInstanceOf[DLIDataset])
|
|
||||||
.map(s =>s.asInstanceOf[DLIDataset])
|
|
||||||
.union(ebi_dataset)
|
|
||||||
.map(d => (d.getId, d))(Encoders.tuple(Encoders.STRING, datEncoder))
|
|
||||||
.groupByKey(_._1)(Encoders.STRING)
|
|
||||||
.agg(EBIAggregator.getDLIDatasetAggregator().toColumn)
|
|
||||||
.map(p => p._2)
|
|
||||||
.repartition(1000)
|
|
||||||
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/dataset")
|
|
||||||
|
|
||||||
|
def extract_unknown(spark:SparkSession, workingPath:String) :Unit = {
|
||||||
|
|
||||||
|
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
|
||||||
|
implicit val unkEncoder: Encoder[DLIUnknown] = Encoders.kryo[DLIUnknown]
|
||||||
|
|
||||||
|
val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/input/OAFDataset").as[Oaf]
|
||||||
|
|
||||||
OAFDataset
|
OAFDataset
|
||||||
.filter(s => s != null && s.isInstanceOf[DLIUnknown])
|
.filter(s => s != null && s.isInstanceOf[DLIUnknown])
|
||||||
|
@ -82,9 +78,18 @@ object SparkSplitOafTODLIEntities {
|
||||||
.groupByKey(_._1)(Encoders.STRING)
|
.groupByKey(_._1)(Encoders.STRING)
|
||||||
.agg(EBIAggregator.getDLIUnknownAggregator().toColumn)
|
.agg(EBIAggregator.getDLIUnknownAggregator().toColumn)
|
||||||
.map(p => p._2)
|
.map(p => p._2)
|
||||||
.repartition(1000)
|
|
||||||
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/unknown")
|
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/unknown")
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def extract_relations(spark:SparkSession, workingPath:String) :Unit = {
|
||||||
|
|
||||||
|
implicit val oafEncoder: Encoder[Oaf] = Encoders.kryo[Oaf]
|
||||||
|
implicit val relEncoder: Encoder[Relation] = Encoders.kryo[Relation]
|
||||||
|
|
||||||
|
val OAFDataset:Dataset[Oaf] = spark.read.load(s"$workingPath/input/OAFDataset").as[Oaf]
|
||||||
|
val ebi_relation:Dataset[Relation] = spark.read.load(s"$workingPath/ebi/baseline_relation_ebi").as[Relation].repartition(2000)
|
||||||
|
|
||||||
OAFDataset
|
OAFDataset
|
||||||
.filter(s => s != null && s.isInstanceOf[Relation])
|
.filter(s => s != null && s.isInstanceOf[Relation])
|
||||||
|
@ -94,9 +99,35 @@ object SparkSplitOafTODLIEntities {
|
||||||
.groupByKey(_._1)(Encoders.STRING)
|
.groupByKey(_._1)(Encoders.STRING)
|
||||||
.agg(EBIAggregator.getRelationAggregator().toColumn)
|
.agg(EBIAggregator.getRelationAggregator().toColumn)
|
||||||
.map(p => p._2)
|
.map(p => p._2)
|
||||||
.repartition(1000)
|
.repartition(4000)
|
||||||
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/relation")
|
.write.mode(SaveMode.Overwrite).save(s"$workingPath/graph/relation")
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def main(args: Array[String]): Unit = {
|
||||||
|
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkSplitOafTODLIEntities.getClass.getResourceAsStream("/eu/dnetlib/dhp/sx/graph/argumentparser/input_extract_entities_parameters.json")))
|
||||||
|
val logger = LoggerFactory.getLogger(SparkSplitOafTODLIEntities.getClass)
|
||||||
|
parser.parseArgument(args)
|
||||||
|
|
||||||
|
val workingPath: String = parser.get("workingPath")
|
||||||
|
val entity:String = parser.get("entity")
|
||||||
|
logger.info(s"Working dir path = $workingPath")
|
||||||
|
|
||||||
|
val spark:SparkSession = SparkSession
|
||||||
|
.builder()
|
||||||
|
.appName(SparkSplitOafTODLIEntities.getClass.getSimpleName)
|
||||||
|
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
|
||||||
|
.master(parser.get("master"))
|
||||||
|
.getOrCreate()
|
||||||
|
|
||||||
|
|
||||||
|
entity match {
|
||||||
|
case "publication" => extract_publication(spark, workingPath)
|
||||||
|
case "dataset" => extract_dataset(spark,workingPath)
|
||||||
|
case "relation" => extract_relations(spark, workingPath)
|
||||||
|
case "unknown" => extract_unknown(spark, workingPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
[
|
[
|
||||||
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
|
{"paramName":"mt", "paramLongName":"master", "paramDescription": "should be local or yarn", "paramRequired": true},
|
||||||
{"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the work dir path", "paramRequired": true}
|
{"paramName":"w", "paramLongName":"workingPath", "paramDescription": "the work dir path", "paramRequired": true},
|
||||||
|
{"paramName":"e", "paramLongName":"entity", "paramDescription": "the work dir path", "paramRequired": true}
|
||||||
]
|
]
|
|
@ -14,30 +14,103 @@
|
||||||
</property>
|
</property>
|
||||||
</parameters>
|
</parameters>
|
||||||
|
|
||||||
<start to="ExtractDLIEntities"/>
|
<start to="ExtractDLIPublication"/>
|
||||||
|
|
||||||
<kill name="Kill">
|
<kill name="Kill">
|
||||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||||
</kill>
|
</kill>
|
||||||
|
|
||||||
<action name="ExtractDLIEntities">
|
<action name="ExtractDLIPublication">
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
<job-tracker>${jobTracker}</job-tracker>
|
<job-tracker>${jobTracker}</job-tracker>
|
||||||
<name-node>${nameNode}</name-node>
|
<name-node>${nameNode}</name-node>
|
||||||
<master>yarn-cluster</master>
|
<master>yarn-cluster</master>
|
||||||
<mode>cluster</mode>
|
<mode>cluster</mode>
|
||||||
<name>Extract DLI Entities</name>
|
<name>Extract DLI Entities (Publication)</name>
|
||||||
<class>eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities</class>
|
<class>eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities</class>
|
||||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||||
<spark-opts>
|
<spark-opts>
|
||||||
--executor-memory ${sparkExecutorMemory}
|
--executor-memory ${sparkExecutorMemory}
|
||||||
--executor-cores=${sparkExecutorCores}
|
--executor-cores=${sparkExecutorCores}
|
||||||
--driver-memory=${sparkDriverMemory}
|
--driver-memory=${sparkDriverMemory}
|
||||||
--conf spark.sql.shuffle.partitions=3840
|
--conf spark.sql.shuffle.partitions=5000
|
||||||
${sparkExtraOPT}
|
${sparkExtraOPT}
|
||||||
</spark-opts>
|
</spark-opts>
|
||||||
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||||
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
|
<arg>-e</arg><arg>publication</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="ExtractDLIDataset"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="ExtractDLIDataset">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<job-tracker>${jobTracker}</job-tracker>
|
||||||
|
<name-node>${nameNode}</name-node>
|
||||||
|
<master>yarn-cluster</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>Extract DLI Entities (Dataset)</name>
|
||||||
|
<class>eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities</class>
|
||||||
|
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-memory ${sparkExecutorMemory}
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.sql.shuffle.partitions=5000
|
||||||
|
${sparkExtraOPT}
|
||||||
|
</spark-opts>
|
||||||
|
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||||
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
|
<arg>-e</arg><arg>dataset</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="ExtractDLIUnknown"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="ExtractDLIUnknown">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<job-tracker>${jobTracker}</job-tracker>
|
||||||
|
<name-node>${nameNode}</name-node>
|
||||||
|
<master>yarn-cluster</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>Extract DLI Entities (Unknown)</name>
|
||||||
|
<class>eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities</class>
|
||||||
|
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-memory ${sparkExecutorMemory}
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.sql.shuffle.partitions=5000
|
||||||
|
${sparkExtraOPT}
|
||||||
|
</spark-opts>
|
||||||
|
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||||
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
|
<arg>-e</arg><arg>unknown</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="ExtractDLIRelation"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="ExtractDLIRelation">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<job-tracker>${jobTracker}</job-tracker>
|
||||||
|
<name-node>${nameNode}</name-node>
|
||||||
|
<master>yarn-cluster</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>Extract DLI Entities (Relation)</name>
|
||||||
|
<class>eu.dnetlib.dhp.sx.graph.SparkSplitOafTODLIEntities</class>
|
||||||
|
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-memory ${sparkExecutorMemory}
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.sql.shuffle.partitions=5000
|
||||||
|
${sparkExtraOPT}
|
||||||
|
</spark-opts>
|
||||||
|
<arg>-mt</arg> <arg>yarn-cluster</arg>
|
||||||
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
|
<arg>-e</arg><arg>relation</arg>
|
||||||
</spark>
|
</spark>
|
||||||
<ok to="End"/>
|
<ok to="End"/>
|
||||||
<error to="Kill"/>
|
<error to="Kill"/>
|
||||||
|
|
|
@ -30,7 +30,7 @@ class SparkScholexplorerAggregationTest {
|
||||||
|
|
||||||
|
|
||||||
implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo[DLIPublication]
|
implicit val pubEncoder: Encoder[DLIPublication] = Encoders.kryo[DLIPublication]
|
||||||
val spark: SparkSession = SparkSession.builder().appName("Test").master("local[*]").getOrCreate()
|
val spark: SparkSession = SparkSession.builder().appName("Test").master("local[*]").config("spark.driver.bindAddress", "127.0.0.1").getOrCreate()
|
||||||
|
|
||||||
|
|
||||||
val ds: Dataset[DLIPublication] = spark.createDataset(spark.sparkContext.parallelize(s)).as[DLIPublication]
|
val ds: Dataset[DLIPublication] = spark.createDataset(spark.sparkContext.parallelize(s)).as[DLIPublication]
|
||||||
|
|
|
@ -62,6 +62,10 @@
|
||||||
<artifactId>dhp-schemas</artifactId>
|
<artifactId>dhp-schemas</artifactId>
|
||||||
<version>${project.version}</version>
|
<version>${project.version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.httpcomponents</groupId>
|
||||||
|
<artifactId>httpmime</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.elasticsearch</groupId>
|
<groupId>org.elasticsearch</groupId>
|
||||||
|
|
|
@ -0,0 +1,111 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.export.zenodo;
|
||||||
|
|
||||||
|
import java.io.*;
|
||||||
|
|
||||||
|
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
|
||||||
|
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.*;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
|
|
||||||
|
public class MakeTar implements Serializable {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(MakeTar.class);
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
String jsonConfiguration = IOUtils
|
||||||
|
.toString(
|
||||||
|
MakeTar.class
|
||||||
|
.getResourceAsStream(
|
||||||
|
"/eu/dnetlib/dhp/export/input_maketar_parameters.json"));
|
||||||
|
|
||||||
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
||||||
|
parser.parseArgument(args);
|
||||||
|
|
||||||
|
final String outputPath = parser.get("targetPath");
|
||||||
|
log.info("hdfsPath: {}", outputPath);
|
||||||
|
|
||||||
|
final String hdfsNameNode = parser.get("nameNode");
|
||||||
|
log.info("nameNode: {}", hdfsNameNode);
|
||||||
|
|
||||||
|
final String inputPath = parser.get("sourcePath");
|
||||||
|
log.info("input path : {}", inputPath);
|
||||||
|
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.set("fs.defaultFS", hdfsNameNode);
|
||||||
|
|
||||||
|
FileSystem fileSystem = FileSystem.get(conf);
|
||||||
|
|
||||||
|
makeTArArchive(fileSystem, inputPath, outputPath);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void makeTArArchive(FileSystem fileSystem, String inputPath, String outputPath) throws IOException {
|
||||||
|
|
||||||
|
RemoteIterator<LocatedFileStatus> dir_iterator = fileSystem.listLocatedStatus(new Path(inputPath));
|
||||||
|
|
||||||
|
while (dir_iterator.hasNext()) {
|
||||||
|
LocatedFileStatus fileStatus = dir_iterator.next();
|
||||||
|
|
||||||
|
Path p = fileStatus.getPath();
|
||||||
|
String p_string = p.toString();
|
||||||
|
String entity = p_string.substring(p_string.lastIndexOf("/") + 1);
|
||||||
|
|
||||||
|
write(fileSystem, p_string, outputPath + "/" + entity + ".tar", entity);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void write(FileSystem fileSystem, String inputPath, String outputPath, String dir_name)
|
||||||
|
throws IOException {
|
||||||
|
|
||||||
|
Path hdfsWritePath = new Path(outputPath);
|
||||||
|
FSDataOutputStream fsDataOutputStream = null;
|
||||||
|
if (fileSystem.exists(hdfsWritePath)) {
|
||||||
|
fileSystem.delete(hdfsWritePath, true);
|
||||||
|
|
||||||
|
}
|
||||||
|
fsDataOutputStream = fileSystem.create(hdfsWritePath);
|
||||||
|
|
||||||
|
TarArchiveOutputStream ar = new TarArchiveOutputStream(fsDataOutputStream.getWrappedStream());
|
||||||
|
|
||||||
|
RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem
|
||||||
|
.listFiles(
|
||||||
|
new Path(inputPath), true);
|
||||||
|
|
||||||
|
while (fileStatusListIterator.hasNext()) {
|
||||||
|
LocatedFileStatus fileStatus = fileStatusListIterator.next();
|
||||||
|
|
||||||
|
Path p = fileStatus.getPath();
|
||||||
|
String p_string = p.toString();
|
||||||
|
if (!p_string.endsWith("_SUCCESS")) {
|
||||||
|
String name = p_string.substring(p_string.lastIndexOf("/") + 1);
|
||||||
|
TarArchiveEntry entry = new TarArchiveEntry(dir_name + "/" + name + ".json.gz");
|
||||||
|
entry.setSize(fileStatus.getLen());
|
||||||
|
ar.putArchiveEntry(entry);
|
||||||
|
|
||||||
|
InputStream is = fileSystem.open(fileStatus.getPath());
|
||||||
|
|
||||||
|
BufferedInputStream bis = new BufferedInputStream(is);
|
||||||
|
|
||||||
|
int count;
|
||||||
|
byte data[] = new byte[1024];
|
||||||
|
while ((count = bis.read(data, 0, data.length)) != -1) {
|
||||||
|
ar.write(data, 0, count);
|
||||||
|
}
|
||||||
|
bis.close();
|
||||||
|
ar.closeArchiveEntry();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
ar.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,80 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.export.zenodo;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.*;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
|
import eu.dnetlib.dhp.common.api.MissingConceptDoiException;
|
||||||
|
import eu.dnetlib.dhp.common.api.ZenodoAPIClient;
|
||||||
|
|
||||||
|
public class SendToZenodoHDFS implements Serializable {
|
||||||
|
|
||||||
|
private static final Log log = LogFactory.getLog(SendToZenodoHDFS.class);
|
||||||
|
|
||||||
|
public static void main(final String[] args) throws Exception, MissingConceptDoiException {
|
||||||
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
|
IOUtils
|
||||||
|
.toString(
|
||||||
|
SendToZenodoHDFS.class
|
||||||
|
.getResourceAsStream(
|
||||||
|
"/eu/dnetlib/dhp/export/upload_zenodo.json")));
|
||||||
|
|
||||||
|
parser.parseArgument(args);
|
||||||
|
|
||||||
|
final String hdfsPath = parser.get("hdfsPath");
|
||||||
|
final String hdfsNameNode = parser.get("nameNode");
|
||||||
|
final String access_token = parser.get("accessToken");
|
||||||
|
final String connection_url = parser.get("connectionUrl");
|
||||||
|
final String metadata = parser.get("metadata");
|
||||||
|
final Boolean newDeposition = Boolean.valueOf(parser.get("newDeposition"));
|
||||||
|
final String concept_rec_id = Optional
|
||||||
|
.ofNullable(parser.get("conceptRecordId"))
|
||||||
|
.orElse(null);
|
||||||
|
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.set("fs.defaultFS", hdfsNameNode);
|
||||||
|
|
||||||
|
FileSystem fileSystem = FileSystem.get(conf);
|
||||||
|
|
||||||
|
RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem
|
||||||
|
.listFiles(
|
||||||
|
new Path(hdfsPath), true);
|
||||||
|
ZenodoAPIClient zenodoApiClient = new ZenodoAPIClient(connection_url, access_token);
|
||||||
|
if (newDeposition) {
|
||||||
|
zenodoApiClient.newDeposition();
|
||||||
|
} else {
|
||||||
|
if (concept_rec_id == null) {
|
||||||
|
throw new MissingConceptDoiException("No concept record id has been provided");
|
||||||
|
}
|
||||||
|
zenodoApiClient.newVersion(concept_rec_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
while (fileStatusListIterator.hasNext()) {
|
||||||
|
LocatedFileStatus fileStatus = fileStatusListIterator.next();
|
||||||
|
|
||||||
|
Path p = fileStatus.getPath();
|
||||||
|
String p_string = p.toString();
|
||||||
|
if (!p_string.endsWith("_SUCCESS")) {
|
||||||
|
// String tmp = p_string.substring(0, p_string.lastIndexOf("/"));
|
||||||
|
String name = p_string.substring(p_string.lastIndexOf("/") + 1);
|
||||||
|
log.info("Sending information for community: " + name);
|
||||||
|
FSDataInputStream inputStream = fileSystem.open(p);
|
||||||
|
zenodoApiClient.uploadIS(inputStream, name, fileStatus.getLen());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
zenodoApiClient.sendMretadata(metadata);
|
||||||
|
zenodoApiClient.publish();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,20 @@
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"paramName": "n",
|
||||||
|
"paramLongName": "nameNode",
|
||||||
|
"paramDescription": "the Name Node",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "s",
|
||||||
|
"paramLongName": "sourcePath",
|
||||||
|
"paramDescription": "the source path",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "t",
|
||||||
|
"paramLongName": "targetPath",
|
||||||
|
"paramDescription": "the target path",
|
||||||
|
"paramRequired": true
|
||||||
|
}
|
||||||
|
]
|
|
@ -0,0 +1,45 @@
|
||||||
|
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"paramName":"nd",
|
||||||
|
"paramLongName":"newDeposition",
|
||||||
|
"paramDescription": "if it is a new deposition (true) or a new version (false)",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName":"cri",
|
||||||
|
"paramLongName":"conceptRecordId",
|
||||||
|
"paramDescription": "The id of the concept record for a new version",
|
||||||
|
"paramRequired": false
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName":"hdfsp",
|
||||||
|
"paramLongName":"hdfsPath",
|
||||||
|
"paramDescription": "the path of the folder tofind files to send to Zenodo",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "nn",
|
||||||
|
"paramLongName": "nameNode",
|
||||||
|
"paramDescription": "the name node",
|
||||||
|
"paramRequired": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName": "at",
|
||||||
|
"paramLongName": "accessToken",
|
||||||
|
"paramDescription": "the access token for the deposition",
|
||||||
|
"paramRequired": false
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName":"cu",
|
||||||
|
"paramLongName":"connectionUrl",
|
||||||
|
"paramDescription": "the url to connect to deposit",
|
||||||
|
"paramRequired": false
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"paramName":"m",
|
||||||
|
"paramLongName":"metadata",
|
||||||
|
"paramDescription": "metadata associated to the deposition",
|
||||||
|
"paramRequired": false
|
||||||
|
}
|
||||||
|
]
|
|
@ -0,0 +1,48 @@
|
||||||
|
<configuration>
|
||||||
|
<property>
|
||||||
|
<name>jobTracker</name>
|
||||||
|
<value>yarnRM</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>nameNode</name>
|
||||||
|
<value>hdfs://nameservice1</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>oozie.use.system.libpath</name>
|
||||||
|
<value>true</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>oozie.action.sharelib.for.spark</name>
|
||||||
|
<value>spark2</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>oozie.wf.rerun.failnodes</name>
|
||||||
|
<value>false</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>hive_metastore_uris</name>
|
||||||
|
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>spark2YarnHistoryServerAddress</name>
|
||||||
|
<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>spark2EventLogDir</name>
|
||||||
|
<value>/user/spark/spark2ApplicationHistory</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>spark2ExtraListeners</name>
|
||||||
|
<value>"com.cloudera.spark.lineage.NavigatorAppListener"</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>spark2SqlQueryExecutionListeners</name>
|
||||||
|
<value>"com.cloudera.spark.lineage.NavigatorQueryListener"</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>oozie.launcher.mapreduce.user.classpath.first</name>
|
||||||
|
<value>true</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
</configuration>
|
|
@ -0,0 +1,53 @@
|
||||||
|
<workflow-app name="Send Dump to Zenodo" xmlns="uri:oozie:workflow:0.5">
|
||||||
|
<parameters>
|
||||||
|
<property>
|
||||||
|
<name>sourcePath</name>
|
||||||
|
<description>the source path</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>targetPath</name>
|
||||||
|
<description>the target path</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>metadata</name>
|
||||||
|
<description>the metadata</description>
|
||||||
|
</property>
|
||||||
|
</parameters>
|
||||||
|
|
||||||
|
<start to="send_zenodo"/>
|
||||||
|
|
||||||
|
<kill name="Kill">
|
||||||
|
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||||
|
</kill>
|
||||||
|
|
||||||
|
<action name="MakeTar">
|
||||||
|
<java>
|
||||||
|
<job-tracker>${jobTracker}</job-tracker>
|
||||||
|
<name-node>${nameNode}</name-node>
|
||||||
|
<main-class>eu.dnetlib.dhp.export.zenodo.MakeTar</main-class>
|
||||||
|
<arg>-t</arg><arg>${targetPath}</arg>
|
||||||
|
<arg>-n</arg><arg>${nameNode}</arg>
|
||||||
|
<arg>-s</arg><arg>${sourcePath}</arg>
|
||||||
|
</java>
|
||||||
|
<ok to="End"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
|
||||||
|
<action name="send_zenodo">
|
||||||
|
<java>
|
||||||
|
<main-class>eu.dnetlib.dhp.export.zenodo.SendToZenodoHDFS</main-class>
|
||||||
|
<arg>--hdfsPath</arg><arg>/user/dnet.scholexplorer/scholix/provision/scholix.tar/scholix-2020-10-16.tar</arg>
|
||||||
|
<arg>--nameNode</arg><arg>${nameNode}</arg>
|
||||||
|
<arg>--accessToken</arg><arg>b6ddrY6b77WxcDEevn9gqVE5sL5sDNjdUijt75W3o7cQo5vpFFI48dMiu8Gv</arg>
|
||||||
|
<arg>--connectionUrl</arg><arg>https://zenodo.org/api/deposit/depositions</arg>
|
||||||
|
<arg>--metadata</arg><arg>${metadata}</arg>
|
||||||
|
<arg>--conceptRecordId</arg><arg>1200252</arg>
|
||||||
|
<arg>--newDeposition</arg><arg>false</arg>
|
||||||
|
</java>
|
||||||
|
<ok to="End"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<end name="End"/>
|
||||||
|
</workflow-app>
|
|
@ -3,14 +3,15 @@ package eu.dnetlib.dhp.export
|
||||||
import java.time.LocalDateTime
|
import java.time.LocalDateTime
|
||||||
import java.time.format.DateTimeFormatter
|
import java.time.format.DateTimeFormatter
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.provision.scholix.Scholix
|
||||||
|
import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary
|
||||||
import eu.dnetlib.dhp.schema.oaf.Relation
|
import eu.dnetlib.dhp.schema.oaf.Relation
|
||||||
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication}
|
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication}
|
||||||
|
|
||||||
import org.codehaus.jackson.map.{ObjectMapper, SerializationConfig}
|
import org.codehaus.jackson.map.{ObjectMapper, SerializationConfig}
|
||||||
import org.junit.jupiter.api.Test
|
import org.junit.jupiter.api.Test
|
||||||
|
|
||||||
import scala.io.Source
|
import scala.io.Source
|
||||||
|
import scala.collection.JavaConverters._
|
||||||
class ExportDLITOOAFTest {
|
class ExportDLITOOAFTest {
|
||||||
|
|
||||||
val mapper = new ObjectMapper()
|
val mapper = new ObjectMapper()
|
||||||
|
@ -22,12 +23,27 @@ class ExportDLITOOAFTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def extractDatasources(s:Scholix):List[String]= {
|
||||||
|
s.getTarget.getCollectedFrom.asScala.map(c => c.getProvider.getName)(collection.breakOut)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def extractDatasources(s:ScholixSummary):List[String] = {
|
||||||
|
|
||||||
|
s.getDatasources.asScala.map(c => c.getDatasourceName)(collection.breakOut)
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testMappingRele():Unit = {
|
def testMappingRele():Unit = {
|
||||||
|
|
||||||
val r:Relation = new Relation
|
val r:Relation = new Relation
|
||||||
r.setSource("60|fbff1d424e045eecf24151a5fe3aa738")
|
r.setSource("60|fbff1d424e045eecf24151a5fe3aa738")
|
||||||
r.setTarget("50|dedup_wf_001::ec409f09e63347d4e834087fe1483877")
|
r.setTarget("50|dedup_wf_001::ec409f09e63347d4e834087fe1483877")
|
||||||
|
r.setRelType("IsReferencedBy")
|
||||||
|
|
||||||
|
|
||||||
val r1 =DLIToOAF.convertDLIRelation(r)
|
val r1 =DLIToOAF.convertDLIRelation(r)
|
||||||
println(r1.getSource, r1.getTarget)
|
println(r1.getSource, r1.getTarget)
|
||||||
|
|
Loading…
Reference in New Issue