package eu.dnetlib.dhp.oa.graph.hostedbymap

import com.fasterxml.jackson.databind.ObjectMapper
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.common.HdfsSupport
import eu.dnetlib.dhp.oa.graph.hostedbymap.model.{DOAJModel, UnibiGoldModel}
import eu.dnetlib.dhp.schema.oaf.Datasource
import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import org.json4s.DefaultFormats
import org.slf4j.{Logger, LoggerFactory}

import java.io.PrintWriter

/** Builds the hostedby map: joins the OpenAIRE datasources with the DOAJ and
  * Unibi Gold journal lists and emits one JSON entry per ISSN variant.
  */
object SparkProduceHostedByMap {

  // encoder for the (identifier, item) pairs produced by toList and consumed by the Aggregators
  implicit val tupleForJoinEncoder: Encoder[(String, HostedByItemType)] =
    Encoders.tuple(Encoders.STRING, Encoders.product[HostedByItemType])
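
  /** Merges the three joined sources (OpenAIRE datasource, DOAJ, Unibi Gold)
    * into a single HostedByItemType, keyed by the ISSN variant carried by the
    * OpenAIRE record; unknown journal_id types yield null.
    */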
  def toHostedByItemType(input: ((HostedByInfo, HostedByInfo), HostedByInfo)): HostedByItemType = {
    val openaire: HostedByInfo = input._1._1
    val doaj: HostedByInfo = input._1._2
    val gold: HostedByInfo = input._2
    // a journal is open access when it appears in DOAJ or in the Unibi Gold list
    val isOpenAccess: Boolean = doaj != null || gold != null

    openaire.journal_id match {
      case Constants.ISSN =>
        HostedByItemType(openaire.id, openaire.officialname, openaire.journal_id, "", "", isOpenAccess)
      case Constants.EISSN =>
        HostedByItemType(openaire.id, openaire.officialname, "", openaire.journal_id, "", isOpenAccess)
      case Constants.ISSNL =>
        HostedByItemType(openaire.id, openaire.officialname, "", "", openaire.journal_id, isOpenAccess)
      // entries with an unrecognized journal_id type produce no item
      case _ => null
    }
  }
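
  /** Serializes one (identifier, item) pair as a single-entry JSON map.
    * Illustrative output, assuming the default json4s field mapping of the
    * HostedByItemType case class:
    * {"0001-3455":{"id":"10|issn___print::...","officialname":"...","issn":"0001-3455","eissn":"","lissn":"","openAccess":false}}
    */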
  def toHostedByMap(input: (String, HostedByItemType)): String = {
    import org.json4s.jackson.Serialization

    implicit val formats: DefaultFormats.type = org.json4s.DefaultFormats

    val map: Map[String, HostedByItemType] = Map(input._1 -> input._2)

    Serialization.write(map)
  }
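
  /** Builds a HostedByItemType from the given identifiers, normalizing null
    * ISSN variants to empty strings; when all three are null an empty item is
    * returned, to be filtered out downstream.
    */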
  def getHostedByItemType(
    id: String,
    officialname: String,
    issn: String,
    eissn: String,
    issnl: String,
    oa: Boolean
  ): HostedByItemType = {
    if (issn == null && eissn == null && issnl == null) {
      // no identifier at all: return an empty item, filtered out downstream
      HostedByItemType("", "", "", "", "", oa)
    } else {
      HostedByItemType(
        id,
        officialname,
        Option(issn).getOrElse(""),
        Option(eissn).getOrElse(""),
        Option(issnl).getOrElse(""),
        oa
      )
    }
  }
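
  /** Maps an OpenAIRE Datasource to a HostedByItemType; datasources without
    * journal metadata yield an empty item.
    */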
  def oaToHostedbyItemType(dats: Datasource): HostedByItemType = {
    if (dats.getJournal != null) {
      getHostedByItemType(
        dats.getId,
        dats.getOfficialname.getValue,
        dats.getJournal.getIssnPrinted,
        dats.getJournal.getIssnOnline,
        dats.getJournal.getIssnLinking,
        false
      )
    } else {
      HostedByItemType("", "", "", "", "", false)
    }
  }
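
  /** Reads the OpenAIRE datasources from a newline-delimited JSON dump and
    * keeps only the entries exposing at least one ISSN variant.
    */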
  def oaHostedByDataset(spark: SparkSession, datasourcePath: String): Dataset[HostedByItemType] = {

    import spark.implicits._

    val mapper = new ObjectMapper()

    implicit val encoderD: Encoder[Datasource] = Encoders.kryo[Datasource]

    val dd: Dataset[Datasource] = spark.read
      .textFile(datasourcePath)
      .map(r => mapper.readValue(r, classOf[Datasource]))

    dd.map(ddt => oaToHostedbyItemType(ddt)).filter(hb => !hb.id.equals(""))
  }
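
  /** Maps a Unibi Gold list entry to a HostedByItemType, flagged as open
    * access.
    */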
  def goldToHostedbyItemType(gold: UnibiGoldModel): HostedByItemType =
    getHostedByItemType(Constants.UNIBI, gold.getTitle, gold.getIssn, "", gold.getIssnL, true)
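
  /** Reads the Unibi Gold list from a newline-delimited JSON dump and keeps
    * only the entries exposing at least one ISSN variant.
    */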
  def goldHostedByDataset(
    spark: SparkSession,
    datasourcePath: String
  ): Dataset[HostedByItemType] = {
    import spark.implicits._

    implicit val mapEncoderUnibi: Encoder[UnibiGoldModel] = Encoders.kryo[UnibiGoldModel]

    val mapper = new ObjectMapper()

    val dd: Dataset[UnibiGoldModel] = spark.read
      .textFile(datasourcePath)
      .map(r => mapper.readValue(r, classOf[UnibiGoldModel]))

    dd.map(ddt => goldToHostedbyItemType(ddt)).filter(hb => !hb.id.equals(""))
  }
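
  /** Maps a DOAJ entry to a HostedByItemType, flagged as open access.
    */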
  def doajToHostedbyItemType(doaj: DOAJModel): HostedByItemType =
    // the mapping does not depend on oaStart: DOAJ entries are open access by definition
    getHostedByItemType(
      Constants.DOAJ,
      doaj.getJournalTitle,
      doaj.getIssn,
      doaj.getEissn,
      "",
      true
    )
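
  /** Reads the DOAJ journal list from a newline-delimited JSON dump and keeps
    * only the entries exposing at least one ISSN variant.
    */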
  def doajHostedByDataset(
    spark: SparkSession,
    datasourcePath: String
  ): Dataset[HostedByItemType] = {
    import spark.implicits._

    implicit val mapEncoderDOAJ: Encoder[DOAJModel] = Encoders.kryo[DOAJModel]

    val mapper = new ObjectMapper()

    val dd: Dataset[DOAJModel] = spark.read
      .textFile(datasourcePath)
      .map(r => mapper.readValue(r, classOf[DOAJModel]))

    dd.map(ddt => doajToHostedbyItemType(ddt)).filter(hb => !hb.id.equals(""))
  }
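
  /** Explodes one item into (identifier, item) pairs, one per non-empty ISSN
    * variant, so the same journal can be looked up by issn, eissn or lissn.
    */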
  def toList(input: HostedByItemType): List[(String, HostedByItemType)] =
    List(input.issn, input.eissn, input.lissn)
      .filter(issn => !issn.equals(""))
      .map(issn => (issn, input))
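
  /** Writes the given lines to a single file on HDFS, one entry per line.
    * Note: not invoked by main, which writes through saveAsTextFile instead.
    */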
  def writeToHDFS(input: Array[String], outputPath: String, hdfsNameNode: String): Unit = {
    val conf = new Configuration()

    conf.set("fs.defaultFS", hdfsNameNode)
    val fs = FileSystem.get(conf)
    val output = fs.create(new Path(outputPath))
    val writer = new PrintWriter(output)
    try {
      input.foreach(hbi => writer.println(hbi))
    } finally {
      writer.close()
    }
  }
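
  /** Joins the OpenAIRE datasources with the DOAJ and Unibi Gold lists and
    * writes the resulting map as gzipped JSON lines, one per ISSN variant.
    *
    * Hypothetical invocation (argument names come from hostedby_params.json,
    * paths and jar name are placeholders):
    * spark-submit --class eu.dnetlib.dhp.oa.graph.hostedbymap.SparkProduceHostedByMap \
    *   dhp-graph-mapper.jar --master yarn --datasourcePath /graph/datasource \
    *   --workingPath /hostedbymap --outputPath /hostedbymap/map
    */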
  def main(args: Array[String]): Unit = {

    val logger: Logger = LoggerFactory.getLogger(getClass)
    val conf: SparkConf = new SparkConf()
    val parser = new ArgumentApplicationParser(
      IOUtils.toString(
        getClass.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/hostedbymap/hostedby_params.json")
      )
    )
    parser.parseArgument(args)
    val spark: SparkSession =
      SparkSession
        .builder()
        .config(conf)
        .appName(getClass.getSimpleName)
        .master(parser.get("master"))
        .getOrCreate()

    val datasourcePath = parser.get("datasourcePath")
    val workingDirPath = parser.get("workingPath")
    val outputPath = parser.get("outputPath")

    implicit val formats: DefaultFormats.type = DefaultFormats

    logger.info("Getting the Datasources")

    HdfsSupport.remove(outputPath, spark.sparkContext.hadoopConfiguration)

    Aggregators
      .explodeHostedByItemType(
        oaHostedByDataset(spark, datasourcePath)
          .union(goldHostedByDataset(spark, workingDirPath + "/unibi_gold.json"))
          .union(doajHostedByDataset(spark, workingDirPath + "/doaj.json"))
          .flatMap(hbi => toList(hbi))
      )
      .filter(hbi => hbi._2.id.startsWith("10|"))
      .map(hbi => toHostedByMap(hbi))(Encoders.STRING)
      .rdd
      .saveAsTextFile(outputPath, classOf[GzipCodec])

  }
}