1
0
Fork 0

Implemented the DedupRecord factory, including merge support for Project entities

This commit is contained in:
Sandro La Bruzzo 2019-12-12 15:18:48 +01:00
parent 6b45e37e22
commit 39367676d7
14 changed files with 224 additions and 258 deletions

View File

@ -74,4 +74,26 @@ public class Dataset extends Result implements Serializable {
public void setGeolocation(List<GeoLocation> geolocation) {
this.geolocation = geolocation;
}
/**
 * Merges the dataset-specific fields of {@code e} into this instance, after delegating the
 * inherited fields to {@code super.mergeFrom(e)}.
 *
 * <p>A scalar field is overwritten only when the incoming value is non-null and
 * {@code compareTrust(this, e) < 0}. Geolocations are always combined through
 * {@code mergeLists}, and the data info is handled last by {@code mergeOAFDataInfo}.
 *
 * @param e the entity to merge in; when non-null it is cast to {@link Dataset}
 */
@Override
public void mergeFrom(OafEntity e) {
    super.mergeFrom(e);
    // A null entity has nothing dataset-specific to contribute; dereferencing it below would throw.
    if (e == null) {
        return;
    }
    final Dataset d = (Dataset) e;

    // Evaluated once: this method does not reassign dataInfo before mergeOAFDataInfo runs at the end
    // (assumes the getters and mergeLists leave dataInfo untouched).
    final boolean incomingWins = compareTrust(this, e) < 0;
    if (incomingWins) {
        if (d.getStoragedate() != null) {
            storagedate = d.getStoragedate();
        }
        if (d.getDevice() != null) {
            device = d.getDevice();
        }
        if (d.getSize() != null) {
            size = d.getSize();
        }
        if (d.getVersion() != null) {
            version = d.getVersion();
        }
        if (d.getLastmetadataupdate() != null) {
            lastmetadataupdate = d.getLastmetadataupdate();
        }
        if (d.getMetadataversionnumber() != null) {
            metadataversionnumber = d.getMetadataversionnumber();
        }
    }

    geolocation = mergeLists(geolocation, d.getGeolocation());
    mergeOAFDataInfo(d);
}
}

View File

@ -33,4 +33,29 @@ public class GeoLocation implements Serializable {
public void setPlace(String place) {
this.place = place;
}
/**
 * Builds the case-insensitive key that {@code hashCode()} and {@code equals(Object)} compare.
 *
 * <p>The key is {@code point + "::" + box + place}, each part lower-cased and replaced by the
 * empty string when null. Lower-casing uses {@link java.util.Locale#ROOT} so the key does not
 * depend on the JVM default locale.
 *
 * NOTE(review): box and place are concatenated without a separator, so different (box, place)
 * pairs can yield the same key. The format is kept as-is for compatibility; confirm whether a
 * separator was intended.
 *
 * @return the comparable key, never null
 */
public String toComparableString() {
    return String.format("%s::%s%s", lowerOrEmpty(point), lowerOrEmpty(box), lowerOrEmpty(place));
}

/** Lower-cases {@code value} independently of the default locale; null becomes the empty string. */
private static String lowerOrEmpty(final String value) {
    return value == null ? "" : value.toLowerCase(java.util.Locale.ROOT);
}
/** Hash of the comparable key, so that it agrees with {@code equals(Object)}. */
@Override
public int hashCode() {
    final String key = toComparableString();
    return key.hashCode();
}
/**
 * Two geolocations are equal when they are of exactly the same class and their
 * {@code toComparableString()} keys match.
 */
@Override
public boolean equals(Object obj) {
    if (this == obj) {
        return true;
    }
    if (obj == null || getClass() != obj.getClass()) {
        return false;
    }
    final String mine = toComparableString();
    final String theirs = ((GeoLocation) obj).toComparableString();
    return mine.equals(theirs);
}
}

View File

