From 0e723d378b3e1cf63cb40bdb27e5ab4cb272bfa2 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Wed, 24 Jun 2020 18:34:42 +0200 Subject: [PATCH] added default from vocab for missing instance.refereed; remove spurious prefixes from orcid values; WIP: prepare relation job --- .../oa/graph/clean/CleanGraphSparkJob.java | 3 + .../CreateRelatedEntitiesJob_phase1.java | 15 +- .../dhp/oa/provision/PrepareRelationsJob.java | 31 +- .../model/ProvisionModelSupport.java | 2 +- .../provision/model/RelatedEntityWrapper.java | 12 +- .../oa/provision/model/SortableRelation.java | 38 -- .../provision/model/SortableRelationKey.java | 90 ++++ .../provision/utils/RelationPartitioner.java | 6 +- .../oa/provision/utils/XmlRecordFactory.java | 2 +- .../dhp/oa/provision/oozie_app/workflow.xml | 480 +----------------- 10 files changed, 128 insertions(+), 551 deletions(-) delete mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelation.java create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java diff --git a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java index 8f43ab1cf3..bdbd641604 100644 --- a/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java +++ b/dhp-workflows/dhp-graph-mapper/src/main/java/eu/dnetlib/dhp/oa/graph/clean/CleanGraphSparkJob.java @@ -147,6 +147,9 @@ public class CleanGraphSparkJob { if (Objects.isNull(i.getHostedby()) || StringUtils.isBlank(i.getHostedby().getKey())) { i.setHostedby(ModelConstants.UNKNOWN_REPOSITORY); } + if (Objects.isNull(i.getRefereed())) { + i.setRefereed(qualifier("0000", "Unknown", ModelConstants.DNET_REVIEW_LEVELS)); + } } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java index 4d2633bc54..80b8000173 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java @@ -25,9 +25,7 @@ import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport; import eu.dnetlib.dhp.oa.provision.model.RelatedEntity; import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper; -import eu.dnetlib.dhp.oa.provision.model.SortableRelation; import eu.dnetlib.dhp.schema.common.EntityType; -import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; import scala.Tuple2; @@ -109,11 +107,12 @@ public class CreateRelatedEntitiesJob_phase1 { Class clazz, String outputPath) { - Dataset> relsByTarget = readPathRelation(spark, inputRelationsPath) + Dataset> relsByTarget = readPathRelation(spark, inputRelationsPath) .filter("dataInfo.deletedbyinference == false") .map( - (MapFunction>) r -> new Tuple2<>(r.getTarget(), r), - Encoders.tuple(Encoders.STRING(), Encoders.kryo(SortableRelation.class))) + (MapFunction>) r -> new Tuple2<>(r.getTarget(), + r), + Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class))) .cache(); Dataset> entities = readPathEntity(spark, inputEntityPath, clazz) @@ -129,7 +128,7 @@ public class CreateRelatedEntitiesJob_phase1 { relsByTarget .joinWith(entities, entities.col("_1").equalTo(relsByTarget.col("_1")), "inner") .map( - (MapFunction, Tuple2>, RelatedEntityWrapper>) t -> new RelatedEntityWrapper( + (MapFunction, Tuple2>, RelatedEntityWrapper>) t -> new RelatedEntityWrapper( t._1()._2(), t._2()._2()), Encoders.kryo(RelatedEntityWrapper.class)) .write() @@ -232,11 +231,11 @@ public class CreateRelatedEntitiesJob_phase1 { * @param relationPath * @return the Dataset containing all the relationships */ - private static Dataset readPathRelation( + private static Dataset readPathRelation( SparkSession spark, final String relationPath) { log.info("Reading relations from: {}", relationPath); - return spark.read().load(relationPath).as(Encoders.bean(SortableRelation.class)); + return spark.read().load(relationPath).as(Encoders.bean(Relation.class)); } private static void removeOutputDir(SparkSession spark, String path) { diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java index 6b184071ac..d69b75b654 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java @@ -4,35 +4,28 @@ package eu.dnetlib.dhp.oa.provision; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; import java.util.*; -import java.util.function.Function; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.*; import org.apache.spark.rdd.RDD; -import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.clearspring.analytics.util.Lists; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Splitter; import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; import com.google.common.collect.Sets; import eu.dnetlib.dhp.application.ArgumentApplicationParser; -import eu.dnetlib.dhp.common.FunctionalInterfaceSupport; import eu.dnetlib.dhp.common.HdfsSupport; -import eu.dnetlib.dhp.oa.provision.model.SortableRelation; +import eu.dnetlib.dhp.oa.provision.model.SortableRelationKey; import eu.dnetlib.dhp.oa.provision.utils.RelationPartitioner; -import scala.Function1; -import scala.Tuple2; +import eu.dnetlib.dhp.schema.oaf.Relation; /** * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. The @@ -133,22 +126,22 @@ public class PrepareRelationsJob { SparkSession spark, String inputRelationsPath, String outputPath, Set relationFilter, int maxRelations, int relPartitions) { - RDD cappedRels = readPathRelationRDD(spark, inputRelationsPath) + RDD cappedRels = readPathRelationRDD(spark, inputRelationsPath) .repartition(relPartitions) .filter(rel -> !rel.getDataInfo().getDeletedbyinference()) .filter(rel -> !relationFilter.contains(rel.getRelClass())) // group by SOURCE and apply limit - .mapToPair(rel -> new Tuple2<>(rel.getSource(), rel)) - .groupByKey(new RelationPartitioner(relPartitions)) - .flatMap(group -> Iterables.limit(group._2(), maxRelations).iterator()) + .groupBy(r -> SortableRelationKey.create(r, r.getSource())) + .repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions)) + .flatMap(t -> Iterables.limit(t._2(), maxRelations).iterator()) // group by TARGET and apply limit - .mapToPair(rel -> new Tuple2<>(rel.getTarget(), rel)) - .groupByKey(new RelationPartitioner(relPartitions)) - .flatMap(group -> Iterables.limit(group._2(), maxRelations).iterator()) + .groupBy(r -> SortableRelationKey.create(r, r.getTarget())) + .repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions)) + .flatMap(t -> Iterables.limit(t._2(), maxRelations).iterator()) .rdd(); spark - .createDataset(cappedRels, Encoders.bean(SortableRelation.class)) + .createDataset(cappedRels, Encoders.bean(Relation.class)) .write() .mode(SaveMode.Overwrite) .parquet(outputPath); @@ -162,10 +155,10 @@ public class PrepareRelationsJob { * @param inputPath * @return the JavaRDD containing all the relationships */ - private static JavaRDD readPathRelationRDD( + private static JavaRDD readPathRelationRDD( SparkSession spark, final String inputPath) { JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - return sc.textFile(inputPath).map(s -> OBJECT_MAPPER.readValue(s, SortableRelation.class)); + return sc.textFile(inputPath).map(s -> OBJECT_MAPPER.readValue(s, Relation.class)); } private static void removeOutputDir(SparkSession spark, String path) { diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java index f9fde14e59..051fe923d7 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java @@ -19,7 +19,7 @@ public class ProvisionModelSupport { RelatedEntityWrapper.class, JoinedEntity.class, RelatedEntity.class, - SortableRelation.class)); + SortableRelationKey.class)); return modelClasses.toArray(new Class[] {}); } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntityWrapper.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntityWrapper.java index d708b6ed0e..cbb143ee2b 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntityWrapper.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntityWrapper.java @@ -5,28 +5,30 @@ import java.io.Serializable; import com.google.common.base.Objects; +import eu.dnetlib.dhp.schema.oaf.Relation; + public class RelatedEntityWrapper implements Serializable { - private SortableRelation relation; + private Relation relation; private RelatedEntity target; public RelatedEntityWrapper() { } - public RelatedEntityWrapper(SortableRelation relation, RelatedEntity target) { + public RelatedEntityWrapper(Relation relation, RelatedEntity target) { this(null, relation, target); } - public RelatedEntityWrapper(TypedRow entity, SortableRelation relation, RelatedEntity target) { + public RelatedEntityWrapper(TypedRow entity, Relation relation, RelatedEntity target) { this.relation = relation; this.target = target; } - public SortableRelation getRelation() { + public Relation getRelation() { return relation; } - public void setRelation(SortableRelation relation) { + public void setRelation(Relation relation) { this.relation = relation; } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelation.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelation.java deleted file mode 100644 index b6571b9bf9..0000000000 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelation.java +++ /dev/null @@ -1,38 +0,0 @@ - -package eu.dnetlib.dhp.oa.provision.model; - -import java.io.Serializable; -import java.util.Map; - -import com.google.common.collect.ComparisonChain; -import com.google.common.collect.Maps; - -import eu.dnetlib.dhp.schema.oaf.Relation; - -public class SortableRelation extends Relation implements Comparable, Serializable { - - private static final Map weights = Maps.newHashMap(); - - static { - weights.put("outcome", 0); - weights.put("supplement", 1); - weights.put("affiliation", 2); - weights.put("relationship", 3); - weights.put("publicationDataset", 4); - weights.put("similarity", 5); - - weights.put("provision", 6); - weights.put("participation", 7); - weights.put("dedup", 8); - } - - @Override - public int compareTo(Relation o) { - return ComparisonChain - .start() - .compare(weights.get(getSubRelType()), weights.get(o.getSubRelType())) - .compare(getSource(), o.getSource()) - .compare(getTarget(), o.getTarget()) - .result(); - } -} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java new file mode 100644 index 0000000000..ad61fa0440 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/SortableRelationKey.java @@ -0,0 +1,90 @@ + +package eu.dnetlib.dhp.oa.provision.model; + +import java.io.Serializable; +import java.util.Map; +import java.util.Optional; + +import com.google.common.collect.ComparisonChain; +import com.google.common.collect.Maps; + +import eu.dnetlib.dhp.schema.oaf.Relation; + +public class SortableRelationKey implements Comparable, Serializable { + + private static final Map weights = Maps.newHashMap(); + + static { + weights.put("outcome", 0); + weights.put("supplement", 1); + weights.put("affiliation", 2); + weights.put("relationship", 3); + weights.put("publicationDataset", 4); + weights.put("similarity", 5); + + weights.put("provision", 6); + weights.put("participation", 7); + weights.put("dedup", 8); + } + + private String groupingKey; + + private String source; + + private String target; + + private String subRelType; + + public String getSource() { + return source; + } + + public static SortableRelationKey create(Relation r, String groupingKey) { + SortableRelationKey sr = new SortableRelationKey(); + sr.setGroupingKey(groupingKey); + sr.setSource(r.getSource()); + sr.setTarget(r.getTarget()); + sr.setSubRelType(r.getSubRelType()); + return sr; + } + + @Override + public int compareTo(SortableRelationKey o) { + final Integer wt = Optional.ofNullable(weights.get(getSubRelType())).orElse(Integer.MAX_VALUE); + final Integer wo = Optional.ofNullable(weights.get(o.getSubRelType())).orElse(Integer.MAX_VALUE); + return ComparisonChain + .start() + .compare(wt, wo) + .compare(getSource(), o.getSource()) + .compare(getTarget(), o.getTarget()) + .result(); + } + + public void setSource(String source) { + this.source = source; + } + + public String getTarget() { + return target; + } + + public void setTarget(String target) { + this.target = target; + } + + public String getSubRelType() { + return subRelType; + } + + public void setSubRelType(String subRelType) { + this.subRelType = subRelType; + } + + public String getGroupingKey() { + return groupingKey; + } + + public void setGroupingKey(String groupingKey) { + this.groupingKey = groupingKey; + } +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/RelationPartitioner.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/RelationPartitioner.java index c7862b48a9..bdece36ab0 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/RelationPartitioner.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/RelationPartitioner.java @@ -4,6 +4,8 @@ package eu.dnetlib.dhp.oa.provision.utils; import org.apache.spark.Partitioner; import org.apache.spark.util.Utils; +import eu.dnetlib.dhp.oa.provision.model.SortableRelationKey; + /** * Used in combination with SortableRelationKey, allows to partition the records by source id, therefore allowing to * sort relations sharing the same source id by the ordering defined in SortableRelationKey. @@ -23,8 +25,8 @@ public class RelationPartitioner extends Partitioner { @Override public int getPartition(Object key) { - String partitionKey = (String) key; - return Utils.nonNegativeMod(partitionKey.hashCode(), numPartitions()); + SortableRelationKey partitionKey = (SortableRelationKey) key; + return Utils.nonNegativeMod(partitionKey.getGroupingKey().hashCode(), numPartitions()); } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java index b2aa01dc74..5d8d9fa200 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java @@ -276,7 +276,7 @@ public class XmlRecordFactory implements Serializable { pidType, pidValue .toLowerCase() - .replaceAll("orcid", ""))); + .replaceAll("^.*orcid\\.org\\/", ""))); } } }); diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml index 0d5121cf13..e98cbbc730 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml @@ -97,18 +97,7 @@ - - - - - ${wf:conf('resumeFrom') eq 'prepare_relations'} - ${wf:conf('resumeFrom') eq 'fork_join_related_entities'} - ${wf:conf('resumeFrom') eq 'fork_join_all_entities'} - ${wf:conf('resumeFrom') eq 'convert_to_xml'} - ${wf:conf('resumeFrom') eq 'to_solr_index'} - - - + Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] @@ -135,475 +124,12 @@ --outputPath${workingDir}/relation --relPartitions5000 - - - - - - - - - - - - - - - - - yarn - cluster - Join[relation.target = publication.id] - eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1 - dhp-graph-provision-${projectVersion}.jar - - --executor-cores=${sparkExecutorCoresForJoining} - --executor-memory=${sparkExecutorMemoryForJoining} - --driver-memory=${sparkDriverMemoryForJoining} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 - --conf spark.network.timeout=${sparkNetworkTimeout} - - --inputRelationsPath${workingDir}/relation - --inputEntityPath${inputGraphRootPath}/publication - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication - --outputPath${workingDir}/join_partial/publication - - - - - - - - yarn - cluster - Join[relation.target = dataset.id] - eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1 - dhp-graph-provision-${projectVersion}.jar - - --executor-cores=${sparkExecutorCoresForJoining} - --executor-memory=${sparkExecutorMemoryForJoining} - --driver-memory=${sparkDriverMemoryForJoining} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 - --conf spark.network.timeout=${sparkNetworkTimeout} - - --inputRelationsPath${workingDir}/relation - --inputEntityPath${inputGraphRootPath}/dataset - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset - --outputPath${workingDir}/join_partial/dataset - - - - - - - - yarn - cluster - Join[relation.target = otherresearchproduct.id] - eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1 - dhp-graph-provision-${projectVersion}.jar - - --executor-cores=${sparkExecutorCoresForJoining} - --executor-memory=${sparkExecutorMemoryForJoining} - --driver-memory=${sparkDriverMemoryForJoining} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 - --conf spark.network.timeout=${sparkNetworkTimeout} - - --inputRelationsPath${workingDir}/relation - --inputEntityPath${inputGraphRootPath}/otherresearchproduct - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct - --outputPath${workingDir}/join_partial/otherresearchproduct - - - - - - - - yarn - cluster - Join[relation.target = software.id] - eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1 - dhp-graph-provision-${projectVersion}.jar - - --executor-cores=${sparkExecutorCoresForJoining} - --executor-memory=${sparkExecutorMemoryForJoining} - --driver-memory=${sparkDriverMemoryForJoining} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 - --conf spark.network.timeout=${sparkNetworkTimeout} - - --inputRelationsPath${workingDir}/relation - --inputEntityPath${inputGraphRootPath}/software - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software - --outputPath${workingDir}/join_partial/software - - - - - - - - yarn - cluster - Join[relation.target = datasource.id] - eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1 - dhp-graph-provision-${projectVersion}.jar - - --executor-cores=${sparkExecutorCoresForJoining} - --executor-memory=${sparkExecutorMemoryForJoining} - --driver-memory=${sparkDriverMemoryForJoining} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 - --conf spark.network.timeout=${sparkNetworkTimeout} - - --inputRelationsPath${workingDir}/relation - --inputEntityPath${inputGraphRootPath}/datasource - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Datasource - --outputPath${workingDir}/join_partial/datasource - - - - - - - - yarn - cluster - Join[relation.target = organization.id] - eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1 - dhp-graph-provision-${projectVersion}.jar - - --executor-cores=${sparkExecutorCoresForJoining} - --executor-memory=${sparkExecutorMemoryForJoining} - --driver-memory=${sparkDriverMemoryForJoining} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 - --conf spark.network.timeout=${sparkNetworkTimeout} - - --inputRelationsPath${workingDir}/relation - --inputEntityPath${inputGraphRootPath}/organization - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Organization - --outputPath${workingDir}/join_partial/organization - - - - - - - - yarn - cluster - Join[relation.target = project.id] - eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase1 - dhp-graph-provision-${projectVersion}.jar - - --executor-cores=${sparkExecutorCoresForJoining} - --executor-memory=${sparkExecutorMemoryForJoining} - --driver-memory=${sparkDriverMemoryForJoining} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 - --conf spark.network.timeout=${sparkNetworkTimeout} - - --inputRelationsPath${workingDir}/relation - --inputEntityPath${inputGraphRootPath}/project - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Project - --outputPath${workingDir}/join_partial/project - - - - - - - - - - - - - - - - - - - - yarn - cluster - Join[publication.id = relatedEntity.source] - eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2 - dhp-graph-provision-${projectVersion}.jar - - --executor-cores=${sparkExecutorCoresForJoining} - --executor-memory=${sparkExecutorMemoryForJoining} - --driver-memory=${sparkDriverMemoryForJoining} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=15360 - --conf spark.network.timeout=${sparkNetworkTimeout} - - --inputEntityPath${inputGraphRootPath}/publication - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication - --inputRelatedEntitiesPath${workingDir}/join_partial - --outputPath${workingDir}/join_entities/publication - --numPartitions30000 - - - - - - - - yarn - cluster - Join[dataset.id = relatedEntity.source] - eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2 - dhp-graph-provision-${projectVersion}.jar - - --executor-cores=${sparkExecutorCoresForJoining} - --executor-memory=${sparkExecutorMemoryForJoining} - --driver-memory=${sparkDriverMemoryForJoining} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 - --conf spark.network.timeout=${sparkNetworkTimeout} - - --inputEntityPath${inputGraphRootPath}/dataset - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset - --inputRelatedEntitiesPath${workingDir}/join_partial - --outputPath${workingDir}/join_entities/dataset - --numPartitions20000 - - - - - - - - yarn - cluster - Join[otherresearchproduct.id = relatedEntity.source] - eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2 - dhp-graph-provision-${projectVersion}.jar - - --executor-cores=${sparkExecutorCoresForJoining} - --executor-memory=${sparkExecutorMemoryForJoining} - --driver-memory=${sparkDriverMemoryForJoining} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 - --conf spark.network.timeout=${sparkNetworkTimeout} - - --inputEntityPath${inputGraphRootPath}/otherresearchproduct - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct - --inputRelatedEntitiesPath${workingDir}/join_partial - --outputPath${workingDir}/join_entities/otherresearchproduct - --numPartitions10000 - - - - - - - - yarn - cluster - Join[software.id = relatedEntity.source] - eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2 - dhp-graph-provision-${projectVersion}.jar - - --executor-cores=${sparkExecutorCoresForJoining} - --executor-memory=${sparkExecutorMemoryForJoining} - --driver-memory=${sparkDriverMemoryForJoining} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 - --conf spark.network.timeout=${sparkNetworkTimeout} - - --inputEntityPath${inputGraphRootPath}/software - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software - --inputRelatedEntitiesPath${workingDir}/join_partial - --outputPath${workingDir}/join_entities/software - --numPartitions10000 - - - - - - - - yarn - cluster - Join[datasource.id = relatedEntity.source] - eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2 - dhp-graph-provision-${projectVersion}.jar - - --executor-cores=${sparkExecutorCoresForJoining} - --executor-memory=${sparkExecutorMemoryForJoining} - --driver-memory=${sparkDriverMemoryForJoining} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 - --conf spark.network.timeout=${sparkNetworkTimeout} - - --inputEntityPath${inputGraphRootPath}/datasource - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Datasource - --inputRelatedEntitiesPath${workingDir}/join_partial - --outputPath${workingDir}/join_entities/datasource - --numPartitions1000 - - - - - - - - yarn - cluster - Join[organization.id = relatedEntity.source] - eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2 - dhp-graph-provision-${projectVersion}.jar - - --executor-cores=${sparkExecutorCoresForJoining} - --executor-memory=${sparkExecutorMemoryForJoining} - --driver-memory=${sparkDriverMemoryForJoining} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 - --conf spark.network.timeout=${sparkNetworkTimeout} - - --inputEntityPath${inputGraphRootPath}/organization - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Organization - --inputRelatedEntitiesPath${workingDir}/join_partial - --outputPath${workingDir}/join_entities/organization - --numPartitions20000 - - - - - - - - yarn - cluster - Join[project.id = relatedEntity.source] - eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2 - dhp-graph-provision-${projectVersion}.jar - - --executor-cores=${sparkExecutorCoresForJoining} - --executor-memory=${sparkExecutorMemoryForJoining} - --driver-memory=${sparkDriverMemoryForJoining} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 - --conf spark.network.timeout=${sparkNetworkTimeout} - - --inputEntityPath${inputGraphRootPath}/project - --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Project - --inputRelatedEntitiesPath${workingDir}/join_partial - --outputPath${workingDir}/join_entities/project - --numPartitions10000 - - - - - - - - - - yarn - cluster - convert_to_xml - eu.dnetlib.dhp.oa.provision.XmlConverterJob - dhp-graph-provision-${projectVersion}.jar - - --executor-cores=${sparkExecutorCoresForJoining} - --executor-memory=${sparkExecutorMemoryForJoining} - --driver-memory=${sparkDriverMemoryForJoining} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=3840 - --conf spark.network.timeout=${sparkNetworkTimeout} - - --inputPath${workingDir}/join_entities - --outputPath${workingDir}/xml - --isLookupUrl${isLookupUrl} - --otherDsTypeId${otherDsTypeId} - - - - - - - - yarn - cluster - to_solr_index - eu.dnetlib.dhp.oa.provision.XmlIndexingJob - dhp-graph-provision-${projectVersion}.jar - - --executor-memory=${sparkExecutorMemoryForIndexing} - --driver-memory=${sparkDriverMemoryForIndexing} - --conf spark.dynamicAllocation.enabled=true - --conf spark.dynamicAllocation.maxExecutors=${sparkExecutorCoresForIndexing} - --conf spark.extraListeners=${spark2ExtraListeners} - --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} - --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} - --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.speculation=false - --conf spark.hadoop.mapreduce.map.speculative=false - --conf spark.hadoop.mapreduce.reduce.speculative=false - - --inputPath${workingDir}/xml - --isLookupUrl${isLookupUrl} - --format${format} - --batchSize${batchSize} - + + \ No newline at end of file