From 3920c69bc8e050ada6284f8d40e687e5dd9b1761 Mon Sep 17 00:00:00 2001 From: Sandro La Bruzzo Date: Sun, 25 Jul 2021 09:51:24 +0200 Subject: [PATCH] change implementation of resolve Relation to generate jsonRdd in output --- .../dhp/oa/dedup/GroupEntitiesSparkJob.java | 3 ++- .../dhp/sx/graph/SparkResolveRelation.scala | 15 ++++++++++++--- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/GroupEntitiesSparkJob.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/GroupEntitiesSparkJob.java index 58009bfcfc..3f27b94422 100644 --- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/GroupEntitiesSparkJob.java +++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/GroupEntitiesSparkJob.java @@ -38,7 +38,8 @@ import scala.Tuple2; /** * Groups the graph content by entity identifier to ensure ID uniqueness */ -public class GroupEntitiesSparkJob { +public class +GroupEntitiesSparkJob { private static final Logger log = LoggerFactory.getLogger(GroupEntitiesSparkJob.class); diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala index 0d0dc4159b..82bf3c50e2 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala @@ -1,8 +1,10 @@ package eu.dnetlib.dhp.sx.graph +import com.fasterxml.jackson.databind.ObjectMapper import eu.dnetlib.dhp.application.ArgumentApplicationParser import eu.dnetlib.dhp.schema.oaf.{Relation, Result} import org.apache.commons.io.IOUtils +import org.apache.hadoop.io.compress.GzipCodec import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.sql._ @@ -40,7 +42,9 @@ object SparkResolveRelation { extractPidResolvedTableFromJsonRDD(spark, entityPath, workingPath) - val rPid:Dataset[(String,String)] = spark.read.load(s"$workingPath/resolvedPid").as[(String,String)] + val mappper = new ObjectMapper() + + val rPid:Dataset[(String,String)] = spark.read.load(s"$workingPath/relationResolvedPid").as[(String,String)] val relationDs:Dataset[(String,Relation)] = spark.read.load(relationPath).as[Relation].map(r => (r.getSource.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder)) @@ -68,6 +72,11 @@ object SparkResolveRelation { .write .mode(SaveMode.Overwrite) .save(s"$workingPath/relation_resolved") + + spark.read.load(s"$workingPath/relation_resolved").as[Relation] + .map(r => mappper.writeValueAsString(r)) + .rdd.saveAsTextFile(s"$workingPath/relation", classOf[GzipCodec]) + } @@ -102,7 +111,7 @@ object SparkResolveRelation { .map(s => s._2) .write .mode(SaveMode.Overwrite) - .save(s"$workingPath/resolvedPid") + .save(s"$workingPath/relationResolvedPid") } @@ -124,7 +133,7 @@ object SparkResolveRelation { .map(s => s._2) .write .mode(SaveMode.Overwrite) - .save(s"$workingPath/resolvedPid") + .save(s"$workingPath/relationResolvedPid") } def convertPidToDNETIdentifier(pid:String, pidType: String):String = {