@ -23,4 +23,24 @@ public abstract class Oaf implements Serializable {
public void setLastupdatetimestamp(Long lastupdatetimestamp) {
this.lastupdatetimestamp = lastupdatetimestamp;
}
/**
 * Adopts the data info of {@code e} when {@code e} carries one and
 * {@code compareTrust(this, e) < 0}.
 *
 * @param e the object whose data info may replace this one's; null is ignored
 */
public void mergeOAFDataInfo(Oaf e) {
    // Tolerate null like extractTrust does, instead of failing on e.getDataInfo().
    if (e == null || e.getDataInfo() == null) {
        return;
    }
    if (compareTrust(this, e) < 0) {
        dataInfo = e.getDataInfo();
    }
}
/**
 * Reads the trust value of {@code e}.
 *
 * @param e the object to inspect; may be null
 * @return the trust string of {@code e}, or {@code "0.0"} when {@code e}, its data info or its
 *         trust is null
 */
protected String extractTrust(Oaf e) {
    final boolean hasTrust = e != null
            && e.getDataInfo() != null
            && e.getDataInfo().getTrust() != null;
    return hasTrust ? e.getDataInfo().getTrust() : "0.0";
}
/**
 * Orders two objects by the trust values returned by {@code extractTrust}.
 *
 * <p>The values are compared as numbers, so that e.g. {@code "0.9"} and {@code "0.90"} are equal
 * and {@code "10"} ranks above {@code "9"}. If either value is not parseable as a double, the
 * comparison falls back to plain string ordering.
 *
 * @param a first object; may be null
 * @param b second object; may be null
 * @return a negative value when {@code a} is less trusted than {@code b}, zero when equally
 *         trusted, a positive value otherwise
 */
protected int compareTrust(Oaf a, Oaf b) {
    final String trustA = extractTrust(a);
    final String trustB = extractTrust(b);
    try {
        return Double.compare(Double.parseDouble(trustA), Double.parseDouble(trustB));
    } catch (NumberFormatException notNumeric) {
        // Non-numeric trust: keep the lexicographic ordering rather than failing the merge.
        return trustA.compareTo(trustB);
    }
}
}

View File

