From 1e859706a3516b8103342cbe875004a6c112d35b Mon Sep 17 00:00:00 2001 From: "miriam.baglioni" Date: Mon, 2 Aug 2021 19:35:23 +0200 Subject: [PATCH] Hosted By Map - Classes to apply the HBM to results and datasources --- .../SparkApplyHostedByMapToDatasource.scala | 72 +++++++++++++++ .../SparkApplyHostedByMapToResult.scala | 88 +++++++++++++++++++ 2 files changed, 160 insertions(+) create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkApplyHostedByMapToDatasource.scala create mode 100644 dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkApplyHostedByMapToResult.scala diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkApplyHostedByMapToDatasource.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkApplyHostedByMapToDatasource.scala new file mode 100644 index 000000000..4ecea63f3 --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkApplyHostedByMapToDatasource.scala @@ -0,0 +1,72 @@ +package eu.dnetlib.dhp.oa.graph.hostedbymap + +import com.fasterxml.jackson.databind.ObjectMapper +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.oa.graph.hostedbymap.SparkApplyHostedByMapToResult.{applyHBtoPubs, getClass} +import eu.dnetlib.dhp.oa.graph.hostedbymap.model.EntityInfo +import eu.dnetlib.dhp.schema.common.ModelConstants +import eu.dnetlib.dhp.schema.oaf.{Datasource, Publication} +import org.apache.commons.io.IOUtils +import org.apache.spark.SparkConf +import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} +import org.json4s.DefaultFormats +import org.slf4j.{Logger, LoggerFactory} + +object SparkApplyHostedByMapToDatasource { + + def applyHBtoDats(join: Dataset[EntityInfo], dats: Dataset[Datasource]): Dataset[Datasource] = { + dats.joinWith(join, dats.col("id").equalTo(join.col("hb_id")), "left") + .map(t2 => { + val d: Datasource = t2._1 + if (t2._2 != null) { + if (d.getOpenairecompatibility.getClassid.equals(ModelConstants.UNKNOWN)) { + d.getOpenairecompatibility.setClassid("hostedBy") + d.getOpenairecompatibility.setClassname("collected from a compatible aggregator") + } + } + d + })(Encoders.bean((classOf[Datasource]))) + } + def main(args: Array[String]): Unit = { + + + val logger: Logger = LoggerFactory.getLogger(getClass) + val conf: SparkConf = new SparkConf() + val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/hostedbymap/hostedby_apply_params.json"))) + parser.parseArgument(args) + val spark: SparkSession = + SparkSession + .builder() + .config(conf) + .appName(getClass.getSimpleName) + .master(parser.get("master")).getOrCreate() + + + val graphPath = parser.get("graphPath") + + val outputPath = parser.get("outputPath") + val workingPath = parser.get("workingPath") + + + implicit val formats = DefaultFormats + + + implicit val mapEncoderPubs: Encoder[Datasource] = Encoders.bean(classOf[Datasource]) + implicit val mapEncoderEinfo: Encoder[EntityInfo] = Encoders.bean(classOf[EntityInfo]) + val mapper = new ObjectMapper() + + val dats : Dataset[Datasource] = spark.read.textFile("$graphPath/datasource") + .map(r => mapper.readValue(r, classOf[Datasource])) + + val pinfo : Dataset[EntityInfo] = spark.read.textFile("$workingPath/preparedInfo") + .map(ei => mapper.readValue(ei, classOf[EntityInfo])) + + + + //c. dataset join risultato del passo prima di a per datasource id, gruppo per ds id e cambio compatibilita' se necessario + + applyHBtoDats(pinfo, dats).write.mode(SaveMode.Overwrite).option("compression","gzip").json(s"$graphPath/datasource") + } + + +} diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkApplyHostedByMapToResult.scala b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkApplyHostedByMapToResult.scala new file mode 100644 index 000000000..37afc319a --- /dev/null +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/hostedbymap/SparkApplyHostedByMapToResult.scala @@ -0,0 +1,88 @@ +package eu.dnetlib.dhp.oa.graph.hostedbymap + +import com.fasterxml.jackson.databind.ObjectMapper +import eu.dnetlib.dhp.application.ArgumentApplicationParser +import eu.dnetlib.dhp.oa.graph.hostedbymap.model.EntityInfo +import eu.dnetlib.dhp.schema.common.ModelConstants +import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils +import eu.dnetlib.dhp.schema.oaf.{Datasource, Instance, OpenAccessRoute, Publication} +import org.apache.commons.io.IOUtils +import org.apache.spark.SparkConf +import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession} +import org.json4s.DefaultFormats +import org.slf4j.{Logger, LoggerFactory} + +import scala.collection.JavaConverters._ + +//a. publication join risultato del passo precedente su result id (left) setto la istanza (se piu' di una instance +// nel result => salto)con l'hosted by anche access right della instance se openaccess e' true + + +object SparkApplyHostedByMapToResult { + + def applyHBtoPubs(join: Dataset[EntityInfo], pubs: Dataset[Publication]) = { + pubs.joinWith(join, pubs.col("id").equalTo(join.col("id")), "left") + .map(t2 => { + val p: Publication = t2._1 + if (t2._2 != null) { + val ei: EntityInfo = t2._2 + val i = p.getInstance().asScala + if (i.size == 1) { + val inst: Instance = i(0) + + inst.getHostedby.setKey(ei.getHb_id) + inst.getHostedby.setValue(ei.getName) + if (ei.getOpenaccess) { + inst.setAccessright(OafMapperUtils.accessRight(ModelConstants.ACCESS_RIGHT_OPEN, "Open Access", ModelConstants.DNET_ACCESS_MODES, ModelConstants.DNET_ACCESS_MODES)) + inst.getAccessright.setOpenAccessRoute(OpenAccessRoute.hybrid) + } + + } + } + p + })(Encoders.bean(classOf[Publication])) + } + def main(args: Array[String]): Unit = { + + + val logger: Logger = LoggerFactory.getLogger(getClass) + val conf: SparkConf = new SparkConf() + val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/hostedbymap/hostedby_apply_params.json"))) + parser.parseArgument(args) + val spark: SparkSession = + SparkSession + .builder() + .config(conf) + .appName(getClass.getSimpleName) + .master(parser.get("master")).getOrCreate() + + + val graphPath = parser.get("graphPath") + + val outputPath = parser.get("outputPath") + val workingPath = parser.get("workingPath") + + + implicit val formats = DefaultFormats + + + implicit val mapEncoderPubs: Encoder[Publication] = Encoders.bean(classOf[Publication]) + implicit val mapEncoderEinfo: Encoder[EntityInfo] = Encoders.bean(classOf[EntityInfo]) + val mapper = new ObjectMapper() + + val pubs : Dataset[Publication] = spark.read.textFile("$graphPath/publication") + .map(r => mapper.readValue(r, classOf[Publication])) + + val pinfo : Dataset[EntityInfo] = spark.read.textFile("$workingPath/preparedInfo") + .map(ei => mapper.readValue(ei, classOf[EntityInfo])) + + //a. publication join risultato del passo precedente su result id (left) setto la istanza (se piu' di una instance + // nel result => salto)con l'hosted by anche access right della instance se openaccess e' true + applyHBtoPubs(pinfo, pubs).write.mode(SaveMode.Overwrite).option("compression","gzip").json("$graphPath/publication") + + + + } + + +}