forked from D-Net/dnet-hadoop
merged from master
This commit is contained in:
commit
d6686dd7cf
|
@ -4,6 +4,7 @@ package eu.dnetlib.dhp.oa.merge;
|
||||||
import java.text.Normalizer;
|
import java.text.Normalizer;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
|
||||||
|
@ -32,27 +33,33 @@ public class AuthorMerger {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static List<Author> mergeAuthor(final List<Author> a, final List<Author> b) {
|
public static List<Author> mergeAuthor(final List<Author> a, final List<Author> b, Double threshold) {
|
||||||
int pa = countAuthorsPids(a);
|
int pa = countAuthorsPids(a);
|
||||||
int pb = countAuthorsPids(b);
|
int pb = countAuthorsPids(b);
|
||||||
List<Author> base, enrich;
|
List<Author> base, enrich;
|
||||||
int sa = authorsSize(a);
|
int sa = authorsSize(a);
|
||||||
int sb = authorsSize(b);
|
int sb = authorsSize(b);
|
||||||
|
|
||||||
if (pa == pb) {
|
if (sa == sb) {
|
||||||
base = sa > sb ? a : b;
|
|
||||||
enrich = sa > sb ? b : a;
|
|
||||||
} else {
|
|
||||||
base = pa > pb ? a : b;
|
base = pa > pb ? a : b;
|
||||||
enrich = pa > pb ? b : a;
|
enrich = pa > pb ? b : a;
|
||||||
|
} else {
|
||||||
|
base = sa > sb ? a : b;
|
||||||
|
enrich = sa > sb ? b : a;
|
||||||
}
|
}
|
||||||
enrichPidFromList(base, enrich);
|
enrichPidFromList(base, enrich, threshold);
|
||||||
return base;
|
return base;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void enrichPidFromList(List<Author> base, List<Author> enrich) {
|
public static List<Author> mergeAuthor(final List<Author> a, final List<Author> b) {
|
||||||
|
return mergeAuthor(a, b, THRESHOLD);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void enrichPidFromList(List<Author> base, List<Author> enrich, Double threshold) {
|
||||||
if (base == null || enrich == null)
|
if (base == null || enrich == null)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
// <pidComparableString, Author> (if an Author has more than 1 pid, it appears 2 times in the list)
|
||||||
final Map<String, Author> basePidAuthorMap = base
|
final Map<String, Author> basePidAuthorMap = base
|
||||||
.stream()
|
.stream()
|
||||||
.filter(a -> a.getPid() != null && a.getPid().size() > 0)
|
.filter(a -> a.getPid() != null && a.getPid().size() > 0)
|
||||||
|
@ -63,6 +70,7 @@ public class AuthorMerger {
|
||||||
.map(p -> new Tuple2<>(pidToComparableString(p), a)))
|
.map(p -> new Tuple2<>(pidToComparableString(p), a)))
|
||||||
.collect(Collectors.toMap(Tuple2::_1, Tuple2::_2, (x1, x2) -> x1));
|
.collect(Collectors.toMap(Tuple2::_1, Tuple2::_2, (x1, x2) -> x1));
|
||||||
|
|
||||||
|
// <pid, Author> (list of pid that are missing in the other list)
|
||||||
final List<Tuple2<StructuredProperty, Author>> pidToEnrich = enrich
|
final List<Tuple2<StructuredProperty, Author>> pidToEnrich = enrich
|
||||||
.stream()
|
.stream()
|
||||||
.filter(a -> a.getPid() != null && a.getPid().size() > 0)
|
.filter(a -> a.getPid() != null && a.getPid().size() > 0)
|
||||||
|
@ -83,10 +91,10 @@ public class AuthorMerger {
|
||||||
.max(Comparator.comparing(Tuple2::_1));
|
.max(Comparator.comparing(Tuple2::_1));
|
||||||
|
|
||||||
if (simAuthor.isPresent()) {
|
if (simAuthor.isPresent()) {
|
||||||
double th = THRESHOLD;
|
double th = threshold;
|
||||||
// increase the threshold if the surname is too short
|
// increase the threshold if the surname is too short
|
||||||
if (simAuthor.get()._2().getSurname() != null
|
if (simAuthor.get()._2().getSurname() != null
|
||||||
&& simAuthor.get()._2().getSurname().length() <= 3)
|
&& simAuthor.get()._2().getSurname().length() <= 3 && threshold > 0.0)
|
||||||
th = 0.99;
|
th = 0.99;
|
||||||
|
|
||||||
if (simAuthor.get()._1() > th) {
|
if (simAuthor.get()._1() > th) {
|
||||||
|
@ -156,7 +164,7 @@ public class AuthorMerger {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String normalize(final String s) {
|
private static String normalize(final String s) {
|
||||||
return nfd(s)
|
String[] normalized = nfd(s)
|
||||||
.toLowerCase()
|
.toLowerCase()
|
||||||
// do not compact the regexes in a single expression, would cause StackOverflowError
|
// do not compact the regexes in a single expression, would cause StackOverflowError
|
||||||
// in case
|
// in case
|
||||||
|
@ -166,7 +174,12 @@ public class AuthorMerger {
|
||||||
.replaceAll("(\\p{Punct})+", " ")
|
.replaceAll("(\\p{Punct})+", " ")
|
||||||
.replaceAll("(\\d)+", " ")
|
.replaceAll("(\\d)+", " ")
|
||||||
.replaceAll("(\\n)+", " ")
|
.replaceAll("(\\n)+", " ")
|
||||||
.trim();
|
.trim()
|
||||||
|
.split(" ");
|
||||||
|
|
||||||
|
Arrays.sort(normalized);
|
||||||
|
|
||||||
|
return String.join(" ", normalized);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String nfd(final String s) {
|
private static String nfd(final String s) {
|
||||||
|
|
|
@ -0,0 +1,100 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.oa.merge;
|
||||||
|
|
||||||
|
import java.io.BufferedReader;
|
||||||
|
import java.io.FileReader;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.Paths;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.Assertions;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Author;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Publication;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
||||||
|
import eu.dnetlib.pace.util.MapDocumentUtil;
|
||||||
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
public class AuthorMergerTest {
|
||||||
|
|
||||||
|
private String publicationsBasePath;
|
||||||
|
|
||||||
|
private List<List<Author>> authors;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
|
||||||
|
publicationsBasePath = Paths
|
||||||
|
.get(AuthorMergerTest.class.getResource("/eu/dnetlib/dhp/oa/merge").toURI())
|
||||||
|
.toFile()
|
||||||
|
.getAbsolutePath();
|
||||||
|
|
||||||
|
authors = readSample(publicationsBasePath + "/publications_with_authors.json", Publication.class)
|
||||||
|
.stream()
|
||||||
|
.map(p -> p._2().getAuthor())
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void mergeTest() { // used in the dedup: threshold set to 0.95
|
||||||
|
|
||||||
|
for (List<Author> authors1 : authors) {
|
||||||
|
System.out.println("List " + (authors.indexOf(authors1) + 1));
|
||||||
|
for (Author author : authors1) {
|
||||||
|
System.out.println(authorToString(author));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
List<Author> merge = AuthorMerger.merge(authors);
|
||||||
|
|
||||||
|
System.out.println("Merge ");
|
||||||
|
for (Author author : merge) {
|
||||||
|
System.out.println(authorToString(author));
|
||||||
|
}
|
||||||
|
|
||||||
|
Assertions.assertEquals(7, merge.size());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public <T> List<Tuple2<String, T>> readSample(String path, Class<T> clazz) {
|
||||||
|
List<Tuple2<String, T>> res = new ArrayList<>();
|
||||||
|
BufferedReader reader;
|
||||||
|
try {
|
||||||
|
reader = new BufferedReader(new FileReader(path));
|
||||||
|
String line = reader.readLine();
|
||||||
|
while (line != null) {
|
||||||
|
res
|
||||||
|
.add(
|
||||||
|
new Tuple2<>(
|
||||||
|
MapDocumentUtil.getJPathString("$.id", line),
|
||||||
|
new ObjectMapper().readValue(line, clazz)));
|
||||||
|
// read next line
|
||||||
|
line = reader.readLine();
|
||||||
|
}
|
||||||
|
reader.close();
|
||||||
|
} catch (IOException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String authorToString(Author a) {
|
||||||
|
|
||||||
|
String print = "Fullname = ";
|
||||||
|
print += a.getFullname() + " pid = [";
|
||||||
|
if (a.getPid() != null)
|
||||||
|
for (StructuredProperty sp : a.getPid()) {
|
||||||
|
print += sp.toComparableString() + " ";
|
||||||
|
}
|
||||||
|
print += "]";
|
||||||
|
return print;
|
||||||
|
}
|
||||||
|
}
|
File diff suppressed because one or more lines are too long
|
@ -105,6 +105,8 @@ public class ModelConstants {
|
||||||
public static final KeyValue UNKNOWN_REPOSITORY = keyValue(
|
public static final KeyValue UNKNOWN_REPOSITORY = keyValue(
|
||||||
"10|openaire____::55045bd2a65019fd8e6741a755395c8c", "Unknown Repository");
|
"10|openaire____::55045bd2a65019fd8e6741a755395c8c", "Unknown Repository");
|
||||||
|
|
||||||
|
public static final Qualifier UNKNOWN_COUNTRY = qualifier(UNKNOWN, "Unknown", DNET_COUNTRY_TYPE, DNET_COUNTRY_TYPE);
|
||||||
|
|
||||||
private static Qualifier qualifier(
|
private static Qualifier qualifier(
|
||||||
final String classid,
|
final String classid,
|
||||||
final String classname,
|
final String classname,
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.schema.oaf;
|
package eu.dnetlib.dhp.schema.oaf;
|
||||||
|
|
||||||
import java.io.Serializable;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import com.google.common.base.Objects;
|
import com.google.common.base.Objects;
|
||||||
|
@ -9,7 +8,7 @@ import com.google.common.base.Objects;
|
||||||
/**
|
/**
|
||||||
* Represent a measure, must be further described by a system available resource providing name and descriptions.
|
* Represent a measure, must be further described by a system available resource providing name and descriptions.
|
||||||
*/
|
*/
|
||||||
public class Measure implements Serializable {
|
public class Measure {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Unique measure identifier.
|
* Unique measure identifier.
|
||||||
|
@ -17,7 +16,7 @@ public class Measure implements Serializable {
|
||||||
private String id;
|
private String id;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* List of units associated with this measure. KeyValue provides a pair to store the label (key) and the value, plus
|
* List of units associated with this measure. KeyValue provides a pair to store the laber (key) and the value, plus
|
||||||
* common provenance information.
|
* common provenance information.
|
||||||
*/
|
*/
|
||||||
private List<KeyValue> unit;
|
private List<KeyValue> unit;
|
||||||
|
|
|
@ -1,28 +0,0 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.actionmanager.bipfinder;
|
|
||||||
|
|
||||||
import java.io.Serializable;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Class that maps the model of the bipFinder! input data.
|
|
||||||
* Only needed for deserialization purposes
|
|
||||||
*/
|
|
||||||
|
|
||||||
public class BipDeserialize extends HashMap<String, List<Score>> implements Serializable {
|
|
||||||
|
|
||||||
public BipDeserialize() {
|
|
||||||
super();
|
|
||||||
}
|
|
||||||
|
|
||||||
public List<Score> get(String key) {
|
|
||||||
|
|
||||||
if (super.get(key) == null) {
|
|
||||||
return new ArrayList<>();
|
|
||||||
}
|
|
||||||
return super.get(key);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,85 +0,0 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.actionmanager.bipfinder;
|
|
||||||
|
|
||||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
|
||||||
|
|
||||||
import java.io.Serializable;
|
|
||||||
import java.util.Optional;
|
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
|
||||||
import org.apache.hadoop.io.Text;
|
|
||||||
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
|
|
||||||
import org.apache.spark.SparkConf;
|
|
||||||
import org.apache.spark.api.java.JavaSparkContext;
|
|
||||||
import org.apache.spark.sql.SparkSession;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
||||||
|
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
|
||||||
import eu.dnetlib.dhp.common.HdfsSupport;
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Just collects all the atomic actions produced for the different results and saves them in
|
|
||||||
* outputpath for the ActionSet
|
|
||||||
*/
|
|
||||||
public class CollectAndSave implements Serializable {
|
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(CollectAndSave.class);
|
|
||||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
|
||||||
|
|
||||||
public static <I extends Result> void main(String[] args) throws Exception {
|
|
||||||
|
|
||||||
String jsonConfiguration = IOUtils
|
|
||||||
.toString(
|
|
||||||
CollectAndSave.class
|
|
||||||
.getResourceAsStream(
|
|
||||||
"/eu/dnetlib/dhp/actionmanager/bipfinder/input_actionset_parameter.json"));
|
|
||||||
|
|
||||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
|
|
||||||
|
|
||||||
parser.parseArgument(args);
|
|
||||||
|
|
||||||
Boolean isSparkSessionManaged = Optional
|
|
||||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
|
||||||
.map(Boolean::valueOf)
|
|
||||||
.orElse(Boolean.TRUE);
|
|
||||||
|
|
||||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
|
||||||
|
|
||||||
final String inputPath = parser.get("inputPath");
|
|
||||||
log.info("inputPath {}: ", inputPath);
|
|
||||||
|
|
||||||
final String outputPath = parser.get("outputPath");
|
|
||||||
log.info("outputPath {}: ", outputPath);
|
|
||||||
|
|
||||||
SparkConf conf = new SparkConf();
|
|
||||||
|
|
||||||
runWithSparkSession(
|
|
||||||
conf,
|
|
||||||
isSparkSessionManaged,
|
|
||||||
spark -> {
|
|
||||||
removeOutputDir(spark, outputPath);
|
|
||||||
collectAndSave(spark, inputPath, outputPath);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void collectAndSave(SparkSession spark, String inputPath, String outputPath) {
|
|
||||||
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
|
|
||||||
|
|
||||||
sc
|
|
||||||
.sequenceFile(inputPath + "/publication", Text.class, Text.class)
|
|
||||||
.union(sc.sequenceFile(inputPath + "/dataset", Text.class, Text.class))
|
|
||||||
.union(sc.sequenceFile(inputPath + "/otherresearchproduct", Text.class, Text.class))
|
|
||||||
.union(sc.sequenceFile(inputPath + "/software", Text.class, Text.class))
|
|
||||||
.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
|
|
||||||
;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void removeOutputDir(SparkSession spark, String path) {
|
|
||||||
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,26 +0,0 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.actionmanager.bipfinder;
|
|
||||||
|
|
||||||
import java.io.Serializable;
|
|
||||||
|
|
||||||
public class KeyValue implements Serializable {
|
|
||||||
|
|
||||||
private String key;
|
|
||||||
private String value;
|
|
||||||
|
|
||||||
public String getKey() {
|
|
||||||
return key;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setKey(String key) {
|
|
||||||
this.key = key;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getValue() {
|
|
||||||
return value;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setValue(String value) {
|
|
||||||
this.value = value;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,28 +0,0 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.actionmanager.bipfinder;
|
|
||||||
|
|
||||||
import java.io.Serializable;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Subset of the information of the generic results that are needed to create the atomic action
|
|
||||||
*/
|
|
||||||
public class PreparedResult implements Serializable {
|
|
||||||
private String id; // openaire id
|
|
||||||
private String value; // doi
|
|
||||||
|
|
||||||
public String getId() {
|
|
||||||
return id;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setId(String id) {
|
|
||||||
this.id = id;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getValue() {
|
|
||||||
return value;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setValue(String value) {
|
|
||||||
this.value = value;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,30 +0,0 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.actionmanager.bipfinder;
|
|
||||||
|
|
||||||
import java.io.Serializable;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* represents the score in the input file
|
|
||||||
*/
|
|
||||||
public class Score implements Serializable {
|
|
||||||
|
|
||||||
private String id;
|
|
||||||
private List<KeyValue> unit;
|
|
||||||
|
|
||||||
public String getId() {
|
|
||||||
return id;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setId(String id) {
|
|
||||||
this.id = id;
|
|
||||||
}
|
|
||||||
|
|
||||||
public List<KeyValue> getUnit() {
|
|
||||||
return unit;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setUnit(List<KeyValue> unit) {
|
|
||||||
this.unit = unit;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,20 +0,0 @@
|
||||||
[
|
|
||||||
{
|
|
||||||
"paramName": "issm",
|
|
||||||
"paramLongName": "isSparkSessionManaged",
|
|
||||||
"paramDescription": "when true will stop SparkSession after job execution",
|
|
||||||
"paramRequired": false
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"paramName": "ip",
|
|
||||||
"paramLongName": "inputPath",
|
|
||||||
"paramDescription": "the URL from where to get the programme file",
|
|
||||||
"paramRequired": true
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"paramName": "o",
|
|
||||||
"paramLongName": "outputPath",
|
|
||||||
"paramDescription": "the path of the new ActionSet",
|
|
||||||
"paramRequired": true
|
|
||||||
}
|
|
||||||
]
|
|
|
@ -1,32 +0,0 @@
|
||||||
[
|
|
||||||
{
|
|
||||||
"paramName": "issm",
|
|
||||||
"paramLongName": "isSparkSessionManaged",
|
|
||||||
"paramDescription": "when true will stop SparkSession after job execution",
|
|
||||||
"paramRequired": false
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"paramName": "ip",
|
|
||||||
"paramLongName": "inputPath",
|
|
||||||
"paramDescription": "the URL from where to get the programme file",
|
|
||||||
"paramRequired": true
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"paramName": "o",
|
|
||||||
"paramLongName": "outputPath",
|
|
||||||
"paramDescription": "the path of the new ActionSet",
|
|
||||||
"paramRequired": true
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"paramName": "rtn",
|
|
||||||
"paramLongName": "resultTableName",
|
|
||||||
"paramDescription": "the path of the new ActionSet",
|
|
||||||
"paramRequired": true
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"paramName": "bsp",
|
|
||||||
"paramLongName": "bipScorePath",
|
|
||||||
"paramDescription": "the path of the new ActionSet",
|
|
||||||
"paramRequired": true
|
|
||||||
}
|
|
||||||
]
|
|
|
@ -1,171 +0,0 @@
|
||||||
<workflow-app name="BipFinderScore" xmlns="uri:oozie:workflow:0.5">
|
|
||||||
<parameters>
|
|
||||||
<property>
|
|
||||||
<name>inputPath</name>
|
|
||||||
<description>the input path of the resources to be extended</description>
|
|
||||||
</property>
|
|
||||||
|
|
||||||
<property>
|
|
||||||
<name>bipScorePath</name>
|
|
||||||
<description>the path where to find the bipFinder scores</description>
|
|
||||||
</property>
|
|
||||||
<property>
|
|
||||||
<name>outputPath</name>
|
|
||||||
<description>the path where to store the actionset</description>
|
|
||||||
</property>
|
|
||||||
</parameters>
|
|
||||||
|
|
||||||
<start to="deleteoutputpath"/>
|
|
||||||
<kill name="Kill">
|
|
||||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
|
||||||
</kill>
|
|
||||||
<action name="deleteoutputpath">
|
|
||||||
<fs>
|
|
||||||
<delete path='${outputPath}'/>
|
|
||||||
<mkdir path='${outputPath}'/>
|
|
||||||
<delete path='${workingDir}'/>
|
|
||||||
<mkdir path='${workingDir}'/>
|
|
||||||
</fs>
|
|
||||||
<ok to="atomicactions"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<fork name="atomicactions">
|
|
||||||
<path start="atomicactions_publication"/>
|
|
||||||
<path start="atomicactions_dataset"/>
|
|
||||||
<path start="atomicactions_orp"/>
|
|
||||||
<path start="atomicactions_software"/>
|
|
||||||
</fork>
|
|
||||||
|
|
||||||
<action name="atomicactions_publication">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
<master>yarn</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>Produces the atomic action with the bip finder scores for publications</name>
|
|
||||||
<class>eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob</class>
|
|
||||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
|
||||||
<spark-opts>
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--driver-memory=${sparkDriverMemory}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
|
||||||
</spark-opts>
|
|
||||||
<arg>--inputPath</arg><arg>${inputPath}/publication</arg>
|
|
||||||
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
|
|
||||||
<arg>--outputPath</arg><arg>${workingDir}/publication</arg>
|
|
||||||
<arg>--bipScorePath</arg><arg>${bipScorePath}</arg>
|
|
||||||
</spark>
|
|
||||||
<ok to="join_aa"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<action name="atomicactions_dataset">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
<master>yarn</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>Produces the atomic action with the bip finder scores for datasets</name>
|
|
||||||
<class>eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob</class>
|
|
||||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
|
||||||
<spark-opts>
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--driver-memory=${sparkDriverMemory}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
|
||||||
</spark-opts>
|
|
||||||
<arg>--inputPath</arg><arg>${inputPath}/dataset</arg>
|
|
||||||
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
|
|
||||||
<arg>--outputPath</arg><arg>${workingDir}/dataset</arg>
|
|
||||||
<arg>--bipScorePath</arg><arg>${bipScorePath}</arg>
|
|
||||||
</spark>
|
|
||||||
<ok to="join_aa"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<action name="atomicactions_orp">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
<master>yarn</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>Produces the atomic action with the bip finder scores for orp</name>
|
|
||||||
<class>eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob</class>
|
|
||||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
|
||||||
<spark-opts>
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--driver-memory=${sparkDriverMemory}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
|
||||||
</spark-opts>
|
|
||||||
<arg>--inputPath</arg><arg>${inputPath}/otherresearchproduct</arg>
|
|
||||||
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
|
|
||||||
<arg>--outputPath</arg><arg>${workingDir}/otherresearchproduct</arg>
|
|
||||||
<arg>--bipScorePath</arg><arg>${bipScorePath}</arg>
|
|
||||||
</spark>
|
|
||||||
<ok to="join_aa"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<action name="atomicactions_software">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
<master>yarn</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>Produces the atomic action with the bip finder scores for software</name>
|
|
||||||
<class>eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob</class>
|
|
||||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
|
||||||
<spark-opts>
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--driver-memory=${sparkDriverMemory}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
|
||||||
</spark-opts>
|
|
||||||
<arg>--inputPath</arg><arg>${inputPath}/software</arg>
|
|
||||||
<arg>--resultTableName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
|
|
||||||
<arg>--outputPath</arg><arg>${workingDir}/software</arg>
|
|
||||||
<arg>--bipScorePath</arg><arg>${bipScorePath}</arg>
|
|
||||||
</spark>
|
|
||||||
<ok to="join_aa"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<join name="join_aa" to="collectandsave"/>
|
|
||||||
|
|
||||||
<action name="collectandsave">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
<master>yarn</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>saves all the aa produced for the several types of results in the as output path</name>
|
|
||||||
<class>eu.dnetlib.dhp.actionmanager.bipfinder.CollectAndSave</class>
|
|
||||||
<jar>dhp-aggregation-${projectVersion}.jar</jar>
|
|
||||||
<spark-opts>
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--driver-memory=${sparkDriverMemory}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
--conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
|
|
||||||
</spark-opts>
|
|
||||||
<arg>--inputPath</arg><arg>${workingDir}</arg>
|
|
||||||
<arg>--outputPath</arg><arg>${outputPath}</arg>
|
|
||||||
</spark>
|
|
||||||
<ok to="End"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<end name="End"/>
|
|
||||||
</workflow-app>
|
|
|
@ -1,331 +0,0 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.actionmanager.bipfinder;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.nio.file.Files;
|
|
||||||
import java.nio.file.Path;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
import org.apache.commons.io.FileUtils;
|
|
||||||
import org.apache.hadoop.io.Text;
|
|
||||||
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
|
|
||||||
import org.apache.spark.SparkConf;
|
|
||||||
import org.apache.spark.api.java.JavaRDD;
|
|
||||||
import org.apache.spark.api.java.JavaSparkContext;
|
|
||||||
import org.apache.spark.api.java.function.FilterFunction;
|
|
||||||
import org.apache.spark.api.java.function.ForeachFunction;
|
|
||||||
import org.apache.spark.api.java.function.MapFunction;
|
|
||||||
import org.apache.spark.api.java.function.MapGroupsFunction;
|
|
||||||
import org.apache.spark.sql.Dataset;
|
|
||||||
import org.apache.spark.sql.Encoders;
|
|
||||||
import org.apache.spark.sql.Row;
|
|
||||||
import org.apache.spark.sql.SparkSession;
|
|
||||||
import org.junit.jupiter.api.AfterAll;
|
|
||||||
import org.junit.jupiter.api.Assertions;
|
|
||||||
import org.junit.jupiter.api.BeforeAll;
|
|
||||||
import org.junit.jupiter.api.Test;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
||||||
|
|
||||||
import eu.dnetlib.dhp.schema.action.AtomicAction;
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.*;
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.KeyValue;
|
|
||||||
import scala.Tuple2;
|
|
||||||
|
|
||||||
public class SparkAtomicActionScoreJobTest {
|
|
||||||
|
|
||||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
|
||||||
|
|
||||||
private static SparkSession spark;
|
|
||||||
|
|
||||||
private static Path workingDir;
|
|
||||||
private static final Logger log = LoggerFactory
|
|
||||||
.getLogger(SparkAtomicActionScoreJobTest.class);
|
|
||||||
|
|
||||||
@BeforeAll
|
|
||||||
public static void beforeAll() throws IOException {
|
|
||||||
workingDir = Files
|
|
||||||
.createTempDirectory(SparkAtomicActionScoreJobTest.class.getSimpleName());
|
|
||||||
log.info("using work dir {}", workingDir);
|
|
||||||
|
|
||||||
SparkConf conf = new SparkConf();
|
|
||||||
conf.setAppName(SparkAtomicActionScoreJobTest.class.getSimpleName());
|
|
||||||
|
|
||||||
conf.setMaster("local[*]");
|
|
||||||
conf.set("spark.driver.host", "localhost");
|
|
||||||
conf.set("hive.metastore.local", "true");
|
|
||||||
conf.set("spark.ui.enabled", "false");
|
|
||||||
conf.set("spark.sql.warehouse.dir", workingDir.toString());
|
|
||||||
conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
|
|
||||||
|
|
||||||
spark = SparkSession
|
|
||||||
.builder()
|
|
||||||
.appName(SparkAtomicActionScoreJobTest.class.getSimpleName())
|
|
||||||
.config(conf)
|
|
||||||
.getOrCreate();
|
|
||||||
}
|
|
||||||
|
|
||||||
@AfterAll
|
|
||||||
public static void afterAll() throws IOException {
|
|
||||||
FileUtils.deleteDirectory(workingDir.toFile());
|
|
||||||
spark.stop();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void matchOne() throws Exception {
|
|
||||||
String bipScoresPath = getClass()
|
|
||||||
.getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/bip_scores.json")
|
|
||||||
.getPath();
|
|
||||||
String inputPath = getClass()
|
|
||||||
.getResource(
|
|
||||||
"/eu/dnetlib/dhp/actionmanager/bipfinder/publication.json")
|
|
||||||
.getPath();
|
|
||||||
|
|
||||||
SparkAtomicActionScoreJob
|
|
||||||
.main(
|
|
||||||
new String[] {
|
|
||||||
"-isSparkSessionManaged",
|
|
||||||
Boolean.FALSE.toString(),
|
|
||||||
"-inputPath",
|
|
||||||
inputPath,
|
|
||||||
"-bipScorePath",
|
|
||||||
bipScoresPath,
|
|
||||||
"-resultTableName",
|
|
||||||
"eu.dnetlib.dhp.schema.oaf.Publication",
|
|
||||||
"-outputPath",
|
|
||||||
workingDir.toString() + "/actionSet"
|
|
||||||
});
|
|
||||||
|
|
||||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
|
||||||
|
|
||||||
JavaRDD<Publication> tmp = sc
|
|
||||||
.sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class)
|
|
||||||
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
|
|
||||||
.map(aa -> ((Publication) aa.getPayload()));
|
|
||||||
|
|
||||||
Assertions.assertTrue(tmp.count() == 1);
|
|
||||||
|
|
||||||
Dataset<Publication> verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(Publication.class));
|
|
||||||
verificationDataset.createOrReplaceTempView("publication");
|
|
||||||
|
|
||||||
Dataset<Row> execVerification = spark
|
|
||||||
.sql(
|
|
||||||
"Select p.id oaid, mes.id, mUnit.value from publication p " +
|
|
||||||
"lateral view explode(measures) m as mes " +
|
|
||||||
"lateral view explode(mes.unit) u as mUnit ");
|
|
||||||
|
|
||||||
Assertions.assertEquals(2, execVerification.count());
|
|
||||||
|
|
||||||
Assertions
|
|
||||||
.assertEquals(
|
|
||||||
"50|355e65625b88::ffa5bad14f4adc0c9a15c00efbbccddb",
|
|
||||||
execVerification.select("oaid").collectAsList().get(0).getString(0));
|
|
||||||
|
|
||||||
Assertions
|
|
||||||
.assertEquals(
|
|
||||||
"1.47565045883e-08",
|
|
||||||
execVerification.filter("id = 'influence'").select("value").collectAsList().get(0).getString(0));
|
|
||||||
|
|
||||||
Assertions
|
|
||||||
.assertEquals(
|
|
||||||
"0.227515392",
|
|
||||||
execVerification.filter("id = 'popularity'").select("value").collectAsList().get(0).getString(0));
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void matchOneWithTwo() throws Exception {
|
|
||||||
String bipScoresPath = getClass()
|
|
||||||
.getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/bip_scores.json")
|
|
||||||
.getPath();
|
|
||||||
String inputPath = getClass()
|
|
||||||
.getResource(
|
|
||||||
"/eu/dnetlib/dhp/actionmanager/bipfinder/publication_2.json")
|
|
||||||
.getPath();
|
|
||||||
|
|
||||||
SparkAtomicActionScoreJob
|
|
||||||
.main(
|
|
||||||
new String[] {
|
|
||||||
"-isSparkSessionManaged",
|
|
||||||
Boolean.FALSE.toString(),
|
|
||||||
"-inputPath",
|
|
||||||
inputPath,
|
|
||||||
"-bipScorePath",
|
|
||||||
bipScoresPath,
|
|
||||||
"-resultTableName",
|
|
||||||
"eu.dnetlib.dhp.schema.oaf.Publication",
|
|
||||||
"-outputPath",
|
|
||||||
workingDir.toString() + "/actionSet"
|
|
||||||
});
|
|
||||||
|
|
||||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
|
||||||
|
|
||||||
JavaRDD<Publication> tmp = sc
|
|
||||||
.sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class)
|
|
||||||
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
|
|
||||||
.map(aa -> ((Publication) aa.getPayload()));
|
|
||||||
|
|
||||||
Assertions.assertTrue(tmp.count() == 1);
|
|
||||||
|
|
||||||
Dataset<Publication> verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(Publication.class));
|
|
||||||
verificationDataset.createOrReplaceTempView("publication");
|
|
||||||
|
|
||||||
Dataset<Row> execVerification = spark
|
|
||||||
.sql(
|
|
||||||
"Select p.id oaid, mes.id, mUnit.value from publication p " +
|
|
||||||
"lateral view explode(measures) m as mes " +
|
|
||||||
"lateral view explode(mes.unit) u as mUnit ");
|
|
||||||
|
|
||||||
Assertions.assertEquals(4, execVerification.count());
|
|
||||||
|
|
||||||
Assertions
|
|
||||||
.assertEquals(
|
|
||||||
"50|355e65625b88::ffa5bad14f4adc0c9a15c00efbbccddb",
|
|
||||||
execVerification.select("oaid").collectAsList().get(0).getString(0));
|
|
||||||
|
|
||||||
Assertions
|
|
||||||
.assertEquals(
|
|
||||||
2,
|
|
||||||
execVerification.filter("id = 'influence'").count());
|
|
||||||
|
|
||||||
Assertions
|
|
||||||
.assertEquals(
|
|
||||||
2,
|
|
||||||
execVerification.filter("id = 'popularity'").count());
|
|
||||||
|
|
||||||
List<Row> tmp_ds = execVerification.filter("id = 'influence'").select("value").collectAsList();
|
|
||||||
String tmp_influence = tmp_ds.get(0).getString(0);
|
|
||||||
Assertions
|
|
||||||
.assertTrue(
|
|
||||||
"1.47565045883e-08".equals(tmp_influence) ||
|
|
||||||
"1.98956540239e-08".equals(tmp_influence));
|
|
||||||
|
|
||||||
tmp_influence = tmp_ds.get(1).getString(0);
|
|
||||||
Assertions
|
|
||||||
.assertTrue(
|
|
||||||
"1.47565045883e-08".equals(tmp_influence) ||
|
|
||||||
"1.98956540239e-08".equals(tmp_influence));
|
|
||||||
|
|
||||||
Assertions.assertTrue(!tmp_ds.get(0).getString(0).equals(tmp_ds.get(1).getString(0)));
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void matchTwo() throws Exception {
|
|
||||||
String bipScoresPath = getClass()
|
|
||||||
.getResource("/eu/dnetlib/dhp/actionmanager/bipfinder/bip_scores.json")
|
|
||||||
.getPath();
|
|
||||||
String inputPath = getClass()
|
|
||||||
.getResource(
|
|
||||||
"/eu/dnetlib/dhp/actionmanager/bipfinder/publication_3.json")
|
|
||||||
.getPath();
|
|
||||||
|
|
||||||
SparkAtomicActionScoreJob
|
|
||||||
.main(
|
|
||||||
new String[] {
|
|
||||||
"-isSparkSessionManaged",
|
|
||||||
Boolean.FALSE.toString(),
|
|
||||||
"-inputPath",
|
|
||||||
inputPath,
|
|
||||||
"-bipScorePath",
|
|
||||||
bipScoresPath,
|
|
||||||
"-resultTableName",
|
|
||||||
"eu.dnetlib.dhp.schema.oaf.Publication",
|
|
||||||
"-outputPath",
|
|
||||||
workingDir.toString() + "/actionSet"
|
|
||||||
});
|
|
||||||
|
|
||||||
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
|
|
||||||
|
|
||||||
JavaRDD<Publication> tmp = sc
|
|
||||||
.sequenceFile(workingDir.toString() + "/actionSet", Text.class, Text.class)
|
|
||||||
.map(value -> OBJECT_MAPPER.readValue(value._2().toString(), AtomicAction.class))
|
|
||||||
.map(aa -> ((Publication) aa.getPayload()));
|
|
||||||
|
|
||||||
Assertions.assertTrue(tmp.count() == 2);
|
|
||||||
|
|
||||||
Dataset<Publication> verificationDataset = spark.createDataset(tmp.rdd(), Encoders.bean(Publication.class));
|
|
||||||
verificationDataset.createOrReplaceTempView("publication");
|
|
||||||
|
|
||||||
Dataset<Row> execVerification = spark
|
|
||||||
.sql(
|
|
||||||
"Select p.id oaid, mes.id, mUnit.value from publication p " +
|
|
||||||
"lateral view explode(measures) m as mes " +
|
|
||||||
"lateral view explode(mes.unit) u as mUnit ");
|
|
||||||
|
|
||||||
Assertions.assertEquals(4, execVerification.count());
|
|
||||||
|
|
||||||
Assertions
|
|
||||||
.assertEquals(
|
|
||||||
2,
|
|
||||||
execVerification.filter("oaid = '50|355e65625b88::ffa5bad14f4adc0c9a15c00efbbccddb'").count());
|
|
||||||
|
|
||||||
Assertions
|
|
||||||
.assertEquals(
|
|
||||||
2,
|
|
||||||
execVerification.filter("oaid = '50|acm_________::faed5b7a1bd8f51118d13ed29cfaee09'").count());
|
|
||||||
|
|
||||||
Assertions
|
|
||||||
.assertEquals(
|
|
||||||
2,
|
|
||||||
execVerification.filter("id = 'influence'").count());
|
|
||||||
|
|
||||||
Assertions
|
|
||||||
.assertEquals(
|
|
||||||
2,
|
|
||||||
execVerification.filter("id = 'popularity'").count());
|
|
||||||
|
|
||||||
Assertions
|
|
||||||
.assertEquals(
|
|
||||||
"1.47565045883e-08",
|
|
||||||
execVerification
|
|
||||||
.filter(
|
|
||||||
"oaid = '50|355e65625b88::ffa5bad14f4adc0c9a15c00efbbccddb' " +
|
|
||||||
"and id = 'influence'")
|
|
||||||
.select("value")
|
|
||||||
.collectAsList()
|
|
||||||
.get(0)
|
|
||||||
.getString(0));
|
|
||||||
|
|
||||||
Assertions
|
|
||||||
.assertEquals(
|
|
||||||
"1.98956540239e-08",
|
|
||||||
execVerification
|
|
||||||
.filter(
|
|
||||||
"oaid = '50|acm_________::faed5b7a1bd8f51118d13ed29cfaee09' " +
|
|
||||||
"and id = 'influence'")
|
|
||||||
.select("value")
|
|
||||||
.collectAsList()
|
|
||||||
.get(0)
|
|
||||||
.getString(0));
|
|
||||||
|
|
||||||
Assertions
|
|
||||||
.assertEquals(
|
|
||||||
"0.282046161584",
|
|
||||||
execVerification
|
|
||||||
.filter(
|
|
||||||
"oaid = '50|acm_________::faed5b7a1bd8f51118d13ed29cfaee09' " +
|
|
||||||
"and id = 'popularity'")
|
|
||||||
.select("value")
|
|
||||||
.collectAsList()
|
|
||||||
.get(0)
|
|
||||||
.getString(0));
|
|
||||||
|
|
||||||
Assertions
|
|
||||||
.assertEquals(
|
|
||||||
"0.227515392",
|
|
||||||
execVerification
|
|
||||||
.filter(
|
|
||||||
"oaid = '50|355e65625b88::ffa5bad14f4adc0c9a15c00efbbccddb' " +
|
|
||||||
"and id = 'popularity'")
|
|
||||||
.select("value")
|
|
||||||
.collectAsList()
|
|
||||||
.get(0)
|
|
||||||
.getString(0));
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
|
@ -10,10 +10,11 @@ import java.io.Serializable;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
|
|
||||||
import org.codehaus.jackson.map.ObjectMapper;
|
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.oa.merge.AuthorMerger;
|
import eu.dnetlib.dhp.oa.merge.AuthorMerger;
|
||||||
import eu.dnetlib.dhp.schema.oaf.*;
|
import eu.dnetlib.dhp.schema.oaf.*;
|
||||||
import eu.dnetlib.pace.util.MapDocumentUtil;
|
import eu.dnetlib.pace.util.MapDocumentUtil;
|
||||||
|
@ -100,8 +101,8 @@ public class EntityMergerTest implements Serializable {
|
||||||
assertEquals(pub_merged.getDateofacceptance().getValue(), "2018-09-30");
|
assertEquals(pub_merged.getDateofacceptance().getValue(), "2018-09-30");
|
||||||
|
|
||||||
// verify authors
|
// verify authors
|
||||||
assertEquals(pub_merged.getAuthor().size(), 9);
|
assertEquals(13, pub_merged.getAuthor().size());
|
||||||
assertEquals(AuthorMerger.countAuthorsPids(pub_merged.getAuthor()), 4);
|
assertEquals(4, AuthorMerger.countAuthorsPids(pub_merged.getAuthor()));
|
||||||
|
|
||||||
// verify title
|
// verify title
|
||||||
int count = 0;
|
int count = 0;
|
||||||
|
|
|
@ -7,7 +7,6 @@ import java.util.List;
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.spark.api.java.JavaPairRDD;
|
import org.apache.spark.api.java.JavaPairRDD;
|
||||||
import org.apache.spark.api.java.JavaRDD;
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
import org.apache.spark.api.java.JavaSparkContext;
|
|
||||||
import org.apache.spark.api.java.function.FlatMapFunction;
|
import org.apache.spark.api.java.function.FlatMapFunction;
|
||||||
import org.apache.spark.api.java.function.MapFunction;
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
import org.apache.spark.api.java.function.PairFunction;
|
import org.apache.spark.api.java.function.PairFunction;
|
||||||
|
@ -16,8 +15,8 @@ import org.apache.spark.rdd.RDD;
|
||||||
import org.apache.spark.sql.Dataset;
|
import org.apache.spark.sql.Dataset;
|
||||||
import org.apache.spark.sql.Encoders;
|
import org.apache.spark.sql.Encoders;
|
||||||
import org.apache.spark.sql.SparkSession;
|
import org.apache.spark.sql.SparkSession;
|
||||||
import org.codehaus.jackson.map.ObjectMapper;
|
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.google.common.hash.Hashing;
|
import com.google.common.hash.Hashing;
|
||||||
|
|
||||||
import eu.dnetlib.dedup.graph.ConnectedComponent;
|
import eu.dnetlib.dedup.graph.ConnectedComponent;
|
||||||
|
|
|
@ -10,7 +10,8 @@ import org.apache.spark.api.java.JavaSparkContext;
|
||||||
import org.apache.spark.api.java.function.MapFunction;
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
import org.apache.spark.sql.Encoders;
|
import org.apache.spark.sql.Encoders;
|
||||||
import org.apache.spark.sql.SparkSession;
|
import org.apache.spark.sql.SparkSession;
|
||||||
import org.codehaus.jackson.map.ObjectMapper;
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
||||||
|
|
|
@ -4,14 +4,13 @@ import eu.dnetlib.dhp.schema.action.AtomicAction
|
||||||
import eu.dnetlib.dhp.schema.oaf.{DataInfo, Dataset, Field, Instance, KeyValue, Oaf, Organization, Publication, Qualifier, Relation, Result, StructuredProperty}
|
import eu.dnetlib.dhp.schema.oaf.{DataInfo, Dataset, Field, Instance, KeyValue, Oaf, Organization, Publication, Qualifier, Relation, Result, StructuredProperty}
|
||||||
import eu.dnetlib.dhp.utils.DHPUtils
|
import eu.dnetlib.dhp.utils.DHPUtils
|
||||||
import org.apache.commons.lang3.StringUtils
|
import org.apache.commons.lang3.StringUtils
|
||||||
import org.codehaus.jackson.map.ObjectMapper
|
import com.fasterxml.jackson.databind.ObjectMapper
|
||||||
import org.json4s
|
import org.json4s
|
||||||
import org.json4s.DefaultFormats
|
import org.json4s.DefaultFormats
|
||||||
import org.json4s.jackson.JsonMethods.parse
|
import org.json4s.jackson.JsonMethods.parse
|
||||||
import org.slf4j.{Logger, LoggerFactory}
|
import org.slf4j.{Logger, LoggerFactory}
|
||||||
|
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
import scala.io.Source
|
|
||||||
|
|
||||||
|
|
||||||
case class HostedByItemType(id: String, officialname: String, issn: String, eissn: String, lissn: String, openAccess: Boolean) {}
|
case class HostedByItemType(id: String, officialname: String, issn: String, eissn: String, lissn: String, openAccess: Boolean) {}
|
||||||
|
@ -19,17 +18,11 @@ case class HostedByItemType(id: String, officialname: String, issn: String, eiss
|
||||||
case class DoiBoostAffiliation(PaperId:Long, AffiliationId:Long, GridId:Option[String], OfficialPage:Option[String], DisplayName:Option[String]){}
|
case class DoiBoostAffiliation(PaperId:Long, AffiliationId:Long, GridId:Option[String], OfficialPage:Option[String], DisplayName:Option[String]){}
|
||||||
|
|
||||||
object DoiBoostMappingUtil {
|
object DoiBoostMappingUtil {
|
||||||
def getUnknownCountry(): Qualifier = {
|
|
||||||
createQualifier("UNKNOWN","UNKNOWN","dnet:countries","dnet:countries")
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def generateMAGAffiliationId(affId: String): String = {
|
def generateMAGAffiliationId(affId: String): String = {
|
||||||
s"20|microsoft___$SEPARATOR${DHPUtils.md5(affId)}"
|
s"20|microsoft___$SEPARATOR${DHPUtils.md5(affId)}"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
val logger: Logger = LoggerFactory.getLogger(getClass)
|
val logger: Logger = LoggerFactory.getLogger(getClass)
|
||||||
|
|
||||||
//STATIC STRING
|
//STATIC STRING
|
||||||
|
|
|
@ -39,33 +39,38 @@ object SparkGenerateDOIBoostActionSet {
|
||||||
val dbaffiliationRelationPath = parser.get("dbaffiliationRelationPath")
|
val dbaffiliationRelationPath = parser.get("dbaffiliationRelationPath")
|
||||||
val dbOrganizationPath = parser.get("dbOrganizationPath")
|
val dbOrganizationPath = parser.get("dbOrganizationPath")
|
||||||
val workingDirPath = parser.get("targetPath")
|
val workingDirPath = parser.get("targetPath")
|
||||||
|
val sequenceFilePath = parser.get("sFilePath")
|
||||||
|
|
||||||
spark.read.load(dbDatasetPath).as[OafDataset]
|
val asDataset = spark.read.load(dbDatasetPath).as[OafDataset]
|
||||||
.map(d =>DoiBoostMappingUtil.fixResult(d))
|
.map(d =>DoiBoostMappingUtil.fixResult(d))
|
||||||
.map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
|
.map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
|
||||||
.write.mode(SaveMode.Overwrite).save(s"$workingDirPath/actionSet")
|
// .write.mode(SaveMode.Overwrite).save(s"$workingDirPath/actionSet")
|
||||||
|
|
||||||
spark.read.load(dbPublicationPath).as[Publication]
|
val asPublication =spark.read.load(dbPublicationPath).as[Publication]
|
||||||
.map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
|
.map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
|
||||||
.write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet")
|
// .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet")
|
||||||
|
|
||||||
spark.read.load(dbOrganizationPath).as[Organization]
|
val asOrganization = spark.read.load(dbOrganizationPath).as[Organization]
|
||||||
.map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
|
.map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
|
||||||
.write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet")
|
// .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet")
|
||||||
|
|
||||||
|
|
||||||
spark.read.load(crossRefRelation).as[Relation]
|
val asCRelation = spark.read.load(crossRefRelation).as[Relation]
|
||||||
.map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
|
.map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
|
||||||
.write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet")
|
// .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet")
|
||||||
|
|
||||||
spark.read.load(dbaffiliationRelationPath).as[Relation]
|
val asRelAffiliation = spark.read.load(dbaffiliationRelationPath).as[Relation]
|
||||||
.map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
|
.map(d=>DoiBoostMappingUtil.toActionSet(d))(Encoders.tuple(Encoders.STRING, Encoders.STRING))
|
||||||
.write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet")
|
// .write.mode(SaveMode.Append).save(s"$workingDirPath/actionSet")
|
||||||
|
|
||||||
|
|
||||||
val d: Dataset[(String, String)] =spark.read.load(s"$workingDirPath/actionSet").as[(String,String)]
|
|
||||||
|
|
||||||
d.rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$workingDirPath/rawset", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec])
|
|
||||||
|
val d: Dataset[(String, String)] = asDataset.union(asPublication).union(asOrganization).union(asCRelation).union(asRelAffiliation)
|
||||||
|
|
||||||
|
// spark.read.load(s"$workingDirPath/actionSet").as[(String,String)]
|
||||||
|
|
||||||
|
d.rdd.repartition(6000).map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$sequenceFilePath", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec])
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -2,6 +2,7 @@ package eu.dnetlib.doiboost
|
||||||
|
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
||||||
import eu.dnetlib.dhp.oa.merge.AuthorMerger
|
import eu.dnetlib.dhp.oa.merge.AuthorMerger
|
||||||
|
import eu.dnetlib.dhp.schema.common.ModelConstants
|
||||||
import eu.dnetlib.dhp.schema.oaf.{Organization, Publication, Relation, Dataset => OafDataset}
|
import eu.dnetlib.dhp.schema.oaf.{Organization, Publication, Relation, Dataset => OafDataset}
|
||||||
import eu.dnetlib.doiboost.mag.ConversionUtil
|
import eu.dnetlib.doiboost.mag.ConversionUtil
|
||||||
import org.apache.commons.io.IOUtils
|
import org.apache.commons.io.IOUtils
|
||||||
|
@ -30,7 +31,7 @@ object SparkGenerateDoiBoost {
|
||||||
import spark.implicits._
|
import spark.implicits._
|
||||||
|
|
||||||
val hostedByMapPath = parser.get("hostedByMapPath")
|
val hostedByMapPath = parser.get("hostedByMapPath")
|
||||||
val workingDirPath = parser.get("workingDirPath")
|
val workingDirPath = parser.get("workingPath")
|
||||||
|
|
||||||
|
|
||||||
implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication]
|
implicit val mapEncoderPub: Encoder[Publication] = Encoders.kryo[Publication]
|
||||||
|
@ -132,7 +133,7 @@ object SparkGenerateDoiBoost {
|
||||||
o.setLegalname(DoiBoostMappingUtil.asField(affiliation.DisplayName.get))
|
o.setLegalname(DoiBoostMappingUtil.asField(affiliation.DisplayName.get))
|
||||||
if (affiliation.OfficialPage.isDefined)
|
if (affiliation.OfficialPage.isDefined)
|
||||||
o.setWebsiteurl(DoiBoostMappingUtil.asField(affiliation.OfficialPage.get))
|
o.setWebsiteurl(DoiBoostMappingUtil.asField(affiliation.OfficialPage.get))
|
||||||
o.setCountry(DoiBoostMappingUtil.getUnknownCountry())
|
o.setCountry(ModelConstants.UNKNOWN_COUNTRY)
|
||||||
o
|
o
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
|
|
@ -2,18 +2,16 @@
|
||||||
package eu.dnetlib.doiboost.crossref;
|
package eu.dnetlib.doiboost.crossref;
|
||||||
|
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.zip.Inflater;
|
import java.util.zip.Inflater;
|
||||||
|
|
||||||
import org.apache.commons.codec.binary.Base64;
|
import org.apache.commons.codec.binary.Base64;
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.io.IntWritable;
|
import org.apache.hadoop.io.IntWritable;
|
||||||
import org.apache.hadoop.io.SequenceFile;
|
import org.apache.hadoop.io.SequenceFile;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
|
|
||||||
|
@ -30,34 +28,45 @@ public class CrossrefImporter {
|
||||||
|
|
||||||
parser.parseArgument(args);
|
parser.parseArgument(args);
|
||||||
|
|
||||||
final String hdfsuri = parser.get("namenode");
|
final String namenode = parser.get("namenode");
|
||||||
System.out.println("HDFS URI" + hdfsuri);
|
System.out.println("namenode: " + namenode);
|
||||||
Path hdfswritepath = new Path(parser.get("targetPath"));
|
|
||||||
System.out.println("TargetPath: " + hdfsuri);
|
|
||||||
|
|
||||||
final Long timestamp = StringUtils.isNotBlank(parser.get("timestamp"))
|
Path targetPath = new Path(parser.get("targetPath"));
|
||||||
? Long.parseLong(parser.get("timestamp"))
|
System.out.println("targetPath: " + targetPath);
|
||||||
: -1;
|
|
||||||
|
|
||||||
if (timestamp > 0)
|
final Long timestamp = Optional
|
||||||
System.out.println("Timestamp added " + timestamp);
|
.ofNullable(parser.get("timestamp"))
|
||||||
|
.map(s -> {
|
||||||
|
try {
|
||||||
|
return Long.parseLong(s);
|
||||||
|
} catch (NumberFormatException e) {
|
||||||
|
return -1L;
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.orElse(-1L);
|
||||||
|
System.out.println("timestamp: " + timestamp);
|
||||||
|
|
||||||
|
final String esServer = parser.get("esServer");
|
||||||
|
System.out.println("esServer: " + esServer);
|
||||||
|
|
||||||
|
final String esIndex = parser.get("esIndex");
|
||||||
|
System.out.println("esIndex: " + esIndex);
|
||||||
|
|
||||||
// ====== Init HDFS File System Object
|
// ====== Init HDFS File System Object
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
// Set FileSystem URI
|
// Set FileSystem URI
|
||||||
conf.set("fs.defaultFS", hdfsuri);
|
conf.set("fs.defaultFS", namenode);
|
||||||
// Because of Maven
|
// Because of Maven
|
||||||
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
|
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
|
||||||
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
|
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
|
||||||
|
|
||||||
ESClient client = timestamp > 0
|
// "ip-90-147-167-25.ct1.garrservices.it", "crossref"
|
||||||
? new ESClient("ip-90-147-167-25.ct1.garrservices.it", "crossref", timestamp)
|
final ESClient client = new ESClient(esServer, esIndex, timestamp);
|
||||||
: new ESClient("ip-90-147-167-25.ct1.garrservices.it", "crossref");
|
|
||||||
|
|
||||||
try (SequenceFile.Writer writer = SequenceFile
|
try (SequenceFile.Writer writer = SequenceFile
|
||||||
.createWriter(
|
.createWriter(
|
||||||
conf,
|
conf,
|
||||||
SequenceFile.Writer.file(hdfswritepath),
|
SequenceFile.Writer.file(targetPath),
|
||||||
SequenceFile.Writer.keyClass(IntWritable.class),
|
SequenceFile.Writer.keyClass(IntWritable.class),
|
||||||
SequenceFile.Writer.valueClass(Text.class))) {
|
SequenceFile.Writer.valueClass(Text.class))) {
|
||||||
|
|
||||||
|
@ -74,8 +83,7 @@ public class CrossrefImporter {
|
||||||
end = System.currentTimeMillis();
|
end = System.currentTimeMillis();
|
||||||
final float time = (end - start) / 1000.0F;
|
final float time = (end - start) / 1000.0F;
|
||||||
System.out
|
System.out
|
||||||
.println(
|
.println(String.format("Imported %s records last 100000 imported in %s seconds", i, time));
|
||||||
String.format("Imported %d records last 100000 imported in %f seconds", i, time));
|
|
||||||
start = System.currentTimeMillis();
|
start = System.currentTimeMillis();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,11 +1,11 @@
|
||||||
|
|
||||||
package eu.dnetlib.doiboost.crossref;
|
package eu.dnetlib.doiboost.crossref;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.http.HttpHeaders;
|
||||||
import org.apache.http.client.methods.CloseableHttpResponse;
|
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||||
import org.apache.http.client.methods.HttpPost;
|
import org.apache.http.client.methods.HttpPost;
|
||||||
import org.apache.http.entity.StringEntity;
|
import org.apache.http.entity.StringEntity;
|
||||||
|
@ -17,13 +17,17 @@ import org.slf4j.LoggerFactory;
|
||||||
import com.jayway.jsonpath.JsonPath;
|
import com.jayway.jsonpath.JsonPath;
|
||||||
|
|
||||||
public class ESClient implements Iterator<String> {
|
public class ESClient implements Iterator<String> {
|
||||||
private static final Logger logger = LoggerFactory.getLogger(ESClient.class);
|
|
||||||
|
|
||||||
static final String blobPath = "$.hits[*].hits[*]._source.blob";
|
private static final String BLOB_PATH = "$.hits.hits[*]._source.blob";
|
||||||
static final String scrollIdPath = "$._scroll_id";
|
private static final String SCROLL_ID_PATH = "$._scroll_id";
|
||||||
static final String JSON_NO_TS = "{\"size\":1000}";
|
private static final String JSON_NO_TS = "{\"size\":1000}";
|
||||||
static final String JSON_WITH_TS = "{\"size\":1000, \"query\":{\"range\":{\"timestamp\":{\"gte\":%d}}}}";
|
private static final String JSON_WITH_TS = "{\"size\":1000, \"query\":{\"range\":{\"timestamp\":{\"gte\":%d}}}}";
|
||||||
static final String JSON_SCROLL = "{\"scroll_id\":\"%s\",\"scroll\" : \"1m\"}";
|
private static final String JSON_SCROLL = "{\"scroll_id\":\"%s\",\"scroll\" : \"1m\"}";
|
||||||
|
|
||||||
|
public static final String APPLICATION_JSON = "application/json";
|
||||||
|
|
||||||
|
public static final String ES_SEARCH_URL = "http://%s:9200/%s/_search?scroll=1m";
|
||||||
|
public static final String ES_SCROLL_URL = "http://%s:9200/_search/scroll";
|
||||||
|
|
||||||
private final String scrollId;
|
private final String scrollId;
|
||||||
|
|
||||||
|
@ -31,47 +35,30 @@ public class ESClient implements Iterator<String> {
|
||||||
|
|
||||||
private final String esHost;
|
private final String esHost;
|
||||||
|
|
||||||
public ESClient(final String esHost, final String esIndex) throws IOException {
|
public ESClient(final String esHost, final String esIndex, final long timestamp) {
|
||||||
|
|
||||||
this.esHost = esHost;
|
this.esHost = esHost;
|
||||||
final String body = getResponse(
|
|
||||||
String.format("http://%s:9200/%s/_search?scroll=1m", esHost, esIndex), JSON_NO_TS);
|
|
||||||
scrollId = getJPathString(scrollIdPath, body);
|
|
||||||
buffer = getBlobs(body);
|
|
||||||
}
|
|
||||||
|
|
||||||
public ESClient(final String esHost, final String esIndex, final long timestamp)
|
final String body = timestamp > 0
|
||||||
throws IOException {
|
? getResponse(String.format(ES_SEARCH_URL, esHost, esIndex), String.format(JSON_WITH_TS, timestamp))
|
||||||
this.esHost = esHost;
|
: getResponse(String.format(ES_SEARCH_URL, esHost, esIndex), JSON_NO_TS);
|
||||||
final String body = getResponse(
|
scrollId = getJPathString(SCROLL_ID_PATH, body);
|
||||||
String.format("http://%s:9200/%s/_search?scroll=1m", esHost, esIndex),
|
|
||||||
String.format(JSON_WITH_TS, timestamp));
|
|
||||||
scrollId = getJPathString(scrollIdPath, body);
|
|
||||||
buffer = getBlobs(body);
|
buffer = getBlobs(body);
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getResponse(final String url, final String json) {
|
private String getResponse(final String url, final String json) {
|
||||||
CloseableHttpClient client = HttpClients.createDefault();
|
try (CloseableHttpClient client = HttpClients.createDefault()) {
|
||||||
try {
|
|
||||||
|
|
||||||
HttpPost httpPost = new HttpPost(url);
|
HttpPost httpPost = new HttpPost(url);
|
||||||
if (json != null) {
|
if (json != null) {
|
||||||
StringEntity entity = new StringEntity(json);
|
StringEntity entity = new StringEntity(json);
|
||||||
httpPost.setEntity(entity);
|
httpPost.setEntity(entity);
|
||||||
httpPost.setHeader("Accept", "application/json");
|
httpPost.setHeader(HttpHeaders.ACCEPT, APPLICATION_JSON);
|
||||||
httpPost.setHeader("Content-type", "application/json");
|
httpPost.setHeader(HttpHeaders.CONTENT_TYPE, APPLICATION_JSON);
|
||||||
|
}
|
||||||
|
try (CloseableHttpResponse response = client.execute(httpPost)) {
|
||||||
|
return IOUtils.toString(response.getEntity().getContent());
|
||||||
}
|
}
|
||||||
CloseableHttpResponse response = client.execute(httpPost);
|
|
||||||
|
|
||||||
return IOUtils.toString(response.getEntity().getContent());
|
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
throw new RuntimeException("Error on executing request ", e);
|
throw new RuntimeException("Error on executing request ", e);
|
||||||
} finally {
|
|
||||||
try {
|
|
||||||
client.close();
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new RuntimeException("Unable to close client ", e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -87,7 +74,7 @@ public class ESClient implements Iterator<String> {
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<String> getBlobs(final String body) {
|
private List<String> getBlobs(final String body) {
|
||||||
final List<String> res = JsonPath.read(body, "$.hits.hits[*]._source.blob");
|
final List<String> res = JsonPath.read(body, BLOB_PATH);
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -102,11 +89,11 @@ public class ESClient implements Iterator<String> {
|
||||||
if (buffer.isEmpty()) {
|
if (buffer.isEmpty()) {
|
||||||
|
|
||||||
final String json_param = String.format(JSON_SCROLL, scrollId);
|
final String json_param = String.format(JSON_SCROLL, scrollId);
|
||||||
final String body = getResponse(String.format("http://%s:9200/_search/scroll", esHost), json_param);
|
final String body = getResponse(String.format(ES_SCROLL_URL, esHost), json_param);
|
||||||
try {
|
try {
|
||||||
buffer = getBlobs(body);
|
buffer = getBlobs(body);
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
logger.error("Error on get next page: body:" + body);
|
System.out.println("Error on get next page: body:" + body);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nextItem;
|
return nextItem;
|
||||||
|
|
|
@ -11,7 +11,7 @@ import org.slf4j.{Logger, LoggerFactory}
|
||||||
|
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
|
|
||||||
object SparkPreProcessMAG {
|
object SparkProcessMAG {
|
||||||
def main(args: Array[String]): Unit = {
|
def main(args: Array[String]): Unit = {
|
||||||
|
|
||||||
val logger: Logger = LoggerFactory.getLogger(getClass)
|
val logger: Logger = LoggerFactory.getLogger(getClass)
|
|
@ -1,11 +1,11 @@
|
||||||
package eu.dnetlib.doiboost.orcid
|
package eu.dnetlib.doiboost.orcid
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper
|
||||||
import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Publication}
|
import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Publication}
|
||||||
import eu.dnetlib.dhp.schema.orcid.OrcidDOI
|
import eu.dnetlib.dhp.schema.orcid.OrcidDOI
|
||||||
import eu.dnetlib.doiboost.DoiBoostMappingUtil
|
import eu.dnetlib.doiboost.DoiBoostMappingUtil
|
||||||
import eu.dnetlib.doiboost.DoiBoostMappingUtil.{ORCID, PID_TYPES, createSP, generateDataInfo, generateIdentifier}
|
import eu.dnetlib.doiboost.DoiBoostMappingUtil.{ORCID, PID_TYPES, createSP, generateDataInfo, generateIdentifier}
|
||||||
import org.apache.commons.lang.StringUtils
|
import org.apache.commons.lang.StringUtils
|
||||||
import org.codehaus.jackson.map.ObjectMapper
|
|
||||||
import org.slf4j.{Logger, LoggerFactory}
|
import org.slf4j.{Logger, LoggerFactory}
|
||||||
|
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
|
@ -18,7 +18,7 @@ case class ORCIDItem(oid:String,name:String,surname:String,creditName:String,err
|
||||||
case class ORCIDElement(doi:String, authors:List[ORCIDItem]) {}
|
case class ORCIDElement(doi:String, authors:List[ORCIDItem]) {}
|
||||||
object ORCIDToOAF {
|
object ORCIDToOAF {
|
||||||
val logger: Logger = LoggerFactory.getLogger(ORCIDToOAF.getClass)
|
val logger: Logger = LoggerFactory.getLogger(ORCIDToOAF.getClass)
|
||||||
val mapper = new ObjectMapper
|
val mapper = new ObjectMapper()
|
||||||
|
|
||||||
def isJsonValid(inputStr: String): Boolean = {
|
def isJsonValid(inputStr: String): Boolean = {
|
||||||
import java.io.IOException
|
import java.io.IOException
|
||||||
|
|
|
@ -3,10 +3,8 @@ package eu.dnetlib.doiboost.orcid;
|
||||||
|
|
||||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.text.SimpleDateFormat;
|
import java.text.SimpleDateFormat;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.List;
|
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
|
@ -18,11 +16,9 @@ import org.apache.http.impl.client.CloseableHttpClient;
|
||||||
import org.apache.http.impl.client.HttpClients;
|
import org.apache.http.impl.client.HttpClients;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
import org.apache.spark.api.java.JavaPairRDD;
|
import org.apache.spark.api.java.JavaPairRDD;
|
||||||
import org.apache.spark.api.java.JavaRDD;
|
|
||||||
import org.apache.spark.api.java.JavaSparkContext;
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
import org.apache.spark.api.java.function.Function;
|
import org.apache.spark.api.java.function.Function;
|
||||||
import org.apache.spark.util.LongAccumulator;
|
import org.apache.spark.util.LongAccumulator;
|
||||||
import org.mortbay.log.Log;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -36,7 +32,7 @@ public class SparkDownloadOrcidAuthors {
|
||||||
static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
|
static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
|
||||||
static final String lastUpdate = "2020-09-29 00:00:00";
|
static final String lastUpdate = "2020-09-29 00:00:00";
|
||||||
|
|
||||||
public static void main(String[] args) throws IOException, Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
|
|
||||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
||||||
IOUtils
|
IOUtils
|
||||||
|
@ -51,12 +47,12 @@ public class SparkDownloadOrcidAuthors {
|
||||||
.orElse(Boolean.TRUE);
|
.orElse(Boolean.TRUE);
|
||||||
logger.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
logger.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
||||||
final String workingPath = parser.get("workingPath");
|
final String workingPath = parser.get("workingPath");
|
||||||
logger.info("workingPath: ", workingPath);
|
logger.info("workingPath: {}", workingPath);
|
||||||
final String outputPath = parser.get("outputPath");
|
final String outputPath = parser.get("outputPath");
|
||||||
logger.info("outputPath: ", outputPath);
|
logger.info("outputPath: {}", outputPath);
|
||||||
final String token = parser.get("token");
|
final String token = parser.get("token");
|
||||||
final String lambdaFileName = parser.get("lambdaFileName");
|
final String lambdaFileName = parser.get("lambdaFileName");
|
||||||
logger.info("lambdaFileName: ", lambdaFileName);
|
logger.info("lambdaFileName: {}", lambdaFileName);
|
||||||
|
|
||||||
SparkConf conf = new SparkConf();
|
SparkConf conf = new SparkConf();
|
||||||
runWithSparkSession(
|
runWithSparkSession(
|
||||||
|
@ -171,8 +167,8 @@ public class SparkDownloadOrcidAuthors {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static boolean isModified(String orcidId, String modifiedDate) {
|
private static boolean isModified(String orcidId, String modifiedDate) {
|
||||||
Date modifiedDateDt = null;
|
Date modifiedDateDt;
|
||||||
Date lastUpdateDt = null;
|
Date lastUpdateDt;
|
||||||
try {
|
try {
|
||||||
if (modifiedDate.length() != 19) {
|
if (modifiedDate.length() != 19) {
|
||||||
modifiedDate = modifiedDate.substring(0, 19);
|
modifiedDate = modifiedDate.substring(0, 19);
|
||||||
|
|
|
@ -5,5 +5,6 @@
|
||||||
{"paramName": "cr", "paramLongName":"crossRefRelation", "paramDescription": "the UnpayWall Publication Path", "paramRequired": true},
|
{"paramName": "cr", "paramLongName":"crossRefRelation", "paramDescription": "the UnpayWall Publication Path", "paramRequired": true},
|
||||||
{"paramName": "da", "paramLongName":"dbaffiliationRelationPath", "paramDescription": "the MAG Publication Path", "paramRequired": true},
|
{"paramName": "da", "paramLongName":"dbaffiliationRelationPath", "paramDescription": "the MAG Publication Path", "paramRequired": true},
|
||||||
{"paramName": "do", "paramLongName":"dbOrganizationPath", "paramDescription": "the MAG Publication Path", "paramRequired": true},
|
{"paramName": "do", "paramLongName":"dbOrganizationPath", "paramDescription": "the MAG Publication Path", "paramRequired": true},
|
||||||
{"paramName": "w", "paramLongName":"targetPath", "paramDescription": "the Working Path", "paramRequired": true}
|
{"paramName": "w", "paramLongName":"targetPath", "paramDescription": "the Working Path", "paramRequired": true},
|
||||||
|
{"paramName": "sp", "paramLongName":"sFilePath", "paramDescription": "the Sequence file Path", "paramRequired": true}
|
||||||
]
|
]
|
||||||
|
|
|
@ -3,5 +3,5 @@
|
||||||
{"paramName": "hb", "paramLongName":"hostedByMapPath", "paramDescription": "the hosted By Map Path", "paramRequired": true},
|
{"paramName": "hb", "paramLongName":"hostedByMapPath", "paramDescription": "the hosted By Map Path", "paramRequired": true},
|
||||||
{"paramName": "ap", "paramLongName":"affiliationPath", "paramDescription": "the Affliation Path", "paramRequired": true},
|
{"paramName": "ap", "paramLongName":"affiliationPath", "paramDescription": "the Affliation Path", "paramRequired": true},
|
||||||
{"paramName": "pa", "paramLongName":"paperAffiliationPath", "paramDescription": "the paperAffiliation Path", "paramRequired": true},
|
{"paramName": "pa", "paramLongName":"paperAffiliationPath", "paramDescription": "the paperAffiliation Path", "paramRequired": true},
|
||||||
{"paramName": "w", "paramLongName":"workingDirPath", "paramDescription": "the Working Path", "paramRequired": true}
|
{"paramName": "w", "paramLongName":"workingPath", "paramDescription": "the Working Path", "paramRequired": true}
|
||||||
]
|
]
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
[
|
[
|
||||||
{"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the sequencial file to write", "paramRequired": true},
|
{"paramName":"t", "paramLongName":"targetPath", "paramDescription": "the path of the sequencial file to write", "paramRequired": true},
|
||||||
{"paramName":"n", "paramLongName":"namenode", "paramDescription": "the hive metastore uris", "paramRequired": true},
|
{"paramName":"n", "paramLongName":"namenode", "paramDescription": "the hive metastore uris", "paramRequired": true},
|
||||||
{"paramName":"ts", "paramLongName":"timestamp", "paramDescription": "timestamp", "paramRequired": false}
|
{"paramName":"ts", "paramLongName":"timestamp", "paramDescription": "timestamp", "paramRequired": false},
|
||||||
|
{"paramName":"ess", "paramLongName":"esServer", "paramDescription": "elasticsearch server url", "paramRequired": true},
|
||||||
|
{"paramName":"esi", "paramLongName":"esIndex", "paramDescription": "elasticsearch index name", "paramRequired": true}
|
||||||
]
|
]
|
|
@ -15,6 +15,10 @@
|
||||||
<name>oozie.action.sharelib.for.spark</name>
|
<name>oozie.action.sharelib.for.spark</name>
|
||||||
<value>spark2</value>
|
<value>spark2</value>
|
||||||
</property>
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>oozie.launcher.mapreduce.user.classpath.first</name>
|
||||||
|
<value>true</value>
|
||||||
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>hive_metastore_uris</name>
|
<name>hive_metastore_uris</name>
|
||||||
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
|
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
|
||||||
|
@ -23,36 +27,16 @@
|
||||||
<name>spark2YarnHistoryServerAddress</name>
|
<name>spark2YarnHistoryServerAddress</name>
|
||||||
<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
|
<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
|
||||||
</property>
|
</property>
|
||||||
<property>
|
|
||||||
<name>spark2ExtraListeners</name>
|
|
||||||
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
|
|
||||||
</property>
|
|
||||||
<property>
|
|
||||||
<name>spark2SqlQueryExecutionListeners</name>
|
|
||||||
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
|
|
||||||
</property>
|
|
||||||
<property>
|
|
||||||
<name>oozie.launcher.mapreduce.user.classpath.first</name>
|
|
||||||
<value>true</value>
|
|
||||||
</property>
|
|
||||||
<property>
|
|
||||||
<name>sparkExecutorNumber</name>
|
|
||||||
<value>4</value>
|
|
||||||
</property>
|
|
||||||
<property>
|
<property>
|
||||||
<name>spark2EventLogDir</name>
|
<name>spark2EventLogDir</name>
|
||||||
<value>/user/spark/spark2ApplicationHistory</value>
|
<value>/user/spark/spark2ApplicationHistory</value>
|
||||||
</property>
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>sparkDriverMemory</name>
|
<name>spark2ExtraListeners</name>
|
||||||
<value>15G</value>
|
<value>"com.cloudera.spark.lineage.NavigatorAppListener"</value>
|
||||||
</property>
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>sparkExecutorMemory</name>
|
<name>spark2SqlQueryExecutionListeners</name>
|
||||||
<value>6G</value>
|
<value>"com.cloudera.spark.lineage.NavigatorQueryListener"</value>
|
||||||
</property>
|
|
||||||
<property>
|
|
||||||
<name>sparkExecutorCores</name>
|
|
||||||
<value>1</value>
|
|
||||||
</property>
|
</property>
|
||||||
</configuration>
|
</configuration>
|
|
@ -0,0 +1,335 @@
|
||||||
|
<workflow-app name="Generate DOIBoost ActionSet" xmlns="uri:oozie:workflow:0.5">
|
||||||
|
<parameters>
|
||||||
|
<property>
|
||||||
|
<name>sparkDriverMemory</name>
|
||||||
|
<description>memory for driver process</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>sparkExecutorMemory</name>
|
||||||
|
<description>memory for individual executor</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>sparkExecutorIntersectionMemory</name>
|
||||||
|
<description>memory for individual executor</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>sparkExecutorCores</name>
|
||||||
|
<description>number of cores used by single executor</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
|
||||||
|
<!-- Itersection Parameters -->
|
||||||
|
<property>
|
||||||
|
<name>workingPath</name>
|
||||||
|
<description>the working Path</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>hostedByMapPath</name>
|
||||||
|
<description>the hostedByMap Path</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>outputPath</name>
|
||||||
|
<description>the Path of the sequence file action set</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
|
||||||
|
<!-- Crossref Parameters -->
|
||||||
|
<property>
|
||||||
|
<name>inputPathCrossref</name>
|
||||||
|
<description>the Crossref input path</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>crossrefTimestamp</name>
|
||||||
|
<description>Timestamp for the Crossref incremental Harvesting</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>esServer</name>
|
||||||
|
<description>elasticsearch server url for the Crossref Harvesting</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>esIndex</name>
|
||||||
|
<description>elasticsearch index name for the Crossref Harvesting</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<!-- MAG Parameters -->
|
||||||
|
<property>
|
||||||
|
<name>inputPathMAG</name>
|
||||||
|
<description>the MAG working path</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
|
||||||
|
<!-- UnpayWall Parameters -->
|
||||||
|
<property>
|
||||||
|
<name>inputPathUnpayWall</name>
|
||||||
|
<description>the UnpayWall working path</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<!-- ORCID Parameters -->
|
||||||
|
<property>
|
||||||
|
<name>inputPathOrcid</name>
|
||||||
|
<description>the ORCID working path</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
</parameters>
|
||||||
|
|
||||||
|
<global>
|
||||||
|
<job-tracker>${jobTracker}</job-tracker>
|
||||||
|
<name-node>${nameNode}</name-node>
|
||||||
|
<configuration>
|
||||||
|
<property>
|
||||||
|
<name>oozie.action.sharelib.for.spark</name>
|
||||||
|
<value>${oozieActionShareLibForSpark2}</value>
|
||||||
|
</property>
|
||||||
|
</configuration>
|
||||||
|
</global>
|
||||||
|
|
||||||
|
<start to="resume_from"/>
|
||||||
|
|
||||||
|
<decision name="resume_from">
|
||||||
|
<switch>
|
||||||
|
<case to="ConvertCrossrefToOAF">${wf:conf('resumeFrom') eq 'ConvertCrossrefToOAF'}</case>
|
||||||
|
<case to="ResetMagWorkingPath">${wf:conf('resumeFrom') eq 'ResetMagWorkingPath'}</case>
|
||||||
|
<case to="ProcessMAG">${wf:conf('resumeFrom') eq 'PreprocessMag'}</case>
|
||||||
|
<case to="ProcessUW">${wf:conf('resumeFrom') eq 'PreprocessUW'}</case>
|
||||||
|
<case to="ProcessORCID">${wf:conf('resumeFrom') eq 'PreprocessORCID'}</case>
|
||||||
|
<case to="CreateDOIBoost">${wf:conf('resumeFrom') eq 'CreateDOIBoost'}</case>
|
||||||
|
<case to="GenerateActionSet">${wf:conf('resumeFrom') eq 'GenerateActionSet'}</case>
|
||||||
|
<default to="ImportCrossRef"/>
|
||||||
|
</switch>
|
||||||
|
</decision>
|
||||||
|
|
||||||
|
<kill name="Kill">
|
||||||
|
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||||
|
</kill>
|
||||||
|
|
||||||
|
<action name="ImportCrossRef">
|
||||||
|
<java>
|
||||||
|
<main-class>eu.dnetlib.doiboost.crossref.CrossrefImporter</main-class>
|
||||||
|
<arg>--targetPath</arg><arg>${inputPathCrossref}/index_update</arg>
|
||||||
|
<arg>--namenode</arg><arg>${nameNode}</arg>
|
||||||
|
<arg>--esServer</arg><arg>${esServer}</arg>
|
||||||
|
<arg>--esIndex</arg><arg>${esIndex}</arg>
|
||||||
|
<arg>--timestamp</arg><arg>${crossrefTimestamp}</arg>
|
||||||
|
</java>
|
||||||
|
<ok to="GenerateCrossrefDataset"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
|
||||||
|
<!-- CROSSREF SECTION -->
|
||||||
|
|
||||||
|
<action name="GenerateCrossrefDataset">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn-cluster</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>GenerateCrossrefDataset</name>
|
||||||
|
<class>eu.dnetlib.doiboost.crossref.CrossrefDataset</class>
|
||||||
|
<jar>dhp-doiboost-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
|
${sparkExtraOPT}
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--workingPath</arg><arg>${inputPathCrossref}</arg>
|
||||||
|
<arg>--master</arg><arg>yarn-cluster</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="RenameDataset"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="RenameDataset">
|
||||||
|
<fs>
|
||||||
|
<delete path="${inputPathCrossref}/crossref_ds"/>
|
||||||
|
<move source="${inputPathCrossref}/crossref_ds_updated"
|
||||||
|
target="${inputPathCrossref}/crossref_ds"/>
|
||||||
|
</fs>
|
||||||
|
<ok to="ConvertCrossrefToOAF"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
|
||||||
|
<action name="ConvertCrossrefToOAF">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn-cluster</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>ConvertCrossrefToOAF</name>
|
||||||
|
<class>eu.dnetlib.doiboost.crossref.SparkMapDumpIntoOAF</class>
|
||||||
|
<jar>dhp-doiboost-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
|
${sparkExtraOPT}
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--sourcePath</arg><arg>${inputPathCrossref}/crossref_ds</arg>
|
||||||
|
<arg>--targetPath</arg><arg>${workingPath}</arg>
|
||||||
|
<arg>--master</arg><arg>yarn-cluster</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="ResetMagWorkingPath"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
<!-- MAG SECTION -->
|
||||||
|
<action name="ResetMagWorkingPath">
|
||||||
|
<fs>
|
||||||
|
<delete path="${inputPathMAG}/dataset"/>
|
||||||
|
<delete path="${inputPathMAG}/process"/>
|
||||||
|
<delete path="${inputPathMAG}/dataset"/>
|
||||||
|
</fs>
|
||||||
|
<ok to="ConvertMagToDataset"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="ConvertMagToDataset">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn-cluster</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>Convert Mag to Dataset</name>
|
||||||
|
<class>eu.dnetlib.doiboost.mag.SparkImportMagIntoDataset</class>
|
||||||
|
<jar>dhp-doiboost-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
${sparkExtraOPT}
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--sourcePath</arg><arg>${inputPathMAG}/input</arg>
|
||||||
|
<arg>--targetPath</arg><arg>${inputPathMAG}/dataset</arg>
|
||||||
|
<arg>--master</arg><arg>yarn-cluster</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="ProcessMAG"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="ProcessMAG">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn-cluster</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>Convert Mag to OAF Dataset</name>
|
||||||
|
<class>eu.dnetlib.doiboost.mag.SparkProcessMAG</class>
|
||||||
|
<jar>dhp-doiboost-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
|
${sparkExtraOPT}
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--sourcePath</arg><arg>${inputPathMAG}/dataset</arg>
|
||||||
|
<arg>--workingPath</arg><arg>${inputPathMAG}/process</arg>
|
||||||
|
<arg>--targetPath</arg><arg>${workingPath}</arg>
|
||||||
|
<arg>--master</arg><arg>yarn-cluster</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="ProcessUW"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<!-- UnpayWall SECTION -->
|
||||||
|
|
||||||
|
<action name="ProcessUW">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn-cluster</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>Convert UnpayWall to Dataset</name>
|
||||||
|
<class>eu.dnetlib.doiboost.uw.SparkMapUnpayWallToOAF</class>
|
||||||
|
<jar>dhp-doiboost-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
|
${sparkExtraOPT}
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--sourcePath</arg><arg>${inputPathUnpayWall}/uw_extracted</arg>
|
||||||
|
<arg>--targetPath</arg><arg>${workingPath}</arg>
|
||||||
|
<arg>--master</arg><arg>yarn-cluster</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="ProcessORCID"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<!-- ORCID SECTION -->
|
||||||
|
<action name="ProcessORCID">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn-cluster</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>Convert ORCID to Dataset</name>
|
||||||
|
<class>eu.dnetlib.doiboost.orcid.SparkConvertORCIDToOAF</class>
|
||||||
|
<jar>dhp-doiboost-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
|
${sparkExtraOPT}
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--sourcePath</arg><arg>${inputPathOrcid}</arg>
|
||||||
|
<arg>--targetPath</arg><arg>${workingPath}</arg>
|
||||||
|
<arg>--master</arg><arg>yarn-cluster</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="CreateDOIBoost"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<!-- INTERSECTION SECTION-->
|
||||||
|
<action name="CreateDOIBoost">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn-cluster</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>Create DOIBoost Infospace</name>
|
||||||
|
<class>eu.dnetlib.doiboost.SparkGenerateDoiBoost</class>
|
||||||
|
<jar>dhp-doiboost-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-memory=${sparkExecutorIntersectionMemory}
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
|
${sparkExtraOPT}
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--hostedByMapPath</arg><arg>${hostedByMapPath}</arg>
|
||||||
|
<arg>--affiliationPath</arg><arg>${inputPathMAG}/process/Affiliations</arg>
|
||||||
|
<arg>--paperAffiliationPath</arg><arg>${inputPathMAG}/process/PaperAuthorAffiliations</arg>
|
||||||
|
<arg>--workingPath</arg><arg>${workingPath}</arg>
|
||||||
|
<arg>--master</arg><arg>yarn-cluster</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="GenerateActionSet"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
|
||||||
|
<action name="GenerateActionSet">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn-cluster</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>Generate DOIBoost ActionSet</name>
|
||||||
|
<class>eu.dnetlib.doiboost.SparkGenerateDOIBoostActionSet</class>
|
||||||
|
<jar>dhp-doiboost-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.sql.shuffle.partitions=3840
|
||||||
|
${sparkExtraOPT}
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--dbPublicationPath</arg><arg>${workingPath}/doiBoostPublicationFiltered</arg>
|
||||||
|
<arg>--dbDatasetPath</arg><arg>${workingPath}/crossrefDataset</arg>
|
||||||
|
<arg>--crossRefRelation</arg><arg>${workingPath}/crossrefRelation</arg>
|
||||||
|
<arg>--dbaffiliationRelationPath</arg><arg>${workingPath}/doiBoostPublicationAffiliation</arg>
|
||||||
|
<arg>--dbOrganizationPath</arg><arg>${workingPath}/doiBoostOrganization</arg>
|
||||||
|
<arg>--targetPath</arg><arg>${workingPath}/actionDataSet</arg>
|
||||||
|
<arg>--sFilePath</arg><arg>${outputPath}</arg>
|
||||||
|
<arg>--master</arg><arg>yarn-cluster</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="End"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<end name="End"/>
|
||||||
|
</workflow-app>
|
|
@ -1,9 +1,9 @@
|
||||||
package eu.dnetlib.doiboost.orcid
|
package eu.dnetlib.doiboost.orcid
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper
|
||||||
import eu.dnetlib.dhp.schema.oaf.Publication
|
import eu.dnetlib.dhp.schema.oaf.Publication
|
||||||
import eu.dnetlib.doiboost.orcid.SparkConvertORCIDToOAF.getClass
|
import eu.dnetlib.doiboost.orcid.SparkConvertORCIDToOAF.getClass
|
||||||
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
|
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
|
||||||
import org.codehaus.jackson.map.ObjectMapper
|
|
||||||
import org.junit.jupiter.api.Assertions._
|
import org.junit.jupiter.api.Assertions._
|
||||||
import org.junit.jupiter.api.Test
|
import org.junit.jupiter.api.Test
|
||||||
import org.slf4j.{Logger, LoggerFactory}
|
import org.slf4j.{Logger, LoggerFactory}
|
||||||
|
|
|
@ -16,7 +16,10 @@ import eu.dnetlib.dhp.schema.oaf.*;
|
||||||
public class CleaningFunctions {
|
public class CleaningFunctions {
|
||||||
|
|
||||||
public static final String DOI_URL_PREFIX_REGEX = "(^http(s?):\\/\\/)(((dx\\.)?doi\\.org)|(handle\\.test\\.datacite\\.org))\\/";
|
public static final String DOI_URL_PREFIX_REGEX = "(^http(s?):\\/\\/)(((dx\\.)?doi\\.org)|(handle\\.test\\.datacite\\.org))\\/";
|
||||||
public static final String ORCID_PREFIX_REGEX = "^http(s?):\\/\\/orcid\\.org\\/";
|
|
||||||
|
public static final String ORCID_CLEANING_REGEX = ".*([0-9]{4}).*[-–—−=].*([0-9]{4}).*[-–—−=].*([0-9]{4}).*[-–—−=].*([0-9x]{4})";
|
||||||
|
public static final int ORCID_LEN = 19;
|
||||||
|
|
||||||
public static final String CLEANING_REGEX = "(?:\\n|\\r|\\t)";
|
public static final String CLEANING_REGEX = "(?:\\n|\\r|\\t)";
|
||||||
|
|
||||||
public static final Set<String> PID_BLACKLIST = new HashSet<>();
|
public static final Set<String> PID_BLACKLIST = new HashSet<>();
|
||||||
|
@ -86,7 +89,7 @@ public class CleaningFunctions {
|
||||||
} else if (value instanceof Organization) {
|
} else if (value instanceof Organization) {
|
||||||
Organization o = (Organization) value;
|
Organization o = (Organization) value;
|
||||||
if (Objects.isNull(o.getCountry()) || StringUtils.isBlank(o.getCountry().getClassid())) {
|
if (Objects.isNull(o.getCountry()) || StringUtils.isBlank(o.getCountry().getClassid())) {
|
||||||
o.setCountry(qualifier("UNKNOWN", "Unknown", ModelConstants.DNET_COUNTRY_TYPE));
|
o.setCountry(ModelConstants.UNKNOWN_COUNTRY);
|
||||||
}
|
}
|
||||||
} else if (value instanceof Relation) {
|
} else if (value instanceof Relation) {
|
||||||
// nothing to clean here
|
// nothing to clean here
|
||||||
|
@ -153,12 +156,14 @@ public class CleaningFunctions {
|
||||||
if (Objects.isNull(r.getResourcetype()) || StringUtils.isBlank(r.getResourcetype().getClassid())) {
|
if (Objects.isNull(r.getResourcetype()) || StringUtils.isBlank(r.getResourcetype().getClassid())) {
|
||||||
r
|
r
|
||||||
.setResourcetype(
|
.setResourcetype(
|
||||||
qualifier("UNKNOWN", "Unknown", ModelConstants.DNET_DATA_CITE_RESOURCE));
|
qualifier(ModelConstants.UNKNOWN, "Unknown", ModelConstants.DNET_DATA_CITE_RESOURCE));
|
||||||
}
|
}
|
||||||
if (Objects.nonNull(r.getInstance())) {
|
if (Objects.nonNull(r.getInstance())) {
|
||||||
for (Instance i : r.getInstance()) {
|
for (Instance i : r.getInstance()) {
|
||||||
if (Objects.isNull(i.getAccessright()) || StringUtils.isBlank(i.getAccessright().getClassid())) {
|
if (Objects.isNull(i.getAccessright()) || StringUtils.isBlank(i.getAccessright().getClassid())) {
|
||||||
i.setAccessright(qualifier("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES));
|
i
|
||||||
|
.setAccessright(
|
||||||
|
qualifier(ModelConstants.UNKNOWN, "not available", ModelConstants.DNET_ACCESS_MODES));
|
||||||
}
|
}
|
||||||
if (Objects.isNull(i.getHostedby()) || StringUtils.isBlank(i.getHostedby().getKey())) {
|
if (Objects.isNull(i.getHostedby()) || StringUtils.isBlank(i.getHostedby().getKey())) {
|
||||||
i.setHostedby(ModelConstants.UNKNOWN_REPOSITORY);
|
i.setHostedby(ModelConstants.UNKNOWN_REPOSITORY);
|
||||||
|
@ -173,7 +178,7 @@ public class CleaningFunctions {
|
||||||
if (Objects.isNull(bestaccessrights)) {
|
if (Objects.isNull(bestaccessrights)) {
|
||||||
r
|
r
|
||||||
.setBestaccessright(
|
.setBestaccessright(
|
||||||
qualifier("UNKNOWN", "not available", ModelConstants.DNET_ACCESS_MODES));
|
qualifier(ModelConstants.UNKNOWN, "not available", ModelConstants.DNET_ACCESS_MODES));
|
||||||
} else {
|
} else {
|
||||||
r.setBestaccessright(bestaccessrights);
|
r.setBestaccessright(bestaccessrights);
|
||||||
}
|
}
|
||||||
|
@ -211,14 +216,31 @@ public class CleaningFunctions {
|
||||||
.map(Qualifier::getClassid)
|
.map(Qualifier::getClassid)
|
||||||
.orElse(""))
|
.orElse(""))
|
||||||
.orElse("");
|
.orElse("");
|
||||||
if (pidProvenance.equals(ModelConstants.SYSIMPORT_CROSSWALK_ENTITYREGISTRY)) {
|
if (p
|
||||||
p.getQualifier().setClassid(ModelConstants.ORCID);
|
.getQualifier()
|
||||||
} else {
|
.getClassid()
|
||||||
p.getQualifier().setClassid(ModelConstants.ORCID_PENDING);
|
.toLowerCase()
|
||||||
|
.contains(ModelConstants.ORCID)) {
|
||||||
|
if (pidProvenance
|
||||||
|
.equals(ModelConstants.SYSIMPORT_CROSSWALK_ENTITYREGISTRY)) {
|
||||||
|
p.getQualifier().setClassid(ModelConstants.ORCID);
|
||||||
|
} else {
|
||||||
|
p.getQualifier().setClassid(ModelConstants.ORCID_PENDING);
|
||||||
|
}
|
||||||
|
final String orcid = p
|
||||||
|
.getValue()
|
||||||
|
.trim()
|
||||||
|
.toLowerCase()
|
||||||
|
.replaceAll(ORCID_CLEANING_REGEX, "$1-$2-$3-$4");
|
||||||
|
if (orcid.length() == ORCID_LEN) {
|
||||||
|
p.setValue(orcid);
|
||||||
|
} else {
|
||||||
|
p.setValue("");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
p.setValue(p.getValue().trim().replaceAll(ORCID_PREFIX_REGEX, ""));
|
|
||||||
return p;
|
return p;
|
||||||
})
|
})
|
||||||
|
.filter(p -> StringUtils.isNotBlank(p.getValue()))
|
||||||
.collect(
|
.collect(
|
||||||
Collectors
|
Collectors
|
||||||
.toMap(
|
.toMap(
|
||||||
|
|
|
@ -21,6 +21,7 @@ import org.apache.spark.sql.expressions.Aggregator;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.jayway.jsonpath.Configuration;
|
import com.jayway.jsonpath.Configuration;
|
||||||
import com.jayway.jsonpath.DocumentContext;
|
import com.jayway.jsonpath.DocumentContext;
|
||||||
|
@ -44,7 +45,8 @@ public class GroupEntitiesAndRelationsSparkJob {
|
||||||
|
|
||||||
private final static String SOURCE_JPATH = "$.source";
|
private final static String SOURCE_JPATH = "$.source";
|
||||||
|
|
||||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
|
||||||
|
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
|
|
||||||
|
|
|
@ -11,7 +11,8 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.io.SequenceFile;
|
import org.apache.hadoop.io.SequenceFile;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.codehaus.jackson.map.ObjectMapper;
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
||||||
|
|
||||||
|
|
|
@ -35,7 +35,7 @@ public class MigrateDbEntitiesApplicationTest {
|
||||||
|
|
||||||
private MigrateDbEntitiesApplication app;
|
private MigrateDbEntitiesApplication app;
|
||||||
|
|
||||||
@Mock
|
@Mock(lenient = true)
|
||||||
private ResultSet rs;
|
private ResultSet rs;
|
||||||
|
|
||||||
@Mock
|
@Mock
|
||||||
|
|
|
@ -6,29 +6,29 @@
|
||||||
xmlns:dc="http://purl.org/dc/elements/1.1/"
|
xmlns:dc="http://purl.org/dc/elements/1.1/"
|
||||||
xmlns:dri="http://www.driver-repository.eu/namespace/dri"
|
xmlns:dri="http://www.driver-repository.eu/namespace/dri"
|
||||||
xmlns:prov="http://www.openarchives.org/OAI/2.0/provenance" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
|
xmlns:prov="http://www.openarchives.org/OAI/2.0/provenance" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
|
||||||
<dri:objIdentifier>r3f52792889d::000051aa1f61d77d2c0b340091f8024e</dri:objIdentifier>
|
<dri:objIdentifier>r3f52792889d::00002412cb25f2f3047712d00ab2c8eb</dri:objIdentifier>
|
||||||
<dri:recordIdentifier>textgrid:q9cv.0</dri:recordIdentifier>
|
<dri:recordIdentifier>hdl:11858/00-1734-0000-0003-EE73-2</dri:recordIdentifier>
|
||||||
<dri:dateOfCollection>2020-11-17T09:34:11.128+01:00</dri:dateOfCollection>
|
<dri:dateOfCollection>2020-12-16T10:04:03.148Z</dri:dateOfCollection>
|
||||||
<oaf:datasourceprefix>r3f52792889d</oaf:datasourceprefix>
|
<oaf:datasourceprefix>r3f52792889d</oaf:datasourceprefix>
|
||||||
<identifier xmlns="http://www.openarchives.org/OAI/2.0/">textgrid:q9cv.0</identifier>
|
<identifier xmlns="http://www.openarchives.org/OAI/2.0/">textgrid:rn8z.0</identifier>
|
||||||
<datestamp xmlns="http://www.openarchives.org/OAI/2.0/">2012-01-21T13:35:20Z</datestamp>
|
<datestamp xmlns="http://www.openarchives.org/OAI/2.0/">2012-01-29T20:54:12Z</datestamp>
|
||||||
<dr:dateOfTransformation>2020-11-17T19:08:56.703+01:00</dr:dateOfTransformation>
|
<dr:dateOfTransformation>2020-12-16T16:02:37.562Z</dr:dateOfTransformation>
|
||||||
</oai:header>
|
</oai:header>
|
||||||
<metadata>
|
<metadata>
|
||||||
<datacite:resource xmlns="http://www.openarchives.org/OAI/2.0/"
|
<datacite:resource xmlns="http://www.openarchives.org/OAI/2.0/"
|
||||||
xmlns:dc="http://purl.org/dc/elements/1.1/"
|
xmlns:dc="http://purl.org/dc/elements/1.1/"
|
||||||
xmlns:dri="http://www.driver-repository.eu/namespace/dri"
|
xmlns:dri="http://www.driver-repository.eu/namespace/dri"
|
||||||
xmlns:prov="http://www.openarchives.org/OAI/2.0/provenance" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
|
xmlns:prov="http://www.openarchives.org/OAI/2.0/provenance" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
|
||||||
<datacite:identifier identifierType="Handle">hdl:11858/00-1734-0000-0003-7664-F</datacite:identifier>
|
<datacite:identifier identifierType="Handle">hdl:11858/00-1734-0000-0003-EE73-2</datacite:identifier>
|
||||||
<datacite:creators>
|
<datacite:creators>
|
||||||
<datacite:creator>
|
<datacite:creator>
|
||||||
<datacite:creatorName>Hoffmann von Fallersleben, August Heinrich</datacite:creatorName>
|
<datacite:creatorName>Liliencron, Detlev von</datacite:creatorName>
|
||||||
<datacite:nameIdentifier nameIdentifierScheme="pnd" schemeURI="https://de.dariah.eu/pnd-service">118552589</datacite:nameIdentifier>
|
<datacite:nameIdentifier nameIdentifierScheme="pnd" schemeURI="https://ref.de.dariah.eu/pndsearch/pndquery.xql?id=">118572954</datacite:nameIdentifier>
|
||||||
</datacite:creator>
|
</datacite:creator>
|
||||||
</datacite:creators>
|
</datacite:creators>
|
||||||
<datacite:titles>
|
<datacite:titles>
|
||||||
<datacite:title titleType="Other">Mailied</datacite:title>
|
<datacite:title titleType="Other">Auf dem Trocknen</datacite:title>
|
||||||
<datacite:title titleType="Other">August Heinrich Hoffmann von Fallersleben: Unpolitische Lieder von Hoffmann von Fallersleben, 1. + 2. Theil, 1. Theil, Hamburg: Hoffmann und Campe, 1841.</datacite:title>
|
<datacite:title titleType="Other">Detlev von Liliencron: Gute Nacht. Hinterlassene Gedichte, Berlin: Schuster & Loeffler, 1909.</datacite:title>
|
||||||
</datacite:titles>
|
</datacite:titles>
|
||||||
<datacite:publisher>TextGrid</datacite:publisher>
|
<datacite:publisher>TextGrid</datacite:publisher>
|
||||||
<datacite:publicationYear>2012</datacite:publicationYear>
|
<datacite:publicationYear>2012</datacite:publicationYear>
|
||||||
|
@ -38,21 +38,21 @@
|
||||||
</datacite:contributor>
|
</datacite:contributor>
|
||||||
<datacite:contributor contributorType="Other">
|
<datacite:contributor contributorType="Other">
|
||||||
<datacite:contributorName>Digitale Bibliothek</datacite:contributorName>
|
<datacite:contributorName>Digitale Bibliothek</datacite:contributorName>
|
||||||
<datacite:nameIdentifier nameIdentifierScheme="textgrid">TGPR-372fe6dc-57f2-6cd4-01b5-2c4bbefcfd3c</datacite:nameIdentifier>
|
<datacite:nameIdentifier nameIdentifierScheme="textgrid" schemeURI="http://www.textgridlab.org/schema/textgrid-metadata_2010.xsd">TGPR-372fe6dc-57f2-6cd4-01b5-2c4bbefcfd3c</datacite:nameIdentifier>
|
||||||
</datacite:contributor>
|
</datacite:contributor>
|
||||||
</datacite:contributors>
|
</datacite:contributors>
|
||||||
<datacite:dates>
|
<datacite:dates>
|
||||||
<datacite:date dateType="Created">2012-01-21T13:35:20Z</datacite:date>
|
<datacite:date dateType="Created">2012-01-29T20:54:12Z</datacite:date>
|
||||||
<datacite:date dateType="Issued">2012-01-21T13:35:20Z</datacite:date>
|
<datacite:date dateType="Issued">2012-01-29T20:54:12Z</datacite:date>
|
||||||
<datacite:date dateType="Updated">2012-01-21T13:35:20Z</datacite:date>
|
<datacite:date dateType="Updated">2012-01-29T20:54:12Z</datacite:date>
|
||||||
</datacite:dates>
|
</datacite:dates>
|
||||||
<datacite:resourceType resourceTypeGeneral="Dataset"/>
|
<datacite:resourceType resourceTypeGeneral="Dataset"/>
|
||||||
<alternateIdentifiers xmlns="http://datacite.org/schema/kernel-3">
|
<alternateIdentifiers xmlns="http://datacite.org/schema/kernel-3">
|
||||||
<datacite:alternateIdentifier alternateIdentifierType="URI" xmlns="http://www.openarchives.org/OAI/2.0/">textgrid:q9cv.0</datacite:alternateIdentifier>
|
<datacite:alternateIdentifier alternateIdentifierType="URI" xmlns="http://www.openarchives.org/OAI/2.0/">textgrid:rn8z.0</datacite:alternateIdentifier>
|
||||||
<alternateIdentifier alternateIdentifierType="URL">http://hdl.handle.net/hdl:11858/00-1734-0000-0003-7664-F</alternateIdentifier>
|
<alternateIdentifier alternateIdentifierType="URL">http://hdl.handle.net/hdl:11858/00-1734-0000-0003-EE73-2</alternateIdentifier>
|
||||||
</alternateIdentifiers>
|
</alternateIdentifiers>
|
||||||
<datacite:relatedIdentifiers>
|
<datacite:relatedIdentifiers>
|
||||||
<datacite:relatedIdentifier relatedIdentifierType="Handle" relationType="IsPartOf">hdl:11858/00-1734-0000-0003-7666-B</datacite:relatedIdentifier>
|
<datacite:relatedIdentifier relatedIdentifierType="Handle" relationType="IsPartOf">hdl:11858/00-1734-0000-0003-EE72-4</datacite:relatedIdentifier>
|
||||||
</datacite:relatedIdentifiers>
|
</datacite:relatedIdentifiers>
|
||||||
<datacite:sizes>
|
<datacite:sizes>
|
||||||
<datacite:size>527 Bytes</datacite:size>
|
<datacite:size>527 Bytes</datacite:size>
|
||||||
|
@ -77,17 +77,18 @@
|
||||||
<datacite:geoLocations>
|
<datacite:geoLocations>
|
||||||
<datacite:geoLocation>
|
<datacite:geoLocation>
|
||||||
<datacite:geoLocationPlace
|
<datacite:geoLocationPlace
|
||||||
xmlns:xs="http://www.w3.org/2001/XMLSchema" xsi:type="xs:string">Hamburg</datacite:geoLocationPlace>
|
xmlns:xs="http://www.w3.org/2001/XMLSchema" xsi:type="xs:string">Berlin</datacite:geoLocationPlace>
|
||||||
</datacite:geoLocation>
|
</datacite:geoLocation>
|
||||||
</datacite:geoLocations>
|
</datacite:geoLocations>
|
||||||
</datacite:resource>
|
</datacite:resource>
|
||||||
<oaf:identifier identifierType="handle">hdl:11858/00-1734-0000-0003-7664-F</oaf:identifier>
|
<oaf:identifier identifierType="handle">hdl:11858/00-1734-0000-0003-EE73-2</oaf:identifier>
|
||||||
<dr:CobjCategory type="dataset">0021</dr:CobjCategory>
|
<dr:CobjCategory type="dataset">0021</dr:CobjCategory>
|
||||||
<oaf:refereed>0002</oaf:refereed>
|
<oaf:refereed>0002</oaf:refereed>
|
||||||
<oaf:dateAccepted>2012-01-21</oaf:dateAccepted>
|
<oaf:dateAccepted>2012-01-29</oaf:dateAccepted>
|
||||||
<oaf:accessrights>OPEN</oaf:accessrights>
|
<oaf:accessrights>OPEN</oaf:accessrights>
|
||||||
<oaf:license>http://creativecommons.org/licenses/by/3.0/de/legalcode</oaf:license>
|
<oaf:license>http://creativecommons.org/licenses/by/3.0/de/legalcode</oaf:license>
|
||||||
<oaf:language>und</oaf:language>
|
<oaf:language>und</oaf:language>
|
||||||
|
<oaf:country>DE</oaf:country>
|
||||||
<oaf:hostedBy id="re3data_____::r3d100011365" name="TextGrid Repository"/>
|
<oaf:hostedBy id="re3data_____::r3d100011365" name="TextGrid Repository"/>
|
||||||
<oaf:collectedFrom id="re3data_____::r3d100011365" name="TextGrid Repository"/>
|
<oaf:collectedFrom id="re3data_____::r3d100011365" name="TextGrid Repository"/>
|
||||||
</metadata>
|
</metadata>
|
||||||
|
@ -95,11 +96,11 @@
|
||||||
xmlns:dri="http://www.driver-repository.eu/namespace/dri"
|
xmlns:dri="http://www.driver-repository.eu/namespace/dri"
|
||||||
xmlns:prov="http://www.openarchives.org/OAI/2.0/provenance" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
|
xmlns:prov="http://www.openarchives.org/OAI/2.0/provenance" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
|
||||||
<provenance xmlns="http://www.openarchives.org/OAI/2.0/provenance" xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/provenance http://www.openarchives.org/OAI/2.0/provenance.xsd">
|
<provenance xmlns="http://www.openarchives.org/OAI/2.0/provenance" xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/provenance http://www.openarchives.org/OAI/2.0/provenance.xsd">
|
||||||
<originDescription altered="true" harvestDate="2020-11-17T09:34:11.128+01:00">
|
<originDescription altered="true" harvestDate="2020-12-16T10:04:03.148Z">
|
||||||
<baseURL>https%3A%2F%2Fdev.textgridlab.org%2F1.0%2Ftgoaipmh%2Foai</baseURL>
|
<baseURL>https%3A%2F%2Fdev.textgridlab.org%2F1.0%2Ftgoaipmh%2Foai</baseURL>
|
||||||
<identifier>textgrid:q9cv.0</identifier>
|
<identifier>textgrid:rn8z.0</identifier>
|
||||||
<datestamp>2012-01-21T13:35:20Z</datestamp>
|
<datestamp>2012-01-29T20:54:12Z</datestamp>
|
||||||
<metadataNamespace>http://schema.datacite.org/oai/oai-1.0/</metadataNamespace>
|
<metadataNamespace/>
|
||||||
</originDescription>
|
</originDescription>
|
||||||
</provenance>
|
</provenance>
|
||||||
<oaf:datainfo>
|
<oaf:datainfo>
|
||||||
|
@ -107,9 +108,10 @@
|
||||||
<oaf:deletedbyinference>false</oaf:deletedbyinference>
|
<oaf:deletedbyinference>false</oaf:deletedbyinference>
|
||||||
<oaf:trust>0.9</oaf:trust>
|
<oaf:trust>0.9</oaf:trust>
|
||||||
<oaf:inferenceprovenance/>
|
<oaf:inferenceprovenance/>
|
||||||
<oaf:provenanceaction classid="sysimport:crosswalk"
|
<oaf:provenanceaction classid="sysimport:crosswalk:datasetarchive"
|
||||||
classname="sysimport:crosswalk"
|
classname="sysimport:crosswalk:datasetarchive"
|
||||||
schemeid="dnet:provenanceActions" schemename="dnet:provenanceActions"/>
|
schemeid="dnet:provenanceActions" schemename="dnet:provenanceActions"/>
|
||||||
</oaf:datainfo>
|
</oaf:datainfo>
|
||||||
</about>
|
</about>
|
||||||
</record>
|
</record>
|
||||||
|
|
||||||
|
|
|
@ -1,15 +1,15 @@
|
||||||
package eu.dnetlib.dhp.export
|
package eu.dnetlib.dhp.export
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper
|
||||||
|
|
||||||
import java.time.LocalDateTime
|
import java.time.LocalDateTime
|
||||||
import java.time.format.DateTimeFormatter
|
import java.time.format.DateTimeFormatter
|
||||||
|
|
||||||
import eu.dnetlib.dhp.common.PacePerson
|
import eu.dnetlib.dhp.common.PacePerson
|
||||||
import eu.dnetlib.dhp.schema.action.AtomicAction
|
import eu.dnetlib.dhp.schema.action.AtomicAction
|
||||||
import eu.dnetlib.dhp.schema.oaf.{Author, Dataset, ExternalReference, Field, Instance, KeyValue, Oaf, Publication, Qualifier, Relation, Result, StructuredProperty}
|
import eu.dnetlib.dhp.schema.oaf.{Author, Dataset, ExternalReference, Field, Instance, KeyValue, Oaf, Publication, Qualifier, Relation, Result, StructuredProperty}
|
||||||
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication}
|
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication}
|
||||||
import eu.dnetlib.dhp.utils.DHPUtils
|
import eu.dnetlib.dhp.utils.DHPUtils
|
||||||
import org.apache.commons.lang3.StringUtils
|
import org.apache.commons.lang3.StringUtils
|
||||||
import org.codehaus.jackson.map.ObjectMapper
|
|
||||||
import eu.dnetlib.dhp.schema.scholexplorer.OafUtils._
|
import eu.dnetlib.dhp.schema.scholexplorer.OafUtils._
|
||||||
|
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
|
|
|
@ -1,27 +1,21 @@
|
||||||
package eu.dnetlib.dhp.`export`
|
package eu.dnetlib.dhp.`export`
|
||||||
|
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
||||||
import eu.dnetlib.dhp.schema.oaf.{Instance, Publication, Relation, Dataset => OafDataset}
|
import eu.dnetlib.dhp.schema.oaf.{Publication, Relation, Dataset => OafDataset}
|
||||||
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication}
|
import eu.dnetlib.dhp.schema.scholexplorer.{DLIDataset, DLIPublication}
|
||||||
import org.apache.commons.io.IOUtils
|
import org.apache.commons.io.IOUtils
|
||||||
import org.apache.hadoop.io.Text
|
import org.apache.hadoop.io.Text
|
||||||
import org.apache.hadoop.io.compress.GzipCodec
|
import org.apache.hadoop.io.compress.GzipCodec
|
||||||
import org.apache.hadoop.mapred.SequenceFileOutputFormat
|
import org.apache.hadoop.mapred.SequenceFileOutputFormat
|
||||||
import org.apache.spark.rdd.RDD
|
|
||||||
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
|
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
|
||||||
import org.apache.spark.sql.functions._
|
import org.apache.spark.sql.functions._
|
||||||
import org.apache.spark.sql.expressions.Window
|
import org.apache.spark.sql.expressions.Window
|
||||||
import org.apache.spark.{SparkConf, SparkContext}
|
import org.apache.spark.SparkConf
|
||||||
import org.codehaus.jackson.map.ObjectMapper
|
|
||||||
|
|
||||||
import scala.collection.mutable.ArrayBuffer
|
import scala.collection.mutable.ArrayBuffer
|
||||||
import scala.collection.JavaConverters._
|
|
||||||
|
|
||||||
object SparkExportContentForOpenAire {
|
object SparkExportContentForOpenAire {
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def main(args: Array[String]): Unit = {
|
def main(args: Array[String]): Unit = {
|
||||||
val conf: SparkConf = new SparkConf()
|
val conf: SparkConf = new SparkConf()
|
||||||
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkExportContentForOpenAire.getClass.getResourceAsStream("input_export_content_parameters.json")))
|
val parser = new ArgumentApplicationParser(IOUtils.toString(SparkExportContentForOpenAire.getClass.getResourceAsStream("input_export_content_parameters.json")))
|
||||||
|
@ -178,11 +172,4 @@ object SparkExportContentForOpenAire {
|
||||||
fRels.union(fpubs).union(fdats).rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$workingPath/export/rawset", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec])
|
fRels.union(fpubs).union(fdats).rdd.map(s => (new Text(s._1), new Text(s._2))).saveAsHadoopFile(s"$workingPath/export/rawset", classOf[Text], classOf[Text], classOf[SequenceFileOutputFormat[Text,Text]], classOf[GzipCodec])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,7 +10,8 @@ import org.apache.http.client.methods.HttpPut;
|
||||||
import org.apache.http.entity.StringEntity;
|
import org.apache.http.entity.StringEntity;
|
||||||
import org.apache.http.impl.client.CloseableHttpClient;
|
import org.apache.http.impl.client.CloseableHttpClient;
|
||||||
import org.apache.http.impl.client.HttpClients;
|
import org.apache.http.impl.client.HttpClients;
|
||||||
import org.codehaus.jackson.map.ObjectMapper;
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
package eu.dnetlib.dhp.provision
|
package eu.dnetlib.dhp.provision
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
||||||
import eu.dnetlib.dhp.provision.scholix.Scholix
|
import eu.dnetlib.dhp.provision.scholix.Scholix
|
||||||
import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary
|
import eu.dnetlib.dhp.provision.scholix.summary.ScholixSummary
|
||||||
|
@ -7,7 +8,6 @@ import org.apache.commons.io.IOUtils
|
||||||
import org.apache.hadoop.io.compress.GzipCodec
|
import org.apache.hadoop.io.compress.GzipCodec
|
||||||
import org.apache.spark.SparkConf
|
import org.apache.spark.SparkConf
|
||||||
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
|
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
|
||||||
import org.codehaus.jackson.map.ObjectMapper
|
|
||||||
|
|
||||||
object SparkConvertDatasetToJson {
|
object SparkConvertDatasetToJson {
|
||||||
|
|
||||||
|
|
|
@ -259,7 +259,7 @@
|
||||||
<action name="Step17">
|
<action name="Step17">
|
||||||
<hive2 xmlns="uri:oozie:hive2-action:0.1">
|
<hive2 xmlns="uri:oozie:hive2-action:0.1">
|
||||||
<jdbc-url>${hive_jdbc_url}</jdbc-url>
|
<jdbc-url>${hive_jdbc_url}</jdbc-url>
|
||||||
<script>scripts/updateProductionViews.sql</script>
|
<script>scripts/step17.sql</script>
|
||||||
<param>stats_db_name=${stats_db_name}</param>
|
<param>stats_db_name=${stats_db_name}</param>
|
||||||
<param>stats_db_shadow_name=${stats_db_shadow_name}</param>
|
<param>stats_db_shadow_name=${stats_db_shadow_name}</param>
|
||||||
</hive2>
|
</hive2>
|
||||||
|
@ -287,8 +287,8 @@
|
||||||
<name-node>${nameNode}</name-node>
|
<name-node>${nameNode}</name-node>
|
||||||
<exec>impala-shell.sh</exec>
|
<exec>impala-shell.sh</exec>
|
||||||
<argument>${stats_db_shadow_name}</argument>
|
<argument>${stats_db_shadow_name}</argument>
|
||||||
<argument>computeProductionStats.sql</argument>
|
<argument>step19.sql</argument>
|
||||||
<argument>${wf:appPath()}/scripts/computeProductionStats.sql</argument>
|
<argument>${wf:appPath()}/scripts/step19.sql</argument>
|
||||||
<file>impala-shell.sh</file>
|
<file>impala-shell.sh</file>
|
||||||
</shell>
|
</shell>
|
||||||
<ok to="Step20"/>
|
<ok to="Step20"/>
|
||||||
|
|
Loading…
Reference in New Issue