// Source path: dnet-hadoop/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/ircdl_extention/Utils.java
// Repository viewer metadata: 200 lines, 4.9 KiB, Java

package eu.dnetlib.dhp.ircdl_extention;
import java.io.Serializable;
import java.text.Normalizer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.similarity.CosineDistance;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.wcohen.ss.JaroWinkler;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.ircdl_extention.model.Orcid;
import eu.dnetlib.dhp.ircdl_extention.model.Result;
import scala.Tuple2;
/**
 * Static helpers for the IRCDL extension jobs: string normalisation, output-directory clean-up,
 * loading of JSON-lines datasets, and the name comparison between a {@link Result} author and an
 * {@link Orcid} record.
 */
public class Utils implements Serializable {

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /** Any single character outside the ASCII range. */
    private static final Pattern NON_ASCII = Pattern.compile("[^\\p{ASCII}]");

    /** A maximal run of characters that are not ASCII letters (digits, punctuation, whitespace...). */
    private static final Pattern NON_ALPHA_RUN = Pattern.compile("[^\\p{Alpha}]+");

    /** Minimum Jaro-Winkler score (exclusive) for two names to be considered the same. */
    private static final double JARO_WINKLER_THRESHOLD = 0.95;

    /**
     * Reduces a string to ASCII letters separated by single spaces.
     *
     * <p>The input is NFKD-normalised, every non-ASCII character is dropped, every run of
     * non-letter characters is replaced by one space, and the result is trimmed. Letter case is
     * left untouched.
     *
     * @param input the raw string; may be {@code null}
     * @return the normalised string, or the empty string when {@code input} is {@code null} or
     *         equal to {@code "void"}
     */
    public static String normalizeString(String input) {
        if (input == null || input.equals("void")) {
            return "";
        }
        String ascii = NON_ASCII
            .matcher(Normalizer.normalize(input, Normalizer.Form.NFKD))
            .replaceAll("");
        // Whitespace is itself "non-alpha", so after this replacement no two separators can be
        // adjacent; a further whitespace-collapsing pass would be a no-op.
        return NON_ALPHA_RUN.matcher(ascii).replaceAll(" ").trim();
    }

    /**
     * Removes {@code path} through {@link HdfsSupport#remove}, using the Hadoop configuration of
     * the given Spark session.
     *
     * @param spark the active Spark session
     * @param path the path to remove
     */
    public static void removeOutputDir(SparkSession spark, String path) {
        HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
    }

    /**
     * Reads a text file in which every line is a JSON document and maps each line to an
     * instance of {@code clazz}.
     *
     * @param spark the active Spark session
     * @param inputPath the location of the text file(s)
     * @param clazz the bean class each line is deserialised into
     * @param <R> the bean type
     * @return a dataset of {@code R} encoded with {@link Encoders#bean(Class)}
     */
    public static <R> Dataset<R> readPath(
        SparkSession spark, String inputPath, Class<R> clazz) {
        return spark
            .read()
            .textFile(inputPath)
            .map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
    }

    /**
     * Trims every element of {@code input} and drops the ones that end up empty.
     *
     * @param input the raw tokens
     * @return a new list with the trimmed, non-empty tokens in their original order
     */
    private static List<String> getList(List<String> input) {
        return input
            .stream()
            .map(String::trim)
            .filter(token -> !token.isEmpty())
            .collect(Collectors.toList());
    }

    /**
     * Splits the given strings on single spaces and returns all non-empty tokens in natural
     * (sorted) order. {@code null} strings contribute no tokens.
     *
     * @param parts the strings to tokenise
     * @return a new sorted list of tokens, possibly empty
     */
    private static List<String> sortedTokens(String... parts) {
        List<String> raw = new ArrayList<>();
        if (parts != null) {
            for (String part : parts) {
                if (part != null) {
                    Collections.addAll(raw, part.split(" "));
                }
            }
        }
        return getList(raw).stream().sorted().collect(Collectors.toList());
    }

    /**
     * Tells whether the full name carried by the result is compatible with at least one of the
     * name variants of the ORCID record: name + surname, credit name, or any of the other names.
     *
     * <p>Missing ({@code null}) name fields are treated like empty strings.
     *
     * @param input the pair (result, orcid record) to compare
     * @return {@code true} when one of the variants matches according to
     *         {@link #checkContains(List, List)}
     */
    public static boolean filterFunction(Tuple2<Result, Orcid> input) {
        List<String> res = sortedTokens(input._1().getFullname());
        Orcid or = input._2();

        if (checkContains(res, sortedTokens(or.getName(), or.getSurname()))
            || checkContains(res, sortedTokens(or.getCreditname()))) {
            return true;
        }

        List<String> otherNames = or.getOtherNames();
        return otherNames != null
            && otherNames.stream().anyMatch(on -> checkContains(res, sortedTokens(on)));
    }

