Compare commits

...

2 Commits

10 changed files with 344 additions and 42 deletions

View File

@ -18,7 +18,7 @@ import eu.dnetlib.dhp.schema.oaf.Field;
public class DatePicker {
private static final String DATE_PATTERN = "\\d{4}-\\d{2}-\\d{2}";
public static final String DATE_PATTERN = "\\d{4}-\\d{2}-\\d{2}";
private static final String DATE_DEFAULT_SUFFIX = "01-01";
private static final int YEAR_LB = 1300;
private static final int YEAR_UB = Year.now().getValue() + 5;
@ -114,7 +114,7 @@ public class DatePicker {
}
}
private static boolean inRange(final String date) {
public static boolean inRange(final String date) {
final int year = Integer.parseInt(substringBefore(date, "-"));
return year >= YEAR_LB && year <= YEAR_UB;
}

View File

@ -1,11 +1,13 @@
package eu.dnetlib.dhp.oa.dedup;
import java.io.Serializable;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import org.apache.commons.lang.StringUtils;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
@ -13,15 +15,12 @@ import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import scala.Tuple2;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
public class DedupRecordFactory {
private static final Logger log = LoggerFactory.getLogger(DedupRecordFactory.class);
@ -81,11 +80,16 @@ public class DedupRecordFactory {
final Collection<String> dates = Lists.newArrayList();
final List<List<Author>> authors = Lists.newArrayList();
final List<Identifier> bestPids = Lists.newArrayList(); //best pids list
entities
.forEachRemaining(
t -> {
T duplicate = t._2();
//prepare the list of pids to use for the id generation
bestPids.addAll(IdGenerator.bestPidtoIdentifier(duplicate));
entity.mergeFrom(duplicate);
if (ModelSupport.isSubClass(duplicate, Result.class)) {
Result r1 = (Result) duplicate;
@ -94,6 +98,7 @@ public class DedupRecordFactory {
if (r1.getDateofacceptance() != null)
dates.add(r1.getDateofacceptance().getValue());
}
});
// set authors and date
@ -102,10 +107,13 @@ public class DedupRecordFactory {
((Result) entity).setAuthor(AuthorMerger.merge(authors));
}
entity.setId(id);
entity.setId(IdGenerator.generate(bestPids, id));
entity.setLastupdatetimestamp(ts);
entity.setDataInfo(dataInfo);
return entity;
}
}

View File

@ -0,0 +1,90 @@
package eu.dnetlib.dhp.oa.dedup;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import org.apache.commons.lang.NullArgumentException;
import org.apache.commons.lang.StringUtils;
import java.io.Serializable;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
public class IdGenerator implements Serializable {
private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
public static String CROSSREF_ID = "10|openaire____::081b82f96300b6a6e3d282bad31cb6e2";
public static String DATACITE_ID = "10|openaire____::9e3be59865b2c1c335d32dae2fe7b254";
//pick the best pid from the list (consider date and pidtype)
public static String generate(List<Identifier> pids, String defaultID) {
if (pids == null || pids.size() == 0)
return defaultID;
Optional<Identifier> bp = pids.stream()
.max(Identifier::compareTo);
if (bp.get().isUseOriginal() || bp.get().getPid().getValue() == null) {
return bp.get().getOriginalID().split("\\|")[0] + "|dedup_wf_001::" + DedupUtility.md5(bp.get().getOriginalID());
} else {
return bp.get().getOriginalID().split("\\|")[0] + "|" + createPrefix(bp.get().getPid().getQualifier().getClassid()) + "::" + DedupUtility.md5(bp.get().getPid().getValue());
}
}
//pick the best pid from the entity. Returns a list (length 1) to save time in the call
public static <T extends OafEntity> List<Identifier> bestPidtoIdentifier(T entity) {
if (entity.getPid() == null || entity.getPid().size() == 0)
return Lists.newArrayList(new Identifier(new StructuredProperty(), new Date(), PidType.original, entity.getCollectedfrom(), EntityType.fromClass(entity.getClass()), entity.getId()));
Optional<StructuredProperty> bp = entity.getPid().stream()
.filter(pid -> PidType.classidValueOf(pid.getQualifier().getClassid()) != PidType.undefined)
.max(Comparator.comparing(pid -> PidType.classidValueOf(pid.getQualifier().getClassid())));
return bp.map(structuredProperty ->
Lists.newArrayList(new Identifier(structuredProperty, extractDate(entity, sdf), PidType.classidValueOf(structuredProperty.getQualifier().getClassid()), entity.getCollectedfrom(), EntityType.fromClass(entity.getClass()), entity.getId()))
).orElseGet(() -> Lists.newArrayList(new Identifier(new StructuredProperty(), new Date(), PidType.original, entity.getCollectedfrom(), EntityType.fromClass(entity.getClass()), entity.getId())));
}
//create the prefix (length = 12): dedup_+ pidType
public static String createPrefix(String pidType) {
StringBuilder prefix = new StringBuilder("dedup_" + pidType);
while (prefix.length() < 12) {
prefix.append("_");
}
return prefix.toString().substring(0, 12);
}
//extracts the date from the record. If the date is not available or is not wellformed, it returns a base date: 00-01-01
public static <T extends OafEntity> Date extractDate(T duplicate, SimpleDateFormat sdf){
String date = "2000-01-01";
if (ModelSupport.isSubClass(duplicate, Result.class)) {
Result result = (Result) duplicate;
if (isWellformed(result.getDateofacceptance())){
date = result.getDateofacceptance().getValue();
}
}
try {
return sdf.parse(date);
} catch (ParseException e) {
return new Date();
}
}
public static boolean isWellformed(Field<String> date) {
return date != null && StringUtils.isNotBlank(date.getValue()) && date.getValue().matches(DatePicker.DATE_PATTERN) && DatePicker.inRange(date.getValue());
}
}

View File

@ -0,0 +1,132 @@
package eu.dnetlib.dhp.oa.dedup;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import java.io.Serializable;
import java.util.Date;
import java.util.List;
public class Identifier implements Serializable, Comparable<Identifier>{
StructuredProperty pid;
Date date;
PidType type;
List<KeyValue> collectedFrom;
EntityType entityType;
String originalID;
boolean useOriginal = false; //to know if the top identifier won because of the alphabetical order of the original ID
public Identifier(StructuredProperty pid, Date date, PidType type, List<KeyValue> collectedFrom, EntityType entityType, String originalID) {
this.pid = pid;
this.date = date;
this.type = type;
this.collectedFrom = collectedFrom;
this.entityType = entityType;
this.originalID = originalID;
}
public StructuredProperty getPid() {
return pid;
}
public void setPid(StructuredProperty pidValue) {
this.pid = pid;
}
public Date getDate() {
return date;
}
public void setDate(Date date) {
this.date = date;
}
public PidType getType() {
return type;
}
public void setType(PidType type) {
this.type = type;
}
public List<KeyValue> getCollectedFrom() {
return collectedFrom;
}
public void setCollectedFrom(List<KeyValue> collectedFrom) {
this.collectedFrom = collectedFrom;
}
public EntityType getEntityType() {
return entityType;
}
public void setEntityType(EntityType entityType) {
this.entityType = entityType;
}
public String getOriginalID() {
return originalID;
}
public void setOriginalID(String originalID) {
this.originalID = originalID;
}
public boolean isUseOriginal() {
return useOriginal;
}
public void setUseOriginal(boolean useOriginal) {
this.useOriginal = useOriginal;
}
@Override
public int compareTo(Identifier i) {
//priority in comparisons: 1) pidtype, 2) collectedfrom (depending on the entity type) , 3) date 4) alphabetical order of the originalID
if (this.getType().compareTo(i.getType()) == 0){ //same type
if (entityType == EntityType.publication) {
if (isFromDatasourceID(this.collectedFrom, IdGenerator.CROSSREF_ID) && !isFromDatasourceID(i.collectedFrom, IdGenerator.CROSSREF_ID))
return 1;
if (isFromDatasourceID(i.collectedFrom, IdGenerator.CROSSREF_ID) && !isFromDatasourceID(this.collectedFrom, IdGenerator.CROSSREF_ID))
return -1;
}
if (entityType == EntityType.dataset) {
if (isFromDatasourceID(this.collectedFrom, IdGenerator.DATACITE_ID) && !isFromDatasourceID(i.collectedFrom, IdGenerator.DATACITE_ID))
return 1;
if (isFromDatasourceID(i.collectedFrom, IdGenerator.DATACITE_ID) && !isFromDatasourceID(this.collectedFrom, IdGenerator.DATACITE_ID))
return -1;
}
if (this.getDate().compareTo(date) == 0) {//same date
if (this.originalID.compareTo(i.originalID) > 0)
this.useOriginal = true;
else
i.setUseOriginal(true);
//the minus because we need to take the alphabetically lower id
return -this.originalID.compareTo(i.originalID);
}
else
//the minus is because we need to take the elder date
return -this.getDate().compareTo(date);
}
else {
return this.getType().compareTo(i.getType());
}
}
public boolean isFromDatasourceID(List<KeyValue> collectedFrom, String dsId){
for(KeyValue cf: collectedFrom) {
if(cf.getKey().equals(dsId))
return true;
}
return false;
}
}

View File

@ -0,0 +1,25 @@
package eu.dnetlib.dhp.oa.dedup;
public enum PidType {
//from the less to the more important
undefined,
original,
orcid,
ror,
grid,
pdb,
arXiv,
pmid,
doi;
public static PidType classidValueOf(String s){
try {
return PidType.valueOf(s);
}
catch (Exception e) {
return PidType.undefined;
}
}
}

View File

@ -22,10 +22,13 @@ public class EntityMergerTest implements Serializable {
List<Tuple2<String, Publication>> publications;
List<Tuple2<String, Publication>> publications2;
List<Tuple2<String, Publication>> publications3;
List<Tuple2<String, Publication>> publications4;
List<Tuple2<String, Publication>> publications5;
String testEntityBasePath;
DataInfo dataInfo;
String dedupId = "dedup_id";
String dedupId = "00|dedup_id::1";
Publication pub_top;
@BeforeEach
@ -38,6 +41,9 @@ public class EntityMergerTest implements Serializable {
publications = readSample(testEntityBasePath + "/publication_merge.json", Publication.class);
publications2 = readSample(testEntityBasePath + "/publication_merge2.json", Publication.class);
publications3 = readSample(testEntityBasePath + "/publication_merge3.json", Publication.class);
publications4 = readSample(testEntityBasePath + "/publication_merge4.json", Publication.class);
publications5 = readSample(testEntityBasePath + "/publication_merge5.json", Publication.class);
pub_top = getTopPub(publications);
@ -54,6 +60,9 @@ public class EntityMergerTest implements Serializable {
.entityMerger(dedupId, softwares.iterator(), 0, dataInfo, Software.class);
assertEquals(merged.getBestaccessright().getClassid(), "OPEN SOURCE");
assertEquals(merged.getId(), "50|dedup_doi___::0968af610a356656706657e4f234b340");
}
@Test
@ -62,7 +71,8 @@ public class EntityMergerTest implements Serializable {
Publication pub_merged = DedupRecordFactory
.entityMerger(dedupId, publications.iterator(), 0, dataInfo, Publication.class);
assertEquals(dedupId, pub_merged.getId());
// verify id
assertEquals(pub_merged.getId(), "50|dedup_doi___::0968af610a356656706657e4f234b340");
assertEquals(pub_merged.getJournal(), pub_top.getJournal());
assertEquals(pub_merged.getBestaccessright(), pub_top.getBestaccessright());
@ -117,8 +127,43 @@ public class EntityMergerTest implements Serializable {
Publication pub_merged = DedupRecordFactory
.entityMerger(dedupId, publications2.iterator(), 0, dataInfo, Publication.class);
// verify id
assertEquals("50|dedup_doi___::0ca46ff10b2b4c756191719d85302b14", pub_merged.getId());
assertEquals(pub_merged.getAuthor().size(), 27);
// insert assertions here
}
@Test
public void publicationMergerTest3() throws InstantiationException, IllegalAccessException {
Publication pub_merged = DedupRecordFactory
.entityMerger(dedupId, publications3.iterator(), 0, dataInfo, Publication.class);
// verify id
assertEquals( "50|dedup_doi___::0ca46ff10b2b4c756191719d85302b14", pub_merged.getId());
}
@Test
public void publicationMergerTest4() throws InstantiationException, IllegalStateException, IllegalAccessException {
Publication pub_merged = DedupRecordFactory
.entityMerger(dedupId, publications4.iterator(), 0, dataInfo, Publication.class);
// verify id
assertEquals("50|dedup_wf_001::2d2bbbbcfb285e3fb3590237b79e2fa8", pub_merged.getId());
}
@Test
public void publicationMergerTest5() throws InstantiationException, IllegalStateException, IllegalAccessException {
Publication pub_merged = DedupRecordFactory
.entityMerger(dedupId, publications5.iterator(), 0, dataInfo, Publication.class);
// verify id
assertEquals("50|dedup_wf_001::584b89679c3ccd1015b647ec63cc2699", pub_merged.getId());
}

View File

@ -1,22 +1,12 @@
package eu.dnetlib.dhp.oa.dedup;
import static java.nio.file.Files.createTempDirectory;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.count;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.lenient;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.util.MapDocumentUtil;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
@ -35,16 +25,19 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.util.MapDocumentUtil;
import scala.Tuple2;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import static java.nio.file.Files.createTempDirectory;
import static org.apache.spark.sql.functions.count;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.lenient;
@ExtendWith(MockitoExtension.class)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class SparkDedupTest implements Serializable {

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long