refined definition of equals and hash methods for Oaf model classes, now based on entity identifier, while relations consider sourceid, targetid and relationship semantic; Factored out function to group Oaf objects in grouping operations; Raw graph creation procedure merges entities and relationships providing the same identity

This commit is contained in:
Claudio Atzori 2020-04-24 14:42:01 +02:00
parent a3e480d1c9
commit 268462623a
13 changed files with 106 additions and 523 deletions

View File

@ -3,6 +3,8 @@ package eu.dnetlib.dhp.schema.common;
import com.google.common.collect.Maps;
import eu.dnetlib.dhp.schema.oaf.*;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
/** Oaf model utility methods. */
public class ModelSupport {
@ -146,4 +148,66 @@ public class ModelSupport {
entityMapping.get(EntityType.valueOf(sourceType)).name(),
entityMapping.get(EntityType.valueOf(targetType)).name());
}
public static <T extends Oaf> Function<T, String> idFn() {
return x -> {
if (isSubClass(x, Relation.class)) {
return idFnForRelation(x);
}
return idFnForOafEntity(x);
};
}
private static <T extends Oaf> String idFnForRelation(T t) {
Relation r = (Relation) t;
return Optional.ofNullable(r.getSource())
.map(
source ->
Optional.ofNullable(r.getTarget())
.map(
target ->
Optional.ofNullable(r.getRelType())
.map(
relType ->
Optional.ofNullable(
r
.getSubRelType())
.map(
subRelType ->
Optional
.ofNullable(
r
.getRelClass())
.map(
relClass ->
String
.join(
source,
target,
relType,
subRelType,
relClass))
.orElse(
String
.join(
source,
target,
relType,
subRelType)))
.orElse(
String
.join(
source,
target,
relType)))
.orElse(
String.join(
source, target)))
.orElse(source))
.orElse(null);
}
private static <T extends Oaf> String idFnForOafEntity(T t) {
return ((OafEntity) t).getId();
}
}

View File

@ -3,7 +3,6 @@ package eu.dnetlib.dhp.schema.oaf;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import java.io.Serializable;
import java.util.List;
import java.util.Objects;
public class Dataset extends Result implements Serializable {
@ -116,32 +115,4 @@ public class Dataset extends Result implements Serializable {
mergeOAFDataInfo(d);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
Dataset dataset = (Dataset) o;
return Objects.equals(storagedate, dataset.storagedate)
&& Objects.equals(device, dataset.device)
&& Objects.equals(size, dataset.size)
&& Objects.equals(version, dataset.version)
&& Objects.equals(lastmetadataupdate, dataset.lastmetadataupdate)
&& Objects.equals(metadataversionnumber, dataset.metadataversionnumber)
&& Objects.equals(geolocation, dataset.geolocation);
}
@Override
public int hashCode() {
return Objects.hash(
super.hashCode(),
storagedate,
device,
size,
version,
lastmetadataupdate,
metadataversionnumber,
geolocation);
}
}

View File

