1
0
Fork 0

added default from vocab for missing instance.refereed; remove spurious prefixes from orcid values; WIP: prepare relation job

This commit is contained in:
Claudio Atzori 2020-06-24 18:34:42 +02:00
parent 38bb45d0b6
commit 0e723d378b
10 changed files with 128 additions and 551 deletions

View File

@ -147,6 +147,9 @@ public class CleanGraphSparkJob {
if (Objects.isNull(i.getHostedby()) || StringUtils.isBlank(i.getHostedby().getKey())) {
i.setHostedby(ModelConstants.UNKNOWN_REPOSITORY);
}
if (Objects.isNull(i.getRefereed())) {
i.setRefereed(qualifier("0000", "Unknown", ModelConstants.DNET_REVIEW_LEVELS));
}
}
}

View File

@ -25,9 +25,7 @@ import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport;
import eu.dnetlib.dhp.oa.provision.model.RelatedEntity;
import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper;
import eu.dnetlib.dhp.oa.provision.model.SortableRelation;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
import scala.Tuple2;
@ -109,11 +107,12 @@ public class CreateRelatedEntitiesJob_phase1 {
Class<E> clazz,
String outputPath) {
Dataset<Tuple2<String, SortableRelation>> relsByTarget = readPathRelation(spark, inputRelationsPath)
Dataset<Tuple2<String, Relation>> relsByTarget = readPathRelation(spark, inputRelationsPath)
.filter("dataInfo.deletedbyinference == false")
.map(
(MapFunction<SortableRelation, Tuple2<String, SortableRelation>>) r -> new Tuple2<>(r.getTarget(), r),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(SortableRelation.class)))
(MapFunction<Relation, Tuple2<String, Relation>>) r -> new Tuple2<>(r.getTarget(),
r),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class)))
.cache();
Dataset<Tuple2<String, RelatedEntity>> entities = readPathEntity(spark, inputEntityPath, clazz)
@ -129,7 +128,7 @@ public class CreateRelatedEntitiesJob_phase1 {
relsByTarget
.joinWith(entities, entities.col("_1").equalTo(relsByTarget.col("_1")), "inner")
.map(
(MapFunction<Tuple2<Tuple2<String, SortableRelation>, Tuple2<String, RelatedEntity>>, RelatedEntityWrapper>) t -> new RelatedEntityWrapper(
(MapFunction<Tuple2<Tuple2<String, Relation>, Tuple2<String, RelatedEntity>>, RelatedEntityWrapper>) t -> new RelatedEntityWrapper(
t._1()._2(), t._2()._2()),
Encoders.kryo(RelatedEntityWrapper.class))
.write()
@ -232,11 +231,11 @@ public class CreateRelatedEntitiesJob_phase1 {
* @param relationPath
* @return the Dataset<SortableRelation> containing all the relationships
*/
private static Dataset<SortableRelation> readPathRelation(
private static Dataset<Relation> readPathRelation(
SparkSession spark, final String relationPath) {
log.info("Reading relations from: {}", relationPath);
return spark.read().load(relationPath).as(Encoders.bean(SortableRelation.class));
return spark.read().load(relationPath).as(Encoders.bean(Relation.class));
}
private static void removeOutputDir(SparkSession spark, String path) {

View File

@ -4,35 +4,28 @@ package eu.dnetlib.dhp.oa.provision;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.*;
import java.util.function.Function;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.clearspring.analytics.util.Lists;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Sets;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.FunctionalInterfaceSupport;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.provision.model.SortableRelation;
import eu.dnetlib.dhp.oa.provision.model.SortableRelationKey;
import eu.dnetlib.dhp.oa.provision.utils.RelationPartitioner;
import scala.Function1;
import scala.Tuple2;
import eu.dnetlib.dhp.schema.oaf.Relation;
/**
* Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. The
@ -133,22 +126,22 @@ public class PrepareRelationsJob {
SparkSession spark, String inputRelationsPath, String outputPath, Set<String> relationFilter, int maxRelations,
int relPartitions) {
RDD<SortableRelation> cappedRels = readPathRelationRDD(spark, inputRelationsPath)
RDD<Relation> cappedRels = readPathRelationRDD(spark, inputRelationsPath)
.repartition(relPartitions)
.filter(rel -> !rel.getDataInfo().getDeletedbyinference())
.filter(rel -> !relationFilter.contains(rel.getRelClass()))
// group by SOURCE and apply limit
.mapToPair(rel -> new Tuple2<>(rel.getSource(), rel))
.groupByKey(new RelationPartitioner(relPartitions))
.flatMap(group -> Iterables.limit(group._2(), maxRelations).iterator())
.groupBy(r -> SortableRelationKey.create(r, r.getSource()))
.repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions))
.flatMap(t -> Iterables.limit(t._2(), maxRelations).iterator())
// group by TARGET and apply limit
.mapToPair(rel -> new Tuple2<>(rel.getTarget(), rel))
.groupByKey(new RelationPartitioner(relPartitions))
.flatMap(group -> Iterables.limit(group._2(), maxRelations).iterator())
.groupBy(r -> SortableRelationKey.create(r, r.getTarget()))
.repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions))
.flatMap(t -> Iterables.limit(t._2(), maxRelations).iterator())
.rdd();
spark
.createDataset(cappedRels, Encoders.bean(SortableRelation.class))
.createDataset(cappedRels, Encoders.bean(Relation.class))
.write()
.mode(SaveMode.Overwrite)
.parquet(outputPath);
@ -162,10 +155,10 @@ public class PrepareRelationsJob {
* @param inputPath
* @return the JavaRDD<SortableRelation> containing all the relationships
*/
private static JavaRDD<SortableRelation> readPathRelationRDD(
private static JavaRDD<Relation> readPathRelationRDD(
SparkSession spark, final String inputPath) {
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
return sc.textFile(inputPath).map(s -> OBJECT_MAPPER.readValue(s, SortableRelation.class));
return sc.textFile(inputPath).map(s -> OBJECT_MAPPER.readValue(s, Relation.class));
}
private static void removeOutputDir(SparkSession spark, String path) {

View File

@ -19,7 +19,7 @@ public class ProvisionModelSupport {
RelatedEntityWrapper.class,
JoinedEntity.class,
RelatedEntity.class,
SortableRelation.class));
SortableRelationKey.class));
return modelClasses.toArray(new Class[] {});
}
}

View File

@ -5,28 +5,30 @@ import java.io.Serializable;
import com.google.common.base.Objects;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class RelatedEntityWrapper implements Serializable {
private SortableRelation relation;
private Relation relation;
private RelatedEntity target;
public RelatedEntityWrapper() {
}
public RelatedEntityWrapper(SortableRelation relation, RelatedEntity target) {
public RelatedEntityWrapper(Relation relation, RelatedEntity target) {
this(null, relation, target);
}
public RelatedEntityWrapper(TypedRow entity, SortableRelation relation, RelatedEntity target) {
public RelatedEntityWrapper(TypedRow entity, Relation relation, RelatedEntity target) {
this.relation = relation;
this.target = target;
}
public SortableRelation getRelation() {
public Relation getRelation() {
return relation;
}
public void setRelation(SortableRelation relation) {
public void setRelation(Relation relation) {
this.relation = relation;
}

View File

@ -1,38 +0,0 @@
package eu.dnetlib.dhp.oa.provision.model;
import java.io.Serializable;
import java.util.Map;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Maps;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class SortableRelation extends Relation implements Comparable<Relation>, Serializable {
private static final Map<String, Integer> weights = Maps.newHashMap();
static {
weights.put("outcome", 0);
weights.put("supplement", 1);
weights.put("affiliation", 2);
weights.put("relationship", 3);
weights.put("publicationDataset", 4);
weights.put("similarity", 5);
weights.put("provision", 6);
weights.put("participation", 7);
weights.put("dedup", 8);
}
@Override
public int compareTo(Relation o) {
return ComparisonChain
.start()
.compare(weights.get(getSubRelType()), weights.get(o.getSubRelType()))
.compare(getSource(), o.getSource())
.compare(getTarget(), o.getTarget())
.result();
}
}

View File

@ -0,0 +1,90 @@
package eu.dnetlib.dhp.oa.provision.model;
import java.io.Serializable;
import java.util.Map;
import java.util.Optional;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Maps;
import eu.dnetlib.dhp.schema.oaf.Relation;
public class SortableRelationKey implements Comparable<SortableRelationKey>, Serializable {
private static final Map<String, Integer> weights = Maps.newHashMap();
static {
weights.put("outcome", 0);
weights.put("supplement", 1);
weights.put("affiliation", 2);
weights.put("relationship", 3);
weights.put("publicationDataset", 4);
weights.put("similarity", 5);
weights.put("provision", 6);
weights.put("participation", 7);
weights.put("dedup", 8);
}
private String groupingKey;
private String source;
private String target;
private String subRelType;
public String getSource() {
return source;
}
public static SortableRelationKey create(Relation r, String groupingKey) {
SortableRelationKey sr = new SortableRelationKey();
sr.setGroupingKey(groupingKey);
sr.setSource(r.getSource());
sr.setTarget(r.getTarget());
sr.setSubRelType(r.getSubRelType());
return sr;
}
@Override
public int compareTo(SortableRelationKey o) {
final Integer wt = Optional.ofNullable(weights.get(getSubRelType())).orElse(Integer.MAX_VALUE);
final Integer wo = Optional.ofNullable(weights.get(o.getSubRelType())).orElse(Integer.MAX_VALUE);
return ComparisonChain
.start()
.compare(wt, wo)
.compare(getSource(), o.getSource())
.compare(getTarget(), o.getTarget())
.result();
}
public void setSource(String source) {
this.source = source;
}
public String getTarget() {
return target;
}
public void setTarget(String target) {
this.target = target;
}
public String getSubRelType() {
return subRelType;
}
public void setSubRelType(String subRelType) {
this.subRelType = subRelType;
}
public String getGroupingKey() {
return groupingKey;
}
public void setGroupingKey(String groupingKey) {
this.groupingKey = groupingKey;
}
}

View File

@ -4,6 +4,8 @@ package eu.dnetlib.dhp.oa.provision.utils;
import org.apache.spark.Partitioner;
import org.apache.spark.util.Utils;
import eu.dnetlib.dhp.oa.provision.model.SortableRelationKey;
/**
* Used in combination with SortableRelationKey, allows to partition the records by source id, therefore allowing to
* sort relations sharing the same source id by the ordering defined in SortableRelationKey.
@ -23,8 +25,8 @@ public class RelationPartitioner extends Partitioner {
@Override
public int getPartition(Object key) {
String partitionKey = (String) key;
return Utils.nonNegativeMod(partitionKey.hashCode(), numPartitions());
SortableRelationKey partitionKey = (SortableRelationKey) key;
return Utils.nonNegativeMod(partitionKey.getGroupingKey().hashCode(), numPartitions());
}
}

View File

@ -276,7 +276,7 @@ public class XmlRecordFactory implements Serializable {
pidType,
pidValue
.toLowerCase()
.replaceAll("orcid", "")));
.replaceAll("^.*orcid\\.org\\/", "")));
}
}
});

View File

@ -97,18 +97,7 @@
</configuration>
</global>
<start to="resume_from"/>
<decision name="resume_from">
<switch>
<case to="prepare_relations">${wf:conf('resumeFrom') eq 'prepare_relations'}</case>
<case to="fork_join_related_entities">${wf:conf('resumeFrom') eq 'fork_join_related_entities'}</case>
<case to="fork_join_all_entities">${wf:conf('resumeFrom') eq 'fork_join_all_entities'}</case>
<case to="convert_to_xml">${wf:conf('resumeFrom') eq 'convert_to_xml'}</case>
<case to="to_solr_index">${wf:conf('resumeFrom') eq 'to_solr_index'}</case>
<default to="prepare_relations"/>
</switch>
</decision>
<start to="prepare_relations"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
@ -135,475 +124,12 @@
<arg>--outputPath</arg><arg>${workingDir}/relation</arg>
<arg>--relPartitions</arg><arg>5000</arg>
</spark>
<ok to="fork_join_related_entities"/>
<error to="Kill"/>
</action>
<fork name="fork_join_related_entities">
<path start="join_relation_publication"/>
<path start="join_relation_dataset"/>
<path start="join_relation_otherresearchproduct"/>
<path start="join_relation_software"/>
<path start="join_relation_datasource"/>
<path start="join_relation_organization"/>
<path start="join_relation_project"/>
</fork>
<action name="join_relation_publication">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[relation.target = publication.id]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/publication</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial/publication</arg>
</spark>
<ok to="wait_joins"/>
<error to="Kill"/>
</action>
<action name="join_relation_dataset">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[relation.target = dataset.id]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/dataset</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial/dataset</arg>
</spark>
<ok to="wait_joins"/>
<error to="Kill"/>
</action>
<action name="join_relation_otherresearchproduct">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[relation.target = otherresearchproduct.id]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/otherresearchproduct</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial/otherresearchproduct</arg>
</spark>
<ok to="wait_joins"/>
<error to="Kill"/>
</action>
<action name="join_relation_software">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[relation.target = software.id]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/software</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial/software</arg>
</spark>
<ok to="wait_joins"/>
<error to="Kill"/>
</action>
<action name="join_relation_datasource">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[relation.target = datasource.id]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/datasource</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial/datasource</arg>
</spark>
<ok to="wait_joins"/>
<error to="Kill"/>
</action>
<action name="join_relation_organization">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[relation.target = organization.id]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/organization</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial/organization</arg>
</spark>
<ok to="wait_joins"/>
<error to="Kill"/>
</action>
<action name="join_relation_project">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[relation.target = project.id]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/project</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial/project</arg>
</spark>
<ok to="wait_joins"/>
<error to="Kill"/>
</action>
<join name="wait_joins" to="fork_join_all_entities"/>
<fork name="fork_join_all_entities">
<path start="join_publication_relations"/>
<path start="join_dataset_relations"/>
<path start="join_otherresearchproduct_relations"/>
<path start="join_software_relations"/>
<path start="join_datasource_relations"/>
<path start="join_organization_relations"/>
<path start="join_project_relations"/>
</fork>
<action name="join_publication_relations">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[publication.id = relatedEntity.source]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=15360
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/publication</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--inputRelatedEntitiesPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_entities/publication</arg>
<arg>--numPartitions</arg><arg>30000</arg>
</spark>
<ok to="wait_join_phase2"/>
<error to="Kill"/>
</action>
<action name="join_dataset_relations">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[dataset.id = relatedEntity.source]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/dataset</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--inputRelatedEntitiesPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_entities/dataset</arg>
<arg>--numPartitions</arg><arg>20000</arg>
</spark>
<ok to="wait_join_phase2"/>
<error to="Kill"/>
</action>
<action name="join_otherresearchproduct_relations">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[otherresearchproduct.id = relatedEntity.source]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/otherresearchproduct</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--inputRelatedEntitiesPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_entities/otherresearchproduct</arg>
<arg>--numPartitions</arg><arg>10000</arg>
</spark>
<ok to="wait_join_phase2"/>
<error to="Kill"/>
</action>
<action name="join_software_relations">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[software.id = relatedEntity.source]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/software</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--inputRelatedEntitiesPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_entities/software</arg>
<arg>--numPartitions</arg><arg>10000</arg>
</spark>
<ok to="wait_join_phase2"/>
<error to="Kill"/>
</action>
<action name="join_datasource_relations">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[datasource.id = relatedEntity.source]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/datasource</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
<arg>--inputRelatedEntitiesPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_entities/datasource</arg>
<arg>--numPartitions</arg><arg>1000</arg>
</spark>
<ok to="wait_join_phase2"/>
<error to="Kill"/>
</action>
<action name="join_organization_relations">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[organization.id = relatedEntity.source]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/organization</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
<arg>--inputRelatedEntitiesPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_entities/organization</arg>
<arg>--numPartitions</arg><arg>20000</arg>
</spark>
<ok to="wait_join_phase2"/>
<error to="Kill"/>
</action>
<action name="join_project_relations">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[project.id = relatedEntity.source]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/project</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
<arg>--inputRelatedEntitiesPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_entities/project</arg>
<arg>--numPartitions</arg><arg>10000</arg>
</spark>
<ok to="wait_join_phase2"/>
<error to="Kill"/>
</action>
<join name="wait_join_phase2" to="convert_to_xml"/>
<action name="convert_to_xml">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>convert_to_xml</name>
<class>eu.dnetlib.dhp.oa.provision.XmlConverterJob</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=3840
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/join_entities</arg>
<arg>--outputPath</arg><arg>${workingDir}/xml</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--otherDsTypeId</arg><arg>${otherDsTypeId}</arg>
</spark>
<ok to="to_solr_index"/>
<error to="Kill"/>
</action>
<action name="to_solr_index">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>to_solr_index</name>
<class>eu.dnetlib.dhp.oa.provision.XmlIndexingJob</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-memory=${sparkExecutorMemoryForIndexing}
--driver-memory=${sparkDriverMemoryForIndexing}
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.maxExecutors=${sparkExecutorCoresForIndexing}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.speculation=false
--conf spark.hadoop.mapreduce.map.speculative=false
--conf spark.hadoop.mapreduce.reduce.speculative=false
</spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/xml</arg>
<arg>--isLookupUrl</arg><arg>${isLookupUrl}</arg>
<arg>--format</arg><arg>${format}</arg>
<arg>--batchSize</arg><arg>${batchSize}</arg>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>