|
|
|
@ -106,14 +106,15 @@ object SparkPrepareHostedByInfoToApply {
|
|
|
|
|
import spark.implicits._
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
//STEP1: leggere la hostedbymap e trasformarla in entity info
|
|
|
|
|
//STEP1: read the hostedbymap and transform it into EntityInfo
|
|
|
|
|
val hostedByInfo:Dataset[EntityInfo] = spark.createDataset(spark.sparkContext.textFile(hostedByMapPath)).map(toEntityInfo)
|
|
|
|
|
|
|
|
|
|
//STEP2: creare la mappa publication id issn, eissn, lissn esplosa
|
|
|
|
|
//STEP2: create association (publication, issn), (publication, eissn), (publication, lissn)
|
|
|
|
|
val resultInfoDataset:Dataset[EntityInfo] = prepareResultInfo(spark, graphPath + "/publication")
|
|
|
|
|
|
|
|
|
|
//STEP3: join resultInfo con hostedByInfo sul journal_id dal result con left
|
|
|
|
|
// e riduzione di tutti i result con lo stesso id in una unica entry con aggiunto l'id della datasource
|
|
|
|
|
//STEP3: left join resultInfo with hostedByInfo on journal_id. Reduction of all the results with the same id into just
|
|
|
|
|
//one entry (one result could be associated with issn and eissn and so possibly matching more than once against the map)
|
|
|
|
|
//to this entry we add the id of the datasource for the next step
|
|
|
|
|
joinResHBM(resultInfoDataset, hostedByInfo)
|
|
|
|
|
.write.mode(SaveMode.Overwrite).option("compression", "gzip").json(outputPath)
|
|
|
|
|
|
|
|
|
|