diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkApplyHostedByMapToDatasource.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkApplyHostedByMapToDatasource.scala index 4ecea63f32..fad313f1cd 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkApplyHostedByMapToDatasource.scala +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkApplyHostedByMapToDatasource.scala @@ -45,7 +45,7 @@ object SparkApplyHostedByMapToDatasource { val graphPath = parser.get("graphPath") val outputPath = parser.get("outputPath") - val workingPath = parser.get("workingPath") + val preparedInfoPath = parser.get("preparedInfoPath") implicit val formats = DefaultFormats @@ -55,17 +55,15 @@ object SparkApplyHostedByMapToDatasource { implicit val mapEncoderEinfo: Encoder[EntityInfo] = Encoders.bean(classOf[EntityInfo]) val mapper = new ObjectMapper() - val dats : Dataset[Datasource] = spark.read.textFile("$graphPath/datasource") + val dats : Dataset[Datasource] = spark.read.textFile(graphPath + "/datasource") .map(r => mapper.readValue(r, classOf[Datasource])) - val pinfo : Dataset[EntityInfo] = spark.read.textFile("$workingPath/preparedInfo") - .map(ei => mapper.readValue(ei, classOf[EntityInfo])) - - + val pinfo : Dataset[EntityInfo] = Aggregators.datasourceToSingleId( spark.read.textFile(preparedInfoPath) + .map(ei => mapper.readValue(ei, classOf[EntityInfo]))) //c. dataset join risultato del passo prima di a per datasource id, gruppo per ds id e cambio compatibilita' se necessario - applyHBtoDats(pinfo, dats).write.mode(SaveMode.Overwrite).option("compression","gzip").json(s"$graphPath/datasource") + applyHBtoDats(pinfo, dats).write.mode(SaveMode.Overwrite).option("compression","gzip").json(outputPath) }