diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/create_openaire_ranking_graph.py b/dhp-workflows/dhp-impact-indicators/src/main/resources/create_openaire_ranking_graph.py index 4cffa86a3e..cda12a77c5 100644 --- a/dhp-workflows/dhp-impact-indicators/src/main/resources/create_openaire_ranking_graph.py +++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/create_openaire_ranking_graph.py @@ -126,12 +126,19 @@ oa_objects_df = oa_objects_df.drop('deletedbyinference').drop('invisible').disti # Collect only valid citations i.e., invisible = false & deletedbyinference=false cites_df = spark.read.json(graph_folder + "/relation")\ - .select(F.col('source').alias('citing'), F.col('target').alias('cited'), 'relClass', 'dataInfo.deletedbyinference', 'dataInfo.invisible')\ + .select(F.col('source').alias('citing'), F.col('target').alias('cited'), 'collectedfrom.value', 'relClass', 'dataInfo.deletedbyinference', 'dataInfo.invisible')\ .where( (F.col('relClass') == "Cites") \ & (F.col('dataInfo.deletedbyinference') == "false")\ & (F.col('dataInfo.invisible') == "false"))\ .drop('dataInfo.deletedbyinference').drop('dataInfo.invisible')\ - .repartition(num_partitions, 'citing').drop('relClass') + .repartition(num_partitions, 'citing').drop('relClass')\ + .withColumn('collected_lower', F.expr('transform(collectedfrom.value, x -> lower(x))'))\ + .drop('collectedfrom.value')\ + .where( + (F.array_contains(F.col('collected_lower'), "opencitations")) + | (F.array_contains(F.col('collected_lower'), "crossref")) + | (F.array_contains(F.col('collected_lower'), "mag")) + ).drop('collected_lower') # print ("Cited df has: " + str(cites_df.count()) + " entries") # DEPRECATED