From 56224e034ab1b19bfc21d5dab38ea49d6da62529 Mon Sep 17 00:00:00 2001
From: Giambattista Bloisi
Date: Mon, 28 Oct 2024 12:05:56 +0100
Subject: [PATCH] Fill the new mergedIds field when generating dedup records

Filter out dedup records composed of invisible records only
Filter out mergerels that have not been used when creating the dedup record
(ungrouping of cliques)
---
 .../dhp/oa/dedup/DedupRecordFactory.java      | 15 ++++++--
 .../dhp/oa/dedup/SparkCreateDedupRecord.java  | 36 +++++++++++++++++--
 pom.xml                                       |  2 +-
 3 files changed, 47 insertions(+), 6 deletions(-)

diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
index 36ed4d7c1..44482cfdb 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java
@@ -2,14 +2,13 @@
 package eu.dnetlib.dhp.oa.dedup;
 
 import java.util.*;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import org.apache.commons.beanutils.BeanUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.FlatMapGroupsFunction;
 import org.apache.spark.api.java.function.MapFunction;
-import org.apache.spark.api.java.function.ReduceFunction;
 import org.apache.spark.sql.*;
 
 import eu.dnetlib.dhp.oa.dedup.model.Identifier;
@@ -107,6 +106,8 @@ public class DedupRecordFactory {
 
 		final HashSet<String> acceptanceDate = new HashSet<>();
 
+		boolean isVisible = false;
+
 		while (it.hasNext()) {
 			Tuple3<String, String, OafEntity> t = it.next();
 			OafEntity entity = t._3();
@@ -114,6 +115,7 @@
 			if (entity == null) {
 				aliases.add(t._2());
 			} else {
+				isVisible = isVisible || !entity.getDataInfo().getInvisible();
 				cliques.add(entity);
 
 				if (acceptanceDate.size() < MAX_ACCEPTANCE_DATE) {
@@ -129,13 +131,20 @@
 			}
 		}
 
-		if (acceptanceDate.size() >= MAX_ACCEPTANCE_DATE || cliques.isEmpty()) {
+		if (!isVisible || acceptanceDate.size() >= MAX_ACCEPTANCE_DATE || cliques.isEmpty()) {
 			return Collections.emptyIterator();
 		}
 
 		OafEntity mergedEntity = MergeUtils.mergeGroup(dedupId, cliques.iterator());
 		// dedup records do not have date of transformation attribute
 		mergedEntity.setDateoftransformation(null);
+		mergedEntity
+			.setMergedIds(
+				Stream
+					.concat(cliques.stream().map(OafEntity::getId), aliases.stream())
+					.distinct()
+					.sorted()
+					.collect(Collectors.toList()));
 
 		return Stream
 			.concat(
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java
index 6989ec54b..6f5f40e43 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java
@@ -5,11 +5,11 @@ import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTION
 import static eu.dnetlib.dhp.schema.common.ModelConstants.PROVENANCE_DEDUP;
 
 import java.io.IOException;
+import java.util.Arrays;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.*;
 import org.dom4j.DocumentException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -17,6 +17,7 @@ import org.xml.sax.SAXException;
 
 import eu.dnetlib.dhp.application.ArgumentApplicationParser;
 import eu.dnetlib.dhp.schema.common.EntityType;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
 import eu.dnetlib.dhp.schema.common.ModelSupport;
 import eu.dnetlib.dhp.schema.oaf.DataInfo;
 import eu.dnetlib.dhp.schema.oaf.OafEntity;
@@ -25,6 +26,7 @@ import eu.dnetlib.dhp.utils.ISLookupClientFactory;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
 import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
 import eu.dnetlib.pace.config.DedupConfig;
+import scala.collection.JavaConverters;
 
 public class SparkCreateDedupRecord extends AbstractSparkAction {
 
@@ -85,6 +87,36 @@ public class SparkCreateDedupRecord extends AbstractSparkAction {
 				.mode(SaveMode.Overwrite)
 				.option("compression", "gzip")
 				.json(outputPath);
+
+			log.info("Updating mergerels for: '{}'", subEntity);
+			final Dataset<Row> dedupIds = spark
+				.read()
+				.schema("`id` STRING, `mergedIds` ARRAY<STRING>")
+				.json(outputPath)
+				.selectExpr("id as source", "explode(mergedIds) as target");
+			spark
+				.read()
+				.load(mergeRelPath)
+				.where("relClass == 'merges'")
+				.join(dedupIds, JavaConverters.asScalaBuffer(Arrays.asList("source", "target")).toSeq(), "left_semi")
+				.write()
+				.mode(SaveMode.Overwrite)
+				.option("compression", "gzip")
+				.save(workingPath + "/mergerel_filtered");
+
+			final Dataset<Row> validRels = spark.read().load(workingPath + "/mergerel_filtered");
+
+			final Dataset<Row> filteredMergeRels = validRels
+				.union(
+					validRels
+						.withColumnRenamed("source", "source_tmp")
+						.withColumnRenamed("target", "target_tmp")
+						.withColumn("relClass", functions.lit(ModelConstants.IS_MERGED_IN))
+						.withColumnRenamed("target_tmp", "source")
+						.withColumnRenamed("source_tmp", "target"));
+
+			saveParquet(filteredMergeRels, mergeRelPath, SaveMode.Overwrite);
+			removeOutputDir(spark, workingPath + "/mergerel_filtered");
 		}
 	}
 
diff --git a/pom.xml b/pom.xml
index e1d99f25b..9480ddfc0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -937,7 +937,7 @@
 		<commons.logging.version>1.1.3</commons.logging.version>
 		<commons-validator.version>1.7</commons-validator.version>
 		<dateparser.version>1.0.7</dateparser.version>
-		<dhp-schemas.version>[8.0.1]</dhp-schemas.version>
+		<dhp-schemas.version>[9.0.0]</dhp-schemas.version>
 		<dhp.cdh.version>cdh5.9.2</dhp.cdh.version>
 		<dhp.commons.lang.version>3.5</dhp.commons.lang.version>
 		<dhp.guava.version>11.0.2</dhp.guava.version>
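
Note (illustrative sketch, not part of the patch): the DedupRecordFactory change
emits a dedup record only when at least one grouped record is visible, and fills
mergedIds with the ids of the grouped entities plus their aliases, deduplicated
and sorted. A minimal standalone rendering of those semantics, using a
hypothetical Entity stand-in instead of the real OafEntity:

    import java.util.*;
    import java.util.stream.*;

    class Entity {
        final String id;
        final boolean invisible;

        Entity(String id, boolean invisible) {
            this.id = id;
            this.invisible = invisible;
        }
    }

    public class MergedIdsSketch {
        public static void main(String[] args) {
            List<Entity> cliques = Arrays.asList(new Entity("B", true), new Entity("A", false));
            List<String> aliases = Arrays.asList("C", "A");

            // Drop the whole group when every grouped record is invisible.
            boolean isVisible = cliques.stream().anyMatch(e -> !e.invisible);
            if (!isVisible) {
                System.out.println("group dropped: invisible records only");
                return;
            }

            // mergedIds = grouped entity ids plus aliases, distinct and sorted.
            List<String> mergedIds = Stream
                .concat(cliques.stream().map(e -> e.id), aliases.stream())
                .distinct()
                .sorted()
                .collect(Collectors.toList());
            System.out.println(mergedIds); // prints [A, B, C]
        }
    }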
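
Note (illustrative sketch, not part of the patch): the SparkCreateDedupRecord
change keeps only the 'merges' relations whose (source, target) pair backs an
emitted dedup record (a left-semi join against the exploded mergedIds), then
re-derives the inverse isMergedIn direction by swapping the columns. A toy
local-mode rendering of that filter, with inline data standing in for the
workflow's parquet inputs (class and value names here are hypothetical):

    import static org.apache.spark.sql.functions.lit;

    import java.util.Arrays;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    import scala.collection.JavaConverters;

    public class MergeRelFilterSketch {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();

            // 'merges' rels from the clustering step: dedup id -> raw record id.
            Dataset<Row> mergeRels = spark
                .createDataset(
                    Arrays.asList("dedup_1,raw_a", "dedup_1,raw_b", "dedup_2,raw_c"),
                    Encoders.STRING())
                .selectExpr("split(value, ',')[0] as source", "split(value, ',')[1] as target");

            // (id, mergedId) pairs read back from the dedup records; dedup_2's
            // group produced no record (e.g. invisible records only).
            Dataset<Row> dedupIds = spark
                .createDataset(
                    Arrays.asList("dedup_1,raw_a", "dedup_1,raw_b"),
                    Encoders.STRING())
                .selectExpr("split(value, ',')[0] as source", "split(value, ',')[1] as target");

            // left_semi keeps only the rels actually used by a dedup record.
            Dataset<Row> valid = mergeRels
                .join(
                    dedupIds,
                    JavaConverters.asScalaBuffer(Arrays.asList("source", "target")).toSeq(),
                    "left_semi");

            // Re-add the inverse direction by swapping source and target.
            Dataset<Row> both = valid
                .withColumn("relClass", lit("merges"))
                .union(
                    valid
                        .selectExpr("target as source", "source as target")
                        .withColumn("relClass", lit("isMergedIn")));

            both.show(); // dedup_2 <-> raw_c is gone in both directions
            spark.stop();
        }
    }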