Consistency graph workflow merges all the entities by ID

This commit is contained in:
Claudio Atzori 2020-11-25 14:55:32 +01:00
parent 36173c13a5
commit dfd6205b95
8 changed files with 539 additions and 107 deletions

View File

@ -19,24 +19,24 @@ public class OafMapperUtils {
public static Oaf merge(final Oaf o1, final Oaf o2) {
if (ModelSupport.isSubClass(o1, OafEntity.class)) {
if (ModelSupport.isSubClass(o1, Result.class)) {
return mergeResults((Result) o1, (Result) o2);
} else if (ModelSupport.isSubClass(o1, Datasource.class)) {
((Datasource) o1).mergeFrom((Datasource) o2);
} else if (ModelSupport.isSubClass(o1, Organization.class)) {
((Organization) o1).mergeFrom((Organization) o2);
} else if (ModelSupport.isSubClass(o1, Project.class)) {
((Project) o1).mergeFrom((Project) o2);
} else {
throw new RuntimeException("invalid OafEntity subtype:" + o1.getClass().getCanonicalName());
}
return mergeEntities((OafEntity) o1, (OafEntity) o2);
} else if (ModelSupport.isSubClass(o1, Relation.class)) {
((Relation) o1).mergeFrom((Relation) o2);
} else {
}
throw new RuntimeException("invalid Oaf type:" + o1.getClass().getCanonicalName());
}
return o1;
public static OafEntity mergeEntities(OafEntity e1, OafEntity e2) {
if (ModelSupport.isSubClass(e1, Result.class)) {
return mergeResults((Result) e1, (Result) e2);
} else if (ModelSupport.isSubClass(e1, Datasource.class)) {
((Datasource) e1).mergeFrom((Datasource) e2);
} else if (ModelSupport.isSubClass(e1, Organization.class)) {
((Organization) e1).mergeFrom((Organization) e2);
} else if (ModelSupport.isSubClass(e1, Project.class)) {
((Project) e1).mergeFrom((Project) e2);
}
throw new RuntimeException("invalid OafEntity subtype:" + e1.getClass().getCanonicalName());
}
public static Result mergeResults(Result r1, Result r2) {

View File

@ -0,0 +1,93 @@
package eu.dnetlib.dhp.oa.dedup;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
public class DispatchEntitiesSparkJob {
private static final Logger log = LoggerFactory.getLogger(DispatchEntitiesSparkJob.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
DispatchEntitiesSparkJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/dispatch_entities_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String inputPath = parser.get("inputPath");
log.info("inputPath: {}", inputPath);
String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
String graphTableClassName = parser.get("graphTableClassName");
log.info("graphTableClassName: {}", graphTableClassName);
Class<? extends OafEntity> entityClazz = (Class<? extends OafEntity>) Class.forName(graphTableClassName);
SparkConf conf = new SparkConf();
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
dispatchEntities(spark, inputPath, entityClazz, outputPath);
});
}
private static <T extends Oaf> void dispatchEntities(
SparkSession spark,
String inputPath,
Class<T> clazz,
String outputPath) {
spark
.read()
.textFile(inputPath)
.filter((FilterFunction<String>) s -> isEntityType(s, clazz))
.map((MapFunction<String, String>) s -> StringUtils.substringAfter(s, "|"), Encoders.STRING())
.map(
(MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, clazz),
Encoders.bean(clazz))
.write()
.mode(SaveMode.Overwrite)
.option("compression", "gzip")
.json(outputPath);
}
private static <T extends Oaf> boolean isEntityType(final String s, final Class<T> clazz) {
return StringUtils.substringBefore(s, "|").equals(clazz.getName());
}
}

View File

