
223 lines
6.9 KiB
Raw Normal View History

2021-07-29 13:04:39 +02:00
package eu.dnetlib.dhp.oa.graph.hostedbymap
2021-07-27 12:27:26 +02:00
import eu.dnetlib.dhp.application.ArgumentApplicationParser
2021-07-29 13:04:39 +02:00
import eu.dnetlib.dhp.oa.graph.hostedbymap.model.{DOAJModel, UnibiGoldModel}
import eu.dnetlib.dhp.schema.oaf.Datasource
2021-07-27 12:27:26 +02:00
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SaveMode, SparkSession}
import org.json4s.DefaultFormats
import org.slf4j.{Logger, LoggerFactory}
2021-07-28 10:24:13 +02:00
import com.fasterxml.jackson.databind.ObjectMapper
2021-07-29 13:04:39 +02:00
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import java.io.PrintWriter
import org.apache.hadoop.io.compress.GzipCodec
2021-07-27 12:27:26 +02:00
2021-07-29 13:04:39 +02:00
object SparkProduceHostedByMap {

  /** Encoder for (journal identifier, item) pairs; picked up implicitly by `flatMap(toList)` in [[main]]. */
  implicit val tupleForJoinEncoder: Encoder[(String, HostedByItemType)] =
    Encoders.tuple(Encoders.STRING, Encoders.product[HostedByItemType])

  /**
   * Builds a [[HostedByItemType]] from an ((OpenAIRE, DOAJ), gold) triple.
   *
   * The DOAJ and gold elements may be null (presumably produced by outer joins — TODO confirm
   * against the caller). A journal is flagged open access when it appears in DOAJ or in the
   * Unibi gold list, consistent with [[doajToHostedbyItemType]] and [[goldToHostedbyItemType]],
   * which always set the open-access flag to true.
   *
   * @param input ((openaire, doaj), gold) descriptors of the same journal
   * @return the item with `journal_id` placed in the issn / eissn / issnl slot, or null when
   *         `journal_id` matches none of the three identifier constants
   */
  def toHostedByItemType(input: ((HostedByInfo, HostedByInfo), HostedByInfo)): HostedByItemType = {
    val openaire: HostedByInfo = input._1._1
    val doaj: HostedByInfo = input._1._2
    val gold: HostedByInfo = input._2
    // Open access iff at least one open-access source knows the journal.
    val isOpenAccess: Boolean = doaj != null || gold != null

    // NOTE(review): this compares the identifier *value* with the identifier *type* constants;
    // an id-type field may have been intended here — confirm against HostedByInfo.
    openaire.journal_id match {
      case Constants.ISSN =>
        HostedByItemType(openaire.id, openaire.officialname, openaire.journal_id, "", "", isOpenAccess)
      case Constants.EISSN =>
        HostedByItemType(openaire.id, openaire.officialname, "", openaire.journal_id, "", isOpenAccess)
      case Constants.ISSNL =>
        HostedByItemType(openaire.id, openaire.officialname, "", "", openaire.journal_id, isOpenAccess)
      case _ => null
    }
  }

  /**
   * Serialises one (identifier, item) pair as a single-entry JSON object `{identifier: item}`.
   *
   * @param input the identifier used as JSON key and the item used as its value
   * @return the JSON text
   */
  def toHostedByMap(input: (String, HostedByItemType)): String = {
    import org.json4s.jackson.Serialization
    implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats

    val map: Map[String, HostedByItemType] = Map(input._1 -> input._2)
    Serialization.write(map)
  }

  /**
   * Builds a [[HostedByItemType]], replacing each null identifier with "".
   *
   * When all three identifiers are null the record has no key to be matched on, so a placeholder
   * with empty id, name and identifiers (keeping only `oa`) is returned; the dataset readers
   * below drop such placeholders through their empty-id filter.
   */
  def getHostedByItemType(id: String, officialname: String, issn: String, eissn: String, issnl: String, oa: Boolean): HostedByItemType =
    if (issn == null && eissn == null && issnl == null) HostedByItemType("", "", "", "", "", oa)
    else HostedByItemType(id, officialname, orEmpty(issn), orEmpty(eissn), orEmpty(issnl), oa)

  /** Maps null to "", the "absent identifier" marker used throughout this job. */
  private def orEmpty(s: String): String = if (s == null) "" else s

  /**
   * Converts an OpenAIRE datasource into an item (never flagged open access).
   * Datasources without journal information become an empty-id placeholder.
   */
  def oaToHostedbyItemType(dats: Datasource): HostedByItemType =
    if (dats.getJournal == null) HostedByItemType("", "", "", "", "", false)
    else {
      val journal = dats.getJournal
      // The official name is a wrapped field; tolerate a missing wrapper instead of throwing an NPE.
      val officialName: String = Option(dats.getOfficialname).map(_.getValue).orNull
      getHostedByItemType(dats.getId, officialName, journal.getIssnPrinted, journal.getIssnOnline, journal.getIssnLinking, false)
    }

