forked from D-Net/dnet-hadoop
Updated mapping
This commit is contained in:
parent
5142f462b5
commit
ef582948a7
|
@ -1,20 +1,10 @@
|
||||||
package eu.dnetlib.dhp.collection.mag
|
package eu.dnetlib.dhp.collection.mag
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper
|
||||||
import eu.dnetlib.dhp.schema.common.ModelConstants
|
import eu.dnetlib.dhp.schema.common.ModelConstants
|
||||||
import eu.dnetlib.dhp.schema.oaf
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.{
|
|
||||||
Dataset => OafDataset,
|
|
||||||
Author,
|
|
||||||
DataInfo,
|
|
||||||
Instance,
|
|
||||||
Journal,
|
|
||||||
Publication,
|
|
||||||
Qualifier,
|
|
||||||
Result
|
|
||||||
}
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.utils.{IdentifierFactory, OafMapperUtils, PidType}
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils._
|
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils._
|
||||||
import eu.dnetlib.dhp.utils
|
import eu.dnetlib.dhp.schema.oaf.utils.{OafMapperUtils, PidType}
|
||||||
|
import eu.dnetlib.dhp.schema.oaf.{Author, DataInfo, Instance, Journal, Publication, Result, Dataset => OafDataset}
|
||||||
import eu.dnetlib.dhp.utils.DHPUtils
|
import eu.dnetlib.dhp.utils.DHPUtils
|
||||||
import org.apache.spark.sql.types._
|
import org.apache.spark.sql.types._
|
||||||
import org.apache.spark.sql.{Dataset, Row, SparkSession}
|
import org.apache.spark.sql.{Dataset, Row, SparkSession}
|
||||||
|
@ -73,6 +63,10 @@ case class MAGAuthor(
|
||||||
|
|
||||||
object MagUtility extends Serializable {
|
object MagUtility extends Serializable {
|
||||||
|
|
||||||
|
val mapper = new ObjectMapper()
|
||||||
|
|
||||||
|
val MAGCollectedFrom =keyValue(ModelConstants.MAG_ID,ModelConstants.MAG_NAME)
|
||||||
|
|
||||||
val datatypedict = Map(
|
val datatypedict = Map(
|
||||||
"bool" -> BooleanType,
|
"bool" -> BooleanType,
|
||||||
"int" -> IntegerType,
|
"int" -> IntegerType,
|
||||||
|
@ -357,7 +351,7 @@ object MagUtility extends Serializable {
|
||||||
ModelConstants.DNET_PROVENANCE_ACTIONS
|
ModelConstants.DNET_PROVENANCE_ACTIONS
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
if (magType == null) {
|
if (magType == null || magType.orNull ==null) {
|
||||||
result = new Publication
|
result = new Publication
|
||||||
result.setDataInfo(di)
|
result.setDataInfo(di)
|
||||||
val i = new Instance
|
val i = new Instance
|
||||||
|
@ -392,7 +386,7 @@ object MagUtility extends Serializable {
|
||||||
result = new Publication
|
result = new Publication
|
||||||
qualifier("0043", "Journal", ModelConstants.DNET_PUBLICATION_RESOURCE, ModelConstants.DNET_PUBLICATION_RESOURCE)
|
qualifier("0043", "Journal", ModelConstants.DNET_PUBLICATION_RESOURCE, ModelConstants.DNET_PUBLICATION_RESOURCE)
|
||||||
case "patent" =>
|
case "patent" =>
|
||||||
if (source != null) {
|
if (source!= null && source.orNull != null) {
|
||||||
val s = source.get.toLowerCase
|
val s = source.get.toLowerCase
|
||||||
if (s.contains("patent") || s.contains("brevet")) {
|
if (s.contains("patent") || s.contains("brevet")) {
|
||||||
result = new Publication
|
result = new Publication
|
||||||
|
@ -469,20 +463,21 @@ object MagUtility extends Serializable {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def convertMAGtoOAF(paper: MAGPaper): Result = {
|
def convertMAGtoOAF(paper: MAGPaper): String = {
|
||||||
|
|
||||||
// FILTER all the MAG paper with no URL
|
// FILTER all the MAG paper with no URL
|
||||||
if (paper.urls == null || paper.urls.get != null || paper.urls.get.isEmpty)
|
if (paper.urls.orNull == null )
|
||||||
return null
|
return null
|
||||||
|
|
||||||
val result = createResultFromType(paper.docType, paper.originalVenue)
|
val result = createResultFromType(paper.docType, paper.originalVenue)
|
||||||
|
|
||||||
if (result == null)
|
if (result == null)
|
||||||
return null
|
return null
|
||||||
|
|
||||||
|
|
||||||
|
result.setCollectedfrom(List(MAGCollectedFrom).asJava)
|
||||||
val pidList = List(
|
val pidList = List(
|
||||||
structuredProperty(
|
structuredProperty(
|
||||||
paper.doi.get,
|
paper.paperId.get.toString,
|
||||||
qualifier(
|
qualifier(
|
||||||
PidType.mag_id.toString,
|
PidType.mag_id.toString,
|
||||||
PidType.mag_id.toString,
|
PidType.mag_id.toString,
|
||||||
|
@ -587,6 +582,7 @@ object MagUtility extends Serializable {
|
||||||
|
|
||||||
val instance = result.getInstance().get(0)
|
val instance = result.getInstance().get(0)
|
||||||
instance.setPid(pidList.asJava)
|
instance.setPid(pidList.asJava)
|
||||||
|
if(paper.doi.orNull != null)
|
||||||
instance.setAlternateIdentifier(
|
instance.setAlternateIdentifier(
|
||||||
List(
|
List(
|
||||||
structuredProperty(
|
structuredProperty(
|
||||||
|
@ -603,6 +599,7 @@ object MagUtility extends Serializable {
|
||||||
)
|
)
|
||||||
instance.setUrl(paper.urls.get.asJava)
|
instance.setUrl(paper.urls.get.asJava)
|
||||||
instance.setHostedby(ModelConstants.UNKNOWN_REPOSITORY)
|
instance.setHostedby(ModelConstants.UNKNOWN_REPOSITORY)
|
||||||
|
instance.setCollectedfrom(MAGCollectedFrom)
|
||||||
instance.setAccessright(accessRight(
|
instance.setAccessright(accessRight(
|
||||||
ModelConstants.UNKNOWN,
|
ModelConstants.UNKNOWN,
|
||||||
ModelConstants.NOT_AVAILABLE,
|
ModelConstants.NOT_AVAILABLE,
|
||||||
|
@ -648,7 +645,7 @@ object MagUtility extends Serializable {
|
||||||
}
|
}
|
||||||
.asJava
|
.asJava
|
||||||
)
|
)
|
||||||
result
|
mapper.writeValueAsString(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
def convertInvertedIndexString(json_input: String): String = {
|
def convertInvertedIndexString(json_input: String): String = {
|
||||||
|
|
|
@ -1,8 +1,8 @@
|
||||||
package eu.dnetlib.dhp.collection.mag
|
package eu.dnetlib.dhp.collection.mag
|
||||||
|
|
||||||
import eu.dnetlib.dhp.application.AbstractScalaApplication
|
import eu.dnetlib.dhp.application.AbstractScalaApplication
|
||||||
import eu.dnetlib.dhp.schema.oaf.Result
|
import eu.dnetlib.dhp.schema.oaf.{Publication, Result}
|
||||||
import org.apache.spark.sql.{Encoders, SaveMode, SparkSession}
|
import org.apache.spark.sql.{Encoder, Encoders, SaveMode, SparkSession}
|
||||||
import org.slf4j.{Logger, LoggerFactory}
|
import org.slf4j.{Logger, LoggerFactory}
|
||||||
|
|
||||||
class SparkMAGtoOAF(propertyPath: String, args: Array[String], log: Logger)
|
class SparkMAGtoOAF(propertyPath: String, args: Array[String], log: Logger)
|
||||||
|
@ -21,19 +21,17 @@ class SparkMAGtoOAF(propertyPath: String, args: Array[String], log: Logger)
|
||||||
|
|
||||||
def convertMAG(spark: SparkSession, workingPath: String, mdStorePath: String): Unit = {
|
def convertMAG(spark: SparkSession, workingPath: String, mdStorePath: String): Unit = {
|
||||||
import spark.implicits._
|
import spark.implicits._
|
||||||
implicit val resultEncoder = Encoders.bean(classOf[Result])
|
|
||||||
|
|
||||||
spark.read
|
|
||||||
.load(s"$workingPath/mag")
|
|
||||||
.as[MAGPaper].show()
|
|
||||||
|
|
||||||
spark.read
|
spark.read
|
||||||
.load(s"$workingPath/mag")
|
.load(s"$workingPath/mag")
|
||||||
.as[MAGPaper]
|
.as[MAGPaper]
|
||||||
.map(s => MagUtility.convertMAGtoOAF(s))
|
.map(s => MagUtility.convertMAGtoOAF(s))
|
||||||
.write
|
.write
|
||||||
|
.option("compression", "gzip")
|
||||||
.mode(SaveMode.Overwrite)
|
.mode(SaveMode.Overwrite)
|
||||||
.json(mdStorePath)
|
.text(mdStorePath)
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,7 +3,6 @@ package eu.dnetlib.dhp.collection.mag
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper
|
import com.fasterxml.jackson.databind.ObjectMapper
|
||||||
import eu.dnetlib.dhp.schema.oaf.{Dataset, Publication, Result}
|
import eu.dnetlib.dhp.schema.oaf.{Dataset, Publication, Result}
|
||||||
import org.apache.spark.sql.SparkSession
|
import org.apache.spark.sql.SparkSession
|
||||||
import org.apache.spark.sql.functions.col
|
|
||||||
import org.junit.jupiter.api.Test
|
import org.junit.jupiter.api.Test
|
||||||
import org.junit.jupiter.api.Assertions._
|
import org.junit.jupiter.api.Assertions._
|
||||||
|
|
||||||
|
@ -14,18 +13,18 @@ class MAGMappingTest {
|
||||||
val mapper = new ObjectMapper()
|
val mapper = new ObjectMapper()
|
||||||
|
|
||||||
|
|
||||||
@Test
|
// @Test
|
||||||
def mappingTest(): Unit = {
|
// def mappingTest(): Unit = {
|
||||||
|
//
|
||||||
val spark = SparkSession
|
// val spark = SparkSession
|
||||||
.builder()
|
// .builder()
|
||||||
.appName("Test")
|
// .appName("Test")
|
||||||
.master("local[*]")
|
// .master("local[*]")
|
||||||
.getOrCreate()
|
// .getOrCreate()
|
||||||
|
//
|
||||||
new SparkMAGtoOAF(null,null,null).convertMAG(spark,"/Users/sandro/Downloads", "/Users/sandro/Downloads/mag_oaf")
|
// new SparkMAGtoOAF(null,null,null).convertMAG(spark,"/home/sandro/Downloads", "/home/sandro/Downloads/mag_oaf")
|
||||||
|
//
|
||||||
}
|
// }
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue