merge upstream

This commit is contained in:
Miriam Baglioni 2020-05-21 16:31:09 +02:00
commit 6750075fbd
8 changed files with 318 additions and 181 deletions

View File

@ -0,0 +1,159 @@
package eu.dnetlib.dhp.oa.dedup;
import com.wcohen.ss.JaroWinkler;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.pace.model.Person;
import org.apache.commons.lang3.StringUtils;
import scala.Tuple2;
import java.text.Normalizer;
import java.util.*;
import java.util.stream.Collectors;
public class AuthorMerger {
private static final Double THRESHOLD = 0.95;
public static List<Author> merge(List<List<Author>> authors){
authors.sort(new Comparator<List<Author>>() {
@Override
public int compare(List<Author> o1, List<Author> o2) {
return -Integer.compare(countAuthorsPids(o1), countAuthorsPids(o2));
}
});
List<Author> author = new ArrayList<>();
for(List<Author> a : authors){
author = mergeAuthor(author, a);
}
return author;
}
public static List<Author> mergeAuthor(final List<Author> a, final List<Author> b) {
int pa = countAuthorsPids(a);
int pb = countAuthorsPids(b);
List<Author> base, enrich;
int sa = authorsSize(a);
int sb = authorsSize(b);
if (pa == pb) {
base = sa > sb ? a : b;
enrich = sa > sb ? b : a;
} else {
base = pa > pb ? a : b;
enrich = pa > pb ? b : a;
}
enrichPidFromList(base, enrich);
return base;
}
private static void enrichPidFromList(List<Author> base, List<Author> enrich) {
if (base == null || enrich == null)
return;
final Map<String, Author> basePidAuthorMap = base
.stream()
.filter(a -> a.getPid() != null && a.getPid().size() > 0)
.flatMap(
a -> a
.getPid()
.stream()
.map(p -> new Tuple2<>(pidToComparableString(p), a)))
.collect(Collectors.toMap(Tuple2::_1, Tuple2::_2, (x1, x2) -> x1));
final List<Tuple2<StructuredProperty, Author>> pidToEnrich = enrich
.stream()
.filter(a -> a.getPid() != null && a.getPid().size() > 0)
.flatMap(
a -> a
.getPid()
.stream()
.filter(p -> !basePidAuthorMap.containsKey(pidToComparableString(p)))
.map(p -> new Tuple2<>(p, a)))
.collect(Collectors.toList());
pidToEnrich
.forEach(
a -> {
Optional<Tuple2<Double, Author>> simAuthor = base
.stream()
.map(ba -> new Tuple2<>(sim(ba, a._2()), ba))
.max(Comparator.comparing(Tuple2::_1));
if (simAuthor.isPresent() && simAuthor.get()._1() > THRESHOLD) {
Author r = simAuthor.get()._2();
if (r.getPid() == null) {
r.setPid(new ArrayList<>());
}
r.getPid().add(a._1());
}
});
}
public static String pidToComparableString(StructuredProperty pid){
return (pid.getQualifier()!=null? pid.getQualifier().getClassid()!=null?pid.getQualifier().getClassid().toLowerCase():"":"") + (pid.getValue()!=null? pid.getValue().toLowerCase():"");
}
public static int countAuthorsPids(List<Author> authors) {
if (authors == null)
return 0;
return (int) authors.stream().filter(AuthorMerger::hasPid).count();
}
private static int authorsSize(List<Author> authors) {
if (authors == null)
return 0;
return authors.size();
}
private static Double sim(Author a, Author b) {
final Person pa = parse(a);
final Person pb = parse(b);
if (pa.isAccurate() & pb.isAccurate()) {
return new JaroWinkler()
.score(normalize(pa.getSurnameString()), normalize(pb.getSurnameString()));
} else {
return new JaroWinkler()
.score(normalize(pa.getNormalisedFullname()), normalize(pb.getNormalisedFullname()));
}
}
private static boolean hasPid(Author a) {
if (a == null || a.getPid() == null || a.getPid().size() == 0)
return false;
return a.getPid().stream().anyMatch(p -> p != null && StringUtils.isNotBlank(p.getValue()));
}
private static Person parse(Author author) {
if (StringUtils.isNotBlank(author.getSurname())) {
return new Person(author.getSurname() + ", " + author.getName(), false);
} else {
return new Person(author.getFullname(), false);
}
}
private static String normalize(final String s) {
return nfd(s)
.toLowerCase()
// do not compact the regexes in a single expression, would cause StackOverflowError
// in case
// of large input strings
.replaceAll("(\\W)+", " ")
.replaceAll("(\\p{InCombiningDiacriticalMarks})+", " ")
.replaceAll("(\\p{Punct})+", " ")
.replaceAll("(\\d)+", " ")
.replaceAll("(\\n)+", " ")
.trim();
}
private static String nfd(final String s) {
return Normalizer.normalize(s, Normalizer.Form.NFD);
}
}

