forked from antonis.lempesis/dnet-hadoop
implemented DedupRecord factory for missing entities
This commit is contained in:
parent
545e940007
commit
b4392f9f43
|
@ -26,7 +26,7 @@ public class Context implements Serializable {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int hashCode() {
|
public int hashCode() {
|
||||||
return id.hashCode();
|
return id ==null? 0 : id.hashCode();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -25,8 +25,8 @@ public class Field<T> implements Serializable {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int hashCode(){
|
public int hashCode() {
|
||||||
return getValue().hashCode();
|
return getValue() == null ? 0 : getValue().hashCode();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -37,11 +37,7 @@ public class Field<T> implements Serializable {
|
||||||
return false;
|
return false;
|
||||||
if (getClass() != obj.getClass())
|
if (getClass() != obj.getClass())
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
Field<T> other = (Field<T>) obj;
|
Field<T> other = (Field<T>) obj;
|
||||||
|
|
||||||
return getValue().equals(other.getValue());
|
return getValue().equals(other.getValue());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
package eu.dnetlib.dhp.schema.oaf;
|
package eu.dnetlib.dhp.schema.oaf;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
|
||||||
public class GeoLocation implements Serializable {
|
public class GeoLocation implements Serializable {
|
||||||
|
@ -35,8 +37,14 @@ public class GeoLocation implements Serializable {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public boolean isBlank() {
|
||||||
|
return StringUtils.isBlank(point) &&
|
||||||
|
StringUtils.isBlank(box) &&
|
||||||
|
StringUtils.isBlank(place);
|
||||||
|
}
|
||||||
|
|
||||||
public String toComparableString() {
|
public String toComparableString() {
|
||||||
return String.format("%s::%s%s", point != null ? point.toLowerCase() : "", box != null ? box.toLowerCase() : "",place != null ? place.toLowerCase() : "");
|
return isBlank()?"":String.format("%s::%s%s", point != null ? point.toLowerCase() : "", box != null ? box.toLowerCase() : "", place != null ? place.toLowerCase() : "");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -84,6 +84,9 @@ public class Instance implements Serializable {
|
||||||
public void setDateofacceptance(Field<String> dateofacceptance) {
|
public void setDateofacceptance(Field<String> dateofacceptance) {
|
||||||
this.dateofacceptance = dateofacceptance;
|
this.dateofacceptance = dateofacceptance;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
public String toComparableString(){
|
public String toComparableString(){
|
||||||
return String.format("%s::%s::%s::%s",
|
return String.format("%s::%s::%s::%s",
|
||||||
hostedby != null && hostedby.getKey()!= null ? hostedby.getKey().toLowerCase() : "",
|
hostedby != null && hostedby.getKey()!= null ? hostedby.getKey().toLowerCase() : "",
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
package eu.dnetlib.dhp.schema.oaf;
|
package eu.dnetlib.dhp.schema.oaf;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
|
||||||
public class KeyValue implements Serializable {
|
public class KeyValue implements Serializable {
|
||||||
|
@ -35,7 +37,11 @@ public class KeyValue implements Serializable {
|
||||||
}
|
}
|
||||||
|
|
||||||
public String toComparableString() {
|
public String toComparableString() {
|
||||||
return String.format("%s::%s", key != null ? key.toLowerCase() : "", value != null ? value.toLowerCase() : "");
|
return isBlank()?"":String.format("%s::%s", key != null ? key.toLowerCase() : "", value != null ? value.toLowerCase() : "");
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isBlank() {
|
||||||
|
return StringUtils.isBlank(key) && StringUtils.isBlank(value);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -34,4 +34,16 @@ public class OtherResearchProduct extends Result implements Serializable {
|
||||||
public void setTool(List<Field<String>> tool) {
|
public void setTool(List<Field<String>> tool) {
|
||||||
this.tool = tool;
|
this.tool = tool;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void mergeFrom(OafEntity e) {
|
||||||
|
super.mergeFrom(e);
|
||||||
|
|
||||||
|
OtherResearchProduct o = (OtherResearchProduct)e;
|
||||||
|
|
||||||
|
contactperson = mergeLists(contactperson, o.getContactperson());
|
||||||
|
contactgroup = mergeLists(contactgroup, o.getContactgroup());
|
||||||
|
tool = mergeLists(tool, o.getTool());
|
||||||
|
mergeOAFDataInfo(e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
package eu.dnetlib.dhp.schema.oaf;
|
package eu.dnetlib.dhp.schema.oaf;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
|
||||||
public class Qualifier implements Serializable {
|
public class Qualifier implements Serializable {
|
||||||
|
@ -42,13 +44,18 @@ public class Qualifier implements Serializable {
|
||||||
}
|
}
|
||||||
|
|
||||||
public String toComparableString() {
|
public String toComparableString() {
|
||||||
return String.format("%s::%s::%s::%s",
|
return isBlank()?"": String.format("%s::%s::%s::%s",
|
||||||
classid != null ? classid : "",
|
classid != null ? classid : "",
|
||||||
classname != null ? classname : "",
|
classname != null ? classname : "",
|
||||||
schemeid != null ? schemeid : "",
|
schemeid != null ? schemeid : "",
|
||||||
schemename != null ? schemename : "");
|
schemename != null ? schemename : "");
|
||||||
}
|
}
|
||||||
|
public boolean isBlank() {
|
||||||
|
return StringUtils.isBlank(classid) &&
|
||||||
|
StringUtils.isBlank(classname) &&
|
||||||
|
StringUtils.isBlank(schemeid) &&
|
||||||
|
StringUtils.isBlank(schemename);
|
||||||
|
}
|
||||||
@Override
|
@Override
|
||||||
public int hashCode() {
|
public int hashCode() {
|
||||||
return toComparableString().hashCode();
|
return toComparableString().hashCode();
|
||||||
|
|
|
@ -252,10 +252,10 @@ public abstract class Result extends OafEntity implements Serializable {
|
||||||
|
|
||||||
instance = mergeLists(instance, r.getInstance());
|
instance = mergeLists(instance, r.getInstance());
|
||||||
|
|
||||||
if (r.getResulttype() != null && compareTrust(this, r)<0)
|
if (r.getResulttype() != null && compareTrust(this, r) < 0)
|
||||||
resulttype = r.getResulttype();
|
resulttype = r.getResulttype();
|
||||||
|
|
||||||
if (r.getLanguage() != null && compareTrust(this, r)<0)
|
if (r.getLanguage() != null && compareTrust(this, r) < 0)
|
||||||
language = r.getLanguage();
|
language = r.getLanguage();
|
||||||
|
|
||||||
country = mergeLists(country, r.getCountry());
|
country = mergeLists(country, r.getCountry());
|
||||||
|
@ -268,10 +268,10 @@ public abstract class Result extends OafEntity implements Serializable {
|
||||||
|
|
||||||
description = longestLists(description, r.getDescription());
|
description = longestLists(description, r.getDescription());
|
||||||
|
|
||||||
if (r.getPublisher() != null && compareTrust(this, r)<0)
|
if (r.getPublisher() != null && compareTrust(this, r) < 0)
|
||||||
publisher = r.getPublisher();
|
publisher = r.getPublisher();
|
||||||
|
|
||||||
if (r.getEmbargoenddate() != null && compareTrust(this, r)<0)
|
if (r.getEmbargoenddate() != null && compareTrust(this, r) < 0)
|
||||||
embargoenddate = r.getEmbargoenddate();
|
embargoenddate = r.getEmbargoenddate();
|
||||||
|
|
||||||
source = mergeLists(source, r.getSource());
|
source = mergeLists(source, r.getSource());
|
||||||
|
@ -287,15 +287,15 @@ public abstract class Result extends OafEntity implements Serializable {
|
||||||
|
|
||||||
coverage = mergeLists(coverage, r.getCoverage());
|
coverage = mergeLists(coverage, r.getCoverage());
|
||||||
|
|
||||||
if (r.getRefereed() != null && compareTrust(this, r)<0)
|
if (r.getRefereed() != null && compareTrust(this, r) < 0)
|
||||||
refereed = r.getRefereed();
|
refereed = r.getRefereed();
|
||||||
|
|
||||||
context = mergeLists(context, r.getContext());
|
context = mergeLists(context, r.getContext());
|
||||||
|
|
||||||
if (r.getProcessingchargeamount() != null && compareTrust(this, r)<0)
|
if (r.getProcessingchargeamount() != null && compareTrust(this, r) < 0)
|
||||||
processingchargeamount = r.getProcessingchargeamount();
|
processingchargeamount = r.getProcessingchargeamount();
|
||||||
|
|
||||||
if (r.getProcessingchargecurrency() != null && compareTrust(this, r)<0)
|
if (r.getProcessingchargecurrency() != null && compareTrust(this, r) < 0)
|
||||||
processingchargecurrency = r.getProcessingchargecurrency();
|
processingchargecurrency = r.getProcessingchargecurrency();
|
||||||
|
|
||||||
externalReference = mergeLists(externalReference, r.getExternalReference());
|
externalReference = mergeLists(externalReference, r.getExternalReference());
|
||||||
|
@ -303,16 +303,15 @@ public abstract class Result extends OafEntity implements Serializable {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
private List<Field<String>> longestLists(List<Field<String>> a, List<Field<String>> b) {
|
private List<Field<String>> longestLists(List<Field<String>> a, List<Field<String>> b) {
|
||||||
if(a == null || b == null)
|
if (a == null || b == null)
|
||||||
return a==null?b:a;
|
return a == null ? b : a;
|
||||||
if (a.size()== b.size()) {
|
if (a.size() == b.size()) {
|
||||||
int msa = a.stream().filter(i -> i.getValue() != null).map(i -> i.getValue().length()).max(Comparator.naturalOrder()).orElse(0);
|
int msa = a.stream().filter(i -> i.getValue() != null).map(i -> i.getValue().length()).max(Comparator.naturalOrder()).orElse(0);
|
||||||
int msb = b.stream().filter(i -> i.getValue() != null).map(i -> i.getValue().length()).max(Comparator.naturalOrder()).orElse(0);
|
int msb = b.stream().filter(i -> i.getValue() != null).map(i -> i.getValue().length()).max(Comparator.naturalOrder()).orElse(0);
|
||||||
return msa>msb?a:b;
|
return msa > msb ? a : b;
|
||||||
}
|
}
|
||||||
return a.size()> b.size()?a:b;
|
return a.size() > b.size() ? a : b;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -44,4 +44,19 @@ public class Software extends Result implements Serializable {
|
||||||
public void setProgrammingLanguage(Qualifier programmingLanguage) {
|
public void setProgrammingLanguage(Qualifier programmingLanguage) {
|
||||||
this.programmingLanguage = programmingLanguage;
|
this.programmingLanguage = programmingLanguage;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void mergeFrom(OafEntity e) {
|
||||||
|
super.mergeFrom(e);
|
||||||
|
final Software s = (Software) e;
|
||||||
|
documentationUrl = mergeLists(documentationUrl, s.getDocumentationUrl());
|
||||||
|
|
||||||
|
license = mergeLists(license, s.getLicense());
|
||||||
|
|
||||||
|
codeRepositoryUrl = s.getCodeRepositoryUrl()!= null && compareTrust(this, s)<0?s.getCodeRepositoryUrl():codeRepositoryUrl;
|
||||||
|
|
||||||
|
programmingLanguage= s.getProgrammingLanguage()!= null && compareTrust(this, s)<0?s.getProgrammingLanguage():programmingLanguage;
|
||||||
|
|
||||||
|
mergeOAFDataInfo(e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,8 +23,6 @@ import static java.util.stream.Collectors.toMap;
|
||||||
public class DedupRecordFactory {
|
public class DedupRecordFactory {
|
||||||
|
|
||||||
public static JavaRDD<OafEntity> createDedupRecord(final JavaSparkContext sc, final SparkSession spark, final String mergeRelsInputPath, final String entitiesInputPath, final OafEntityType entityType, final DedupConfig dedupConf) {
|
public static JavaRDD<OafEntity> createDedupRecord(final JavaSparkContext sc, final SparkSession spark, final String mergeRelsInputPath, final String entitiesInputPath, final OafEntityType entityType, final DedupConfig dedupConf) {
|
||||||
|
|
||||||
|
|
||||||
long ts = System.currentTimeMillis();
|
long ts = System.currentTimeMillis();
|
||||||
//<id, json_entity>
|
//<id, json_entity>
|
||||||
final JavaPairRDD<String, String> inputJsonEntities = sc.textFile(entitiesInputPath)
|
final JavaPairRDD<String, String> inputJsonEntities = sc.textFile(entitiesInputPath)
|
||||||
|
@ -49,20 +47,19 @@ public class DedupRecordFactory {
|
||||||
|
|
||||||
switch (entityType) {
|
switch (entityType) {
|
||||||
case publication:
|
case publication:
|
||||||
|
return sortedJoinResult.map(p -> DedupRecordFactory.publicationMerger(p, ts));
|
||||||
return sortedJoinResult.map(p->DedupRecordFactory.publicationMerger(p, ts));
|
|
||||||
case dataset:
|
case dataset:
|
||||||
return sortedJoinResult.map(d->DedupRecordFactory.datasetMerger(d,ts));
|
return sortedJoinResult.map(d -> DedupRecordFactory.datasetMerger(d, ts));
|
||||||
case project:
|
case project:
|
||||||
return sortedJoinResult.map(p->DedupRecordFactory.projectMerger(p,ts));
|
return sortedJoinResult.map(p -> DedupRecordFactory.projectMerger(p, ts));
|
||||||
case software:
|
case software:
|
||||||
return sortedJoinResult.map(s->DedupRecordFactory.softwareMerger(s,ts));
|
return sortedJoinResult.map(s -> DedupRecordFactory.softwareMerger(s, ts));
|
||||||
case datasource:
|
case datasource:
|
||||||
return sortedJoinResult.map(d->DedupRecordFactory.datasourceMerger(d,ts));
|
return sortedJoinResult.map(d -> DedupRecordFactory.datasourceMerger(d, ts));
|
||||||
case organization:
|
case organization:
|
||||||
return sortedJoinResult.map(o->DedupRecordFactory.organizationMerger(o,ts));
|
return sortedJoinResult.map(o -> DedupRecordFactory.organizationMerger(o, ts));
|
||||||
case otherresearchproduct:
|
case otherresearchproduct:
|
||||||
return sortedJoinResult.map(o->DedupRecordFactory.otherresearchproductMerger(o,ts));
|
return sortedJoinResult.map(o -> DedupRecordFactory.otherresearchproductMerger(o, ts));
|
||||||
default:
|
default:
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -153,12 +150,48 @@ public class DedupRecordFactory {
|
||||||
|
|
||||||
private static Software softwareMerger(Tuple2<String, Iterable<String>> e, final long ts) {
|
private static Software softwareMerger(Tuple2<String, Iterable<String>> e, final long ts) {
|
||||||
|
|
||||||
throw new NotImplementedException();
|
Software s = new Software(); //the result of the merge, to be returned at the end
|
||||||
|
|
||||||
|
s.setId(e._1());
|
||||||
|
final ObjectMapper mapper = new ObjectMapper();
|
||||||
|
final Collection<String> dateofacceptance = Lists.newArrayList();
|
||||||
|
if (e._2() != null)
|
||||||
|
e._2().forEach(soft -> {
|
||||||
|
try {
|
||||||
|
Software software = mapper.readValue(soft, Software.class);
|
||||||
|
|
||||||
|
s.mergeFrom(software);
|
||||||
|
s.setAuthor(DedupUtility.mergeAuthor(s.getAuthor(), software.getAuthor()));
|
||||||
|
//add to the list if they are not null
|
||||||
|
if (software.getDateofacceptance() != null)
|
||||||
|
dateofacceptance.add(software.getDateofacceptance().getValue());
|
||||||
|
} catch (Exception exc) {
|
||||||
|
throw new RuntimeException(exc);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
s.setDateofacceptance(DatePicker.pick(dateofacceptance));
|
||||||
|
s.getDataInfo().setTrust("0.9");
|
||||||
|
s.setLastupdatetimestamp(ts);
|
||||||
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Datasource datasourceMerger(Tuple2<String, Iterable<String>> e, final long ts) {
|
private static Datasource datasourceMerger(Tuple2<String, Iterable<String>> e, final long ts) {
|
||||||
|
Datasource d = new Datasource(); //the result of the merge, to be returned at the end
|
||||||
|
d.setId(e._1());
|
||||||
|
final ObjectMapper mapper = new ObjectMapper();
|
||||||
|
if (e._2() != null)
|
||||||
|
e._2().forEach(dat -> {
|
||||||
|
try {
|
||||||
|
Datasource datasource = mapper.readValue(dat, Datasource.class);
|
||||||
|
|
||||||
throw new NotImplementedException();
|
d.mergeFrom(datasource);
|
||||||
|
} catch (Exception exc) {
|
||||||
|
throw new RuntimeException(exc);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
d.getDataInfo().setTrust("0.9");
|
||||||
|
d.setLastupdatetimestamp(ts);
|
||||||
|
return d;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Organization organizationMerger(Tuple2<String, Iterable<String>> e, final long ts) {
|
private static Organization organizationMerger(Tuple2<String, Iterable<String>> e, final long ts) {
|
||||||
|
@ -188,13 +221,40 @@ public class DedupRecordFactory {
|
||||||
throw new RuntimeException(exc);
|
throw new RuntimeException(exc);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
o.getDataInfo().setTrust("0.9");
|
||||||
|
o.setLastupdatetimestamp(ts);
|
||||||
|
|
||||||
return o;
|
return o;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static OtherResearchProduct otherresearchproductMerger(Tuple2<String, Iterable<String>> e, final long ts) {
|
private static OtherResearchProduct otherresearchproductMerger(Tuple2<String, Iterable<String>> e, final long ts) {
|
||||||
|
|
||||||
throw new NotImplementedException();
|
OtherResearchProduct o = new OtherResearchProduct(); //the result of the merge, to be returned at the end
|
||||||
|
|
||||||
|
o.setId(e._1());
|
||||||
|
|
||||||
|
final ObjectMapper mapper = new ObjectMapper();
|
||||||
|
|
||||||
|
final Collection<String> dateofacceptance = Lists.newArrayList();
|
||||||
|
|
||||||
|
if (e._2() != null)
|
||||||
|
e._2().forEach(orp -> {
|
||||||
|
try {
|
||||||
|
OtherResearchProduct otherResearchProduct = mapper.readValue(orp, OtherResearchProduct.class);
|
||||||
|
|
||||||
|
o.mergeFrom(otherResearchProduct);
|
||||||
|
o.setAuthor(DedupUtility.mergeAuthor(o.getAuthor(), otherResearchProduct.getAuthor()));
|
||||||
|
//add to the list if they are not null
|
||||||
|
if (otherResearchProduct.getDateofacceptance() != null)
|
||||||
|
dateofacceptance.add(otherResearchProduct.getDateofacceptance().getValue());
|
||||||
|
} catch (Exception exc) {
|
||||||
|
throw new RuntimeException(exc);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
o.setDateofacceptance(DatePicker.pick(dateofacceptance));
|
||||||
|
o.getDataInfo().setTrust("0.9");
|
||||||
|
o.setLastupdatetimestamp(ts);
|
||||||
|
return o;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue