forked from D-Net/dnet-hadoop
grouping graph entities by id turned out to be an easy extension for the already existing cleaning workflow
This commit is contained in:
parent
2bed29eb09
commit
528231a287
|
@ -6,7 +6,9 @@ import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
|
import org.apache.spark.api.java.function.FilterFunction;
|
||||||
import org.apache.spark.api.java.function.MapFunction;
|
import org.apache.spark.api.java.function.MapFunction;
|
||||||
import org.apache.spark.sql.Dataset;
|
import org.apache.spark.sql.Dataset;
|
||||||
import org.apache.spark.sql.Encoders;
|
import org.apache.spark.sql.Encoders;
|
||||||
|
@ -69,12 +71,12 @@ public class CleanGraphSparkJob {
|
||||||
conf,
|
conf,
|
||||||
isSparkSessionManaged,
|
isSparkSessionManaged,
|
||||||
spark -> {
|
spark -> {
|
||||||
removeOutputDir(spark, outputPath);
|
HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
|
||||||
fixGraphTable(spark, vocs, inputPath, entityClazz, outputPath);
|
cleanGraphTable(spark, vocs, inputPath, entityClazz, outputPath);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private static <T extends Oaf> void fixGraphTable(
|
private static <T extends Oaf> void cleanGraphTable(
|
||||||
SparkSession spark,
|
SparkSession spark,
|
||||||
VocabularyGroup vocs,
|
VocabularyGroup vocs,
|
||||||
String inputPath,
|
String inputPath,
|
||||||
|
@ -100,13 +102,15 @@ public class CleanGraphSparkJob {
|
||||||
return spark
|
return spark
|
||||||
.read()
|
.read()
|
||||||
.textFile(inputEntityPath)
|
.textFile(inputEntityPath)
|
||||||
|
.filter((FilterFunction<String>) s -> isEntityType(s, clazz))
|
||||||
|
.map((MapFunction<String, String>) s -> StringUtils.substringAfter(s, "|"), Encoders.STRING())
|
||||||
.map(
|
.map(
|
||||||
(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, clazz),
|
(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, clazz),
|
||||||
Encoders.bean(clazz));
|
Encoders.bean(clazz));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void removeOutputDir(SparkSession spark, String path) {
|
private static <T extends Oaf> boolean isEntityType(final String s, final Class<T> clazz) {
|
||||||
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
|
return StringUtils.substringBefore(s, "|").equals(clazz.getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.oa.graph.groupbyid;
|
package eu.dnetlib.dhp.oa.graph.clean;
|
||||||
|
|
||||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
||||||
import static eu.dnetlib.dhp.utils.DHPUtils.toSeq;
|
import static eu.dnetlib.dhp.utils.DHPUtils.toSeq;
|
|
@ -1,97 +0,0 @@
|
||||||
|
|
||||||
package eu.dnetlib.dhp.oa.graph.groupbyid;
|
|
||||||
|
|
||||||
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
|
|
||||||
|
|
||||||
import java.util.Optional;
|
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
|
||||||
import org.apache.spark.SparkConf;
|
|
||||||
import org.apache.spark.api.java.function.FilterFunction;
|
|
||||||
import org.apache.spark.api.java.function.MapFunction;
|
|
||||||
import org.apache.spark.sql.Encoders;
|
|
||||||
import org.apache.spark.sql.SaveMode;
|
|
||||||
import org.apache.spark.sql.SparkSession;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
||||||
|
|
||||||
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
|
|
||||||
import eu.dnetlib.dhp.common.HdfsSupport;
|
|
||||||
import eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication;
|
|
||||||
import eu.dnetlib.dhp.schema.common.ModelSupport;
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.Oaf;
|
|
||||||
import eu.dnetlib.dhp.schema.oaf.OafEntity;
|
|
||||||
import scala.Tuple2;
|
|
||||||
|
|
||||||
public class DispatchEntitiesSparkJob {
|
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(DispatchEntitiesSparkJob.class);
|
|
||||||
|
|
||||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
|
||||||
|
|
||||||
public static void main(final String[] args) throws Exception {
|
|
||||||
final ArgumentApplicationParser parser = new ArgumentApplicationParser(
|
|
||||||
IOUtils
|
|
||||||
.toString(
|
|
||||||
MigrateMongoMdstoresApplication.class
|
|
||||||
.getResourceAsStream(
|
|
||||||
"/eu/dnetlib/dhp/oa/graph/dispatch_entities_bytype_parameters.json")));
|
|
||||||
parser.parseArgument(args);
|
|
||||||
|
|
||||||
Boolean isSparkSessionManaged = Optional
|
|
||||||
.ofNullable(parser.get("isSparkSessionManaged"))
|
|
||||||
.map(Boolean::valueOf)
|
|
||||||
.orElse(Boolean.TRUE);
|
|
||||||
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
|
|
||||||
|
|
||||||
final String entitiesPath = parser.get("entitiesPath");
|
|
||||||
log.info("entitiesPath: {}", entitiesPath);
|
|
||||||
|
|
||||||
final String outputPath = parser.get("outputPath");
|
|
||||||
log.info("outputPath: {}", outputPath);
|
|
||||||
|
|
||||||
String graphTableClassName = parser.get("graphTableClassName");
|
|
||||||
log.info("graphTableClassName: {}", graphTableClassName);
|
|
||||||
|
|
||||||
Class<? extends OafEntity> entityClazz = (Class<? extends OafEntity>) Class.forName(graphTableClassName);
|
|
||||||
|
|
||||||
SparkConf conf = new SparkConf();
|
|
||||||
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
|
|
||||||
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
|
|
||||||
|
|
||||||
runWithSparkSession(
|
|
||||||
conf,
|
|
||||||
isSparkSessionManaged,
|
|
||||||
spark -> {
|
|
||||||
HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
|
|
||||||
dispatchOaf(spark, entityClazz, entitiesPath, outputPath);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private static <T extends Oaf> void dispatchOaf(
|
|
||||||
final SparkSession spark,
|
|
||||||
final Class<T> clazz,
|
|
||||||
final String sourcePath,
|
|
||||||
final String targetPath) {
|
|
||||||
|
|
||||||
log.info("Processing entities ({}) in file: {}", clazz.getName(), sourcePath);
|
|
||||||
|
|
||||||
spark
|
|
||||||
.read()
|
|
||||||
.textFile(sourcePath)
|
|
||||||
.filter((FilterFunction<String>) s -> isEntityType(s, clazz))
|
|
||||||
.map((MapFunction<String, String>) s -> StringUtils.substringAfter(s, "|"), Encoders.STRING())
|
|
||||||
.write()
|
|
||||||
.option("compression", "gzip")
|
|
||||||
.mode(SaveMode.Overwrite)
|
|
||||||
.text(targetPath);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static <T extends Oaf> boolean isEntityType(final String s, final Class<T> clazz) {
|
|
||||||
return StringUtils.substringBefore(s, "|").equals(clazz.getName());
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -50,12 +50,36 @@
|
||||||
</property>
|
</property>
|
||||||
</parameters>
|
</parameters>
|
||||||
|
|
||||||
<start to="fork_clean_graph"/>
|
<start to="group_entities"/>
|
||||||
|
|
||||||
<kill name="Kill">
|
<kill name="Kill">
|
||||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
||||||
</kill>
|
</kill>
|
||||||
|
|
||||||
|
<action name="group_entities">
|
||||||
|
<spark xmlns="uri:oozie:spark-action:0.2">
|
||||||
|
<master>yarn</master>
|
||||||
|
<mode>cluster</mode>
|
||||||
|
<name>group graph entities and relations</name>
|
||||||
|
<class>eu.dnetlib.dhp.oa.graph.clean.GroupEntitiesAndRelationsSparkJob</class>
|
||||||
|
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
||||||
|
<spark-opts>
|
||||||
|
--executor-cores=${sparkExecutorCores}
|
||||||
|
--executor-memory=${sparkExecutorMemory}
|
||||||
|
--driver-memory=${sparkDriverMemory}
|
||||||
|
--conf spark.extraListeners=${spark2ExtraListeners}
|
||||||
|
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
||||||
|
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
||||||
|
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
||||||
|
--conf spark.sql.shuffle.partitions=7680
|
||||||
|
</spark-opts>
|
||||||
|
<arg>--graphInputPath</arg><arg>${graphInputPath}</arg>
|
||||||
|
<arg>--outputPath</arg><arg>${workingDir}/grouped_entities</arg>
|
||||||
|
</spark>
|
||||||
|
<ok to="fork_clean_graph"/>
|
||||||
|
<error to="Kill"/>
|
||||||
|
</action>
|
||||||
|
|
||||||
<fork name="fork_clean_graph">
|
<fork name="fork_clean_graph">
|
||||||
<path start="clean_publication"/>
|
<path start="clean_publication"/>
|
||||||
<path start="clean_dataset"/>
|
<path start="clean_dataset"/>
|
||||||
|
|
|
@ -1,18 +0,0 @@
|
||||||
<configuration>
|
|
||||||
<property>
|
|
||||||
<name>jobTracker</name>
|
|
||||||
<value>yarnRM</value>
|
|
||||||
</property>
|
|
||||||
<property>
|
|
||||||
<name>nameNode</name>
|
|
||||||
<value>hdfs://nameservice1</value>
|
|
||||||
</property>
|
|
||||||
<property>
|
|
||||||
<name>oozie.use.system.libpath</name>
|
|
||||||
<value>true</value>
|
|
||||||
</property>
|
|
||||||
<property>
|
|
||||||
<name>oozie.action.sharelib.for.spark</name>
|
|
||||||
<value>spark2</value>
|
|
||||||
</property>
|
|
||||||
</configuration>
|
|
|
@ -1,293 +0,0 @@
|
||||||
<workflow-app name="group graph entities and relations" xmlns="uri:oozie:workflow:0.5">
|
|
||||||
|
|
||||||
<parameters>
|
|
||||||
<property>
|
|
||||||
<name>graphInputPath</name>
|
|
||||||
<description>the graph root input path</description>
|
|
||||||
</property>
|
|
||||||
<property>
|
|
||||||
<name>outputPath</name>
|
|
||||||
<description>the graph root output path</description>
|
|
||||||
</property>
|
|
||||||
|
|
||||||
<property>
|
|
||||||
<name>sparkDriverMemory</name>
|
|
||||||
<description>memory for driver process</description>
|
|
||||||
</property>
|
|
||||||
<property>
|
|
||||||
<name>sparkExecutorMemory</name>
|
|
||||||
<description>memory for individual executor</description>
|
|
||||||
</property>
|
|
||||||
<property>
|
|
||||||
<name>sparkExecutorCores</name>
|
|
||||||
<description>number of cores used by single executor</description>
|
|
||||||
</property>
|
|
||||||
<property>
|
|
||||||
<name>oozieActionShareLibForSpark2</name>
|
|
||||||
<description>oozie action sharelib for spark 2.*</description>
|
|
||||||
</property>
|
|
||||||
<property>
|
|
||||||
<name>spark2ExtraListeners</name>
|
|
||||||
<value>com.cloudera.spark.lineage.NavigatorAppListener</value>
|
|
||||||
<description>spark 2.* extra listeners classname</description>
|
|
||||||
</property>
|
|
||||||
<property>
|
|
||||||
<name>spark2SqlQueryExecutionListeners</name>
|
|
||||||
<value>com.cloudera.spark.lineage.NavigatorQueryListener</value>
|
|
||||||
<description>spark 2.* sql query execution listeners classname</description>
|
|
||||||
</property>
|
|
||||||
<property>
|
|
||||||
<name>spark2YarnHistoryServerAddress</name>
|
|
||||||
<description>spark 2.* yarn history server address</description>
|
|
||||||
</property>
|
|
||||||
<property>
|
|
||||||
<name>spark2EventLogDir</name>
|
|
||||||
<description>spark 2.* event log dir location</description>
|
|
||||||
</property>
|
|
||||||
</parameters>
|
|
||||||
|
|
||||||
<start to="group_entities"/>
|
|
||||||
|
|
||||||
<kill name="Kill">
|
|
||||||
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
|
|
||||||
</kill>
|
|
||||||
|
|
||||||
<action name="group_entities">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
<master>yarn</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>group graph entities and relations</name>
|
|
||||||
<class>eu.dnetlib.dhp.oa.graph.groupbyid.GroupEntitiesAndRelationsSparkJob</class>
|
|
||||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
|
||||||
<spark-opts>
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
|
||||||
--driver-memory=${sparkDriverMemory}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
--conf spark.sql.shuffle.partitions=7680
|
|
||||||
</spark-opts>
|
|
||||||
<arg>--graphInputPath</arg><arg>${graphInputPath}</arg>
|
|
||||||
<arg>--outputPath</arg><arg>${workingDir}/grouped_entities</arg>
|
|
||||||
</spark>
|
|
||||||
<ok to="fork_dispatch_entities"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<fork name="fork_dispatch_entities">
|
|
||||||
<path start="dispatch_publication"/>
|
|
||||||
<path start="dispatch_dataset"/>
|
|
||||||
<path start="dispatch_software"/>
|
|
||||||
<path start="dispatch_otherresearchproduct"/>
|
|
||||||
<path start="dispatch_datasource"/>
|
|
||||||
<path start="dispatch_organization"/>
|
|
||||||
<path start="dispatch_project"/>
|
|
||||||
<path start="dispatch_relation"/>
|
|
||||||
</fork>
|
|
||||||
|
|
||||||
<action name="dispatch_publication">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
<master>yarn</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>Dispatch publications</name>
|
|
||||||
<class>eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob</class>
|
|
||||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
|
||||||
<spark-opts>
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
|
||||||
--driver-memory=${sparkDriverMemory}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
--conf spark.sql.shuffle.partitions=7680
|
|
||||||
</spark-opts>
|
|
||||||
<arg>--entitiesPath</arg><arg>${workingDir}/grouped_entities</arg>
|
|
||||||
<arg>--outputPath</arg><arg>${outputPath}/publication</arg>
|
|
||||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
|
|
||||||
</spark>
|
|
||||||
<ok to="wait_dispatch"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<action name="dispatch_dataset">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
<master>yarn</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>Dispatch datasets</name>
|
|
||||||
<class>eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob</class>
|
|
||||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
|
||||||
<spark-opts>
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
|
||||||
--driver-memory=${sparkDriverMemory}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
--conf spark.sql.shuffle.partitions=7680
|
|
||||||
</spark-opts>
|
|
||||||
<arg>--entitiesPath</arg><arg>${workingDir}/grouped_entities</arg>
|
|
||||||
<arg>--outputPath</arg><arg>${outputPath}/dataset</arg>
|
|
||||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
|
|
||||||
</spark>
|
|
||||||
<ok to="wait_dispatch"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<action name="dispatch_software">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
<master>yarn</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>Dispatch softwares</name>
|
|
||||||
<class>eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob</class>
|
|
||||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
|
||||||
<spark-opts>
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
|
||||||
--driver-memory=${sparkDriverMemory}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
--conf spark.sql.shuffle.partitions=7680
|
|
||||||
</spark-opts>
|
|
||||||
<arg>--entitiesPath</arg><arg>${workingDir}/grouped_entities</arg>
|
|
||||||
<arg>--outputPath</arg><arg>${outputPath}/software</arg>
|
|
||||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
|
|
||||||
</spark>
|
|
||||||
<ok to="wait_dispatch"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<action name="dispatch_otherresearchproduct">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
<master>yarn</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>Dispatch otherresearchproducts</name>
|
|
||||||
<class>eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob</class>
|
|
||||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
|
||||||
<spark-opts>
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
|
||||||
--driver-memory=${sparkDriverMemory}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
--conf spark.sql.shuffle.partitions=7680
|
|
||||||
</spark-opts>
|
|
||||||
<arg>--entitiesPath</arg><arg>${workingDir}/grouped_entities</arg>
|
|
||||||
<arg>--outputPath</arg><arg>${outputPath}/otherresearchproduct</arg>
|
|
||||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
|
|
||||||
</spark>
|
|
||||||
<ok to="wait_dispatch"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<action name="dispatch_datasource">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
<master>yarn</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>Dispatch datasources</name>
|
|
||||||
<class>eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob</class>
|
|
||||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
|
||||||
<spark-opts>
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
|
||||||
--driver-memory=${sparkDriverMemory}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
--conf spark.sql.shuffle.partitions=7680
|
|
||||||
</spark-opts>
|
|
||||||
<arg>--entitiesPath</arg><arg>${workingDir}/grouped_entities</arg>
|
|
||||||
<arg>--outputPath</arg><arg>${outputPath}/datasource</arg>
|
|
||||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
|
|
||||||
</spark>
|
|
||||||
<ok to="wait_dispatch"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<action name="dispatch_organization">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
<master>yarn</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>Dispatch organizations</name>
|
|
||||||
<class>eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob</class>
|
|
||||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
|
||||||
<spark-opts>
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
|
||||||
--driver-memory=${sparkDriverMemory}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
--conf spark.sql.shuffle.partitions=7680
|
|
||||||
</spark-opts>
|
|
||||||
<arg>--entitiesPath</arg><arg>${workingDir}/grouped_entities</arg>
|
|
||||||
<arg>--outputPath</arg><arg>${outputPath}/organization</arg>
|
|
||||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
|
|
||||||
</spark>
|
|
||||||
<ok to="wait_dispatch"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<action name="dispatch_project">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
<master>yarn</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>Dispatch project</name>
|
|
||||||
<class>eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob</class>
|
|
||||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
|
||||||
<spark-opts>
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
|
||||||
--driver-memory=${sparkDriverMemory}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
--conf spark.sql.shuffle.partitions=7680
|
|
||||||
</spark-opts>
|
|
||||||
<arg>--entitiesPath</arg><arg>${workingDir}/grouped_entities</arg>
|
|
||||||
<arg>--outputPath</arg><arg>${outputPath}/project</arg>
|
|
||||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
|
|
||||||
</spark>
|
|
||||||
<ok to="wait_dispatch"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<action name="dispatch_relation">
|
|
||||||
<spark xmlns="uri:oozie:spark-action:0.2">
|
|
||||||
<master>yarn</master>
|
|
||||||
<mode>cluster</mode>
|
|
||||||
<name>Dispatch relations</name>
|
|
||||||
<class>eu.dnetlib.dhp.oa.graph.groupbyid.DispatchEntitiesSparkJob</class>
|
|
||||||
<jar>dhp-graph-mapper-${projectVersion}.jar</jar>
|
|
||||||
<spark-opts>
|
|
||||||
--executor-cores=${sparkExecutorCores}
|
|
||||||
--executor-memory=${sparkExecutorMemory}
|
|
||||||
--driver-memory=${sparkDriverMemory}
|
|
||||||
--conf spark.extraListeners=${spark2ExtraListeners}
|
|
||||||
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
|
|
||||||
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
|
|
||||||
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
|
|
||||||
--conf spark.sql.shuffle.partitions=7680
|
|
||||||
</spark-opts>
|
|
||||||
<arg>--entitiesPath</arg><arg>${workingDir}/grouped_entities</arg>
|
|
||||||
<arg>--outputPath</arg><arg>${outputPath}/relation</arg>
|
|
||||||
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Relation</arg>
|
|
||||||
</spark>
|
|
||||||
<ok to="wait_dispatch"/>
|
|
||||||
<error to="Kill"/>
|
|
||||||
</action>
|
|
||||||
|
|
||||||
<join name="wait_dispatch" to="End"/>
|
|
||||||
|
|
||||||
<end name="End"/>
|
|
||||||
</workflow-app>
|
|
Loading…
Reference in New Issue