forked from D-Net/dnet-hadoop
Fix MergeUtils.mergeGroup: it could get rid of some records and did not consider all PID authorities whilke sorting records.
ResultTypeComparator is now renamed in MergeEntitiesComparator and can be used as a general comparator for merging groups of records
This commit is contained in:
parent
6be783caec
commit
3feab5d92d
|
@ -0,0 +1,104 @@
|
|||
|
||||
package eu.dnetlib.dhp.schema.oaf.utils;
|
||||
|
||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.CROSSREF_ID;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.oaf.KeyValue;
|
||||
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
||||
import eu.dnetlib.dhp.schema.oaf.OafEntity;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
|
||||
public class MergeEntitiesComparator implements Comparator<Oaf> {
|
||||
static final List<String> PID_AUTHORITIES = Arrays
|
||||
.asList(
|
||||
ModelConstants.ARXIV_ID,
|
||||
ModelConstants.PUBMED_CENTRAL_ID,
|
||||
ModelConstants.EUROPE_PUBMED_CENTRAL_ID,
|
||||
ModelConstants.DATACITE_ID,
|
||||
ModelConstants.CROSSREF_ID);
|
||||
|
||||
static final List<String> RESULT_TYPES = Arrays
|
||||
.asList(
|
||||
ModelConstants.ORP_RESULTTYPE_CLASSID,
|
||||
ModelConstants.SOFTWARE_RESULTTYPE_CLASSID,
|
||||
ModelConstants.DATASET_RESULTTYPE_CLASSID,
|
||||
ModelConstants.PUBLICATION_RESULTTYPE_CLASSID);
|
||||
|
||||
public static final Comparator<Oaf> INSTANCE = new MergeEntitiesComparator();
|
||||
|
||||
@Override
|
||||
public int compare(Oaf left, Oaf right) {
|
||||
if (left == null && right == null)
|
||||
return 0;
|
||||
if (left == null)
|
||||
return -1;
|
||||
if (right == null)
|
||||
return 1;
|
||||
|
||||
int res = 0;
|
||||
|
||||
// pid authority
|
||||
int cfp1 = left
|
||||
.getCollectedfrom()
|
||||
.stream()
|
||||
.map(kv -> PID_AUTHORITIES.indexOf(kv.getKey()))
|
||||
.max(Integer::compare)
|
||||
.orElse(-1);
|
||||
int cfp2 = right
|
||||
.getCollectedfrom()
|
||||
.stream()
|
||||
.map(kv -> PID_AUTHORITIES.indexOf(kv.getKey()))
|
||||
.max(Integer::compare)
|
||||
.orElse(-1);
|
||||
|
||||
if (cfp1 >= 0 && cfp1 > cfp2) {
|
||||
return 1;
|
||||
} else if (cfp2 >= 0 && cfp2 > cfp1) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
// trust
|
||||
if (left.getDataInfo() != null && right.getDataInfo() != null) {
|
||||
res = left.getDataInfo().getTrust().compareTo(right.getDataInfo().getTrust());
|
||||
}
|
||||
|
||||
// result type
|
||||
if (res == 0) {
|
||||
if (left instanceof Result && right instanceof Result) {
|
||||
Result r1 = (Result) left;
|
||||
Result r2 = (Result) right;
|
||||
|
||||
if (r1.getResulttype() == null || r1.getResulttype().getClassid() == null) {
|
||||
if (r2.getResulttype() != null && r2.getResulttype().getClassid() != null) {
|
||||
return -1;
|
||||
}
|
||||
} else if (r2.getResulttype() == null || r2.getResulttype().getClassid() == null) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
int rt1 = RESULT_TYPES.indexOf(r1.getResulttype().getClassid());
|
||||
int rt2 = RESULT_TYPES.indexOf(r2.getResulttype().getClassid());
|
||||
|
||||
if (rt1 >= 0 && rt1 > rt2) {
|
||||
return 1;
|
||||
} else if (rt2 >= 0 && rt2 > rt1) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// id
|
||||
if (res == 0) {
|
||||
if (left instanceof OafEntity && right instanceof OafEntity) {
|
||||
res = ((OafEntity) left).getId().compareTo(((OafEntity) right).getId());
|
||||
}
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
}
|
|
@ -40,27 +40,12 @@ public class MergeUtils {
|
|||
|
||||
public static <T extends Oaf> T mergeGroup(String s, Iterator<T> oafEntityIterator,
|
||||
boolean checkDelegateAuthority) {
|
||||
TreeSet<T> sortedEntities = new TreeSet<>((o1, o2) -> {
|
||||
int res = 0;
|
||||
|
||||
if (o1.getDataInfo() != null && o2.getDataInfo() != null) {
|
||||
res = o1.getDataInfo().getTrust().compareTo(o2.getDataInfo().getTrust());
|
||||
}
|
||||
ArrayList<T> sortedEntities = new ArrayList<>();
|
||||
oafEntityIterator.forEachRemaining(sortedEntities::add);
|
||||
sortedEntities.sort(MergeEntitiesComparator.INSTANCE.reversed());
|
||||
|
||||
if (res == 0) {
|
||||
if (o1 instanceof Result && o2 instanceof Result) {
|
||||
return ResultTypeComparator.INSTANCE.compare((Result) o1, (Result) o2);
|
||||
}
|
||||
}
|
||||
|
||||
return res;
|
||||
});
|
||||
|
||||
while (oafEntityIterator.hasNext()) {
|
||||
sortedEntities.add(oafEntityIterator.next());
|
||||
}
|
||||
|
||||
Iterator<T> it = sortedEntities.descendingIterator();
|
||||
Iterator<T> it = sortedEntities.iterator();
|
||||
T merged = it.next();
|
||||
|
||||
while (it.hasNext()) {
|
||||
|
@ -143,7 +128,7 @@ public class MergeUtils {
|
|||
* https://graph.openaire.eu/docs/data-model/pids-and-identifiers#delegated-authorities and in that case it prefers
|
||||
* such version.
|
||||
* <p>
|
||||
* Otherwise, it considers a resulttype priority order implemented in {@link ResultTypeComparator}
|
||||
* Otherwise, it considers a resulttype priority order implemented in {@link MergeEntitiesComparator}
|
||||
* and proceeds with the canonical property merging.
|
||||
*
|
||||
* @param left
|
||||
|
@ -161,8 +146,9 @@ public class MergeUtils {
|
|||
if (!leftFromDelegatedAuthority && rightFromDelegatedAuthority) {
|
||||
return right;
|
||||
}
|
||||
|
||||
// TODO: raise trust to have preferred fields from one or the other??
|
||||
if (new ResultTypeComparator().compare(left, right) < 0) {
|
||||
if (MergeEntitiesComparator.INSTANCE.compare(left, right) > 0) {
|
||||
return mergeResultFields(left, right);
|
||||
} else {
|
||||
return mergeResultFields(right, left);
|
||||
|
@ -225,9 +211,9 @@ public class MergeUtils {
|
|||
|
||||
private static <T, K> List<T> mergeLists(final List<T> left, final List<T> right, int trust,
|
||||
Function<T, K> keyExtractor, BinaryOperator<T> merger) {
|
||||
if (left == null) {
|
||||
return right;
|
||||
} else if (right == null) {
|
||||
if (left == null || left.isEmpty()) {
|
||||
return right != null ? right : new ArrayList<>();
|
||||
} else if (right == null || right.isEmpty()) {
|
||||
return left;
|
||||
}
|
||||
|
||||
|
@ -405,7 +391,7 @@ public class MergeUtils {
|
|||
}
|
||||
|
||||
// should be an instance attribute, get the first non-null value
|
||||
merge.setLanguage(coalesce(merge.getLanguage(), enrich.getLanguage()));
|
||||
merge.setLanguage(coalesceQualifier(merge.getLanguage(), enrich.getLanguage()));
|
||||
|
||||
// distinct countries, do not manage datainfo
|
||||
merge.setCountry(mergeQualifiers(merge.getCountry(), enrich.getCountry(), trust));
|
||||
|
@ -575,6 +561,13 @@ public class MergeUtils {
|
|||
return m != null ? m : e;
|
||||
}
|
||||
|
||||
private static Qualifier coalesceQualifier(Qualifier m, Qualifier e) {
|
||||
if (m == null || m.getClassid() == null || StringUtils.isBlank(m.getClassid())) {
|
||||
return e;
|
||||
}
|
||||
return m;
|
||||
}
|
||||
|
||||
private static List<Author> mergeAuthors(List<Author> author, List<Author> author1, int trust) {
|
||||
List<List<Author>> authors = new ArrayList<>();
|
||||
if (author != null) {
|
||||
|
@ -587,6 +580,10 @@ public class MergeUtils {
|
|||
}
|
||||
|
||||
private static String instanceKeyExtractor(Instance i) {
|
||||
// three levels of concatenating:
|
||||
// 1. ::
|
||||
// 2. @@
|
||||
// 3. ||
|
||||
return String
|
||||
.join(
|
||||
"::",
|
||||
|
@ -594,10 +591,10 @@ public class MergeUtils {
|
|||
kvKeyExtractor(i.getCollectedfrom()),
|
||||
qualifierKeyExtractor(i.getAccessright()),
|
||||
qualifierKeyExtractor(i.getInstancetype()),
|
||||
Optional.ofNullable(i.getUrl()).map(u -> String.join("::", u)).orElse(null),
|
||||
Optional.ofNullable(i.getUrl()).map(u -> String.join("@@", u)).orElse(null),
|
||||
Optional
|
||||
.ofNullable(i.getPid())
|
||||
.map(pp -> pp.stream().map(MergeUtils::spKeyExtractor).collect(Collectors.joining("::")))
|
||||
.map(pp -> pp.stream().map(MergeUtils::spKeyExtractor).collect(Collectors.joining("@@")))
|
||||
.orElse(null));
|
||||
}
|
||||
|
||||
|
@ -706,7 +703,7 @@ public class MergeUtils {
|
|||
private static String spKeyExtractor(StructuredProperty sp) {
|
||||
return Optional
|
||||
.ofNullable(sp)
|
||||
.map(s -> Joiner.on("::").join(s, qualifierKeyExtractor(s.getQualifier())))
|
||||
.map(s -> Joiner.on("||").join(qualifierKeyExtractor(s.getQualifier()), s.getValue()))
|
||||
.orElse(null);
|
||||
}
|
||||
|
||||
|
|
|
@ -1,87 +0,0 @@
|
|||
|
||||
package eu.dnetlib.dhp.schema.oaf.utils;
|
||||
|
||||
import static eu.dnetlib.dhp.schema.common.ModelConstants.CROSSREF_ID;
|
||||
|
||||
import java.util.Comparator;
|
||||
import java.util.HashSet;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||
import eu.dnetlib.dhp.schema.oaf.KeyValue;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
|
||||
public class ResultTypeComparator implements Comparator<Result> {
|
||||
|
||||
public static final ResultTypeComparator INSTANCE = new ResultTypeComparator();
|
||||
|
||||
@Override
|
||||
public int compare(Result left, Result right) {
|
||||
|
||||
if (left == null && right == null)
|
||||
return 0;
|
||||
if (left == null)
|
||||
return 1;
|
||||
if (right == null)
|
||||
return -1;
|
||||
|
||||
HashSet<String> lCf = getCollectedFromIds(left);
|
||||
HashSet<String> rCf = getCollectedFromIds(right);
|
||||
|
||||
if (lCf.contains(CROSSREF_ID) && !rCf.contains(CROSSREF_ID)) {
|
||||
return -1;
|
||||
}
|
||||
if (!lCf.contains(CROSSREF_ID) && rCf.contains(CROSSREF_ID)) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (left.getResulttype() == null || left.getResulttype().getClassid() == null) {
|
||||
if (right.getResulttype() == null || right.getResulttype().getClassid() == null) {
|
||||
return 0;
|
||||
}
|
||||
return 1;
|
||||
} else if (right.getResulttype() == null || right.getResulttype().getClassid() == null) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
String lClass = left.getResulttype().getClassid();
|
||||
String rClass = right.getResulttype().getClassid();
|
||||
|
||||
if (!lClass.equals(rClass)) {
|
||||
if (lClass.equals(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID))
|
||||
return -1;
|
||||
if (rClass.equals(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID))
|
||||
return 1;
|
||||
|
||||
if (lClass.equals(ModelConstants.DATASET_RESULTTYPE_CLASSID))
|
||||
return -1;
|
||||
if (rClass.equals(ModelConstants.DATASET_RESULTTYPE_CLASSID))
|
||||
return 1;
|
||||
|
||||
if (lClass.equals(ModelConstants.SOFTWARE_RESULTTYPE_CLASSID))
|
||||
return -1;
|
||||
if (rClass.equals(ModelConstants.SOFTWARE_RESULTTYPE_CLASSID))
|
||||
return 1;
|
||||
|
||||
if (lClass.equals(ModelConstants.ORP_RESULTTYPE_CLASSID))
|
||||
return -1;
|
||||
if (rClass.equals(ModelConstants.ORP_RESULTTYPE_CLASSID))
|
||||
return 1;
|
||||
}
|
||||
|
||||
// Else (but unlikely), lexicographical ordering will do.
|
||||
return lClass.compareTo(rClass);
|
||||
}
|
||||
|
||||
protected HashSet<String> getCollectedFromIds(Result left) {
|
||||
return Optional
|
||||
.ofNullable(left.getCollectedfrom())
|
||||
.map(
|
||||
cf -> cf
|
||||
.stream()
|
||||
.map(KeyValue::getKey)
|
||||
.collect(Collectors.toCollection(HashSet::new)))
|
||||
.orElse(new HashSet<>());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,103 @@
|
|||
|
||||
package eu.dnetlib.dhp.oa.dedup;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.FileReader;
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import eu.dnetlib.dhp.schema.oaf.DataInfo;
|
||||
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
||||
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
|
||||
import eu.dnetlib.pace.util.MapDocumentUtil;
|
||||
import scala.Tuple2;
|
||||
|
||||
class DatasetMergerTest implements Serializable {
|
||||
|
||||
private List<Tuple2<String, Dataset>> datasets;
|
||||
|
||||
private String testEntityBasePath;
|
||||
private DataInfo dataInfo;
|
||||
private final String dedupId = "50|doi_________::3d18564ef27ebe9ef3bd8b4dec67e148";
|
||||
private Dataset dataset_top;
|
||||
|
||||
@BeforeEach
|
||||
public void setUp() throws Exception {
|
||||
testEntityBasePath = Paths
|
||||
.get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/json").toURI())
|
||||
.toFile()
|
||||
.getAbsolutePath();
|
||||
|
||||
datasets = readSample(testEntityBasePath + "/dataset_merge.json", Dataset.class);
|
||||
|
||||
dataset_top = getTopPub(datasets);
|
||||
|
||||
dataInfo = setDI();
|
||||
}
|
||||
|
||||
@Test
|
||||
void datasetMergerTest() throws InstantiationException, IllegalAccessException, InvocationTargetException {
|
||||
Dataset pub_merged = MergeUtils.mergeGroup(dedupId, datasets.stream().map(Tuple2::_2).iterator());
|
||||
|
||||
// verify id
|
||||
assertEquals(dedupId, pub_merged.getId());
|
||||
assertEquals(2, pub_merged.getInstance().size());
|
||||
}
|
||||
|
||||
public DataInfo setDI() {
|
||||
DataInfo dataInfo = new DataInfo();
|
||||
dataInfo.setTrust("0.9");
|
||||
dataInfo.setDeletedbyinference(false);
|
||||
dataInfo.setInferenceprovenance("testing");
|
||||
dataInfo.setInferred(true);
|
||||
return dataInfo;
|
||||
}
|
||||
|
||||
public Dataset getTopPub(List<Tuple2<String, Dataset>> publications) {
|
||||
|
||||
Double maxTrust = 0.0;
|
||||
Dataset maxPub = new Dataset();
|
||||
for (Tuple2<String, Dataset> publication : publications) {
|
||||
Double pubTrust = Double.parseDouble(publication._2().getDataInfo().getTrust());
|
||||
if (pubTrust > maxTrust) {
|
||||
maxTrust = pubTrust;
|
||||
maxPub = publication._2();
|
||||
}
|
||||
}
|
||||
return maxPub;
|
||||
}
|
||||
|
||||
public <T> List<Tuple2<String, T>> readSample(String path, Class<T> clazz) {
|
||||
List<Tuple2<String, T>> res = new ArrayList<>();
|
||||
BufferedReader reader;
|
||||
try {
|
||||
reader = new BufferedReader(new FileReader(path));
|
||||
String line = reader.readLine();
|
||||
while (line != null) {
|
||||
res
|
||||
.add(
|
||||
new Tuple2<>(
|
||||
MapDocumentUtil.getJPathString("$.id", line),
|
||||
new ObjectMapper().readValue(line, clazz)));
|
||||
// read next line
|
||||
line = reader.readLine();
|
||||
}
|
||||
reader.close();
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
}
|
|
@ -93,14 +93,14 @@ class EntityMergerTest implements Serializable {
|
|||
assertEquals(pub_top.getJournal().getConferencedate(), pub_merged.getJournal().getConferencedate());
|
||||
assertEquals(pub_top.getJournal().getConferenceplace(), pub_merged.getJournal().getConferenceplace());
|
||||
assertEquals("OPEN", pub_merged.getBestaccessright().getClassid());
|
||||
assertEquals(pub_top.getResulttype(), pub_merged.getResulttype());
|
||||
assertEquals(pub_top.getLanguage(), pub_merged.getLanguage());
|
||||
assertEquals(pub_top.getPublisher(), pub_merged.getPublisher());
|
||||
assertEquals(pub_top.getEmbargoenddate(), pub_merged.getEmbargoenddate());
|
||||
assertEquals(pub_top.getResulttype().getClassid(), pub_merged.getResulttype().getClassid());
|
||||
assertEquals(pub_top.getLanguage().getClassid(), pub_merged.getLanguage().getClassid());
|
||||
assertEquals("Elsevier BV", pub_merged.getPublisher().getValue());
|
||||
assertEquals(pub_top.getEmbargoenddate().getValue(), pub_merged.getEmbargoenddate().getValue());
|
||||
assertEquals(pub_top.getResourcetype().getClassid(), "");
|
||||
assertEquals(pub_top.getDateoftransformation(), pub_merged.getDateoftransformation());
|
||||
assertEquals(pub_top.getOaiprovenance(), pub_merged.getOaiprovenance());
|
||||
assertEquals(pub_top.getDateofcollection(), pub_merged.getDateofcollection());
|
||||
// assertEquals(pub_top.getDateofcollection(), pub_merged.getDateofcollection());
|
||||
assertEquals(3, pub_merged.getInstance().size());
|
||||
assertEquals(2, pub_merged.getCountry().size());
|
||||
assertEquals(0, pub_merged.getSubject().size());
|
||||
|
|
File diff suppressed because one or more lines are too long
Loading…
Reference in New Issue