diff --git a/strings.py b/strings.py
index 2622f56..4f7dcc9 100644
--- a/strings.py
+++ b/strings.py
@@ -1,11 +1,11 @@
-import json
+import time
 from pyspark.sql.types import StringType, ArrayType, StructType, StructField, DoubleType
-
+from threading import Thread
 from affro_cluster import *
 
 from pyspark.sql import SparkSession
-from pyspark.sql.functions import col, explode, first, collect_set, udf
+from pyspark.sql.functions import col, explode, first, collect_list, udf, collect_set
 
 import sys
 
 spark = SparkSession.builder.appName("JSONProcessing").getOrCreate()
@@ -23,10 +23,14 @@ matchings_schema = ArrayType(
     ])
 )
 
-def oalex_affro_2(aff_string):
+operation_counter = spark.sparkContext.accumulator(0)
+
+#Version of affro application on a single raw_aff_string and returns just the Matchins set
+def oalex_affro(aff_string):
+    global operation_counter
     try:
         matchings = affro(aff_string)
-
+        operation_counter += 1
         # Ensure matchings is a list, even if affro returns a single dict
         if not isinstance(matchings, list):
             matchings = [matchings]
@@ -49,18 +53,15 @@ def oalex_affro_2(aff_string):
         print(f"Error processing affiliation string {aff_string}: {str(e)}")
         return ()
 
-#Version of affro application on a single raw_aff_string and returns just the Matchins set
-def oalex_affro(aff_string):
-    try:
-        matchings = affro(aff_string)
-        if not isinstance(matchings, list):
-            matchings = [matchings]
-        return matchings
-    except Exception as e:
-        print(f"Error processing affiliation string {aff_string}: {str(e)}")
-        return []
+oalex_affro_udf = udf(oalex_affro, matchings_schema)
+monitor_done = False
 
-oalex_affro_udf = udf(oalex_affro_2, matchings_schema)
+def monitor_counter(interval):
+    while True:
+        print(f"Number of calls to AffRo: {operation_counter.value}")
+        time.sleep(interval)
+        if monitor_done:
+            break
 
 exploded = spark.read.json(folder_path) \
     .filter(col("doi").isNotNull()) \
@@ -69,6 +70,10 @@ exploded = spark.read.json(folder_path) \
         col("rors").alias("OAlex"),
         explode(col("raw_aff_string")).alias("affiliation") #this allows to split all the raw_aff_string and to parallelize better
     )
+
+monitor_thread = Thread(target=monitor_counter, args=(600,), daemon=True)
+monitor_thread.start()
+
 affs = exploded \
     .select("affiliation") \
     .distinct() \
@@ -87,3 +92,6 @@ affs.join(exploded, on="affiliation") \
     .mode("overwrite") \
     .option("compression","gzip") \
     .json(hdfs_output_path)
+
+monitor_done = True
+monitor_thread.join()
\ No newline at end of file
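
Two caveats on the monitoring additions. First, the accumulator is bumped inside a UDF, so Spark re-applies the update if a task is retried; the printed count is best read as an approximate progress signal, not an exact call count. Second, monitor_counter only checks monitor_done after the full time.sleep(interval), so the final monitor_thread.join() can block for up to 600 seconds. A minimal self-contained sketch of an alternative shutdown using threading.Event (FakeCounter is a hypothetical stand-in for the Spark accumulator, purely for illustration):

import threading

class FakeCounter:            # hypothetical stand-in for the Spark accumulator
    value = 0

operation_counter = FakeCounter()
monitor_done = threading.Event()

def monitor_counter(interval):
    # Event.wait() returns True as soon as set() is called and False on
    # timeout, so a single call both sleeps and watches the stop flag.
    while not monitor_done.wait(timeout=interval):
        print(f"Number of calls to AffRo: {operation_counter.value}")

monitor_thread = threading.Thread(target=monitor_counter, args=(600,), daemon=True)
monitor_thread.start()

# ... run the Spark job here ...

monitor_done.set()    # wakes the waiting thread immediately
monitor_thread.join()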
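
For context, the distinct()-before-UDF step visible in the hunks is what keeps the expensive affro() call from running once per (DOI, affiliation) pair: the UDF is applied per distinct affiliation string, and the join fans each result back out to every DOI that carried that string. A minimal sketch of the same pattern, assuming a local SparkSession and a toy classify UDF standing in for affro:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

spark = SparkSession.builder.master("local[*]").appName("sketch").getOrCreate()

rows = [("10.1/a", "Univ X"), ("10.1/b", "Univ X"), ("10.1/c", "Univ Y")]
exploded = spark.createDataFrame(rows, ["DOI", "affiliation"])

classify = udf(lambda s: s.upper(), StringType())   # hypothetical stand-in for affro

# The UDF is evaluated per *distinct* affiliation string (2 values here,
# not 3 rows); the join restores one row per original DOI.
affs = exploded.select("affiliation").distinct() \
               .withColumn("Matchings", classify(col("affiliation")))

affs.join(exploded, on="affiliation").show()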