diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/jpath/JsonPathTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/jpath/JsonPathTest.java
index 4dc633d55..0f8350eee 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/jpath/JsonPathTest.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/jpath/JsonPathTest.java
@@ -18,12 +18,12 @@ class JsonPathTest {
@Test
void jsonToModelTest() throws IOException {
DedupConfig conf = DedupConfig
- .load(
- IOUtils
- .toString(
- SparkOpenorgsDedupTest.class
- .getResourceAsStream(
- "/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json")));
+ .load(
+ IOUtils
+ .toString(
+ SparkOpenorgsDedupTest.class
+ .getResourceAsStream(
+ "/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json")));
final String org = IOUtils.toString(getClass().getResourceAsStream("organization_example1.json"));
@@ -58,7 +58,7 @@ class JsonPathTest {
void testJPath2() throws IOException {
DedupConfig conf = DedupConfig
- .load(IOUtils.toString(getClass().getResourceAsStream("dedup_conf_dataset.json")));
+ .load(IOUtils.toString(getClass().getResourceAsStream("dedup_conf_dataset.json")));
final String dat = IOUtils.toString(getClass().getResourceAsStream("dataset_example1.json"));
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/local/DedupLocalTestUtils.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/local/DedupLocalTestUtils.java
index e584142a9..1ab620062 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/local/DedupLocalTestUtils.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/local/DedupLocalTestUtils.java
@@ -1,12 +1,11 @@
+
package eu.dnetlib.dhp.oa.dedup.local;
-import com.cloudera.com.fasterxml.jackson.core.JsonFactory;
-import com.cloudera.com.fasterxml.jackson.databind.JsonNode;
-import com.cloudera.com.fasterxml.jackson.databind.ObjectMapper;
-import eu.dnetlib.dhp.schema.common.ModelConstants;
-import eu.dnetlib.dhp.schema.oaf.OafEntity;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-import eu.dnetlib.pace.config.DedupConfig;
+import java.io.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.collections4.IteratorUtils;
import org.apache.commons.io.IOUtils;
@@ -16,178 +15,206 @@ import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.spark_project.guava.hash.Hashing;
+
+import com.cloudera.com.fasterxml.jackson.core.JsonFactory;
+import com.cloudera.com.fasterxml.jackson.databind.JsonNode;
+import com.cloudera.com.fasterxml.jackson.databind.ObjectMapper;
+
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.oaf.OafEntity;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.pace.config.DedupConfig;
import scala.collection.JavaConverters;
import scala.collection.convert.Wrappers;
import scala.collection.mutable.ArrayBuffer;
-import java.io.*;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Collectors;
-
public abstract class DedupLocalTestUtils {
- public static String prepareTable(Row doc) {
- StringBuilder ret = new StringBuilder("<table>");
+ public static String prepareTable(Row doc) {
+ StringBuilder ret = new StringBuilder("<table>");
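+ // render the document fields as an HTML table, used as the Node title in the graph visualisation template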
- for(String fieldName: doc.schema().fieldNames()) {
- Object value = doc.getAs(fieldName);
- if(value.getClass() == String.class){
- ret.append("").append(fieldName).append(" | ").append(value).append(" |
");
- }
- else if(value.getClass() == Wrappers.JListWrapper.class) {
- List<String> values = IteratorUtils.toList(JavaConverters.asJavaIteratorConverter(((Wrappers.JListWrapper) value).iterator()).asJava())
- .stream()
- .map(DedupLocalTestUtils::takeValue)
- .collect(Collectors.toList());
- ret.append("").append(fieldName).append(" | [").append(String.join(";", values)).append("] |
");
- }
- else if(value.getClass() == ArrayBuffer.class){
- List<String> values = new ArrayList<>(IteratorUtils.toList(JavaConverters.asJavaIteratorConverter(((ArrayBuffer) value).iterator()).asJava()));
- ret.append("").append(fieldName).append(" | [").append(String.join(";", values)).append("] |
");
- }
+ for (String fieldName : doc.schema().fieldNames()) {
+ Object value = doc.getAs(fieldName);
+ if (value.getClass() == String.class) {
+ ret.append("").append(fieldName).append(" | ").append(value).append(" |
");
+ } else if (value.getClass() == Wrappers.JListWrapper.class) {
+ List<String> values = IteratorUtils
+ .toList(
+ JavaConverters
+ .asJavaIteratorConverter(((Wrappers.JListWrapper) value).iterator())
+ .asJava())
+ .stream()
+ .map(DedupLocalTestUtils::takeValue)
+ .collect(Collectors.toList());
+ ret
+ .append("")
+ .append(fieldName)
+ .append(" | [")
+ .append(String.join(";", values))
+ .append("] |
");
+ } else if (value.getClass() == ArrayBuffer.class) {
+ List<String> values = new ArrayList<>(IteratorUtils
+ .toList(JavaConverters.asJavaIteratorConverter(((ArrayBuffer) value).iterator()).asJava()));
+ ret
+ .append("")
+ .append(fieldName)
+ .append(" | [")
+ .append(String.join(";", values))
+ .append("] |
");
+ }
- }
+ }
- ret.append("
");
- return ret.toString();
+ ret.append("
");
+ return ret.toString();
- }
+ }
- protected static String fileToString(String filePath) throws IOException {
+ protected static String fileToString(String filePath) throws IOException {
- Path path=new Path(filePath);
- FileSystem fs = FileSystem.get(new Configuration());
- BufferedReader br=new BufferedReader(new InputStreamReader(fs.open(path)));
- try {
- return String.join("", br.lines().collect(Collectors.toList()));
- } finally {
- br.close();
- }
- }
+ Path path = new Path(filePath);
+ FileSystem fs = FileSystem.get(new Configuration());
+ BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path)));
+ try {
+ return String.join("", br.lines().collect(Collectors.toList()));
+ } finally {
+ br.close();
+ }
+ }
- public static void prepareGraphParams(Dataset<Row> entities, Dataset<Relation> simRels, String filePath, String templateFilePath) {
+ public static void prepareGraphParams(Dataset<Row> entities, Dataset<Relation> simRels, String filePath,
+ String templateFilePath) {
- List<String> vertexes = entities.toJavaRDD().map(r -> r.getAs("identifier").toString()).collect();
+ List<String> vertexes = entities.toJavaRDD().map(r -> r.getAs("identifier").toString()).collect();
- List<Node> nodes = entities.toJavaRDD().map(e -> new Node(e.getAs("identifier").toString(), vertexes.indexOf(e.getAs("identifier").toString()), prepareTable(e))).collect();
+ List<Node> nodes = entities
+ .toJavaRDD()
+ .map(
+ e -> new Node(e.getAs("identifier").toString(), vertexes.indexOf(e.getAs("identifier").toString()),
+ prepareTable(e)))
+ .collect();
- List<Edge> edges = simRels.toJavaRDD().collect().stream().map(sr -> new Edge(vertexes.indexOf(sr.getSource()), vertexes.indexOf(sr.getTarget()))).collect(Collectors.toList());
+ List<Edge> edges = simRels
+ .toJavaRDD()
+ .collect()
+ .stream()
+ .map(sr -> new Edge(vertexes.indexOf(sr.getSource()), vertexes.indexOf(sr.getTarget())))
+ .collect(Collectors.toList());
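+ // serialise nodes and edges to JSON and substitute the %nodes% / %edges% placeholders in the HTML template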
- try(FileWriter fw = new FileWriter(filePath)) {
- String fullText = IOUtils.toString(new FileReader(templateFilePath));
+ try (FileWriter fw = new FileWriter(filePath)) {
+ String fullText = IOUtils.toString(new FileReader(templateFilePath));
- String s = fullText
- .replaceAll("%nodes%", new ObjectMapper().writeValueAsString(nodes))
- .replaceAll("%edges%", new ObjectMapper().writeValueAsString(edges));
+ String s = fullText
+ .replaceAll("%nodes%", new ObjectMapper().writeValueAsString(nodes))
+ .replaceAll("%edges%", new ObjectMapper().writeValueAsString(edges));
- IOUtils.write(s, fw);
- } catch (IOException e) {
- e.printStackTrace();
- }
+ IOUtils.write(s, fw);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
- }
+ }
- public static long hash(final String id) {
- return Hashing.murmur3_128().hashString(id).asLong();
- }
+ public static long hash(final String id) {
+ return Hashing.murmur3_128().hashString(id).asLong();
+ }
- public static Relation createRel(String source, String target, String relClass, DedupConfig dedupConf) {
+ public static Relation createRel(String source, String target, String relClass, DedupConfig dedupConf) {
- String entityType = dedupConf.getWf().getEntityType();
+ String entityType = dedupConf.getWf().getEntityType();
- Relation r = new Relation();
- r.setSource(source);
- r.setTarget(target);
- r.setRelClass(relClass);
- r.setRelType(entityType + entityType.substring(0, 1).toUpperCase() + entityType.substring(1));
- r.setSubRelType(ModelConstants.DEDUP);
- return r;
- }
+ Relation r = new Relation();
+ r.setSource(source);
+ r.setTarget(target);
+ r.setRelClass(relClass);
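+ // relType is the entity type concatenated with its capitalised form, e.g. "organizationOrganization"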
+ r.setRelType(entityType + entityType.substring(0, 1).toUpperCase() + entityType.substring(1));
+ r.setSubRelType(ModelConstants.DEDUP);
+ return r;
+ }
- public static OafEntity createOafEntity(String id, OafEntity base, long ts) {
- try {
- OafEntity res = (OafEntity) BeanUtils.cloneBean(base);
- res.setId(id);
- res.setLastupdatetimestamp(ts);
- return res;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
+ public static OafEntity createOafEntity(String id, OafEntity base, long ts) {
+ try {
+ OafEntity res = (OafEntity) BeanUtils.cloneBean(base);
+ res.setId(id);
+ res.setLastupdatetimestamp(ts);
+ return res;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
- public static String takeValue(String json) {
- ObjectMapper mapper = new ObjectMapper(new JsonFactory());
- try {
- JsonNode rootNode = mapper.readTree(json);
- return rootNode.get("value").toString().replaceAll("\"", "");
+ public static String takeValue(String json) {
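+ // if the input is a JSON object, return its "value" attribute with quotes stripped; otherwise return the input unchanged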
+ ObjectMapper mapper = new ObjectMapper(new JsonFactory());
+ try {
+ JsonNode rootNode = mapper.readTree(json);
+ return rootNode.get("value").toString().replaceAll("\"", "");
- } catch (Exception e) {
- return json;
- }
+ } catch (Exception e) {
+ return json;
+ }
- }
+ }
}
-class Node implements Serializable{
- String label;
- int id;
- String title;
+class Node implements Serializable {
+ String label;
+ int id;
+ String title;
- public Node(String label, int id, String title) {
- this.label = label;
- this.id = id;
- this.title = title;
- }
+ public Node(String label, int id, String title) {
+ this.label = label;
+ this.id = id;
+ this.title = title;
+ }
- public String getLabel() {
- return label;
- }
+ public String getLabel() {
+ return label;
+ }
- public void setLabel(String label) {
- this.label = label;
- }
+ public void setLabel(String label) {
+ this.label = label;
+ }
- public int getId() {
- return id;
- }
+ public int getId() {
+ return id;
+ }
- public void setId(int id) {
- this.id = id;
- }
+ public void setId(int id) {
+ this.id = id;
+ }
- public String getTitle() {
- return title;
- }
+ public String getTitle() {
+ return title;
+ }
- public void setTitle(String title) {
- this.title = title;
- }
+ public void setTitle(String title) {
+ this.title = title;
+ }
}
-class Edge implements Serializable{
- int from;
- int to;
+class Edge implements Serializable {
+ int from;
+ int to;
- public Edge(int from, int to) {
- this.from = from;
- this.to = to;
- }
+ public Edge(int from, int to) {
+ this.from = from;
+ this.to = to;
+ }
- public int getFrom() {
- return from;
- }
+ public int getFrom() {
+ return from;
+ }
- public void setFrom(int from) {
- this.from = from;
- }
+ public void setFrom(int from) {
+ this.from = from;
+ }
- public int getTo() {
- return to;
- }
+ public int getTo() {
+ return to;
+ }
- public void setTo(int to) {
- this.to = to;
- }
+ public void setTo(int to) {
+ this.to = to;
+ }
}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/local/SparkDedupLocalTest.java b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/local/SparkDedupLocalTest.java
index 490b55481..662c468e0 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/local/SparkDedupLocalTest.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/test/java/eu/dnetlib/dhp/oa/dedup/local/SparkDedupLocalTest.java
@@ -1,19 +1,15 @@
+
package eu.dnetlib.dhp.oa.dedup.local;
-import com.google.common.collect.Lists;
-import com.kwartile.lib.cc.ConnectedComponent;
-import eu.dnetlib.dhp.oa.dedup.DedupUtility;
-import eu.dnetlib.dhp.schema.common.EntityType;
-import eu.dnetlib.dhp.schema.common.ModelConstants;
-import eu.dnetlib.dhp.schema.common.ModelSupport;
-import eu.dnetlib.dhp.schema.oaf.OafEntity;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-import eu.dnetlib.dhp.schema.oaf.Result;
-import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
-import eu.dnetlib.pace.config.DedupConfig;
-import eu.dnetlib.pace.model.SparkDeduper;
-import eu.dnetlib.pace.model.SparkModel;
-import eu.dnetlib.pace.tree.support.TreeProcessor;
+import java.awt.*;
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.*;
+import java.util.List;
+import java.util.stream.Stream;
+
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.FlatMapGroupsFunction;
@@ -26,281 +22,309 @@ import org.junit.jupiter.api.*;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.platform.commons.util.StringUtils;
import org.mockito.junit.jupiter.MockitoExtension;
+
+import com.google.common.collect.Lists;
+import com.kwartile.lib.cc.ConnectedComponent;
+
+import eu.dnetlib.dhp.oa.dedup.DedupUtility;
+import eu.dnetlib.dhp.schema.common.EntityType;
+import eu.dnetlib.dhp.schema.common.ModelConstants;
+import eu.dnetlib.dhp.schema.common.ModelSupport;
+import eu.dnetlib.dhp.schema.oaf.OafEntity;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.schema.oaf.Result;
+import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
+import eu.dnetlib.pace.config.DedupConfig;
+import eu.dnetlib.pace.model.SparkDeduper;
+import eu.dnetlib.pace.model.SparkModel;
+import eu.dnetlib.pace.tree.support.TreeProcessor;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.JavaConversions;
import scala.collection.mutable.WrappedArray;
-import java.awt.*;
-import java.io.File;
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.nio.file.Paths;
-import java.util.List;
-import java.util.*;
-import java.util.stream.Stream;
-
@ExtendWith(MockitoExtension.class)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class SparkDedupLocalTest extends DedupLocalTestUtils {
- static SparkSession spark;
- static DedupConfig config;
- static JavaSparkContext context;
+ static SparkSession spark;
+ static DedupConfig config;
+ static JavaSparkContext context;
+
+ final String entitiesPath = Paths
+ .get(
+ Objects
+ .requireNonNull(SparkDedupLocalTest.class.getResource("/eu/dnetlib/dhp/dedup/entities/publication"))
+ .toURI())
+ .toFile()
+ .getAbsolutePath();
+
+ final String dedupConfPath = Paths
+ .get(
+ Objects
+ .requireNonNull(SparkDedupLocalTest.class.getResource("/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json"))
+ .toURI())
+ .toFile()
+ .getAbsolutePath();
+
+ final static int MAX_ACCEPTANCE_DATE = 20;
+
+ private static SparkDeduper deduper;
+ private static SparkModel model;
+
+ public SparkDedupLocalTest() throws URISyntaxException {
+ }
+
+ @BeforeAll
+ public void setup() throws IOException {
+
+ config = DedupConfig.load(fileToString(dedupConfPath));
+
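+ // local in-process Spark session, shared across tests (PER_CLASS lifecycle)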
+ spark = SparkSession
+ .builder()
+ .appName("Deduplication")
+ .master("local[*]")
+ .getOrCreate();
+ context = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+ deduper = new SparkDeduper(config);
+
+ model = new SparkModel(config);
+
+ }
+
+ @Test // full deduplication workflow test
+ @Disabled
+ public void deduplicationTest() {
+
+ long before_simrels = System.currentTimeMillis();
+
+ Dataset<Row> entities = spark
+ .read()
+ .textFile(entitiesPath)
+ .transform(deduper.model().parseJsonDataset());
+
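+ // similarity relations: candidate duplicate pairs produced by the dedup blocking and comparison stage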
+ Dataset<Relation> simRels = entities
+ .transform(deduper.dedup())
+ .distinct()
+ .map(
+ (MapFunction<Row, Relation>) t -> DedupUtility
+ .createSimRel(
+ t.getStruct(0).getString(0), t.getStruct(0).getString(1), config.getWf().getEntityType()),
+ Encoders.bean(Relation.class));
+
+ long simrels_time = System.currentTimeMillis() - before_simrels;
+
+ long simrels_number = simRels.count();
+
+ long before_mergerels = System.currentTimeMillis();
+
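+ // UDF hashing a string id into a long, used as numeric vertex id by the connected components step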
+ UserDefinedFunction hashUDF = functions
+ .udf(
+ (String s) -> hash(s), DataTypes.LongType);
+
+ // vertex map: one row per distinct source or target id, paired with its hashed vertexId
+ Dataset<Row> vertexIdMap = simRels
+ .select("source", "target")
+ .selectExpr("source as id")
+ .union(simRels.selectExpr("target as id"))
+ .distinct()
+ .withColumn("vertexId", hashUDF.apply(functions.col("id")));
+
+ // transform simrels into pairs of numeric ids
+ final Dataset<Row> edges = simRels
+ .select("source", "target")
+ .withColumn("source", hashUDF.apply(functions.col("source")))
+ .withColumn("target", hashUDF.apply(functions.col("target")));
+
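+ // debug output of the computed similarity pairs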
+ for (Relation r : simRels.toJavaRDD().collect()) {
+ System.out.println(r.getSource() + " ---> " + r.getTarget());
+ }
+
+ // resolve connected components
+ // ("vertexId", "groupId")
+ Dataset<Row> cliques = ConnectedComponent
+ .runOnPairs(edges, 50, spark);
+
+ // transform "vertexId" back to its original string value
+ // groupId is kept numeric as its string value is not used
+ // ("id", "groupId")
+ Dataset<Relation> mergeRels = cliques
+ .join(vertexIdMap, JavaConversions.asScalaBuffer(Collections.singletonList("vertexId")), "inner")
+ .drop("vertexId")
+ .distinct()
+ .flatMap((FlatMapFunction<Row, Relation>) (Row r) -> {
+ ArrayList<Relation> res = new ArrayList<>();
+
+ String id = r.getAs("id");
+ String groupId = r.getAs("groupId").toString();
+ res.add(createRel(groupId, id, ModelConstants.MERGES, config));
+ res.add(createRel(id, groupId, ModelConstants.IS_MERGED_IN, config));
+
+ return res.iterator();
+ }, Encoders.bean(Relation.class));
+
+ long mergerels_time = System.currentTimeMillis() - before_mergerels;
+
+ long mergerels_number = mergeRels.count();
+
+ long before_dedupentity = System.currentTimeMillis();
+
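+ // concrete OafEntity subclass configured as the sub-entity type (publication, for this test configuration)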
+ final Class<OafEntity> clazz = ModelSupport.entityTypes
+ .get(EntityType.valueOf(config.getWf().getSubEntityValue()));
+ final Encoder<OafEntity> beanEncoder = Encoders.bean(clazz);
+ final Encoder<OafEntity> kryoEncoder = Encoders.kryo(clazz);
+
+ Dataset<Row> kryoEntities = spark
+ .read()
+ .schema(Encoders.bean(clazz).schema())
+ .json(entitiesPath)
+ .as(beanEncoder)
+ .map(
+ (MapFunction<OafEntity, Tuple2<String, OafEntity>>) entity -> {
+ return new Tuple2<>(entity.getId(), entity);
+ },
+ Encoders.tuple(Encoders.STRING(), kryoEncoder))
+ .selectExpr("_1 AS id", "_2 AS kryoObject");
+
+ //