forked from D-Net/dnet-hadoop
applying changes from PR#442: Fix for missing collectedfrom after dedup
This commit is contained in:
parent
f70dc76b61
commit
ce2364743a
|
@ -0,0 +1,104 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.schema.oaf.utils;
|
||||||
|
|
||||||
|
import static eu.dnetlib.dhp.schema.common.ModelConstants.CROSSREF_ID;
|
||||||
|
|
||||||
|
import java.util.*;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.schema.common.ModelConstants;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.KeyValue;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.OafEntity;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||||
|
|
||||||
|
public class MergeEntitiesComparator implements Comparator<Oaf> {
|
||||||
|
static final List<String> PID_AUTHORITIES = Arrays
|
||||||
|
.asList(
|
||||||
|
ModelConstants.ARXIV_ID,
|
||||||
|
ModelConstants.PUBMED_CENTRAL_ID,
|
||||||
|
ModelConstants.EUROPE_PUBMED_CENTRAL_ID,
|
||||||
|
ModelConstants.DATACITE_ID,
|
||||||
|
ModelConstants.CROSSREF_ID);
|
||||||
|
|
||||||
|
static final List<String> RESULT_TYPES = Arrays
|
||||||
|
.asList(
|
||||||
|
ModelConstants.ORP_RESULTTYPE_CLASSID,
|
||||||
|
ModelConstants.SOFTWARE_RESULTTYPE_CLASSID,
|
||||||
|
ModelConstants.DATASET_RESULTTYPE_CLASSID,
|
||||||
|
ModelConstants.PUBLICATION_RESULTTYPE_CLASSID);
|
||||||
|
|
||||||
|
public static final Comparator<Oaf> INSTANCE = new MergeEntitiesComparator();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int compare(Oaf left, Oaf right) {
|
||||||
|
if (left == null && right == null)
|
||||||
|
return 0;
|
||||||
|
if (left == null)
|
||||||
|
return -1;
|
||||||
|
if (right == null)
|
||||||
|
return 1;
|
||||||
|
|
||||||
|
int res = 0;
|
||||||
|
|
||||||
|
// pid authority
|
||||||
|
int cfp1 = left
|
||||||
|
.getCollectedfrom()
|
||||||
|
.stream()
|
||||||
|
.map(kv -> PID_AUTHORITIES.indexOf(kv.getKey()))
|
||||||
|
.max(Integer::compare)
|
||||||
|
.orElse(-1);
|
||||||
|
int cfp2 = right
|
||||||
|
.getCollectedfrom()
|
||||||
|
.stream()
|
||||||
|
.map(kv -> PID_AUTHORITIES.indexOf(kv.getKey()))
|
||||||
|
.max(Integer::compare)
|
||||||
|
.orElse(-1);
|
||||||
|
|
||||||
|
if (cfp1 >= 0 && cfp1 > cfp2) {
|
||||||
|
return 1;
|
||||||
|
} else if (cfp2 >= 0 && cfp2 > cfp1) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// trust
|
||||||
|
if (left.getDataInfo() != null && right.getDataInfo() != null) {
|
||||||
|
res = left.getDataInfo().getTrust().compareTo(right.getDataInfo().getTrust());
|
||||||
|
}
|
||||||
|
|
||||||
|
// result type
|
||||||
|
if (res == 0) {
|
||||||
|
if (left instanceof Result && right instanceof Result) {
|
||||||
|
Result r1 = (Result) left;
|
||||||
|
Result r2 = (Result) right;
|
||||||
|
|
||||||
|
if (r1.getResulttype() == null || r1.getResulttype().getClassid() == null) {
|
||||||
|
if (r2.getResulttype() != null && r2.getResulttype().getClassid() != null) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
} else if (r2.getResulttype() == null || r2.getResulttype().getClassid() == null) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int rt1 = RESULT_TYPES.indexOf(r1.getResulttype().getClassid());
|
||||||
|
int rt2 = RESULT_TYPES.indexOf(r2.getResulttype().getClassid());
|
||||||
|
|
||||||
|
if (rt1 >= 0 && rt1 > rt2) {
|
||||||
|
return 1;
|
||||||
|
} else if (rt2 >= 0 && rt2 > rt1) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// id
|
||||||
|
if (res == 0) {
|
||||||
|
if (left instanceof OafEntity && right instanceof OafEntity) {
|
||||||
|
res = ((OafEntity) left).getId().compareTo(((OafEntity) right).getId());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -40,27 +40,12 @@ public class MergeUtils {
|
||||||
|
|
||||||
public static <T extends Oaf> T mergeGroup(String s, Iterator<T> oafEntityIterator,
|
public static <T extends Oaf> T mergeGroup(String s, Iterator<T> oafEntityIterator,
|
||||||
boolean checkDelegateAuthority) {
|
boolean checkDelegateAuthority) {
|
||||||
TreeSet<T> sortedEntities = new TreeSet<>((o1, o2) -> {
|
|
||||||
int res = 0;
|
|
||||||
|
|
||||||
if (o1.getDataInfo() != null && o2.getDataInfo() != null) {
|
ArrayList<T> sortedEntities = new ArrayList<>();
|
||||||
res = o1.getDataInfo().getTrust().compareTo(o2.getDataInfo().getTrust());
|
oafEntityIterator.forEachRemaining(sortedEntities::add);
|
||||||
}
|
sortedEntities.sort(MergeEntitiesComparator.INSTANCE.reversed());
|
||||||
|
|
||||||
if (res == 0) {
|
Iterator<T> it = sortedEntities.iterator();
|
||||||
if (o1 instanceof Result && o2 instanceof Result) {
|
|
||||||
return ResultTypeComparator.INSTANCE.compare((Result) o1, (Result) o2);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return res;
|
|
||||||
});
|
|
||||||
|
|
||||||
while (oafEntityIterator.hasNext()) {
|
|
||||||
sortedEntities.add(oafEntityIterator.next());
|
|
||||||
}
|
|
||||||
|
|
||||||
Iterator<T> it = sortedEntities.descendingIterator();
|
|
||||||
T merged = it.next();
|
T merged = it.next();
|
||||||
|
|
||||||
while (it.hasNext()) {
|
while (it.hasNext()) {
|
||||||
|
@ -143,7 +128,7 @@ public class MergeUtils {
|
||||||
* https://graph.openaire.eu/docs/data-model/pids-and-identifiers#delegated-authorities and in that case it prefers
|
* https://graph.openaire.eu/docs/data-model/pids-and-identifiers#delegated-authorities and in that case it prefers
|
||||||
* such version.
|
* such version.
|
||||||
* <p>
|
* <p>
|
||||||
* Otherwise, it considers a resulttype priority order implemented in {@link ResultTypeComparator}
|
* Otherwise, it considers the priority order (PID authority, trust, resulttype, id) implemented in {@link MergeEntitiesComparator}
|
||||||
* and proceeds with the canonical property merging.
|
* and proceeds with the canonical property merging.
|
||||||
*
|
*
|
||||||
* @param left
|
* @param left
|
||||||
|
@ -161,8 +146,9 @@ public class MergeUtils {
|
||||||
if (!leftFromDelegatedAuthority && rightFromDelegatedAuthority) {
|
if (!leftFromDelegatedAuthority && rightFromDelegatedAuthority) {
|
||||||
return right;
|
return right;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: raise trust to have preferred fields from one or the other??
|
// TODO: raise trust to have preferred fields from one or the other??
|
||||||
if (new ResultTypeComparator().compare(left, right) < 0) {
|
if (MergeEntitiesComparator.INSTANCE.compare(left, right) > 0) {
|
||||||
return mergeResultFields(left, right);
|
return mergeResultFields(left, right);
|
||||||
} else {
|
} else {
|
||||||
return mergeResultFields(right, left);
|
return mergeResultFields(right, left);
|
||||||
|
@ -225,9 +211,9 @@ public class MergeUtils {
|
||||||
|
|
||||||
private static <T, K> List<T> mergeLists(final List<T> left, final List<T> right, int trust,
|
private static <T, K> List<T> mergeLists(final List<T> left, final List<T> right, int trust,
|
||||||
Function<T, K> keyExtractor, BinaryOperator<T> merger) {
|
Function<T, K> keyExtractor, BinaryOperator<T> merger) {
|
||||||
if (left == null) {
|
if (left == null || left.isEmpty()) {
|
||||||
return right;
|
return right != null ? right : new ArrayList<>();
|
||||||
} else if (right == null) {
|
} else if (right == null || right.isEmpty()) {
|
||||||
return left;
|
return left;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -405,7 +391,7 @@ public class MergeUtils {
|
||||||
}
|
}
|
||||||
|
|
||||||
// should be an instance attribute, get the first non-null value
|
// should be an instance attribute, get the first non-null value
|
||||||
merge.setLanguage(coalesce(merge.getLanguage(), enrich.getLanguage()));
|
merge.setLanguage(coalesceQualifier(merge.getLanguage(), enrich.getLanguage()));
|
||||||
|
|
||||||
// distinct countries, do not manage datainfo
|
// distinct countries, do not manage datainfo
|
||||||
merge.setCountry(mergeQualifiers(merge.getCountry(), enrich.getCountry(), trust));
|
merge.setCountry(mergeQualifiers(merge.getCountry(), enrich.getCountry(), trust));
|
||||||
|
@ -575,6 +561,13 @@ public class MergeUtils {
|
||||||
return m != null ? m : e;
|
return m != null ? m : e;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static Qualifier coalesceQualifier(Qualifier m, Qualifier e) {
|
||||||
|
if (m == null || m.getClassid() == null || StringUtils.isBlank(m.getClassid())) {
|
||||||
|
return e;
|
||||||
|
}
|
||||||
|
return m;
|
||||||
|
}
|
||||||
|
|
||||||
private static List<Author> mergeAuthors(List<Author> author, List<Author> author1, int trust) {
|
private static List<Author> mergeAuthors(List<Author> author, List<Author> author1, int trust) {
|
||||||
List<List<Author>> authors = new ArrayList<>();
|
List<List<Author>> authors = new ArrayList<>();
|
||||||
if (author != null) {
|
if (author != null) {
|
||||||
|
@ -587,6 +580,10 @@ public class MergeUtils {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String instanceKeyExtractor(Instance i) {
|
private static String instanceKeyExtractor(Instance i) {
|
||||||
|
// three levels of concatenation, each with its own separator:
|
||||||
|
// 1. ::
|
||||||
|
// 2. @@
|
||||||
|
// 3. ||
|
||||||
return String
|
return String
|
||||||
.join(
|
.join(
|
||||||
"::",
|
"::",
|
||||||
|
@ -594,10 +591,10 @@ public class MergeUtils {
|
||||||
kvKeyExtractor(i.getCollectedfrom()),
|
kvKeyExtractor(i.getCollectedfrom()),
|
||||||
qualifierKeyExtractor(i.getAccessright()),
|
qualifierKeyExtractor(i.getAccessright()),
|
||||||
qualifierKeyExtractor(i.getInstancetype()),
|
qualifierKeyExtractor(i.getInstancetype()),
|
||||||
Optional.ofNullable(i.getUrl()).map(u -> String.join("::", u)).orElse(null),
|
Optional.ofNullable(i.getUrl()).map(u -> String.join("@@", u)).orElse(null),
|
||||||
Optional
|
Optional
|
||||||
.ofNullable(i.getPid())
|
.ofNullable(i.getPid())
|
||||||
.map(pp -> pp.stream().map(MergeUtils::spKeyExtractor).collect(Collectors.joining("::")))
|
.map(pp -> pp.stream().map(MergeUtils::spKeyExtractor).collect(Collectors.joining("@@")))
|
||||||
.orElse(null));
|
.orElse(null));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -706,7 +703,7 @@ public class MergeUtils {
|
||||||
private static String spKeyExtractor(StructuredProperty sp) {
|
private static String spKeyExtractor(StructuredProperty sp) {
|
||||||
return Optional
|
return Optional
|
||||||
.ofNullable(sp)
|
.ofNullable(sp)
|
||||||
.map(s -> Joiner.on("::").join(s, qualifierKeyExtractor(s.getQualifier())))
|
.map(s -> Joiner.on("||").join(qualifierKeyExtractor(s.getQualifier()), s.getValue()))
|
||||||
.orElse(null);
|
.orElse(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,103 @@
|
||||||
|
|
||||||
|
package eu.dnetlib.dhp.oa.dedup;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
|
||||||
|
import java.io.BufferedReader;
|
||||||
|
import java.io.FileReader;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.lang.reflect.InvocationTargetException;
|
||||||
|
import java.nio.file.Paths;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.codehaus.jackson.map.ObjectMapper;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.DataInfo;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.Dataset;
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
|
||||||
|
import eu.dnetlib.pace.util.MapDocumentUtil;
|
||||||
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
class DatasetMergerTest implements Serializable {
|
||||||
|
|
||||||
|
private List<Tuple2<String, Dataset>> datasets;
|
||||||
|
|
||||||
|
private String testEntityBasePath;
|
||||||
|
private DataInfo dataInfo;
|
||||||
|
private final String dedupId = "50|doi_________::3d18564ef27ebe9ef3bd8b4dec67e148";
|
||||||
|
private Dataset dataset_top;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
testEntityBasePath = Paths
|
||||||
|
.get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/json").toURI())
|
||||||
|
.toFile()
|
||||||
|
.getAbsolutePath();
|
||||||
|
|
||||||
|
datasets = readSample(testEntityBasePath + "/dataset_merge.json", Dataset.class);
|
||||||
|
|
||||||
|
dataset_top = getTopPub(datasets);
|
||||||
|
|
||||||
|
dataInfo = setDI();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void datasetMergerTest() throws InstantiationException, IllegalAccessException, InvocationTargetException {
|
||||||
|
Dataset pub_merged = MergeUtils.mergeGroup(dedupId, datasets.stream().map(Tuple2::_2).iterator());
|
||||||
|
|
||||||
|
// verify id
|
||||||
|
assertEquals(dedupId, pub_merged.getId());
|
||||||
|
assertEquals(2, pub_merged.getInstance().size());
|
||||||
|
}
|
||||||
|
|
||||||
|
public DataInfo setDI() {
|
||||||
|
DataInfo dataInfo = new DataInfo();
|
||||||
|
dataInfo.setTrust("0.9");
|
||||||
|
dataInfo.setDeletedbyinference(false);
|
||||||
|
dataInfo.setInferenceprovenance("testing");
|
||||||
|
dataInfo.setInferred(true);
|
||||||
|
return dataInfo;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Dataset getTopPub(List<Tuple2<String, Dataset>> publications) {
|
||||||
|
|
||||||
|
Double maxTrust = 0.0;
|
||||||
|
Dataset maxPub = new Dataset();
|
||||||
|
for (Tuple2<String, Dataset> publication : publications) {
|
||||||
|
Double pubTrust = Double.parseDouble(publication._2().getDataInfo().getTrust());
|
||||||
|
if (pubTrust > maxTrust) {
|
||||||
|
maxTrust = pubTrust;
|
||||||
|
maxPub = publication._2();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return maxPub;
|
||||||
|
}
|
||||||
|
|
||||||
|
public <T> List<Tuple2<String, T>> readSample(String path, Class<T> clazz) {
|
||||||
|
List<Tuple2<String, T>> res = new ArrayList<>();
|
||||||
|
BufferedReader reader;
|
||||||
|
try {
|
||||||
|
reader = new BufferedReader(new FileReader(path));
|
||||||
|
String line = reader.readLine();
|
||||||
|
while (line != null) {
|
||||||
|
res
|
||||||
|
.add(
|
||||||
|
new Tuple2<>(
|
||||||
|
MapDocumentUtil.getJPathString("$.id", line),
|
||||||
|
new ObjectMapper().readValue(line, clazz)));
|
||||||
|
// read next line
|
||||||
|
line = reader.readLine();
|
||||||
|
}
|
||||||
|
reader.close();
|
||||||
|
} catch (IOException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -93,14 +93,14 @@ class EntityMergerTest implements Serializable {
|
||||||
assertEquals(pub_top.getJournal().getConferencedate(), pub_merged.getJournal().getConferencedate());
|
assertEquals(pub_top.getJournal().getConferencedate(), pub_merged.getJournal().getConferencedate());
|
||||||
assertEquals(pub_top.getJournal().getConferenceplace(), pub_merged.getJournal().getConferenceplace());
|
assertEquals(pub_top.getJournal().getConferenceplace(), pub_merged.getJournal().getConferenceplace());
|
||||||
assertEquals("OPEN", pub_merged.getBestaccessright().getClassid());
|
assertEquals("OPEN", pub_merged.getBestaccessright().getClassid());
|
||||||
assertEquals(pub_top.getResulttype(), pub_merged.getResulttype());
|
assertEquals(pub_top.getResulttype().getClassid(), pub_merged.getResulttype().getClassid());
|
||||||
assertEquals(pub_top.getLanguage(), pub_merged.getLanguage());
|
assertEquals(pub_top.getLanguage().getClassid(), pub_merged.getLanguage().getClassid());
|
||||||
assertEquals(pub_top.getPublisher(), pub_merged.getPublisher());
|
assertEquals("Elsevier BV", pub_merged.getPublisher().getValue());
|
||||||
assertEquals(pub_top.getEmbargoenddate(), pub_merged.getEmbargoenddate());
|
assertEquals(pub_top.getEmbargoenddate().getValue(), pub_merged.getEmbargoenddate().getValue());
|
||||||
assertEquals(pub_top.getResourcetype().getClassid(), "");
|
assertEquals(pub_top.getResourcetype().getClassid(), "");
|
||||||
assertEquals(pub_top.getDateoftransformation(), pub_merged.getDateoftransformation());
|
assertEquals(pub_top.getDateoftransformation(), pub_merged.getDateoftransformation());
|
||||||
assertEquals(pub_top.getOaiprovenance(), pub_merged.getOaiprovenance());
|
assertEquals(pub_top.getOaiprovenance(), pub_merged.getOaiprovenance());
|
||||||
assertEquals(pub_top.getDateofcollection(), pub_merged.getDateofcollection());
|
// assertEquals(pub_top.getDateofcollection(), pub_merged.getDateofcollection());
|
||||||
assertEquals(3, pub_merged.getInstance().size());
|
assertEquals(3, pub_merged.getInstance().size());
|
||||||
assertEquals(2, pub_merged.getCountry().size());
|
assertEquals(2, pub_merged.getCountry().size());
|
||||||
assertEquals(0, pub_merged.getSubject().size());
|
assertEquals(0, pub_merged.getSubject().size());
|
||||||
|
|
File diff suppressed because one or more lines are too long
Loading…
Reference in New Issue