@ -86,6 +86,7 @@ public abstract class OafEntity extends Oaf implements Serializable {
this.oaiprovenance = oaiprovenance;
}
public void mergeFrom(OafEntity e) {
if (e == null)
@ -97,12 +98,15 @@ public abstract class OafEntity extends Oaf implements Serializable {
pid = mergeLists(pid, e.getPid());
if (e.getDateofcollection() != null && compareTrust(this, e) < 0)
dateofcollection = e.getDateofcollection();
if (e.getDateoftransformation() != null && compareTrust(this, e) < 0)
dateoftransformation = e.getDateoftransformation();
extraInfo = mergeLists(extraInfo, e.getExtraInfo());
if (e.getOaiprovenance() != null && compareTrust(this, e) < 0)
oaiprovenance = e.getOaiprovenance();
}

View File

@ -170,21 +170,22 @@ public class Organization extends OafEntity implements Serializable {
public void mergeFrom(OafEntity e) {
super.mergeFrom(e);
final Organization o = (Organization) e;
legalshortname = o.getLegalshortname() != null ? o.getLegalshortname() : legalshortname;
legalname = o.getLegalname() != null ? o.getLegalname() : legalname;
legalshortname = o.getLegalshortname() != null && compareTrust(this, e)<0? o.getLegalshortname() : legalshortname;
legalname = o.getLegalname() != null && compareTrust(this, e)<0 ? o.getLegalname() : legalname;
alternativeNames = mergeLists(o.getAlternativeNames(), alternativeNames);
websiteurl = o.getWebsiteurl() != null ? o.getWebsiteurl() : websiteurl;
logourl = o.getLogourl() != null ? o.getLogourl() : logourl;
eclegalbody = o.getEclegalbody() != null ? o.getEclegalbody() : eclegalbody;
eclegalperson = o.getEclegalperson() != null ? o.getEclegalperson() : eclegalperson;
ecnonprofit = o.getEcnonprofit() != null ? o.getEcnonprofit() : ecnonprofit;
ecresearchorganization = o.getEcresearchorganization() != null ? o.getEcresearchorganization() : ecresearchorganization;
echighereducation = o.getEchighereducation() != null ? o.getEchighereducation() : echighereducation;
ecinternationalorganizationeurinterests = o.getEcinternationalorganizationeurinterests() != null ? o.getEcinternationalorganizationeurinterests() : ecinternationalorganizationeurinterests;
ecinternationalorganization = o.getEcinternationalorganization() != null ? o.getEcinternationalorganization() : ecinternationalorganization;
ecenterprise = o.getEcenterprise() != null ? o.getEcenterprise() :ecenterprise;
ecsmevalidated = o.getEcsmevalidated() != null ? o.getEcsmevalidated() :ecsmevalidated;
ecnutscode = o.getEcnutscode() != null ? o.getEcnutscode() :ecnutscode;
country = o.getCountry() != null ? o.getCountry() :country;
websiteurl = o.getWebsiteurl() != null && compareTrust(this, e)<0? o.getWebsiteurl() : websiteurl;
logourl = o.getLogourl() != null && compareTrust(this, e)<0? o.getLogourl() : logourl;
eclegalbody = o.getEclegalbody() != null && compareTrust(this, e)<0? o.getEclegalbody() : eclegalbody;
eclegalperson = o.getEclegalperson() != null && compareTrust(this, e)<0? o.getEclegalperson() : eclegalperson;
ecnonprofit = o.getEcnonprofit() != null && compareTrust(this, e)<0? o.getEcnonprofit() : ecnonprofit;
ecresearchorganization = o.getEcresearchorganization() != null && compareTrust(this, e)<0? o.getEcresearchorganization() : ecresearchorganization;
echighereducation = o.getEchighereducation() != null && compareTrust(this, e)<0? o.getEchighereducation() : echighereducation;
ecinternationalorganizationeurinterests = o.getEcinternationalorganizationeurinterests() != null && compareTrust(this, e)<0? o.getEcinternationalorganizationeurinterests() : ecinternationalorganizationeurinterests;
ecinternationalorganization = o.getEcinternationalorganization() != null && compareTrust(this, e)<0? o.getEcinternationalorganization() : ecinternationalorganization;
ecenterprise = o.getEcenterprise() != null && compareTrust(this, e)<0? o.getEcenterprise() :ecenterprise;
ecsmevalidated = o.getEcsmevalidated() != null && compareTrust(this, e)<0? o.getEcsmevalidated() :ecsmevalidated;
ecnutscode = o.getEcnutscode() != null && compareTrust(this, e)<0? o.getEcnutscode() :ecnutscode;
country = o.getCountry() != null && compareTrust(this, e)<0 ? o.getCountry() :country;
mergeOAFDataInfo(o);
}
}

View File

@ -264,4 +264,39 @@ public class Project extends OafEntity implements Serializable {
public void setFundedamount(Float fundedamount) {
this.fundedamount = fundedamount;
}
/**
 * Merges the project-specific fields of {@code e} into this instance, after delegating the
 * inherited fields to {@code super.mergeFrom(e)}.
 *
 * <p>A scalar field is overwritten only when the incoming value is non-null and
 * {@code compareTrust(this, e) < 0}. {@code subjects} and {@code fundingtree} are always combined
 * through {@code mergeLists}, and the data info is handled last by {@code mergeOAFDataInfo}.
 *
 * @param e the entity to merge in; when non-null it is cast to {@link Project}
 */
@Override
public void mergeFrom(OafEntity e) {
    super.mergeFrom(e);
    // A null entity has nothing project-specific to contribute; dereferencing it below would throw.
    if (e == null) {
        return;
    }
    final Project p = (Project) e;

    // Evaluated once: this method does not reassign dataInfo before mergeOAFDataInfo runs at the end
    // (assumes the getters and mergeLists leave dataInfo untouched).
    final boolean incomingWins = compareTrust(this, e) < 0;
    if (incomingWins) {
        if (p.getWebsiteurl() != null) {
            websiteurl = p.getWebsiteurl();
        }
        if (p.getCode() != null) {
            code = p.getCode();
        }
        if (p.getAcronym() != null) {
            acronym = p.getAcronym();
        }
        if (p.getTitle() != null) {
            title = p.getTitle();
        }
        if (p.getStartdate() != null) {
            startdate = p.getStartdate();
        }
        if (p.getEnddate() != null) {
            enddate = p.getEnddate();
        }
        if (p.getCallidentifier() != null) {
            callidentifier = p.getCallidentifier();
        }
        if (p.getKeywords() != null) {
            keywords = p.getKeywords();
        }
        if (p.getDuration() != null) {
            duration = p.getDuration();
        }
        if (p.getEcsc39() != null) {
            ecsc39 = p.getEcsc39();
        }
        if (p.getOamandatepublications() != null) {
            oamandatepublications = p.getOamandatepublications();
        }
        if (p.getEcarticle29_3() != null) {
            ecarticle29_3 = p.getEcarticle29_3();
        }
    }

    subjects = mergeLists(subjects, p.getSubjects());
    fundingtree = mergeLists(fundingtree, p.getFundingtree());

    if (incomingWins) {
        if (p.getContracttype() != null) {
            contracttype = p.getContracttype();
        }
        if (p.getOptional1() != null) {
            optional1 = p.getOptional1();
        }
        if (p.getOptional2() != null) {
            optional2 = p.getOptional2();
        }
        if (p.getJsonextrainfo() != null) {
            jsonextrainfo = p.getJsonextrainfo();
        }
        if (p.getContactfullname() != null) {
            contactfullname = p.getContactfullname();
        }
        if (p.getContactfax() != null) {
            contactfax = p.getContactfax();
        }
        if (p.getContactphone() != null) {
            contactphone = p.getContactphone();
        }
        if (p.getContactemail() != null) {
            contactemail = p.getContactemail();
        }
        if (p.getSummary() != null) {
            summary = p.getSummary();
        }
        if (p.getCurrency() != null) {
            currency = p.getCurrency();
        }
        if (p.getTotalcost() != null) {
            totalcost = p.getTotalcost();
        }
        if (p.getFundedamount() != null) {
            fundedamount = p.getFundedamount();
        }
    }

    mergeOAFDataInfo(e);
}
}

View File

@ -21,8 +21,9 @@ public class Publication extends Result implements Serializable {
Publication p = (Publication) e;
if (p.getJournal() != null)
if (p.getJournal() != null && compareTrust(this, e)<0)
journal = p.getJournal();
mergeOAFDataInfo(e);
}

View File

@ -252,10 +252,10 @@ public abstract class Result extends OafEntity implements Serializable {
instance = mergeLists(instance, r.getInstance());
if (r.getResulttype() != null)
if (r.getResulttype() != null && compareTrust(this, r)<0)
resulttype = r.getResulttype();
if (r.getLanguage() != null)
if (r.getLanguage() != null && compareTrust(this, r)<0)
language = r.getLanguage();
country = mergeLists(country, r.getCountry());
@ -268,10 +268,10 @@ public abstract class Result extends OafEntity implements Serializable {
description = longestLists(description, r.getDescription());
if (r.getPublisher() != null)
if (r.getPublisher() != null && compareTrust(this, r)<0)
publisher = r.getPublisher();
if (r.getEmbargoenddate() != null)
if (r.getEmbargoenddate() != null && compareTrust(this, r)<0)
embargoenddate = r.getEmbargoenddate();
source = mergeLists(source, r.getSource());
@ -287,21 +287,23 @@ public abstract class Result extends OafEntity implements Serializable {
coverage = mergeLists(coverage, r.getCoverage());
if (r.getRefereed() != null)
if (r.getRefereed() != null && compareTrust(this, r)<0)
refereed = r.getRefereed();
context = mergeLists(context, r.getContext());
if (r.getProcessingchargeamount() != null)
if (r.getProcessingchargeamount() != null && compareTrust(this, r)<0)
processingchargeamount = r.getProcessingchargeamount();
if (r.getProcessingchargecurrency() != null)
if (r.getProcessingchargecurrency() != null && compareTrust(this, r)<0)
processingchargecurrency = r.getProcessingchargecurrency();
externalReference = mergeLists(externalReference, r.getExternalReference());
}
private List<Field<String>> longestLists(List<Field<String>> a, List<Field<String>> b) {
if(a == null || b == null)
return a==null?b:a;

View File

@ -24,6 +24,8 @@ public class DedupRecordFactory {
public static JavaRDD<OafEntity> createDedupRecord(final JavaSparkContext sc, final SparkSession spark, final String mergeRelsInputPath, final String entitiesInputPath, final OafEntityType entityType, final DedupConfig dedupConf) {
long ts = System.currentTimeMillis();
//<id, json_entity>
final JavaPairRDD<String, String> inputJsonEntities = sc.textFile(entitiesInputPath)
.mapToPair((PairFunction<String, String, String>) it ->
@ -43,57 +45,31 @@ public class DedupRecordFactory {
//<dedup_id, json_entity_merged>
final JavaPairRDD<String, String> joinResult = mergeRels.join(inputJsonEntities).mapToPair((PairFunction<Tuple2<String, Tuple2<String, String>>, String, String>) Tuple2::_2);
JavaPairRDD<OafKey, String> keyJson = joinResult.mapToPair((PairFunction<Tuple2<String, String>, OafKey, String>) json -> {
String idValue = json._1();
String trust = "";
try {
trust = MapDocumentUtil.getJPathString("$.dataInfo.trust", json._2());
} catch (Throwable e) {
}
//TODO remember to replace this with the actual trust retrieving
if (StringUtils.isBlank(trust)) {
Random generator = new Random();
int number = generator.nextInt(20);
double result = (number / 100.0) + 0.80;
trust = "" + result;
}
return new Tuple2<OafKey, String>(new OafKey(idValue, trust), json._2());
});
OafComparator c = new OafComparator();
//<dedup_id, mergedRecordsSortedByTrust>
JavaPairRDD<String, Iterable<String>> sortedJoinResult = keyJson.repartitionAndSortWithinPartitions(new OafPartitioner(keyJson.getNumPartitions()), c)
.mapToPair((PairFunction<Tuple2<OafKey, String>, String, String>) t -> new Tuple2<String, String>(t._1().getDedupId(), t._2()))
.groupByKey();
JavaPairRDD<String, Iterable<String>> sortedJoinResult = joinResult.groupByKey();
switch (entityType) {
case publication:
return sortedJoinResult.map(DedupRecordFactory::publicationMerger);
return sortedJoinResult.map(p->DedupRecordFactory.publicationMerger(p, ts));
case dataset:
return sortedJoinResult.map(DedupRecordFactory::datasetMerger);
return sortedJoinResult.map(d->DedupRecordFactory.datasetMerger(d,ts));
case project:
return sortedJoinResult.map(DedupRecordFactory::projectMerger);
return sortedJoinResult.map(p->DedupRecordFactory.projectMerger(p,ts));
case software:
return sortedJoinResult.map(DedupRecordFactory::softwareMerger);
return sortedJoinResult.map(s->DedupRecordFactory.softwareMerger(s,ts));
case datasource:
return sortedJoinResult.map(DedupRecordFactory::datasourceMerger);
return sortedJoinResult.map(d->DedupRecordFactory.datasourceMerger(d,ts));
case organization:
return sortedJoinResult.map(DedupRecordFactory::organizationMerger);
return sortedJoinResult.map(o->DedupRecordFactory.organizationMerger(o,ts));
case otherresearchproduct:
return sortedJoinResult.map(DedupRecordFactory::otherresearchproductMerger);
return sortedJoinResult.map(o->DedupRecordFactory.otherresearchproductMerger(o,ts));
default:
return null;
}
}
private static Publication publicationMerger(Tuple2<String, Iterable<String>> e) {
private static Publication publicationMerger(Tuple2<String, Iterable<String>> e, final long ts) {
Publication p = new Publication(); //the result of the merge, to be returned at the end
@ -103,19 +79,11 @@ public class DedupRecordFactory {
final Collection<String> dateofacceptance = Lists.newArrayList();
StringBuilder trust = new StringBuilder("0.0");
if (e._2() != null)
e._2().forEach(pub -> {
try {
Publication publication = mapper.readValue(pub, Publication.class);
final String currentTrust = publication.getDataInfo().getTrust();
if (!"1.0".equals(currentTrust)) {
trust.setLength(0);
trust.append(currentTrust);
}
p.mergeFrom(publication);
p.setAuthor(DedupUtility.mergeAuthor(p.getAuthor(), publication.getAuthor()));
//add to the list if they are not null
@ -126,30 +94,74 @@ public class DedupRecordFactory {
}
});
p.setDateofacceptance(DatePicker.pick(dateofacceptance));
p.getDataInfo().setTrust("0.9");
p.setLastupdatetimestamp(ts);
return p;
}
private static Dataset datasetMerger(Tuple2<String, Iterable<String>> e) {
/**
 * Builds the merged Dataset for one group of records.
 *
 * @param e  pair of (id given to the merged record, JSON-serialized Dataset records to merge)
 * @param ts value stored as the last-update timestamp of the merged record
 * @return the merged Dataset, with trust forced to "0.9"
 * @throws RuntimeException wrapping any exception raised while parsing or merging a record
 */
private static Dataset datasetMerger(Tuple2<String, Iterable<String>> e, final long ts) {
    final Dataset merged = new Dataset(); // accumulates the result of the merge
    merged.setId(e._1());

    final ObjectMapper mapper = new ObjectMapper();
    final Collection<String> acceptanceDates = Lists.newArrayList();

    final Iterable<String> jsonRecords = e._2();
    if (jsonRecords != null) {
        for (final String json : jsonRecords) {
            try {
                final Dataset parsed = mapper.readValue(json, Dataset.class);
                merged.mergeFrom(parsed);
                merged.setAuthor(DedupUtility.mergeAuthor(merged.getAuthor(), parsed.getAuthor()));
                // collect only the acceptance dates that are actually present
                if (parsed.getDateofacceptance() != null) {
                    acceptanceDates.add(parsed.getDateofacceptance().getValue());
                }
            } catch (Exception exc) {
                throw new RuntimeException(exc);
            }
        }
    }

    merged.setDateofacceptance(DatePicker.pick(acceptanceDates));
    // NOTE(review): getDataInfo() is dereferenced without a null check — confirm that at least one
    // merged record always carries a data info.
    merged.getDataInfo().setTrust("0.9");
    merged.setLastupdatetimestamp(ts);
    return merged;
}
/**
 * Builds the merged Project for one group of records.
 *
 * @param e  pair of (id given to the merged record, JSON-serialized Project records to merge)
 * @param ts value stored as the last-update timestamp of the merged record
 * @return the merged Project, with trust forced to "0.9"
 * @throws RuntimeException wrapping any exception raised while parsing or merging a record
 */
private static Project projectMerger(Tuple2<String, Iterable<String>> e, final long ts) {
    final Project merged = new Project(); // accumulates the result of the merge
    merged.setId(e._1());

    final ObjectMapper mapper = new ObjectMapper();

    final Iterable<String> jsonRecords = e._2();
    if (jsonRecords != null) {
        for (final String json : jsonRecords) {
            try {
                merged.mergeFrom(mapper.readValue(json, Project.class));
            } catch (Exception exc) {
                throw new RuntimeException(exc);
            }
        }
    }

    // NOTE(review): getDataInfo() is dereferenced without a null check — confirm that at least one
    // merged record always carries a data info.
    merged.getDataInfo().setTrust("0.9");
    merged.setLastupdatetimestamp(ts);
    return merged;
}
/**
 * Placeholder for the Software merge: not implemented, always throws.
 *
 * @param e  pair of (id, records to merge); currently unused
 * @param ts timestamp for the merged record; currently unused
 * @throws NotImplementedException always
 */
private static Software softwareMerger(Tuple2<String, Iterable<String>> e, final long ts) {
throw new NotImplementedException();
}
private static Project projectMerger(Tuple2<String, Iterable<String>> e) {
/**
 * Placeholder for the Datasource merge: not implemented, always throws.
 *
 * @param e  pair of (id, records to merge); currently unused
 * @param ts timestamp for the merged record; currently unused
 * @throws NotImplementedException always
 */
private static Datasource datasourceMerger(Tuple2<String, Iterable<String>> e, final long ts) {
throw new NotImplementedException();
}
private static Software softwareMerger(Tuple2<String, Iterable<String>> e) {
throw new NotImplementedException();
}
private static Datasource datasourceMerger(Tuple2<String, Iterable<String>> e) {
throw new NotImplementedException();
}
private static Organization organizationMerger(Tuple2<String, Iterable<String>> e) {
private static Organization organizationMerger(Tuple2<String, Iterable<String>> e, final long ts) {
Organization o = new Organization(); //the result of the merge, to be returned at the end
@ -180,7 +192,7 @@ public class DedupRecordFactory {
return o;
}
private static OtherResearchProduct otherresearchproductMerger(Tuple2<String, Iterable<String>> e) {
/**
 * Placeholder for the OtherResearchProduct merge: not implemented, always throws.
 *
 * @param e  pair of (id, records to merge); currently unused
 * @param ts timestamp for the merged record; currently unused
 * @throws NotImplementedException always
 */
private static OtherResearchProduct otherresearchproductMerger(Tuple2<String, Iterable<String>> e, final long ts) {
throw new NotImplementedException();
}

View File

@ -118,38 +118,6 @@ public class DedupUtility {
}
enrichPidFromList(base, enrich);
return base;
// //if both have no authors with pids
// if (pa < 1 && pb < 1) {
// //B is bigger than A
// if (sa < sb)
// return b;
// //A is bigger than B
// else
// return a;
// }
// //If A has author with pids
// if (pa > 0) {
// //B has no author with pid
// if (pb < 1)
// return a;
// //B has author with pid
// else {
// enrichPidFromList(a, b);
// return a;
// }
// }
// //If B has author with pids
// //A has no author with pid
// if (pa < 1)
// return b;
// //A has author with pid
// else {
// enrichPidFromList(b, a);
// return b;
// }
}
private static void enrichPidFromList(List<Author> base, List<Author> enrich) {
@ -160,7 +128,7 @@ public class DedupUtility {
.flatMap(a -> a.getPid()
.stream()
.map(p -> new Tuple2<>(p.toComparableString(), a))
).collect(Collectors.toMap(Tuple2::_1, Tuple2::_2));
).collect(Collectors.toMap(Tuple2::_1, Tuple2::_2, (x1, x2) -> x1));
final List<Tuple2<StructuredProperty, Author>> pidToEnrich = enrich
.stream()
@ -220,6 +188,7 @@ public class DedupUtility {
private static String nfd(final String s) {
return Normalizer.normalize(s, Normalizer.Form.NFD);
}
private static Person parse(Author author) {
if (StringUtils.isNotBlank(author.getSurname())) {
return new Person(author.getSurname() + ", " + author.getName(), false);
@ -233,7 +202,7 @@ public class DedupUtility {
if (authors == null)
return 0;
return (int) authors.stream().map(DedupUtility::extractAuthorPid).filter(Objects::nonNull).filter(StringUtils::isNotBlank).count();
return (int) authors.stream().filter(DedupUtility::hasPid).count();
}
private static int authorsSize(List<Author> authors) {
@ -242,29 +211,9 @@ public class DedupUtility {
return authors.size();
}
private static boolean isAccurate(final Author a) {
return StringUtils.isNotBlank(a.getName()) && StringUtils.isNotBlank(a.getSurname());
}
private static String extractAuthorPid(Author a) {
private static boolean hasPid(Author a) {
if (a == null || a.getPid() == null || a.getPid().size() == 0)
return null;
StringBuilder mainPid = new StringBuilder();
a.getPid().forEach(pid -> {
if (pid.getQualifier().getClassid().equalsIgnoreCase("orcid")) {
mainPid.setLength(0);
mainPid.append(pid.getValue());
} else {
if (mainPid.length() == 0)
mainPid.append(pid.getValue());
}
});
return mainPid.toString();
return false;
return a.getPid().stream().anyMatch(p -> p != null && StringUtils.isNotBlank(p.getValue()));
}
}

View File

@ -1,15 +0,0 @@
package eu.dnetlib.dedup;
import com.google.common.collect.ComparisonChain;
import java.io.Serializable;
import java.util.Comparator;
/**
 * Orders {@link OafKey}s by dedup id first and by trust second, both compared as strings.
 * The result is always -1, 0 or 1.
 */
public class OafComparator implements Comparator<OafKey>, Serializable {

    @Override
    public int compare(OafKey a, OafKey b) {
        final int byDedupId = a.getDedupId().compareTo(b.getDedupId());
        if (byDedupId != 0) {
            // the dedup id decides; trust is only a tie-breaker
            return byDedupId < 0 ? -1 : 1;
        }
        return Integer.signum(a.getTrust().compareTo(b.getTrust()));
    }
}

View File

@ -1,31 +0,0 @@
package eu.dnetlib.dedup;
import java.io.Serializable;
/**
 * Serializable key made of a dedup id and a trust value, both kept as strings.
 */
public class OafKey implements Serializable {

    private String dedupId;
    private String trust;

    /**
     * @param dedupId identifier of the dedup group
     * @param trust   trust value, as a string
     */
    public OafKey(String dedupId, String trust) {
        this.dedupId = dedupId;
        this.trust = trust;
    }

    /** Creates a key with both fields unset (null). */
    public OafKey() {
    }

    public String getDedupId() {
        return dedupId;
    }

    public void setDedupId(String dedupId) {
        this.dedupId = dedupId;
    }

    public String getTrust() {
        return trust;
    }

    public void setTrust(String trust) {
        this.trust = trust;
    }

    /**
     * @return {@code "<dedupId>-><trust>"}; a null field is rendered as {@code "null"}
     */
    @Override
    public String toString() {
        // trust is a String: the previous %d conversion threw IllegalFormatConversionException.
        return String.format("%s->%s", dedupId, trust);
    }
}

View File

@ -1,59 +0,0 @@
package eu.dnetlib.dedup;
import org.apache.spark.Partitioner;
import java.io.Serializable;
/**
 * Partitioner that assigns {@link OafKey}s to partitions based on the hash of their dedup id,
 * so that keys with the same dedup id get the same partition index.
 */
public class OafPartitioner extends Partitioner implements Serializable {

    private final int numPartitions;

    /**
     * @param partitions number of partitions; checked to be positive only when assertions are enabled
     */
    public OafPartitioner(int partitions) {
        assert (partitions > 0);
        this.numPartitions = partitions;
    }

    @Override
    public int numPartitions() {
        return numPartitions;
    }

    /**
     * @param key the key to place; must be an {@link OafKey}
     * @return the partition index derived from the hash of the key's dedup id
     * @throws IllegalArgumentException when {@code key} is not an {@link OafKey}
     */
    @Override
    public int getPartition(Object key) {
        if (!(key instanceof OafKey)) {
            throw new IllegalArgumentException("Unexpected Key");
        }
        final int remainder = ((OafKey) key).getDedupId().hashCode() % numPartitions;
        // the remainder may be negative; fold it into the non-negative range
        return Math.abs(remainder);
    }

    @Override
    public int hashCode() {
        // same value as the usual "prime * 1 + field" recipe with prime = 31
        return 31 + numPartitions;
    }

    /** Partitioners are equal when they have the same number of partitions. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        // instanceof is false for null, so no separate null check is needed
        return obj instanceof OafPartitioner
                && numPartitions == ((OafPartitioner) obj).numPartitions;
    }
}

View File

@ -30,7 +30,7 @@
</property>
</parameters>
<start to="CreateSimRels"/>
<start to="CreateDedupRecord"/>
<kill name="Kill">