Implemented the DedupRecord factory with support for merging organizations

This commit is contained in:
Sandro La Bruzzo 2019-12-11 16:57:37 +01:00
parent abd9034da0
commit 6b45e37e22
7 changed files with 113 additions and 1809 deletions

View File

@ -164,4 +164,27 @@ public class Organization extends OafEntity implements Serializable {
public void setCountry(Qualifier country) { public void setCountry(Qualifier country) {
this.country = country; this.country = country;
} }
/**
 * Merges the fields of another {@link Organization} into this one.
 * Scalar fields from {@code e} win whenever they are non-null (last-writer-wins),
 * while {@code alternativeNames} is combined via {@code mergeLists}.
 *
 * @param e the entity to merge from; must be an {@link Organization}
 */
@Override
public void mergeFrom(OafEntity e) {
    super.mergeFrom(e);

    final Organization other = (Organization) e;

    // Prefer the incoming value when present; otherwise keep the current one.
    if (other.getLegalshortname() != null) {
        legalshortname = other.getLegalshortname();
    }
    if (other.getLegalname() != null) {
        legalname = other.getLegalname();
    }

    // Alternative names are accumulated rather than overwritten.
    alternativeNames = mergeLists(other.getAlternativeNames(), alternativeNames);

    if (other.getWebsiteurl() != null) {
        websiteurl = other.getWebsiteurl();
    }
    if (other.getLogourl() != null) {
        logourl = other.getLogourl();
    }
    if (other.getEclegalbody() != null) {
        eclegalbody = other.getEclegalbody();
    }
    if (other.getEclegalperson() != null) {
        eclegalperson = other.getEclegalperson();
    }
    if (other.getEcnonprofit() != null) {
        ecnonprofit = other.getEcnonprofit();
    }
    if (other.getEcresearchorganization() != null) {
        ecresearchorganization = other.getEcresearchorganization();
    }
    if (other.getEchighereducation() != null) {
        echighereducation = other.getEchighereducation();
    }
    if (other.getEcinternationalorganizationeurinterests() != null) {
        ecinternationalorganizationeurinterests = other.getEcinternationalorganizationeurinterests();
    }
    if (other.getEcinternationalorganization() != null) {
        ecinternationalorganization = other.getEcinternationalorganization();
    }
    if (other.getEcenterprise() != null) {
        ecenterprise = other.getEcenterprise();
    }
    if (other.getEcsmevalidated() != null) {
        ecsmevalidated = other.getEcsmevalidated();
    }
    if (other.getEcnutscode() != null) {
        ecnutscode = other.getEcnutscode();
    }
    if (other.getCountry() != null) {
        country = other.getCountry();
    }
}
} }

View File

@ -3,10 +3,7 @@ package eu.dnetlib.dhp.schema.oaf;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import java.io.Serializable; import java.io.Serializable;
import java.util.List; import java.util.*;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;
public abstract class Result extends OafEntity implements Serializable { public abstract class Result extends OafEntity implements Serializable {
@ -253,11 +250,6 @@ public abstract class Result extends OafEntity implements Serializable {
Result r = (Result) e; Result r = (Result) e;
//TODO mergeFrom is used only for create Dedup Records since the creation of these two fields requires more complex functions (maybe they will be filled in an external function)
// dateofacceptance = r.getDateofacceptance();
instance = mergeLists(instance, r.getInstance()); instance = mergeLists(instance, r.getInstance());
if (r.getResulttype() != null) if (r.getResulttype() != null)
@ -274,7 +266,7 @@ public abstract class Result extends OafEntity implements Serializable {
relevantdate = mergeLists(relevantdate, r.getRelevantdate()); relevantdate = mergeLists(relevantdate, r.getRelevantdate());
description = mergeLists(description, r.getDescription()); description = longestLists(description, r.getDescription());
if (r.getPublisher() != null) if (r.getPublisher() != null)
publisher = r.getPublisher(); publisher = r.getPublisher();
@ -310,5 +302,16 @@ public abstract class Result extends OafEntity implements Serializable {
} }
/**
 * Picks the "richer" of two lists of string fields.
 * Null-safe: if one list is null the other is returned. The longer list wins;
 * on a size tie, the list containing the single longest non-null value wins
 * (with {@code b} winning when the maxima are equal as well).
 *
 * @param a first candidate list (may be null)
 * @param b second candidate list (may be null)
 * @return the selected list (may be null only if both inputs are null)
 */
private List<Field<String>> longestLists(List<Field<String>> a, List<Field<String>> b) {
    if (a == null) {
        return b;
    }
    if (b == null) {
        return a;
    }
    if (a.size() != b.size()) {
        return a.size() > b.size() ? a : b;
    }
    // Same cardinality: break the tie on the longest individual value.
    return maxValueLength(a) > maxValueLength(b) ? a : b;
}

/** Length of the longest non-null value in the list, or 0 if none. */
private int maxValueLength(List<Field<String>> fields) {
    int max = 0;
    for (Field<String> f : fields) {
        if (f.getValue() != null && f.getValue().length() > max) {
            max = f.getValue().length();
        }
    }
    return max;
}
} }

View File

@ -35,7 +35,7 @@ public class StructuredProperty implements Serializable {
} }
public String toComparableString(){ public String toComparableString(){
return String.format("%s::%s", value != null ? value.toLowerCase() : "", qualifier != null ? qualifier.toComparableString().toLowerCase() : ""); return value != null ? value.toLowerCase() : "";
} }
@Override @Override

