forked from D-Net/dnet-hadoop
fixed error on empty intersection between publications and relations on export to OAF
This commit is contained in:
parent eec418cd26
commit 734934e2eb
@@ -1,4 +1,6 @@
package eu.dnetlib.dhp.oa.merge;

import java.text.Normalizer;
import java.util.*;
import java.util.stream.Collectors;

@@ -14,155 +16,155 @@ import scala.Tuple2;
public class AuthorMerger {

    private static final Double THRESHOLD = 0.95;

    public static List<Author> merge(List<List<Author>> authors) {

        authors.sort((o1, o2) -> -Integer.compare(countAuthorsPids(o1), countAuthorsPids(o2)));

        List<Author> author = new ArrayList<>();

        for (List<Author> a : authors) {
            author = mergeAuthor(author, a);
        }

        return author;
    }

    public static List<Author> mergeAuthor(final List<Author> a, final List<Author> b) {
        int pa = countAuthorsPids(a);
        int pb = countAuthorsPids(b);
        List<Author> base, enrich;
        int sa = authorsSize(a);
        int sb = authorsSize(b);

        if (pa == pb) {
            base = sa > sb ? a : b;
            enrich = sa > sb ? b : a;
        } else {
            base = pa > pb ? a : b;
            enrich = pa > pb ? b : a;
        }
        enrichPidFromList(base, enrich);
        return base;
    }

    private static void enrichPidFromList(List<Author> base, List<Author> enrich) {
        if (base == null || enrich == null)
            return;
        final Map<String, Author> basePidAuthorMap = base
            .stream()
            .filter(a -> a.getPid() != null && a.getPid().size() > 0)
            .flatMap(
                a -> a
                    .getPid()
                    .stream()
                    .map(p -> new Tuple2<>(pidToComparableString(p), a)))
            .collect(Collectors.toMap(Tuple2::_1, Tuple2::_2, (x1, x2) -> x1));

        final List<Tuple2<StructuredProperty, Author>> pidToEnrich = enrich
            .stream()
            .filter(a -> a.getPid() != null && a.getPid().size() > 0)
            .flatMap(
                a -> a
                    .getPid()
                    .stream()
                    .filter(p -> !basePidAuthorMap.containsKey(pidToComparableString(p)))
                    .map(p -> new Tuple2<>(p, a)))
            .collect(Collectors.toList());

        pidToEnrich
            .forEach(
                a -> {
                    Optional<Tuple2<Double, Author>> simAuthor = base
                        .stream()
                        .map(ba -> new Tuple2<>(sim(ba, a._2()), ba))
                        .max(Comparator.comparing(Tuple2::_1));

                    if (simAuthor.isPresent()) {
                        double th = THRESHOLD;
                        // increase the threshold if the surname is too short
                        if (simAuthor.get()._2().getSurname() != null
                            && simAuthor.get()._2().getSurname().length() <= 3)
                            th = 0.99;

                        if (simAuthor.get()._1() > th) {
                            Author r = simAuthor.get()._2();
                            if (r.getPid() == null) {
                                r.setPid(new ArrayList<>());
                            }
                            r.getPid().add(a._1());
                        }
                    }
                });
    }

    public static String pidToComparableString(StructuredProperty pid) {
        return (pid.getQualifier() != null
            ? pid.getQualifier().getClassid() != null ? pid.getQualifier().getClassid().toLowerCase() : ""
            : "")
            + (pid.getValue() != null ? pid.getValue().toLowerCase() : "");
    }

    public static int countAuthorsPids(List<Author> authors) {
        if (authors == null)
            return 0;

        return (int) authors.stream().filter(AuthorMerger::hasPid).count();
    }

    private static int authorsSize(List<Author> authors) {
        if (authors == null)
            return 0;
        return authors.size();
    }

    private static Double sim(Author a, Author b) {

        final Person pa = parse(a);
        final Person pb = parse(b);

        // if both are accurate (e.g. they have name and surname)
        if (pa.isAccurate() & pb.isAccurate()) {
            return new JaroWinkler().score(normalize(pa.getSurnameString()), normalize(pb.getSurnameString())) * 0.5
                + new JaroWinkler().score(normalize(pa.getNameString()), normalize(pb.getNameString())) * 0.5;
        } else {
            return new JaroWinkler()
                .score(normalize(pa.getNormalisedFullname()), normalize(pb.getNormalisedFullname()));
        }
    }

    private static boolean hasPid(Author a) {
        if (a == null || a.getPid() == null || a.getPid().size() == 0)
            return false;
        return a.getPid().stream().anyMatch(p -> p != null && StringUtils.isNotBlank(p.getValue()));
    }

    private static Person parse(Author author) {
        if (StringUtils.isNotBlank(author.getSurname())) {
            return new Person(author.getSurname() + ", " + author.getName(), false);
        } else {
            return new Person(author.getFullname(), false);
        }
    }

    private static String normalize(final String s) {
        return nfd(s)
            .toLowerCase()
            // do not compact the regexes in a single expression, would cause StackOverflowError
            // in case of large input strings
            .replaceAll("(\\W)+", " ")
            .replaceAll("(\\p{InCombiningDiacriticalMarks})+", " ")
            .replaceAll("(\\p{Punct})+", " ")
            .replaceAll("(\\d)+", " ")
            .replaceAll("(\\n)+", " ")
            .trim();
    }

    private static String nfd(final String s) {
        return Normalizer.normalize(s, Normalizer.Form.NFD);
    }

}
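For reference, a minimal sketch of driving the merge from Scala. The author values and the author() helper are made up for illustration, and the Author setters are assumed to mirror the getters used above; only the AuthorMerger.merge(List<List<Author>>) signature comes from the class itself.

import java.util.{ArrayList => JArrayList, List => JList}

import eu.dnetlib.dhp.oa.merge.AuthorMerger
import eu.dnetlib.dhp.schema.oaf.Author

// hypothetical helper: builds an Author via the assumed bean setters of the OAF model
def author(name: String, surname: String): Author = {
  val a = new Author
  a.setName(name)
  a.setSurname(surname)
  a.setFullname(surname + ", " + name)
  a
}

// two author lists describing the same record, e.g. coming from duplicated entities
val listA: JList[Author] = new JArrayList[Author]()
listA.add(author("John", "Doe"))

val listB: JList[Author] = new JArrayList[Author]()
listB.add(author("J.", "Doe"))

val all: JList[JList[Author]] = new JArrayList[JList[Author]]()
all.add(listA)
all.add(listB)

// the list carrying more author PIDs becomes the base; the other one only enriches its PIDs
val merged: JList[Author] = AuthorMerger.merge(all)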
@@ -1,12 +1,10 @@

package eu.dnetlib.dhp.oa.dedup;

import java.util.Collection;
import java.util.Iterator;
import java.util.List;

-import eu.dnetlib.dhp.oa.merge.AuthorMerger;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;

@@ -19,6 +17,7 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;

+import eu.dnetlib.dhp.oa.merge.AuthorMerger;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import scala.Tuple2;
@@ -10,11 +10,11 @@ import java.io.Serializable;
import java.nio.file.Paths;
import java.util.*;

-import eu.dnetlib.dhp.oa.merge.AuthorMerger;
import org.codehaus.jackson.map.ObjectMapper;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

+import eu.dnetlib.dhp.oa.merge.AuthorMerger;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.pace.util.MapDocumentUtil;
import scala.Tuple2;
@@ -272,30 +272,11 @@ object DLIToOAF {
    result
  }

-//  def convertDLIRelation(r: DLIRelation): Relation = {
-//
-//    val result = new Relation
-//    if (!relationTypeMapping.contains(r.getRelType))
-//      return null
-//
-//    if (r.getProperties == null || r.getProperties.size() == 0 || (r.getProperties.size() == 1 && r.getProperties.get(0) == null))
-//      return null
-//    val t = relationTypeMapping.get(r.getRelType)
-//
-//    result.setRelType("resultResult")
-//    result.setRelClass(t.get._1)
-//    result.setSubRelType(t.get._2)
-//    result.setCollectedfrom(r.getProperties.asScala.map(c => collectedFromMap.getOrElse(c.getKey, null)).filter(p => p != null).asJava)
-//    result.setSource(generateId(r.getSource))
-//    result.setTarget(generateId(r.getTarget))
-//
-//    if (result.getSource.equals(result.getTarget))
-//      return null
-//    result.setDataInfo(generateDataInfo())
-//
-//    result
-//  }
+  def convertDLIRelation(r: Relation): Relation = {
+    r.setSource(r.getSource.replaceFirst("50|", "50|scholix_____::").replaceFirst("60|", "60|scholix_____::"))
+    r.setTarget(r.getTarget.replaceFirst("50|", "50|scholix_____::").replaceFirst("60|", "60|scholix_____::"))
+    r
+  }

  def convertDLIDatasetTOOAF(d: DLIDataset): Dataset = {
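A note on the new convertDLIRelation: String.replaceFirst treats its first argument as a regular expression, and "|" is the regex alternation operator, so the pattern "50|" matches only the literal "50" (or the empty string). Assuming the intent is to rewrite the literal id prefixes "50|" and "60|", a sketch with the pipe escaped would look like the hypothetical helper below (not part of the commit):

// rewrites "50|<rest>" into "50|scholix_____::<rest>" (and likewise for "60|");
// the backslash escape is needed because replaceFirst takes a regex, not a plain string
def rewriteIdPrefix(id: String): String =
  id
    .replaceFirst("^50\\|", "50|scholix_____::")
    .replaceFirst("^60\\|", "60|scholix_____::")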
@@ -44,7 +44,7 @@ object SparkExportContentForOpenAire {

    val dsRel = spark.read.load(s"$workingPath/relation_b").as[Relation]
-   dsRel.filter(r => r.getDataInfo == null || r.getDataInfo.getDeletedbyinference == false).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/relationDS")
+   dsRel.filter(r => r.getDataInfo == null || r.getDataInfo.getDeletedbyinference == false).map(DLIToOAF.convertDLIRelation).write.mode(SaveMode.Overwrite).save(s"$workingPath/export/relationDS")

    val dsPubs = spark.read.load(s"$workingPath/publication").as[DLIPublication]
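A minimal sketch of the relation-export step after this change. The SparkSession setup, the encoder choice and DLIToOAF being in scope are assumptions; only the filter/map/write chain mirrors the changed line above.

import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
import eu.dnetlib.dhp.schema.oaf.Relation

def exportRelations(spark: SparkSession, workingPath: String): Unit = {
  // assumed encoder; the real job registers its own implicit encoders
  implicit val relEncoder: Encoder[Relation] = Encoders.kryo[Relation]

  spark.read.load(s"$workingPath/relation_b").as[Relation]
    // keep only relations that were not deleted by inference
    .filter(r => r.getDataInfo == null || r.getDataInfo.getDeletedbyinference == false)
    // rewrite source/target id prefixes so they match the exported publication/dataset ids
    .map(DLIToOAF.convertDLIRelation)
    .write.mode(SaveMode.Overwrite)
    .save(s"$workingPath/export/relationDS")
}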