@ -2,7 +2,6 @@ package eu.dnetlib.dhp.schema.oaf;
import java.io.Serializable;
import java.util.List;
import java.util.Objects;
public class Datasource extends OafEntity implements Serializable {
@ -512,88 +511,4 @@ public class Datasource extends OafEntity implements Serializable {
mergeOAFDataInfo(e);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
Datasource that = (Datasource) o;
return Objects.equals(datasourcetype, that.datasourcetype)
&& Objects.equals(openairecompatibility, that.openairecompatibility)
&& Objects.equals(officialname, that.officialname)
&& Objects.equals(englishname, that.englishname)
&& Objects.equals(websiteurl, that.websiteurl)
&& Objects.equals(logourl, that.logourl)
&& Objects.equals(contactemail, that.contactemail)
&& Objects.equals(namespaceprefix, that.namespaceprefix)
&& Objects.equals(latitude, that.latitude)
&& Objects.equals(longitude, that.longitude)
&& Objects.equals(dateofvalidation, that.dateofvalidation)
&& Objects.equals(description, that.description)
&& Objects.equals(subjects, that.subjects)
&& Objects.equals(odnumberofitems, that.odnumberofitems)
&& Objects.equals(odnumberofitemsdate, that.odnumberofitemsdate)
&& Objects.equals(odpolicies, that.odpolicies)
&& Objects.equals(odlanguages, that.odlanguages)
&& Objects.equals(odcontenttypes, that.odcontenttypes)
&& Objects.equals(accessinfopackage, that.accessinfopackage)
&& Objects.equals(releasestartdate, that.releasestartdate)
&& Objects.equals(releaseenddate, that.releaseenddate)
&& Objects.equals(missionstatementurl, that.missionstatementurl)
&& Objects.equals(dataprovider, that.dataprovider)
&& Objects.equals(serviceprovider, that.serviceprovider)
&& Objects.equals(databaseaccesstype, that.databaseaccesstype)
&& Objects.equals(datauploadtype, that.datauploadtype)
&& Objects.equals(databaseaccessrestriction, that.databaseaccessrestriction)
&& Objects.equals(datauploadrestriction, that.datauploadrestriction)
&& Objects.equals(versioning, that.versioning)
&& Objects.equals(citationguidelineurl, that.citationguidelineurl)
&& Objects.equals(qualitymanagementkind, that.qualitymanagementkind)
&& Objects.equals(pidsystems, that.pidsystems)
&& Objects.equals(certificates, that.certificates)
&& Objects.equals(policies, that.policies)
&& Objects.equals(journal, that.journal);
}
@Override
public int hashCode() {
return Objects.hash(
super.hashCode(),
datasourcetype,
openairecompatibility,
officialname,
englishname,
websiteurl,
logourl,
contactemail,
namespaceprefix,
latitude,
longitude,
dateofvalidation,
description,
subjects,
odnumberofitems,
odnumberofitemsdate,
odpolicies,
odlanguages,
odcontenttypes,
accessinfopackage,
releasestartdate,
releaseenddate,
missionstatementurl,
dataprovider,
serviceprovider,
databaseaccesstype,
datauploadtype,
databaseaccessrestriction,
datauploadrestriction,
versioning,
citationguidelineurl,
qualitymanagementkind,
pidsystems,
certificates,
policies,
journal);
}
}

View File

@ -113,27 +113,11 @@ public abstract class OafEntity extends Oaf implements Serializable {
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
OafEntity oafEntity = (OafEntity) o;
return Objects.equals(id, oafEntity.id)
&& Objects.equals(originalId, oafEntity.originalId)
&& Objects.equals(collectedfrom, oafEntity.collectedfrom)
&& Objects.equals(pid, oafEntity.pid)
&& Objects.equals(dateofcollection, oafEntity.dateofcollection)
&& Objects.equals(dateoftransformation, oafEntity.dateoftransformation)
&& Objects.equals(extraInfo, oafEntity.extraInfo)
&& Objects.equals(oaiprovenance, oafEntity.oaiprovenance);
return Objects.equals(id, oafEntity.id);
}
@Override
public int hashCode() {
return Objects.hash(
super.hashCode(),
id,
originalId,
collectedfrom,
pid,
dateofcollection,
dateoftransformation,
extraInfo,
oaiprovenance);
return Objects.hash(super.hashCode(), id);
}
}

View File

