diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/common/AbstractPaceFunctions.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/common/AbstractPaceFunctions.java index f21b4d5b3b..248f32e547 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/common/AbstractPaceFunctions.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/common/AbstractPaceFunctions.java @@ -67,7 +67,7 @@ public abstract class AbstractPaceFunctions { } protected String cleanup(final String s) { - final String s1 = HTML_REGEX.matcher(s).replaceAll( ""); + final String s1 = HTML_REGEX.matcher(s).replaceAll(""); final String s2 = unicodeNormalization(s1.toLowerCase()); final String s3 = nfd(s2); final String s4 = fixXML(s3); diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkDedupConfig.scala b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkDedupConfig.scala index 4a5c4e7aff..286b256efd 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkDedupConfig.scala +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/model/SparkDedupConfig.scala @@ -91,7 +91,7 @@ case class SparkDedupConfig(conf: DedupConfig, numPartitions: Int) extends Seria } else { res } - }) + }).checkpoint() var relBlocks: Dataset[Row] = null @@ -178,8 +178,8 @@ case class SparkDedupConfig(conf: DedupConfig, numPartitions: Int) extends Seria val res = relBlocks.filter(col("match").equalTo(true)) .select(col("l.identifier").as("from"), col("r.identifier").as("to")) - .repartition() - .dropDuplicates() + //.repartition() + .distinct() // res.show(false) res.select(functions.struct("from", "to")) diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/JsonListMatch.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/JsonListMatch.java index 1f6b76fe6a..3897e37f87 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/JsonListMatch.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/tree/JsonListMatch.java @@ -6,19 +6,19 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; -import com.jayway.jsonpath.Configuration; -import com.jayway.jsonpath.DocumentContext; -import com.jayway.jsonpath.Option; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import com.google.common.collect.Sets; +import com.jayway.jsonpath.Configuration; +import com.jayway.jsonpath.DocumentContext; +import com.jayway.jsonpath.JsonPath; +import com.jayway.jsonpath.Option; import eu.dnetlib.pace.config.Config; import eu.dnetlib.pace.tree.support.AbstractListComparator; import eu.dnetlib.pace.tree.support.ComparatorClass; import eu.dnetlib.pace.util.MapDocumentUtil; -import com.jayway.jsonpath.JsonPath; @ComparatorClass("jsonListMatch") public class JsonListMatch extends AbstractListComparator { @@ -63,7 +63,9 @@ public class JsonListMatch extends AbstractListComparator { StringBuilder st = new StringBuilder(); // to build the string used for comparisons basing on the jpath into // parameters - final DocumentContext documentContext = JsonPath.using(Configuration.defaultConfiguration().addOptions(Option.SUPPRESS_EXCEPTIONS)).parse(json); + final DocumentContext documentContext = JsonPath + .using(Configuration.defaultConfiguration().addOptions(Option.SUPPRESS_EXCEPTIONS)) + .parse(json); // for each path in the param list for (String key : params.keySet().stream().filter(k -> k.contains("jpath")).collect(Collectors.toList())) { String path = params.get(key); diff --git a/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/BlockProcessor.java b/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/BlockProcessor.java index 95918a7c74..30606bb9b7 100644 --- a/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/BlockProcessor.java +++ b/dhp-pace-core/src/main/java/eu/dnetlib/pace/util/BlockProcessor.java @@ -89,7 +89,7 @@ public class BlockProcessor { break; } - if (i > wf.getSlidingWindowSize()) { + if (++i > wf.getSlidingWindowSize()) { break; } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java index d9b71badd8..7c4ab52657 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCreateSimRels.java @@ -91,10 +91,10 @@ public class SparkCreateSimRels extends AbstractSparkAction { .textFile(DedupUtility.createEntityPath(graphBasePath, subEntity)) .transform(sparkConfig.modelExtractor()) // Extract fields from input json column according to model // definition - .transform(sparkConfig.generateClustersWithDFAPI()) // generate pairs according to + .transform(sparkConfig.generateAndProcessClustersWithJoins()) // generate pairs according to // filters, clusters, and model // definition - .transform(sparkConfig.processClusters()) // process blocks and emits pairs of found + // .transform(sparkConfig.processClusters()) // process blocks and emits pairs of found // similarities .map( (MapFunction) t -> DedupUtility