diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/local/DedupLocalTestUtils.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/local/DedupLocalTestUtils.java
new file mode 100644
index 000000000..424895d11
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/local/DedupLocalTestUtils.java
@@ -0,0 +1,116 @@
+package eu.dnetlib.dhp.oa.dedup.local;
+
+public abstract class DedupLocalTestUtils {
+
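+// NOTE: the commented helpers below reference the older MapDocument-based pace model (MapDocument, Type, FieldListImpl) and are left disabled.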
+// public static String prepareTable(MapDocument doc) {
+//
+// String ret = "<table>";
+//
+// for(String fieldName: doc.getFieldMap().keySet()) {
+// if (doc.getFieldMap().get(fieldName).getType().equals(Type.String)) {
+// ret += "" + fieldName + " | " + doc.getFieldMap().get(fieldName).stringValue() + " |
";
+// }
+// else if (doc.getFieldMap().get(fieldName).getType().equals(Type.List)) {
+// ret += "" + fieldName + " | [" + ((FieldListImpl)doc.getFieldMap().get(fieldName)).stringList().stream().collect(Collectors.joining(";")) + "] |
";
+// }
+// }
+//
+// return ret + "
";
+//
+// }
+//
+// public static void prepareGraphParams(List<String> vertexes, List<Tuple2<String, String>> edgesTuple, String filePath, String templateFilePath, Map<String, MapDocument> mapDocuments) {
+//
+// List<Node> nodes = vertexes.stream().map(v -> new Node(v.substring(3, 20).replaceAll("_", ""), vertexes.indexOf(v), prepareTable(mapDocuments.get(v)))).collect(Collectors.toList());
+// List<Edge> edges = edgesTuple.stream().map(e -> new Edge(vertexes.indexOf(e._1()), vertexes.indexOf(e._2()))).collect(Collectors.toList());
+//
+// try(FileWriter fw = new FileWriter(filePath)) {
+// String fullText = IOUtils.toString(new FileReader(templateFilePath));
+//
+// String s = fullText
+// .replaceAll("%nodes%", new ObjectMapper().writeValueAsString(nodes))
+// .replaceAll("%edges%", new ObjectMapper().writeValueAsString(edges));
+//
+// IOUtils.write(s, fw);
+// } catch (IOException e) {
+// e.printStackTrace();
+// }
+//
+// }
+//
+// public static String getOrganizationLegalname(MapDocument mapDocument){
+// return mapDocument.getFieldMap().get("legalname").stringValue();
+// }
+//
+// public static String getJSONEntity(List<String> entities, String id){
+//
+// for (String entity: entities) {
+// if(entity.contains(id))
+// return entity;
+// }
+// return "";
+// }
+
+}
+
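+/** Vertex bean for the graph visualization: serialized to JSON to fill the %nodes% placeholder of the template (see the commented prepareGraphParams above). */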
+class Node{
+ String label;
+ int id;
+ String title;
+
+ public Node(String label, int id, String title) {
+ this.label = label;
+ this.id = id;
+ this.title = title;
+ }
+
+ public String getLabel() {
+ return label;
+ }
+
+ public void setLabel(String label) {
+ this.label = label;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public void setId(int id) {
+ this.id = id;
+ }
+
+ public String getTitle() {
+ return title;
+ }
+
+ public void setTitle(String title) {
+ this.title = title;
+ }
+}
+
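+/** Edge bean for the graph visualization: references vertexes by their list index and fills the %edges% placeholder of the template. */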
+class Edge{
+ int from;
+ int to;
+
+ public Edge(int from, int to) {
+ this.from = from;
+ this.to = to;
+ }
+
+ public int getFrom() {
+ return from;
+ }
+
+ public void setFrom(int from) {
+ this.from = from;
+ }
+
+ public int getTo() {
+ return to;
+ }
+
+ public void setTo(int to) {
+ this.to = to;
+ }
+}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/local/SparkDedupLocalTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/local/SparkDedupLocalTest.java
new file mode 100644
index 000000000..36f102ce9
--- /dev/null
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/local/SparkDedupLocalTest.java
@@ -0,0 +1,563 @@
+package eu.dnetlib.dhp.oa.dedup.local;
+
+import com.google.common.collect.Lists;
+import eu.dnetlib.dhp.oa.dedup.DedupRecordFactory;
+import eu.dnetlib.dhp.oa.dedup.DedupUtility;
+import eu.dnetlib.dhp.schema.common.EntityType;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.*;
+import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
+import eu.dnetlib.pace.config.DedupConfig;
+import eu.dnetlib.pace.model.SparkDeduper;
+import eu.dnetlib.pace.tree.support.TreeProcessor;
+import eu.dnetlib.pace.util.MapDocumentUtil;
+import org.apache.commons.beanutils.BeanUtils;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.*;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.expressions.UserDefinedFunction;
+import org.apache.spark.sql.types.DataTypes;
+import org.junit.jupiter.api.*;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.platform.commons.util.StringUtils;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.spark_project.guava.hash.Hashing;
+import scala.Tuple2;
+import com.kwartile.lib.cc.ConnectedComponent;
+import scala.Tuple3;
+import scala.collection.JavaConversions;
+
+import java.awt.*;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.*;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static eu.dnetlib.dhp.oa.dedup.DedupRecordFactory.*;
+import static eu.dnetlib.dhp.oa.dedup.SparkCreateDedupRecord.ROOT_TRUST;
+import static eu.dnetlib.dhp.schema.common.ModelConstants.DNET_PROVENANCE_ACTIONS;
+import static eu.dnetlib.dhp.schema.common.ModelConstants.PROVENANCE_DEDUP;
+
+@ExtendWith(MockitoExtension.class)
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class SparkDedupLocalTest extends DedupLocalTestUtils {
+
+ static SparkSession spark;
+ static DedupConfig config;
+ static JavaSparkContext context;
+
+ final static String entitiesPath;
+
+ static {
+ try {
+ entitiesPath = Paths
+ .get(SparkDedupLocalTest.class.getResource("/eu/dnetlib/dhp/dedup/entities/publication").toURI())
+ .toFile()
+ .getAbsolutePath();
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ final String dedupConfPath = Paths
+ .get(SparkDedupLocalTest.class.getResource("/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json").toURI())
+ .toFile()
+ .getAbsolutePath();
+ final static String subEntity = "publication";
+ final static String entity = "result";
+
+ final static String workingPath = "/tmp/working_dir";
+ final static String numPartitions = "20";
+
+ final static int MAX_ACCEPTANCE_DATE = 20;
+
+ public static DataInfo dataInfo;
+
+ private static SparkDeduper deduper;
+
+ final static String simRelsPath = workingPath + "/simrels";
+ final static String mergeRelsPath = workingPath + "/mergerels";
+ final static String groupEntityPath = workingPath + "/groupentities";
+
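+ // JSONPath of the ORCID field, presumably used as ground truth in the checks further down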
+ final static String groundTruthFieldJPath = "$.orcid";
+
+ public SparkDedupLocalTest() throws URISyntaxException {
+ }
+
+ public static void cleanup() throws IOException {
+ //remove directories to clean workspace
+ FileUtils.deleteDirectory(new File(simRelsPath));
+ FileUtils.deleteDirectory(new File(mergeRelsPath));
+ FileUtils.deleteDirectory(new File(groupEntityPath));
+ }
+
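+ // non-static @BeforeAll is allowed here because of @TestInstance(Lifecycle.PER_CLASS)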
+ @BeforeAll
+ public void setup() throws IOException {
+
+ cleanup();
+
+ config = DedupConfig.load(readFileFromHDFS(dedupConfPath));
+
+ spark = SparkSession
+ .builder()
+ .appName("Deduplication")
+ .master("local[*]")
+ .getOrCreate();
+ context = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+ deduper = new SparkDeduper(config);
+
+ dataInfo = getDataInfo(config);
+ }
+
+ @AfterAll
+ public static void finalCleanUp() throws IOException {
+ cleanup();
+ }
+
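+ // with the default Configuration this resolves paths on the local filesystem, which is how the local dedup configuration is read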
+ protected static String readFileFromHDFS(String filePath) throws IOException {
+
+ Path path = new Path(filePath);
+ FileSystem fs = FileSystem.get(new Configuration());
+ try (BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path)))) {
+ return br.lines().collect(Collectors.joining(""));
+ }
+ }
+
+ @Test //full deduplication workflow test
+ @Disabled
+ public void deduplicationTest() {
+
+ long before_simrels = System.currentTimeMillis();
+
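+ // similarity relations: parse the JSON entities, run the dedup comparisons and emit a Relation for every pair of candidate duplicates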
+ Dataset<Relation> simRels = spark
+ .read()
+ .textFile(entitiesPath)
+ .transform(deduper.model().parseJsonDataset())
+ .transform(deduper.dedup())
+ .distinct()
+ .map((MapFunction<Row, Relation>) t ->
+ DedupUtility.createSimRel(t.getStruct(0).getString(0), t.getStruct(0).getString(1), entity), Encoders.bean(Relation.class)
+ );
+
+ long simrels_time = System.currentTimeMillis() - before_simrels;
+
+ long simrels_number = simRels.count();
+
+ long before_mergerels = System.currentTimeMillis();
+
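+ // UDF turning an entity id into a numeric id via the hash() helper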
+ UserDefinedFunction hashUDF = functions
+ .udf(
+ (String s) -> hash(s), DataTypes.LongType);
+
+ // ("id", "vertexId"): map each entity identifier to its numeric vertex id
+ Dataset<Row> vertexIdMap = simRels
+ .select("source", "target")
+ .selectExpr("source as id")
+ .union(simRels.selectExpr("target as id"))
+ .distinct()
+ .withColumn("vertexId", hashUDF.apply(functions.col("id")));
+
+ // transform simrels into pairs of numeric ids
+ final Dataset<Row> edges = simRels
+ .select("source", "target")
+ .withColumn("source", hashUDF.apply(functions.col("source")))
+ .withColumn("target", hashUDF.apply(functions.col("target")));
+
+ // resolve connected components
+ // ("vertexId", "groupId")
+ Dataset<Row> cliques = ConnectedComponent
+ .runOnPairs(edges, 50, spark);
+
+ // transform "vertexId" back to its original string value
+ // groupId is kept numeric as its string value is not used
+ // ("id", "groupId")
+ Dataset<Relation> mergeRels = cliques
+ .join(vertexIdMap, JavaConversions.asScalaBuffer(Collections.singletonList("vertexId")), "inner")
+ .drop("vertexId")
+ .distinct()
+ .flatMap((FlatMapFunction<Row, Relation>) (Row r) -> {
+ ArrayList<Relation> res = new ArrayList<>();
+
+ String id = r.getAs("id");
+ String groupId = r.getAs("groupId").toString();
+ res.add(rel(null, groupId, id, ModelConstants.MERGES, config));
+ res.add(rel(null, id, groupId, ModelConstants.IS_MERGED_IN, config));
+
+ return res.iterator();
+ }, Encoders.bean(Relation.class));
+
+
+ long mergerels_time = System.currentTimeMillis() - before_mergerels;
+
+ long mergerels_number = mergeRels.count();
+
+ long before_dedupentity = System.currentTimeMillis();
+
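+ // load the publication entities with their bean schema and pair each one with its id, keeping the full object kryo-encoded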
+ final Class<OafEntity> clazz = ModelSupport.entityTypes.get(EntityType.valueOf(subEntity));
+ final Encoder<OafEntity> beanEncoder = Encoders.bean(clazz);
+ final Encoder<OafEntity> kryoEncoder = Encoders.kryo(clazz);
+
+ Dataset<Row> entities = spark
+ .read()
+ .schema(Encoders.bean(clazz).schema())
+ .json(entitiesPath)
+ .as(beanEncoder)
+ .map(
+ (MapFunction<OafEntity, Tuple2<String, OafEntity>>) entity -> {
+ return new Tuple2<>(entity.getId(), entity);
+ },
+ Encoders.tuple(Encoders.STRING(), kryoEncoder))
+ .selectExpr("_1 AS id", "_2 AS kryoObject");
+
+ //