  /**
   * Reads a JSON-lines file, deserialises each line into `clazz` with Jackson, converts it with
   * `toItem`, and drops placeholders whose id is empty (or null).
   */
  private def readHostedByDataset[T](spark: SparkSession, path: String, clazz: Class[T])(
    toItem: T => HostedByItemType
  ): Dataset[HostedByItemType] = {
    import spark.implicits._
    // The source models are not case classes, so Spark needs a Kryo encoder for them.
    implicit val sourceEncoder: Encoder[T] = Encoders.kryo(clazz)
    val mapper = new ObjectMapper()

    spark.read
      .textFile(path)
      .map(line => mapper.readValue(line, clazz))
      .map(toItem)
      .filter(hb => hb.id != null && hb.id.nonEmpty)
  }

  /** Items for the OpenAIRE datasources stored as JSON lines at `datasourcePath`. */
  def oaHostedByDataset(spark: SparkSession, datasourcePath: String): Dataset[HostedByItemType] =
    readHostedByDataset(spark, datasourcePath, classOf[Datasource])(oaToHostedbyItemType)

  /** Converts a Unibi gold entry (issn + issnl only) into an open-access item. */
  def goldToHostedbyItemType(gold: UnibiGoldModel): HostedByItemType =
    getHostedByItemType(Constants.UNIBI, gold.getTitle, gold.getIssn, "", gold.getIssnL, true)

  /** Items for the Unibi gold list stored as JSON lines at `datasourcePath`. */
  def goldHostedByDataset(spark: SparkSession, datasourcePath: String): Dataset[HostedByItemType] =
    readHostedByDataset(spark, datasourcePath, classOf[UnibiGoldModel])(goldToHostedbyItemType)

  /** Converts a DOAJ entry (issn + eissn only) into an open-access item. */
  def doajToHostedbyItemType(doaj: DOAJModel): HostedByItemType =
    getHostedByItemType(Constants.DOAJ, doaj.getJournalTitle, doaj.getIssn, doaj.getEissn, "", true)

  /** Items for the DOAJ dump stored as JSON lines at `datasourcePath`. */
  def doajHostedByDataset(spark: SparkSession, datasourcePath: String): Dataset[HostedByItemType] =
    readHostedByDataset(spark, datasourcePath, classOf[DOAJModel])(doajToHostedbyItemType)

  /**
   * Expands an item into one (identifier, item) pair per non-empty identifier, so the item can be
   * matched on its issn, eissn or issnl. Empty strings mark missing identifiers (see
   * [[getHostedByItemType]]) and must not become join keys.
   */
  def toList(input: HostedByItemType): List[(String, HostedByItemType)] =
    List(input.lissn, input.eissn, input.issn)
      .filter(identifier => identifier != null && identifier.nonEmpty)
      .map(identifier => (identifier, input))

  /**
   * Writes each string as one line of a UTF-8 text file on HDFS, overwriting `outputPath`.
   *
   * @param input        lines to write
   * @param outputPath   destination file path
   * @param hdfsNameNode value used for `fs.defaultFS`
   */
  def writeToHDFS(input: Array[String], outputPath: String, hdfsNameNode: String): Unit = {
    val conf = new Configuration()
    conf.set("fs.defaultFS", hdfsNameNode)
    val fs = FileSystem.get(conf)
    val output = fs.create(new Path(outputPath))
    // Explicit UTF-8: PrintWriter(OutputStream) would encode with the JVM default charset.
    val writer = new PrintWriter(new java.io.OutputStreamWriter(output, java.nio.charset.StandardCharsets.UTF_8))
    try {
      input.foreach(hbi => writer.println(hbi))
    } finally {
      writer.close()
    }
  }

  /**
   * Builds the hosted-by map.
   *
   * The job merges the OpenAIRE datasources, the Unibi gold list and the DOAJ dump. It explodes
   * each entry by identifier and keeps the entries whose id starts with "10|". It then writes one
   * `{identifier: item}` JSON object per line, gzip-compressed, to `outputPath`.
   */
  def main(args: Array[String]): Unit = {
    val logger: Logger = LoggerFactory.getLogger(getClass)
    val conf: SparkConf = new SparkConf()

    val parser = new ArgumentApplicationParser(IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/hostedbymap/hostedby_params.json")))
    parser.parseArgument(args)

    // NOTE(review): the master is expected from spark-submit; if hostedby_params.json defines a
    // "master" parameter, pass it with `.master(parser.get("master"))` here.
    val spark: SparkSession =
      SparkSession
        .builder()
        .config(conf)
        .appName(getClass.getSimpleName)
        .getOrCreate()

    val datasourcePath = parser.get("datasourcePath")
    val workingDirPath = parser.get("workingPath")
    val outputPath = parser.get("outputPath")

    logger.info("Getting the Datasources")

    val hostedByItems: Dataset[HostedByItemType] = oaHostedByDataset(spark, datasourcePath)
      .union(goldHostedByDataset(spark, s"$workingDirPath/unibi_gold.json"))
      .union(doajHostedByDataset(spark, s"$workingDirPath/doaj.json"))

    // Uses the implicit tupleForJoinEncoder.
    val byIdentifier: Dataset[(String, HostedByItemType)] = hostedByItems.flatMap(hbi => toList(hbi))

    Aggregators
      .explodeHostedByItemType(byIdentifier)
      .filter(hbi => hbi._2.id.startsWith("10|"))
      .map(hbi => toHostedByMap(hbi))(Encoders.STRING)
      .rdd
      .saveAsTextFile(outputPath, classOf[GzipCodec])
  }
}