package eu.dnetlib.doiboost.mag
import eu.dnetlib.dhp.application.ArgumentApplicationParser
import eu.dnetlib.dhp.schema.oaf.Publication
import eu.dnetlib.doiboost.DoiBoostMappingUtil
import org.apache.commons.io.IOUtils
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions._
import org.apache.spark.sql._
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.JavaConverters._
object SparkProcessMAG {
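  /** Returns one MagPapers record per DOI: rows without a DOI are dropped, the rest
   * are grouped on the normalized DOI and reduced to the latest version of the paper
   * (via ConversionUtil.choiceLatestMagArtitcle); the surviving record is rewritten
   * with its DOI in normalized form.
   */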
  def getDistinctResults(d: Dataset[MagPapers]): Dataset[MagPapers] = {
    d.where(col("Doi").isNotNull)
      .groupByKey(mp => DoiBoostMappingUtil.normalizeDoi(mp.Doi))(Encoders.STRING)
      .reduceGroups((p1: MagPapers, p2: MagPapers) => ConversionUtil.choiceLatestMagArtitcle(p1, p2))
      .map(_._2)(Encoders.product[MagPapers])
      .map(mp => {
        MagPapers(mp.PaperId, mp.Rank, DoiBoostMappingUtil.normalizeDoi(mp.Doi),
          mp.DocType, mp.PaperTitle, mp.OriginalTitle,
          mp.BookTitle, mp.Year, mp.Date, mp.Publisher,
          mp.JournalId, mp.ConferenceSeriesId, mp.ConferenceInstanceId,
          mp.Volume, mp.Issue, mp.FirstPage, mp.LastPage,
          mp.ReferenceCount, mp.CitationCount, mp.EstimatedCitation,
          mp.OriginalVenue, mp.FamilyId, mp.CreatedDate)
      })(Encoders.product[MagPapers])
  }
  def main(args: Array[String]): Unit = {
    val logger: Logger = LoggerFactory.getLogger(getClass)
    val conf: SparkConf = new SparkConf()
    val parser = new ArgumentApplicationParser(
      IOUtils.toString(getClass.getResourceAsStream("/eu/dnetlib/dhp/doiboost/mag/preprocess_mag_params.json")))
    parser.parseArgument(args)
    val spark: SparkSession =
      SparkSession
        .builder()
        .config(conf)
        .appName(getClass.getSimpleName)
        .master(parser.get("master"))
        .getOrCreate()
    val sourcePath = parser.get("sourcePath")
    val workingPath = parser.get("workingPath")
    val targetPath = parser.get("targetPath")
import spark.implicits._
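    // Publication is a Java class from the dhp schema, so Spark's product encoders
    // (which cover Scala case classes) do not apply: kryo serialization is used
    // instead, together with an explicit tuple encoder for the (MAG id, Publication)
    // pairs built for the joins below.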
    implicit val mapEncoderPubs: Encoder[Publication] = org.apache.spark.sql.Encoders.kryo[Publication]
    implicit val tupleForJoinEncoder: Encoder[(String, Publication)] = Encoders.tuple(Encoders.STRING, mapEncoderPubs)
logger . info ( "Phase 1) make uninue DOI in Papers:" )
    val d: Dataset[MagPapers] = spark.read.load(s"$sourcePath/Papers").as[MagPapers]

    // Filter papers with a DOI; since the same DOI can appear in multiple versions
    // of an item with different PaperIds, we keep only the latest one
    val distinctPaper: Dataset[MagPapers] = getDistinctResults(d)

    distinctPaper.write.mode(SaveMode.Overwrite).save(s"$workingPath/Papers_distinct")
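    // MAG distributes abstracts as an inverted index (token -> positions in the text);
    // ConversionUtil.transformPaperAbstract rebuilds a plain-text abstract from it.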
logger . info ( "Phase 0) Enrich Publication with description" )
2020-12-07 19:59:33 +01:00
val pa = spark . read . load ( s" $sourcePath /PaperAbstractsInvertedIndex " ) . as [ MagPaperAbstract ]
pa . map ( ConversionUtil . transformPaperAbstract ) . write . mode ( SaveMode . Overwrite ) . save ( s" $workingPath /PaperAbstract " )
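    // Authors are denormalized in two steps: PaperAuthorAffiliations is joined with
    // Authors, then left-joined with Affiliations so that authors without an
    // affiliation are kept; the rows are finally grouped into one author list per paper.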
logger . info ( "Phase 3) Group Author by PaperId" )
val authors = spark . read . load ( s" $sourcePath /Authors " ) . as [ MagAuthor ]
val affiliation = spark . read . load ( s" $sourcePath /Affiliations " ) . as [ MagAffiliation ]
val paperAuthorAffiliation = spark . read . load ( s" $sourcePath /PaperAuthorAffiliations " ) . as [ MagPaperAuthorAffiliation ]
paperAuthorAffiliation . joinWith ( authors , paperAuthorAffiliation ( "AuthorId" ) . equalTo ( authors ( "AuthorId" ) ) )
2021-03-11 11:32:32 +01:00
. map { case ( a : MagPaperAuthorAffiliation , b : MagAuthor ) => ( a . AffiliationId , MagPaperAuthorDenormalized ( a . PaperId , b , null , a . AuthorSequenceNumber ) ) }
2020-05-20 17:05:46 +02:00
. joinWith ( affiliation , affiliation ( "AffiliationId" ) . equalTo ( col ( "_1" ) ) , "left" )
. map ( s => {
val mpa = s . _1 . _2
val af = s . _2
if ( af != null ) {
2021-03-11 11:32:32 +01:00
MagPaperAuthorDenormalized ( mpa . PaperId , mpa . author , af . DisplayName , mpa . sequenceNumber )
2020-05-20 17:05:46 +02:00
} else
mpa
2021-03-17 12:12:56 +01:00
} ) . groupBy ( "PaperId" ) . agg ( collect_list ( struct ( $ "author" , $ "affiliation" , $ "sequenceNumber" ) ) . as ( "authors" ) )
2020-12-07 19:59:33 +01:00
. write . mode ( SaveMode . Overwrite ) . save ( s" $workingPath /merge_step_1_paper_authors " )
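    // Both joins below are left joins: papers without a journal and/or without an
    // author list still produce a publication entity.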
logger . info ( "Phase 4) create First Version of publication Entity with Paper Journal and Authors" )
val journals = spark . read . load ( s" $sourcePath /Journals " ) . as [ MagJournal ]
2020-12-07 19:59:33 +01:00
val papers = spark . read . load ( ( s" $workingPath /Papers_distinct " ) ) . as [ MagPapers ]
2020-05-20 17:05:46 +02:00
2020-12-07 19:59:33 +01:00
val paperWithAuthors = spark . read . load ( s" $workingPath /merge_step_1_paper_authors " ) . as [ MagPaperWithAuthorList ]
2020-05-20 17:05:46 +02:00
val firstJoin = papers . joinWith ( journals , papers ( "JournalId" ) . equalTo ( journals ( "JournalId" ) ) , "left" )
firstJoin . joinWith ( paperWithAuthors , firstJoin ( "_1.PaperId" ) . equalTo ( paperWithAuthors ( "PaperId" ) ) , "left" )
2020-05-28 09:57:46 +02:00
. map { a => ConversionUtil . createOAFFromJournalAuthorPaper ( a ) }
2020-12-07 19:59:33 +01:00
. write . mode ( SaveMode . Overwrite ) . save ( s" $workingPath /merge_step_2 " )
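    // Re-read the publications keyed by their MAG identifier (extracted from
    // originalId) so they can be joined with the remaining MAG tables.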
    var magPubs: Dataset[(String, Publication)] =
      spark.read.load(s"$workingPath/merge_step_2").as[Publication]
        .map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)]
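    // ConferenceInstances is restricted to the instances actually referenced by a
    // paper, then attached to the publications (left join: non-conference papers
    // pass through unchanged).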
    val conference = spark.read.load(s"$sourcePath/ConferenceInstances")
      .select($"ConferenceInstanceId".as("ci"), $"DisplayName", $"Location", $"StartDate", $"EndDate")
    val conferenceInstance = conference.joinWith(papers, papers("ConferenceInstanceId").equalTo(conference("ci")))
      .select($"_1.ci", $"_1.DisplayName", $"_1.Location", $"_1.StartDate", $"_1.EndDate", $"_2.PaperId").as[MagConferenceInstance]

    magPubs.joinWith(conferenceInstance, col("_1").equalTo(conferenceInstance("PaperId")), "left")
      .map(item => ConversionUtil.updatePubsWithConferenceInfo(item))
      .write
      .mode(SaveMode.Overwrite)
      .save(s"$workingPath/merge_step_3")
    // no longer needed to add the instance to mag records
// magPubs= spark.read.load(s"$workingPath/merge_step_2_conference").as[Publication]
// .map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)]
//
// val paperUrlDataset = spark.read.load(s"$sourcePath/PaperUrls").as[MagPaperUrl].groupBy("PaperId").agg(collect_list(struct("sourceUrl")).as("instances")).as[MagUrl]
//
//
//
// logger.info("Phase 5) enrich publication with URL and Instances")
// magPubs.joinWith(paperUrlDataset, col("_1").equalTo(paperUrlDataset("PaperId")), "left")
// .map { a: ((String, Publication), MagUrl) => ConversionUtil.addInstances((a._1._2, a._2)) }
// .write.mode(SaveMode.Overwrite)
// .save(s"$workingPath/merge_step_3")
// logger.info("Phase 6) Enrich Publication with description")
// val pa = spark.read.load(s"${parser.get("sourcePath")}/PaperAbstractsInvertedIndex").as[MagPaperAbstract]
// pa.map(ConversionUtil.transformPaperAbstract).write.mode(SaveMode.Overwrite).save(s"${parser.get("targetPath")}/PaperAbstract")
    val paperAbstract = spark.read.load(s"$workingPath/PaperAbstract").as[MagPaperAbstract]
    magPubs = spark.read.load(s"$workingPath/merge_step_3").as[Publication]
      .map(p => (ConversionUtil.extractMagIdentifier(p.getOriginalId.asScala), p)).as[(String, Publication)]

    magPubs.joinWith(paperAbstract, col("_1").equalTo(paperAbstract("PaperId")), "left")
      .map(item => ConversionUtil.updatePubsWithDescription(item))
      .write.mode(SaveMode.Overwrite).save(s"$workingPath/merge_step_4")
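    // Fields of study are denormalized (PaperFieldsOfStudy joined with FieldsOfStudy)
    // and collected into one subject list per paper before being attached to the
    // publications.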
logger . info ( "Phase 7) Enrich Publication with FieldOfStudy" )
2020-12-07 19:59:33 +01:00
magPubs = spark . read . load ( s" $workingPath /merge_step_4 " ) . as [ Publication ]
2020-05-28 09:57:46 +02:00
. map ( p => ( ConversionUtil . extractMagIdentifier ( p . getOriginalId . asScala ) , p ) ) . as [ ( String , Publication ) ]
2020-05-20 08:14:03 +02:00
val fos = spark . read . load ( s" $sourcePath /FieldsOfStudy " ) . select ( $ "FieldOfStudyId" . alias ( "fos" ) , $ "DisplayName" , $ "MainType" )
val pfos = spark . read . load ( s" $sourcePath /PaperFieldsOfStudy " )
val paperField = pfos . joinWith ( fos , fos ( "fos" ) . equalTo ( pfos ( "FieldOfStudyId" ) ) )
. select ( $ "_1.FieldOfStudyId" , $ "_2.DisplayName" , $ "_2.MainType" , $ "_1.PaperId" , $ "_1.Score" )
. groupBy ( $ "PaperId" ) . agg ( collect_list ( struct ( $ "FieldOfStudyId" , $ "DisplayName" , $ "MainType" , $ "Score" ) ) . as ( "subjects" ) )
. as [ MagFieldOfStudy ]
2020-05-28 09:57:46 +02:00
magPubs . joinWith ( paperField , col ( "_1" )
. equalTo ( paperField ( "PaperId" ) ) , "left" )
. map ( item => ConversionUtil . updatePubsWithSubject ( item ) )
. write . mode ( SaveMode . Overwrite )
2020-12-07 19:59:33 +01:00
. save ( s" $workingPath /mag_publication " )
    spark.read.load(s"$workingPath/mag_publication").as[Publication]
      .filter(p => p.getId != null)
      .groupByKey(p => p.getId)
      .reduceGroups((a: Publication, b: Publication) => ConversionUtil.mergePublication(a, b))
      .map(_._2)
      .write.mode(SaveMode.Overwrite).save(s"$targetPath/magPublication")
// val s:RDD[Publication] = spark.read.load(s"$workingPath/mag_publication").as[Publication]
// .map(p=>Tuple2(p.getId, p)).rdd.reduceByKey((a:Publication, b:Publication) => ConversionUtil.mergePublication(a,b))
// .map(_._2)
//
// spark.createDataset(s).as[Publication].write.mode(SaveMode.Overwrite).save(s"$targetPath/magPublication")
}
}