diff --git a/strings.py b/strings.py index 98e2037..961918e 100644 --- a/strings.py +++ b/strings.py @@ -12,6 +12,7 @@ spark = SparkSession.builder.appName("JSONProcessing").getOrCreate() folder_path = sys.argv[1] hdfs_output_path = sys.argv[2] +working_dir = sys.argv[3] matchings_schema = ArrayType( StructType([ @@ -90,8 +91,8 @@ rdd = explode \ .rdd \ .flatMap(lambda row: [{"affiliation":row['affiliation'], "match": m} for m in oalex_affro(row['affiliation'])]) -#affs.map(json.dumps).saveAsTextFile("./out/rdd") -affs = spark.createDataFrame(rdd, schema=result_schema) +rdd.map(json.dumps).saveAsTextFile(working_dir + "/ tmp") +affs = spark.read.json(working_dir + "/ tmp") affs.join(explode, on="affiliation") \ .select(col("DOI"),