diff --git a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateScholexplorerDump.scala b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateScholexplorerDump.scala index 9334fc6e03..1211dcc786 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateScholexplorerDump.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/scala/eu/dnetlib/dhp/sx/graph/SparkCreateScholexplorerDump.scala @@ -107,9 +107,13 @@ class SparkCreateScholexplorerDump(propertyPath: String, args: Array[String], lo .joinWith(resource, relations("source") === resource("dnetIdentifier"), "inner") .map(res => ScholexplorerUtils.generateScholix(res._1, res._2)) + val resourceTarget = relations + .joinWith(resource, relations("target") === resource("dnetIdentifier"), "inner") + .map(res => (res._1.id, res._2))(Encoders.tuple(Encoders.STRING, Encoders.kryo(classOf[ScholixResource]))) + scholix_one_verse - .joinWith(resource, scholix_one_verse("target.dnetIdentifier") === resource("dnetIdentifier"), "inner") - .map(k => ScholexplorerUtils.updateTarget(k._1, k._2)) + .joinWith(resourceTarget, scholix_one_verse("identifier") === resourceTarget("_1"), "inner") + .map(k => ScholexplorerUtils.updateTarget(k._1, k._2._2)) .write .mode(SaveMode.Overwrite) .option("compression", "gzip")