
added output directory removal to the blacklist Spark actions; included common global properties in the blacklist workflow.xml
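
In short: the two Spark jobs in the blacklist module now clear their output directory on HDFS before writing, so a re-run no longer trips over output left behind by a previous attempt; workflow.xml gains a <global> section carrying the job tracker, name node and queue settings previously repeated in every action, and the two Spark actions additionally pin spark.sql.shuffle.partitions.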

Claudio Atzori 2020-05-15 09:53:37 +02:00
parent 18f46e47b9
commit 50d6a2ad3c
3 changed files with 34 additions and 17 deletions

PrepareMergedRelationJob.java

@@ -5,6 +5,7 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkHiveSession;
 import java.util.Optional;
+import eu.dnetlib.dhp.common.HdfsSupport;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.MapFunction;
@@ -56,6 +57,7 @@ public class PrepareMergedRelationJob {
 			conf,
 			isSparkSessionManaged,
 			spark -> {
+				removeOutputDir(spark, outputPath);
 				selectMergesRelations(
 					spark,
 					inputPath,
@@ -84,4 +86,9 @@ public class PrepareMergedRelationJob {
 			(MapFunction<String, Relation>) value -> OBJECT_MAPPER.readValue(value, Relation.class),
 			Encoders.bean(Relation.class));
 	}
+
+	private static void removeOutputDir(SparkSession spark, String path) {
+		HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
+	}
 }
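
The new private helper delegates to HdfsSupport.remove from dhp-common, which is outside this diff. As a rough sketch of what such a utility presumably does, assuming the plain Hadoop FileSystem API (the class name and error handling below are illustrative, not the project's actual code):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsSupportSketch {

	/** Recursively deletes a path if it exists, so a re-run starts from a clean output directory. */
	public static void remove(String path, Configuration configuration) {
		try {
			FileSystem fileSystem = FileSystem.get(configuration);
			Path p = new Path(path);
			if (fileSystem.exists(p)) {
				fileSystem.delete(p, true); // true = recursive delete
			}
		} catch (IOException e) {
			throw new RuntimeException(e);
		}
	}
}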

SparkRemoveBlacklistedRelationJob.java

@@ -6,6 +6,7 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
 import java.util.Objects;
 import java.util.Optional;
+import eu.dnetlib.dhp.common.HdfsSupport;
 import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.function.MapFunction;
@@ -62,6 +63,7 @@ public class SparkRemoveBlacklistedRelationJob {
 			conf,
 			isSparkSessionManaged,
 			spark -> {
+				removeOutputDir(spark, outputPath);
 				removeBlacklistedRelations(
 					spark,
 					blacklistPath,
@@ -69,7 +71,6 @@ public class SparkRemoveBlacklistedRelationJob {
 					outputPath,
 					mergesPath);
 			});
 	}
-
 	private static void removeBlacklistedRelations(SparkSession spark, String blacklistPath, String inputPath,
@@ -144,4 +145,8 @@ public class SparkRemoveBlacklistedRelationJob {
 			Encoders.bean(Relation.class));
 	}
+
+	private static void removeOutputDir(SparkSession spark, String path) {
+		HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
+	}
 }
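
The second job gains the identical helper. Functionally, clearing the output path before writing makes each action safe to re-run: part-files left behind by a failed earlier attempt are removed instead of lingering next to, or blocking, the new output.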

workflow.xml

@@ -22,6 +22,25 @@
 		</property>
 	</parameters>
+
+	<global>
+		<job-tracker>${jobTracker}</job-tracker>
+		<name-node>${nameNode}</name-node>
+		<configuration>
+			<property>
+				<name>mapreduce.job.queuename</name>
+				<value>${queueName}</value>
+			</property>
+			<property>
+				<name>oozie.launcher.mapred.job.queue.name</name>
+				<value>${oozieLauncherQueueName}</value>
+			</property>
+			<property>
+				<name>oozie.action.sharelib.for.spark</name>
+				<value>${oozieActionShareLibForSpark2}</value>
+			</property>
+		</configuration>
+	</global>
 
 	<start to="reset_outputpath"/>
 
 	<kill name="Kill">
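
For context: values declared in Oozie's <global> element (job tracker, name node and configuration properties) are inherited by every action in the workflow, which is why the per-action <job-tracker> and <name-node> elements can be dropped from each distcp and java action in the hunks below.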
@@ -49,8 +68,6 @@
 	<action name="copy_publication">
 		<distcp xmlns="uri:oozie:distcp-action:0.2">
-			<job-tracker>${jobTracker}</job-tracker>
-			<name-node>${nameNode}</name-node>
 			<arg>${nameNode}/${sourcePath}/publication</arg>
 			<arg>${nameNode}/${outputPath}/publication</arg>
 		</distcp>
@@ -60,8 +77,6 @@
 	<action name="copy_dataset">
 		<distcp xmlns="uri:oozie:distcp-action:0.2">
-			<job-tracker>${jobTracker}</job-tracker>
-			<name-node>${nameNode}</name-node>
 			<arg>${nameNode}/${sourcePath}/dataset</arg>
 			<arg>${nameNode}/${outputPath}/dataset</arg>
 		</distcp>
@@ -71,8 +86,6 @@
 	<action name="copy_orp">
 		<distcp xmlns="uri:oozie:distcp-action:0.2">
-			<job-tracker>${jobTracker}</job-tracker>
-			<name-node>${nameNode}</name-node>
 			<arg>${nameNode}/${sourcePath}/otherresearchproduct</arg>
 			<arg>${nameNode}/${outputPath}/otherresearchproduct</arg>
 		</distcp>
@@ -82,8 +95,6 @@
 	<action name="copy_software">
 		<distcp xmlns="uri:oozie:distcp-action:0.2">
-			<job-tracker>${jobTracker}</job-tracker>
-			<name-node>${nameNode}</name-node>
 			<arg>${nameNode}/${sourcePath}/software</arg>
 			<arg>${nameNode}/${outputPath}/software</arg>
 		</distcp>
@@ -93,8 +104,6 @@
 	<action name="copy_organization">
 		<distcp xmlns="uri:oozie:distcp-action:0.2">
-			<job-tracker>${jobTracker}</job-tracker>
-			<name-node>${nameNode}</name-node>
 			<arg>${nameNode}/${sourcePath}/organization</arg>
 			<arg>${nameNode}/${outputPath}/organization</arg>
 		</distcp>
@@ -104,8 +113,6 @@
 	<action name="copy_project">
 		<distcp xmlns="uri:oozie:distcp-action:0.2">
-			<job-tracker>${jobTracker}</job-tracker>
-			<name-node>${nameNode}</name-node>
 			<arg>${nameNode}/${sourcePath}/project</arg>
 			<arg>${nameNode}/${outputPath}/project</arg>
 		</distcp>
@@ -115,8 +122,6 @@
 	<action name="copy_datasource">
 		<distcp xmlns="uri:oozie:distcp-action:0.2">
-			<job-tracker>${jobTracker}</job-tracker>
-			<name-node>${nameNode}</name-node>
 			<arg>${nameNode}/${sourcePath}/datasource</arg>
 			<arg>${nameNode}/${outputPath}/datasource</arg>
 		</distcp>
@@ -128,8 +133,6 @@
 	<action name="read_blacklist">
 		<java>
-			<job-tracker>${jobTracker}</job-tracker>
-			<name-node>${nameNode}</name-node>
 			<main-class>eu.dnetlib.dhp.blacklist.ReadBlacklistFromDB</main-class>
 			<arg>--hdfsPath</arg><arg>${workingDir}/blacklist</arg>
 			<arg>--hdfsNameNode</arg><arg>${nameNode}</arg>
@@ -156,6 +159,7 @@
 				--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
 				--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
 				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+				--conf spark.sql.shuffle.partitions=3840
 			</spark-opts>
 			<arg>--sourcePath</arg><arg>${sourcePath}/relation</arg>
 			<arg>--outputPath</arg><arg>${workingDir}/mergesRelation</arg>
@@ -180,6 +184,7 @@
 				--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
 				--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
 				--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
+				--conf spark.sql.shuffle.partitions=3840
 			</spark-opts>
 			<arg>--sourcePath</arg><arg>${sourcePath}/relation</arg>
 			<arg>--outputPath</arg><arg>${outputPath}/relation</arg>
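
A note on the new Spark option: spark.sql.shuffle.partitions sets how many partitions Spark SQL uses when shuffling data for joins and aggregations (the default is 200). Pinning it to 3840 in both Spark actions spreads the relation joins over many smaller tasks, which suits large inputs at the cost of some extra scheduling overhead on small ones.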