    /**
     * Compares two sorted token lists with three strategies of increasing tolerance: cosine
     * similarity on the joined names, token containment (with initials and two-token
     * compositions), and finally Jaro-Winkler similarity.
     *
     * <p>Neither argument is modified.
     *
     * @param result the sorted tokens of the result author name
     * @param orcid the sorted tokens of one ORCID name variant
     * @return {@code true} when the two names are considered equivalent
     */
    private static boolean checkContains(List<String> result, List<String> orcid) {
        // NOTE(review): an empty side counts as a match, so a record with an empty variant
        // (e.g. no credit name) always passes filterFunction. Kept as-is; confirm this is intended.
        if (result.isEmpty() || orcid.isEmpty()) {
            return true;
        }

        // String.join produces "a b c". StringUtils.joinWith(" ", list) would treat the list as a
        // single varargs element and yield "[a, b, c]", letting brackets and commas leak into the
        // Jaro-Winkler score below.
        String resultName = String.join(" ", result);
        String orcidName = String.join(" ", orcid);

        // exact match word by word
        Double cosineDistance = new CosineDistance().apply(resultName, orcidName);
        if (Math.round((1 - cosineDistance) * 100) == 100) {
            return true;
        }

        // check containment: one list can be longer than the other, and two words may be
        // composed to create a name, e.g. pengli yan = li peng yan
        boolean contained = orcid.size() < result.size()
            ? isIn(orcid, result)
            : isIn(result, orcid);
        if (contained) {
            return true;
        }

        // apply JaroWinkler similarity
        return new JaroWinkler().score(resultName, orcidName) > JARO_WINKLER_THRESHOLD;
    }

    /**
     * Checks that every word of {@code lst1} can be found in {@code lst2}, in order.
     *
     * <p>A word is found when a remaining word of {@code lst2} is equal to it, when one of the two
     * is a single letter equal to the first letter of the other, or when the word is the
     * concatenation of two words of {@code lst2}. Both lists are expected to be sorted and to
     * contain only non-empty words, as produced by {@link #sortedTokens(String...)}.
     *
     * <p>The search works on a private copy of {@code lst2}: words consumed by a composition are
     * removed from that copy only, so the caller's list is left untouched and can safely be
     * reused for further comparisons.
     *
     * @param lst1 the words to look for
     * @param lst2 the words to search in
     * @return {@code true} when all the words of {@code lst1} are found
     */
    private static boolean isIn(List<String> lst1, List<String> lst2) {
        List<String> pool = new ArrayList<>(lst2);
        int index = 0;
        for (String word : lst1) {
            int matchedAt = -1;
            for (int i = index; i < pool.size(); i++) {
                String candidate = pool.get(i);
                if (word.equals(candidate) || matchesAsInitial(word, candidate)) {
                    matchedAt = i;
                    break;
                }
                if (word.charAt(0) < candidate.charAt(0)) {
                    // The pool is sorted: the remaining candidates start with an even greater
                    // letter, so only a composition can still account for this word.
                    break;
                }
            }
            if (matchedAt >= 0) {
                index = matchedAt + 1;
            } else if (checkComposition(word, pool)) {
                // The pool shrank, positions are no longer comparable: restart from the beginning.
                index = 0;
            } else {
                return false;
            }
        }
        return true;
    }

    /**
     * Tells whether one of the two words is a single letter that equals the first letter of the
     * other one.
     *
     * @param word a non-empty word
     * @param candidate a non-empty word
     * @return {@code true} when one word is an initial of the other
     */
    private static boolean matchesAsInitial(String word, String candidate) {
        return (word.length() == 1 || candidate.length() == 1)
            && word.charAt(0) == candidate.charAt(0);
    }

    /**
     * Looks for two distinct elements of {@code lst2} whose concatenation equals {@code word}.
     * When such a pair exists, both elements are removed from {@code lst2}.
     *
     * @param word the word to compose
     * @param lst2 the candidate parts; modified in place when a pair is found
     * @return {@code true} when a pair was found (and removed)
     */
    private static boolean checkComposition(String word, List<String> lst2) {
        for (int i = 0; i < lst2.size(); i++) {
            for (int j = 0; j < lst2.size(); j++) {
                if (i != j && word.equals(lst2.get(i) + lst2.get(j))) {
                    // Remove the higher position first so the lower one stays valid.
                    lst2.remove(Math.max(i, j));
                    lst2.remove(Math.min(i, j));
                    return true;
                }
            }
        }
        return false;
    }
}