forked from D-Net/dnet-hadoop
commit 6b79d1cf2a (parent 87018ac895)
@@ -15,6 +15,7 @@ import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.ircdl_extention.model.Author;
import eu.dnetlib.dhp.ircdl_extention.model.Result;

public class PrepareNormalizedResultSpark {
@@ -53,33 +54,33 @@ public class PrepareNormalizedResultSpark {
}

private static void execNormalize(SparkSession spark, String outputPath, String inputPath) {
Dataset<Result> normalized_result = Utils
.readPath(spark, inputPath + "publicationsWithOrcid", Result.class)
.union(Utils.readPath(spark, inputPath + "datasetWithOrcid", Result.class))
.union(Utils.readPath(spark, inputPath + "softwareWithOrcid", Result.class))
.union(Utils.readPath(spark, inputPath + "otherWithOrcid", Result.class))
.map((MapFunction<Result, Result>) r -> {
Dataset<Author> normalized_result = Utils
.readPath(spark, inputPath + "publicationsWithOrcid", Author.class)
.union(Utils.readPath(spark, inputPath + "datasetWithOrcid", Author.class))
.union(Utils.readPath(spark, inputPath + "softwareWithOrcid", Author.class))
.union(Utils.readPath(spark, inputPath + "otherWithOrcid", Author.class))
.map((MapFunction<Author, Author>) r -> {
r.setName(Utils.normalizeString(r.getName()));
r.setSurname(Utils.normalizeString(r.getSurname()));
r.setFullname(Utils.normalizeString(r.getFullname()));
return r;
}, Encoders.bean(Result.class));
}, Encoders.bean(Author.class));

normalized_result
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(outputPath + "ResultWithOrcid");
.json(outputPath + "ResultAuthorNormalized");

normalized_result
.filter((FilterFunction<Result>) r -> !r.getId().startsWith("50|dedup"))
.filter((FilterFunction<Author>) r -> !r.getId().startsWith("50|dedup"))
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(outputPath + "collectedResultWithOrcid");

normalized_result
.filter((FilterFunction<Result>) r -> !r.getDeletedbyinference())
.filter((FilterFunction<Author>) r -> !r.getDeletedbyinference())
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
@@ -18,6 +18,7 @@ import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.ircdl_extention.model.Author;
import eu.dnetlib.dhp.ircdl_extention.model.KeyValue;
import eu.dnetlib.dhp.ircdl_extention.model.Result;
import eu.dnetlib.dhp.schema.common.ModelConstants;
@@ -65,22 +66,21 @@ public class PrepareResultSpark {
String input_path,
Class<R> resultClazz, String output_path) {
Dataset<R> publicationDataset = Utils.readPath(spark, input_path, resultClazz);
Dataset<Result> result = publicationDataset.filter((FilterFunction<R>) p -> {
Dataset<R> result = publicationDataset.filter((FilterFunction<R>) p -> {
if (p.getAuthor() == null)
return false;
if (p.getAuthor().size() == 0)
return false;
return true;
})
.flatMap((FlatMapFunction<R, Result>) p -> {
List<Result> reslist = new ArrayList<>();
});

result.flatMap((FlatMapFunction<R, Author>) p -> {
List<Author> reslist = new ArrayList<>();
p.getAuthor().forEach(a -> {
a.getPid().forEach(apid -> {
if (apid.getQualifier().getClassid().equals(ModelConstants.ORCID)
|| apid.getQualifier().getClassid().equals(ModelConstants.ORCID_PENDING)) {
Result r = new Result();
Author r = new Author();
r.setDeletedbyinference(p.getDataInfo().getDeletedbyinference());
r.setId(p.getId());

r
.setCf(
p
@@ -88,6 +88,10 @@ public class PrepareResultSpark {
.stream()
.map(cf -> KeyValue.newInstance(cf.getKey(), cf.getValue()))
.collect(Collectors.toList()));

r.setName(a.getName());
r.setSurname(a.getSurname());
r.setFullname(a.getFullname());
r
.setPid(
p
@@ -97,20 +101,23 @@ public class PrepareResultSpark {
pid -> KeyValue
.newInstance(pid.getQualifier().getClassid(), pid.getValue()))
.collect(Collectors.toList()));
r.setName(a.getName());
r.setSurname(a.getSurname());
r.setFullname(a.getFullname());
r.setOid(apid.getValue());
r
.setApid(
Optional
.ofNullable(a.getPid())
.map(
pids -> pids
.stream()
.map(pd -> KeyValue.newInstance(pd.getQualifier().getClassid(), pd.getValue()))
.collect(Collectors.toList()))
.orElse(new ArrayList<>()));
reslist.add(r);
}
});

});
return reslist.iterator();
}, Encoders.bean(Result.class));
result
}, Encoders.bean(Author.class))
.write()
.option("compressio", "gzip")
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(output_path);
@@ -1,4 +1,78 @@

package eu.dnetlib.dhp.ircdl_extention;

import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.ircdl_extention.model.Author;
import eu.dnetlib.dhp.ircdl_extention.model.KeyValue;
import eu.dnetlib.dhp.ircdl_extention.model.Result;
import eu.dnetlib.dhp.schema.common.ModelConstants;

public class SelectAuthorWithOrcidOnlySpark {

public static void main(String[] args) throws Exception {

String jsonConfiguration = IOUtils
.toString(
PrepareResultSpark.class
.getResourceAsStream(
"/eu/dnetlib/dhp/ircdl_extention/prepare_result_parameters.json"));

final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);

parser.parseArgument(args);

Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);

final String inputPath = parser.get("inputPath");

final String outputPath = parser.get("outputPath");

SparkConf conf = new SparkConf();

runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
Utils.removeOutputDir(spark, outputPath);
selectAuthors(spark, inputPath, outputPath);
});
}

private static void selectAuthors(SparkSession spark, String input_path, String output_path) {
Dataset<Author> resultDataset = Utils.readPath(spark, input_path, Author.class);
resultDataset.flatMap((FlatMapFunction<Author, Result>) p -> {
List<Result> reslist = new ArrayList<>();
p.getApid().forEach(a -> {
if (a.getKey().equals(ModelConstants.ORCID_PENDING) || a.getKey().equals(ModelConstants.ORCID)) {
Result r = Result.fromAuthor(p);
r.setOid(a.getValue());
reslist.add(r);
}
});
return reslist.iterator();
}, Encoders.bean(Result.class))
.write()
.option("compressio", "gzip")
.mode(SaveMode.Overwrite)
.json(output_path);
}
}
@@ -8,6 +8,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.similarity.CosineDistance;
@@ -15,7 +16,10 @@ import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.wcohen.ss.JaroWinkler;

