diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java
index 708d67f6ea..68211fd28e 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java
@@ -6,10 +6,12 @@ import java.io.Serializable;
 import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
@@ -23,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.common.HdfsSupport;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.oaf.*;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
@@ -117,4 +120,26 @@ abstract class AbstractSparkAction implements Serializable {
 			.map(p -> p.getValue() + TYPE_VALUE_SEPARATOR + p.getQualifier().getClassid())
 			.collect(Collectors.joining(SP_SEPARATOR));
 	}
+
+	protected static MapFunction<String, Relation> patchRelFn() {
+		return value -> {
+			final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);
+			if (rel.getDataInfo() == null) {
+				rel.setDataInfo(new DataInfo());
+			}
+			return rel;
+		};
+	}
+
+	protected boolean isOpenorgs(Relation rel) {
+		return Optional
+			.ofNullable(rel.getCollectedfrom())
+			.map(
+				c -> c
+					.stream()
+					.filter(kv -> kv.getValue().equals(ModelConstants.OPENORGS_NAME))
+					.findFirst()
+					.isPresent())
+			.orElse(false);
+	}
 }
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupUtility.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupUtility.java
index 4ee8a08da9..5806e9fa45 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupUtility.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupUtility.java
@@ -2,14 +2,8 @@
 package eu.dnetlib.dhp.oa.dedup;
 
 import java.io.StringReader;
-import java.nio.charset.StandardCharsets;
-import java.security.MessageDigest;
-import java.text.Normalizer;
 import java.util.*;
-import java.util.stream.Collectors;
 
-import org.apache.commons.codec.binary.Hex;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.SparkContext;
 import org.apache.spark.util.LongAccumulator;
 import org.dom4j.Document;
@@ -18,21 +12,19 @@ import org.dom4j.Element;
 import org.dom4j.io.SAXReader;
 
 import com.google.common.collect.Sets;
-import com.wcohen.ss.JaroWinkler;
 
-import eu.dnetlib.dhp.schema.oaf.Author;
-import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 import eu.dnetlib.pace.clustering.BlacklistAwareClusteringCombiner;
 import eu.dnetlib.pace.config.DedupConfig;
 import eu.dnetlib.pace.model.MapDocument;
-import eu.dnetlib.pace.model.Person;
-import scala.Tuple2;
 
 public class DedupUtility {
 
+	public static final String OPENORGS_ID_PREFIX = "openorgs____";
+	public static final String CORDA_ID_PREFIX = "corda";
+
 	public static Map<String, LongAccumulator> constructAccumulator(
 		final DedupConfig dedupConf, final SparkContext context) {
 
@@ -134,4 +126,24 @@ public class DedupUtility {
 		dedupConfig.getWf().setConfigurationId(actionSetId);
 		return dedupConfig;
 	}
+
+	public static int compareOpenOrgIds(String o1, String o2) {
+		if (o1.contains(OPENORGS_ID_PREFIX) && o2.contains(OPENORGS_ID_PREFIX))
+			return o1.compareTo(o2);
+		if (o1.contains(CORDA_ID_PREFIX) && o2.contains(CORDA_ID_PREFIX))
+			return o1.compareTo(o2);
+
+		if (o1.contains(OPENORGS_ID_PREFIX))
+			return -1;
+		if (o2.contains(OPENORGS_ID_PREFIX))
+			return 1;
+
+		if (o1.contains(CORDA_ID_PREFIX))
+			return -1;
+		if (o2.contains(CORDA_ID_PREFIX))
+			return 1;
+
+		return o1.compareTo(o2);
+	}
+
 }
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgs.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgs.java
index 7984f0104b..543558f367 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgs.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgs.java
@@ -94,7 +94,7 @@ public class SparkCopyOpenorgs extends AbstractSparkAction {
 
 		log.info("Number of organization entities processed: {}", entities.count());
 
-		entities = entities.filter(entities.col("id").contains("openorgs____"));
+		entities = entities.filter(entities.col("id").contains(DedupUtility.OPENORGS_ID_PREFIX));
 
 		log.info("Number of Openorgs organization entities: {}", entities.count());
 
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsMergeRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsMergeRels.java
index 00036ebb09..dd74677bc9 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsMergeRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsMergeRels.java
@@ -124,35 +124,10 @@ public class SparkCopyOpenorgsMergeRels extends AbstractSparkAction {
 			.parquet(outputPath);
 	}
 
-	private static MapFunction<String, Relation> patchRelFn() {
-		return value -> {
-			final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);
-			if (rel.getDataInfo() == null) {
-				rel.setDataInfo(new DataInfo());
-			}
-			return rel;
-		};
-	}
-
 	private boolean filterOpenorgsRels(Relation rel) {
-
-		if (rel.getRelClass().equals(ModelConstants.IS_SIMILAR_TO)
+		return rel.getRelClass().equals(ModelConstants.IS_SIMILAR_TO)
 			&& rel.getRelType().equals(ModelConstants.ORG_ORG_RELTYPE)
-			&& rel.getSubRelType().equals(ModelConstants.DEDUP))
-			return true;
-		return false;
-	}
-
-	private boolean isOpenorgs(Relation rel) {
-
-		if (rel.getCollectedfrom() != null) {
-			for (KeyValue k : rel.getCollectedfrom()) {
-				if (k.getValue() != null && k.getValue().equals(ModelConstants.OPENORGS_NAME)) {
-					return true;
-				}
-			}
-		}
-		return false;
+			&& rel.getSubRelType().equals(ModelConstants.DEDUP);
 	}
 
 	private Relation rel(String source, String target, String relClass, DedupConfig dedupConf) {
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsSimRels.java
index 91229fe532..0aaa1e6622 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsSimRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsSimRels.java
@@ -81,34 +81,10 @@ public class SparkCopyOpenorgsSimRels extends AbstractSparkAction {
 
 		log.info("Copied " + rawRels.count() + " Similarity Relations");
 	}
 
-	private static MapFunction<String, Relation> patchRelFn() {
-		return value -> {
-			final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);
-			if (rel.getDataInfo() == null) {
-				rel.setDataInfo(new DataInfo());
-			}
-			return rel;
-		};
-	}
-
 	private boolean filterOpenorgsRels(Relation rel) {
-
-		if (rel.getRelClass().equals(ModelConstants.IS_SIMILAR_TO)
+		return rel.getRelClass().equals(ModelConstants.IS_SIMILAR_TO)
 			&& rel.getRelType().equals(ModelConstants.ORG_ORG_RELTYPE)
-			&& rel.getSubRelType().equals(ModelConstants.DEDUP) && isOpenorgs(rel))
-			return true;
-		return false;
+			&& rel.getSubRelType().equals(ModelConstants.DEDUP) && isOpenorgs(rel);
 	}
 
-	private boolean isOpenorgs(Relation rel) {
-
-		if (rel.getCollectedfrom() != null) {
-			for (KeyValue k : rel.getCollectedfrom()) {
-				if (k.getValue() != null && k.getValue().equals(ModelConstants.OPENORGS_NAME)) {
-					return true;
-				}
-			}
-		}
-		return false;
-	}
 }
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java
index 71bab79d00..9ece438912 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java
@@ -86,25 +86,4 @@ public class SparkCopyRelationsNoOpenorgs extends AbstractSparkAction {
 
 	}
 
-	private static MapFunction<String, Relation> patchRelFn() {
-		return value -> {
-			final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);
-			if (rel.getDataInfo() == null) {
-				rel.setDataInfo(new DataInfo());
-			}
-			return rel;
-		};
-	}
-
-	private boolean isOpenorgs(Relation rel) {
-
-		if (rel.getCollectedfrom() != null) {
-			for (KeyValue k : rel.getCollectedfrom()) {
-				if (k.getValue() != null && k.getValue().equals("OpenOrgs Database")) {
-					return true;
-				}
-			}
-		}
-		return false;
-	}
 }
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareNewOrgs.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareNewOrgs.java
index ac51f31572..94aab20cce 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareNewOrgs.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareNewOrgs.java
@@ -127,7 +127,7 @@ public class SparkPrepareNewOrgs extends AbstractSparkAction {
 			.filter(r -> filterRels(r, "organization"))
 			// take the worst id of the diffrel:
 			.mapToPair(rel -> {
-				if (compareIds(rel.getSource(), rel.getTarget()) > 0)
+				if (DedupUtility.compareOpenOrgIds(rel.getSource(), rel.getTarget()) > 0)
 					return new Tuple2<>(rel.getSource(), "diffRel");
 				else
 					return new Tuple2<>(rel.getTarget(), "diffRel");
@@ -200,35 +200,6 @@ public class SparkPrepareNewOrgs extends AbstractSparkAction {
 		}
 	}
 
-	private static MapFunction<String, Relation> patchRelFn() {
-		return value -> {
-			final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);
-			if (rel.getDataInfo() == null) {
-				rel.setDataInfo(new DataInfo());
-			}
-			return rel;
-		};
-	}
-
-	public static int compareIds(String o1, String o2) {
-		if (o1.contains("openorgs____") && o2.contains("openorgs____"))
-			return o1.compareTo(o2);
-		if (o1.contains("corda") && o2.contains("corda"))
-			return o1.compareTo(o2);
-
-		if (o1.contains("openorgs____"))
-			return -1;
-		if (o2.contains("openorgs____"))
-			return 1;
-
-		if (o1.contains("corda"))
-			return -1;
-		if (o2.contains("corda"))
-			return 1;
-
-		return o1.compareTo(o2);
-	}
-
 	private static boolean filterRels(Relation rel, String entityType) {
 
 		switch (entityType) {
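SparkPrepareNewOrgs above and SparkPrepareOrgRels below now both delegate to the comparator consolidated into DedupUtility, which keeps the priority documented next to sortIds: openorgs identifiers first, then corda identifiers, then plain lexicographic order. A minimal sketch of that ordering, outside the patch and with invented identifiers:

```java
import java.util.Arrays;
import java.util.List;

import eu.dnetlib.dhp.oa.dedup.DedupUtility;

public class CompareOpenOrgIdsExample {

	public static void main(String[] args) {
		// Invented identifiers, used only to show the ordering; real ids come from the graph.
		String openorgsId = "20|openorgs____::ab12";
		String cordaId = "20|corda_______::cd34";
		String otherId = "20|grid________::ef56";

		List<String> ids = Arrays.asList(otherId, cordaId, openorgsId);

		// compareOpenOrgIds(String, String) returns an int, so the method reference
		// can be used directly as a Comparator<String>.
		ids.sort(DedupUtility::compareOpenOrgIds);

		// Expected order: the openorgs id first, then the corda id, then the rest.
		System.out.println(ids);
	}
}
```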
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java
index f1de3609b8..53e6724bae 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java
@@ -14,7 +14,6 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.util.LongAccumulator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -24,9 +23,7 @@ import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.oa.dedup.model.OrgSimRel;
 import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
-import eu.dnetlib.dhp.schema.oaf.DataInfo;
-import eu.dnetlib.dhp.schema.oaf.Organization;
-import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.schema.oaf.*;
 import eu.dnetlib.dhp.utils.ISLookupClientFactory;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 import scala.Tuple2;
@@ -35,8 +32,6 @@ import scala.Tuple3;
 public class SparkPrepareOrgRels extends AbstractSparkAction {
 
 	private static final Logger log = LoggerFactory.getLogger(SparkPrepareOrgRels.class);
-	public static final String OPENORGS_ID_PREFIX = "openorgs____";
-	public static final String CORDA_ID_PREFIX = "corda";
 
 	public SparkPrepareOrgRels(ArgumentApplicationParser parser, SparkSession spark) {
 		super(parser, spark);
@@ -141,7 +136,7 @@ public class SparkPrepareOrgRels extends AbstractSparkAction {
 			.filter(r -> filterRels(r, "organization"))
 			// put the best id as source of the diffrel:
 			.map(rel -> {
-				if (compareIds(rel.getSource(), rel.getTarget()) < 0)
+				if (DedupUtility.compareOpenOrgIds(rel.getSource(), rel.getTarget()) < 0)
 					return new Tuple2<>(new Tuple2<>(rel.getSource(), rel.getTarget()), "diffRel");
 				else
 					return new Tuple2<>(new Tuple2<>(rel.getTarget(), rel.getSource()), "diffRel");
@@ -216,17 +211,20 @@ public class SparkPrepareOrgRels extends AbstractSparkAction {
 			.joinWith(entities, relations.col("_2").equalTo(entities.col("_1")), "inner")
 			.map(
 				(MapFunction<Tuple2<Tuple3<String, String, String>, Tuple2<String, Organization>>, OrgSimRel>) r -> {
-
+					final Organization o = r._2()._2();
 					return new OrgSimRel(
 						r._1()._1(),
-						r._2()._2().getOriginalId().get(0),
-						r._2()._2().getLegalname() != null ? r._2()._2().getLegalname().getValue() : "",
-						r._2()._2().getLegalshortname() != null ? r._2()._2().getLegalshortname().getValue() : "",
-						r._2()._2().getCountry() != null ? r._2()._2().getCountry().getClassid() : "",
-						r._2()._2().getWebsiteurl() != null ? r._2()._2().getWebsiteurl().getValue() : "",
-						r._2()._2().getCollectedfrom().get(0).getValue(),
+						o.getOriginalId().get(0),
+						Optional.ofNullable(o.getLegalname()).map(Field::getValue).orElse(""),
+						Optional.ofNullable(o.getLegalshortname()).map(Field::getValue).orElse(""),
+						Optional.ofNullable(o.getCountry()).map(Qualifier::getClassid).orElse(""),
+						Optional.ofNullable(o.getWebsiteurl()).map(Field::getValue).orElse(""),
+						Optional
+							.ofNullable(o.getCollectedfrom())
+							.map(c -> Optional.ofNullable(c.get(0)).map(KeyValue::getValue).orElse(""))
+							.orElse(""),
 						r._1()._3(),
-						structuredPropertyListToString(r._2()._2().getPid()));
+						structuredPropertyListToString(o.getPid()));
 				},
 				Encoders.bean(OrgSimRel.class))
 			.map(
@@ -245,28 +243,9 @@ public class SparkPrepareOrgRels extends AbstractSparkAction {
 
 	}
 
-	public static int compareIds(String o1, String o2) {
-		if (o1.contains(OPENORGS_ID_PREFIX) && o2.contains(OPENORGS_ID_PREFIX))
-			return o1.compareTo(o2);
-		if (o1.contains(CORDA_ID_PREFIX) && o2.contains(CORDA_ID_PREFIX))
-			return o1.compareTo(o2);
-
-		if (o1.contains(OPENORGS_ID_PREFIX))
-			return -1;
-		if (o2.contains(OPENORGS_ID_PREFIX))
-			return 1;
-
-		if (o1.contains(CORDA_ID_PREFIX))
-			return -1;
-		if (o2.contains(CORDA_ID_PREFIX))
-			return 1;
-
-		return o1.compareTo(o2);
-	}
-
 	// Sort IDs basing on the type. Priority: 1) openorgs, 2)corda, 3)alphabetic
 	public static List<String> sortIds(List<String> ids) {
-		ids.sort((o1, o2) -> compareIds(o1, o2));
+		ids.sort((o1, o2) -> DedupUtility.compareOpenOrgIds(o1, o2));
 		return ids;
 	}
 
@@ -301,7 +280,7 @@ public class SparkPrepareOrgRels extends AbstractSparkAction {
 			for (String id1 : g._2()) {
 				for (String id2 : g._2()) {
 					if (!id1.equals(id2))
-						if (id1.contains(OPENORGS_ID_PREFIX) && !id2.contains("openorgsmesh"))
+						if (id1.contains(DedupUtility.OPENORGS_ID_PREFIX) && !id2.contains("openorgsmesh"))
 							rels.add(new Tuple2<>(id1, id2));
 				}
 			}
@@ -340,13 +319,4 @@ public class SparkPrepareOrgRels extends AbstractSparkAction {
 
 	}
 
-	private static MapFunction<String, Relation> patchRelFn() {
-		return value -> {
-			final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);
-			if (rel.getDataInfo() == null) {
-				rel.setDataInfo(new DataInfo());
-			}
-			return rel;
-		};
-	}
 }
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
index f69e9b5f3a..8e6e79eb32 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java
@@ -142,16 +142,6 @@ public class SparkPropagateRelation extends AbstractSparkAction {
 			StringUtils.isNotBlank(r.getRelClass());
 	}
 
-	private static MapFunction<String, Relation> patchRelFn() {
-		return value -> {
-			final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);
-			if (rel.getDataInfo() == null) {
-				rel.setDataInfo(new DataInfo());
-			}
-			return rel;
-		};
-	}
-
 	private static String getId(Relation r, FieldType type) {
 		switch (type) {
 			case SOURCE:
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/openorgs/oozie_app/workflow.xml b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/openorgs/oozie_app/workflow.xml
index d00ebad6c3..060d979dae 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/openorgs/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/resources/eu/dnetlib/dhp/oa/dedup/openorgs/oozie_app/workflow.xml
@@ -111,7 +111,7 @@
             <arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
            <arg>--actionSetId</arg><arg>${actionSetIdOpenorgs}</arg>
            <arg>--workingPath</arg><arg>${workingPath}</arg>
-           <arg>--numPartitions</arg><arg>8000</arg>
+           <arg>--numPartitions</arg><arg>1000</arg>
@@ -139,7 +139,7 @@
             <arg>--isLookUpUrl</arg><arg>${isLookUpUrl}</arg>
            <arg>--workingPath</arg><arg>${workingPath}</arg>
            <arg>--actionSetId</arg><arg>${actionSetIdOpenorgs}</arg>
-           <arg>--numPartitions</arg><arg>8000</arg>
+           <arg>--numPartitions</arg><arg>1000</arg>
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsTest.java
index 7aaed3de7d..419be1da31 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsTest.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/SparkOpenorgsTest.java
@@ -3,7 +3,6 @@ package eu.dnetlib.dhp.oa.dedup;
 
 import static java.nio.file.Files.createTempDirectory;
-import static org.apache.spark.sql.functions.count;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.Mockito.lenient;
 
@@ -16,14 +15,7 @@ import java.nio.file.Paths;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.FilterFunction;
-import org.apache.spark.api.java.function.ForeachFunction;
-import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.junit.jupiter.api.*;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -35,12 +27,8 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.schema.oaf.DataInfo;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-import eu.dnetlib.dhp.utils.ISLookupClientFactory;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
-import jdk.nashorn.internal.ir.annotations.Ignore;
 
 @ExtendWith(MockitoExtension.class)
 public class SparkOpenorgsTest implements Serializable {
@@ -224,13 +212,4 @@ public class SparkOpenorgsTest implements Serializable {
 		FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
 	}
 
-	private static MapFunction<String, Relation> patchRelFn() {
-		return value -> {
-			final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);
-			if (rel.getDataInfo() == null) {
-				rel.setDataInfo(new DataInfo());
-			}
-			return rel;
-		};
-	}
 }
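With patchRelFn() and isOpenorgs() promoted to AbstractSparkAction, the actions touched by this patch can all share the same relation-loading pattern. The standalone sketch below restates that pattern for illustration only: the local SparkSession settings and the input path are placeholders, and the lambda and filter mirror the shared helpers rather than calling the protected members directly.

```java
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Relation;

public class OpenorgsRelationSketch {

	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

	public static void main(String[] args) {
		// Local session and input path are placeholders for illustration.
		SparkSession spark = SparkSession.builder().master("local[*]").appName("openorgs-sketch").getOrCreate();
		String relationPath = "/tmp/relations.json"; // hypothetical path

		// Same patching logic as the shared patchRelFn(): deserialize the JSON and
		// backfill a missing DataInfo so downstream code never sees a null.
		MapFunction<String, Relation> patchRelFn = value -> {
			Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);
			if (rel.getDataInfo() == null) {
				rel.setDataInfo(new DataInfo());
			}
			return rel;
		};

		Dataset<Relation> rels = spark
			.read()
			.textFile(relationPath)
			.map(patchRelFn, Encoders.bean(Relation.class))
			// Keep only relations collected from the OpenOrgs database, as isOpenorgs() does.
			.filter((FilterFunction<Relation>) r -> r.getCollectedfrom() != null
				&& r.getCollectedfrom().stream().anyMatch(kv -> ModelConstants.OPENORGS_NAME.equals(kv.getValue())));

		System.out.println("openorgs relations: " + rels.count());
		spark.stop();
	}
}
```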