1
0
Fork 0
This commit is contained in:
Alessia Bardi 2023-09-19 13:38:45 +02:00
commit d94b9bebf7
2 changed files with 10 additions and 20 deletions

View File

@ -47,17 +47,14 @@ public class DispatchEntitiesSparkJob {
String outputPath = parser.get("outputPath"); String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath); log.info("outputPath: {}", outputPath);
boolean filterInvisible = Boolean.valueOf(parser.get("filterInvisible")); boolean filterInvisible = Boolean.parseBoolean(parser.get("filterInvisible"));
log.info("filterInvisible: {}", filterInvisible); log.info("filterInvisible: {}", filterInvisible);
SparkConf conf = new SparkConf(); SparkConf conf = new SparkConf();
runWithSparkSession( runWithSparkSession(
conf, conf,
isSparkSessionManaged, isSparkSessionManaged,
spark -> { spark -> dispatchEntities(spark, inputPath, outputPath, filterInvisible));
HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
dispatchEntities(spark, inputPath, outputPath, filterInvisible);
});
} }
private static void dispatchEntities( private static void dispatchEntities(
@ -72,7 +69,9 @@ public class DispatchEntitiesSparkJob {
String entityType = entry.getKey(); String entityType = entry.getKey();
Class<?> clazz = entry.getValue(); Class<?> clazz = entry.getValue();
final String entityPath = outputPath + "/" + entityType;
if (!entityType.equalsIgnoreCase("relation")) { if (!entityType.equalsIgnoreCase("relation")) {
HdfsSupport.remove(entityPath, spark.sparkContext().hadoopConfiguration());
Dataset<Row> entityDF = spark Dataset<Row> entityDF = spark
.read() .read()
.schema(Encoders.bean(clazz).schema()) .schema(Encoders.bean(clazz).schema())
@ -91,7 +90,7 @@ public class DispatchEntitiesSparkJob {
.write() .write()
.mode(SaveMode.Overwrite) .mode(SaveMode.Overwrite)
.option("compression", "gzip") .option("compression", "gzip")
.json(outputPath + "/" + entityType); .json(entityPath);
} }
}); });
} }

View File

@ -39,7 +39,8 @@
<switch> <switch>
<!-- The default will be set as the normal start, a.k.a. get-doi-synonyms --> <!-- The default will be set as the normal start, a.k.a. get-doi-synonyms -->
<!-- If any different condition is set, go to the corresponding start --> <!-- If any different condition is set, go to the corresponding start -->
<case to="non-iterative-rankings">${wf:conf('resume') eq "rankings-start"}</case> <case to="spark-cc">${wf:conf('resume') eq "cc"}</case>
<case to="spark-ram">${wf:conf('resume') eq "ram"}</case>
<case to="spark-impulse">${wf:conf('resume') eq "impulse"}</case> <case to="spark-impulse">${wf:conf('resume') eq "impulse"}</case>
<case to="spark-pagerank">${wf:conf('resume') eq "pagerank"}</case> <case to="spark-pagerank">${wf:conf('resume') eq "pagerank"}</case>
<case to="spark-attrank">${wf:conf('resume') eq "attrank"}</case> <case to="spark-attrank">${wf:conf('resume') eq "attrank"}</case>
@ -89,18 +90,11 @@
<file>${nameNode}${wfAppPath}/create_openaire_ranking_graph.py#create_openaire_ranking_graph.py</file> <file>${nameNode}${wfAppPath}/create_openaire_ranking_graph.py#create_openaire_ranking_graph.py</file>
</spark> </spark>
<ok to="non-iterative-rankings" /> <ok to="spark-cc"/>
<error to="openaire-graph-error" /> <error to="openaire-graph-error" />
</action> </action>
<!-- Citation Count and RAM are calculated in parallel-->
<fork name="non-iterative-rankings">
<path start="spark-cc"/>
<!-- <path start="spark-impulse"/> -->
<path start="spark-ram"/>
</fork>
<!-- Run Citation Count calculation --> <!-- Run Citation Count calculation -->
<action name="spark-cc"> <action name="spark-cc">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
@ -129,7 +123,7 @@
<file>${wfAppPath}/bip-ranker/CC.py#CC.py</file> <file>${wfAppPath}/bip-ranker/CC.py#CC.py</file>
</spark> </spark>
<ok to="join-non-iterative-rankings" /> <ok to="spark-ram" />
<error to="cc-fail" /> <error to="cc-fail" />
</action> </action>
@ -165,14 +159,11 @@
<file>${wfAppPath}/bip-ranker/TAR.py#TAR.py</file> <file>${wfAppPath}/bip-ranker/TAR.py#TAR.py</file>
</spark> </spark>
<ok to="join-non-iterative-rankings" /> <ok to="spark-impulse" />
<error to="ram-fail" /> <error to="ram-fail" />
</action> </action>
<!-- Join non-iterative methods -->
<join name="join-non-iterative-rankings" to="spark-impulse"/>
<action name="spark-impulse"> <action name="spark-impulse">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">