From 39367676d7c1f2ac3fc39d72e643a45104872b45 Mon Sep 17 00:00:00 2001 From: "sandro.labruzzo" Date: Thu, 12 Dec 2019 15:18:48 +0100 Subject: [PATCH] implemented DedupRecord factory with the merge of project --- .../eu/dnetlib/dhp/schema/oaf/Dataset.java | 24 +++- .../dnetlib/dhp/schema/oaf/GeoLocation.java | 25 ++++ .../java/eu/dnetlib/dhp/schema/oaf/Oaf.java | 20 +++ .../eu/dnetlib/dhp/schema/oaf/OafEntity.java | 10 +- .../dnetlib/dhp/schema/oaf/Organization.java | 31 +++-- .../eu/dnetlib/dhp/schema/oaf/Project.java | 35 +++++ .../dnetlib/dhp/schema/oaf/Publication.java | 3 +- .../eu/dnetlib/dhp/schema/oaf/Result.java | 16 ++- .../eu/dnetlib/dedup/DedupRecordFactory.java | 128 ++++++++++-------- .../java/eu/dnetlib/dedup/DedupUtility.java | 83 +++--------- .../java/eu/dnetlib/dedup/OafComparator.java | 15 -- .../main/java/eu/dnetlib/dedup/OafKey.java | 31 ----- .../java/eu/dnetlib/dedup/OafPartitioner.java | 59 -------- .../dnetlib/dhp/dedup/oozie_app/workflow.xml | 2 +- 14 files changed, 224 insertions(+), 258 deletions(-) delete mode 100644 dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/OafComparator.java delete mode 100644 dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/OafKey.java delete mode 100644 dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/OafPartitioner.java diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Dataset.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Dataset.java index 44d5226e9..27bee998e 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Dataset.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Dataset.java @@ -19,7 +19,7 @@ public class Dataset extends Result implements Serializable { private List geolocation; - public Field getStoragedate() { + public Field getStoragedate() { return storagedate; } @@ -74,4 +74,26 @@ public class Dataset extends Result implements Serializable { public void setGeolocation(List geolocation) { this.geolocation = geolocation; } + + @Override + public void mergeFrom(OafEntity e) { + super.mergeFrom(e); + final Dataset d = (Dataset) e; + + storagedate = d.getStoragedate() != null && compareTrust(this, e)<0? d.getStoragedate() : storagedate; + + device= d.getDevice() != null && compareTrust(this, e)<0? d.getDevice() : device; + + size= d.getSize() != null && compareTrust(this, e)<0? d.getSize() : size; + + version= d.getVersion() != null && compareTrust(this, e)<0? d.getVersion() : version; + + lastmetadataupdate= d.getLastmetadataupdate() != null && compareTrust(this, e)<0? d.getLastmetadataupdate() :lastmetadataupdate; + + metadataversionnumber= d.getMetadataversionnumber() != null && compareTrust(this, e)<0? d.getMetadataversionnumber() : metadataversionnumber; + + geolocation = mergeLists(geolocation, d.getGeolocation()); + + mergeOAFDataInfo(d); + } } diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/GeoLocation.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/GeoLocation.java index 567254a23..a0ce32353 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/GeoLocation.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/GeoLocation.java @@ -33,4 +33,29 @@ public class GeoLocation implements Serializable { public void setPlace(String place) { this.place = place; } + + + public String toComparableString() { + return String.format("%s::%s%s", point != null ? point.toLowerCase() : "", box != null ? box.toLowerCase() : "",place != null ? place.toLowerCase() : ""); + } + + @Override + public int hashCode() { + return toComparableString().hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + + GeoLocation other = (GeoLocation) obj; + + return toComparableString() + .equals(other.toComparableString()); + } } diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Oaf.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Oaf.java index 352ebbc6e..cc2ab8428 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Oaf.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Oaf.java @@ -23,4 +23,24 @@ public abstract class Oaf implements Serializable { public void setLastupdatetimestamp(Long lastupdatetimestamp) { this.lastupdatetimestamp = lastupdatetimestamp; } + + + public void mergeOAFDataInfo(Oaf e) { + if (e.getDataInfo()!= null && compareTrust(this,e)<0) + dataInfo = e.getDataInfo(); + } + + protected String extractTrust(Oaf e) { + if (e == null || e.getDataInfo()== null || e.getDataInfo().getTrust()== null) + return "0.0"; + return e.getDataInfo().getTrust(); + + + + } + + protected int compareTrust(Oaf a, Oaf b) { + return extractTrust(a).compareTo(extractTrust(b)); + + } } diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OafEntity.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OafEntity.java index 955ddfd01..8a86f822d 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OafEntity.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OafEntity.java @@ -86,6 +86,7 @@ public abstract class OafEntity extends Oaf implements Serializable { this.oaiprovenance = oaiprovenance; } + public void mergeFrom(OafEntity e) { if (e == null) @@ -97,13 +98,16 @@ public abstract class OafEntity extends Oaf implements Serializable { pid = mergeLists(pid, e.getPid()); - dateofcollection = e.getDateofcollection(); + if (e.getDateofcollection() != null && compareTrust(this, e) < 0) + dateofcollection = e.getDateofcollection(); - dateoftransformation = e.getDateoftransformation(); + if (e.getDateoftransformation() != null && compareTrust(this, e) < 0) + dateoftransformation = e.getDateoftransformation(); extraInfo = mergeLists(extraInfo, e.getExtraInfo()); - oaiprovenance = e.getOaiprovenance(); + if (e.getOaiprovenance() != null && compareTrust(this, e) < 0) + oaiprovenance = e.getOaiprovenance(); } diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Organization.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Organization.java index c3e9a7007..b0dffb485 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Organization.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Organization.java @@ -170,21 +170,22 @@ public class Organization extends OafEntity implements Serializable { public void mergeFrom(OafEntity e) { super.mergeFrom(e); final Organization o = (Organization) e; - legalshortname = o.getLegalshortname() != null ? o.getLegalshortname() : legalshortname; - legalname = o.getLegalname() != null ? o.getLegalname() : legalname; + legalshortname = o.getLegalshortname() != null && compareTrust(this, e)<0? o.getLegalshortname() : legalshortname; + legalname = o.getLegalname() != null && compareTrust(this, e)<0 ? o.getLegalname() : legalname; alternativeNames = mergeLists(o.getAlternativeNames(), alternativeNames); - websiteurl = o.getWebsiteurl() != null ? o.getWebsiteurl() : websiteurl; - logourl = o.getLogourl() != null ? o.getLogourl() : logourl; - eclegalbody = o.getEclegalbody() != null ? o.getEclegalbody() : eclegalbody; - eclegalperson = o.getEclegalperson() != null ? o.getEclegalperson() : eclegalperson; - ecnonprofit = o.getEcnonprofit() != null ? o.getEcnonprofit() : ecnonprofit; - ecresearchorganization = o.getEcresearchorganization() != null ? o.getEcresearchorganization() : ecresearchorganization; - echighereducation = o.getEchighereducation() != null ? o.getEchighereducation() : echighereducation; - ecinternationalorganizationeurinterests = o.getEcinternationalorganizationeurinterests() != null ? o.getEcinternationalorganizationeurinterests() : ecinternationalorganizationeurinterests; - ecinternationalorganization = o.getEcinternationalorganization() != null ? o.getEcinternationalorganization() : ecinternationalorganization; - ecenterprise = o.getEcenterprise() != null ? o.getEcenterprise() :ecenterprise; - ecsmevalidated = o.getEcsmevalidated() != null ? o.getEcsmevalidated() :ecsmevalidated; - ecnutscode = o.getEcnutscode() != null ? o.getEcnutscode() :ecnutscode; - country = o.getCountry() != null ? o.getCountry() :country; + websiteurl = o.getWebsiteurl() != null && compareTrust(this, e)<0? o.getWebsiteurl() : websiteurl; + logourl = o.getLogourl() != null && compareTrust(this, e)<0? o.getLogourl() : logourl; + eclegalbody = o.getEclegalbody() != null && compareTrust(this, e)<0? o.getEclegalbody() : eclegalbody; + eclegalperson = o.getEclegalperson() != null && compareTrust(this, e)<0? o.getEclegalperson() : eclegalperson; + ecnonprofit = o.getEcnonprofit() != null && compareTrust(this, e)<0? o.getEcnonprofit() : ecnonprofit; + ecresearchorganization = o.getEcresearchorganization() != null && compareTrust(this, e)<0? o.getEcresearchorganization() : ecresearchorganization; + echighereducation = o.getEchighereducation() != null && compareTrust(this, e)<0? o.getEchighereducation() : echighereducation; + ecinternationalorganizationeurinterests = o.getEcinternationalorganizationeurinterests() != null && compareTrust(this, e)<0? o.getEcinternationalorganizationeurinterests() : ecinternationalorganizationeurinterests; + ecinternationalorganization = o.getEcinternationalorganization() != null && compareTrust(this, e)<0? o.getEcinternationalorganization() : ecinternationalorganization; + ecenterprise = o.getEcenterprise() != null && compareTrust(this, e)<0? o.getEcenterprise() :ecenterprise; + ecsmevalidated = o.getEcsmevalidated() != null && compareTrust(this, e)<0? o.getEcsmevalidated() :ecsmevalidated; + ecnutscode = o.getEcnutscode() != null && compareTrust(this, e)<0? o.getEcnutscode() :ecnutscode; + country = o.getCountry() != null && compareTrust(this, e)<0 ? o.getCountry() :country; + mergeOAFDataInfo(o); } } diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Project.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Project.java index 65f22da37..0bc11bb41 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Project.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Project.java @@ -264,4 +264,39 @@ public class Project extends OafEntity implements Serializable { public void setFundedamount(Float fundedamount) { this.fundedamount = fundedamount; } + + + @Override + public void mergeFrom(OafEntity e) { + super.mergeFrom(e); + Project p = (Project)e; + + websiteurl= p.getWebsiteurl()!= null && compareTrust(this,e)<0?p.getWebsiteurl():websiteurl; + code= p.getCode()!=null && compareTrust(this,e)<0?p.getCode():code; + acronym= p.getAcronym()!= null && compareTrust(this,e)<0?p.getAcronym():acronym; + title= p.getTitle()!= null && compareTrust(this,e)<0?p.getTitle():title; + startdate= p.getStartdate()!=null && compareTrust(this,e)<0?p.getStartdate():startdate; + enddate= p.getEnddate()!=null && compareTrust(this,e)<0?p.getEnddate():enddate; + callidentifier= p.getCallidentifier()!=null && compareTrust(this,e)<0?p.getCallidentifier():callidentifier; + keywords= p.getKeywords()!=null && compareTrust(this,e)<0?p.getKeywords():keywords; + duration= p.getDuration()!=null && compareTrust(this,e)<0?p.getDuration():duration; + ecsc39= p.getEcsc39()!=null && compareTrust(this,e)<0?p.getEcsc39():ecsc39; + oamandatepublications= p.getOamandatepublications()!=null && compareTrust(this,e)<0?p.getOamandatepublications():oamandatepublications; + ecarticle29_3= p.getEcarticle29_3()!=null && compareTrust(this,e)<0?p.getEcarticle29_3():ecarticle29_3; + subjects= mergeLists(subjects, p.getSubjects()); + fundingtree= mergeLists(fundingtree, p.getFundingtree()); + contracttype= p.getContracttype()!=null && compareTrust(this,e)<0?p.getContracttype():contracttype; + optional1= p.getOptional1()!=null && compareTrust(this,e)<0?p.getOptional1():optional1; + optional2= p.getOptional2()!=null && compareTrust(this,e)<0?p.getOptional2():optional2; + jsonextrainfo= p.getJsonextrainfo()!=null && compareTrust(this,e)<0?p.getJsonextrainfo():jsonextrainfo; + contactfullname= p.getContactfullname()!=null && compareTrust(this,e)<0?p.getContactfullname():contactfullname; + contactfax= p.getContactfax()!=null && compareTrust(this,e)<0?p.getContactfax():contactfax; + contactphone= p.getContactphone()!=null && compareTrust(this,e)<0?p.getContactphone():contactphone; + contactemail= p.getContactemail()!=null && compareTrust(this,e)<0?p.getContactemail():contactemail; + summary= p.getSummary()!=null && compareTrust(this,e)<0?p.getSummary():summary; + currency= p.getCurrency()!=null && compareTrust(this,e)<0?p.getCurrency():currency; + totalcost= p.getTotalcost()!=null && compareTrust(this,e)<0?p.getTotalcost():totalcost; + fundedamount= p.getFundedamount()!= null && compareTrust(this,e)<0?p.getFundedamount():fundedamount; + mergeOAFDataInfo(e); + } } diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Publication.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Publication.java index 181062f32..bb6990c1d 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Publication.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Publication.java @@ -21,8 +21,9 @@ public class Publication extends Result implements Serializable { Publication p = (Publication) e; - if (p.getJournal() != null) + if (p.getJournal() != null && compareTrust(this, e)<0) journal = p.getJournal(); + mergeOAFDataInfo(e); } diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Result.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Result.java index 7fb7aef10..10339178d 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Result.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Result.java @@ -252,10 +252,10 @@ public abstract class Result extends OafEntity implements Serializable { instance = mergeLists(instance, r.getInstance()); - if (r.getResulttype() != null) + if (r.getResulttype() != null && compareTrust(this, r)<0) resulttype = r.getResulttype(); - if (r.getLanguage() != null) + if (r.getLanguage() != null && compareTrust(this, r)<0) language = r.getLanguage(); country = mergeLists(country, r.getCountry()); @@ -268,10 +268,10 @@ public abstract class Result extends OafEntity implements Serializable { description = longestLists(description, r.getDescription()); - if (r.getPublisher() != null) + if (r.getPublisher() != null && compareTrust(this, r)<0) publisher = r.getPublisher(); - if (r.getEmbargoenddate() != null) + if (r.getEmbargoenddate() != null && compareTrust(this, r)<0) embargoenddate = r.getEmbargoenddate(); source = mergeLists(source, r.getSource()); @@ -287,21 +287,23 @@ public abstract class Result extends OafEntity implements Serializable { coverage = mergeLists(coverage, r.getCoverage()); - if (r.getRefereed() != null) + if (r.getRefereed() != null && compareTrust(this, r)<0) refereed = r.getRefereed(); context = mergeLists(context, r.getContext()); - if (r.getProcessingchargeamount() != null) + if (r.getProcessingchargeamount() != null && compareTrust(this, r)<0) processingchargeamount = r.getProcessingchargeamount(); - if (r.getProcessingchargecurrency() != null) + if (r.getProcessingchargecurrency() != null && compareTrust(this, r)<0) processingchargecurrency = r.getProcessingchargecurrency(); externalReference = mergeLists(externalReference, r.getExternalReference()); } + + private List> longestLists(List> a, List> b) { if(a == null || b == null) return a==null?b:a; diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupRecordFactory.java index c6e3efbc4..86c1651a2 100644 --- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupRecordFactory.java +++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupRecordFactory.java @@ -24,6 +24,8 @@ public class DedupRecordFactory { public static JavaRDD createDedupRecord(final JavaSparkContext sc, final SparkSession spark, final String mergeRelsInputPath, final String entitiesInputPath, final OafEntityType entityType, final DedupConfig dedupConf) { + + long ts = System.currentTimeMillis(); // final JavaPairRDD inputJsonEntities = sc.textFile(entitiesInputPath) .mapToPair((PairFunction) it -> @@ -43,57 +45,31 @@ public class DedupRecordFactory { // final JavaPairRDD joinResult = mergeRels.join(inputJsonEntities).mapToPair((PairFunction>, String, String>) Tuple2::_2); - JavaPairRDD keyJson = joinResult.mapToPair((PairFunction, OafKey, String>) json -> { - - String idValue = json._1(); - - String trust = ""; - try { - trust = MapDocumentUtil.getJPathString("$.dataInfo.trust", json._2()); - } catch (Throwable e) { - - } - - //TODO remember to replace this with the actual trust retrieving - if (StringUtils.isBlank(trust)) { - Random generator = new Random(); - int number = generator.nextInt(20); - double result = (number / 100.0) + 0.80; - trust = "" + result; - } - - return new Tuple2(new OafKey(idValue, trust), json._2()); - }); - - OafComparator c = new OafComparator(); - // - JavaPairRDD> sortedJoinResult = keyJson.repartitionAndSortWithinPartitions(new OafPartitioner(keyJson.getNumPartitions()), c) - .mapToPair((PairFunction, String, String>) t -> new Tuple2(t._1().getDedupId(), t._2())) - .groupByKey(); - + JavaPairRDD> sortedJoinResult = joinResult.groupByKey(); switch (entityType) { case publication: - return sortedJoinResult.map(DedupRecordFactory::publicationMerger); + + return sortedJoinResult.map(p->DedupRecordFactory.publicationMerger(p, ts)); case dataset: - return sortedJoinResult.map(DedupRecordFactory::datasetMerger); + return sortedJoinResult.map(d->DedupRecordFactory.datasetMerger(d,ts)); case project: - return sortedJoinResult.map(DedupRecordFactory::projectMerger); + return sortedJoinResult.map(p->DedupRecordFactory.projectMerger(p,ts)); case software: - return sortedJoinResult.map(DedupRecordFactory::softwareMerger); + return sortedJoinResult.map(s->DedupRecordFactory.softwareMerger(s,ts)); case datasource: - return sortedJoinResult.map(DedupRecordFactory::datasourceMerger); + return sortedJoinResult.map(d->DedupRecordFactory.datasourceMerger(d,ts)); case organization: - return sortedJoinResult.map(DedupRecordFactory::organizationMerger); + return sortedJoinResult.map(o->DedupRecordFactory.organizationMerger(o,ts)); case otherresearchproduct: - return sortedJoinResult.map(DedupRecordFactory::otherresearchproductMerger); + return sortedJoinResult.map(o->DedupRecordFactory.otherresearchproductMerger(o,ts)); default: return null; } } - private static Publication publicationMerger(Tuple2> e) { + private static Publication publicationMerger(Tuple2> e, final long ts) { Publication p = new Publication(); //the result of the merge, to be returned at the end @@ -103,19 +79,11 @@ public class DedupRecordFactory { final Collection dateofacceptance = Lists.newArrayList(); - - StringBuilder trust = new StringBuilder("0.0"); - if (e._2() != null) e._2().forEach(pub -> { try { Publication publication = mapper.readValue(pub, Publication.class); - final String currentTrust = publication.getDataInfo().getTrust(); - if (!"1.0".equals(currentTrust)) { - trust.setLength(0); - trust.append(currentTrust); - } p.mergeFrom(publication); p.setAuthor(DedupUtility.mergeAuthor(p.getAuthor(), publication.getAuthor())); //add to the list if they are not null @@ -126,30 +94,74 @@ public class DedupRecordFactory { } }); p.setDateofacceptance(DatePicker.pick(dateofacceptance)); + p.getDataInfo().setTrust("0.9"); + p.setLastupdatetimestamp(ts); return p; } - private static Dataset datasetMerger(Tuple2> e) { + private static Dataset datasetMerger(Tuple2> e, final long ts) { + + Dataset d = new Dataset(); //the result of the merge, to be returned at the end + + d.setId(e._1()); + + final ObjectMapper mapper = new ObjectMapper(); + + final Collection dateofacceptance = Lists.newArrayList(); + + if (e._2() != null) + e._2().forEach(dat -> { + try { + Dataset dataset = mapper.readValue(dat, Dataset.class); + + d.mergeFrom(dataset); + d.setAuthor(DedupUtility.mergeAuthor(d.getAuthor(), dataset.getAuthor())); + //add to the list if they are not null + if (dataset.getDateofacceptance() != null) + dateofacceptance.add(dataset.getDateofacceptance().getValue()); + } catch (Exception exc) { + throw new RuntimeException(exc); + } + }); + d.setDateofacceptance(DatePicker.pick(dateofacceptance)); + d.getDataInfo().setTrust("0.9"); + d.setLastupdatetimestamp(ts); + return d; + } + + private static Project projectMerger(Tuple2> e, final long ts) { + + Project p = new Project(); //the result of the merge, to be returned at the end + + p.setId(e._1()); + + final ObjectMapper mapper = new ObjectMapper(); + if (e._2() != null) + e._2().forEach(proj -> { + try { + Project project = mapper.readValue(proj, Project.class); + + p.mergeFrom(project); + } catch (Exception exc) { + throw new RuntimeException(exc); + } + }); + p.getDataInfo().setTrust("0.9"); + p.setLastupdatetimestamp(ts); + return p; + } + + private static Software softwareMerger(Tuple2> e, final long ts) { throw new NotImplementedException(); } - private static Project projectMerger(Tuple2> e) { + private static Datasource datasourceMerger(Tuple2> e, final long ts) { throw new NotImplementedException(); } - private static Software softwareMerger(Tuple2> e) { - - throw new NotImplementedException(); - } - - private static Datasource datasourceMerger(Tuple2> e) { - - throw new NotImplementedException(); - } - - private static Organization organizationMerger(Tuple2> e) { + private static Organization organizationMerger(Tuple2> e, final long ts) { Organization o = new Organization(); //the result of the merge, to be returned at the end @@ -180,7 +192,7 @@ public class DedupRecordFactory { return o; } - private static OtherResearchProduct otherresearchproductMerger(Tuple2> e) { + private static OtherResearchProduct otherresearchproductMerger(Tuple2> e, final long ts) { throw new NotImplementedException(); } diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupUtility.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupUtility.java index 388ab9b69..3bed74f86 100644 --- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupUtility.java +++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupUtility.java @@ -109,58 +109,26 @@ public class DedupUtility { int sa = authorsSize(a); int sb = authorsSize(b); - if(pa == pb){ - base = sa>sb?a:b; - enrich = sa>sb?b:a; + if (pa == pb) { + base = sa > sb ? a : b; + enrich = sa > sb ? b : a; } else { - base = pa>pb?a:b; - enrich = pa>pb?b:a; + base = pa > pb ? a : b; + enrich = pa > pb ? b : a; } enrichPidFromList(base, enrich); return base; - - - -// //if both have no authors with pids -// if (pa < 1 && pb < 1) { -// //B is bigger than A -// if (sa < sb) -// return b; -// //A is bigger than B -// else -// return a; -// } -// //If A has author with pids -// if (pa > 0) { -// //B has no author with pid -// if (pb < 1) -// return a; -// //B has author with pid -// else { -// enrichPidFromList(a, b); -// return a; -// } -// } -// //If B has author with pids -// //A has no author with pid -// if (pa < 1) -// return b; -// //A has author with pid -// else { -// enrichPidFromList(b, a); -// return b; -// } } private static void enrichPidFromList(List base, List enrich) { - if(base==null || enrich == null) + if (base == null || enrich == null) return; final Map basePidAuthorMap = base.stream() .filter(a -> a.getPid() != null && a.getPid().size() > 0) .flatMap(a -> a.getPid() .stream() .map(p -> new Tuple2<>(p.toComparableString(), a)) - ).collect(Collectors.toMap(Tuple2::_1, Tuple2::_2)); + ).collect(Collectors.toMap(Tuple2::_1, Tuple2::_2, (x1, x2) -> x1)); final List> pidToEnrich = enrich .stream() @@ -171,7 +139,7 @@ public class DedupUtility { pidToEnrich.forEach(a -> { Optional> simAuhtor = base.stream().map(ba -> new Tuple2<>(sim(ba, a._2()), ba)).max(Comparator.comparing(Tuple2::_1)); - if (simAuhtor.isPresent() && simAuhtor.get()._1()> THRESHOLD) { + if (simAuhtor.isPresent() && simAuhtor.get()._1() > THRESHOLD) { Author r = simAuhtor.get()._2(); r.getPid().add(a._1()); } @@ -179,15 +147,15 @@ public class DedupUtility { } public static String createEntityPath(final String basePath, final String entityType) { - return String.format("%s/%s", basePath,entityType); + return String.format("%s/%s", basePath, entityType); } public static String createSimRelPath(final String basePath, final String entityType) { - return String.format("%s/%s_simRel", basePath,entityType); + return String.format("%s/%s_simRel", basePath, entityType); } public static String createMergeRelPath(final String basePath, final String entityType) { - return String.format("%s/%s_mergeRel", basePath,entityType); + return String.format("%s/%s_mergeRel", basePath, entityType); } private static Double sim(Author a, Author b) { @@ -220,6 +188,7 @@ public class DedupUtility { private static String nfd(final String s) { return Normalizer.normalize(s, Normalizer.Form.NFD); } + private static Person parse(Author author) { if (StringUtils.isNotBlank(author.getSurname())) { return new Person(author.getSurname() + ", " + author.getName(), false); @@ -233,7 +202,7 @@ public class DedupUtility { if (authors == null) return 0; - return (int) authors.stream().map(DedupUtility::extractAuthorPid).filter(Objects::nonNull).filter(StringUtils::isNotBlank).count(); + return (int) authors.stream().filter(DedupUtility::hasPid).count(); } private static int authorsSize(List authors) { @@ -242,29 +211,9 @@ public class DedupUtility { return authors.size(); } - - private static boolean isAccurate(final Author a) { - return StringUtils.isNotBlank(a.getName()) && StringUtils.isNotBlank(a.getSurname()); - } - - private static String extractAuthorPid(Author a) { - + private static boolean hasPid(Author a) { if (a == null || a.getPid() == null || a.getPid().size() == 0) - return null; - - StringBuilder mainPid = new StringBuilder(); - - a.getPid().forEach(pid -> { - if (pid.getQualifier().getClassid().equalsIgnoreCase("orcid")) { - mainPid.setLength(0); - mainPid.append(pid.getValue()); - } else { - if (mainPid.length() == 0) - mainPid.append(pid.getValue()); - } - }); - - return mainPid.toString(); - + return false; + return a.getPid().stream().anyMatch(p -> p != null && StringUtils.isNotBlank(p.getValue())); } } diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/OafComparator.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/OafComparator.java deleted file mode 100644 index 2ab78db7c..000000000 --- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/OafComparator.java +++ /dev/null @@ -1,15 +0,0 @@ -package eu.dnetlib.dedup; -import com.google.common.collect.ComparisonChain; -import java.io.Serializable; -import java.util.Comparator; - -public class OafComparator implements Comparator, Serializable { - - @Override - public int compare(OafKey a, OafKey b) { - return ComparisonChain.start() - .compare(a.getDedupId(), b.getDedupId()) - .compare(a.getTrust(), b.getTrust()) - .result(); - } -} \ No newline at end of file diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/OafKey.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/OafKey.java deleted file mode 100644 index f66b0457e..000000000 --- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/OafKey.java +++ /dev/null @@ -1,31 +0,0 @@ -package eu.dnetlib.dedup; - -import java.io.Serializable; -public class OafKey implements Serializable { - - private String dedupId; - private String trust; - - public OafKey(String dedupId, String trust) { - this.dedupId = dedupId; - this.trust = trust; - } - public OafKey() { - } - public String getDedupId() { - return dedupId; - } - public void setDedupId(String dedupId) { - this.dedupId = dedupId; - } - public String getTrust() { - return trust; - } - public void setTrust(String trust) { - this.trust = trust; - } - @Override - public String toString(){ - return String.format("%s->%d", dedupId,trust); - } -} diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/OafPartitioner.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/OafPartitioner.java deleted file mode 100644 index 20885fd0b..000000000 --- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/OafPartitioner.java +++ /dev/null @@ -1,59 +0,0 @@ -package eu.dnetlib.dedup; - -import org.apache.spark.Partitioner; - -import java.io.Serializable; - -public class OafPartitioner extends Partitioner implements Serializable { - - private final int numPartitions; - - public OafPartitioner(int partitions) { - assert (partitions > 0); - this.numPartitions = partitions; - } - - @Override - public int numPartitions() { - return numPartitions; - } - - @Override - public int getPartition(Object key) { - if (key instanceof OafKey) { - @SuppressWarnings("unchecked") - OafKey item = (OafKey) key; - return Math.abs(item.getDedupId().hashCode() % numPartitions); - } else { - throw new IllegalArgumentException("Unexpected Key"); - } - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + numPartitions; - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { - return false; - } - if (!(obj instanceof OafPartitioner)) { - return false; - } - // - OafPartitioner other = (OafPartitioner) obj; - if (numPartitions != other.numPartitions) { - return false; - } - // - return true; - } -} diff --git a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/workflow.xml index 09dd3a315..0e75aa072 100644 --- a/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-dedup/src/main/resources/eu/dnetlib/dhp/dedup/oozie_app/workflow.xml @@ -30,7 +30,7 @@ - +