forked from D-Net/dnet-hadoop
Added the first part of the refactoring of the code that generates MAG,
making it more readable by using Spark SQL queries
This commit is contained in:
parent
9d94648f3b
commit
ad0e9aa80c
|
@ -0,0 +1,302 @@
|
|||
package eu.dnetlib.dhp.collection.mag
|
||||
|
||||
import org.apache.spark.sql.types._
|
||||
import org.apache.spark.sql.{Dataset, Row, SparkSession}
|
||||
import org.json4s
|
||||
import org.json4s.DefaultFormats
|
||||
import org.json4s.jackson.JsonMethods.parse
|
||||
|
||||
|
||||
object MagUtility extends Serializable {
|
||||
|
||||
|
||||
/** Lookup table from the type names used in the MAG column descriptors
  * (the part after `:` in entries such as `"PaperId:long"`) to Spark SQL data types.
  *
  * The unsigned names share the Spark type of their signed counterpart
  * (`uint` -> `IntegerType`, `ulong` -> `LongType`); `DateTime` maps to `DateType`.
  */
val datatypedict = Map(
  ("bool", BooleanType),
  ("int", IntegerType),
  ("uint", IntegerType),
  ("long", LongType),
  ("ulong", LongType),
  ("float", FloatType),
  ("string", StringType),
  ("DateTime", DateType)
)
|
||||
|
||||
/** Catalogue of the MAG tables.
  *
  * Key: logical table name. Value: a pair made of
  *   1. the file path (possibly a glob) relative to the MAG base directory, and
  *   2. the ordered column descriptors, each written as `Name:type`, where a
  *      trailing `?` on the type marks the column as nullable.
  */
val stream = Map(
  (
    "Affiliations",
    (
      "mag/Affiliations.txt",
      Seq(
        "AffiliationId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string",
        "GridId:string", "OfficialPage:string", "WikiPage:string",
        "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long",
        "Iso3166Code:string", "Latitude:float?", "Longitude:float?",
        "CreatedDate:DateTime"
      )
    )
  ),
  (
    "AuthorExtendedAttributes",
    (
      "mag/AuthorExtendedAttributes.txt",
      Seq("AuthorId:long", "AttributeType:int", "AttributeValue:string")
    )
  ),
  (
    "Authors",
    (
      "mag/Authors.txt",
      Seq(
        "AuthorId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string",
        "LastKnownAffiliationId:long?",
        "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long",
        "CreatedDate:DateTime"
      )
    )
  ),
  (
    "ConferenceInstances",
    (
      "mag/ConferenceInstances.txt",
      Seq(
        "ConferenceInstanceId:long", "NormalizedName:string", "DisplayName:string",
        "ConferenceSeriesId:long", "Location:string", "OfficialUrl:string",
        "StartDate:DateTime?", "EndDate:DateTime?",
        "AbstractRegistrationDate:DateTime?", "SubmissionDeadlineDate:DateTime?",
        "NotificationDueDate:DateTime?", "FinalVersionDueDate:DateTime?",
        "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long",
        "Latitude:float?", "Longitude:float?",
        "CreatedDate:DateTime"
      )
    )
  ),
  (
    "ConferenceSeries",
    (
      "mag/ConferenceSeries.txt",
      Seq(
        "ConferenceSeriesId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string",
        "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long",
        "CreatedDate:DateTime"
      )
    )
  ),
  (
    "EntityRelatedEntities",
    (
      "advanced/EntityRelatedEntities.txt",
      Seq(
        "EntityId:long", "EntityType:string",
        "RelatedEntityId:long", "RelatedEntityType:string",
        "RelatedType:int", "Score:float"
      )
    )
  ),
  (
    "FieldOfStudyChildren",
    (
      "advanced/FieldOfStudyChildren.txt",
      Seq("FieldOfStudyId:long", "ChildFieldOfStudyId:long")
    )
  ),
  (
    "FieldOfStudyExtendedAttributes",
    (
      "advanced/FieldOfStudyExtendedAttributes.txt",
      Seq("FieldOfStudyId:long", "AttributeType:int", "AttributeValue:string")
    )
  ),
  (
    "FieldsOfStudy",
    (
      "advanced/FieldsOfStudy.txt",
      Seq(
        "FieldOfStudyId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string",
        "MainType:string", "Level:int",
        "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long",
        "CreatedDate:DateTime"
      )
    )
  ),
  (
    "Journals",
    (
      "mag/Journals.txt",
      Seq(
        "JournalId:long", "Rank:uint", "NormalizedName:string", "DisplayName:string",
        "Issn:string", "Publisher:string", "Webpage:string",
        "PaperCount:long", "PaperFamilyCount:long", "CitationCount:long",
        "CreatedDate:DateTime"
      )
    )
  ),
  (
    "PaperAbstractsInvertedIndex",
    (
      "nlp/PaperAbstractsInvertedIndex.txt.*",
      Seq("PaperId:long", "IndexedAbstract:string")
    )
  ),
  (
    "PaperAuthorAffiliations",
    (
      "mag/PaperAuthorAffiliations.txt",
      Seq(
        "PaperId:long", "AuthorId:long", "AffiliationId:long?",
        "AuthorSequenceNumber:uint", "OriginalAuthor:string", "OriginalAffiliation:string"
      )
    )
  ),
  (
    "PaperCitationContexts",
    (
      "nlp/PaperCitationContexts.txt",
      Seq("PaperId:long", "PaperReferenceId:long", "CitationContext:string")
    )
  ),
  (
    "PaperExtendedAttributes",
    (
      "mag/PaperExtendedAttributes.txt",
      Seq("PaperId:long", "AttributeType:int", "AttributeValue:string")
    )
  ),
  (
    "PaperFieldsOfStudy",
    (
      "advanced/PaperFieldsOfStudy.txt",
      Seq("PaperId:long", "FieldOfStudyId:long", "Score:float")
    )
  ),
  (
    "PaperMeSH",
    (
      "advanced/PaperMeSH.txt",
      Seq(
        "PaperId:long", "DescriptorUI:string", "DescriptorName:string",
        "QualifierUI:string", "QualifierName:string", "IsMajorTopic:bool"
      )
    )
  ),
  (
    "PaperRecommendations",
    (
      "advanced/PaperRecommendations.txt",
      Seq("PaperId:long", "RecommendedPaperId:long", "Score:float")
    )
  ),
  (
    "PaperReferences",
    (
      "mag/PaperReferences.txt",
      Seq("PaperId:long", "PaperReferenceId:long")
    )
  ),
  (
    "PaperResources",
    (
      "mag/PaperResources.txt",
      Seq(
        "PaperId:long", "ResourceType:int", "ResourceUrl:string",
        "SourceUrl:string", "RelationshipType:int"
      )
    )
  ),
  (
    "PaperUrls",
    (
      "mag/PaperUrls.txt",
      Seq("PaperId:long", "SourceType:int?", "SourceUrl:string", "LanguageCode:string")
    )
  ),
  (
    "Papers",
    (
      "mag/Papers.txt",
      Seq(
        "PaperId:long", "Rank:uint", "Doi:string", "DocType:string",
        "PaperTitle:string", "OriginalTitle:string", "BookTitle:string",
        "Year:int?", "Date:DateTime?", "OnlineDate:DateTime?", "Publisher:string",
        "JournalId:long?", "ConferenceSeriesId:long?", "ConferenceInstanceId:long?",
        "Volume:string", "Issue:string", "FirstPage:string", "LastPage:string",
        "ReferenceCount:long", "CitationCount:long", "EstimatedCitation:long",
        "OriginalVenue:string", "FamilyId:long?", "FamilyRank:uint?",
        "DocSubTypes:string", "CreatedDate:DateTime"
      )
    )
  ),
  (
    "RelatedFieldOfStudy",
    (
      "advanced/RelatedFieldOfStudy.txt",
      Seq(
        "FieldOfStudyId1:long", "Type1:string",
        "FieldOfStudyId2:long", "Type2:string",
        "Rank:float"
      )
    )
  )
)
|
||||
|
||||
/** Builds the Spark schema of a MAG table from its column descriptors in `stream`.
  *
  * Each descriptor has the form `Name:type` or `Name:type?`: a trailing `?` marks the
  * column as nullable, and `type` is resolved to a Spark type through `datatypedict`.
  *
  * @param streamName logical name of the MAG table, i.e. a key of `stream`
  * @return a schema with one field per descriptor, in declaration order
  * @throws NoSuchElementException if `streamName` is not a key of `stream`, or if a
  *                                descriptor uses a type that `datatypedict` does not define
  */
def getSchema(streamName: String): StructType = {
  val descriptors: Seq[String] = stream
    .getOrElse(streamName, throw new NoSuchElementException(s"Unknown MAG stream: $streamName"))
    ._2

  val fields = descriptors.map { descriptor =>
    val parts = descriptor.split(":")
    val fieldName = parts.head
    val declaredType = parts.last
    // The optional '?' suffix is not part of the type name looked up in datatypedict.
    val nullable = declaredType.endsWith("?")
    StructField(fieldName, datatypedict(declaredType.stripSuffix("?")), nullable)
  }

  StructType(fields)
}
|
||||
|
||||
|
||||
/** Reads one MAG table as a DataFrame.
  *
  * The table is read as header-less, tab-delimited text from `basePath/<relative path>`;
  * the relative path comes from the `stream` entry of `entity` and the schema from `getSchema`.
  *
  * @param spark    the Spark session used to read the data
  * @param entity   logical name of the MAG table, i.e. a key of `stream`
  * @param basePath directory against which the relative paths in `stream` are resolved
  * @return the content of the table, typed according to `getSchema(entity)`
  * @throws IllegalArgumentException if `entity` is not a key of `stream`
  */
def loadMagEntity(spark: SparkSession, entity: String, basePath: String): Dataset[Row] =
  stream.get(entity) match {
    case Some((relativePath, _)) =>
      spark.read
        .option("header", "false")
        .option("charset", "UTF8")
        .option("delimiter", "\t")
        .schema(getSchema(entity))
        .csv(s"$basePath/$relativePath")
    case None =>
      // Fail fast and name the offending entity: a null result would only surface later
      // as a NullPointerException at the first use of the returned Dataset.
      throw new IllegalArgumentException(
        s"Unknown MAG entity '$entity'; known entities: ${stream.keys.toSeq.sorted.mkString(", ")}"
      )
  }
|
||||
/** Rebuilds the plain text of an abstract from its inverted-index JSON representation.
  *
  * The JSON is expected to contain an integer `IndexLength` (number of token positions)
  * and an object `InvertedIndex` mapping each token to the list of positions it occupies.
  * Tokens are placed at their positions and joined with a single space; positions that
  * no token claims become empty strings.
  *
  * @param json_input the inverted index serialized as JSON; may be `null` or blank
  * @return the reconstructed text, or `""` when the input is `null`/blank or
  *         `IndexLength` is not positive
  */
def convertInvertedIndexString(json_input: String): String = {
  implicit val formats: DefaultFormats.type = org.json4s.DefaultFormats

  if (json_input == null || json_input.trim.isEmpty) ""
  else {
    val json: json4s.JValue = parse(json_input)
    val indexLength = (json \ "IndexLength").extract[Int]

    if (indexLength <= 0) ""
    else {
      // Start from empty strings so that unclaimed positions need no null fix-up pass.
      val tokens = Array.fill(indexLength)("")
      val invertedIndex = (json \ "InvertedIndex").extract[Map[String, List[Int]]]

      for {
        (token, positions) <- invertedIndex
        position <- positions
        // Positions outside [0, IndexLength) are skipped instead of aborting the conversion.
        if position >= 0 && position < indexLength
      } tokens(position) = token

      tokens.mkString(" ")
    }
  }
}
|
||||
|
||||
}
|
|
@ -0,0 +1,77 @@
|
|||
package eu.dnetlib.dhp.collection.mag
|
||||
|
||||
import eu.dnetlib.dhp.application.AbstractScalaApplication
|
||||
import org.apache.spark.sql.{Dataset, Row, SparkSession}
|
||||
import org.apache.spark.sql.functions._
|
||||
import org.apache.spark.sql.types.{StringType, StructField, StructType}
|
||||
import org.slf4j.Logger
|
||||
|
||||
/** Spark application that prepares the MAG dataset, restricted to the MAG papers
  * whose DOI is also present in Crossref.
  *
  * @param propertyPath forwarded to [[AbstractScalaApplication]]
  * @param args         forwarded to [[AbstractScalaApplication]]
  * @param log          logger used to report the progress of the job
  */
