diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java index 00dfd215d..d16a3a63d 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPropagateRelation.java @@ -8,6 +8,7 @@ import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.Relation; import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; +import org.apache.commons.beanutils.BeanUtils; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.spark.SparkConf; @@ -20,6 +21,9 @@ import org.slf4j.LoggerFactory; import scala.Tuple2; import scala.Tuple3; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; import java.util.Objects; import static org.apache.spark.sql.functions.col; @@ -80,68 +84,51 @@ public class SparkPropagateRelation extends AbstractSparkAction { .distinct() .cache(); - final String inputRelationPath = DedupUtility.createEntityPath(graphBasePath, "relation"); + Dataset allRels = spark.read() + .schema(REL_BEAN_ENC.schema()) + .json(DedupUtility.createEntityPath(graphBasePath, "relation")); - Dataset rels = spark.read().schema(REL_BEAN_ENC.schema()).json(inputRelationPath) - .as(REL_BEAN_ENC) -// .map((MapFunction) rel -> { -// if (rel.getDataInfo() == null) { -// rel.setDataInfo(new DataInfo()); -// } -// return rel; -// }, REL_BEAN_ENC) - ; - - Dataset> dedupedRels = rels - .joinWith(mergedIds, rels.col("source").equalTo(mergedIds.col("mergedObjectID")), "left_outer") + Dataset dedupedRels = allRels + .joinWith(mergedIds, allRels.col("source").equalTo(mergedIds.col("mergedObjectID")), "left_outer") .joinWith(mergedIds, col("_1.target").equalTo(mergedIds.col("mergedObjectID")), "left_outer") - .filter("_1._2 IS NOT NULL OR _2 IS NOT NULL") .select("_1._1", "_1._2.dedupID", "_2.dedupID") .as(Encoders.tuple(REL_BEAN_ENC, Encoders.STRING(), Encoders.STRING())) - .cache(); + .flatMap(SparkPropagateRelation::addInferredRelations, REL_KRYO_ENC); - mergedIds.unpersist(); + Dataset processedRelations = distinctRelations(dedupedRels.union(mergeRels.map((MapFunction) r -> r, REL_KRYO_ENC))) + .filter((FilterFunction) r -> !Objects.equals(r.getSource(), r.getTarget())); - Dataset newRels = dedupedRels - .map((MapFunction, Relation>) t -> { - Relation r = t._1(); - String newSource = t._2(); - String newTarget = t._3(); + save(processedRelations, outputRelationPath, SaveMode.Overwrite); + } - if (r.getDataInfo() == null) { - r.setDataInfo(new DataInfo()); - } - r.getDataInfo().setDeletedbyinference(false); + private static Iterator addInferredRelations(Tuple3 t) throws Exception { + Relation existingRel = t._1(); + String newSource = t._2(); + String newTarget = t._3(); - if (newSource != null) - r.setSource(newSource); + if (newSource == null && newTarget == null) { + return Collections.singleton(t._1()).iterator(); + } - if (newTarget != null) - r.setTarget(newTarget); + // update existing relation + if (existingRel.getDataInfo() == null) { + existingRel.setDataInfo(new DataInfo()); + } + existingRel.getDataInfo().setDeletedbyinference(true); - return r; - }, REL_BEAN_ENC) - .distinct(); + // Create new relation inferred by dedupIDs + Relation inferredRel = (Relation) BeanUtils.cloneBean(existingRel); - Dataset updated = dedupedRels - .map((MapFunction, Relation>) t -> { - Relation r = t._1(); - if (r.getDataInfo() == null) { - r.setDataInfo(new DataInfo()); - } - r.getDataInfo().setDeletedbyinference(true); - return r; - }, REL_BEAN_ENC); + inferredRel.setDataInfo((DataInfo) BeanUtils.cloneBean(existingRel.getDataInfo())); + inferredRel.getDataInfo().setDeletedbyinference(false); - save( - distinctRelations( - newRels - .union(updated) - .union(mergeRels) - .map((MapFunction) r -> r, REL_KRYO_ENC) - ) - .filter((FilterFunction) r -> !Objects.equals(r.getSource(), r.getTarget())), - outputRelationPath, SaveMode.Overwrite); + if (newSource != null) + inferredRel.setSource(newSource); + + if (newTarget != null) + inferredRel.setTarget(newTarget); + + return Arrays.asList(existingRel, inferredRel).iterator(); } private Dataset distinctRelations(Dataset rels) { @@ -156,8 +143,7 @@ public class SparkPropagateRelation extends AbstractSparkAction { return b; } ) - - .map((MapFunction, Relation>) Tuple2::_2, REL_BEAN_ENC); + .map((MapFunction, Relation>) Tuple2::_2, REL_BEAN_ENC); } private FilterFunction getRelationFilterFunction() {