8172_impact_indicators_workflow #284

Merged
miriam.baglioni merged 49 commits from 8172_impact_indicators_workflow into beta 2023-08-14 15:50:48 +02:00
2 changed files with 11 additions and 4 deletions
Showing only changes of commit ec4e010687 - Show all commits

View File

@ -114,6 +114,12 @@ print ("Total num of research objects: " + str(oa_objects_df.count()))
# Keep only required fields - we still keep resulttype.classname to
# filter the citation relationships we consider valid
oa_objects_df = oa_objects_df.drop('deletedbyinference').drop('invisible').distinct().cache()
'''
print ("OA objects Schema:")
oa_objects_df.printSchema()
sys.exit(0)
'''
############################################################################################################################
# 2. Get the relation objects and filter them based on their existence in the oa_objects_df
# NOTE: we are only interested in citations of type "cites"
@ -154,8 +160,8 @@ cites_df = spark.read.json(graph_folder + "/relation")\
# references_df = references_df.repartition(num_partitions, 'cited').join(oa_objects_df.select('id'), references_df.cited == oa_objects_df.id).drop('id').distinct().repartition(num_partitions, 'citing').cache()
# print ("References df now has: " + str(references_df.count()) + " entries")
cites_df = cites_df.join(oa_objects_df.select('id'), cites_df.citing == oa_objects_df.id).where( F.col('resulttype.classname').isin(valid_result_types) ).drop('id').drop('resulttype.classname')
cites_df = cites_df.repartition(num_partitions, 'cited').join(oa_objects_df.select('id'), cites_df.cited == oa_objects_df.id).drop('id').drop('resulttype.classname').distinct().repartition(num_partitions, 'citing').cache()
cites_df = cites_df.join(oa_objects_df.select('id', 'classname'), cites_df.citing == oa_objects_df.id).where( F.col('classname').isin(valid_result_types) ).drop('id').drop('classname')
cites_df = cites_df.repartition(num_partitions, 'cited').join(oa_objects_df.select('id'), cites_df.cited == oa_objects_df.id).distinct().repartition(num_partitions, 'citing').cache()
# TODO: add here a clause filtering out the citations
# originating from "other" types of research objects which we consider valid

View File

@ -81,7 +81,7 @@
</spark>
<!-- Do this after finishing okay -->
<ok to="end" />
<ok to="non-iterative-rankings" />
<!-- Go there if we have an error -->
<error to="openaire-graph-error" />
@ -335,7 +335,8 @@
</action>
<!-- JOIN ITERATIVE METHODS AND THEN END -->
<join name="join-iterative-rankings" to="get-file-names"/>
<join name="join-iterative-rankings" to="end">
<!-- to="get-file-names"/> -->
<!-- This will be a shell action that will output key-value pairs for output files -->