dnet-hadoop/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/authorpids/MakeReportSparkJob.java


package eu.dnetlib.dhp.oa.graph.clean.authorpids;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.similarity.CosineDistance;
import org.apache.commons.text.similarity.LevenshteinDistance;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.intuit.fuzzymatcher.component.MatchService;
import com.intuit.fuzzymatcher.domain.Document;
import com.intuit.fuzzymatcher.domain.Element;
import com.intuit.fuzzymatcher.domain.ElementType;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.PacePerson;
import eu.dnetlib.dhp.oa.graph.clean.authorpids.constraints.ConstraintResolver;
import eu.dnetlib.dhp.oa.graph.clean.authorpids.constraints.ConstraintResolverFactory;
import eu.dnetlib.dhp.oa.graph.clean.authorpids.constraints.Selection;
import eu.dnetlib.dhp.schema.oaf.Result;
import me.xdrop.fuzzywuzzy.FuzzySearch;
import scala.Tuple2;
/**
 * It checks whether the author information associated with an ORCID id in the ORCID dump and the author information
 * found in the result for the same ORCID id refer to the same person. The author information is normalised before
 * the check. Normalisation steps: words are lower-cased and trimmed, and accented characters are replaced with their
 * non-accented equivalents. Only alphabetical characters and white space are retained; every other character is
 * replaced with a space.
 *
 * The check is made at different levels:
 *
 * Level 1: the ORCID author surname and the result author surname are identical. We consider the match to be right.
 *
 * Level 2: we verify whether the ORCID author surname contains the result author surname or vice versa. If so, we
 * consider the match to be right.
 *
 * Level 3: we verify whether one of the two surnames is composed of two words. In that case we concatenate the words
 * and repeat the check. If the two match, we consider the match to be checked.
 *
 * Level 4: name and surname can be inverted in one of the two entities. We consider the sets of words longer than two
 * characters composing the name and the surname of the ORCID author and of the result author. If all the words of the
 * shorter list are contained in the longer one, we consider the match to be checked.
 *
 * Level 5: name and surname are inverted, but one of the two is composed of two words. A mix of Level 3 and Level 4.
 * We consider the match to be checked.
 *
 * Level 6: the surnames differ by a few characters. We apply the Levenshtein distance to the surnames if their length
 * is greater than 3. If the distance is not greater than 2, we consider the match to be checked.
 *
 * In all other cases the match is considered wrong.
 *
 */
public class MakeReportSparkJob implements Serializable {
private static final Logger log = LoggerFactory.getLogger(MakeReportSparkJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
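	/**
	 * Reads the parameters defined in /eu/dnetlib/dhp/oa/graph/clean_orcid/input_report_author_pids.json and
	 * triggers the report creation. Expected arguments: isSparkSessionManaged (optional, defaults to true),
	 * outputPath, preparedInfoPath, orcidInputPath and whitelist (the JSON serialisation of a WhiteList of
	 * constraints).
	 */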
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
MakeReportSparkJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/graph/clean_orcid/input_report_author_pids.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
String preparedInfoPath = parser.get("preparedInfoPath");
log.info("preparedInfoPath: {} ", preparedInfoPath);
String orcidInputPath = parser.get("orcidInputPath");
log.info("orcidInputPath: {}", orcidInputPath);
String whiteListString = parser.get("whitelist");
log.info("whitelist: {}", whiteListString);
SparkConf conf = new SparkConf();
runWithSparkHiveSession(
conf,
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
makeReport(
spark, outputPath, preparedInfoPath,
loadAuthoritativeOrcid(spark, orcidInputPath), whiteListString);
});
}
	/**
	 * Loads the sequence file containing the ORCID id, name and surname of the authors.
	 * It returns a dataset whose model maps the one defined in the OrcidAuthotitative class.
	 * @param spark the spark session
	 * @param orcidInputPath the path where to read the sequence file
	 * @return the dataset
	 */
private static Dataset<OrcidAuthotitative> loadAuthoritativeOrcid(SparkSession spark, String orcidInputPath) {
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
return spark
.createDataset(
JavaRDD
.toRDD(
sc
.sequenceFile(orcidInputPath, Text.class, Text.class)
.map(pair -> OBJECT_MAPPER.readValue(pair._2().toString(), OrcidAuthotitative.class))),
Encoders.bean(OrcidAuthotitative.class));
}
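	/**
	 * Splits the given string on white space and adds to the list only the tokens longer than two characters.
	 * @param list the list to be extended
	 * @param to_add the white-space separated string whose tokens should be added
	 */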
private static void addInList(List<String> list, String to_add) {
for (String word : to_add.split(" ")) {
if (word.length() > 2) {
list.add(word);
}
}
}
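	/**
	 * Normalises a name or surname: lower-cases it, removes dots, strips a leading "dr" title, replaces accented
	 * characters with their plain equivalents and keeps only alphabetical characters and white space.
	 * For example, "Dr. José-María O'Neill" becomes "jose maria o neill".
	 * @param input the name or surname to normalise
	 * @return the normalised string, possibly empty
	 */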
	public static String handleNameSurname(String input) {
		input = input.toLowerCase().replace(".", "");
		// strip a leading academic title ("dr. " becomes "dr " after the dot removal above). The trailing space
		// is required so that surnames such as "drago" are not truncated
		if (input.startsWith("dr ")) {
			input = input.substring(3);
		} else if (input.equals("dr")) {
			return "";
		}
		return StringUtils
			.stripAccents(input.trim())
			.replaceAll("[^a-z\\s]+", " ")
			.trim();
	}
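	/**
	 * Builds the report. It parses the whitelist, resolves the selection criteria of each constraint, joins the
	 * prepared result information with the authoritative ORCID records on the ORCID id, classifies every pair and
	 * writes the "wrong", "right", "check", "missing" and "constraint" sets under the given output path.
	 * @param spark the spark session
	 * @param outputPath the path where to write the report
	 * @param preparedInfoPath the path where to read the association between ORCID ids and result authors
	 * @param authoritative the dataset of authoritative ORCID records
	 * @param whiteliststring the JSON serialisation of the whitelist of constraints
	 */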
private static <I extends Result> void makeReport(SparkSession spark,
String outputPath, String preparedInfoPath,
Dataset<OrcidAuthotitative> authoritative, String whiteliststring) {
WhiteList whitelist = new Gson().fromJson(whiteliststring, WhiteList.class);
log.info("whitelist_size: {}", whitelist.getWhitelist().size());
ConstraintResolver resolver = ConstraintResolverFactory.newInstance();
		whitelist.getWhitelist().forEach(constraint -> {
			try {
				constraint.setSelection(resolver);
			} catch (NoSuchMethodException | InstantiationException | IllegalAccessException
				| InvocationTargetException e) {
				log.error("unable to set the selection for one of the whitelist constraints", e);
			}
		});
Dataset<ResultInfo> resultInfo = readPath(spark, preparedInfoPath, ResultInfo.class);
Dataset<Tuple2<String, ReportInfo>> checkedResult = resultInfo
.joinWith(
authoritative, authoritative
.col("oid")
.equalTo(resultInfo.col("orcid")),
"left")
.map((MapFunction<Tuple2<ResultInfo, OrcidAuthotitative>, Tuple2<String, ReportInfo>>) pair -> {
return getStringReportInfoFuzzyTuple2(pair, whitelist);
}, Encoders.tuple(Encoders.STRING(), Encoders.bean(ReportInfo.class)))
.filter(Objects::nonNull);
writeSet(
checkedResult.filter((FilterFunction<Tuple2<String, ReportInfo>>) result -> result._1().equals("wrong")),
outputPath + "/wrong");
writeSet(
checkedResult.filter((FilterFunction<Tuple2<String, ReportInfo>>) result -> result._1().equals("right")),
outputPath + "/right");
writeSet(
checkedResult.filter((FilterFunction<Tuple2<String, ReportInfo>>) result -> result._1().equals("check")),
outputPath + "/check");
writeSet(
checkedResult.filter((FilterFunction<Tuple2<String, ReportInfo>>) result -> result._1().equals("missing")),
outputPath + "/missing");
writeSet(
checkedResult
.filter((FilterFunction<Tuple2<String, ReportInfo>>) result -> result._1().equals("constraint")),
outputPath + "/constraint");
}
	private static double fuzzyMatch(String orcid, String result) {
		// placeholder: apply one or more fuzzy functions to determine whether the input strings match.
		// Intended policy:
		// - pairs matching 1.0 with the fuzzy matcher are right
		// - pairs matching above 0.66 with the fuzzy matcher are put among the right ones
		// - pairs not matched above, but with a FuzzyWuzzy score above 0.5, are put in check (probably right)
		// - pairs with a FuzzyWuzzy score between 0.5 and 0.3
		return 0;
	}
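	/**
	 * Classifies a (result author, ORCID author) pair using fuzzy string matching. The normalised "surname name"
	 * strings are compared with cosine distance first: a similarity of 1 is considered right, while a similarity
	 * of 0 triggers further checks against the ORCID credit name and other names, the fuzzymatcher library and
	 * the FuzzyWuzzy ratio. Pairs satisfying one of the whitelist constraints are reported under "constraint",
	 * pairs where either surname is missing or empty after normalisation under "missing".
	 * @param pair the joined result and ORCID author information
	 * @param whiteList the set of constraints (in OR) that force a pair into the "constraint" report
	 * @return a tuple whose first element is one of "right", "wrong", "check", "missing" or "constraint"
	 */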
public static Tuple2<String, ReportInfo> getStringReportInfoFuzzyTuple2(
Tuple2<ResultInfo, OrcidAuthotitative> pair, WhiteList whiteList) {
Optional<OrcidAuthotitative> ooa = Optional.ofNullable(pair._2());
if (!ooa.isPresent()) {
return null;
}
OrcidAuthotitative oa = ooa.get();
ResultInfo ri = pair._1();
if (StringUtils.isBlank(ri.getSurname())) {
PacePerson pp = new PacePerson(ri.getFullname(), false);
ri.setSurname(pp.getNormalisedSurname());
ri.setName(pp.getNormalisedFirstName());
}
ReportInfo reportInfo = new ReportInfo();
reportInfo.setOid(oa.getOid());
reportInfo.setOname(oa.getName());
reportInfo.setOsurname(oa.getSurname());
reportInfo.setOcreditname(oa.getCreditName());
reportInfo.setAlternativeNames(oa.getOtherNames());
reportInfo.setAssociatedAuthors(Arrays.asList(ri));
if (!Optional.ofNullable(oa.getSurname()).isPresent()) {
return new Tuple2<>("missing", reportInfo);
}
	final String handledOsurname = handleNameSurname(oa.getSurname());
	if (handledOsurname.isEmpty()) {
		return new Tuple2<>("missing", reportInfo);
	}
	final String handledSurname = handleNameSurname(ri.getSurname());
	if (handledSurname.isEmpty()) {
		return new Tuple2<>("missing", reportInfo);
	}
final String handledOname = Optional
.ofNullable(oa.getName())
.map(name -> handleNameSurname(name))
.orElse("");
final String handledName = Optional
.ofNullable(ri.getName())
.map(name -> handleNameSurname(name))
.orElse("");
if (verifyConstraints(new HashMap<String, String>() {
{
put("oname", handledOname);
put("osurname", handledOsurname);
put("name", handledName);
put("surname", handledSurname);
}
}, whiteList)) {
return new Tuple2<>("constraint", reportInfo);
}
String[][] input = {
{
"1", handledOsurname + " " + handledOname
},
{
"2", handledSurname + " " + handledName
}
};
// exact match word by word
Double cosineDistance = new CosineDistance().apply(input[0][1], input[1][1]);
if (Math.round((1 - cosineDistance) * 100) == 100) {
reportInfo.setLevel("cosine similarity equals to 1");
return new Tuple2<>("right", reportInfo);
}
		// check if there is no word in common. If there is none, the pair could be wrong
		if (Math.round((1 - cosineDistance) * 100) == 0) {
// verify if there is another name that can be used by the author
if (StringUtils.isNotEmpty(oa.getCreditName())) {
try {
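					// Math.round of a value in [0, 1] is 1 only when the cosine similarity with the credit
					// name is at least 0.5, i.e. when the two strings share a non-negligible number of words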
if (Math
.round(
1 - new CosineDistance().apply(input[1][1], handleNameSurname(oa.getCreditName()))) > 0) {
reportInfo.setLevel("not zero cosine on credit names");
return new Tuple2<>("check", reportInfo);
}
} catch (Exception e) {
reportInfo.setLevel(e.getMessage() + " " + oa.getCreditName());
return new Tuple2<>("check", reportInfo);
}
}
			if (oa.getOtherNames() != null && !oa.getOtherNames().isEmpty()) {
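				// repeat the same cosine check against each of the alternative names provided by ORCID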
for (String othername : oa.getOtherNames()) {
if (StringUtils.isNotEmpty(othername))
try {
if (Math
.round(1 - new CosineDistance().apply(input[1][1], handleNameSurname(othername))) > 0) {
reportInfo.setLevel("not zero cosine on othen names");
return new Tuple2<>("check", reportInfo);
}
} catch (Exception e) {
reportInfo.setLevel(e.getMessage() + " " + othername);
return new Tuple2<>("check", reportInfo);
}
}
}
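			// build one fuzzymatcher Document per name and run the match service: an empty result map
			// means that the fuzzymatcher library found no match between the two names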
MatchService matchService = new MatchService();
List<Document> documentList = Arrays.asList(input).stream().map(contact -> {
return new Document.Builder(contact[0])
.addElement(
new Element.Builder<String>()
.setValue(contact[1])
.setType(ElementType.NAME)
.createElement())
.createDocument();
}).collect(Collectors.toList());
if (matchService.applyMatchByDocId(documentList).entrySet().size() == 0) {
double out = FuzzySearch.ratio(input[0][1], input[1][1]);
if (out < 29) {
reportInfo.setLevel("less than 29 in fuzzywuzzy");
return new Tuple2<>("wrong", reportInfo);
} else {
// TODO extend the checking to catch range of fuzzy wuzzy that could be wrong
// try using soundex techniques or merge with previous implementation or both
reportInfo.setLevel("more than 29 in fuzzywuzzy");
return new Tuple2<>("check", reportInfo);
}
}
			// TODO match size is not equal to zero. Verify the match value and then decide how to proceed
} else {
// MatchService matchService = new MatchService();
//
// List<Document> documentList = Arrays.asList(input).stream().map(contact -> {
// return new Document.Builder(contact[0])
// .addElement(
// new Element.Builder<String>()
// .setValue(contact[1])
// .setType(ElementType.NAME)
// .createElement())
// .createDocument();
// }).collect(Collectors.toList());
// if (matchService.applyMatchByDocId(documentList).entrySet().size() == 1) {
if (FuzzySearch.ratio(input[0][1], input[1][1]) > 90) {
reportInfo.setLevel("more than 90 in fuzzywuzzy");
return new Tuple2<>("right", reportInfo);
} else {
reportInfo.setLevel("less than 90 in fuzzywuzzy");
return new Tuple2<>("check", reportInfo);
}
// }else{
// reportInfo.setLevel("not found a match in name matching");
// return new Tuple2<>("check", reportInfo);
// }
}
// // they have some words in common. check if orcid provides creditName or otherNames to check for distance
// //
// List<Document> documentList = Arrays.asList(input).stream().map(contact -> {
// return new Document.Builder(contact[0])
// .addElement(
// new Element.Builder<String>()
// .setValue(contact[1])
// .setType(ElementType.NAME)
// .createElement())
// .createDocument();
// }).collect(Collectors.toList());
//
// MatchService matchService = new MatchService();
//
// Map<String, List<Match<Document>>> result = matchService.applyMatchByDocId(documentList);
//
// if (result.entrySet().size() > 0) {
// reportInfo.setLevel("fuzzyMatch");
// return new Tuple2<>("right", reportInfo);
// }
return new Tuple2<>("right", reportInfo);
}
	/**
	 * Verifies the whitelist constraints against the given parameters. The constraints are in OR: the method
	 * returns true as soon as one of them is satisfied.
	 */
private static boolean verifyConstraints(Map<String, String> param, WhiteList whitelist) {
log.info("whitelist_size : {}", whitelist.getWhitelist().size());
for (SelectionConstraints constraint : whitelist.getWhitelist()) {
if (constraint.verifyCriteria(param)) {
return true;
}
}
return false;
}
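	/**
	 * Classifies a (result author, ORCID author) pair applying, in order, the levels described in the class
	 * Javadoc: equality of the normalised surnames, containment of one surname in the other, concatenation of
	 * two-word surnames, containment between the word sets of name plus surname, the same check with inverted
	 * name and surname, and finally the Levenshtein distance.
	 * @param pair the joined result and ORCID author information
	 * @return a tuple whose first element is one of "right", "wrong", "check" or "missing"
	 */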
public static Tuple2<String, ReportInfo> getStringReportInfoTuple2(Tuple2<ResultInfo, OrcidAuthotitative> pair) {
Optional<OrcidAuthotitative> ooa = Optional.ofNullable(pair._2());
if (!ooa.isPresent()) {
return null;
}
OrcidAuthotitative oa = ooa.get();
ResultInfo ri = pair._1();
if (StringUtils.isBlank(ri.getSurname())) {
PacePerson pp = new PacePerson(ri.getFullname(), false);
ri.setSurname(pp.getNormalisedSurname());
ri.setName(pp.getNormalisedFirstName());
}
ReportInfo reportInfo = new ReportInfo();
reportInfo.setOid(oa.getOid());
reportInfo.setOname(oa.getName());
reportInfo.setOsurname(oa.getSurname());
reportInfo.setOcreditname(oa.getCreditName());
reportInfo.setAssociatedAuthors(Arrays.asList(ri));
int level = 1;
if (!Optional.ofNullable(oa.getSurname()).isPresent()) {
return new Tuple2<>("missing", reportInfo);
}
	final String handledOsurname = handleNameSurname(oa.getSurname());
	if (handledOsurname.isEmpty()) {
		return new Tuple2<>("missing", reportInfo);
	}
	final String handledSurname = handleNameSurname(ri.getSurname());
	if (handledSurname.isEmpty()) {
		return new Tuple2<>("missing", reportInfo);
	}
		// check if oSurname and surname are equal
if (handledOsurname.equals(handledSurname)) {
reportInfo.setLevel("level" + level);
return new Tuple2<>("right", reportInfo);
}
level++;
// check if one is contained in the other
if (handledOsurname.contains(handledSurname) || handledSurname.contains(handledOsurname)) {
reportInfo.setLevel("level" + level);
return new Tuple2<>("right", reportInfo);
}
level++;
// check if one of the two is composed of more than one word. In this case concatenate the two words
// and check again (Mohammadi Peyhani vs Mohammadipeyhani)
String[] handledorcidSplit = handledOsurname.split(" ");
String[] handledresultSplit = handledSurname.split(" ");
if (handledorcidSplit.length == 2) {
String tmpSurname = handledorcidSplit[0] + handledorcidSplit[1];
if (tmpSurname.equals(handledSurname)) {
reportInfo.setLevel("level" + level);
return new Tuple2<>("check", reportInfo);
}
}
		if (handledresultSplit.length == 2) {
			String tmpSurname = handledresultSplit[0] + handledresultSplit[1];
			// compare the concatenated result surname against the ORCID surname (symmetric to the check above)
			if (tmpSurname.equals(handledOsurname)) {
				reportInfo.setLevel("level" + level);
				return new Tuple2<>("check", reportInfo);
			}
		}
level++;
		// check if the words composing the name and the surname are the same or if one list contains the other.
		// done only for words longer than two characters
String handledOname = "";
if (Optional.ofNullable(oa.getName()).isPresent()) {
handledOname = handleNameSurname(oa.getName());
}
String handledName = "";
if (Optional.ofNullable(ri.getName()).isPresent()) {
handledName = handleNameSurname(ri.getName());
}
final List<String> orcidList = new ArrayList<>();
final List<String> paperList = new ArrayList<>();
addInList(orcidList, handledOname);
addInList(orcidList, handledOsurname);
addInList(paperList, handledSurname);
addInList(paperList, handledName);
if (checkListContainment(reportInfo, level, orcidList, paperList))
return new Tuple2<>("check", reportInfo);
level++;
handledorcidSplit = handledOsurname.split(" ");
handledresultSplit = handledName.split(" ");
if (handledorcidSplit.length == 2) {
orcidList.clear();
orcidList.add(handledorcidSplit[0] + handledorcidSplit[1]);
addInList(orcidList, handledOname);
if (checkListContainment(reportInfo, level, orcidList, paperList)) {
return new Tuple2<>("check", reportInfo);
}
orcidList.clear();
orcidList.add(handledorcidSplit[1] + handledorcidSplit[0]);
addInList(orcidList, handledOname);
if (checkListContainment(reportInfo, level, orcidList, paperList)) {
return new Tuple2<>("check", reportInfo);
}
}
if (handledresultSplit.length == 2) {
orcidList.clear();
addInList(orcidList, handledOname);
addInList(orcidList, handledOsurname);
paperList.clear();
paperList.add(handledresultSplit[0] + handledresultSplit[1]);
addInList(paperList, handledSurname);
if (checkListContainment(reportInfo, level, orcidList, paperList))
return new Tuple2<>("check", reportInfo);
paperList.clear();
paperList.add(handledresultSplit[1] + handledresultSplit[0]);
addInList(paperList, handledSurname);
if (checkListContainment(reportInfo, level, orcidList, paperList))
return new Tuple2<>("check", reportInfo);
}
level++;
if (handledOsurname.length() > 3 && handledSurname.length() > 3) {
LevenshteinDistance l = new LevenshteinDistance();
if (l.apply(handledOsurname, handledSurname) <= 2) {
reportInfo.setLevel("level" + level);
return new Tuple2<>("check", reportInfo);
}
}
if (handledOsurname.length() > 3 && handledName.length() > 3) {
LevenshteinDistance l = new LevenshteinDistance();
if (l.apply(handledOsurname, handledName) <= 2) {
reportInfo.setLevel("level" + level);
return new Tuple2<>("check", reportInfo);
}
}
return new Tuple2<>("wrong", reportInfo);
}
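	/**
	 * Verifies that the smaller of the two word lists is entirely contained in the larger one. If so, it sets
	 * the level on the report and returns true.
	 */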
private static boolean checkListContainment(ReportInfo reportInfo, int level, List<String> orcidList,
List<String> paperList) {
if (orcidList.size() <= paperList.size()) {
if (searchIn(paperList, orcidList)) {
reportInfo.setLevel("level" + level);
return true;
}
} else {
if (searchIn(orcidList, paperList)) {
reportInfo.setLevel("level" + level);
return true;
}
}
return false;
}
/**
* searches in list1 all the words of list 2
* @param list1 the list where to search for the words
* @param list2 the list containing the words to be searched
* @return true if all the words in list 2 are contained in list1
*/
private static boolean searchIn(List<String> list1, List<String> list2) {
for (String word : list2) {
if (!list1.contains(word)) {
return false;
}
}
return true;
}
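	/**
	 * Groups the classified tuples by ORCID id, merges the associated authors of every group into a single
	 * ReportInfo and writes the result as gzip-compressed JSON to the given output path.
	 */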
private static void writeSet(Dataset<Tuple2<String, ReportInfo>> dataset, String outputPath) {
dataset
.groupByKey(
(MapFunction<Tuple2<String, ReportInfo>, String>) value -> value._2().getOid(), Encoders.STRING())
.mapGroups((MapGroupsFunction<String, Tuple2<String, ReportInfo>, ReportInfo>) (oid, tuple2Iterator) -> {
ReportInfo reportInfo = tuple2Iterator.next()._2();
List<ResultInfo> aList = reportInfo.getAssociatedAuthors();
tuple2Iterator.forEachRemaining(tuple -> aList.addAll(tuple._2().getAssociatedAuthors()));
reportInfo.setAssociatedAuthors(aList);
return reportInfo;
}, Encoders.bean(ReportInfo.class))
.repartition(1)
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.json(outputPath);
}
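	/**
	 * Reads a newline-delimited JSON file into a typed dataset.
	 */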
private static <R> Dataset<R> readPath(
SparkSession spark, String inputPath, Class<R> clazz) {
return spark
.read()
.textFile(inputPath)
.map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
}
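	/**
	 * Removes the given path from HDFS, if it exists.
	 */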
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
}