diff --git a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/GroupEntitiesSparkJob.java b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/GroupEntitiesSparkJob.java
index 58009bfcf..3f27b9442 100644
--- a/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/GroupEntitiesSparkJob.java
+++ b/dhp-workflows/dhp-dedup-openaire/src/main/java/eu/dnetlib/dhp/oa/dedup/GroupEntitiesSparkJob.java
@@ -38,7 +38,8 @@ import scala.Tuple2;
 /**
  * Groups the graph content by entity identifier to ensure ID uniqueness
  */
-public class GroupEntitiesSparkJob {
+public class
+GroupEntitiesSparkJob {
 
 	private static final Logger log = LoggerFactory.getLogger(GroupEntitiesSparkJob.class);
 
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala
index 0d0dc4159..82bf3c50e 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/sx/graph/SparkResolveRelation.scala
@@ -1,8 +1,10 @@
 package eu.dnetlib.dhp.sx.graph
 
+import com.fasterxml.jackson.databind.ObjectMapper
 import eu.dnetlib.dhp.application.ArgumentApplicationParser
 import eu.dnetlib.dhp.schema.oaf.{Relation, Result}
 import org.apache.commons.io.IOUtils
+import org.apache.hadoop.io.compress.GzipCodec
 import org.apache.spark.SparkConf
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
@@ -40,7 +42,9 @@ object SparkResolveRelation {
 
     extractPidResolvedTableFromJsonRDD(spark, entityPath, workingPath)
 
-    val rPid:Dataset[(String,String)] = spark.read.load(s"$workingPath/resolvedPid").as[(String,String)]
+    val mapper = new ObjectMapper()
+
+    val rPid:Dataset[(String,String)] = spark.read.load(s"$workingPath/relationResolvedPid").as[(String,String)]
 
     val relationDs:Dataset[(String,Relation)] = spark.read.load(relationPath).as[Relation].map(r => (r.getSource.toLowerCase, r))(Encoders.tuple(Encoders.STRING, relEncoder))
 
@@ -68,6 +72,11 @@ object SparkResolveRelation {
       .write
       .mode(SaveMode.Overwrite)
       .save(s"$workingPath/relation_resolved")
+
+    spark.read.load(s"$workingPath/relation_resolved").as[Relation]
+      .map(r => mapper.writeValueAsString(r))
+      .rdd.saveAsTextFile(s"$workingPath/relation", classOf[GzipCodec])
+
   }
 
 
@@ -102,7 +111,7 @@ object SparkResolveRelation {
       .map(s => s._2)
       .write
       .mode(SaveMode.Overwrite)
-      .save(s"$workingPath/resolvedPid")
+      .save(s"$workingPath/relationResolvedPid")
   }
 
 
@@ -124,7 +133,7 @@ object SparkResolveRelation {
       .map(s => s._2)
       .write
       .mode(SaveMode.Overwrite)
-      .save(s"$workingPath/resolvedPid")
+      .save(s"$workingPath/relationResolvedPid")
   }
 
   def convertPidToDNETIdentifier(pid:String, pidType: String):String = {
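
Besides renaming the intermediate output from `resolvedPid` to `relationResolvedPid`, the patch adds a final step to `SparkResolveRelation` that re-reads the resolved relations and dumps them as gzip-compressed JSON text (one relation per line) under `$workingPath/relation`, presumably so downstream workflow steps that consume JSON text can pick them up. Below is a minimal, self-contained sketch of that output pattern; the `Rel` case class, the object name, and the paths are hypothetical stand-ins for the real `eu.dnetlib.dhp.schema.oaf.Relation` bean and job wiring, not code from the repository.

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql.SparkSession

import scala.beans.BeanProperty

// Hypothetical stand-in for eu.dnetlib.dhp.schema.oaf.Relation, which is a
// Java bean in the real schema module. @BeanProperty generates the getters
// that Jackson's default bean introspection looks for.
case class Rel(
  @BeanProperty source: String,
  @BeanProperty target: String,
  @BeanProperty relClass: String)

object GzipJsonDumpSketch {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("GzipJsonDumpSketch")
      .master("local[*]") // local master for the sketch only
      .getOrCreate()
    import spark.implicits._

    val workingPath = "/tmp/resolve_relation_sketch" // hypothetical working dir

    // Stand-in for the parquet dataset the job writes to relation_resolved.
    Seq(Rel("50|doi::abc", "50|pmid::def", "cites"))
      .toDS()
      .write
      .mode("overwrite")
      .save(s"$workingPath/relation_resolved")

    // The step the patch adds: re-read the resolved relations and save them
    // as gzip-compressed JSON lines. ObjectMapper is java.io.Serializable,
    // so a single driver-side instance can be captured by the map closure,
    // mirroring what the patch does with its mapper.
    val mapper = new ObjectMapper()
    spark.read
      .load(s"$workingPath/relation_resolved")
      .as[Rel]
      .map(r => mapper.writeValueAsString(r))
      .rdd
      .saveAsTextFile(s"$workingPath/relation", classOf[GzipCodec])

    spark.stop()
  }
}
```

Passing `classOf[GzipCodec]` to `saveAsTextFile` compresses each output part file with gzip, so the dump stays line-oriented (readable by `spark.read.text` or plain `zcat`) while taking far less space than uncompressed JSON.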