class SparkCreateMagDataset(propertyPath: String, args: Array[String], log: Logger)
    extends AbstractScalaApplication(propertyPath, args, log: Logger) {

  /** Entry point executed by every Spark application: the whole logic of the
    * Spark node is meant to be defined here. It is currently empty.
    */
  override def run(): Unit = {}

  /** Selects the MAG papers whose DOI also appears in Crossref and enriches them
    * with their abstract and with the list of their authors and affiliations.
    *
    * Steps:
    *   1. keep the MAG papers having a (lower-cased) DOI that is present in Crossref,
    *      one paper per DOI;
    *   2. left-join the abstract, rebuilt from its inverted index;
    *   3. left-join, per paper, the collected list of authors with their affiliation.
    *
    * @param spark        the Spark session used to read and process the data
    * @param crossrefPath path of the Crossref JSON records; only the `DOI` field is read
    * @param magBasePath  base directory of the MAG tables
    * @param workingPath  not used yet — NOTE(review): presumably the output location, confirm
    */
  private def loadAndFilterPapers(
    spark: SparkSession,
    crossrefPath: String,
    magBasePath: String,
    workingPath: String
  ): Unit = {

    import spark.implicits._

    val doiSchema: StructType = StructType(StructField("DOI", StringType) :: Nil)
    log.info("Phase 1 intersect MAG Paper containing doi also present in crossref")

    // Filter all the MAG papers that intersect with a Crossref DOI
    val crId = spark.read
      .schema(doiSchema)
      .json(crossrefPath)
      .withColumn("crId", lower(col("DOI")))
      .distinct
      .select("crId")

    val magPapers = MagUtility
      .loadMagEntity(spark, "Papers", magBasePath)
      .withColumn("Doi", lower(col("Doi")))
      .where(col("Doi").isNotNull)

    val intersectedPapers: Dataset[Row] = magPapers
      .join(crId, magPapers("Doi").equalTo(crId("crId")), "leftsemi")
      .dropDuplicates("Doi")
    intersectedPapers.cache()
    // Materialize the cache before the dataset is reused below.
    intersectedPapers.count()

    log.info("Create current abstract")

    // The abstract is stored as an inverted index: convert it back to a string
    // and recreate a table (PaperId, Abstract)
    val paperAbstract = MagUtility
      .loadMagEntity(spark, "PaperAbstractsInvertedIndex", magBasePath)
      .map(row => (row.getLong(0), MagUtility.convertInvertedIndexString(row.getString(1))))
      .withColumnRenamed("_1", "PaperId")
      .withColumnRenamed("_2", "Abstract")

    // step0 is the result of the join between the intersected papers and their abstract
    val step0 = intersectedPapers
      .join(paperAbstract, intersectedPapers("PaperId") === paperAbstract("PaperId"), "left")
      .select(intersectedPapers("*"), paperAbstract("Abstract"))
      .cache()

    step0.count()

    intersectedPapers.unpersist()

    // We have three tables: Authors, Affiliations and PaperAuthorAffiliations. In the next
    // step we create a table containing, for each paper, the list of its authors together
    // with the affiliation (name and GRID id) of each of them.
    val authors = MagUtility.loadMagEntity(spark, "Authors", magBasePath)
    val affiliations = MagUtility.loadMagEntity(spark, "Affiliations", magBasePath)
    val paaf = MagUtility.loadMagEntity(spark, "PaperAuthorAffiliations", magBasePath)

    // Only the author/affiliation rows of the selected papers are kept.
    val paperAuthorAffiliations = paaf.join(step0, paaf("PaperId") === step0("PaperId"), "leftsemi")

    val j1 = paperAuthorAffiliations
      .join(authors, paperAuthorAffiliations("AuthorId") === authors("AuthorId"), "inner")
      .select(
        col("PaperId"),
        col("AffiliationId"),
        col("AuthorSequenceNumber"),
        authors("DisplayName").alias("AuthorName"),
        authors("AuthorId")
      )

    val paperAuthorAffiliationNormalized = j1
      .join(affiliations, j1("AffiliationId") === affiliations("AffiliationId"), "left")
      .select(j1("*"), affiliations("DisplayName").alias("AffiliationName"), affiliations("GridId"))
      .groupBy("PaperId")
      .agg(
        collect_list(
          struct("AffiliationId", "AuthorSequenceNumber", "AffiliationName", "AuthorName", "AuthorId", "GridId")
        ).alias("authors")
      )

    val step1 = step0
      .join(
        paperAuthorAffiliationNormalized,
        step0("PaperId") === paperAuthorAffiliationNormalized("PaperId"),
        "left"
      )
      // Project explicitly, as done for step0, so the result holds a single PaperId column.
      .select(step0("*"), paperAuthorAffiliationNormalized("authors"))
      .cache()
    step1.count()

    step0.unpersist()

  }

}
|
Loading…
Reference in New Issue