diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkPrepareHostedByInfoToApply.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkPrepareHostedByInfoToApply.scala
index 03ab09d8e8..7395b24506 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkPrepareHostedByInfoToApply.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkPrepareHostedByInfoToApply.scala
@@ -3,7 +3,8 @@ package eu.dnetlib.dhp.oa.graph.hostedbymap
 import com.fasterxml.jackson.databind.ObjectMapper
 import eu.dnetlib.dhp.application.ArgumentApplicationParser
 import eu.dnetlib.dhp.oa.graph.hostedbymap.model.{DatasourceInfo, EntityInfo}
-import eu.dnetlib.dhp.schema.oaf.{Datasource, Journal, Publication}
+
+import eu.dnetlib.dhp.schema.oaf.{Journal, Publication}
 import org.apache.commons.io.IOUtils
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
@@ -12,11 +13,13 @@ import org.json4s.DefaultFormats
 import org.json4s.jackson.JsonMethods.parse
 import org.slf4j.{Logger, LoggerFactory}
 
+
+
 
 object SparkPrepareHostedByInfoToApply {
 
-  implicit val mapEncoderDSInfo: Encoder[DatasourceInfo] = Encoders.kryo[DatasourceInfo]
-  implicit val mapEncoderPInfo: Encoder[EntityInfo] = Encoders.kryo[EntityInfo]
+  implicit val mapEncoderDSInfo: Encoder[DatasourceInfo] = Encoders.bean(classOf[DatasourceInfo])
+  implicit val mapEncoderPInfo: Encoder[EntityInfo] = Encoders.bean(classOf[EntityInfo])
 
   def getList(id: String, j: Journal, name: String ) : List[EntityInfo] = {
     var lst:List[EntityInfo] = List()
@@ -47,25 +50,22 @@ object SparkPrepareHostedByInfoToApply {
 
   }
 
-
-  def prepareDatasourceInfo(spark:SparkSession, datasourcePath:String) : Dataset[DatasourceInfo] = {
-    implicit val mapEncoderDats: Encoder[Datasource] = Encoders.bean(classOf[Datasource])
-
-    val mapper = new ObjectMapper()
-
-    val dd : Dataset[Datasource] = spark.read.textFile(datasourcePath)
-      .map(r => mapper.readValue(r, classOf[Datasource]))
-
-    dd.filter(d => d.getJournal != null ).map(d => DatasourceInfo.newInstance(d.getId, d.getOfficialname.getValue,
-      d.getJournal.getIssnPrinted, d.getJournal.getIssnOnline, d.getJournal.getIssnLinking))
-
-  }
-  def toHostedByItem(input:String): HostedByItemType = {
+  /**
+   * Parses one JSON line of the hosted-by map and converts its first entry into an [[EntityInfo]].
+   *
+   * @param input JSON object whose values are extracted as [[HostedByItemType]]
+   * @return the result of [[toEntityItem]] applied to the first key and its value
+   * @throws NoSuchElementException if the JSON object has no entries
+   */
+  def toEntityInfo(input:String): EntityInfo = {
     implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
 
     lazy val json: json4s.JValue = parse(input)
     val c :Map[String,HostedByItemType] = json.extract[Map[String, HostedByItemType]]
-    c.values.head
+    // key and value come from the same entry; an empty object is reported with the offending line
+    val (journalId, item) = c.headOption.getOrElse(
+      throw new NoSuchElementException(s"hosted-by map line has no entries: $input"))
+    toEntityItem(journalId, item)
   }
 
   def explodeJournalInfo(input: DatasourceInfo): List[EntityInfo] = {
@@ -73,10 +73,70 @@ object SparkPrepareHostedByInfoToApply {
     if (input.getEissn != null && !input.getEissn.equals("")){
       lst = EntityInfo.newInstance(input.getId, input.getEissn, input.getOfficialname, input.getOpenAccess) :: lst
     }
+    if (input.getLissn != null && !input.getLissn.equals("")){
+      lst = EntityInfo.newInstance(input.getId, input.getLissn, input.getOfficialname, input.getOpenAccess) :: lst
+    }
+    if (input.getIssn != null && !input.getIssn.equals("")){
+      lst = EntityInfo.newInstance(input.getId, input.getIssn, input.getOfficialname, input.getOpenAccess) :: lst
+    }
 
     lst
   }
 
+  /**
+   * Left-joins the datasources with the hosted-by items on `id`, copies the open-access flag of the
+   * matching item (when there is one) onto the datasource and expands each datasource through
+   * [[explodeJournalInfo]].
+   *
+   * @param left  datasource information
+   * @param right hosted-by items
+   * @return the entries produced by [[explodeJournalInfo]] for every joined datasource
+   */
+  def joinDsandHBM(left:Dataset[DatasourceInfo], right:Dataset[HostedByItemType]): Dataset[EntityInfo] = {
+    left.joinWith(right, left.col("id").equalTo(right.col("id")), "left")
+      .map(t2 => {
+        val dsi : DatasourceInfo = t2._1
+        // a left join leaves the right side null when no hosted-by item matches
+        Option(t2._2).foreach(hbi => dsi.setOpenAccess(hbi.openAccess))
+        dsi
+      }).flatMap(explodeJournalInfo)
+  }
+
+  /**
+   * Builds the [[EntityInfo]] for one entry of the hosted-by map.
+   *
+   * @param journal_id key under which the item is stored in the hosted-by map
+   * @param hbi        hosted-by item supplying id, official name and open-access flag
+   * @return the instance created by `EntityInfo.newInstance`
+   */
+  def toEntityItem(journal_id: String , hbi: HostedByItemType): EntityInfo =
+    EntityInfo.newInstance(hbi.id, journal_id, hbi.officialname, hbi.openAccess)
+
+  /**
+   * Left-joins the result information with the hosted-by information on `journal_id`; on a match
+   * the hosted-by id, open-access flag and name are copied onto the result entry. The joined
+   * entries are then handed to `Aggregators.resultToSingleId`.
+   *
+   * @param res result information
+   * @param hbm hosted-by information
+   * @return the dataset returned by `Aggregators.resultToSingleId`
+   */
+  def joinResHBM(res: Dataset[EntityInfo], hbm: Dataset[EntityInfo]): Dataset[EntityInfo] = {
+    val joined = res.joinWith(hbm, res.col("journal_id").equalTo(hbm.col("journal_id")), "left")
+      .map(t2 => {
+        // distinct name so the `res` parameter is not shadowed inside the lambda
+        val resultInfo: EntityInfo = t2._1
+        // a left join leaves the right side null when no hosted-by entry matches
+        Option(t2._2).foreach(ds => {
+          resultInfo.setHb_id(ds.getId)
+          resultInfo.setOpenaccess(ds.getOpenaccess)
+          resultInfo.setName(ds.getName)
+        })
+        resultInfo
+      })
+    Aggregators.resultToSingleId(joined)
+  }
+
 
   def main(args: Array[String]): Unit = {
 
@@ -105,43 +165,22 @@ object SparkPrepareHostedByInfoToApply {
     import spark.implicits._
 
 
-    //STEP1: leggere le DS e creare le entries {dsid, dsofficialname, issn, eissn, lissn, openaccess}
-    val datasourceInfoDataset: Dataset[DatasourceInfo] = prepareDatasourceInfo(spark, "$graphPath/datasource")
-    //STEP2: leggere la hostedbymap e raggruppare per datasource id
-    val hostedByDataset = Aggregators.hostedByToSingleDSId(spark.createDataset(spark.sparkContext.textFile(hostedByMapPath).map(toHostedByItem)))
+    //STEP1: read the hosted-by map and turn every line into an EntityInfo
+    val hostedByInfo:Dataset[EntityInfo] = spark.createDataset(spark.sparkContext.textFile(hostedByMapPath)).map(toEntityInfo)
 
 
-    //STEP3: eseguire una join fra le datasource e la hostedby map (left) per settare se la datasource e' open access o no
-    //ed esplodere l'info della datasource per ogni journal id diverso da nullo
-    val join : Dataset[EntityInfo] = datasourceInfoDataset.joinWith(hostedByDataset,
-      datasourceInfoDataset.col("id").equalTo(hostedByDataset.col("id"), "left"))
-      .map(t2 => {
-        val dsi : DatasourceInfo = t2._1
-        if(t2._2 != null){
-          dsi.setOpenAccess(t2._2.openAccess)
-        }
-        dsi
-      }).flatMap(explodeJournalInfo)
-
-    //STEP4: creare la mappa publication id issn, eissn, lissn esplosa
+    //STEP2: build the exploded map from publication id to issn, eissn, lissn
+    // NOTE(review): the path below is a plain string literal (no `s` interpolator), so the text
+    // "$graphPath" is passed verbatim to prepareResultInfo -- confirm whether s"$graphPath/publication" was intended
     val resultInfoDataset:Dataset[EntityInfo] = prepareResultInfo(spark, "$graphPath/publication")
 
 
-    //STEP5: join di join con resultInfo sul journal_id dal result con left
-    // e riduzione di tutti i result con lo stesso id in una unica entry
-    Aggregators.resultToSingleId(resultInfoDataset.joinWith(join, resultInfoDataset.col("journal_id").equalTo(join.col("journal_id")), "left")
-      .map(t2 => {
-        val res: EntityInfo = t2._1
-        if(t2._2 != null ){
-          val ds = t2._2
-          res.setHb_id(ds.getId)
-          res.setOpenaccess(ds.getOpenaccess)
-          res.setName(ds.getName)
-        }
-        res
-      })).write.mode(SaveMode.Overwrite).option("compression", "gzip").json(outputPath)
-
+    //STEP3: left join of resultInfo with hostedByInfo on the result's journal_id,
+    // then reduction of all the results with the same id into a single entry carrying the datasource id
+    joinResHBM(resultInfoDataset, hostedByInfo)
+      .write.mode(SaveMode.Overwrite).option("compression", "gzip").json(outputPath)
 
 
   }
+
 }