diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java index 36ed4d7c1..44482cfdb 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/DedupRecordFactory.java @@ -2,14 +2,13 @@ package eu.dnetlib.dhp.oa.dedup; import java.util.*; +import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.commons.beanutils.BeanUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.FlatMapGroupsFunction; import org.apache.spark.api.java.function.MapFunction; -import org.apache.spark.api.java.function.ReduceFunction; import org.apache.spark.sql.*; import eu.dnetlib.dhp.oa.dedup.model.Identifier; @@ -107,6 +106,8 @@ public class DedupRecordFactory { final HashSet acceptanceDate = new HashSet<>(); + boolean isVisible = false; + while (it.hasNext()) { Tuple3 t = it.next(); OafEntity entity = t._3(); @@ -114,6 +115,7 @@ public class DedupRecordFactory { if (entity == null) { aliases.add(t._2()); } else { + isVisible = isVisible || !entity.getDataInfo().getInvisible(); cliques.add(entity); if (acceptanceDate.size() < MAX_ACCEPTANCE_DATE) { @@ -129,13 +131,20 @@ public class DedupRecordFactory { } - if (acceptanceDate.size() >= MAX_ACCEPTANCE_DATE || cliques.isEmpty()) { + if (!isVisible || acceptanceDate.size() >= MAX_ACCEPTANCE_DATE || cliques.isEmpty()) { return Collections.emptyIterator(); } OafEntity mergedEntity = MergeUtils.mergeGroup(dedupId, cliques.iterator()); // dedup records do not have date of transformation attribute mergedEntity.setDateoftransformation(null); + mergedEntity + .setMergedIds( + Stream + .concat(cliques.stream().map(OafEntity::getId), aliases.stream()) + .distinct() + .sorted() + .collect(Collectors.toList())); return Stream .concat( diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java index 6989ec54b..6f5f40e43 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateDedupRecord.java @@ -5,11 +5,11 @@ import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTION import static eu.dnetlib.dhp.schema.common.ModelConstants.PROVENANCE_DEDUP; import java.io.IOException; +import java.util.Arrays; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.*; import org.dom4j.DocumentException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -17,6 +17,7 @@ import org.xml.sax.SAXException; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.schema.common.EntityType; +import eu.dnetlib.dhp.schema.common.ModelConstants; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.DataInfo; import eu.dnetlib.dhp.schema.oaf.OafEntity; @@ -25,6 +26,7 @@ import eu.dnetlib.dhp.utils.ISLookupClientFactory; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; import eu.dnetlib.pace.config.DedupConfig; +import scala.collection.JavaConverters; public class SparkCreateDedupRecord extends AbstractSparkAction { @@ -85,6 +87,36 @@ public class SparkCreateDedupRecord extends AbstractSparkAction { .mode(SaveMode.Overwrite) .option("compression", "gzip") .json(outputPath); + + log.info("Updating mergerels for: '{}'", subEntity); + final Dataset dedupIds = spark + .read() + .schema("`id` STRING, `mergedIds` ARRAY") + .json(outputPath) + .selectExpr("id as source", "explode(mergedIds) as target"); + spark + .read() + .load(mergeRelPath) + .where("relClass == 'merges'") + .join(dedupIds, JavaConverters.asScalaBuffer(Arrays.asList("source", "target")).toSeq(), "left_semi") + .write() + .mode(SaveMode.Overwrite) + .option("compression", "gzip") + .save(workingPath + "/mergerel_filtered"); + + final Dataset validRels = spark.read().load(workingPath + "/mergerel_filtered"); + + final Dataset filteredMergeRels = validRels + .union( + validRels + .withColumnRenamed("source", "source_tmp") + .withColumnRenamed("target", "target_tmp") + .withColumn("relClass", functions.lit(ModelConstants.IS_MERGED_IN)) + .withColumnRenamed("target_tmp", "source") + .withColumnRenamed("source_tmp", "target")); + + saveParquet(filteredMergeRels, mergeRelPath, SaveMode.Overwrite); + removeOutputDir(spark, workingPath + "/mergerel_filtered"); } } diff --git a/pom.xml b/pom.xml index e1d99f25b..9480ddfc0 100644 --- a/pom.xml +++ b/pom.xml @@ -937,7 +937,7 @@ 1.1.3 1.7 1.0.7 - [8.0.1] + [9.0.0] cdh5.9.2 3.5 11.0.2