Oalex #13
|
@ -5,7 +5,7 @@ from pyspark.sql.types import StringType, ArrayType, StructType, StructField, Do
|
||||||
from affro_cluster import *
|
from affro_cluster import *
|
||||||
|
|
||||||
from pyspark.sql import SparkSession
|
from pyspark.sql import SparkSession
|
||||||
from pyspark.sql.functions import col, explode, first, collect_list, udf
|
from pyspark.sql.functions import col, explode, first, collect_set, udf
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
spark = SparkSession.builder.appName("JSONProcessing").getOrCreate()
|
spark = SparkSession.builder.appName("JSONProcessing").getOrCreate()
|
||||||
|
@ -81,7 +81,7 @@ affs.join(exploded, on="affiliation") \
|
||||||
) \
|
) \
|
||||||
.groupBy("DOI") \
|
.groupBy("DOI") \
|
||||||
.agg(first("OAlex").alias("OAlex"), #for each DOI it says what are the other columns Since OALEX is equal for each doi just select the first, while use the collect_list function to aggregate the Matchings
|
.agg(first("OAlex").alias("OAlex"), #for each DOI it says what are the other columns Since OALEX is equal for each doi just select the first, while use the collect_list function to aggregate the Matchings
|
||||||
collect_list("match").alias("Matchings") #each exploded match is collected again
|
collect_set("match").alias("Matchings") #each exploded match is collected again
|
||||||
) \
|
) \
|
||||||
.write \
|
.write \
|
||||||
.mode("overwrite") \
|
.mode("overwrite") \
|
||||||
|
|
Loading…
Reference in New Issue