@ -2,7 +2,6 @@ package eu.dnetlib.dhp.schema.oaf;
import java.io.Serializable;
import java.util.List;
import java.util.Objects;
public class Organization extends OafEntity implements Serializable {
@ -233,52 +232,4 @@ public class Organization extends OafEntity implements Serializable {
country = o.getCountry() != null && compareTrust(this, e) < 0 ? o.getCountry() : country;
mergeOAFDataInfo(o);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
Organization that = (Organization) o;
return Objects.equals(legalshortname, that.legalshortname)
&& Objects.equals(legalname, that.legalname)
&& Objects.equals(alternativeNames, that.alternativeNames)
&& Objects.equals(websiteurl, that.websiteurl)
&& Objects.equals(logourl, that.logourl)
&& Objects.equals(eclegalbody, that.eclegalbody)
&& Objects.equals(eclegalperson, that.eclegalperson)
&& Objects.equals(ecnonprofit, that.ecnonprofit)
&& Objects.equals(ecresearchorganization, that.ecresearchorganization)
&& Objects.equals(echighereducation, that.echighereducation)
&& Objects.equals(
ecinternationalorganizationeurinterests,
that.ecinternationalorganizationeurinterests)
&& Objects.equals(ecinternationalorganization, that.ecinternationalorganization)
&& Objects.equals(ecenterprise, that.ecenterprise)
&& Objects.equals(ecsmevalidated, that.ecsmevalidated)
&& Objects.equals(ecnutscode, that.ecnutscode)
&& Objects.equals(country, that.country);
}
@Override
public int hashCode() {
return Objects.hash(
super.hashCode(),
legalshortname,
legalname,
alternativeNames,
websiteurl,
logourl,
eclegalbody,
eclegalperson,
ecnonprofit,
ecresearchorganization,
echighereducation,
ecinternationalorganizationeurinterests,
ecinternationalorganization,
ecenterprise,
ecsmevalidated,
ecnutscode,
country);
}
}

View File

@ -3,7 +3,6 @@ package eu.dnetlib.dhp.schema.oaf;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import java.io.Serializable;
import java.util.List;
import java.util.Objects;
public class OtherResearchProduct extends Result implements Serializable {
@ -56,20 +55,4 @@ public class OtherResearchProduct extends Result implements Serializable {
tool = mergeLists(tool, o.getTool());
mergeOAFDataInfo(e);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
OtherResearchProduct that = (OtherResearchProduct) o;
return Objects.equals(contactperson, that.contactperson)
&& Objects.equals(contactgroup, that.contactgroup)
&& Objects.equals(tool, that.tool);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), contactperson, contactgroup, tool);
}
}

View File

@ -2,7 +2,6 @@ package eu.dnetlib.dhp.schema.oaf;
import java.io.Serializable;
import java.util.List;
import java.util.Objects;
public class Project extends OafEntity implements Serializable {
@ -352,70 +351,4 @@ public class Project extends OafEntity implements Serializable {
: fundedamount;
mergeOAFDataInfo(e);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
Project project = (Project) o;
return Objects.equals(websiteurl, project.websiteurl)
&& Objects.equals(code, project.code)
&& Objects.equals(acronym, project.acronym)
&& Objects.equals(title, project.title)
&& Objects.equals(startdate, project.startdate)
&& Objects.equals(enddate, project.enddate)
&& Objects.equals(callidentifier, project.callidentifier)
&& Objects.equals(keywords, project.keywords)
&& Objects.equals(duration, project.duration)
&& Objects.equals(ecsc39, project.ecsc39)
&& Objects.equals(oamandatepublications, project.oamandatepublications)
&& Objects.equals(ecarticle29_3, project.ecarticle29_3)
&& Objects.equals(subjects, project.subjects)
&& Objects.equals(fundingtree, project.fundingtree)
&& Objects.equals(contracttype, project.contracttype)
&& Objects.equals(optional1, project.optional1)
&& Objects.equals(optional2, project.optional2)
&& Objects.equals(jsonextrainfo, project.jsonextrainfo)
&& Objects.equals(contactfullname, project.contactfullname)
&& Objects.equals(contactfax, project.contactfax)
&& Objects.equals(contactphone, project.contactphone)
&& Objects.equals(contactemail, project.contactemail)
&& Objects.equals(summary, project.summary)
&& Objects.equals(currency, project.currency)
&& Objects.equals(totalcost, project.totalcost)
&& Objects.equals(fundedamount, project.fundedamount);
}
@Override
public int hashCode() {
return Objects.hash(
super.hashCode(),
websiteurl,
code,
acronym,
title,
startdate,
enddate,
callidentifier,
keywords,
duration,
ecsc39,
oamandatepublications,
ecarticle29_3,
subjects,
fundingtree,
contracttype,
optional1,
optional2,
jsonextrainfo,
contactfullname,
contactfax,
contactphone,
contactemail,
summary,
currency,
totalcost,
fundedamount);
}
}

