From 1287315ffb546397bcbcac588fd5b80a62cab665 Mon Sep 17 00:00:00 2001
From: Giambattista Bloisi
Date: Mon, 11 Dec 2023 21:26:05 +0100
Subject: [PATCH] No longer use dedupId information from the pivotHistory
 database

---
 .../dhp/oa/dedup/SparkCreateMergeRels.java    | 85 ++++++++++---------
 1 file changed, 44 insertions(+), 41 deletions(-)

diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
index 5bb132b899..46c29494ec 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.java
@@ -1,24 +1,23 @@
 
 package eu.dnetlib.dhp.oa.dedup;
 
-import com.google.common.hash.Hashing;
-import com.kwartile.lib.cc.ConnectedComponent;
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.schema.common.EntityType;
-import eu.dnetlib.dhp.schema.common.ModelConstants;
-import eu.dnetlib.dhp.schema.common.ModelSupport;
-import eu.dnetlib.dhp.schema.oaf.*;
-import eu.dnetlib.dhp.schema.oaf.utils.PidType;
-import eu.dnetlib.dhp.utils.ISLookupClientFactory;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
-import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
-import eu.dnetlib.pace.config.DedupConfig;
+import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS;
+import static eu.dnetlib.dhp.schema.common.ModelConstants.PROVENANCE_DEDUP;
+import static org.apache.spark.sql.functions.*;
+
+import java.io.IOException;
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.*;
+import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.catalyst.encoders.RowEncoder;
 import org.apache.spark.sql.expressions.UserDefinedFunction;
 import org.apache.spark.sql.expressions.Window;
@@ -29,20 +28,23 @@ import org.dom4j.DocumentException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.xml.sax.SAXException;
+
+import com.google.common.hash.Hashing;
+import com.kwartile.lib.cc.ConnectedComponent;
+
+import eu.dnetlib.dhp.application.ArgumentApplicationParser;
+import eu.dnetlib.dhp.schema.common.EntityType;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.schema.oaf.utils.PidType;
+import eu.dnetlib.dhp.utils.ISLookupClientFactory;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+import eu.dnetlib.pace.config.DedupConfig;
 import scala.Tuple3;
 import scala.collection.JavaConversions;
 
-import java.io.IOException;
-import java.time.LocalDate;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Optional;
-
-import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS;
-import static eu.dnetlib.dhp.schema.common.ModelConstants.PROVENANCE_DEDUP;
-import static org.apache.spark.sql.functions.*;
-
 public class SparkCreateMergeRels extends AbstractSparkAction {
 
 	private static final Logger log = LoggerFactory.getLogger(SparkCreateMergeRels.class);
 
@@ -121,6 +123,7 @@ public class SparkCreateMergeRels extends AbstractSparkAction {
 				.distinct()
 				.withColumn("vertexId", hashUDF.apply(functions.col("id")));
 
+			// transform simrels into pairs of numeric ids
 			final Dataset<Row> edges = spark
 				.read()
 				.load(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity))
@@ -128,27 +131,34 @@
 				.withColumn("source", hashUDF.apply(functions.col("source")))
 				.withColumn("target", hashUDF.apply(functions.col("target")));
 
+			// resolve connected components
+			// ("vertexId", "groupId")
 			Dataset<Row> cliques = ConnectedComponent
 				.runOnPairs(edges, 50, spark);
 
+			// transform "vertexId" back to its original string value
+			// groupId is kept numeric as its string value is not used
+			// ("id", "groupId")
 			Dataset<Row> rawMergeRels = cliques
 				.join(vertexIdMap, JavaConversions.asScalaBuffer(Collections.singletonList("vertexId")), "inner")
 				.drop("vertexId")
 				.distinct();
 
+			// empty dataframe if the pivot history database is not used
 			Dataset<Row> pivotHistory = spark
 				.createDataset(
 					Collections.emptyList(),
 					RowEncoder
-						.apply(StructType.fromDDL("id STRING, firstUsage STRING, lastUsage STRING, dedupId STRING")));
+						.apply(StructType.fromDDL("id STRING, lastUsage STRING")));
 
 			if (StringUtils.isNotBlank(pivotHistoryDatabase)) {
 				pivotHistory = spark
 					.read()
 					.table(pivotHistoryDatabase + "." + subEntity)
-					.selectExpr("id", "lastUsage", "dedupId");
+					.selectExpr("id", "lastUsage");
 			}
 
+			// depending on the resulttype, collectedfrom and dateofacceptance are evaluated differently
 			String collectedfromExpr = "false AS collectedfrom";
 			String dateExpr = "'' AS date";
 
@@ -164,8 +174,10 @@
 				dateExpr = "dateofacceptance.value AS date";
 			}
 
+			// cap pidType at w3id as from there on all pid types are considered equal
 			UserDefinedFunction mapPid = udf(
 				(String s) -> Math.min(PidType.tryValueOf(s).ordinal(), PidType.w3id.ordinal()), DataTypes.IntegerType);
+
 			UserDefinedFunction validDate = udf((String date) -> {
 				if (StringUtils.isNotBlank(date)
 					&& date.matches(DatePicker.DATE_PATTERN) && DatePicker.inRange(date)) {
@@ -186,8 +198,6 @@
 				.withColumn("pidType", mapPid.apply(col("pidType"))) // ordinal of pid type
 				.withColumn("date", validDate.apply(col("date")));
 
-			UserDefinedFunction generateDedupId = udf((String s) -> IdGenerator.generate(s), DataTypes.StringType);
-
 			// ordering to selected pivot id
 			WindowSpec w = Window
 				.partitionBy("groupId")
@@ -202,17 +212,15 @@
 				.join(pivotHistory, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "full")
 				.join(pivotingData, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left")
 				.withColumn("pivot", functions.first("id").over(w))
-				.withColumn("pivotDedupId", functions.first("dedupId").over(w))
 				.withColumn("position", functions.row_number().over(w))
-				.filter(cut > 0 ? col("position").lt(lit(cut)) : lit(true))
-				// .select("id", "groupId", "collectedfrom", "pivot", "dedupId", "pivotDedupId")
-				// .distinct()
+				.filter(cut > 0 ? col("position").lt(lit(cut)) : lit(true)) // apply cut after choosing pivot
 				.flatMap(
 					(FlatMapFunction<Row, Tuple3<String, String, String>>) (Row r) -> {
 						String id = r.getAs("id");
+						String dedupId = IdGenerator.generate(id);
+
 						String pivot = r.getAs("pivot");
-						String pivotDedupId = r.getAs("pivotDedupId"); // dedupId associated with the pivot
-						String dedupId = r.getAs("dedupId"); // dedupId associated with this id if it was a pivot
+						String pivotDedupId = IdGenerator.generate(pivot);
 
 						// filter out id == pivotDedupId
 						// those are caused by claim expressed on pivotDedupId
@@ -233,14 +241,9 @@
 							return res.iterator();
 						}
 
-						// new pivot, assign pivotDedupId with current IdGenerator
-						if (StringUtils.isBlank(pivotDedupId)) {
-							pivotDedupId = IdGenerator.generate(pivot);
-						}
-
-						// this was a pivot in a preceding graph but it has been merged into a new group with different
+						// this was a pivot in a previous graph but it has been merged into a new group with different
 						// pivot
-						if (StringUtils.isNotBlank(dedupId) && !pivot.equals(id) && !dedupId.equals(pivotDedupId)) {
+						if (!r.isNullAt(r.fieldIndex("lastUsage")) && !pivot.equals(id) && !dedupId.equals(pivotDedupId)) {
							// materialize the previous dedup record as a merge relation with the new one
							res.add(new Tuple3<>(dedupId, pivotDedupId, null));
						}
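
The hunks above call a hashUDF whose definition is outside the patch. The import block the patch reorders keeps com.google.common.hash.Hashing in scope, which suggests a Guava-based stable hash from string id to 64-bit vertexId; the sketch below is only a plausible reconstruction under that assumption (murmur3_128 is a guess, not the project's confirmed choice), useful to see why the same id always lands on the same vertex across runs and executors.

import java.nio.charset.StandardCharsets;

import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;

import com.google.common.hash.Hashing;

public class HashUdfSketch {

	// stable 64-bit hash: the same string id always maps to the same numeric
	// vertexId (murmur3_128 is an assumption; the real hashUDF may differ)
	public static final UserDefinedFunction hashUDF = functions
		.udf(
			(UDF1<String, Long>) s -> Hashing.murmur3_128().hashString(s, StandardCharsets.UTF_8).asLong(),
			DataTypes.LongType);
}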
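The pivot election in the @@ -202,17 +212,15 @@ hunk relies on a Spark window: rows are partitioned by groupId, ordered so the most desirable record comes first, and functions.first("id") propagates that record's id to every row of its group, while row_number feeds the optional cut. A minimal, self-contained sketch of the idiom follows; the sample data, simplified ordering (the patch also orders on collectedfrom and pivot history), and session setup are illustrative assumptions.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.StructType;

public class PivotSelectionSketch {

	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();

		// toy records: two dedup groups, pidType already mapped to an ordinal
		StructType schema = StructType.fromDDL("id STRING, groupId BIGINT, pidType INT, date STRING");
		List<Row> rows = Arrays.asList(
			RowFactory.create("50|a", 1L, 3, "2010-01-01"),
			RowFactory.create("50|b", 1L, 0, "2012-05-20"), // lowest pidType ordinal wins group 1
			RowFactory.create("50|c", 2L, 1, "2015-06-30"));
		Dataset<Row> data = spark.createDataFrame(rows, schema);

		// same shape as the WindowSpec in the patch: per-group ordering
		// that elects the pivot; the first row of each partition wins
		WindowSpec w = Window
			.partitionBy("groupId")
			.orderBy(
				functions.col("pidType").asc(),
				functions.col("date").asc(),
				functions.col("id").asc());

		data
			.withColumn("pivot", functions.first("id").over(w)) // pivot id propagated to the whole group
			.withColumn("position", functions.row_number().over(w)) // basis for the optional cut
			.show(false);

		spark.stop();
	}
}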
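The core change is in the flatMap hunks: dedupId and pivotDedupId are no longer read back from the pivot history table but re-derived on the fly with IdGenerator.generate. That is only safe if generation is a deterministic, pure function of the input id, so the value recomputed in this run equals whatever an earlier run stored. The stand-in below is hypothetical (it is not the real IdGenerator; the "dedup" infix and MD5 scheme are assumptions for illustration) and exists only to make that property concrete.

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class DeterministicIdSketch {

	// derive a stable dedup id from a pivot id: same input, same output
	public static String generate(String pivotId) {
		try {
			MessageDigest md5 = MessageDigest.getInstance("MD5");
			byte[] digest = md5.digest(pivotId.getBytes(StandardCharsets.UTF_8));
			StringBuilder hex = new StringBuilder();
			for (byte b : digest) {
				hex.append(String.format("%02x", b));
			}
			// assumed layout: entity-type prefix, "dedup" qualifier, stable hash
			return pivotId.substring(0, 3) + "dedup_______::" + hex;
		} catch (NoSuchAlgorithmException e) {
			throw new IllegalStateException(e);
		}
	}

	public static void main(String[] args) {
		String pivot = "50|od______1234::cafebabe";
		// regenerating instead of reading from the history table is safe
		// precisely because the two calls below always agree
		System.out.println(generate(pivot).equals(generate(pivot))); // true
	}
}

Under that determinism assumption the dedupId column in the pivot history is redundant, which is exactly what the schema change from "id, firstUsage, lastUsage, dedupId" down to "id, lastUsage" reflects: lastUsage alone is enough to tell whether an id was a pivot in a previous graph.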