diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/Aggregators.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/Aggregators.scala
index 6a9346ed50..8077efe302 100644
--- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/Aggregators.scala
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/Aggregators.scala
@@ -1,5 +1,6 @@
 package eu.dnetlib.dhp.oa.graph.hostedbymap

+import eu.dnetlib.dhp.oa.graph.hostedbymap.model.EntityInfo
 import org.apache.spark.sql.{Dataset, Encoder, Encoders, TypedColumn}
 import org.apache.spark.sql.expressions.Aggregator

@@ -25,43 +26,10 @@ object Aggregators {
   }

-  def createHostedByItemTypes(df: Dataset[HostedByItemType]): Dataset[HostedByItemType] = {
-    val transformedData : Dataset[HostedByItemType] = df
-      .groupByKey(_.id)(Encoders.STRING)
-      .agg(Aggregators.hostedByAggregator)
-      .map{
-        case (id:String , res:HostedByItemType) => res
-      }(Encoders.product[HostedByItemType])
-
-    transformedData
-  }
-
-  val hostedByAggregator: TypedColumn[HostedByItemType, HostedByItemType] = new Aggregator[HostedByItemType, HostedByItemType, HostedByItemType] {
-    override def zero: HostedByItemType = HostedByItemType("","","","","",false)
-    override def reduce(b: HostedByItemType, a:HostedByItemType): HostedByItemType = {
-      return merge(b, a)
-    }
-    override def merge(b1: HostedByItemType, b2: HostedByItemType): HostedByItemType = {
-      if (b1 == null){
-        return b2
-      }
-      if(b2 == null){
-        return b1
-      }
-
-      HostedByItemType(getId(b1.id, b2.id), getId(b1.officialname, b2.officialname), getId(b1.issn, b2.issn), getId(b1.eissn, b2.eissn), getId(b1.lissn, b2.lissn), b1.openAccess || b2.openAccess)
-
-    }
-    override def finish(reduction: HostedByItemType): HostedByItemType = reduction
-    override def bufferEncoder: Encoder[HostedByItemType] = Encoders.product[HostedByItemType]
-
-    override def outputEncoder: Encoder[HostedByItemType] = Encoders.product[HostedByItemType]
-  }.toColumn
-
   def explodeHostedByItemType(df: Dataset[(String, HostedByItemType)]): Dataset[(String, HostedByItemType)] = {
     val transformedData : Dataset[(String, HostedByItemType)] = df
       .groupByKey(_._1)(Encoders.STRING)
-      .agg(Aggregators.hostedByAggregator1)
+      .agg(Aggregators.hostedByAggregator)
       .map{
         case (id:String , res:(String, HostedByItemType)) => res
       }(Encoders.tuple(Encoders.STRING, Encoders.product[HostedByItemType]))
@@ -69,7 +37,7 @@ object Aggregators {
     transformedData
   }

-  val hostedByAggregator1: TypedColumn[(String, HostedByItemType), (String, HostedByItemType)] = new Aggregator[(String, HostedByItemType), (String, HostedByItemType), (String, HostedByItemType)] {
+  val hostedByAggregator: TypedColumn[(String, HostedByItemType), (String, HostedByItemType)] = new Aggregator[(String, HostedByItemType), (String, HostedByItemType), (String, HostedByItemType)] {
     override def zero: (String, HostedByItemType) = ("", HostedByItemType("","","","","",false))
     override def reduce(b: (String, HostedByItemType), a:(String,HostedByItemType)): (String, HostedByItemType) = {
       return merge(b, a)
     }
@@ -94,4 +62,79 @@ object Aggregators {
     override def outputEncoder: Encoder[(String,HostedByItemType)] = Encoders.tuple(Encoders.STRING,Encoders.product[HostedByItemType])
   }.toColumn

+  def hostedByToSingleDSId(df: Dataset[HostedByItemType]): Dataset[HostedByItemType] = {
+    val transformedData : Dataset[HostedByItemType] = df
+      .groupByKey(_.id)(Encoders.STRING)
+      .agg(Aggregators.hostedByToDSAggregator)
+      .map{
+        case (id:String , res: HostedByItemType) => res
+      }(Encoders.product[HostedByItemType])
+
+    transformedData
+  }
+
+  def hostedByToDSAggregator: TypedColumn[HostedByItemType, HostedByItemType] = new Aggregator[HostedByItemType, HostedByItemType, HostedByItemType] {
+    override def zero: HostedByItemType = HostedByItemType("","","","","",false)
+
+    override def reduce(b: HostedByItemType, a:HostedByItemType): HostedByItemType = {
+      return merge(b, a)
+    }
+    override def merge(b1: HostedByItemType, b2: HostedByItemType): HostedByItemType = {
+      if (b1 == null){
+        return b2
+      }
+      if(b2 == null){
+        return b1
+      }
+      if(!b1.id.equals("")){
+        return HostedByItemType(b1.id, b1.officialname, b1.issn, b1.eissn, b1.lissn, b1.openAccess || b2.openAccess)
+      }
+      return HostedByItemType(b2.id, b2.officialname, b2.issn, b2.eissn, b2.lissn, b1.openAccess || b2.openAccess)
+    }
+    override def finish(reduction: HostedByItemType): HostedByItemType = reduction
+    override def bufferEncoder: Encoder[HostedByItemType] = Encoders.product[HostedByItemType]
+
+    override def outputEncoder: Encoder[HostedByItemType] = Encoders.product[HostedByItemType]
+  }.toColumn
+
+  def resultToSingleIdAggregator: TypedColumn[EntityInfo, EntityInfo] = new Aggregator[EntityInfo, EntityInfo, EntityInfo]{
+    override def zero: EntityInfo = EntityInfo.newInstance("","","")
+
+    override def reduce(b: EntityInfo, a:EntityInfo): EntityInfo = {
+      return merge(b, a)
+    }
+    override def merge(b1: EntityInfo, b2: EntityInfo): EntityInfo = {
+      if (b1 == null){
+        return b2
+      }
+      if(b2 == null){
+        return b1
+      }
+      if(!b1.getHb_id.equals("")){
+        b1.setOpenaccess(b1.getOpenaccess || b2.getOpenaccess)
+        return b1
+      }
+      b2.setOpenaccess(b1.getOpenaccess || b2.getOpenaccess)
+      b2
+    }
+    override def finish(reduction: EntityInfo): EntityInfo = reduction
+    override def bufferEncoder: Encoder[EntityInfo] = Encoders.bean(classOf[EntityInfo])
+
+    override def outputEncoder: Encoder[EntityInfo] = Encoders.bean(classOf[EntityInfo])
+  }.toColumn
+
+  def resultToSingleId(df:Dataset[EntityInfo]): Dataset[EntityInfo] = {
+    val transformedData : Dataset[EntityInfo] = df
+      .groupByKey(_.getId)(Encoders.STRING)
+      .agg(Aggregators.resultToSingleIdAggregator)
+      .map{
+        case (id:String , res: EntityInfo) => res
+      }(Encoders.bean(classOf[EntityInfo]))
+
+    transformedData
+  }
+
 }
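Both new aggregators follow the same reduce-to-a-single-row pattern: group by an id, then merge rows pairwise, preferring whichever operand already carries a non-empty id and OR-ing the open-access flags. A minimal sketch of that merge rule, runnable outside Spark (the Item case class is a hypothetical stand-in for HostedByItemType, not a class from this codebase):

  // Prefer the operand that already has an id; always OR the flags.
  case class Item(id: String, openAccess: Boolean)

  def mergeItems(b1: Item, b2: Item): Item =
    if (b1 == null) b2
    else if (b2 == null) b1
    else if (b1.id.nonEmpty) b1.copy(openAccess = b1.openAccess || b2.openAccess)
    else b2.copy(openAccess = b1.openAccess || b2.openAccess)

  // mergeItems(Item("ds1", false), Item("", true)) == Item("ds1", true)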
diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkPrepareHostedByInfoToApply.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkPrepareHostedByInfoToApply.scala
new file mode 100644
index 0000000000..03ab09d8e8
--- /dev/null
+++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkPrepareHostedByInfoToApply.scala
@@ -0,0 +1,147 @@
+package eu.dnetlib.dhp.oa.graph.hostedbymap
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import eu.dnetlib.dhp.application.ArgumentApplicationParser
+import eu.dnetlib.dhp.oa.graph.hostedbymap.model.{DatasourceInfo, EntityInfo}
+import eu.dnetlib.dhp.schema.oaf.{Datasource, Journal, Publication}
+import org.apache.commons.io.IOUtils
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
+import org.json4s
+import org.json4s.DefaultFormats
+import org.json4s.jackson.JsonMethods.parse
+import org.slf4j.{Logger, LoggerFactory}
+
+object SparkPrepareHostedByInfoToApply {
+
+  implicit val mapEncoderDSInfo: Encoder[DatasourceInfo] = Encoders.kryo[DatasourceInfo]
+  implicit val mapEncoderPInfo: Encoder[EntityInfo] = Encoders.kryo[EntityInfo]
+
+  def getList(id: String, j: Journal, name: String) : List[EntityInfo] = {
+    var lst:List[EntityInfo] = List()
+
+    if (j.getIssnLinking != null && !j.getIssnLinking.equals("")){
+      lst = EntityInfo.newInstance(id, j.getIssnLinking, name) :: lst
+    }
+    if (j.getIssnOnline != null && !j.getIssnOnline.equals("")){
+      lst = EntityInfo.newInstance(id, j.getIssnOnline, name) :: lst
+    }
+    if (j.getIssnPrinted != null && !j.getIssnPrinted.equals("")){
+      lst = EntityInfo.newInstance(id, j.getIssnPrinted, name) :: lst
+    }
+    lst
+  }
+
+  def prepareResultInfo(spark:SparkSession, publicationPath:String) : Dataset[EntityInfo] = {
+    implicit val mapEncoderPubs: Encoder[Publication] = Encoders.bean(classOf[Publication])
+
+    val mapper = new ObjectMapper()
+
+    val dd : Dataset[Publication] = spark.read.textFile(publicationPath)
+      .map(r => mapper.readValue(r, classOf[Publication]))
+
+    dd.filter(p => p.getJournal != null).flatMap(p => getList(p.getId, p.getJournal, ""))
+  }
+
+  def prepareDatasourceInfo(spark:SparkSession, datasourcePath:String) : Dataset[DatasourceInfo] = {
+    implicit val mapEncoderDats: Encoder[Datasource] = Encoders.bean(classOf[Datasource])
+
+    val mapper = new ObjectMapper()
+
+    val dd : Dataset[Datasource] = spark.read.textFile(datasourcePath)
+      .map(r => mapper.readValue(r, classOf[Datasource]))
+
+    dd.filter(d => d.getJournal != null).map(d => DatasourceInfo.newInstance(d.getId, d.getOfficialname.getValue,
+      d.getJournal.getIssnPrinted, d.getJournal.getIssnOnline, d.getJournal.getIssnLinking))
+  }
+
+  def toHostedByItem(input:String): HostedByItemType = {
+    implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
+
+    lazy val json: json4s.JValue = parse(input)
+    val c :Map[String,HostedByItemType] = json.extract[Map[String, HostedByItemType]]
+    c.values.head
+  }
+
+  def explodeJournalInfo(input: DatasourceInfo): List[EntityInfo] = {
+    var lst : List[EntityInfo] = List()
+    if (input.getEissn != null && !input.getEissn.equals("")){
+      lst = EntityInfo.newInstance(input.getId, input.getEissn, input.getOfficialname, input.getOpenAccess) :: lst
+    }
+    lst
+  }
+
+  def main(args: Array[String]): Unit = {
+
+    val logger: Logger = LoggerFactory.getLogger(getClass)
+    val conf: SparkConf = new SparkConf()
+    val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/hostedbymap/hostedby_prepare_params.json")))
+    parser.parseArgument(args)
+    val spark: SparkSession =
+      SparkSession
+        .builder()
+        .config(conf)
+        .appName(getClass.getSimpleName)
+        .master(parser.get("master")).getOrCreate()
+
+    val graphPath = parser.get("graphPath")
+    val outputPath = parser.get("outputPath")
+    val hostedByMapPath = parser.get("hostedByMapPath")
+
+    implicit val formats = DefaultFormats
+
+    logger.info("Getting the Datasources")
+
+    import spark.implicits._
+
+    //STEP1: read the datasources and create the entries {dsid, dsofficialname, issn, eissn, lissn, openaccess}
+    val datasourceInfoDataset: Dataset[DatasourceInfo] = prepareDatasourceInfo(spark, s"$graphPath/datasource")
+
+    //STEP2: read the hostedby map and group the entries by datasource id
+    val hostedByDataset = Aggregators.hostedByToSingleDSId(spark.createDataset(spark.sparkContext.textFile(hostedByMapPath).map(toHostedByItem)))
+
+    //STEP3: left join the datasources with the hostedby map to mark whether each datasource is open access,
+    //and explode the datasource info for each non-null journal id
+    val join : Dataset[EntityInfo] = datasourceInfoDataset.joinWith(hostedByDataset,
+      datasourceInfoDataset.col("id").equalTo(hostedByDataset.col("id")), "left")
+      .map(t2 => {
+        val dsi : DatasourceInfo = t2._1
+        if(t2._2 != null){
+          dsi.setOpenAccess(t2._2.openAccess)
+        }
+        dsi
+      }).flatMap(explodeJournalInfo)
+
+    //STEP4: create the exploded map of publication id to issn, eissn, lissn
+    val resultInfoDataset:Dataset[EntityInfo] = prepareResultInfo(spark, s"$graphPath/publication")
+
+    //STEP5: left join the result info with the datasource join on journal_id,
+    //then reduce all results with the same id to a single entry
+    Aggregators.resultToSingleId(resultInfoDataset.joinWith(join, resultInfoDataset.col("journal_id").equalTo(join.col("journal_id")), "left")
+      .map(t2 => {
+        val res: EntityInfo = t2._1
+        if(t2._2 != null){
+          val ds = t2._2
+          res.setHb_id(ds.getId)
+          res.setOpenaccess(ds.getOpenaccess)
+          res.setName(ds.getName)
+        }
+        res
+      })).write.mode(SaveMode.Overwrite).option("compression", "gzip").json(outputPath)
+  }
+}
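As written, explodeJournalInfo emits an EntityInfo only for the electronic ISSN, although the STEP3 comment promises one entry per non-null journal id. A sketch of the fuller version, assuming DatasourceInfo exposes getIssn and getLissn getters alongside getEissn (those two getters are not shown in this diff):

  // Sketch only: getIssn and getLissn are assumed, mirroring getEissn;
  // verify against the DatasourceInfo model class before relying on this.
  def explodeJournalInfo(input: DatasourceInfo): List[EntityInfo] = {
    List(input.getIssn, input.getEissn, input.getLissn)
      .filter(issn => issn != null && !issn.isEmpty)
      .map(issn => EntityInfo.newInstance(input.getId, issn, input.getOfficialname, input.getOpenAccess))
  }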