code refactor
This commit is contained in:
parent
c532831718
commit
ee1fcb672b
|
@ -52,7 +52,8 @@ case class MAGPaper(
|
|||
// List of authors
|
||||
authors: Option[List[MAGAuthor]],
|
||||
// List of Fields of Study
|
||||
fos: Option[List[MAGFieldOfStudy]]
|
||||
fos: Option[List[MAGFieldOfStudy]],
|
||||
urls: Option[List[String]]
|
||||
)
|
||||
|
||||
case class MAGAuthor(
|
||||
|
|
|
@ -146,7 +146,7 @@ class SparkCreateMagDenormalizedTable(propertyPath: String, args: Array[String],
|
|||
$"Publisher".as("journalPublisher"),
|
||||
$"Webpage".as("journalWebpage")
|
||||
)
|
||||
step3
|
||||
val step4 = step3
|
||||
.join(journals, step3("JournalId") === journals("JournalId"), "left")
|
||||
.select(
|
||||
step3("*"),
|
||||
|
@ -155,6 +155,20 @@ class SparkCreateMagDenormalizedTable(propertyPath: String, args: Array[String],
|
|||
journals("journalPublisher"),
|
||||
journals("journalWebpage")
|
||||
)
|
||||
.cache
|
||||
step4.count()
|
||||
|
||||
val paper_urls = MagUtility
|
||||
.loadMagEntity(spark, "PaperUrls", magBasePath)
|
||||
.groupBy("PaperId")
|
||||
.agg(slice(collect_set("SourceUrl"), 1, 6).alias("urls"))
|
||||
.cache
|
||||
|
||||
paper_urls.count
|
||||
|
||||
step4
|
||||
.join(paper_urls, step4("PaperId") === paper_urls("PaperId"))
|
||||
.select(step4("*"), paper_urls("urls"))
|
||||
.select(
|
||||
$"PaperId".as("paperId"),
|
||||
$"Rank".as("rank"),
|
||||
|
@ -192,7 +206,8 @@ class SparkCreateMagDenormalizedTable(propertyPath: String, args: Array[String],
|
|||
$"journalName".as("journalName"),
|
||||
$"journalIssn".as("journalIssn"),
|
||||
$"journalPublisher".as("journalPublisher"),
|
||||
$"journalWebpage".as("journalWebpage")
|
||||
$"journalWebpage".as("journalWebpage"),
|
||||
$"urls"
|
||||
)
|
||||
.write
|
||||
.mode("OverWrite")
|
||||
|
|
Loading…
Reference in New Issue