diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelSupport.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelSupport.java index bc317a6ff6..ddc0db75d2 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelSupport.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelSupport.java @@ -3,6 +3,8 @@ package eu.dnetlib.dhp.schema.common; import com.google.common.collect.Maps; import eu.dnetlib.dhp.schema.oaf.*; import java.util.Map; +import java.util.Optional; +import java.util.function.Function; /** Oaf model utility methods. */ public class ModelSupport { @@ -146,4 +148,66 @@ public class ModelSupport { entityMapping.get(EntityType.valueOf(sourceType)).name(), entityMapping.get(EntityType.valueOf(targetType)).name()); } + + public static Function idFn() { + return x -> { + if (isSubClass(x, Relation.class)) { + return idFnForRelation(x); + } + return idFnForOafEntity(x); + }; + } + + private static String idFnForRelation(T t) { + Relation r = (Relation) t; + return Optional.ofNullable(r.getSource()) + .map( + source -> + Optional.ofNullable(r.getTarget()) + .map( + target -> + Optional.ofNullable(r.getRelType()) + .map( + relType -> + Optional.ofNullable( + r + .getSubRelType()) + .map( + subRelType -> + Optional + .ofNullable( + r + .getRelClass()) + .map( + relClass -> + String + .join( + source, + target, + relType, + subRelType, + relClass)) + .orElse( + String + .join( + source, + target, + relType, + subRelType))) + .orElse( + String + .join( + source, + target, + relType))) + .orElse( + String.join( + source, target))) + .orElse(source)) + .orElse(null); + } + + private static String idFnForOafEntity(T t) { + return ((OafEntity) t).getId(); + } } diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Dataset.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Dataset.java index ba9a62eb7a..50dca8104c 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Dataset.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Dataset.java @@ -3,7 +3,6 @@ package eu.dnetlib.dhp.schema.oaf; import eu.dnetlib.dhp.schema.common.ModelConstants; import java.io.Serializable; import java.util.List; -import java.util.Objects; public class Dataset extends Result implements Serializable { @@ -116,32 +115,4 @@ public class Dataset extends Result implements Serializable { mergeOAFDataInfo(d); } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - if (!super.equals(o)) return false; - Dataset dataset = (Dataset) o; - return Objects.equals(storagedate, dataset.storagedate) - && Objects.equals(device, dataset.device) - && Objects.equals(size, dataset.size) - && Objects.equals(version, dataset.version) - && Objects.equals(lastmetadataupdate, dataset.lastmetadataupdate) - && Objects.equals(metadataversionnumber, dataset.metadataversionnumber) - && Objects.equals(geolocation, dataset.geolocation); - } - - @Override - public int hashCode() { - return Objects.hash( - super.hashCode(), - storagedate, - device, - size, - version, - lastmetadataupdate, - metadataversionnumber, - geolocation); - } } diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Datasource.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Datasource.java index f2755b86b6..6b181ba06f 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Datasource.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Datasource.java @@ -2,7 +2,6 @@ package eu.dnetlib.dhp.schema.oaf; import java.io.Serializable; import java.util.List; -import java.util.Objects; public class Datasource extends OafEntity implements Serializable { @@ -512,88 +511,4 @@ public class Datasource extends OafEntity implements Serializable { mergeOAFDataInfo(e); } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - if (!super.equals(o)) return false; - Datasource that = (Datasource) o; - return Objects.equals(datasourcetype, that.datasourcetype) - && Objects.equals(openairecompatibility, that.openairecompatibility) - && Objects.equals(officialname, that.officialname) - && Objects.equals(englishname, that.englishname) - && Objects.equals(websiteurl, that.websiteurl) - && Objects.equals(logourl, that.logourl) - && Objects.equals(contactemail, that.contactemail) - && Objects.equals(namespaceprefix, that.namespaceprefix) - && Objects.equals(latitude, that.latitude) - && Objects.equals(longitude, that.longitude) - && Objects.equals(dateofvalidation, that.dateofvalidation) - && Objects.equals(description, that.description) - && Objects.equals(subjects, that.subjects) - && Objects.equals(odnumberofitems, that.odnumberofitems) - && Objects.equals(odnumberofitemsdate, that.odnumberofitemsdate) - && Objects.equals(odpolicies, that.odpolicies) - && Objects.equals(odlanguages, that.odlanguages) - && Objects.equals(odcontenttypes, that.odcontenttypes) - && Objects.equals(accessinfopackage, that.accessinfopackage) - && Objects.equals(releasestartdate, that.releasestartdate) - && Objects.equals(releaseenddate, that.releaseenddate) - && Objects.equals(missionstatementurl, that.missionstatementurl) - && Objects.equals(dataprovider, that.dataprovider) - && Objects.equals(serviceprovider, that.serviceprovider) - && Objects.equals(databaseaccesstype, that.databaseaccesstype) - && Objects.equals(datauploadtype, that.datauploadtype) - && Objects.equals(databaseaccessrestriction, that.databaseaccessrestriction) - && Objects.equals(datauploadrestriction, that.datauploadrestriction) - && Objects.equals(versioning, that.versioning) - && Objects.equals(citationguidelineurl, that.citationguidelineurl) - && Objects.equals(qualitymanagementkind, that.qualitymanagementkind) - && Objects.equals(pidsystems, that.pidsystems) - && Objects.equals(certificates, that.certificates) - && Objects.equals(policies, that.policies) - && Objects.equals(journal, that.journal); - } - - @Override - public int hashCode() { - return Objects.hash( - super.hashCode(), - datasourcetype, - openairecompatibility, - officialname, - englishname, - websiteurl, - logourl, - contactemail, - namespaceprefix, - latitude, - longitude, - dateofvalidation, - description, - subjects, - odnumberofitems, - odnumberofitemsdate, - odpolicies, - odlanguages, - odcontenttypes, - accessinfopackage, - releasestartdate, - releaseenddate, - missionstatementurl, - dataprovider, - serviceprovider, - databaseaccesstype, - datauploadtype, - databaseaccessrestriction, - datauploadrestriction, - versioning, - citationguidelineurl, - qualitymanagementkind, - pidsystems, - certificates, - policies, - journal); - } } diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OafEntity.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OafEntity.java index 6db3b7f745..0d202395d2 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OafEntity.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OafEntity.java @@ -113,27 +113,11 @@ public abstract class OafEntity extends Oaf implements Serializable { if (o == null || getClass() != o.getClass()) return false; if (!super.equals(o)) return false; OafEntity oafEntity = (OafEntity) o; - return Objects.equals(id, oafEntity.id) - && Objects.equals(originalId, oafEntity.originalId) - && Objects.equals(collectedfrom, oafEntity.collectedfrom) - && Objects.equals(pid, oafEntity.pid) - && Objects.equals(dateofcollection, oafEntity.dateofcollection) - && Objects.equals(dateoftransformation, oafEntity.dateoftransformation) - && Objects.equals(extraInfo, oafEntity.extraInfo) - && Objects.equals(oaiprovenance, oafEntity.oaiprovenance); + return Objects.equals(id, oafEntity.id); } @Override public int hashCode() { - return Objects.hash( - super.hashCode(), - id, - originalId, - collectedfrom, - pid, - dateofcollection, - dateoftransformation, - extraInfo, - oaiprovenance); + return Objects.hash(super.hashCode(), id); } } diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Organization.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Organization.java index 7352b4847a..08e7851140 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Organization.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Organization.java @@ -2,7 +2,6 @@ package eu.dnetlib.dhp.schema.oaf; import java.io.Serializable; import java.util.List; -import java.util.Objects; public class Organization extends OafEntity implements Serializable { @@ -233,52 +232,4 @@ public class Organization extends OafEntity implements Serializable { country = o.getCountry() != null && compareTrust(this, e) < 0 ? o.getCountry() : country; mergeOAFDataInfo(o); } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - if (!super.equals(o)) return false; - Organization that = (Organization) o; - return Objects.equals(legalshortname, that.legalshortname) - && Objects.equals(legalname, that.legalname) - && Objects.equals(alternativeNames, that.alternativeNames) - && Objects.equals(websiteurl, that.websiteurl) - && Objects.equals(logourl, that.logourl) - && Objects.equals(eclegalbody, that.eclegalbody) - && Objects.equals(eclegalperson, that.eclegalperson) - && Objects.equals(ecnonprofit, that.ecnonprofit) - && Objects.equals(ecresearchorganization, that.ecresearchorganization) - && Objects.equals(echighereducation, that.echighereducation) - && Objects.equals( - ecinternationalorganizationeurinterests, - that.ecinternationalorganizationeurinterests) - && Objects.equals(ecinternationalorganization, that.ecinternationalorganization) - && Objects.equals(ecenterprise, that.ecenterprise) - && Objects.equals(ecsmevalidated, that.ecsmevalidated) - && Objects.equals(ecnutscode, that.ecnutscode) - && Objects.equals(country, that.country); - } - - @Override - public int hashCode() { - return Objects.hash( - super.hashCode(), - legalshortname, - legalname, - alternativeNames, - websiteurl, - logourl, - eclegalbody, - eclegalperson, - ecnonprofit, - ecresearchorganization, - echighereducation, - ecinternationalorganizationeurinterests, - ecinternationalorganization, - ecenterprise, - ecsmevalidated, - ecnutscode, - country); - } } diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OtherResearchProduct.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OtherResearchProduct.java index 13a8e3c45c..8d8fe2d1ca 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OtherResearchProduct.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/OtherResearchProduct.java @@ -3,7 +3,6 @@ package eu.dnetlib.dhp.schema.oaf; import eu.dnetlib.dhp.schema.common.ModelConstants; import java.io.Serializable; import java.util.List; -import java.util.Objects; public class OtherResearchProduct extends Result implements Serializable { @@ -56,20 +55,4 @@ public class OtherResearchProduct extends Result implements Serializable { tool = mergeLists(tool, o.getTool()); mergeOAFDataInfo(e); } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - if (!super.equals(o)) return false; - OtherResearchProduct that = (OtherResearchProduct) o; - return Objects.equals(contactperson, that.contactperson) - && Objects.equals(contactgroup, that.contactgroup) - && Objects.equals(tool, that.tool); - } - - @Override - public int hashCode() { - return Objects.hash(super.hashCode(), contactperson, contactgroup, tool); - } } diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Project.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Project.java index 7ed816fbe3..ea7d3a80bd 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Project.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Project.java @@ -2,7 +2,6 @@ package eu.dnetlib.dhp.schema.oaf; import java.io.Serializable; import java.util.List; -import java.util.Objects; public class Project extends OafEntity implements Serializable { @@ -352,70 +351,4 @@ public class Project extends OafEntity implements Serializable { : fundedamount; mergeOAFDataInfo(e); } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - if (!super.equals(o)) return false; - Project project = (Project) o; - return Objects.equals(websiteurl, project.websiteurl) - && Objects.equals(code, project.code) - && Objects.equals(acronym, project.acronym) - && Objects.equals(title, project.title) - && Objects.equals(startdate, project.startdate) - && Objects.equals(enddate, project.enddate) - && Objects.equals(callidentifier, project.callidentifier) - && Objects.equals(keywords, project.keywords) - && Objects.equals(duration, project.duration) - && Objects.equals(ecsc39, project.ecsc39) - && Objects.equals(oamandatepublications, project.oamandatepublications) - && Objects.equals(ecarticle29_3, project.ecarticle29_3) - && Objects.equals(subjects, project.subjects) - && Objects.equals(fundingtree, project.fundingtree) - && Objects.equals(contracttype, project.contracttype) - && Objects.equals(optional1, project.optional1) - && Objects.equals(optional2, project.optional2) - && Objects.equals(jsonextrainfo, project.jsonextrainfo) - && Objects.equals(contactfullname, project.contactfullname) - && Objects.equals(contactfax, project.contactfax) - && Objects.equals(contactphone, project.contactphone) - && Objects.equals(contactemail, project.contactemail) - && Objects.equals(summary, project.summary) - && Objects.equals(currency, project.currency) - && Objects.equals(totalcost, project.totalcost) - && Objects.equals(fundedamount, project.fundedamount); - } - - @Override - public int hashCode() { - return Objects.hash( - super.hashCode(), - websiteurl, - code, - acronym, - title, - startdate, - enddate, - callidentifier, - keywords, - duration, - ecsc39, - oamandatepublications, - ecarticle29_3, - subjects, - fundingtree, - contracttype, - optional1, - optional2, - jsonextrainfo, - contactfullname, - contactfax, - contactphone, - contactemail, - summary, - currency, - totalcost, - fundedamount); - } } diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Publication.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Publication.java index df2ecfa1b5..62727a50f0 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Publication.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Publication.java @@ -2,7 +2,6 @@ package eu.dnetlib.dhp.schema.oaf; import eu.dnetlib.dhp.schema.common.ModelConstants; import java.io.Serializable; -import java.util.Objects; public class Publication extends Result implements Serializable { @@ -34,18 +33,4 @@ public class Publication extends Result implements Serializable { if (p.getJournal() != null && compareTrust(this, e) < 0) journal = p.getJournal(); mergeOAFDataInfo(e); } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - if (!super.equals(o)) return false; - Publication that = (Publication) o; - return Objects.equals(journal, that.journal); - } - - @Override - public int hashCode() { - return Objects.hash(super.hashCode(), journal); - } } diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Result.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Result.java index f18a9fa0c7..73192b8941 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Result.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Result.java @@ -3,7 +3,6 @@ package eu.dnetlib.dhp.schema.oaf; import java.io.Serializable; import java.util.Comparator; import java.util.List; -import java.util.Objects; public class Result extends OafEntity implements Serializable { @@ -289,60 +288,4 @@ public class Result extends OafEntity implements Serializable { } return a.size() > b.size() ? a : b; } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - if (!super.equals(o)) return false; - Result result = (Result) o; - return Objects.equals(author, result.author) - && Objects.equals(resulttype, result.resulttype) - && Objects.equals(language, result.language) - && Objects.equals(country, result.country) - && Objects.equals(subject, result.subject) - && Objects.equals(title, result.title) - && Objects.equals(relevantdate, result.relevantdate) - && Objects.equals(description, result.description) - && Objects.equals(dateofacceptance, result.dateofacceptance) - && Objects.equals(publisher, result.publisher) - && Objects.equals(embargoenddate, result.embargoenddate) - && Objects.equals(source, result.source) - && Objects.equals(fulltext, result.fulltext) - && Objects.equals(format, result.format) - && Objects.equals(contributor, result.contributor) - && Objects.equals(resourcetype, result.resourcetype) - && Objects.equals(coverage, result.coverage) - && Objects.equals(bestaccessright, result.bestaccessright) - && Objects.equals(context, result.context) - && Objects.equals(externalReference, result.externalReference) - && Objects.equals(instance, result.instance); - } - - @Override - public int hashCode() { - return Objects.hash( - super.hashCode(), - author, - resulttype, - language, - country, - subject, - title, - relevantdate, - description, - dateofacceptance, - publisher, - embargoenddate, - source, - fulltext, - format, - contributor, - resourcetype, - coverage, - bestaccessright, - context, - externalReference, - instance); - } } diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Software.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Software.java index 801c1f2f15..59ef7d1c8b 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Software.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Software.java @@ -3,7 +3,6 @@ package eu.dnetlib.dhp.schema.oaf; import eu.dnetlib.dhp.schema.common.ModelConstants; import java.io.Serializable; import java.util.List; -import java.util.Objects; public class Software extends Result implements Serializable { @@ -76,26 +75,4 @@ public class Software extends Result implements Serializable { mergeOAFDataInfo(e); } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - if (!super.equals(o)) return false; - Software software = (Software) o; - return Objects.equals(documentationUrl, software.documentationUrl) - && Objects.equals(license, software.license) - && Objects.equals(codeRepositoryUrl, software.codeRepositoryUrl) - && Objects.equals(programmingLanguage, software.programmingLanguage); - } - - @Override - public int hashCode() { - return Objects.hash( - super.hashCode(), - documentationUrl, - license, - codeRepositoryUrl, - programmingLanguage); - } } diff --git a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java index 1fa173e9c9..b605307078 100644 --- a/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java +++ b/dhp-workflows/dhp-actionmanager/src/main/java/eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJob.java @@ -166,10 +166,8 @@ public class PromoteActionPayloadForGraphTableJob { actionPayloadClazz.getSimpleName(), rowClazz.getSimpleName()); - SerializableSupplier> rowIdFn = - PromoteActionPayloadForGraphTableJob::idFn; - SerializableSupplier> actionPayloadIdFn = - PromoteActionPayloadForGraphTableJob::idFn; + SerializableSupplier> rowIdFn = ModelSupport::idFn; + SerializableSupplier> actionPayloadIdFn = ModelSupport::idFn; SerializableSupplier> mergeRowWithActionPayloadAndGetFn = MergeAndGet.functionFor(strategy); SerializableSupplier> mergeRowsAndGetFn = @@ -192,68 +190,6 @@ public class PromoteActionPayloadForGraphTableJob { joinedAndMerged, rowIdFn, mergeRowsAndGetFn, zeroFn, isNotZeroFn, rowClazz); } - private static Function idFn() { - return x -> { - if (isSubClass(x, Relation.class)) { - return idFnForRelation(x); - } - return idFnForOafEntity(x); - }; - } - - private static String idFnForRelation(T t) { - Relation r = (Relation) t; - return Optional.ofNullable(r.getSource()) - .map( - source -> - Optional.ofNullable(r.getTarget()) - .map( - target -> - Optional.ofNullable(r.getRelType()) - .map( - relType -> - Optional.ofNullable( - r - .getSubRelType()) - .map( - subRelType -> - Optional - .ofNullable( - r - .getRelClass()) - .map( - relClass -> - String - .join( - source, - target, - relType, - subRelType, - relClass)) - .orElse( - String - .join( - source, - target, - relType, - subRelType))) - .orElse( - String - .join( - source, - target, - relType))) - .orElse( - String.join( - source, target))) - .orElse(source)) - .orElse(null); - } - - private static String idFnForOafEntity(T t) { - return ((OafEntity) t).getId(); - } - private static SerializableSupplier zeroFn(Class clazz) { switch (clazz.getCanonicalName()) { case "eu.dnetlib.dhp.schema.oaf.Dataset": diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java index 34ae2df9b5..97a94bc409 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/GenerateEntitiesApplication.java @@ -6,6 +6,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.oa.graph.raw.common.DbClient; +import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; import java.io.IOException; import java.sql.SQLException; @@ -29,6 +30,8 @@ public class GenerateEntitiesApplication { private static final Logger log = LoggerFactory.getLogger(GenerateEntitiesApplication.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + public static void main(final String[] args) throws Exception { final ArgumentApplicationParser parser = new ArgumentApplicationParser( @@ -78,7 +81,7 @@ public class GenerateEntitiesApplication { log.info("Generate entities from files:"); existingSourcePaths.forEach(log::info); - JavaRDD inputRdd = sc.emptyRDD(); + JavaRDD inputRdd = sc.emptyRDD(); for (final String sp : existingSourcePaths) { inputRdd = @@ -86,15 +89,29 @@ public class GenerateEntitiesApplication { sc.sequenceFile(sp, Text.class, Text.class) .map(k -> new Tuple2<>(k._1().toString(), k._2().toString())) .map(k -> convertToListOaf(k._1(), k._2(), code2name)) - .flatMap(list -> list.iterator()) - .map( - oaf -> - oaf.getClass().getSimpleName().toLowerCase() - + "|" - + convertToJson(oaf))); + .flatMap(list -> list.iterator())); } - inputRdd.saveAsTextFile(targetPath, GzipCodec.class); + inputRdd.mapToPair(oaf -> new Tuple2<>(ModelSupport.idFn().apply(oaf), oaf)) + .reduceByKey((o1, o2) -> merge(o1, o2)) + .map(Tuple2::_2) + .map( + oaf -> + oaf.getClass().getSimpleName().toLowerCase() + + "|" + + OBJECT_MAPPER.writeValueAsString(oaf)) + .saveAsTextFile(targetPath, GzipCodec.class); + } + + private static Oaf merge(Oaf o1, Oaf o2) { + if (ModelSupport.isSubClass(o1, OafEntity.class)) { + ((OafEntity) o1).mergeFrom((OafEntity) o2); + } else if (ModelSupport.isSubClass(o1, Relation.class)) { + ((Relation) o1).mergeFrom((Relation) o2); + } else { + throw new RuntimeException("invalid Oaf type:" + o1.getClass().getCanonicalName()); + } + return o1; } private static List convertToListOaf( @@ -120,9 +137,10 @@ public class GenerateEntitiesApplication { return Arrays.asList(convertFromJson(s, Dataset.class)); case "software": return Arrays.asList(convertFromJson(s, Software.class)); - case "otherresearchproducts": - default: + case "otherresearchproduct": return Arrays.asList(convertFromJson(s, OtherResearchProduct.class)); + default: + throw new RuntimeException("type not managed: " + type.toLowerCase()); } } @@ -150,17 +168,9 @@ public class GenerateEntitiesApplication { return map; } - private static String convertToJson(final Oaf oaf) { - try { - return new ObjectMapper().writeValueAsString(oaf); - } catch (final Exception e) { - throw new RuntimeException(e); - } - } - private static Oaf convertFromJson(final String s, final Class clazz) { try { - return new ObjectMapper().readValue(s, clazz); + return OBJECT_MAPPER.readValue(s, clazz); } catch (final Exception e) { log.error("Error parsing object of class: " + clazz); log.error(s); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MergeClaimsApplication.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MergeClaimsApplication.java index 130b826e93..f10991bca6 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MergeClaimsApplication.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/raw/MergeClaimsApplication.java @@ -1,7 +1,6 @@ package eu.dnetlib.dhp.oa.graph.raw; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; -import static eu.dnetlib.dhp.schema.common.ModelSupport.isSubClass; import com.fasterxml.jackson.databind.ObjectMapper; import eu.dnetlib.dhp.application.ArgumentApplicationParser; @@ -10,7 +9,6 @@ import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; import java.util.Objects; import java.util.Optional; -import java.util.function.Function; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; @@ -83,7 +81,9 @@ public class MergeClaimsApplication { readFromPath(spark, rawPath, clazz) .map( (MapFunction>) - value -> new Tuple2<>(idFn().apply(value), value), + value -> + new Tuple2<>( + ModelSupport.idFn().apply(value), value), Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz))); final JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext()); @@ -92,14 +92,11 @@ public class MergeClaimsApplication { .getValue() .map( (MapFunction>) - value -> new Tuple2<>(idFn().apply(value), value), + value -> + new Tuple2<>( + ModelSupport.idFn().apply(value), value), Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz))); - /* - Dataset> claim = readFromPath(spark, claimPath, clazz) - .map((MapFunction>) value -> new Tuple2<>(idFn().apply(value), value), Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz))); - */ - raw.joinWith(claim, raw.col("_1").equalTo(claim.col("_1")), "full_outer") .map( (MapFunction, Tuple2>, T>) @@ -131,78 +128,12 @@ public class MergeClaimsApplication { .map( (MapFunction) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz)) - .filter((FilterFunction) value -> Objects.nonNull(idFn().apply(value))); - /* - return spark.read() - .load(path) - .as(Encoders.bean(clazz)) - .filter((FilterFunction) value -> Objects.nonNull(idFn().apply(value))); - */ + .filter( + (FilterFunction) + value -> Objects.nonNull(ModelSupport.idFn().apply(value))); } private static void removeOutputDir(SparkSession spark, String path) { HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); } - - private static Function idFn() { - return x -> { - if (isSubClass(x, Relation.class)) { - return idFnForRelation(x); - } - return idFnForOafEntity(x); - }; - } - - private static String idFnForRelation(T t) { - Relation r = (Relation) t; - return Optional.ofNullable(r.getSource()) - .map( - source -> - Optional.ofNullable(r.getTarget()) - .map( - target -> - Optional.ofNullable(r.getRelType()) - .map( - relType -> - Optional.ofNullable( - r - .getSubRelType()) - .map( - subRelType -> - Optional - .ofNullable( - r - .getRelClass()) - .map( - relClass -> - String - .join( - source, - target, - relType, - subRelType, - relClass)) - .orElse( - String - .join( - source, - target, - relType, - subRelType))) - .orElse( - String - .join( - source, - target, - relType))) - .orElse( - String.join( - source, target))) - .orElse(source)) - .orElse(null); - } - - private static String idFnForOafEntity(T t) { - return ((OafEntity) t).getId(); - } }