diff --git a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java
index d517cca00..5e5e42f1e 100644
--- a/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java
+++ b/dhp-workflows/dhp-aggregation/src/test/java/eu/dnetlib/dhp/transformation/TransformationJobTest.java
@@ -89,6 +89,8 @@ public class TransformationJobTest {
"-rh", "",
"-ro", "",
"-rr", ""});
+
+
}
@Test
@@ -96,7 +98,7 @@ public class TransformationJobTest {
final String path = this.getClass().getResource("/eu/dnetlib/dhp/transform/mdstorenative").getFile();
System.out.println("path = " + path);
- Path tempDirWithPrefix = Files.createTempDirectory("mdsotre_output");
+ Path tempDirWithPrefix = Files.createTempDirectory("mdstore_output");
System.out.println(tempDirWithPrefix.toFile().getAbsolutePath());
diff --git a/dhp-workflows/dhp-dedup/pom.xml b/dhp-workflows/dhp-dedup/pom.xml
new file mode 100644
index 000000000..28ef6a453
--- /dev/null
+++ b/dhp-workflows/dhp-dedup/pom.xml
@@ -0,0 +1,61 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <artifactId>dhp-workflows</artifactId>
+        <groupId>eu.dnetlib.dhp</groupId>
+        <version>1.0.5-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>dhp-dedup</artifactId>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.11</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.11</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>eu.dnetlib.dhp</groupId>
+            <artifactId>dhp-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>eu.dnetlib.dhp</groupId>
+            <artifactId>dhp-schemas</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.arakelian</groupId>
+            <artifactId>java-jq</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>eu.dnetlib</groupId>
+            <artifactId>dnet-pace-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-graphx_2.11</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupUtility.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupUtility.java
new file mode 100644
index 000000000..b65e866f1
--- /dev/null
+++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/DedupUtility.java
@@ -0,0 +1,94 @@
+package eu.dnetlib.dedup;
+
+import com.google.common.collect.Sets;
+import eu.dnetlib.pace.clustering.BlacklistAwareClusteringCombiner;
+import eu.dnetlib.pace.config.DedupConfig;
+import eu.dnetlib.pace.model.MapDocument;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.util.LongAccumulator;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
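+/** Utility methods shared by the dedup Spark jobs: accumulator setup, HDFS I/O helpers, clustering-key extraction and hashing. */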
+public class DedupUtility {
+
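+ /** Creates the named LongAccumulators used to count dedup events, one per statistic and keyed by the configured entity type. */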
+ public static Map<String, LongAccumulator> constructAccumulator(final DedupConfig dedupConf, final SparkContext context) {
+
+ Map<String, LongAccumulator> accumulators = new HashMap<>();
+
+ String acc1 = String.format("%s::%s",dedupConf.getWf().getEntityType(), "records per hash key = 1");
+ accumulators.put(acc1, context.longAccumulator(acc1));
+ String acc2 = String.format("%s::%s",dedupConf.getWf().getEntityType(), "missing " + dedupConf.getWf().getOrderField());
+ accumulators.put(acc2, context.longAccumulator(acc2));
+ String acc3 = String.format("%s::%s",dedupConf.getWf().getEntityType(), String.format("Skipped records for count(%s) >= %s", dedupConf.getWf().getOrderField(), dedupConf.getWf().getGroupMaxSize()));
+ accumulators.put(acc3, context.longAccumulator(acc3));
+ String acc4 = String.format("%s::%s",dedupConf.getWf().getEntityType(), "skip list");
+ accumulators.put(acc4, context.longAccumulator(acc4));
+ String acc5 = String.format("%s::%s",dedupConf.getWf().getEntityType(), "dedupSimilarity (x2)");
+ accumulators.put(acc5, context.longAccumulator(acc5));
+ String acc6 = String.format("%s::%s",dedupConf.getWf().getEntityType(), "d < " + dedupConf.getWf().getThreshold());
+ accumulators.put(acc6, context.longAccumulator(acc6));
+
+ return accumulators;
+ }
+
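+ /** Loads a text file from HDFS as an RDD of lines. */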
+ public static JavaRDD<String> loadDataFromHDFS(String path, JavaSparkContext context) {
+ return context.textFile(path);
+ }
+
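+ /** Recursively deletes the given HDFS path, if it exists. */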
+ public static void deleteIfExists(String path) throws IOException {
+ Configuration conf = new Configuration();
+ FileSystem fileSystem = FileSystem.get(conf);
+ if (fileSystem.exists(new Path(path))){
+ fileSystem.delete(new Path(path), true);
+ }
+ }
+
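+ /** Reads and parses a DedupConfig from a JSON file stored on HDFS. */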
+ public static DedupConfig loadConfigFromHDFS(String path) throws IOException {
+
+ Configuration conf = new Configuration();
+ FileSystem fileSystem = FileSystem.get(conf);
+ FSDataInputStream inputStream = new FSDataInputStream(fileSystem.open(new Path(path)));
+
+ return DedupConfig.load(IOUtils.toString(inputStream, StandardCharsets.UTF_8.name()));
+
+ }
+
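+ /** Reads a classpath resource into a String, wrapping any I/O failure in a RuntimeException. */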
+ static String readFromClasspath(final String filename, final Class clazz) {
+ final StringWriter sw = new StringWriter();
+ try {
+ IOUtils.copy(clazz.getResourceAsStream(filename), sw);
+ return sw.toString();
+ } catch (final IOException e) {
+ throw new RuntimeException("cannot load resource from classpath: " + filename);
+ }
+ }
+
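+ /** Computes the clustering (blocking) keys of a document using the blacklist-aware clustering combiner. */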
+ static Set<String> getGroupingKeys(DedupConfig conf, MapDocument doc) {
+ return Sets.newHashSet(BlacklistAwareClusteringCombiner.filterAndCombine(doc, conf));
+ }
+
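+ /** Returns the hex-encoded MD5 digest of the given string, or null if the digest cannot be computed. */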
+ public static String md5(final String s) {
+ try {
+ final MessageDigest md = MessageDigest.getInstance("MD5");
+ md.update(s.getBytes("UTF-8"));
+ return new String(Hex.encodeHex(md.digest()));
+ } catch (final Exception e) {
+ System.err.println("Error creating id");
+ return null;
+ }
+ }
+}
diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/Deduper.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/Deduper.java
new file mode 100644
index 000000000..51b991da5
--- /dev/null
+++ b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/Deduper.java
@@ -0,0 +1,159 @@
+package eu.dnetlib.dedup;
+
+import eu.dnetlib.pace.config.DedupConfig;
+import eu.dnetlib.pace.model.Field;
+import eu.dnetlib.pace.model.MapDocument;
+import eu.dnetlib.pace.util.BlockProcessor;
+import eu.dnetlib.pace.util.MapDocumentUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.util.LongAccumulator;
+import scala.Serializable;
+import scala.Tuple2;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
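+/** Spark driver for the deduplication phases: it maps entities to documents, groups them into blocks and computes similarity relations using the dnet-pace-core comparison logic. */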
+public class Deduper implements Serializable {
+
+ private static final Log log = LogFactory.getLog(Deduper.class);
+
+ /**
+ * Deduplicates a collection of entities and returns the similarity relations found among them.
+ *
+ * @param context the spark context
+ * @param entities the list of JSON entities to be deduped
+ * @param config the dedup configuration
+ * @return the list of relations generated by the deduplication
+ */
+ public static JavaPairRDD<String, String> dedup(JavaSparkContext context, JavaRDD<String> entities, DedupConfig config) {
+
+ Map<String, LongAccumulator> accumulators = DedupUtility.constructAccumulator(config, context.sc());
+
+ //create vertexes of the graph:
+ JavaPairRDD<String, MapDocument> mapDocs = mapToVertexes(context, entities, config);
+
+
+ //create blocks for deduplication
+ JavaPairRDD<String, Iterable<MapDocument>> blocks = createBlocks(context, mapDocs, config);
+
+ //create relations by comparing only elements in the same group
+ return computeRelations(context, blocks, config);
+
+// final RDD<Edge<String>> edgeRdd = relationRDD.map(it -> new Edge<>(it._1().hashCode(), it._2().hashCode(), "equalTo")).rdd();
+//
+// RDD> vertexes = mapDocs.mapToPair((PairFunction, Object, MapDocument>) t -> new Tuple2