diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/create_openaire_ranking_graph.py b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/create_openaire_ranking_graph.py
similarity index 95%
rename from dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/create_openaire_ranking_graph.py
rename to dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/create_openaire_ranking_graph.py
index 4cffa86a3e..cda12a77c5 100644
--- a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/create_openaire_ranking_graph.py
+++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/create_openaire_ranking_graph.py
@@ -126,12 +126,19 @@ oa_objects_df = oa_objects_df.drop('deletedbyinference').drop('invisible').disti
# Collect only valid citations i.e., invisible = false & deletedbyinference=false
cites_df = spark.read.json(graph_folder + "/relation")\
- .select(F.col('source').alias('citing'), F.col('target').alias('cited'), 'relClass', 'dataInfo.deletedbyinference', 'dataInfo.invisible')\
+ .select(F.col('source').alias('citing'), F.col('target').alias('cited'), 'collectedfrom.value', 'relClass', 'dataInfo.deletedbyinference', 'dataInfo.invisible')\
.where( (F.col('relClass') == "Cites") \
& (F.col('dataInfo.deletedbyinference') == "false")\
& (F.col('dataInfo.invisible') == "false"))\
.drop('dataInfo.deletedbyinference').drop('dataInfo.invisible')\
- .repartition(num_partitions, 'citing').drop('relClass')
+ .repartition(num_partitions, 'citing').drop('relClass')\
+ .withColumn('collected_lower', F.expr('transform(collectedfrom.value, x -> lower(x))'))\
+ .drop('collectedfrom.value')\
+ .where(
+ (F.array_contains(F.col('collected_lower'), "opencitations"))
+ | (F.array_contains(F.col('collected_lower'), "crossref"))
+ | (F.array_contains(F.col('collected_lower'), "mag"))
+ ).drop('collected_lower')
# print ("Cited df has: " + str(cites_df.count()) + " entries")
# DEPRECATED
diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/format_ranking_results.py b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/format_ranking_results.py
similarity index 100%
rename from dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/format_ranking_results.py
rename to dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/format_ranking_results.py
diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/get_ranking_files.sh b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/get_ranking_files.sh
similarity index 100%
rename from dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/get_ranking_files.sh
rename to dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/get_ranking_files.sh
diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/job.properties b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/job.properties
similarity index 93%
rename from dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/job.properties
rename to dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/job.properties
index 9ad9def218..a902c413f3 100644
--- a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/job.properties
+++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/job.properties
@@ -63,6 +63,9 @@ oozieWorkflowPath=user/ilias.kanellos/workflow_example/
# The directory where the workflow data is/should be stored
workflowDataDir=user/ilias.kanellos/ranking_workflow
+# Directory where json data containing scores will be output
+bipScorePath=${workflowDataDir}/openaire_universe_scores/
+
# Directory where dataframes are checkpointed
checkpointDir=${nameNode}/${workflowDataDir}/check/
@@ -84,3 +87,6 @@ wfAppPath=${nameNode}/${oozieWorkflowPath}
# The following is needed as a property of a workflow
oozie.wf.application.path=${wfAppPath}
+# Path where the final output should be?
+actionSetOutputPath=${workflowDataDir}/bip_actionsets/
+
diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/map_openaire_ids_to_dois.py b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/map_openaire_ids_to_dois.py
similarity index 100%
rename from dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/map_openaire_ids_to_dois.py
rename to dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/map_openaire_ids_to_dois.py
diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/map_scores_to_dois.py b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/map_scores_to_dois.py
similarity index 100%
rename from dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/map_scores_to_dois.py
rename to dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/map_scores_to_dois.py
diff --git a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/workflow.xml b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml
similarity index 93%
rename from dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/workflow.xml
rename to dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml
index 807c32063c..d99dc16a26 100644
--- a/dhp-workflows/dhp-impact-indicators/src/main/resources/eu.dnetlib/workflow.xml
+++ b/dhp-workflows/dhp-impact-indicators/src/main/resources/eu/dnetlib/dhp/oa/graph/impact_indicators/oozie_app/workflow.xml
@@ -552,11 +552,50 @@
-
+
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ yarn
+ cluster
+ Produces the atomic action with the bip finder scores for publications
+ eu.dnetlib.dhp.actionmanager.bipfinder.SparkAtomicActionScoreJob
+ dhp-aggregation-${projectVersion}.jar
+
+ --executor-memory=${sparkExecutorMemory}
+ --executor-cores=${sparkExecutorCores}
+ --driver-memory=${sparkDriverMemory}
+ --conf spark.extraListeners=${spark2ExtraListeners}
+ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
+ --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
+ --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+ --conf spark.sql.warehouse.dir=${sparkSqlWarehouseDir}
+
+ --inputPath${bipScorePath}
+ --outputPath${actionSetOutputPath}
+
+
+
+
+
@@ -597,4 +636,12 @@
Mapping scores to DOIs failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+ Deleting output path for actionsets failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
+ ActionSet creation failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+