forked from D-Net/dnet-hadoop
Compare commits
2 Commits
master
...
deduptesti
Author | SHA1 | Date |
---|---|---|
miconis | d47352cbc7 | |
miconis | b260fee787 |
|
@ -18,7 +18,7 @@ import eu.dnetlib.dhp.schema.oaf.Field;
|
|||
|
||||
public class DatePicker {
|
||||
|
||||
private static final String DATE_PATTERN = "\\d{4}-\\d{2}-\\d{2}";
|
||||
public static final String DATE_PATTERN = "\\d{4}-\\d{2}-\\d{2}";
|
||||
private static final String DATE_DEFAULT_SUFFIX = "01-01";
|
||||
private static final int YEAR_LB = 1300;
|
||||
private static final int YEAR_UB = Year.now().getValue() + 5;
|
||||
|
@ -114,7 +114,7 @@ public class DatePicker {
|
|||
}
|
||||
}
|
||||
|
||||
private static boolean inRange(final String date) {
|
||||
public static boolean inRange(final String date) {
|
||||
final int year = Integer.parseInt(substringBefore(date, "-"));
|
||||
return year >= YEAR_LB && year <= YEAR_UB;
|
||||
}
|
||||
|
|
|
@ -1,11 +1,13 @@
|
|||
|
||||
package eu.dnetlib.dhp.oa.dedup;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.Lists;
|
||||
import eu.dnetlib.dhp.schema.common.EntityType;
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.api.java.function.MapGroupsFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
|
@ -13,15 +15,12 @@ import org.apache.spark.sql.Encoders;
|
|||
import org.apache.spark.sql.SparkSession;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.*;
|
||||
import scala.Tuple2;
|
||||
|
||||
import java.text.ParseException;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.*;
|
||||
|
||||
public class DedupRecordFactory {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(DedupRecordFactory.class);
|
||||
|
@ -81,11 +80,16 @@ public class DedupRecordFactory {
|
|||
|
||||
final Collection<String> dates = Lists.newArrayList();
|
||||
final List<List<Author>> authors = Lists.newArrayList();
|
||||
final List<Identifier> bestPids = Lists.newArrayList(); //best pids list
|
||||
|
||||
entities
|
||||
.forEachRemaining(
|
||||
t -> {
|
||||
T duplicate = t._2();
|
||||
|
||||
//prepare the list of pids to use for the id generation
|
||||
bestPids.addAll(IdGenerator.bestPidtoIdentifier(duplicate));
|
||||
|
||||
entity.mergeFrom(duplicate);
|
||||
if (ModelSupport.isSubClass(duplicate, Result.class)) {
|
||||
Result r1 = (Result) duplicate;
|
||||
|
@ -94,6 +98,7 @@ public class DedupRecordFactory {
|
|||
if (r1.getDateofacceptance() != null)
|
||||
dates.add(r1.getDateofacceptance().getValue());
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
// set authors and date
|
||||
|
@ -102,10 +107,13 @@ public class DedupRecordFactory {
|
|||
((Result) entity).setAuthor(AuthorMerger.merge(authors));
|
||||
}
|
||||
|
||||
entity.setId(id);
|
||||
entity.setId(IdGenerator.generate(bestPids, id));
|
||||
|
||||
entity.setLastupdatetimestamp(ts);
|
||||
entity.setDataInfo(dataInfo);
|
||||
|
||||
return entity;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,90 @@
|
|||
package eu.dnetlib.dhp.oa.dedup;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import eu.dnetlib.dhp.schema.common.EntityType;
|
||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
||||
import eu.dnetlib.dhp.schema.oaf.Field;
|
||||
import eu.dnetlib.dhp.schema.oaf.OafEntity;
|
||||
import eu.dnetlib.dhp.schema.oaf.Result;
|
||||
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
||||
import org.apache.commons.lang.NullArgumentException;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.text.ParseException;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.*;
|
||||
|
||||
public class IdGenerator implements Serializable {
|
||||
|
||||
private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
|
||||
public static String CROSSREF_ID = "10|openaire____::081b82f96300b6a6e3d282bad31cb6e2";
|
||||
public static String DATACITE_ID = "10|openaire____::9e3be59865b2c1c335d32dae2fe7b254";
|
||||
|
||||
//pick the best pid from the list (consider date and pidtype)
|
||||
public static String generate(List<Identifier> pids, String defaultID) {
|
||||
if (pids == null || pids.size() == 0)
|
||||
return defaultID;
|
||||
|
||||
Optional<Identifier> bp = pids.stream()
|
||||
.max(Identifier::compareTo);
|
||||
|
||||
if (bp.get().isUseOriginal() || bp.get().getPid().getValue() == null) {
|
||||
return bp.get().getOriginalID().split("\\|")[0] + "|dedup_wf_001::" + DedupUtility.md5(bp.get().getOriginalID());
|
||||
} else {
|
||||
return bp.get().getOriginalID().split("\\|")[0] + "|" + createPrefix(bp.get().getPid().getQualifier().getClassid()) + "::" + DedupUtility.md5(bp.get().getPid().getValue());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
//pick the best pid from the entity. Returns a list (length 1) to save time in the call
|
||||
public static <T extends OafEntity> List<Identifier> bestPidtoIdentifier(T entity) {
|
||||
|
||||
if (entity.getPid() == null || entity.getPid().size() == 0)
|
||||
return Lists.newArrayList(new Identifier(new StructuredProperty(), new Date(), PidType.original, entity.getCollectedfrom(), EntityType.fromClass(entity.getClass()), entity.getId()));
|
||||
|
||||
Optional<StructuredProperty> bp = entity.getPid().stream()
|
||||
.filter(pid -> PidType.classidValueOf(pid.getQualifier().getClassid()) != PidType.undefined)
|
||||
.max(Comparator.comparing(pid -> PidType.classidValueOf(pid.getQualifier().getClassid())));
|
||||
|
||||
return bp.map(structuredProperty ->
|
||||
Lists.newArrayList(new Identifier(structuredProperty, extractDate(entity, sdf), PidType.classidValueOf(structuredProperty.getQualifier().getClassid()), entity.getCollectedfrom(), EntityType.fromClass(entity.getClass()), entity.getId()))
|
||||
).orElseGet(() -> Lists.newArrayList(new Identifier(new StructuredProperty(), new Date(), PidType.original, entity.getCollectedfrom(), EntityType.fromClass(entity.getClass()), entity.getId())));
|
||||
|
||||
}
|
||||
|
||||
//create the prefix (length = 12): dedup_+ pidType
|
||||
public static String createPrefix(String pidType) {
|
||||
|
||||
StringBuilder prefix = new StringBuilder("dedup_" + pidType);
|
||||
|
||||
while (prefix.length() < 12) {
|
||||
prefix.append("_");
|
||||
}
|
||||
return prefix.toString().substring(0, 12);
|
||||
|
||||
}
|
||||
|
||||
//extracts the date from the record. If the date is not available or is not wellformed, it returns a base date: 00-01-01
|
||||
public static <T extends OafEntity> Date extractDate(T duplicate, SimpleDateFormat sdf){
|
||||
|
||||
String date = "2000-01-01";
|
||||
if (ModelSupport.isSubClass(duplicate, Result.class)) {
|
||||
Result result = (Result) duplicate;
|
||||
if (isWellformed(result.getDateofacceptance())){
|
||||
date = result.getDateofacceptance().getValue();
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
return sdf.parse(date);
|
||||
} catch (ParseException e) {
|
||||
return new Date();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static boolean isWellformed(Field<String> date) {
|
||||
return date != null && StringUtils.isNotBlank(date.getValue()) && date.getValue().matches(DatePicker.DATE_PATTERN) && DatePicker.inRange(date.getValue());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,132 @@
|
|||
package eu.dnetlib.dhp.oa.dedup;
|
||||
|
||||
import eu.dnetlib.dhp.schema.common.EntityType;
|
||||
import eu.dnetlib.dhp.schema.oaf.KeyValue;
|
||||
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
public class Identifier implements Serializable, Comparable<Identifier>{
|
||||
|
||||
StructuredProperty pid;
|
||||
Date date;
|
||||
PidType type;
|
||||
List<KeyValue> collectedFrom;
|
||||
EntityType entityType;
|
||||
String originalID;
|
||||
|
||||
boolean useOriginal = false; //to know if the top identifier won because of the alphabetical order of the original ID
|
||||
|
||||
public Identifier(StructuredProperty pid, Date date, PidType type, List<KeyValue> collectedFrom, EntityType entityType, String originalID) {
|
||||
this.pid = pid;
|
||||
this.date = date;
|
||||
this.type = type;
|
||||
this.collectedFrom = collectedFrom;
|
||||
this.entityType = entityType;
|
||||
this.originalID = originalID;
|
||||
}
|
||||
|
||||
public StructuredProperty getPid() {
|
||||
return pid;
|
||||
}
|
||||
|
||||
public void setPid(StructuredProperty pidValue) {
|
||||
this.pid = pid;
|
||||
}
|
||||
|
||||
public Date getDate() {
|
||||
return date;
|
||||
}
|
||||
|
||||
public void setDate(Date date) {
|
||||
this.date = date;
|
||||
}
|
||||
|
||||
public PidType getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
public void setType(PidType type) {
|
||||
this.type = type;
|
||||
}
|
||||
|
||||
public List<KeyValue> getCollectedFrom() {
|
||||
return collectedFrom;
|
||||
}
|
||||
|
||||
public void setCollectedFrom(List<KeyValue> collectedFrom) {
|
||||
this.collectedFrom = collectedFrom;
|
||||
}
|
||||
|
||||
public EntityType getEntityType() {
|
||||
return entityType;
|
||||
}
|
||||
|
||||
public void setEntityType(EntityType entityType) {
|
||||
this.entityType = entityType;
|
||||
}
|
||||
|
||||
public String getOriginalID() {
|
||||
return originalID;
|
||||
}
|
||||
|
||||
public void setOriginalID(String originalID) {
|
||||
this.originalID = originalID;
|
||||
}
|
||||
|
||||
public boolean isUseOriginal() {
|
||||
return useOriginal;
|
||||
}
|
||||
|
||||
public void setUseOriginal(boolean useOriginal) {
|
||||
this.useOriginal = useOriginal;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(Identifier i) {
|
||||
//priority in comparisons: 1) pidtype, 2) collectedfrom (depending on the entity type) , 3) date 4) alphabetical order of the originalID
|
||||
if (this.getType().compareTo(i.getType()) == 0){ //same type
|
||||
if (entityType == EntityType.publication) {
|
||||
if (isFromDatasourceID(this.collectedFrom, IdGenerator.CROSSREF_ID) && !isFromDatasourceID(i.collectedFrom, IdGenerator.CROSSREF_ID))
|
||||
return 1;
|
||||
if (isFromDatasourceID(i.collectedFrom, IdGenerator.CROSSREF_ID) && !isFromDatasourceID(this.collectedFrom, IdGenerator.CROSSREF_ID))
|
||||
return -1;
|
||||
}
|
||||
if (entityType == EntityType.dataset) {
|
||||
if (isFromDatasourceID(this.collectedFrom, IdGenerator.DATACITE_ID) && !isFromDatasourceID(i.collectedFrom, IdGenerator.DATACITE_ID))
|
||||
return 1;
|
||||
if (isFromDatasourceID(i.collectedFrom, IdGenerator.DATACITE_ID) && !isFromDatasourceID(this.collectedFrom, IdGenerator.DATACITE_ID))
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (this.getDate().compareTo(date) == 0) {//same date
|
||||
|
||||
if (this.originalID.compareTo(i.originalID) > 0)
|
||||
this.useOriginal = true;
|
||||
else
|
||||
i.setUseOriginal(true);
|
||||
|
||||
//the minus because we need to take the alphabetically lower id
|
||||
return -this.originalID.compareTo(i.originalID);
|
||||
}
|
||||
else
|
||||
//the minus is because we need to take the elder date
|
||||
return -this.getDate().compareTo(date);
|
||||
}
|
||||
else {
|
||||
return this.getType().compareTo(i.getType());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public boolean isFromDatasourceID(List<KeyValue> collectedFrom, String dsId){
|
||||
|
||||
for(KeyValue cf: collectedFrom) {
|
||||
if(cf.getKey().equals(dsId))
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,25 @@
|
|||
package eu.dnetlib.dhp.oa.dedup;
|
||||
|
||||
/**
 * Types of persistent identifier known to the dedup id generation.
 */
public enum PidType {

    // listed from the least to the most important: the declaration order is the ranking
    undefined,
    original,
    orcid,
    ror,
    grid,
    pdb,
    arXiv,
    pmid,
    doi;

    /**
     * Resolves a classid to its pid type.
     *
     * @param s the classid; the match is exact and case sensitive
     * @return the constant named {@code s}, or {@link #undefined} when there is
     *         none (including when {@code s} is {@code null})
     */
    public static PidType classidValueOf(String s) {
        for (PidType candidate : values()) {
            if (candidate.name().equals(s)) {
                return candidate;
            }
        }
        return undefined;
    }
}
|
|
@ -22,10 +22,13 @@ public class EntityMergerTest implements Serializable {
|
|||
|
||||
List<Tuple2<String, Publication>> publications;
|
||||
List<Tuple2<String, Publication>> publications2;
|
||||
List<Tuple2<String, Publication>> publications3;
|
||||
List<Tuple2<String, Publication>> publications4;
|
||||
List<Tuple2<String, Publication>> publications5;
|
||||
|
||||
String testEntityBasePath;
|
||||
DataInfo dataInfo;
|
||||
String dedupId = "dedup_id";
|
||||
String dedupId = "00|dedup_id::1";
|
||||
Publication pub_top;
|
||||
|
||||
@BeforeEach
|
||||
|
@ -38,6 +41,9 @@ public class EntityMergerTest implements Serializable {
|
|||
|
||||
publications = readSample(testEntityBasePath + "/publication_merge.json", Publication.class);
|
||||
publications2 = readSample(testEntityBasePath + "/publication_merge2.json", Publication.class);
|
||||
publications3 = readSample(testEntityBasePath + "/publication_merge3.json", Publication.class);
|
||||
publications4 = readSample(testEntityBasePath + "/publication_merge4.json", Publication.class);
|
||||
publications5 = readSample(testEntityBasePath + "/publication_merge5.json", Publication.class);
|
||||
|
||||
pub_top = getTopPub(publications);
|
||||
|
||||
|
@ -54,6 +60,9 @@ public class EntityMergerTest implements Serializable {
|
|||
.entityMerger(dedupId, softwares.iterator(), 0, dataInfo, Software.class);
|
||||
|
||||
assertEquals(merged.getBestaccessright().getClassid(), "OPEN SOURCE");
|
||||
|
||||
assertEquals(merged.getId(), "50|dedup_doi___::0968af610a356656706657e4f234b340");
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -62,7 +71,8 @@ public class EntityMergerTest implements Serializable {
|
|||
Publication pub_merged = DedupRecordFactory
|
||||
.entityMerger(dedupId, publications.iterator(), 0, dataInfo, Publication.class);
|
||||
|
||||
assertEquals(dedupId, pub_merged.getId());
|
||||
// verify id
|
||||
assertEquals(pub_merged.getId(), "50|dedup_doi___::0968af610a356656706657e4f234b340");
|
||||
|
||||
assertEquals(pub_merged.getJournal(), pub_top.getJournal());
|
||||
assertEquals(pub_merged.getBestaccessright(), pub_top.getBestaccessright());
|
||||
|
@ -117,8 +127,43 @@ public class EntityMergerTest implements Serializable {
|
|||
Publication pub_merged = DedupRecordFactory
|
||||
.entityMerger(dedupId, publications2.iterator(), 0, dataInfo, Publication.class);
|
||||
|
||||
// verify id
|
||||
assertEquals("50|dedup_doi___::0ca46ff10b2b4c756191719d85302b14", pub_merged.getId());
|
||||
|
||||
assertEquals(pub_merged.getAuthor().size(), 27);
|
||||
// insert assertions here
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void publicationMergerTest3() throws InstantiationException, IllegalAccessException {
|
||||
|
||||
Publication pub_merged = DedupRecordFactory
|
||||
.entityMerger(dedupId, publications3.iterator(), 0, dataInfo, Publication.class);
|
||||
|
||||
// verify id
|
||||
assertEquals( "50|dedup_doi___::0ca46ff10b2b4c756191719d85302b14", pub_merged.getId());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void publicationMergerTest4() throws InstantiationException, IllegalStateException, IllegalAccessException {
|
||||
|
||||
Publication pub_merged = DedupRecordFactory
|
||||
.entityMerger(dedupId, publications4.iterator(), 0, dataInfo, Publication.class);
|
||||
|
||||
// verify id
|
||||
assertEquals("50|dedup_wf_001::2d2bbbbcfb285e3fb3590237b79e2fa8", pub_merged.getId());
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void publicationMergerTest5() throws InstantiationException, IllegalStateException, IllegalAccessException {
|
||||
|
||||
Publication pub_merged = DedupRecordFactory
|
||||
.entityMerger(dedupId, publications5.iterator(), 0, dataInfo, Publication.class);
|
||||
|
||||
// verify id
|
||||
assertEquals("50|dedup_wf_001::584b89679c3ccd1015b647ec63cc2699", pub_merged.getId());
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -1,22 +1,12 @@
|
|||
|
||||
package eu.dnetlib.dhp.oa.dedup;
|
||||
|
||||
import static java.nio.file.Files.createTempDirectory;
|
||||
|
||||
import static org.apache.spark.sql.functions.col;
|
||||
import static org.apache.spark.sql.functions.count;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.mockito.Mockito.lenient;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||
import eu.dnetlib.pace.util.MapDocumentUtil;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.spark.SparkConf;
|
||||
|
@ -35,16 +25,19 @@ import org.junit.jupiter.api.extension.ExtendWith;
|
|||
import org.mockito.Mock;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
||||
import eu.dnetlib.dhp.schema.oaf.Relation;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
||||
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
||||
import eu.dnetlib.pace.util.MapDocumentUtil;
|
||||
import scala.Tuple2;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.file.Paths;
|
||||
|
||||
import static java.nio.file.Files.createTempDirectory;
|
||||
import static org.apache.spark.sql.functions.count;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.mockito.Mockito.lenient;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
|
||||
public class SparkDedupTest implements Serializable {
|
||||
|
|
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
Loading…
Reference in New Issue