forked from D-Net/dnet-hadoop
fixing issue on propagation organization. added --config to workflow definition. added oozie_app to communtiy project
This commit is contained in:
parent
f1b898c6b4
commit
a4214ced1e
|
@ -109,12 +109,12 @@ public class PrepareResultCommunitySet {
|
||||||
});
|
});
|
||||||
|
|
||||||
});
|
});
|
||||||
if(cl.size() == 0)
|
if (cl.size() == 0)
|
||||||
return null;
|
return null;
|
||||||
rpl.setCommunityList(cl);
|
rpl.setCommunityList(cl);
|
||||||
return rpl;
|
return rpl;
|
||||||
}, Encoders.bean(ResultProjectList.class))
|
}, Encoders.bean(ResultProjectList.class))
|
||||||
.filter(Objects::nonNull)
|
.filter(Objects::nonNull)
|
||||||
.write()
|
.write()
|
||||||
.mode(SaveMode.Overwrite)
|
.mode(SaveMode.Overwrite)
|
||||||
.option("compression", "gzip")
|
.option("compression", "gzip")
|
||||||
|
|
|
@ -62,10 +62,8 @@ public class SparkResultToCommunityFromProject implements Serializable {
|
||||||
final String possibleupdatespath = parser.get("preparedInfoPath");
|
final String possibleupdatespath = parser.get("preparedInfoPath");
|
||||||
log.info("preparedInfoPath: {}", possibleupdatespath);
|
log.info("preparedInfoPath: {}", possibleupdatespath);
|
||||||
|
|
||||||
|
|
||||||
SparkConf conf = new SparkConf();
|
SparkConf conf = new SparkConf();
|
||||||
|
|
||||||
|
|
||||||
runWithSparkSession(
|
runWithSparkSession(
|
||||||
conf,
|
conf,
|
||||||
isSparkSessionManaged,
|
isSparkSessionManaged,
|
||||||
|
|
|
@ -92,7 +92,7 @@
|
||||||
<spark-opts>
|
<spark-opts>
|
||||||
--executor-cores=6
|
--executor-cores=6
|
||||||
--executor-memory=5G
|
--executor-memory=5G
|
||||||
--spark.executor.memoryOverhead=3g
|
--conf spark.executor.memoryOverhead=3g
|
||||||
--conf spark.sql.shuffle.partitions=3284
|
--conf spark.sql.shuffle.partitions=3284
|
||||||
--driver-memory=${sparkDriverMemory}
|
--driver-memory=${sparkDriverMemory}
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
@ -121,7 +121,7 @@
|
||||||
<spark-opts>
|
<spark-opts>
|
||||||
--executor-cores=6
|
--executor-cores=6
|
||||||
--executor-memory=5G
|
--executor-memory=5G
|
||||||
--spark.executor.memoryOverhead=3g
|
--conf spark.executor.memoryOverhead=3g
|
||||||
--conf spark.sql.shuffle.partitions=3284
|
--conf spark.sql.shuffle.partitions=3284
|
||||||
--driver-memory=${sparkDriverMemory}
|
--driver-memory=${sparkDriverMemory}
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
|
|
@ -5,12 +5,7 @@
|
||||||
"paramDescription": "the path of the sequencial file to read",
|
"paramDescription": "the path of the sequencial file to read",
|
||||||
"paramRequired": true
|
"paramRequired": true
|
||||||
},
|
},
|
||||||
{
|
|
||||||
"paramName":"h",
|
|
||||||
"paramLongName":"hive_metastore_uris",
|
|
||||||
"paramDescription": "the hive metastore uris",
|
|
||||||
"paramRequired": true
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
"paramName": "ssm",
|
"paramName": "ssm",
|
||||||
"paramLongName": "isSparkSessionManaged",
|
"paramLongName": "isSparkSessionManaged",
|
||||||
|
|
|
@ -0,0 +1,58 @@
|
||||||
|
<configuration>
|
||||||
|
<property>
|
||||||
|
<name>jobTracker</name>
|
||||||
|
<value>yarnRM</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>nameNode</name>
|
||||||
|
<value>hdfs://nameservice1</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>oozie.use.system.libpath</name>
|
||||||
|
<value>true</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>oozie.action.sharelib.for.spark</name>
|
||||||
|
<value>spark2</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>hive_metastore_uris</name>
|
||||||
|
<value>thrift://iis-cdh5-test-m3.ocean.icm.edu.pl:9083</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>spark2YarnHistoryServerAddress</name>
|
||||||
|
<value>http://iis-cdh5-test-gw.ocean.icm.edu.pl:18089</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>spark2EventLogDir</name>
|
||||||
|
<value>/user/spark/spark2ApplicationHistory</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>spark2ExtraListeners</name>
|
||||||
|
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>spark2SqlQueryExecutionListeners</name>
|
||||||
|
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>sparkExecutorNumber</name>
|
||||||
|
<value>4</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>sparkDriverMemory</name>
|
||||||
|
<value>15G</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>sparkExecutorMemory</name>
|
||||||
|
<value>6G</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>sparkExecutorCores</name>
|
||||||
|
<value>1</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>spark2MaxExecutors</name>
|
||||||
|
<value>50</value>
|
||||||
|
</property>
|
||||||
|
</configuration>
|
|
@ -0,0 +1,144 @@
|
||||||
|
<workflow-app name="community_to_result_propagation_project" xmlns="uri:oozie:workflow:0.5">
|
||||||
|
<parameters>
|
||||||
|
<property>
|
||||||
|
<name>sourcePath</name>
|
||||||
|
<description>the source path</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>outputPath</name>
|
||||||
|
<description>the output path</description>
|
||||||
|
</property>
|
||||||
|
</parameters>
|
||||||
|
|
||||||
|
<global>
|
||||||
|
<job-tracker>${jobTracker}</job-tracker>
|
||||||
|
<name-node>${nameNode}</name-node>
|
||||||
|
<configuration>
|
||||||
|
<property>
|
||||||
|
<name>oozie.action.sharelib.for.spark</name>
|
||||||
|
<value>${oozieActionShareLibForSpark2}</value>
|
||||||
|
</property>
|
||||||
|
</configuration>
|
||||||
|
</global>
|
||||||
|
|
||||||
|
<start to="reset_outputpath"/>
|
||||||
|
|
||||||
|
<kill name="Kill">
|
||||||
|
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||||
|
</kill>
|
||||||
|
|
||||||
|
<action name="reset_outputpath">
|
||||||
|
<fs>
|
||||||
|
<delete path="${outputPath}"/>
|
||||||
|
<mkdir path="${outputPath}"/>
|
||||||
|
</fs>
|
||||||
|
<ok to="copy_entities"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<fork name="copy_entities">
|
||||||
|
<path start="copy_relation"/>
|
||||||
|
<path start="copy_organization"/>
|
||||||
|
<path start="copy_projects"/>
|
||||||
|
<path start="copy_datasources"/>
|
||||||
|
</fork>
|
||||||
|
|
||||||
|
<action name="copy_relation">
|
||||||
|
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
||||||
|
<arg>${nameNode}/${sourcePath}/relation</arg>
|
||||||
|
<arg>${nameNode}/${outputPath}/relation</arg>
|
||||||
|
</distcp>
|
||||||
|
<ok to="copy_wait"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="copy_organization">
|
||||||
|
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
||||||
|
<arg>${nameNode}/${sourcePath}/organization</arg>
|
||||||
|
<arg>${nameNode}/${outputPath}/organization</arg>
|
||||||
|
</distcp>
|
||||||
|
<ok to="copy_wait"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="copy_projects">
|
||||||
|
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
||||||
|
<arg>${nameNode}/${sourcePath}/project</arg>
|
||||||
|
<arg>${nameNode}/${outputPath}/project</arg>
|
||||||
|
</distcp>
|
||||||
|
<ok to="copy_wait"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="copy_datasources">
|
||||||
|
<distcp xmlns="uri:oozie:distcp-action:0.2">
|
||||||
|
<arg>${nameNode}/${sourcePath}/datasource</arg>
|
||||||
|
<arg>${nameNode}/${outputPath}/datasource</arg>
|
||||||
|
</distcp>
|
||||||
|
<ok to="copy_wait"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<join name="copy_wait" to="prepare_result_communitylist"/>
|
||||||
|
|
||||||
|
<action name="prepare_result_communitylist">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>Prepare-Community-Result-Organization</name>
|
||||||
|
<class>eu.dnetlib.dhp.resulttocommunityfromproject.PrepareResultCommunitySet</class>
|
||||||
|
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-cores=6
|
||||||
|
--executor-memory=5G
|
||||||
|
--conf spark.executor.memoryOverhead=3g
|
||||||
|
--conf spark.sql.shuffle.partitions=3284
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
|
|
||||||
|
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--sourcePath</arg><arg>${sourcePath}/relation</arg>
|
||||||
|
<arg>--outputPath</arg><arg>${workingDir}/preparedInfo/resultCommunityList</arg>
|
||||||
|
<arg>--production</arg><arg>${production}</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="exec-propagation"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
<action name="exec-propagation">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>community2resultfromproject</name>
|
||||||
|
<class>eu.dnetlib.dhp.resulttocommunityfromproject.SparkResultToCommunityFromProject</class>
|
||||||
|
<jar>dhp-enrichment-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-cores=6
|
||||||
|
--executor-memory=5G
|
||||||
|
--conf spark.executor.memoryOverhead=3g
|
||||||
|
--conf spark.sql.shuffle.partitions=3284
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
|
--conf spark.dynamicAllocation.maxExecutors=${spark2MaxExecutors}
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--preparedInfoPath</arg><arg>${workingDir}/preparedInfo/resultCommunityList</arg>
|
||||||
|
<arg>--sourcePath</arg><arg>${sourcePath}/</arg>
|
||||||
|
<arg>--outputPath</arg><arg>${outputPath}/</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="End"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
<end name="End"/>
|
||||||
|
|
||||||
|
</workflow-app>
|
Loading…
Reference in New Issue