@ -0,0 +1,202 @@
package eu.dnetlib.dhp.oa.dedup;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import static eu.dnetlib.dhp.utils.DHPUtils.toSeq;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.Aggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import scala.Tuple2;
/**
* Groups the graph content by entity identifier to ensure ID uniqueness
*/
public class GroupEntitiesSparkJob {
private static final Logger log = LoggerFactory.getLogger(GroupEntitiesSparkJob.class);
private final static String ID_JPATH = "$.id";
private final static String SOURCE_JPATH = "$.source";
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
public static void main(String[] args) throws Exception {
String jsonConfiguration = IOUtils
.toString(
GroupEntitiesSparkJob.class
.getResourceAsStream(
"/eu/dnetlib/dhp/oa/dedup/group_graph_entities_parameters.json"));
final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
parser.parseArgument(args);
Boolean isSparkSessionManaged = Optional
.ofNullable(parser.get("isSparkSessionManaged"))
.map(Boolean::valueOf)
.orElse(Boolean.TRUE);
log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
String graphInputPath = parser.get("graphInputPath");
log.info("graphInputPath: {}", graphInputPath);
String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
groupEntities(spark, graphInputPath, outputPath);
});
}
private static void groupEntities(
SparkSession spark,
String inputPath,
String outputPath) {
final TypedColumn<OafEntity, OafEntity> aggregator = new GroupingAggregator().toColumn();
final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
spark
.read()
.textFile(toSeq(listPaths(inputPath, sc)))
.map((MapFunction<String, OafEntity>) s -> parseOaf(s), Encoders.kryo(OafEntity.class))
.filter((FilterFunction<OafEntity>) e -> StringUtils.isNotBlank(ModelSupport.idFn().apply(e)))
.groupByKey((MapFunction<OafEntity, String>) oaf -> ModelSupport.idFn().apply(oaf), Encoders.STRING())
.agg(aggregator)
.map(
(MapFunction<Tuple2<String, OafEntity>, String>) t -> t._2().getClass().getName() +
"|" + OBJECT_MAPPER.writeValueAsString(t._2()),
Encoders.STRING())
.write()
.option("compression", "gzip")
.mode(SaveMode.Overwrite)
.text(outputPath);
}
public static class GroupingAggregator extends Aggregator<OafEntity, OafEntity, OafEntity> {
@Override
public OafEntity zero() {
return null;
}
@Override
public OafEntity reduce(OafEntity b, OafEntity a) {
return mergeAndGet(b, a);
}
private OafEntity mergeAndGet(OafEntity b, OafEntity a) {
if (Objects.nonNull(a) && Objects.nonNull(b)) {
return OafMapperUtils.mergeEntities(b, a);
}
return Objects.isNull(a) ? b : a;
}
@Override
public OafEntity merge(OafEntity b, OafEntity a) {
return mergeAndGet(b, a);
}
@Override
public OafEntity finish(OafEntity j) {
return j;
}
@Override
public Encoder<OafEntity> bufferEncoder() {
return Encoders.kryo(OafEntity.class);
}
@Override
public Encoder<OafEntity> outputEncoder() {
return Encoders.kryo(OafEntity.class);
}
}
private static OafEntity parseOaf(String s) {
DocumentContext dc = JsonPath
.parse(s, Configuration.defaultConfiguration().addOptions(Option.SUPPRESS_EXCEPTIONS));
final String id = dc.read(ID_JPATH);
if (StringUtils.isNotBlank(id)) {
String prefix = StringUtils.substringBefore(id, "|");
switch (prefix) {
case "10":
return parse(s, Datasource.class);
case "20":
return parse(s, Organization.class);
case "40":
return parse(s, Project.class);
case "50":
String resultType = dc.read("$.resulttype.classid");
switch (resultType) {
case "publication":
return parse(s, Publication.class);
case "dataset":
return parse(s, eu.dnetlib.dhp.schema.oaf.Dataset.class);
case "software":
return parse(s, Software.class);
case "other":
return parse(s, OtherResearchProduct.class);
default:
throw new IllegalArgumentException(String.format("invalid resultType: '%s'", resultType));
}
default:
throw new IllegalArgumentException(String.format("invalid id prefix: '%s'", prefix));
}
} else {
throw new IllegalArgumentException(String.format("invalid oaf: '%s'", s));
}
}
private static <T extends OafEntity> OafEntity parse(String s, Class<T> clazz) {
try {
return OBJECT_MAPPER.readValue(s, clazz);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private static List<String> listPaths(String inputPath, JavaSparkContext sc) {
return HdfsSupport
.listFiles(inputPath, sc.hadoopConfiguration())
.stream()
.filter(f -> !f.equals("relation"))
.collect(Collectors.toList());
}
}

View File

@ -3,6 +3,8 @@ package eu.dnetlib.dhp.oa.dedup;
import static org.apache.spark.sql.functions.col;
import java.util.Objects;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
@ -28,7 +30,7 @@ public class SparkPropagateRelation extends AbstractSparkAction {
SOURCE, TARGET
}
public SparkPropagateRelation(ArgumentApplicationParser parser, SparkSession spark) throws Exception {
public SparkPropagateRelation(ArgumentApplicationParser parser, SparkSession spark) {
super(parser, spark);
}
@ -55,13 +57,13 @@ public class SparkPropagateRelation extends AbstractSparkAction {
final String graphBasePath = parser.get("graphBasePath");
final String workingPath = parser.get("workingPath");
final String dedupGraphPath = parser.get("dedupGraphPath");
final String graphOutputPath = parser.get("graphOutputPath");
log.info("graphBasePath: '{}'", graphBasePath);
log.info("workingPath: '{}'", workingPath);
log.info("dedupGraphPath: '{}'", dedupGraphPath);
log.info("graphOutputPath: '{}'", graphOutputPath);
final String outputRelationPath = DedupUtility.createEntityPath(dedupGraphPath, "relation");
final String outputRelationPath = DedupUtility.createEntityPath(graphOutputPath, "relation");
removeOutputDir(spark, outputRelationPath);
Dataset<Relation> mergeRels = spark
@ -101,7 +103,8 @@ public class SparkPropagateRelation extends AbstractSparkAction {
newRels
.union(updated)
.union(mergeRels)
.map((MapFunction<Relation, Relation>) r -> r, Encoders.kryo(Relation.class))),
.map((MapFunction<Relation, Relation>) r -> r, Encoders.kryo(Relation.class)))
.filter((FilterFunction<Relation>) r -> !Objects.equals(r.getSource(), r.getTarget())),
outputRelationPath, SaveMode.Overwrite);
}

