diff --git a/dhp-schemas/pom.xml b/dhp-schemas/pom.xml
index 89e52858b..8338f69e4 100644
--- a/dhp-schemas/pom.xml
+++ b/dhp-schemas/pom.xml
@@ -30,7 +30,12 @@
 		<dependency>
 			<groupId>com.fasterxml.jackson.core</groupId>
 			<artifactId>jackson-databind</artifactId>
-		</dependency>
+		</dependency>
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+		</dependency>
+
 		<dependency>
 			<groupId>junit</groupId>
 			<artifactId>junit</artifactId>
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/action/AtomicAction.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/action/AtomicAction.java
new file mode 100644
index 000000000..0f9aa3adb
--- /dev/null
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/action/AtomicAction.java
@@ -0,0 +1,38 @@
+package eu.dnetlib.dhp.schema.action;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import eu.dnetlib.dhp.schema.oaf.Oaf;
+
+import java.io.Serializable;
+
+@JsonDeserialize(using = AtomicActionDeserializer.class)
+public class AtomicAction<T extends Oaf> implements Serializable {
+
+ private Class<T> clazz;
+
+ private T payload;
+
+ public AtomicAction() {
+ }
+
+ public AtomicAction(Class<T> clazz, T payload) {
+ this.clazz = clazz;
+ this.payload = payload;
+ }
+
+ public Class<T> getClazz() {
+ return clazz;
+ }
+
+ public void setClazz(Class<T> clazz) {
+ this.clazz = clazz;
+ }
+
+ public T getPayload() {
+ return payload;
+ }
+
+ public void setPayload(T payload) {
+ this.payload = payload;
+ }
+}
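Reviewer note: carrying the payload's `Class` next to the payload is what lets deserialization recover the concrete `Oaf` subtype despite Java's type erasure; Jackson serializes the `clazz` field as the fully-qualified class name. A minimal round-trip sketch (class and field names come from this diff; the literal values and the `AtomicActionRoundTrip` wrapper are invented):

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.oaf.Relation;

public class AtomicActionRoundTrip {
    public static void main(String[] args) throws Exception {
        final ObjectMapper mapper = new ObjectMapper();
        // "clazz" tells AtomicActionDeserializer which concrete Oaf subtype
        // to materialize for "payload".
        final String json = "{\"clazz\":\"eu.dnetlib.dhp.schema.oaf.Relation\","
                + "\"payload\":{\"source\":\"1\",\"target\":\"2\","
                + "\"relType\":\"resultResult\",\"subRelType\":\"dedup\",\"relClass\":\"merges\"}}";
        final AtomicAction aa = mapper.readValue(json, AtomicAction.class);
        System.out.println(((Relation) aa.getPayload()).getTarget()); // prints 2
    }
}
```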
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/action/AtomicActionDeserializer.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/action/AtomicActionDeserializer.java
new file mode 100644
index 000000000..e6017288f
--- /dev/null
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/action/AtomicActionDeserializer.java
@@ -0,0 +1,29 @@
+package eu.dnetlib.dhp.schema.action;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.schema.oaf.Oaf;
+
+import java.io.IOException;
+
+public class AtomicActionDeserializer extends JsonDeserializer {
+
+ @Override
+ public Object deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException, JsonProcessingException {
+ JsonNode node = jp.getCodec().readTree(jp);
+ String classTag = node.get("clazz").asText();
+ JsonNode payload = node.get("payload");
+ ObjectMapper mapper = new ObjectMapper();
+
+ try {
+ final Class<?> clazz = Class.forName(classTag);
+ return new AtomicAction(clazz, (Oaf) mapper.readValue(payload.toString(), clazz));
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+ }
+}
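Reviewer note: `deserialize` instantiates a fresh `ObjectMapper` on every call, which is correct but allocation-heavy when millions of actions are parsed. A possible refinement, not part of this change and assuming the payload needs no custom mapper configuration, is to reuse the codec already attached to the parser:

```java
// Hypothetical drop-in for the try-block above: ObjectCodec.treeToValue(..)
// reuses the mapper that invoked this deserializer instead of allocating one.
final Class<?> clazz = Class.forName(classTag);
return new AtomicAction(clazz, (Oaf) jp.getCodec().treeToValue(payload, clazz));
```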
diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Relation.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Relation.java
index 24a363bec..6738b8693 100644
--- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Relation.java
+++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/oaf/Relation.java
@@ -2,10 +2,11 @@ package eu.dnetlib.dhp.schema.oaf;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import org.junit.Assert;
+import static com.google.common.base.Preconditions.checkArgument;
public class Relation extends Oaf {
@@ -70,14 +71,34 @@ public class Relation extends Oaf {
}
public void mergeFrom(final Relation r) {
- Assert.assertEquals("source ids must be equal", getSource(), r.getSource());
- Assert.assertEquals("target ids must be equal", getTarget(), r.getTarget());
- Assert.assertEquals("relType(s) must be equal", getRelType(), r.getRelType());
- Assert.assertEquals("subRelType(s) must be equal", getSubRelType(), r.getSubRelType());
- Assert.assertEquals("relClass(es) must be equal", getRelClass(), r.getRelClass());
+
+ checkArgument(Objects.equals(getSource(), r.getSource()),"source ids must be equal");
+ checkArgument(Objects.equals(getTarget(), r.getTarget()),"target ids must be equal");
+ checkArgument(Objects.equals(getRelType(), r.getRelType()),"relType(s) must be equal");
+ checkArgument(Objects.equals(getSubRelType(), r.getSubRelType()),"subRelType(s) must be equal");
+ checkArgument(Objects.equals(getRelClass(), r.getRelClass()),"relClass(es) must be equal");
+
setCollectedFrom(Stream.concat(getCollectedFrom().stream(), r.getCollectedFrom().stream())
.distinct() // relies on KeyValue.equals
.collect(Collectors.toList()));
}
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Relation relation = (Relation) o;
+ return relType.equals(relation.relType) &&
+ subRelType.equals(relation.subRelType) &&
+ relClass.equals(relation.relClass) &&
+ source.equals(relation.source) &&
+ target.equals(relation.target) &&
+ Objects.equals(collectedFrom, relation.collectedFrom);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(relType, subRelType, relClass, source, target, collectedFrom);
+ }
+
}
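Reviewer note: swapping `org.junit.Assert` for Guava's `checkArgument` removes a test-scoped dependency from production code and changes the failure mode from `AssertionError` to `IllegalArgumentException`. Illustrative behaviour (ids invented; `MergeFromDemo` is not part of this change):

```java
import eu.dnetlib.dhp.schema.oaf.Relation;

public class MergeFromDemo {
    public static void main(String[] args) {
        final Relation a = new Relation();
        a.setSource("10|a"); a.setTarget("50|b");
        a.setRelType("resultResult"); a.setSubRelType("dedup"); a.setRelClass("merges");

        final Relation b = new Relation();
        b.setSource("10|a"); b.setTarget("50|OTHER"); // mismatching target
        b.setRelType("resultResult"); b.setSubRelType("dedup"); b.setRelClass("merges");

        // throws IllegalArgumentException("target ids must be equal")
        a.mergeFrom(b);
    }
}
```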
diff --git a/dhp-schemas/src/test/java/eu/dnetlib/dhp/schema/action/AtomicActionTest.java b/dhp-schemas/src/test/java/eu/dnetlib/dhp/schema/action/AtomicActionTest.java
new file mode 100644
index 000000000..dcf20e342
--- /dev/null
+++ b/dhp-schemas/src/test/java/eu/dnetlib/dhp/schema/action/AtomicActionTest.java
@@ -0,0 +1,37 @@
+package eu.dnetlib.dhp.schema.action;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import eu.dnetlib.dhp.schema.oaf.Relation;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class AtomicActionTest {
+
+ @Test
+ public void serializationTest() throws IOException {
+
+ Relation rel = new Relation();
+ rel.setSource("1");
+ rel.setTarget("2");
+ rel.setRelType("resultResult");
+ rel.setSubRelType("dedup");
+ rel.setRelClass("merges");
+
+ AtomicAction aa1 = new AtomicAction(Relation.class, rel);
+
+ final ObjectMapper mapper = new ObjectMapper();
+ String json = mapper.writeValueAsString(aa1);
+
+ Assert.assertTrue(StringUtils.isNotBlank(json));
+
+ AtomicAction aa2 = mapper.readValue(json, AtomicAction.class);
+
+ Assert.assertEquals(aa1.getClazz(), aa2.getClazz());
+ Assert.assertEquals(aa1.getPayload(), aa2.getPayload());
+
+ }
+
+}
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/TransformActions.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/TransformActions.java
index cf95711eb..19a0cb5c9 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/TransformActions.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/actions/TransformActions.java
@@ -5,17 +5,14 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.protobuf.InvalidProtocolBufferException;
-import eu.dnetlib.actionmanager.actions.AtomicAction;
+import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.data.proto.OafProtos;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.schema.oaf.DataInfo;
-import eu.dnetlib.dhp.schema.oaf.Oaf;
-import eu.dnetlib.dhp.schema.oaf.Qualifier;
-import eu.dnetlib.dhp.schema.oaf.Relation;
+import eu.dnetlib.dhp.schema.oaf.*;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
-import org.apache.commons.codec.binary.Base64;
+
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
@@ -84,59 +81,97 @@ public class TransformActions implements Serializable {
log.info(String.format("transforming actions from '%s' to '%s'", sourcePath, targetDirectory));
sc.sequenceFile(sourcePath, Text.class, Text.class)
- .mapToPair(a -> new Tuple2<>(a._1(), AtomicAction.fromJSON(a._2().toString())))
+ .mapToPair(a -> new Tuple2<>(a._1(), eu.dnetlib.actionmanager.actions.AtomicAction.fromJSON(a._2().toString())))
.mapToPair(a -> new Tuple2<>(a._1(), transformAction(a._1().toString(), a._2())))
-
+ .filter(t -> StringUtils.isNotBlank(t._2().toString()))
.saveAsHadoopFile(targetDirectory.toString(), Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
}
}
}
- private Text transformAction(String atomicaActionId, AtomicAction aa) throws InvalidProtocolBufferException, JsonProcessingException {
-
+ private Text transformAction(String atomicaActionId, eu.dnetlib.actionmanager.actions.AtomicAction aa) throws InvalidProtocolBufferException, JsonProcessingException {
+ final Text out = new Text();
final ObjectMapper mapper = new ObjectMapper();
if (aa.getTargetValue() != null && aa.getTargetValue().length > 0) {
- Oaf oaf = ProtoConverter.convert(OafProtos.Oaf.parseFrom(aa.getTargetValue()));
- aa.setTargetValue(mapper.writeValueAsString(oaf).getBytes());
+ out.set(mapper.writeValueAsString(doTransform(aa)));
} else {
-
if (atomicaActionId.contains("dedupSimilarity")) {
-
- final String[] splitId = atomicaActionId.split("@");
-
- String source = splitId[0];
- String target = splitId[2];
-
- String[] relSemantic = splitId[1].split("_");
-
- Relation rel = new Relation();
- rel.setSource(source);
- rel.setTarget(target);
- rel.setRelType(relSemantic[0]);
- rel.setSubRelType(relSemantic[1]);
- rel.setRelClass(relSemantic[2]);
-
- DataInfo d = new DataInfo();
- d.setDeletedbyinference(false);
- d.setInferenceprovenance("deduplication");
- d.setInferred(true);
- d.setInvisible(false);
- Qualifier provenanceaction = new Qualifier();
-
- provenanceaction.setClassid("deduplication");
- provenanceaction.setClassname("deduplication");
- provenanceaction.setSchemeid("dnet:provenanceActions");
- provenanceaction.setSchemename("dnet:provenanceActions");
-
- d.setProvenanceaction(provenanceaction);
-
- rel.setDataInfo(d);
-
- aa.setTargetValue(mapper.writeValueAsString(rel).getBytes());
+ out.set(mapper.writeValueAsString(getRelationAtomicAction(atomicaActionId)));
}
}
- return new Text(mapper.writeValueAsString(aa));
+ return out;
+ }
+
+ private AtomicAction<Relation> getRelationAtomicAction(String atomicaActionId) {
+ final String[] splitId = atomicaActionId.split("@");
+
+ String source = splitId[0];
+ String target = splitId[2];
+
+ String[] relSemantic = splitId[1].split("_");
+
+ Relation rel = new Relation();
+ rel.setSource(source);
+ rel.setTarget(target);
+ rel.setRelType(relSemantic[0]);
+ rel.setSubRelType(relSemantic[1]);
+ rel.setRelClass(relSemantic[2]);
+
+ DataInfo d = new DataInfo();
+ d.setDeletedbyinference(false);
+ d.setInferenceprovenance("deduplication");
+ d.setInferred(true);
+ d.setInvisible(false);
+ Qualifier provenanceaction = new Qualifier();
+
+ provenanceaction.setClassid("deduplication");
+ provenanceaction.setClassname("deduplication");
+ provenanceaction.setSchemeid("dnet:provenanceActions");
+ provenanceaction.setSchemename("dnet:provenanceActions");
+
+ d.setProvenanceaction(provenanceaction);
+
+ rel.setDataInfo(d);
+
+ return new AtomicAction<>(Relation.class, rel);
+ }
+
+ private AtomicAction doTransform(eu.dnetlib.actionmanager.actions.AtomicAction aa) throws InvalidProtocolBufferException {
+ final OafProtos.Oaf proto_oaf = OafProtos.Oaf.parseFrom(aa.getTargetValue());
+ final Oaf oaf = ProtoConverter.convert(proto_oaf);
+ switch (proto_oaf.getKind()) {
+ case entity:
+ switch (proto_oaf.getEntity().getType()) {
+ case datasource:
+ return new AtomicAction<>(Datasource.class, (Datasource) oaf);
+ case organization:
+ return new AtomicAction<>(Organization.class, (Organization) oaf);
+ case project:
+ return new AtomicAction<>(Project.class, (Project) oaf);
+ case result:
+ final String resulttypeid = proto_oaf.getEntity().getResult().getMetadata().getResulttype().getClassid();
+ switch (resulttypeid) {
+ case "publication":
+ return new AtomicAction<>(Publication.class, (Publication) oaf);
+ case "software":
+ return new AtomicAction<>(Software.class, (Software) oaf);
+ case "other":
+ return new AtomicAction<>(OtherResearchProduct.class, (OtherResearchProduct) oaf);
+ case "dataset":
+ return new AtomicAction<>(Dataset.class, (Dataset) oaf);
+ default:
+ // can be an update, where the resulttype is not specified
+ return new AtomicAction<>(Result.class, (Result) oaf);
+ }
+ default:
+ throw new IllegalArgumentException("invalid entity type: " + proto_oaf.getEntity().getType());
+ }
+ case relation:
+ return new AtomicAction<>(Relation.class, (Relation) oaf);
+ default:
+ throw new IllegalArgumentException("invalid kind: " + proto_oaf.getKind());
+ }
}
private String getTargetBaseDir(String isLookupUrl) throws ISLookUpException {
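Reviewer note: `getRelationAtomicAction` assumes dedup-similarity action ids follow a `source@relType_subRelType_relClass@target` layout; the splits make that explicit. A quick sketch of the assumed shape (the sample id is invented):

```java
// Shape implied by the split logic in getRelationAtomicAction(..)
final String atomicActionId = "id-1@resultResult_dedupSimilarity_isSimilarTo@id-2";

final String[] splitId = atomicActionId.split("@");
final String source = splitId[0];                    // "id-1"
final String[] relSemantic = splitId[1].split("_");  // relType, subRelType, relClass
final String target = splitId[2];                    // "id-2"
```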
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/GenerateEntitiesApplication.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/GenerateEntitiesApplication.java
index 775e5e7d8..7f907b0c8 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/GenerateEntitiesApplication.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step2/GenerateEntitiesApplication.java
@@ -15,6 +15,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
@@ -87,7 +88,7 @@ public class GenerateEntitiesApplication {
.map(oaf -> oaf.getClass().getSimpleName().toLowerCase() + "|" + convertToJson(oaf)));
}
- inputRdd.saveAsTextFile(targetPath);
+ inputRdd.saveAsTextFile(targetPath, GzipCodec.class);
}
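Reviewer note: compressing the step2 output with `GzipCodec` is transparent to step3 below, because Hadoop's codec factory selects the decompressor from the `.gz` suffix when `sc.textFile(..)` reads the files back. Minimal read-back sketch (the path and the demo class are invented):

```java
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class ReadBackDemo {
    public static void main(String[] args) {
        final JavaSparkContext sc = new JavaSparkContext("local[*]", "read-back-demo");
        // Gzipped part files are decompressed automatically; no consumer change needed.
        final JavaRDD<String> lines = sc.textFile("/tmp/migration/all_entities");
        System.out.println(lines.count());
        sc.stop();
    }
}
```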
diff --git a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step3/DispatchEntitiesApplication.java b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step3/DispatchEntitiesApplication.java
index 4f10068e7..4ee24cba0 100644
--- a/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step3/DispatchEntitiesApplication.java
+++ b/dhp-workflows/dhp-aggregation/src/main/java/eu/dnetlib/dhp/migration/step3/DispatchEntitiesApplication.java
@@ -4,6 +4,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
@@ -60,7 +61,7 @@ public class DispatchEntitiesApplication {
sc.textFile(sourcePath)
.filter(l -> isEntityType(l, type))
.map(l -> StringUtils.substringAfter(l, "|"))
- .saveAsTextFile(targetPath + "/" + type); // use repartition(XXX) ???
+ .saveAsTextFile(targetPath + "/" + type, GzipCodec.class); // use repartition(XXX) ???
}
private static boolean isEntityType(final String line, final String type) {
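Reviewer note: steps 2 and 3 communicate through a plain `type|json` line format (see the map at the end of GenerateEntitiesApplication above); dispatching is a prefix check plus `substringAfter`. Since the body of `isEntityType` is not shown in this diff, the prefix check below is an assumption:

```java
import org.apache.commons.lang3.StringUtils;

public class DispatchContractDemo {
    public static void main(String[] args) {
        final String line = "publication|{\"id\":\"50|foo\"}"; // as written by step2
        final String type = "publication";

        // Presumed equivalent of isEntityType(line, type)
        final boolean matches = StringUtils.substringBefore(line, "|").equalsIgnoreCase(type);
        final String json = StringUtils.substringAfter(line, "|"); // what step3 persists

        System.out.println(matches + " -> " + json);
    }
}
```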
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/actions/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/actions/oozie_app/workflow.xml
index ec2861a0e..ed01c8de4 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/actions/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/actions/oozie_app/workflow.xml
@@ -54,12 +54,25 @@
+    <global>
+        <job-tracker>${jobTracker}</job-tracker>
+        <name-node>${nameNode}</name-node>
+        <configuration>
+            <property>
+                <name>mapreduce.job.queuename</name>
+                <value>${queueName}</value>
+            </property>
+            <property>
+                <name>oozie.launcher.mapred.job.queue.name</name>
+                <value>${oozieLauncherQueueName}</value>
+            </property>
+        </configuration>
+    </global>
+
-            <job-tracker>${jobTracker}</job-tracker>
-            <name-node>${nameNode}</name-node>
             <main-class>eu.dnetlib.dhp.migration.actions.MigrateActionSet</main-class>
             <java-opts>-Dmapred.task.timeout=${distcp_task_timeout}</java-opts>
             <arg>-is</arg><arg>${isLookupUrl}</arg>
@@ -78,8 +91,6 @@
-            <job-tracker>${jobTracker}</job-tracker>
-            <name-node>${nameNode}</name-node>
             <master>yarn</master>
             <mode>cluster</mode>
             <name>transform_actions</name>
diff --git a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_all_steps/oozie_app/workflow.xml b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_all_steps/oozie_app/workflow.xml
index 71aa15d8e..42ab59822 100644
--- a/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_all_steps/oozie_app/workflow.xml
+++ b/dhp-workflows/dhp-aggregation/src/main/resources/eu/dnetlib/dhp/migration/wfs/regular_all_steps/oozie_app/workflow.xml
@@ -1,4 +1,5 @@
+
             <name>workingPath</name>
@@ -48,6 +49,21 @@
+    <global>
+        <job-tracker>${jobTracker}</job-tracker>
+        <name-node>${nameNode}</name-node>
+        <configuration>
+            <property>
+                <name>mapreduce.job.queuename</name>
+                <value>${queueName}</value>
+            </property>
+            <property>
+                <name>oozie.launcher.mapred.job.queue.name</name>
+                <value>${oozieLauncherQueueName}</value>
+            </property>
+        </configuration>
+    </global>
+
@@ -64,8 +80,8 @@
-
-
+
+
@@ -73,8 +89,6 @@
-            <job-tracker>${jobTracker}</job-tracker>
-            <name-node>${nameNode}</name-node>
             <main-class>eu.dnetlib.dhp.migration.step1.MigrateDbEntitiesApplication</main-class>
             <arg>-p</arg><arg>${workingPath}/db_records</arg>
             <arg>-pgurl</arg><arg>${postgresURL}</arg>
@@ -87,8 +101,6 @@
-            <job-tracker>${jobTracker}</job-tracker>
-            <name-node>${nameNode}</name-node>
             <main-class>eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication</main-class>
             <arg>-p</arg><arg>${workingPath}/odf_records</arg>
             <arg>-mongourl</arg><arg>${mongoURL}</arg>
@@ -103,8 +115,6 @@
-            <job-tracker>${jobTracker}</job-tracker>
-            <name-node>${nameNode}</name-node>
             <main-class>eu.dnetlib.dhp.migration.step1.MigrateMongoMdstoresApplication</main-class>
             <arg>-p</arg><arg>${workingPath}/oaf_records</arg>
             <arg>-mongourl</arg><arg>${mongoURL}</arg>
@@ -119,7 +129,7 @@
-
+
@@ -127,9 +137,7 @@
-            <job-tracker>${jobTracker}</job-tracker>
-            <name-node>${nameNode}</name-node>
-            <master>yarn-cluster</master>
+            <master>yarn</master>
             <mode>cluster</mode>
             <name>GenerateEntities</name>
             <class>eu.dnetlib.dhp.migration.step2.GenerateEntitiesApplication</class>
@@ -155,8 +163,8 @@
-
-
+
+
@@ -164,9 +172,7 @@
-            <job-tracker>${jobTracker}</job-tracker>
-            <name-node>${nameNode}</name-node>
-            <master>yarn-cluster</master>
+            <master>yarn</master>
             <mode>cluster</mode>
             <name>GenerateGraph</name>
             <class>eu.dnetlib.dhp.migration.step3.DispatchEntitiesApplication</class>
diff --git a/dhp-workflows/dhp-dedup/pom.xml b/dhp-workflows/dhp-dedup/pom.xml
index 0721af25d..691fbe6d5 100644
--- a/dhp-workflows/dhp-dedup/pom.xml
+++ b/dhp-workflows/dhp-dedup/pom.xml
@@ -65,6 +65,15 @@
 			<groupId>com.arakelian</groupId>
 			<artifactId>java-jq</artifactId>
 		</dependency>
+		<dependency>
+			<groupId>dom4j</groupId>
+			<artifactId>dom4j</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>jaxen</groupId>
+			<artifactId>jaxen</artifactId>
+		</dependency>
+
 		<dependency>
 			<groupId>eu.dnetlib</groupId>
@@ -83,8 +92,6 @@
 			<artifactId>jackson-core</artifactId>
-
-
diff --git a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent.java b/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent.java
deleted file mode 100644
index 16e112c25..000000000
--- a/dhp-workflows/dhp-dedup/src/main/java/eu/dnetlib/dedup/SparkCreateConnectedComponent.java
+++ /dev/null
@@ -1,79 +0,0 @@
-package eu.dnetlib.dedup;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
-import eu.dnetlib.dedup.graph.ConnectedComponent;
-import eu.dnetlib.dedup.graph.GraphProcessor;
-import eu.dnetlib.dhp.application.ArgumentApplicationParser;
-import eu.dnetlib.dhp.schema.oaf.Relation;
-import eu.dnetlib.pace.config.DedupConfig;
-import eu.dnetlib.pace.util.MapDocumentUtil;
-import org.apache.commons.io.IOUtils;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.graphx.Edge;
-import org.apache.spark.rdd.RDD;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoders;
-import org.apache.spark.sql.SparkSession;
-import scala.Tuple2;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class SparkCreateConnectedComponent {
-
- public static void main(String[] args) throws Exception {
- final ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateConnectedComponent.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/dedup_parameters.json")));
- parser.parseArgument(args);
- final SparkSession spark = SparkSession
- .builder()
- .appName(SparkCreateConnectedComponent.class.getSimpleName())
- .master(parser.get("master"))
- .getOrCreate();
-
- final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
- final String inputPath = parser.get("sourcePath");
- final String entity = parser.get("entity");
- final String targetPath = parser.get("targetPath");
-// final DedupConfig dedupConf = DedupConfig.load(IOUtils.toString(SparkCreateConnectedComponent.class.getResourceAsStream("/eu/dnetlib/dhp/dedup/conf/org.curr.conf2.json")));
- final DedupConfig dedupConf = DedupConfig.load(parser.get("dedupConf"));
-
- final JavaPairRDD