View File

@ -22,32 +22,32 @@ import static java.util.stream.Collectors.toMap;
public class DedupRecordFactory { public class DedupRecordFactory {
public static JavaRDD<OafEntity> createDedupRecord(final JavaSparkContext sc, final SparkSession spark, final String mergeRelsInputPath, final String entitiesInputPath, final OafEntityType entityType, final DedupConfig dedupConf){ public static JavaRDD<OafEntity> createDedupRecord(final JavaSparkContext sc, final SparkSession spark, final String mergeRelsInputPath, final String entitiesInputPath, final OafEntityType entityType, final DedupConfig dedupConf) {
//<id, json_entity> //<id, json_entity>
final JavaPairRDD<String, String> inputJsonEntities = sc.textFile(entitiesInputPath) final JavaPairRDD<String, String> inputJsonEntities = sc.textFile(entitiesInputPath)
.mapToPair((PairFunction<String,String,String>) it-> .mapToPair((PairFunction<String, String, String>) it ->
new Tuple2<String, String>(MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), it),it) new Tuple2<String, String>(MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), it), it)
); );
//<source, target>: source is the dedup_id, target is the id of the mergedIn //<source, target>: source is the dedup_id, target is the id of the mergedIn
JavaPairRDD<String,String> mergeRels = spark JavaPairRDD<String, String> mergeRels = spark
.read().load(mergeRelsInputPath).as(Encoders.bean(Relation.class)) .read().load(mergeRelsInputPath).as(Encoders.bean(Relation.class))
.where("relClass=='merges'") .where("relClass=='merges'")
.javaRDD() .javaRDD()
.mapToPair( .mapToPair(
(PairFunction<Relation, String,String>)r-> (PairFunction<Relation, String, String>) r ->
new Tuple2<String, String>(r.getTarget(), r.getSource()) new Tuple2<String, String>(r.getTarget(), r.getSource())
); );
//<dedup_id, json_entity_merged> //<dedup_id, json_entity_merged>
final JavaPairRDD<String, String> joinResult = mergeRels.join(inputJsonEntities).mapToPair((PairFunction<Tuple2<String, Tuple2<String, String>>, String, String>) Tuple2::_2); final JavaPairRDD<String, String> joinResult = mergeRels.join(inputJsonEntities).mapToPair((PairFunction<Tuple2<String, Tuple2<String, String>>, String, String>) Tuple2::_2);
JavaPairRDD<OafKey, String> keyJson = joinResult.mapToPair((PairFunction<Tuple2<String, String>, OafKey, String>) json -> { JavaPairRDD<OafKey, String> keyJson = joinResult.mapToPair((PairFunction<Tuple2<String, String>, OafKey, String>) json -> {
String idValue = json._1(); String idValue = json._1();
String trust =""; String trust = "";
try { try {
trust = MapDocumentUtil.getJPathString("$.dataInfo.trust", json._2()); trust = MapDocumentUtil.getJPathString("$.dataInfo.trust", json._2());
} catch (Throwable e) { } catch (Throwable e) {
@ -72,11 +72,7 @@ public class DedupRecordFactory {
.groupByKey(); .groupByKey();
switch (entityType) {
switch(entityType){
case publication: case publication:
return sortedJoinResult.map(DedupRecordFactory::publicationMerger); return sortedJoinResult.map(DedupRecordFactory::publicationMerger);
case dataset: case dataset:
@ -97,7 +93,7 @@ public class DedupRecordFactory {
} }
private static Publication publicationMerger(Tuple2<String, Iterable<String>> e){ private static Publication publicationMerger(Tuple2<String, Iterable<String>> e) {
Publication p = new Publication(); //the result of the merge, to be returned at the end Publication p = new Publication(); //the result of the merge, to be returned at the end
@ -111,54 +107,80 @@ public class DedupRecordFactory {
StringBuilder trust = new StringBuilder("0.0"); StringBuilder trust = new StringBuilder("0.0");
if (e._2() != null) if (e._2() != null)
e._2().forEach(pub -> { e._2().forEach(pub -> {
try { try {
Publication publication = mapper.readValue(pub, Publication.class); Publication publication = mapper.readValue(pub, Publication.class);
final String currentTrust = publication.getDataInfo().getTrust(); final String currentTrust = publication.getDataInfo().getTrust();
if (!"1.0".equals(currentTrust)) { if (!"1.0".equals(currentTrust)) {
trust.setLength(0); trust.setLength(0);
trust.append(currentTrust); trust.append(currentTrust);
}
p.mergeFrom(publication);
p.setAuthor(DedupUtility.mergeAuthor(p.getAuthor(), publication.getAuthor()));
//add to the list if they are not null
if (publication.getDateofacceptance() != null)
dateofacceptance.add(publication.getDateofacceptance().getValue());
} catch (Exception exc) {
throw new RuntimeException(exc);
} }
p.mergeFrom(publication); });
p.setAuthor(DedupUtility.mergeAuthor(p.getAuthor(), publication.getAuthor()));
//add to the list if they are not null
if (publication.getDateofacceptance() != null)
dateofacceptance.add(publication.getDateofacceptance().getValue());
} catch (Exception exc){
throw new RuntimeException(exc);
}
});
p.setDateofacceptance(DatePicker.pick(dateofacceptance)); p.setDateofacceptance(DatePicker.pick(dateofacceptance));
return p; return p;
} }
private static Dataset datasetMerger(Tuple2<String, Iterable<String>> e){ private static Dataset datasetMerger(Tuple2<String, Iterable<String>> e) {
throw new NotImplementedException(); throw new NotImplementedException();
} }
private static Project projectMerger(Tuple2<String, Iterable<String>> e){ private static Project projectMerger(Tuple2<String, Iterable<String>> e) {
throw new NotImplementedException(); throw new NotImplementedException();
} }
private static Software softwareMerger(Tuple2<String, Iterable<String>> e){ private static Software softwareMerger(Tuple2<String, Iterable<String>> e) {
throw new NotImplementedException(); throw new NotImplementedException();
} }
private static Datasource datasourceMerger(Tuple2<String, Iterable<String>> e){ private static Datasource datasourceMerger(Tuple2<String, Iterable<String>> e) {
throw new NotImplementedException(); throw new NotImplementedException();
} }
private static Organization organizationMerger(Tuple2<String, Iterable<String>> e){ private static Organization organizationMerger(Tuple2<String, Iterable<String>> e) {
throw new NotImplementedException(); Organization o = new Organization(); //the result of the merge, to be returned at the end
o.setId(e._1());
final ObjectMapper mapper = new ObjectMapper();
StringBuilder trust = new StringBuilder("0.0");
if (e._2() != null)
e._2().forEach(pub -> {
try {
Organization organization = mapper.readValue(pub, Organization.class);
final String currentTrust = organization.getDataInfo().getTrust();
if (!"1.0".equals(currentTrust)) {
trust.setLength(0);
trust.append(currentTrust);
}
o.mergeFrom(organization);
} catch (Exception exc) {
throw new RuntimeException(exc);
}
});
return o;
} }
private static OtherResearchProduct otherresearchproductMerger(Tuple2<String, Iterable<String>> e){ private static OtherResearchProduct otherresearchproductMerger(Tuple2<String, Iterable<String>> e) {
throw new NotImplementedException(); throw new NotImplementedException();
} }

View File

@ -16,10 +16,11 @@ import java.util.List;
public class SparkCreateDedupTest { public class SparkCreateDedupTest {
String configuration; String configuration;
String entity = "organization";
@Before @Before
public void setUp() throws IOException { public void setUp() throws IOException {
configuration = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dedup/conf/pub.curr.conf.json")); configuration = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dedup/conf/org.curr.conf.json"));
} }
@ -29,7 +30,7 @@ public class SparkCreateDedupTest {
SparkCreateSimRels.main(new String[] { SparkCreateSimRels.main(new String[] {
"-mt", "local[*]", "-mt", "local[*]",
"-s", "/home/sandro/betadump", "-s", "/home/sandro/betadump",
"-e", "publication", "-e", entity,
"-c", ArgumentApplicationParser.compressArgument(configuration), "-c", ArgumentApplicationParser.compressArgument(configuration),
"-t", "/tmp/dedup", "-t", "/tmp/dedup",
}); });
@ -42,7 +43,7 @@ public class SparkCreateDedupTest {
SparkCreateConnectedComponent.main(new String[] { SparkCreateConnectedComponent.main(new String[] {
"-mt", "local[*]", "-mt", "local[*]",
"-s", "/home/sandro/betadump", "-s", "/home/sandro/betadump",
"-e", "publication", "-e", entity,
"-c", ArgumentApplicationParser.compressArgument(configuration), "-c", ArgumentApplicationParser.compressArgument(configuration),
"-t", "/tmp/dedup", "-t", "/tmp/dedup",
}); });
@ -54,7 +55,7 @@ public class SparkCreateDedupTest {
SparkCreateDedupRecord.main(new String[] { SparkCreateDedupRecord.main(new String[] {
"-mt", "local[*]", "-mt", "local[*]",
"-s", "/home/sandro/betadump", "-s", "/home/sandro/betadump",
"-e", "publication", "-e", entity,
"-c", ArgumentApplicationParser.compressArgument(configuration), "-c", ArgumentApplicationParser.compressArgument(configuration),
"-d", "/tmp/dedup", "-d", "/tmp/dedup",
}); });

View File

@ -7,7 +7,7 @@
"queueMaxSize": "2000", "queueMaxSize": "2000",
"groupMaxSize": "50", "groupMaxSize": "50",
"slidingWindowSize": "200", "slidingWindowSize": "200",
"idPath": ".id", "idPath": "$.id",
"rootBuilder": [ "rootBuilder": [
"organization", "organization",
"projectOrganization_participation_isParticipant", "projectOrganization_participation_isParticipant",
@ -84,7 +84,7 @@
"type": "String", "type": "String",
"weight": "0", "weight": "0",
"ignoreMissing": "false", "ignoreMissing": "false",
"path": ".country.classid" "path": "$.country.classid"
}, },
{ {
"name": "legalshortname", "name": "legalshortname",
@ -92,7 +92,7 @@
"type": "String", "type": "String",
"weight": "0.1", "weight": "0.1",
"ignoreMissing": "true", "ignoreMissing": "true",
"path": ".legalshortname.value" "path": "$.legalshortname.value"
}, },
{ {
"name": "legalname", "name": "legalname",
@ -100,7 +100,7 @@
"type": "String", "type": "String",
"weight": "0.9", "weight": "0.9",
"ignoreMissing": "false", "ignoreMissing": "false",
"path": ".legalname.value", "path": "$.legalname.value",
"params": { "params": {
"windowSize": 4, "windowSize": 4,
"threshold": 0.7 "threshold": 0.7
@ -112,11 +112,19 @@
"type": "URL", "type": "URL",
"weight": "0", "weight": "0",
"ignoreMissing": "true", "ignoreMissing": "true",
"path": ".websiteurl.value", "path": "$.websiteurl.value",
"params": { "params": {
"host": 0.5, "host": 0.5,
"path": 0.5 "path": 0.5
} }
},
{
"name": "gridid",
"algo": "Null",
"type": "String",
"weight": "0.0",
"ignoreMissing": "true",
"path": "$.pid[?(@.qualifier.classid ==\"grid\")].value"
} }
], ],
"blacklists": { "blacklists": {