@@ -28,6 +32,9 @@ public class Utils implements Serializable {

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

private static final Logger log = LoggerFactory
.getLogger(eu.dnetlib.dhp.ircdl_extention.Utils.class);

public static String normalizeString(String input) {
if (input == null || input.equals("void"))
return new String();
@@ -55,16 +62,65 @@ public class Utils implements Serializable {
}

private static List<String> getList(List<String> input) {
return input.stream().map(st -> st.trim()).filter(st -> st.length() > 0).collect(Collectors.toList());
return input
.stream()
.map(st -> st.trim())
.filter(st -> st.length() > 0)
.sorted()
.collect(Collectors.toList());

}

public static boolean filterFunction(Tuple2<Result, Orcid> input) {

List<String> res = getList(Arrays.asList(input._1().getFullname().split(" ")))
private static List<String> getListInitials(List<String> input) {
List<String> ret = new ArrayList<>();
List<Character> tmp = input
.stream()
.map(st -> st.trim())
.filter(st -> st.length() > 0)
.map(st -> st.charAt(0))
.sorted()
.collect(Collectors.toList());
if (tmp.size() == 1)
ret.add(String.valueOf(tmp.get(0)));
for (int i = 0; i < tmp.size(); i++) {
for (int j = i + 1; j < tmp.size(); j++) {
ret.add(String.valueOf(tmp.get(i)) + String.valueOf(tmp.get(j)));
}
}

return ret;

}

// select the pairs of first characters of each word that makes up the name
// if there is a match, the name is correct
// add a check that the lengths of the lists are not too unbalanced: if one list is long
public static boolean conservativeFilterFunction(Tuple2<Result, Orcid> input) {

List<String> res = getListInitials(Arrays.asList(input._1().getFullname().split(" ")));
Orcid or = input._2();
List<String> tmp = new ArrayList<>();
Collections.addAll(tmp, or.getName().split(" "));
Collections.addAll(tmp, or.getSurname().split(" "));
return checkContains(
res, getListInitials(tmp), false)
||
checkContains(
res, getListInitials(Arrays.asList(or.getCreditname().split(" "))), false)
||
or
.getOtherNames()
.stream()
.anyMatch(
on -> checkContains(
res, getListInitials(Arrays.asList(on.split(" "))), false));

}

public static boolean filterFunction(Tuple2<Result, Orcid> input) throws JsonProcessingException {

try {
List<String> res = getList(Arrays.asList(input._1().getFullname().split(" ")));
Orcid or = input._2();
List<String> tmp = new ArrayList<>();
Collections.addAll(tmp, or.getName().split(" "));
@@ -90,9 +146,19 @@ public class Utils implements Serializable {
.stream()
.sorted()
.collect(Collectors.toList())));
} catch (Exception e) {

log.info("EXCEPTIONNNN: " + new ObjectMapper().writeValueAsString(input));
throw e;
}

}

private static boolean checkContains(List<String> result, List<String> orcid) {
return checkContains(result, orcid, true);
}

private static boolean checkContains(List<String> result, List<String> orcid, boolean jaro) {
if (result.size() == 0 || orcid.size() == 0) {
return true;
}
@@ -118,11 +184,14 @@ public class Utils implements Serializable {
if (isIn(result, orcid))
return true;
}
if (jaro) {
// apply JaroWinkler distance
double score = new JaroWinkler()
.score(StringUtils.joinWith(" ", result), StringUtils.joinWith(" ", orcid));
return score > 0.95;
}
return false;
}

private static boolean isIn(List<String> lst1, List<String> lst2) {
@@ -18,7 +18,10 @@ import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import com.fasterxml.jackson.core.JsonProcessingException;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.ircdl_extention.model.Author;
import eu.dnetlib.dhp.ircdl_extention.model.Orcid;
import eu.dnetlib.dhp.ircdl_extention.model.Result;
import eu.dnetlib.dhp.ircdl_extention.model.ShuffleInfo;
@@ -53,6 +56,8 @@ public class WrongSpark {

final String resultPath = parser.get("inputPath");

final String authorPath = parser.get("authorPath");

SparkConf conf = new SparkConf();
conf.set("hive.metastore.uris", "thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083");

@@ -62,16 +67,17 @@ public class WrongSpark {
spark -> {
Utils.removeOutputDir(spark, outputPath);
findWrong(spark, orcidPath, outputPath + "/wrong", resultPath);
findShuffle(spark, orcidPath, outputPath + "/shuffle", resultPath);
findShuffle(spark, orcidPath, outputPath + "/shuffle", resultPath, authorPath);
});
}

private static void findShuffle(SparkSession spark, String orcidPath, String outputPath, String resultPath) {
private static void findShuffle(SparkSession spark, String orcidPath, String outputPath, String resultPath,
String authorPath) {

Utils
.readPath(spark, resultPath, Result.class)
.readPath(spark, authorPath, Author.class)
.map(
(MapFunction<Result, ShuffleInfo>) r -> ShuffleInfo
(MapFunction<Author, ShuffleInfo>) r -> ShuffleInfo
.newInstance(r.getName(), r.getSurname(), r.getFullname(), r.getId()),
Encoders.bean(ShuffleInfo.class))
.union(
@@ -82,7 +88,7 @@ public class WrongSpark {
.newInstance(
t2._1().getName(), t2._1().getSurname(), t2._1().getFullname(),
t2._1().getId(), t2._2().getName(), t2._2().getSurname(),
t2._2().getCreditname(), t2._2().getOtherNames()),
t2._2().getCreditname(), t2._2().getOtherNames(), t2._2().getOrcid()),
Encoders.bean(ShuffleInfo.class)))
.groupByKey((MapFunction<ShuffleInfo, String>) si -> si.getId(), Encoders.STRING())
.flatMapGroups((FlatMapGroupsFunction<String, ShuffleInfo, ShuffleInfo>) (s, it) -> {
@@ -99,7 +105,12 @@ public class WrongSpark {
});
return ret.iterator();
}, Encoders.bean(ShuffleInfo.class))
.filter(Objects::nonNull);
.filter(Objects::nonNull)
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(outputPath);

/*
* def checkShuffle(x): alis = [a for a in x[1]] dic = {} count = 0 for entry in alis: if entry['orcid'] != '':
* dic[entry['orcid']] = entry for orcid in dic: name = dic[orcid]['oname'] surname = dic[orcid]['osurname'] for
@@ -116,15 +127,24 @@ public class WrongSpark {

private static boolean checkShuffle(ShuffleInfo e, List<ShuffleInfo> shuffleInfoList) {

return shuffleInfoList
boolean b = shuffleInfoList
.stream()
.anyMatch(
si -> Utils
si -> {
try {
return Utils
.filterFunction(
new Tuple2<>(Result.newInstance(si.getAfullname()),
Orcid
.newInstance(
si.getOname(), si.getOsurname(), si.getOcreditName(), si.getoOtherNames()))));
e.getOname(), e.getOsurname(), e.getOcreditName(),
e.getoOtherNames())));
} catch (JsonProcessingException ex) {
throw new RuntimeException(ex);
}
});

return b;
}

private static Dataset<Tuple2<Result, Orcid>> getWrong(SparkSession spark, String orcidPath, String resultPath) {
@@ -139,7 +159,7 @@ public class WrongSpark {
.col("oid")
.equalTo(orcidDataset.col("orcid")),
"inner")
.filter((FilterFunction<Tuple2<Result, Orcid>>) t2 -> !Utils.filterFunction(t2));
.filter((FilterFunction<Tuple2<Result, Orcid>>) t2 -> !Utils.conservativeFilterFunction(t2));
}

private static void findWrong(SparkSession spark, String orcidPath, String outputPath, String resultPath) {
@@ -1,4 +1,18 @@

package eu.dnetlib.dhp.ircdl_extention.model;

public class Author {
import java.io.Serializable;
import java.util.List;

public class Author extends Result implements Serializable {

private List<KeyValue> apid;

public List<KeyValue> getApid() {
return apid;
}

public void setApid(List<KeyValue> apid) {
this.apid = apid;
}
}
@@ -14,13 +14,12 @@ public class Orcid implements Serializable {
private Boolean works;
private String name;

public static Orcid newInstance(String oname, String osurname, String ocreditName, List<String> getoOtherNames) {
public static Orcid newInstance(String oname, String osurname, String ocreditName, List<String> oOtherNames) {
Orcid o = new Orcid();
o.name = oname;
o.surname = osurname;
o.creditname = ocreditName;
o.otherNames = getoOtherNames;

o.otherNames = oOtherNames;
return o;
}
@@ -21,6 +21,19 @@ public class Result implements Serializable {
return r;
}

public static Result fromAuthor(Author p) {
Result r = new Result();
r.deletedbyinference = p.getDeletedbyinference();
r.id = p.getId();
r.cf = p.getCf();
r.pid = p.getPid();
r.name = p.getName();
r.surname = p.getSurname();
r.fullname = p.getFullname();

return r;
}

public Boolean getDeletedbyinference() {
return deletedbyinference;
}
@@ -15,6 +15,15 @@ public class ShuffleInfo implements Serializable {
private List<String> oOtherNames;
private String orcid;
private String id;
private String pid;

public String getPid() {
return pid;
}

public void setPid(String pid) {
this.pid = pid;
}

public String getAname() {
return aname;
@@ -98,7 +107,7 @@ public class ShuffleInfo implements Serializable {
}

public static ShuffleInfo newInstance(String aname, String asurname, String afullname, String id, String oname,
String osurname, String ocredtname, List<String> oOthername) {
String osurname, String ocredtname, List<String> oOthername, String orcid, String pid) {
ShuffleInfo si = new ShuffleInfo();
si.afullname = afullname;
si.aname = aname;
@@ -108,6 +117,8 @@ public class ShuffleInfo implements Serializable {
si.osurname = osurname;
si.ocreditName = ocredtname;
si.oOtherNames = oOthername;
si.orcid = orcid;
si.pid = pid;
return si;
}
@@ -73,6 +73,7 @@
<arg>--inputPath</arg><arg>${inputPath}/publication</arg>
<arg>--resultClass</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/GRAPH/publicationsWithOrcid</arg>

</spark>
<ok to="wait_prepare_result"/>
<error to="Kill"/>
@@ -98,6 +99,7 @@
<arg>--inputPath</arg><arg>${inputPath}/dataset</arg>
<arg>--resultClass</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/GRAPH/datasetWithOrcid</arg>

</spark>
<ok to="wait_prepare_result"/>
<error to="Kill"/>
@@ -123,6 +125,7 @@
<arg>--inputPath</arg><arg>${inputPath}/software</arg>
<arg>--resultClass</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${workingDir}/GRAPH/softwareWithOrcid</arg>

</spark>
<ok to="wait_prepare_result"/>
<error to="Kill"/>
@@ -148,6 +151,7 @@
<arg>--inputPath</arg><arg>${inputPath}/otherresearchproduct</arg>
<arg>--resultClass</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/GRAPH/otherWithOrcid</arg>

</spark>
<ok to="wait_prepare_result"/>
<error to="Kill"/>
@@ -156,6 +160,7 @@

<join name="wait_prepare_result" to="normalize_result"/>


<action name="normalize_result">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
@@ -176,6 +181,31 @@
<arg>--inputPath</arg><arg>${workingDir}/GRAPH/</arg>
<arg>--outputPath</arg><arg>${workingDir}/GRAPH/Normalized/</arg>
</spark>
<ok to="select_only_author_with_orcid"/>
<error to="Kill"/>
</action>


<action name="select_only_author_with_orcid">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>PrepareResult</name>
<class>eu.dnetlib.dhp.ircdl_extention.SelectAuthorWithOrcidOnlySpark</class>
<jar>dhp-aggregation-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/GRAPH/Normalized/ResultAuthorNormalized/</arg>
<arg>--outputPath</arg><arg>${workingDir}/GRAPH/Normalized/ResultWithOrcid/</arg>
</spark>
<ok to="fork_get_result_info"/>
<error to="Kill"/>
</action>
@@ -322,6 +352,7 @@
<arg>--inputPath</arg><arg>${workingDir}/GRAPH/InstRepo/</arg>
<arg>--outputPath</arg><arg>${outputPath}/InstRepo/</arg>
<arg>--orcidPath</arg><arg>${workingDir}/ORCID/entrySetMayNormalized/</arg>
<arg>--ap</arg><arg>${workingDir}/GRAPH/Normalized/ResultAuthorNormalized/</arg>
</spark>
<ok to="jojn_wrong"/>
<error to="Kill"/>
@@ -344,8 +375,9 @@
--conf spark.sql.shuffle.partitions=3840
</spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/GRAPH/Datacite/allDatacite/</arg>
<arg>--outputPath</arg><arg>${outputPath}</arg>
<arg>--outputPath</arg><arg>${outputPath}/Datacite/</arg>
<arg>--orcidPath</arg><arg>${workingDir}/ORCID/entrySetMayNormalized/</arg>
<arg>--ap</arg><arg>${workingDir}/GRAPH/Normalized/ResultAuthorNormalized/</arg>
</spark>
<ok to="jojn_wrong"/>
<error to="Kill"/>
@@ -371,6 +403,7 @@
<arg>--inputPath</arg><arg>${workingDir}/GRAPH/Crossref/</arg>
<arg>--outputPath</arg><arg>${outputPath}/Crossref/</arg>
<arg>--orcidPath</arg><arg>${workingDir}/ORCID/entrySetMayNormalized/</arg>
<arg>--ap</arg><arg>${workingDir}/GRAPH/Normalized/ResultAuthorNormalized/</arg>
</spark>
<ok to="jojn_wrong"/>
<error to="Kill"/>
@@ -396,6 +429,7 @@
<arg>--inputPath</arg><arg>${workingDir}/GRAPH/AllTheRest/</arg>
<arg>--outputPath</arg><arg>${outputPath}/AllTheRest/</arg>
<arg>--orcidPath</arg><arg>${workingDir}/ORCID/entrySetMayNormalized/</arg>
<arg>--ap</arg><arg>${workingDir}/GRAPH/Normalized/ResultAuthorNormalized/</arg>
</spark>
<ok to="jojn_wrong"/>
<error to="Kill"/>
@@ -421,6 +455,7 @@
<arg>--inputPath</arg><arg>${workingDir}/GRAPH/Datacite/Zenodo/</arg>
<arg>--outputPath</arg><arg>${outputPath}/Zenodo/</arg>
<arg>--orcidPath</arg><arg>${workingDir}/ORCID/entrySetMayNormalized/</arg>
<arg>--ap</arg><arg>${workingDir}/GRAPH/Normalized/ResultAuthorNormalized/</arg>
</spark>
<ok to="jojn_wrong"/>
<error to="Kill"/>
@@ -445,6 +480,7 @@
<arg>--inputPath</arg><arg>${workingDir}/GRAPH/Datacite/Figshare/</arg>
<arg>--outputPath</arg><arg>${outputPath}/Figshare/</arg>
<arg>--orcidPath</arg><arg>${workingDir}/ORCID/entrySetMayNormalized/</arg>
<arg>--ap</arg><arg>${workingDir}/GRAPH/Normalized/ResultAuthorNormalized/</arg>
</spark>
<ok to="jojn_wrong"/>
<error to="Kill"/>
@@ -469,6 +505,7 @@
<arg>--inputPath</arg><arg>${workingDir}/GRAPH/Datacite/Dryad/</arg>
<arg>--outputPath</arg><arg>${outputPath}/Dryad/</arg>
<arg>--orcidPath</arg><arg>${workingDir}/ORCID/entrySetMayNormalized/</arg>
<arg>--ap</arg><arg>${workingDir}/GRAPH/Normalized/ResultAuthorNormalized/</arg>
</spark>
<ok to="jojn_wrong"/>
<error to="Kill"/>
@@ -21,6 +21,6 @@
"paramName": "rc",
"paramLongName": "resultClass",
"paramDescription": "the path of the new ActionSet",
"paramRequired": true
"paramRequired": false
}
]
@@ -22,5 +22,11 @@
"paramLongName": "inputPath",
"paramDescription": "thepath of the new ActionSet",
"paramRequired": true
},
{
"paramName": "ap",
"paramLongName": "authorPath",
"paramDescription": "thepath of the new ActionSet",
"paramRequired": true
}
]
@@ -3,13 +3,21 @@ package eu.dnetlib.dhp.ircdl_extention;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.neethi.Assertion;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.jayway.jsonpath.WriteContext;

import eu.dnetlib.dhp.ircdl_extention.model.Orcid;
import eu.dnetlib.dhp.ircdl_extention.model.Result;
import eu.dnetlib.dhp.ircdl_extention.model.ShuffleInfo;
import scala.Tuple2;

public class WrongOrcidTest {
@@ -53,7 +61,7 @@ public class WrongOrcidTest {
.newInstance(
"satrajit", "ghosh",
"satra",
Arrays.asList("satra","satrajit s ghosh")))));
Arrays.asList("satra", "satrajit s ghosh")))));

}

@@ -68,7 +76,7 @@ public class WrongOrcidTest {
.newInstance(
"satrajit", "ghosh",
"satra",
Arrays.asList("satra","satrajit s ghosh")))));
Arrays.asList("satra", "satrajit s ghosh")))));

}

@@ -87,9 +95,186 @@ public class WrongOrcidTest {

}

@Test
public void wrongOrcidFalse5() throws Exception {
Assertions
.assertTrue(
Utils
.filterFunction(
new Tuple2<>(Result.newInstance("domingues af"),
Orcid
.newInstance(
"allysson", "domingues",
"allysson f domingues",
new ArrayList<>()))));

}

@Test
public void wrongOrcidFalseConservative() throws Exception {
Assertions
.assertTrue(
Utils
.conservativeFilterFunction(
new Tuple2<>(Result.newInstance("veigas pires cristina"),
Orcid
.newInstance(
"cristina", "veiga pires", "c veiga pires",
Arrays.asList("c c veiga pires")))));

}

@Test
public void wrongOrcidFalseConservative2() throws Exception {
Assertions
.assertTrue(
Utils
.conservativeFilterFunction(
new Tuple2<>(Result.newInstance("yushkevich p"),
Orcid
.newInstance(
"paul", "yushkevich", "paul a yushkevich",
new ArrayList<>()))));

}

@Test
public void wrongOrcidFalseConservative3() throws Exception {
Assertions
.assertTrue(
Utils
.conservativeFilterFunction(
new Tuple2<>(Result.newInstance("ghosh ss"),
Orcid
.newInstance(
"satrajit", "ghosh",
"satra",
Arrays.asList("satra", "satrajit s ghosh")))));

}

@Test
public void wrongOrcidTrueConservative() throws Exception {
Assertions
.assertFalse(
Utils
.conservativeFilterFunction(
new Tuple2<>(Result.newInstance("kluft lukas"),
Orcid
.newInstance(
"satrajit", "ghosh",
"satra",
Arrays.asList("satra", "satrajit s ghosh")))));

}

@Test
public void wrongOrcidFalseConservative4() throws Exception {
Assertions
.assertTrue(
Utils
.conservativeFilterFunction(
new Tuple2<>(Result.newInstance("schulz s a"),
Orcid
.newInstance(
"sebastian", "schulz",
"sebastian a schulz",
new ArrayList<>()))));

}

@Test
public void wrongOrcidFalseConservative5() throws Exception {
Assertions
.assertTrue(
Utils
.conservativeFilterFunction(
new Tuple2<>(Result.newInstance("domingues af"),
Orcid
.newInstance(
"allysson", "domingues",
"allysson f domingues",
new ArrayList<>()))));

}

@Test
public void wrongOrcidTrueConservative2() throws Exception {
Assertions
.assertFalse(
Utils
.conservativeFilterFunction(
new Tuple2<>(Result.newInstance("figueiredo pontes lorena lobo de"),
Orcid
.newInstance(
"moyses", "soares",
"moyses antonio porto soares",
new ArrayList<>()))));

}

@Test
public void wrongOrcidFalseConservative6() throws Exception {
Assertions
.assertTrue(
Utils
.conservativeFilterFunction(
new Tuple2<>(Result.newInstance("da luz geraldo eduardo"),
Orcid
.newInstance(
"geraldo", "luz jr",
"luz jr g e",
new ArrayList<>()))));

}

@Test
public void testShuffle() throws Exception {

List<ShuffleInfo> shuffleInfoList = new ArrayList<>();

shuffleInfoList
.add(
ShuffleInfo
.newInstance(
"Miriam", "Baglioni", "Miriam Baglioni", "50|fake_1",
"Alessia", "Bardi", "", new ArrayList<String>(), "orcid_alessia"));
shuffleInfoList.add(ShuffleInfo.newInstance("Alessia", "Bardi", "Alessia Bardi", "50|fake_1"));
shuffleInfoList.add(ShuffleInfo.newInstance("Miriam", "Baglioni", "Miriam Baglioni", "50|fake_1"));
shuffleInfoList
.add(
ShuffleInfo
.newInstance(
"Alessia", "Bardi", "Alessia Bardi", "50|fake_1",
"Miriam", "Baglioni", "", new ArrayList<String>(), "orcid_miriam"));
shuffleInfoList.add(ShuffleInfo.newInstance("Claudio", "Atzori", "Claudio Atzori", "50|fake_1"));

List<ShuffleInfo> tmp = shuffleInfoList
.stream()
.filter(e -> Optional.ofNullable(e.getOrcid()).isPresent())
.collect(Collectors.toList());
int count = 0;
for (ShuffleInfo e : tmp) {
if (verifyShuffle(e, shuffleInfoList))
count++;

}

System.out.println(count);
}

private boolean verifyShuffle(ShuffleInfo e, List<ShuffleInfo> shuffleInfoList) {
return shuffleInfoList.stream().anyMatch(si -> {
try {
final Orcid orcid = Orcid
.newInstance(e.getOname(), e.getOsurname(), e.getOcreditName(), e.getoOtherNames());
return Utils
.filterFunction(
new Tuple2<>(Result.newInstance(si.getAfullname()), orcid));
} catch (JsonProcessingException ex) {
ex.printStackTrace();
}
return false;
});
}
}
@@ -0,0 +1,4 @@
{"aname":"Miriam", "asurname":"Baglioni", "afullname":"Miriam Baglioni","oname": "Alessia","osurname": "Bardi","ocreditName": "", "oOtherNames": [],"orcid": "orcid_alessia","id": "50|fake1"}
{"aname":"Alessia", "asurname":"Bardi", "afullname":"Alessia Bardi","oname": null,"osurname": null,"ocreditName": null, "oOtherNames": null,"orcid": null,"id": "50|fake1"}
{"aname":"Claudio", "asurname":"Atzori", "afullname":"Claudio Atzori","oname": null,"osurname": null,"ocreditName": null, "oOtherNames": null,"orcid": null,"id": "50|fake1"}
{"aname":"Alessia", "asurname":"Bardi", "afullname":"Alessia Bardi","oname": "Miriam","osurname": "Baglioni","ocreditName": "", "oOtherNames": [],"orcid": "orcid_miriam","id": "50|fake1"}