From 79e4728afdf119822661b63d8c606794234f35e9 Mon Sep 17 00:00:00 2001 From: Giambattista Bloisi Date: Fri, 20 Dec 2024 15:24:35 +0100 Subject: [PATCH] Use json loading in place of text loading + jackson mapper --- dhp-shade-package/pom.xml | 10 +++++----- .../dhp/oa/dedup/AbstractSparkAction.java | 9 ++++----- .../dhp/oa/dedup/SparkCopyOpenorgsMergeRels.java | 16 +++++++--------- .../dhp/oa/dedup/SparkCopyOpenorgsSimRels.java | 4 +++- .../oa/dedup/SparkCopyRelationsNoOpenorgs.java | 4 +++- .../dhp/oa/dedup/SparkPrepareNewOrgs.java | 4 +++- .../dhp/oa/dedup/SparkPrepareOrgRels.java | 4 +++- 7 files changed, 28 insertions(+), 23 deletions(-) diff --git a/dhp-shade-package/pom.xml b/dhp-shade-package/pom.xml index fe3b3c0d21..be0563ef13 100644 --- a/dhp-shade-package/pom.xml +++ b/dhp-shade-package/pom.xml @@ -45,11 +45,11 @@ - - - - - + + eu.dnetlib.dhp + dhp-dedup-openaire + ${project.version} + diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java index 0af7bb6d01..7211c46f5d 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/AbstractSparkAction.java @@ -128,13 +128,12 @@ abstract class AbstractSparkAction implements Serializable { .collect(Collectors.joining(SP_SEPARATOR)); } - protected static MapFunction patchRelFn() { + protected static MapFunction patchRelFn() { return value -> { - final Relation rel = OBJECT_MAPPER.readValue(value, Relation.class); - if (rel.getDataInfo() == null) { - rel.setDataInfo(new DataInfo()); + if (value.getDataInfo() == null) { + value.setDataInfo(new DataInfo()); } - return rel; + return value; }; } diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsMergeRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsMergeRels.java index eca2193af2..8eb4de5f82 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsMergeRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsMergeRels.java @@ -68,22 +68,20 @@ public class SparkCopyOpenorgsMergeRels extends AbstractSparkAction { final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation"); // collect organization merge relations from openorgs database - JavaRDD mergeRelsRDD = spark + Dataset relations = spark .read() - .textFile(relationPath) + .schema(Encoders.bean(Relation.class).schema()) + .json(relationPath) + .as(Encoders.bean(Relation.class)) .map(patchRelFn(), Encoders.bean(Relation.class)) - .toJavaRDD() .filter(this::isOpenorgs) // take only openorgs relations .filter(this::isMergeRel); // take merges and isMergedIn relations - log.info("Number of Openorgs Merge Relations collected: {}", mergeRelsRDD.count()); - - final Dataset relations = spark - .createDataset( - mergeRelsRDD.rdd(), - Encoders.bean(Relation.class)); + relations.cache(); + log.info("Number of Openorgs Merge Relations collected: {}", relations.count()); saveParquet(relations, outputPath, SaveMode.Append); + relations.unpersist(); } private boolean isMergeRel(Relation rel) { diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsSimRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsSimRels.java index 93027e99ab..a166549f28 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsSimRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyOpenorgsSimRels.java @@ -69,7 +69,9 @@ public class SparkCopyOpenorgsSimRels extends AbstractSparkAction { Dataset rawRels = spark .read() - .textFile(relationPath) + .schema(Encoders.bean(Relation.class).schema()) + .json(relationPath) + .as(Encoders.bean(Relation.class)) .map(patchRelFn(), Encoders.bean(Relation.class)) .filter(this::filterOpenorgsRels); diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java index e10f41c823..308e397e14 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkCopyRelationsNoOpenorgs.java @@ -58,7 +58,9 @@ public class SparkCopyRelationsNoOpenorgs extends AbstractSparkAction { JavaRDD simRels = spark .read() - .textFile(relationPath) + .schema(Encoders.bean(Relation.class).schema()) + .json(relationPath) + .as(Encoders.bean(Relation.class)) .map(patchRelFn(), Encoders.bean(Relation.class)) .toJavaRDD() .filter(x -> !isOpenorgsDedupRel(x)); diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareNewOrgs.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareNewOrgs.java index 0507b7b9af..39f86713da 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareNewOrgs.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareNewOrgs.java @@ -111,7 +111,9 @@ public class SparkPrepareNewOrgs extends AbstractSparkAction { // collect diffrels from the raw graph relations: JavaPairRDD diffRels = spark .read() - .textFile(relationPath) + .schema(Encoders.bean(Relation.class).schema()) + .json(relationPath) + .as(Encoders.bean(Relation.class)) .map(patchRelFn(), Encoders.bean(Relation.class)) .toJavaRDD() .filter(r -> filterRels(r, ModelSupport.getMainType(EntityType.organization))) diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java index 83ec7e5222..1ab7f739fa 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/SparkPrepareOrgRels.java @@ -133,7 +133,9 @@ public class SparkPrepareOrgRels extends AbstractSparkAction { // collect diffrels from the raw graph relations: <, "diffRel"> JavaRDD, String>> diffRels = spark .read() - .textFile(relationPath) + .schema(Encoders.bean(Relation.class).schema()) + .json(relationPath) + .as(Encoders.bean(Relation.class)) .map(patchRelFn(), Encoders.bean(Relation.class)) .toJavaRDD() .filter(r -> filterRels(r, "organization"))