View File

@ -2,7 +2,6 @@ package eu.dnetlib.dhp.schema.oaf;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import java.io.Serializable;
import java.util.Objects;
public class Publication extends Result implements Serializable {
@ -34,18 +33,4 @@ public class Publication extends Result implements Serializable {
if (p.getJournal() != null && compareTrust(this, e) < 0) journal = p.getJournal();
mergeOAFDataInfo(e);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
Publication that = (Publication) o;
return Objects.equals(journal, that.journal);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), journal);
}
}

View File

@ -3,7 +3,6 @@ package eu.dnetlib.dhp.schema.oaf;
import java.io.Serializable;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
public class Result extends OafEntity implements Serializable {
@ -289,60 +288,4 @@ public class Result extends OafEntity implements Serializable {
}
return a.size() > b.size() ? a : b;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
Result result = (Result) o;
return Objects.equals(author, result.author)
&& Objects.equals(resulttype, result.resulttype)
&& Objects.equals(language, result.language)
&& Objects.equals(country, result.country)
&& Objects.equals(subject, result.subject)
&& Objects.equals(title, result.title)
&& Objects.equals(relevantdate, result.relevantdate)
&& Objects.equals(description, result.description)
&& Objects.equals(dateofacceptance, result.dateofacceptance)
&& Objects.equals(publisher, result.publisher)
&& Objects.equals(embargoenddate, result.embargoenddate)
&& Objects.equals(source, result.source)
&& Objects.equals(fulltext, result.fulltext)
&& Objects.equals(format, result.format)
&& Objects.equals(contributor, result.contributor)
&& Objects.equals(resourcetype, result.resourcetype)
&& Objects.equals(coverage, result.coverage)
&& Objects.equals(bestaccessright, result.bestaccessright)
&& Objects.equals(context, result.context)
&& Objects.equals(externalReference, result.externalReference)
&& Objects.equals(instance, result.instance);
}
@Override
public int hashCode() {
return Objects.hash(
super.hashCode(),
author,
resulttype,
language,
country,
subject,
title,
relevantdate,
description,
dateofacceptance,
publisher,
embargoenddate,
source,
fulltext,
format,
contributor,
resourcetype,
coverage,
bestaccessright,
context,
externalReference,
instance);
}
}

View File

@ -3,7 +3,6 @@ package eu.dnetlib.dhp.schema.oaf;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import java.io.Serializable;
import java.util.List;
import java.util.Objects;
public class Software extends Result implements Serializable {
@ -76,26 +75,4 @@ public class Software extends Result implements Serializable {
mergeOAFDataInfo(e);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
Software software = (Software) o;
return Objects.equals(documentationUrl, software.documentationUrl)
&& Objects.equals(license, software.license)
&& Objects.equals(codeRepositoryUrl, software.codeRepositoryUrl)
&& Objects.equals(programmingLanguage, software.programmingLanguage);
}
@Override
public int hashCode() {
return Objects.hash(
super.hashCode(),
documentationUrl,
license,
codeRepositoryUrl,
programmingLanguage);
}
}

View File

