[oalex] added fix
This commit is contained in:
parent
f8479083f2
commit
e2f8007433
|
@ -12,6 +12,7 @@ spark = SparkSession.builder.appName("JSONProcessing").getOrCreate()
|
||||||
|
|
||||||
folder_path = sys.argv[1]
|
folder_path = sys.argv[1]
|
||||||
hdfs_output_path = sys.argv[2]
|
hdfs_output_path = sys.argv[2]
|
||||||
|
working_dir = sys.argv[3]
|
||||||
|
|
||||||
matchings_schema = ArrayType(
|
matchings_schema = ArrayType(
|
||||||
StructType([
|
StructType([
|
||||||
|
@ -90,8 +91,8 @@ rdd = explode \
|
||||||
.rdd \
|
.rdd \
|
||||||
.flatMap(lambda row: [{"affiliation":row['affiliation'], "match": m} for m in oalex_affro(row['affiliation'])])
|
.flatMap(lambda row: [{"affiliation":row['affiliation'], "match": m} for m in oalex_affro(row['affiliation'])])
|
||||||
|
|
||||||
#affs.map(json.dumps).saveAsTextFile("./out/rdd")
|
rdd.map(json.dumps).saveAsTextFile(working_dir + "/ tmp")
|
||||||
affs = spark.createDataFrame(rdd, schema=result_schema)
|
affs = spark.read.json(working_dir + "/ tmp")
|
||||||
|
|
||||||
affs.join(explode, on="affiliation") \
|
affs.join(explode, on="affiliation") \
|
||||||
.select(col("DOI"),
|
.select(col("DOI"),
|
||||||
|
|
Loading…
Reference in New Issue