View File

@ -1,8 +1,9 @@
package eu.dnetlib.dhp.oa.dedup; package eu.dnetlib.dhp.oa.dedup;
import java.io.Serializable;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction; import org.apache.spark.api.java.function.MapGroupsFunction;
@ -67,16 +68,18 @@ public class DedupRecordFactory {
(MapFunction<Tuple2<String, T>, String>) entity -> entity._1(), Encoders.STRING()) (MapFunction<Tuple2<String, T>, String>) entity -> entity._1(), Encoders.STRING())
.mapGroups( .mapGroups(
(MapGroupsFunction<String, Tuple2<String, T>, T>) (key, (MapGroupsFunction<String, Tuple2<String, T>, T>) (key,
values) -> entityMerger(key, values, ts, dataInfo), values) -> entityMerger(key, values, ts, dataInfo, clazz),
Encoders.bean(clazz)); Encoders.bean(clazz));
} }
private static <T extends OafEntity> T entityMerger( public static <T extends OafEntity> T entityMerger(
String id, Iterator<Tuple2<String, T>> entities, long ts, DataInfo dataInfo) { String id, Iterator<Tuple2<String, T>> entities, long ts, DataInfo dataInfo, Class<T> clazz) throws IllegalAccessException, InstantiationException {
T entity = entities.next()._2(); T entity = clazz.newInstance();
final Collection<String> dates = Lists.newArrayList(); final Collection<String> dates = Lists.newArrayList();
final List<List<Author>> authors = Lists.newArrayList();
entities entities
.forEachRemaining( .forEachRemaining(
t -> { t -> {
@ -84,17 +87,17 @@ public class DedupRecordFactory {
entity.mergeFrom(duplicate); entity.mergeFrom(duplicate);
if (ModelSupport.isSubClass(duplicate, Result.class)) { if (ModelSupport.isSubClass(duplicate, Result.class)) {
Result r1 = (Result) duplicate; Result r1 = (Result) duplicate;
Result er = (Result) entity; if (r1.getAuthor() != null && r1.getAuthor().size()>0)
er.setAuthor(DedupUtility.mergeAuthor(er.getAuthor(), r1.getAuthor())); authors.add(r1.getAuthor());
if (r1.getDateofacceptance() != null)
if (r1.getDateofacceptance() != null) {
dates.add(r1.getDateofacceptance().getValue()); dates.add(r1.getDateofacceptance().getValue());
}
} }
}); });
//set authors and date
if (ModelSupport.isSubClass(entity, Result.class)) { if (ModelSupport.isSubClass(entity, Result.class)) {
((Result) entity).setDateofacceptance(DatePicker.pick(dates)); ((Result) entity).setDateofacceptance(DatePicker.pick(dates));
((Result) entity).setAuthor(AuthorMerger.merge(authors));
} }
entity.setId(id); entity.setId(id);

View File

@ -32,7 +32,6 @@ import eu.dnetlib.pace.model.Person;
import scala.Tuple2; import scala.Tuple2;
public class DedupUtility { public class DedupUtility {
private static final Double THRESHOLD = 0.95;
public static Map<String, LongAccumulator> constructAccumulator( public static Map<String, LongAccumulator> constructAccumulator(
final DedupConfig dedupConf, final SparkContext context) { final DedupConfig dedupConf, final SparkContext context) {
@ -82,61 +81,6 @@ public class DedupUtility {
} }
} }
public static List<Author> mergeAuthor(final List<Author> a, final List<Author> b) {
int pa = countAuthorsPids(a);
int pb = countAuthorsPids(b);
List<Author> base, enrich;
int sa = authorsSize(a);
int sb = authorsSize(b);
if (pa == pb) {
base = sa > sb ? a : b;
enrich = sa > sb ? b : a;
} else {
base = pa > pb ? a : b;
enrich = pa > pb ? b : a;
}
enrichPidFromList(base, enrich);
return base;
}
private static void enrichPidFromList(List<Author> base, List<Author> enrich) {
if (base == null || enrich == null)
return;
final Map<String, Author> basePidAuthorMap = base
.stream()
.filter(a -> a.getPid() != null && a.getPid().size() > 0)
.flatMap(a -> a.getPid().stream().map(p -> new Tuple2<>(p.toComparableString(), a)))
.collect(Collectors.toMap(Tuple2::_1, Tuple2::_2, (x1, x2) -> x1));
final List<Tuple2<StructuredProperty, Author>> pidToEnrich = enrich
.stream()
.filter(a -> a.getPid() != null && a.getPid().size() > 0)
.flatMap(
a -> a
.getPid()
.stream()
.filter(p -> !basePidAuthorMap.containsKey(p.toComparableString()))
.map(p -> new Tuple2<>(p, a)))
.collect(Collectors.toList());
pidToEnrich
.forEach(
a -> {
Optional<Tuple2<Double, Author>> simAuhtor = base
.stream()
.map(ba -> new Tuple2<>(sim(ba, a._2()), ba))
.max(Comparator.comparing(Tuple2::_1));
if (simAuhtor.isPresent() && simAuhtor.get()._1() > THRESHOLD) {
Author r = simAuhtor.get()._2();
if (r.getPid() == null) {
r.setPid(new ArrayList<>());
}
r.getPid().add(a._1());
}
});
}
public static String createDedupRecordPath( public static String createDedupRecordPath(
final String basePath, final String actionSetId, final String entityType) { final String basePath, final String actionSetId, final String entityType) {
return String.format("%s/%s/%s_deduprecord", basePath, actionSetId, entityType); return String.format("%s/%s/%s_deduprecord", basePath, actionSetId, entityType);
@ -156,65 +100,6 @@ public class DedupUtility {
return String.format("%s/%s/%s_mergerel", basePath, actionSetId, entityType); return String.format("%s/%s/%s_mergerel", basePath, actionSetId, entityType);
} }
private static Double sim(Author a, Author b) {
final Person pa = parse(a);
final Person pb = parse(b);
if (pa.isAccurate() & pb.isAccurate()) {
return new JaroWinkler()
.score(normalize(pa.getSurnameString()), normalize(pb.getSurnameString()));
} else {
return new JaroWinkler()
.score(normalize(pa.getNormalisedFullname()), normalize(pb.getNormalisedFullname()));
}
}
private static String normalize(final String s) {
return nfd(s)
.toLowerCase()
// do not compact the regexes in a single expression, would cause StackOverflowError
// in case
// of large input strings
.replaceAll("(\\W)+", " ")
.replaceAll("(\\p{InCombiningDiacriticalMarks})+", " ")
.replaceAll("(\\p{Punct})+", " ")
.replaceAll("(\\d)+", " ")
.replaceAll("(\\n)+", " ")
.trim();
}
private static String nfd(final String s) {
return Normalizer.normalize(s, Normalizer.Form.NFD);
}
private static Person parse(Author author) {
if (StringUtils.isNotBlank(author.getSurname())) {
return new Person(author.getSurname() + ", " + author.getName(), false);
} else {
return new Person(author.getFullname(), false);
}
}
private static int countAuthorsPids(List<Author> authors) {
if (authors == null)
return 0;
return (int) authors.stream().filter(DedupUtility::hasPid).count();
}
private static int authorsSize(List<Author> authors) {
if (authors == null)
return 0;
return authors.size();
}
private static boolean hasPid(Author a) {
if (a == null || a.getPid() == null || a.getPid().size() == 0)
return false;
return a.getPid().stream().anyMatch(p -> p != null && StringUtils.isNotBlank(p.getValue()));
}
public static List<DedupConfig> getConfigurations(String isLookUpUrl, String orchestrator) public static List<DedupConfig> getConfigurations(String isLookUpUrl, String orchestrator)
throws ISLookUpException, DocumentException { throws ISLookUpException, DocumentException {
final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookUpUrl); final ISLookUpService isLookUpService = ISLookupClientFactory.getLookUpService(isLookUpUrl);

View File

@ -0,0 +1,138 @@
package eu.dnetlib.dhp.oa.dedup;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.Paths;
import java.util.*;
import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.pace.util.MapDocumentUtil;
import org.codehaus.jackson.map.ObjectMapper;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import scala.Tuple2;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class EntityMergerTest implements Serializable {
List<Tuple2<String, Publication>> publications;
String testEntityBasePath;
DataInfo dataInfo;
String dedupId = "dedup_id";
Publication pub_top;
@BeforeEach
public void setUp() throws Exception {
testEntityBasePath = Paths
.get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/json").toURI())
.toFile()
.getAbsolutePath();
publications = readSample(testEntityBasePath + "/publication_merge.json", Publication.class);
pub_top = getTopPub(publications);
dataInfo = setDI();
}
@Test
public void publicationMergerTest() throws InstantiationException, IllegalAccessException {
Publication pub_merged = DedupRecordFactory.entityMerger(dedupId, publications.iterator(), 0, dataInfo, Publication.class);
assertEquals(dedupId, pub_merged.getId());
assertEquals(pub_merged.getJournal(), pub_top.getJournal());
assertEquals(pub_merged.getBestaccessright(), pub_top.getBestaccessright());
assertEquals(pub_merged.getResulttype(), pub_top.getResulttype());
assertEquals(pub_merged.getLanguage(), pub_merged.getLanguage());
assertEquals(pub_merged.getPublisher(), pub_top.getPublisher());
assertEquals(pub_merged.getEmbargoenddate(), pub_top.getEmbargoenddate());
assertEquals(pub_merged.getResourcetype().getClassid(), "0004");
assertEquals(pub_merged.getDateoftransformation(), pub_top.getDateoftransformation());
assertEquals(pub_merged.getOaiprovenance(), pub_top.getOaiprovenance());
assertEquals(pub_merged.getDateofcollection(), pub_top.getDateofcollection());
assertEquals(pub_merged.getInstance().size(),3);
assertEquals(pub_merged.getCountry().size(), 2);
assertEquals(pub_merged.getSubject().size(), 0);
assertEquals(pub_merged.getTitle().size(), 2);
assertEquals(pub_merged.getRelevantdate().size(),0);
assertEquals(pub_merged.getDescription().size(),0);
assertEquals(pub_merged.getSource().size(),0);
assertEquals(pub_merged.getFulltext().size(),0);
assertEquals(pub_merged.getFormat().size(),0);
assertEquals(pub_merged.getContributor().size(),0);
assertEquals(pub_merged.getCoverage().size(),0);
assertEquals(pub_merged.getContext().size(),0);
assertEquals(pub_merged.getExternalReference().size(),0);
assertEquals(pub_merged.getOriginalId().size(),3);
assertEquals(pub_merged.getCollectedfrom().size(),3);
assertEquals(pub_merged.getPid().size(),1);
assertEquals(pub_merged.getExtraInfo().size(),0);
//verify datainfo
assertEquals(pub_merged.getDataInfo(), dataInfo);
//verify datepicker
assertEquals(pub_merged.getDateofacceptance().getValue(), "2018-09-30");
//verify authors
assertEquals(pub_merged.getAuthor().size(), 9);
assertEquals(AuthorMerger.countAuthorsPids(pub_merged.getAuthor()), 4);
}
public DataInfo setDI(){
DataInfo dataInfo = new DataInfo();
dataInfo.setTrust("0.9");
dataInfo.setDeletedbyinference(false);
dataInfo.setInferenceprovenance("testing");
dataInfo.setInferred(true);
return dataInfo;
}
public Publication getTopPub(List<Tuple2<String, Publication>> publications){
Double maxTrust = 0.0;
Publication maxPub = new Publication();
for (Tuple2<String, Publication> publication : publications) {
Double pubTrust = Double.parseDouble(publication._2().getDataInfo().getTrust());
if(pubTrust > maxTrust){
maxTrust = pubTrust;
maxPub = publication._2();
}
}
return maxPub;
}
public <T> List<Tuple2<String, T>> readSample(String path, Class<T> clazz) {
List<Tuple2<String, T>> res = new ArrayList<>();
BufferedReader reader;
try {
reader = new BufferedReader(new FileReader(path));
String line = reader.readLine();
while (line != null) {
res.add(
new Tuple2<>(
MapDocumentUtil.getJPathString("$.id", line),
new ObjectMapper().readValue(line, clazz))
);
// read next line
line = reader.readLine();
}
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
return res;
}
}

View File

@ -1,54 +0,0 @@
package eu.dnetlib.dhp.oa.dedup;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.codehaus.jackson.map.ObjectMapper;
import org.junit.jupiter.api.BeforeEach;
import eu.dnetlib.dhp.schema.oaf.Publication;
public class MergeAuthorTest {
private List<Publication> publicationsToMerge;
private final ObjectMapper mapper = new ObjectMapper();
@BeforeEach
public void setUp() throws Exception {
final String json = IOUtils
.toString(
this.getClass().getResourceAsStream("/eu/dnetlib/dhp/dedup/json/authors_merge.json"));
publicationsToMerge = Arrays
.asList(json.split("\n"))
.stream()
.map(
s -> {
try {
return mapper.readValue(s, Publication.class);
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.collect(Collectors.toList());
}
// FIX ME Michele DB this tests doesn't work
// @Test
public void test() throws Exception {
Publication dedup = new Publication();
publicationsToMerge
.forEach(
p -> {
dedup.mergeFrom(p);
dedup.setAuthor(DedupUtility.mergeAuthor(dedup.getAuthor(), p.getAuthor()));
});
System.out.println(mapper.writeValueAsString(dedup));
}
}

File diff suppressed because one or more lines are too long

View File

@ -22,12 +22,13 @@ SELECT
'' AS inferenceprovenance, '' AS inferenceprovenance,
d.id AS collectedfromid, d.id AS collectedfromid,
d.officialname AS collectedfromname, d.officialname AS collectedfromname,
o.country || '@@@' || o.country || '@@@dnet:countries@@@dnet:countries' AS country, o.country || '@@@' || COALESCE(cntr.name,o.country) || '@@@dnet:countries@@@dnet:countries' AS country,
'sysimport:crosswalk:entityregistry@@@sysimport:crosswalk:entityregistry@@@dnet:provenance_actions@@@dnet:provenance_actions' AS provenanceaction, 'sysimport:crosswalk:entityregistry@@@sysimport:crosswalk:entityregistry@@@dnet:provenance_actions@@@dnet:provenance_actions' AS provenanceaction,
ARRAY[]::text[] AS pid ARRAY[]::text[] AS pid
FROM dsm_organizations o FROM dsm_organizations o
LEFT OUTER JOIN dsm_datasources d ON (d.id = o.collectedfrom) LEFT OUTER JOIN dsm_datasources d ON (d.id = o.collectedfrom)
LEFT OUTER JOIN class cntr ON (cntr.code = o.country)

View File

@ -521,6 +521,8 @@
<PARAM managedBy="system" name="params" required="true" type="string"> <PARAM managedBy="system" name="params" required="true" type="string">
{ {
'oozie.wf.application.path' : '/lib/dnet/oa/enrichment/country/oozie_app', 'oozie.wf.application.path' : '/lib/dnet/oa/enrichment/country/oozie_app',
'sparkExecutorCores' : '3',
'sparkExecutorMemory' : '10G',
'workingDir' : '/tmp/beta_provision/working_dir/country', 'workingDir' : '/tmp/beta_provision/working_dir/country',
'allowedtypes' : 'pubsrepository::institutional', 'allowedtypes' : 'pubsrepository::institutional',
'whitelist' : '10|opendoar____::300891a62162b960cf02ce3827bb363c', 'whitelist' : '10|opendoar____::300891a62162b960cf02ce3827bb363c',