2021-08-02 19:35:23 +02:00
|
|
|
package eu.dnetlib.dhp.oa.graph.hostedbymap
|
|
|
|
|
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper
|
|
|
|
import eu.dnetlib.dhp.application.ArgumentApplicationParser
|
|
|
|
import eu.dnetlib.dhp.oa.graph.hostedbymap.SparkApplyHostedByMapToResult.{applyHBtoPubs, getClass}
|
|
|
|
import eu.dnetlib.dhp.oa.graph.hostedbymap.model.EntityInfo
|
|
|
|
import eu.dnetlib.dhp.schema.common.ModelConstants
|
|
|
|
import eu.dnetlib.dhp.schema.oaf.{Datasource, Publication}
|
|
|
|
import org.apache.commons.io.IOUtils
|
|
|
|
import org.apache.spark.SparkConf
|
|
|
|
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
|
|
|
|
import org.json4s.DefaultFormats
|
|
|
|
import org.slf4j.{Logger, LoggerFactory}
|
|
|
|
|
|
|
|
object SparkApplyHostedByMapToDatasource {

  /**
   * Upgrades the OpenAIRE compatibility of datasources that appear in the hosted-by map.
   *
   * A left join between the datasources and the prepared hosted-by information is performed
   * on datasource id vs. EntityInfo.hostedById. For every datasource that has a matching
   * entry AND whose current compatibility classid is ModelConstants.UNKNOWN, the
   * compatibility is set to "hostedBy" / "collected from a compatible aggregator".
   * Datasources with no match, or with an already-known compatibility, pass through unchanged.
   *
   * @param join prepared hosted-by information, one EntityInfo per hosting datasource id
   * @param dats the datasources read from the graph
   * @return the (possibly updated) datasources
   */
  def applyHBtoDats(join: Dataset[EntityInfo], dats: Dataset[Datasource]): Dataset[Datasource] = {
    dats.joinWith(join, dats.col("id").equalTo(join.col("hostedById")), "left")
      .map(t2 => {
        val d: Datasource = t2._1
        // t2._2 is null when the left join found no hosted-by entry for this datasource
        if (t2._2 != null) {
          // only upgrade from UNKNOWN: never overwrite an already-established compatibility level
          if (d.getOpenairecompatibility.getClassid.equals(ModelConstants.UNKNOWN)) {
            d.getOpenairecompatibility.setClassid("hostedBy")
            d.getOpenairecompatibility.setClassname("collected from a compatible aggregator")
          }
        }
        d
      })(Encoders.bean(classOf[Datasource]))
  }

  /**
   * Spark entry point: reads datasources from graphPath/datasource and the prepared
   * hosted-by info from preparedInfoPath, applies [[applyHBtoDats]] and writes the
   * result as gzip-compressed JSON to outputPath.
   */
  def main(args: Array[String]): Unit = {

    val logger: Logger = LoggerFactory.getLogger(getClass)
    val conf: SparkConf = new SparkConf()
    val parser = new ArgumentApplicationParser(
      IOUtils.toString(
        getClass.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/hostedbymap/hostedby_apply_params.json")))
    parser.parseArgument(args)

    val spark: SparkSession =
      SparkSession
        .builder()
        .config(conf)
        .appName(getClass.getSimpleName)
        .master(parser.get("master"))
        .getOrCreate()

    val graphPath = parser.get("graphPath")
    val outputPath = parser.get("outputPath")
    val preparedInfoPath = parser.get("preparedInfoPath")

    // log the resolved parameters so failed runs can be diagnosed from the driver log
    logger.info("graphPath: {}", graphPath)
    logger.info("outputPath: {}", outputPath)
    logger.info("preparedInfoPath: {}", preparedInfoPath)

    // bean encoders for the Datasource/EntityInfo typed datasets built below
    implicit val mapEncoderDats: Encoder[Datasource] = Encoders.bean(classOf[Datasource])
    implicit val mapEncoderEinfo: Encoder[EntityInfo] = Encoders.bean(classOf[EntityInfo])

    // Jackson is used (not spark.read.json) to keep parsing consistent with the rest of the module
    val mapper = new ObjectMapper()

    val dats: Dataset[Datasource] = spark.read.textFile(graphPath + "/datasource")
      .map(r => mapper.readValue(r, classOf[Datasource]))

    // collapse the prepared info to a single EntityInfo per datasource id before joining
    val pinfo: Dataset[EntityInfo] = Aggregators.datasourceToSingleId(
      spark.read.textFile(preparedInfoPath)
        .map(ei => mapper.readValue(ei, classOf[EntityInfo])))

    applyHBtoDats(pinfo, dats).write.mode(SaveMode.Overwrite).option("compression", "gzip").json(outputPath)
  }

}
|