master set to 'yarn' in spark actions, removed path to rawSet from the dedup scan workflow

This commit is contained in:
Claudio Atzori 2020-03-25 14:16:06 +01:00
parent efb0b7d660
commit 36f8f2ea66
4 changed files with 10 additions and 33 deletions

View File

@ -43,22 +43,17 @@ public class SparkCreateSimRels implements Serializable {
//read oozie parameters //read oozie parameters
final String graphBasePath = parser.get("graphBasePath"); final String graphBasePath = parser.get("graphBasePath");
final String isLookUpUrl = parser.get("isLookUpUrl"); final String isLookUpUrl = parser.get("isLookUpUrl");
final String rawSet = parser.get("rawSet");
final String actionSetId = parser.get("actionSetId"); final String actionSetId = parser.get("actionSetId");
final String workingPath = parser.get("workingPath"); final String workingPath = parser.get("workingPath");
System.out.println(String.format("graphBasePath: '%s'", graphBasePath)); System.out.println(String.format("graphBasePath: '%s'", graphBasePath));
System.out.println(String.format("isLookUpUrl: '%s'", isLookUpUrl)); System.out.println(String.format("isLookUpUrl: '%s'", isLookUpUrl));
System.out.println(String.format("rawSet: '%s'", rawSet));
System.out.println(String.format("actionSetId: '%s'", actionSetId)); System.out.println(String.format("actionSetId: '%s'", actionSetId));
System.out.println(String.format("workingPath: '%s'", workingPath)); System.out.println(String.format("workingPath: '%s'", workingPath));
try (SparkSession spark = getSparkSession(parser)) { try (SparkSession spark = getSparkSession(parser)) {
final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
//create empty sequenceFile for the accumulation
JavaRDD<Tuple2<Text,Text>> simRel = sc.emptyRDD();
//for each dedup configuration //for each dedup configuration
for (DedupConfig dedupConf: DedupUtility.getConfigurations(isLookUpUrl, actionSetId)) { for (DedupConfig dedupConf: DedupUtility.getConfigurations(isLookUpUrl, actionSetId)) {
final String entity = dedupConf.getWf().getEntityType(); final String entity = dedupConf.getWf().getEntityType();
@ -83,23 +78,16 @@ public class SparkCreateSimRels implements Serializable {
.write() .write()
.mode("overwrite") .mode("overwrite")
.save(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity)); .save(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity));
}
if (rawSet != null) {
//create atomic actions
JavaRDD<Tuple2<Text, Text>> newSimRels = relationsRDD
.map(this::createSequenceFileRow);
simRel = simRel.union(newSimRels);
} }
} }
if (rawSet != null) /**
simRel.mapToPair(r -> r) * Utility method used to create an atomic action from a Relation object
.saveAsHadoopFile(rawSet, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class); * @param relation input relation
} * @return A tuple2 with [id, json serialization of the atomic action]
* @throws JsonProcessingException
} */
public Tuple2<Text, Text> createSequenceFileRow(Relation relation) throws JsonProcessingException { public Tuple2<Text, Text> createSequenceFileRow(Relation relation) throws JsonProcessingException {
ObjectMapper mapper = new ObjectMapper(); ObjectMapper mapper = new ObjectMapper();

View File

@ -65,7 +65,7 @@
--conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
--conf spark.sql.warehouse.dir="/user/hive/warehouse" --conf spark.sql.warehouse.dir="/user/hive/warehouse"
</spark-opts> </spark-opts>
<arg>-mt</arg><arg>yarn-cluster</arg> <arg>-mt</arg><arg>yarn</arg>
<arg>--i</arg><arg>${graphBasePath}</arg> <arg>--i</arg><arg>${graphBasePath}</arg>
<arg>--w</arg><arg>${workingPath}</arg> <arg>--w</arg><arg>${workingPath}</arg>
<arg>--o</arg><arg>${dedupGraphPath}</arg> <arg>--o</arg><arg>${dedupGraphPath}</arg>
@ -92,7 +92,7 @@
--conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
--conf spark.sql.warehouse.dir="/user/hive/warehouse" --conf spark.sql.warehouse.dir="/user/hive/warehouse"
</spark-opts> </spark-opts>
<arg>-mt</arg><arg>yarn-cluster</arg> <arg>-mt</arg><arg>yarn</arg>
<arg>--i</arg><arg>${graphBasePath}</arg> <arg>--i</arg><arg>${graphBasePath}</arg>
<arg>--o</arg><arg>${dedupGraphPath}</arg> <arg>--o</arg><arg>${dedupGraphPath}</arg>
<arg>--w</arg><arg>${workingPath}</arg> <arg>--w</arg><arg>${workingPath}</arg>

View File

@ -23,12 +23,6 @@
"paramDescription": "the base path of the raw graph", "paramDescription": "the base path of the raw graph",
"paramRequired": true "paramRequired": true
}, },
{
"paramName": "o",
"paramLongName": "rawSet",
"paramDescription": "the raw set to be saved (full path)",
"paramRequired": false
},
{ {
"paramName": "w", "paramName": "w",
"paramLongName": "workingPath", "paramLongName": "workingPath",

View File

@ -4,10 +4,6 @@
<name>graphBasePath</name> <name>graphBasePath</name>
<description>the raw graph base path</description> <description>the raw graph base path</description>
</property> </property>
<property>
<name>rawSet</name>
<description>the output directory in the targetPath</description>
</property>
<property> <property>
<name>isLookUpUrl</name> <name>isLookUpUrl</name>
<description>the address of the lookUp service</description> <description>the address of the lookUp service</description>
@ -58,7 +54,6 @@
<action name="CreateSimRel"> <action name="CreateSimRel">
<spark xmlns="uri:oozie:spark-action:0.2"> <spark xmlns="uri:oozie:spark-action:0.2">
<prepare> <prepare>
<delete path="${rawSet}"/>
<delete path="${workingPath}/${actionSetId}/*_simrel"/> <delete path="${workingPath}/${actionSetId}/*_simrel"/>
</prepare> </prepare>
<master>yarn</master> <master>yarn</master>
@ -101,7 +96,7 @@
--conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
--conf spark.sql.warehouse.dir="/user/hive/warehouse" --conf spark.sql.warehouse.dir="/user/hive/warehouse"
</spark-opts> </spark-opts>
<arg>-mt</arg><arg>yarn-cluster</arg> <arg>-mt</arg><arg>yarn</arg>
<arg>--i</arg><arg>${graphBasePath}</arg> <arg>--i</arg><arg>${graphBasePath}</arg>
<arg>--w</arg><arg>${workingPath}</arg> <arg>--w</arg><arg>${workingPath}</arg>
<arg>--la</arg><arg>${isLookUpUrl}</arg> <arg>--la</arg><arg>${isLookUpUrl}</arg>
@ -129,7 +124,7 @@
--conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener" --conf spark.sql.queryExecutionListeners="com.cloudera.spark.lineage.NavigatorQueryListener"
--conf spark.sql.warehouse.dir="/user/hive/warehouse" --conf spark.sql.warehouse.dir="/user/hive/warehouse"
</spark-opts> </spark-opts>
<arg>-mt</arg><arg>yarn-cluster</arg> <arg>-mt</arg><arg>yarn</arg>
<arg>--i</arg><arg>${graphBasePath}</arg> <arg>--i</arg><arg>${graphBasePath}</arg>
<arg>--w</arg><arg>${workingPath}</arg> <arg>--w</arg><arg>${workingPath}</arg>
<arg>--la</arg><arg>${isLookUpUrl}</arg> <arg>--la</arg><arg>${isLookUpUrl}</arg>