@ -166,10 +166,8 @@ public class PromoteActionPayloadForGraphTableJob {
actionPayloadClazz.getSimpleName(),
rowClazz.getSimpleName());
SerializableSupplier<Function<G, String>> rowIdFn =
PromoteActionPayloadForGraphTableJob::idFn;
SerializableSupplier<Function<A, String>> actionPayloadIdFn =
PromoteActionPayloadForGraphTableJob::idFn;
SerializableSupplier<Function<G, String>> rowIdFn = ModelSupport::idFn;
SerializableSupplier<Function<A, String>> actionPayloadIdFn = ModelSupport::idFn;
SerializableSupplier<BiFunction<G, A, G>> mergeRowWithActionPayloadAndGetFn =
MergeAndGet.functionFor(strategy);
SerializableSupplier<BiFunction<G, G, G>> mergeRowsAndGetFn =
@ -192,68 +190,6 @@ public class PromoteActionPayloadForGraphTableJob {
joinedAndMerged, rowIdFn, mergeRowsAndGetFn, zeroFn, isNotZeroFn, rowClazz);
}
private static <T extends Oaf> Function<T, String> idFn() {
return x -> {
if (isSubClass(x, Relation.class)) {
return idFnForRelation(x);
}
return idFnForOafEntity(x);
};
}
private static <T extends Oaf> String idFnForRelation(T t) {
Relation r = (Relation) t;
return Optional.ofNullable(r.getSource())
.map(
source ->
Optional.ofNullable(r.getTarget())
.map(
target ->
Optional.ofNullable(r.getRelType())
.map(
relType ->
Optional.ofNullable(
r
.getSubRelType())
.map(
subRelType ->
Optional
.ofNullable(
r
.getRelClass())
.map(
relClass ->
String
.join(
source,
target,
relType,
subRelType,
relClass))
.orElse(
String
.join(
source,
target,
relType,
subRelType)))
.orElse(
String
.join(
source,
target,
relType)))
.orElse(
String.join(
source, target)))
.orElse(source))
.orElse(null);
}
private static <T extends Oaf> String idFnForOafEntity(T t) {
return ((OafEntity) t).getId();
}
private static <T extends Oaf> SerializableSupplier<T> zeroFn(Class<T> clazz) {
switch (clazz.getCanonicalName()) {
case "eu.dnetlib.dhp.schema.oaf.Dataset":

View File

@ -6,6 +6,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.graph.raw.common.DbClient;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import java.io.IOException;
import java.sql.SQLException;
@ -29,6 +30,8 @@ public class GenerateEntitiesApplication {
private static final Logger log = LoggerFactory.getLogger(GenerateEntitiesApplication.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(final String[] args) throws Exception {
final ArgumentApplicationParser parser =
new ArgumentApplicationParser(
@ -78,7 +81,7 @@ public class GenerateEntitiesApplication {
log.info("Generate entities from files:");
existingSourcePaths.forEach(log::info);
JavaRDD<String> inputRdd = sc.emptyRDD();
JavaRDD<Oaf> inputRdd = sc.emptyRDD();
for (final String sp : existingSourcePaths) {
inputRdd =
@ -86,15 +89,29 @@ public class GenerateEntitiesApplication {
sc.sequenceFile(sp, Text.class, Text.class)
.map(k -> new Tuple2<>(k._1().toString(), k._2().toString()))
.map(k -> convertToListOaf(k._1(), k._2(), code2name))
.flatMap(list -> list.iterator())
.map(
oaf ->
oaf.getClass().getSimpleName().toLowerCase()
+ "|"
+ convertToJson(oaf)));
.flatMap(list -> list.iterator()));
}
inputRdd.saveAsTextFile(targetPath, GzipCodec.class);
inputRdd.mapToPair(oaf -> new Tuple2<>(ModelSupport.idFn().apply(oaf), oaf))
.reduceByKey((o1, o2) -> merge(o1, o2))
.map(Tuple2::_2)
.map(
oaf ->
oaf.getClass().getSimpleName().toLowerCase()
+ "|"
+ OBJECT_MAPPER.writeValueAsString(oaf))
.saveAsTextFile(targetPath, GzipCodec.class);
}
private static Oaf merge(Oaf o1, Oaf o2) {
if (ModelSupport.isSubClass(o1, OafEntity.class)) {
((OafEntity) o1).mergeFrom((OafEntity) o2);
} else if (ModelSupport.isSubClass(o1, Relation.class)) {
((Relation) o1).mergeFrom((Relation) o2);
} else {
throw new RuntimeException("invalid Oaf type:" + o1.getClass().getCanonicalName());
}
return o1;
}
private static List<Oaf> convertToListOaf(
@ -120,9 +137,10 @@ public class GenerateEntitiesApplication {
return Arrays.asList(convertFromJson(s, Dataset.class));
case "software":
return Arrays.asList(convertFromJson(s, Software.class));
case "otherresearchproducts":
default:
case "otherresearchproduct":
return Arrays.asList(convertFromJson(s, OtherResearchProduct.class));
default:
throw new RuntimeException("type not managed: " + type.toLowerCase());
}
}
@ -150,17 +168,9 @@ public class GenerateEntitiesApplication {
return map;
}
private static String convertToJson(final Oaf oaf) {
try {
return new ObjectMapper().writeValueAsString(oaf);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
private static Oaf convertFromJson(final String s, final Class<? extends Oaf> clazz) {
try {
return new ObjectMapper().readValue(s, clazz);
return OBJECT_MAPPER.readValue(s, clazz);
} catch (final Exception e) {
log.error("Error parsing object of class: " + clazz);
log.error(s);

View File

@ -1,7 +1,6 @@
package eu.dnetlib.dhp.oa.graph.raw;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.schema.common.ModelSupport.isSubClass;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
@ -10,7 +9,6 @@ import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
@ -83,7 +81,9 @@ public class MergeClaimsApplication {
readFromPath(spark, rawPath, clazz)
.map(
(MapFunction<T, Tuple2<String, T>>)
value -> new Tuple2<>(idFn().apply(value), value),
value ->
new Tuple2<>(
ModelSupport.idFn().apply(value), value),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
final JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
@ -92,14 +92,11 @@ public class MergeClaimsApplication {
.getValue()
.map(
(MapFunction<T, Tuple2<String, T>>)
value -> new Tuple2<>(idFn().apply(value), value),
value ->
new Tuple2<>(
ModelSupport.idFn().apply(value), value),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
/*
Dataset<Tuple2<String, T>> claim = readFromPath(spark, claimPath, clazz)
.map((MapFunction<T, Tuple2<String, T>>) value -> new Tuple2<>(idFn().apply(value), value), Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
*/
raw.joinWith(claim, raw.col("_1").equalTo(claim.col("_1")), "full_outer")
.map(
(MapFunction<Tuple2<Tuple2<String, T>, Tuple2<String, T>>, T>)
@ -131,78 +128,12 @@ public class MergeClaimsApplication {
.map(
(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, clazz),
Encoders.bean(clazz))
.filter((FilterFunction<T>) value -> Objects.nonNull(idFn().apply(value)));
/*
return spark.read()
.load(path)
.as(Encoders.bean(clazz))
.filter((FilterFunction<T>) value -> Objects.nonNull(idFn().apply(value)));
*/
.filter(
(FilterFunction<T>)
value -> Objects.nonNull(ModelSupport.idFn().apply(value)));
}
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}
private static <T extends Oaf> Function<T, String> idFn() {
return x -> {
if (isSubClass(x, Relation.class)) {
return idFnForRelation(x);
}
return idFnForOafEntity(x);
};
}
private static <T extends Oaf> String idFnForRelation(T t) {
Relation r = (Relation) t;
return Optional.ofNullable(r.getSource())
.map(
source ->
Optional.ofNullable(r.getTarget())
.map(
target ->
Optional.ofNullable(r.getRelType())
.map(
relType ->
Optional.ofNullable(
r
.getSubRelType())
.map(
subRelType ->
Optional
.ofNullable(
r
.getRelClass())
.map(
relClass ->
String
.join(
source,
target,
relType,
subRelType,
relClass))
.orElse(
String
.join(
source,
target,
relType,
subRelType)))
.orElse(
String
.join(
source,
target,
relType)))
.orElse(
String.join(
source, target)))
.orElse(source))
.orElse(null);
}
private static <T extends Oaf> String idFnForOafEntity(T t) {
return ((OafEntity) t).getId();
}
}