View File

@ -2,15 +2,15 @@
<parameters>
<property>
<name>graphBasePath</name>
<description>the raw graph base path</description>
<description>the input graph base path</description>
</property>
<property>
<name>workingPath</name>
<description>path of the working directory</description>
</property>
<property>
<name>dedupGraphPath</name>
<description>path of the dedup graph</description>
<name>graphOutputPath</name>
<description>path of the output graph</description>
</property>
<property>
<name>sparkDriverMemory</name>
@ -91,116 +91,224 @@
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--i</arg><arg>${graphBasePath}</arg>
<arg>--o</arg><arg>${dedupGraphPath}</arg>
<arg>--w</arg><arg>${workingPath}</arg>
<arg>--graphBasePath</arg><arg>${graphBasePath}</arg>
<arg>--o</arg><arg>${graphOutputPath}</arg>
<arg>--workingPath</arg><arg>${workingPath}</arg>
</spark>
<ok to="fork_copy_entities"/>
<ok to="group_entities"/>
<error to="Kill"/>
</action>
<fork name="fork_copy_entities">
<path start="copy_datasource"/>
<path start="copy_project"/>
<path start="copy_organization"/>
<path start="copy_publication"/>
<path start="copy_dataset"/>
<path start="copy_software"/>
<path start="copy_otherresearchproduct"/>
<action name="group_entities">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>group graph entities</name>
<class>eu.dnetlib.dhp.oa.dedup.GroupEntitiesSparkJob</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--graphInputPath</arg><arg>${graphBasePath}</arg>
<arg>--outputPath</arg><arg>${workingPath}/grouped_entities</arg>
</spark>
<ok to="fork_dispatch_entities"/>
<error to="Kill"/>
</action>
<fork name="fork_dispatch_entities">
<path start="dispatch_datasource"/>
<path start="dispatch_project"/>
<path start="dispatch_organization"/>
<path start="dispatch_publication"/>
<path start="dispatch_dataset"/>
<path start="dispatch_software"/>
<path start="dispatch_otherresearchproduct"/>
</fork>
<action name="copy_datasource">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${dedupGraphPath}/datasource"/>
</prepare>
<arg>-pb</arg>
<arg>${graphBasePath}/datasource</arg>
<arg>${dedupGraphPath}/datasource</arg>
</distcp>
<ok to="wait_copy"/>
<action name="dispatch_datasource">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dispatch publications</name>
<class>eu.dnetlib.dhp.oa.dedup.DispatchEntitiesSparkJob</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/datasource</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
</spark>
<ok to="wait_dispatch"/>
<error to="Kill"/>
</action>
<action name="copy_project">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${dedupGraphPath}/project"/>
</prepare>
<arg>-pb</arg>
<arg>${graphBasePath}/project</arg>
<arg>${dedupGraphPath}/project</arg>
</distcp>
<ok to="wait_copy"/>
<action name="dispatch_project">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dispatch project</name>
<class>eu.dnetlib.dhp.oa.dedup.DispatchEntitiesSparkJob</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/project</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
</spark>
<ok to="wait_dispatch"/>
<error to="Kill"/>
</action>
<action name="copy_organization">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${dedupGraphPath}/organization"/>
</prepare>
<arg>-pb</arg>
<arg>${graphBasePath}/organization</arg>
<arg>${dedupGraphPath}/organization</arg>
</distcp>
<ok to="wait_copy"/>
<action name="dispatch_organization">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dispatch organization</name>
<class>eu.dnetlib.dhp.oa.dedup.DispatchEntitiesSparkJob</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/organization</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
</spark>
<ok to="wait_dispatch"/>
<error to="Kill"/>
</action>
<action name="copy_publication">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${dedupGraphPath}/publication"/>
</prepare>
<arg>-pb</arg>
<arg>${graphBasePath}/publication</arg>
<arg>${dedupGraphPath}/publication</arg>
</distcp>
<ok to="wait_copy"/>
<action name="dispatch_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dispatch publication</name>
<class>eu.dnetlib.dhp.oa.dedup.DispatchEntitiesSparkJob</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/publication</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
</spark>
<ok to="wait_dispatch"/>
<error to="Kill"/>
</action>
<action name="copy_dataset">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${dedupGraphPath}/dataset"/>
</prepare>
<arg>-pb</arg>
<arg>${graphBasePath}/dataset</arg>
<arg>${dedupGraphPath}/dataset</arg>
</distcp>
<ok to="wait_copy"/>
<action name="dispatch_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dispatch dataset</name>
<class>eu.dnetlib.dhp.oa.dedup.DispatchEntitiesSparkJob</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/dataset</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
</spark>
<ok to="wait_dispatch"/>
<error to="Kill"/>
</action>
<action name="copy_software">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${dedupGraphPath}/software"/>
</prepare>
<arg>-pb</arg>
<arg>${graphBasePath}/software</arg>
<arg>${dedupGraphPath}/software</arg>
</distcp>
<ok to="wait_copy"/>
<action name="dispatch_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dispatch software</name>
<class>eu.dnetlib.dhp.oa.dedup.DispatchEntitiesSparkJob</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/software</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
</spark>
<ok to="wait_dispatch"/>
<error to="Kill"/>
</action>
<action name="copy_otherresearchproduct">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${dedupGraphPath}/otherresearchproduct"/>
</prepare>
<arg>-pb</arg>
<arg>${graphBasePath}/otherresearchproduct</arg>
<arg>${dedupGraphPath}/otherresearchproduct</arg>
</distcp>
<ok to="wait_copy"/>
<action name="dispatch_otherresearchproduct">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Dispatch otherresearchproduct</name>
<class>eu.dnetlib.dhp.oa.dedup.DispatchEntitiesSparkJob</class>
<jar>dhp-dedup-openaire-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCores}
--executor-memory=${sparkExecutorMemory}
--driver-memory=${sparkDriverMemory}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
</spark-opts>
<arg>--inputPath</arg><arg>${workingPath}/grouped_entities</arg>
<arg>--outputPath</arg><arg>${graphOutputPath}/otherresearchproduct</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
</spark>
<ok to="wait_dispatch"/>
<error to="Kill"/>
</action>
<join name="wait_copy" to="End"/>
<join name="wait_dispatch" to="End"/>
<end name="End"/>
</workflow-app>

View File

@ -0,0 +1,26 @@
[
{
"paramName": "issm",
"paramLongName": "isSparkSessionManaged",
"paramDescription": "when true will stop SparkSession after job execution",
"paramRequired": false
},
{
"paramName": "i",
"paramLongName": "inputPath",
"paramDescription": "the source path",
"paramRequired": true
},
{
"paramName": "o",
"paramLongName": "outputPath",
"paramDescription": "path of the output graph",
"paramRequired": true
},
{
"paramName": "c",
"paramLongName": "graphTableClassName",
"paramDescription": "the graph entity class name",
"paramRequired": true
}
]

View File

@ -13,7 +13,7 @@
},
{
"paramName": "o",
"paramLongName": "dedupGraphPath",
"paramLongName": "graphOutputPath",
"paramDescription": "the path of the dedup graph",
"paramRequired": true
}