diff --git a/dhp-shade-package/pom.xml b/dhp-shade-package/pom.xml
index fe3b3c0d2..be0563ef1 100644
--- a/dhp-shade-package/pom.xml
+++ b/dhp-shade-package/pom.xml
@@ -45,11 +45,11 @@
-
-
-
-
-
+
+ eu.dnetlib.dhp
+ dhp-dedup-openaire
+ ${project.version}
+
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java
index 0af7bb6d0..7211c46f5 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java
@@ -128,13 +128,12 @@ abstract class AbstractSparkAction implements Serializable {
.collect(Collectors.joining(SP_SEPARATOR));
}
- protected static MapFunction patchRelFn() {
+ protected static MapFunction patchRelFn() {
return value -> {
- final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class);
- if (rel.getDataInfo() == null) {
- rel.setDataInfo(new DataInfo());
+ if (value.getDataInfo() == null) {
+ value.setDataInfo(new DataInfo());
}
- return rel;
+ return value;
};
}
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsMergeRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsMergeRels.java
index eca2193af..8eb4de5f8 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsMergeRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsMergeRels.java
@@ -68,22 +68,20 @@ public class SparkCopyOpenorgsMergeRels extends AbstractSparkAction {
final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation");
// collect organization merge relations from openorgs database
- JavaRDD mergeRelsRDD = spark
+ Dataset relations = spark
.read()
- .textFile(relationPath)
+ .schema(Encoders.bean(Relation.class).schema())
+ .json(relationPath)
+ .as(Encoders.bean(Relation.class))
.map(patchRelFn(), Encoders.bean(Relation.class))
- .toJavaRDD()
.filter(this::isOpenorgs) // take only openorgs relations
.filter(this::isMergeRel); // take merges and isMergedIn relations
- log.info("Number of Openorgs Merge Relations collected: {}", mergeRelsRDD.count());
-
- final Dataset relations = spark
- .createDataset(
- mergeRelsRDD.rdd(),
- Encoders.bean(Relation.class));
+ relations.cache();
+ log.info("Number of Openorgs Merge Relations collected: {}", relations.count());
saveParquet(relations, outputPath, SaveMode.Append);
+ relations.unpersist();
}
private boolean isMergeRel(Relation rel) {
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsSimRels.java
index 93027e99a..a166549f2 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsSimRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsSimRels.java
@@ -69,7 +69,9 @@ public class SparkCopyOpenorgsSimRels extends AbstractSparkAction {
Dataset rawRels = spark
.read()
- .textFile(relationPath)
+ .schema(Encoders.bean(Relation.class).schema())
+ .json(relationPath)
+ .as(Encoders.bean(Relation.class))
.map(patchRelFn(), Encoders.bean(Relation.class))
.filter(this::filterOpenorgsRels);
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java
index e10f41c82..308e397e1 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java
@@ -58,7 +58,9 @@ public class SparkCopyRelationsNoOpenorgs extends AbstractSparkAction {
JavaRDD simRels = spark
.read()
- .textFile(relationPath)
+ .schema(Encoders.bean(Relation.class).schema())
+ .json(relationPath)
+ .as(Encoders.bean(Relation.class))
.map(patchRelFn(), Encoders.bean(Relation.class))
.toJavaRDD()
.filter(x -> !isOpenorgsDedupRel(x));
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareNewOrgs.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareNewOrgs.java
index 0507b7b9a..39f86713d 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareNewOrgs.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareNewOrgs.java
@@ -111,7 +111,9 @@ public class SparkPrepareNewOrgs extends AbstractSparkAction {
// collect diffrels from the raw graph relations:
JavaPairRDD diffRels = spark
.read()
- .textFile(relationPath)
+ .schema(Encoders.bean(Relation.class).schema())
+ .json(relationPath)
+ .as(Encoders.bean(Relation.class))
.map(patchRelFn(), Encoders.bean(Relation.class))
.toJavaRDD()
.filter(r -> filterRels(r, ModelSupport.getMainType(EntityType.organization)))
diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java
index 83ec7e522..1ab7f739f 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java
@@ -133,7 +133,9 @@ public class SparkPrepareOrgRels extends AbstractSparkAction {
// collect diffrels from the raw graph relations: <, "diffRel">
JavaRDD, String>> diffRels = spark
.read()
- .textFile(relationPath)
+ .schema(Encoders.bean(Relation.class).schema())
+ .json(relationPath)
+ .as(Encoders.bean(Relation.class))
.map(patchRelFn(), Encoders.bean(Relation.class))
.toJavaRDD()
.filter(r -> filterRels(r, "organization"))