WIP: fixed PrepareRelationsJob; parallel implementation of CreateRelatedEntitiesJob_phase2, now works by OafType; introduced custom aggregator in AdjacencyListBuilderJob

This commit is contained in:
Claudio Atzori 2020-05-29 10:58:15 +02:00
parent a57965a3ea
commit b2f9564f13
9 changed files with 329 additions and 150 deletions

View File

@ -58,6 +58,18 @@ public class ModelSupport {
oafTypes.put("relation", Relation.class);
}
public static final Map<Class, String> idPrefixMap = Maps.newHashMap();
static {
idPrefixMap.put(Datasource.class, "10");
idPrefixMap.put(Organization.class, "20");
idPrefixMap.put(Project.class, "40");
idPrefixMap.put(Dataset.class, "50");
idPrefixMap.put(OtherResearchProduct.class, "50");
idPrefixMap.put(Software.class, "50");
idPrefixMap.put(Publication.class, "50");
}
public static final Map<String, String> entityIdPrefix = Maps.newHashMap();
static {
@ -289,6 +301,10 @@ public class ModelSupport {
private ModelSupport() {
}
public static <E extends OafEntity> String getIdPrefix(Class<E> clazz) {
return idPrefixMap.get(clazz);
}
/**
* Checks subclass-superclass relationship.
*

View File

@ -30,6 +30,8 @@ import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import scala.Function1;
import scala.Function2;
import scala.collection.JavaConverters;
import scala.collection.Seq;
/**
* Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. The
@ -83,18 +85,7 @@ public class AdjacencyListBuilderJob {
SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
List<Class<?>> modelClasses = Arrays.asList(ModelSupport.getOafModelClasses());
modelClasses
.addAll(
Lists
.newArrayList(
TypedRow.class,
EntityRelEntity.class,
JoinedEntity.class,
RelatedEntity.class,
Tuple2.class,
SortableRelation.class));
conf.registerKryoClasses(modelClasses.toArray(new Class[] {}));
conf.registerKryoClasses(ProvisionModelSupport.getModelClasses());
runWithSparkSession(
conf,
@ -108,11 +99,17 @@ public class AdjacencyListBuilderJob {
private static void createAdjacencyListsKryo(
SparkSession spark, String inputPath, String outputPath) {
TypedColumn<EntityRelEntity, JoinedEntity> aggregator = new AdjacencyListAggregator().toColumn();
log.info("Reading joined entities from: {}", inputPath);
final List<String> paths = HdfsSupport
.listFiles(inputPath, spark.sparkContext().hadoopConfiguration());
log.info("Found paths: {}", String.join(",", paths));
TypedColumn<EntityRelEntity, JoinedEntity> aggregator = new AdjacencyListAggregator().toColumn();
spark
.read()
.load(inputPath)
.load(toSeq(paths))
.as(Encoders.kryo(EntityRelEntity.class))
.groupByKey(
(MapFunction<EntityRelEntity, String>) value -> value.getEntity().getId(),
@ -232,6 +229,10 @@ public class AdjacencyListBuilderJob {
.parquet(outputPath);
}
private static Seq<String> toSeq(List<String> list) {
return JavaConverters.asScalaIteratorConverter(list.iterator()).asScala().toSeq();
}
private static void removeOutputDir(SparkSession spark, String path) {
HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
}

View File

@ -134,7 +134,7 @@ public class CreateRelatedEntitiesJob_phase1 {
Encoders.bean(EntityRelEntity.class))
.write()
.mode(SaveMode.Overwrite)
.parquet(outputPath + "/" + EntityType.fromClass(clazz));
.parquet(outputPath);
}
private static <E extends OafEntity> Dataset<E> readPathEntity(

View File

@ -27,6 +27,7 @@ import com.google.common.collect.Lists;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity;
import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport;
import eu.dnetlib.dhp.oa.provision.model.TypedRow;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.*;
@ -87,8 +88,8 @@ public class CreateRelatedEntitiesJob_phase2 {
String inputRelatedEntitiesPath = parser.get("inputRelatedEntitiesPath");
log.info("inputRelatedEntitiesPath: {}", inputRelatedEntitiesPath);
String inputGraphRootPath = parser.get("inputGraphRootPath");
log.info("inputGraphRootPath: {}", inputGraphRootPath);
String inputEntityPath = parser.get("inputEntityPath");
log.info("inputEntityPath: {}", inputEntityPath);
String outputPath = parser.get("outputPath");
log.info("outputPath: {}", outputPath);
@ -96,44 +97,49 @@ public class CreateRelatedEntitiesJob_phase2 {
int numPartitions = Integer.parseInt(parser.get("numPartitions"));
log.info("numPartitions: {}", numPartitions);
String graphTableClassName = parser.get("graphTableClassName");
log.info("graphTableClassName: {}", graphTableClassName);
Class<? extends OafEntity> entityClazz = (Class<? extends OafEntity>) Class.forName(graphTableClassName);
SparkConf conf = new SparkConf();
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(ModelSupport.getOafModelClasses());
conf.registerKryoClasses(ProvisionModelSupport.getModelClasses());
runWithSparkSession(
conf,
isSparkSessionManaged,
spark -> {
removeOutputDir(spark, outputPath);
joinAllEntities(
spark, inputRelatedEntitiesPath, inputGraphRootPath, outputPath, numPartitions);
joinEntityWithRelatedEntities(
spark, inputRelatedEntitiesPath, inputEntityPath, outputPath, numPartitions, entityClazz);
});
}
private static void joinAllEntities(
private static <E extends OafEntity> void joinEntityWithRelatedEntities(
SparkSession spark,
String inputRelatedEntitiesPath,
String inputGraphRootPath,
String relatedEntitiesPath,
String entityPath,
String outputPath,
int numPartitions) {
int numPartitions,
Class<E> entityClazz) {
Dataset<Tuple2<String, TypedRow>> entities = readAllEntities(spark, inputGraphRootPath, numPartitions);
Dataset<Tuple2<String, EntityRelEntity>> relsBySource = readRelatedEntities(spark, inputRelatedEntitiesPath);
Dataset<Tuple2<String, E>> entity = readPathEntity(spark, entityPath, entityClazz);
Dataset<Tuple2<String, EntityRelEntity>> relatedEntities = readRelatedEntities(
spark, relatedEntitiesPath, entityClazz);
entities
.joinWith(relsBySource, entities.col("_1").equalTo(relsBySource.col("_1")), "left_outer")
.map(
(MapFunction<Tuple2<Tuple2<String, TypedRow>, Tuple2<String, EntityRelEntity>>, EntityRelEntity>) value -> {
entity
.joinWith(relatedEntities, entity.col("_1").equalTo(relatedEntities.col("_1")), "left_outer")
.map((MapFunction<Tuple2<Tuple2<String, E>, Tuple2<String, EntityRelEntity>>, EntityRelEntity>) value -> {
EntityRelEntity re = new EntityRelEntity();
re.setEntity(value._1()._2());
re.setEntity(getTypedRow(entityClazz.getCanonicalName().toLowerCase(), value._1()._2()));
Optional<EntityRelEntity> related = Optional.ofNullable(value._2()).map(Tuple2::_2);
if (related.isPresent()) {
re.setRelation(related.get().getRelation());
re.setTarget(related.get().getTarget());
}
return re;
},
Encoders.bean(EntityRelEntity.class))
}, Encoders.bean(EntityRelEntity.class))
.repartition(numPartitions)
.filter(
(FilterFunction<EntityRelEntity>) value -> value.getEntity() != null
@ -143,33 +149,8 @@ public class CreateRelatedEntitiesJob_phase2 {
.parquet(outputPath);
}
private static Dataset<Tuple2<String, TypedRow>> readAllEntities(
SparkSession spark, String inputGraphPath, int numPartitions) {
Dataset<TypedRow> publication = readPathEntity(spark, inputGraphPath + "/publication", Publication.class);
Dataset<TypedRow> dataset = readPathEntity(
spark, inputGraphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class);
Dataset<TypedRow> other = readPathEntity(
spark, inputGraphPath + "/otherresearchproduct", OtherResearchProduct.class);
Dataset<TypedRow> software = readPathEntity(spark, inputGraphPath + "/software", Software.class);
Dataset<TypedRow> datasource = readPathEntity(spark, inputGraphPath + "/datasource", Datasource.class);
Dataset<TypedRow> organization = readPathEntity(spark, inputGraphPath + "/organization", Organization.class);
Dataset<TypedRow> project = readPathEntity(spark, inputGraphPath + "/project", Project.class);
return publication
.union(dataset)
.union(other)
.union(software)
.union(datasource)
.union(organization)
.union(project)
.map(
(MapFunction<TypedRow, Tuple2<String, TypedRow>>) value -> new Tuple2<>(value.getId(), value),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(TypedRow.class)))
.repartition(numPartitions);
}
private static Dataset<Tuple2<String, EntityRelEntity>> readRelatedEntities(
SparkSession spark, String inputRelatedEntitiesPath) {
private static <E extends OafEntity> Dataset<Tuple2<String, EntityRelEntity>> readRelatedEntities(
SparkSession spark, String inputRelatedEntitiesPath, Class<E> entityClazz) {
log.info("Reading related entities from: {}", inputRelatedEntitiesPath);
@ -178,17 +159,20 @@ public class CreateRelatedEntitiesJob_phase2 {
log.info("Found paths: {}", String.join(",", paths));
final String idPrefix = ModelSupport.getIdPrefix(entityClazz);
return spark
.read()
.load(toSeq(paths))
.as(Encoders.bean(EntityRelEntity.class))
.filter((FilterFunction<EntityRelEntity>) e -> e.getRelation().getSource().startsWith(idPrefix))
.map(
(MapFunction<EntityRelEntity, Tuple2<String, EntityRelEntity>>) value -> new Tuple2<>(
value.getRelation().getSource(), value),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(EntityRelEntity.class)));
}
private static <E extends OafEntity> Dataset<TypedRow> readPathEntity(
private static <E extends OafEntity> Dataset<Tuple2<String, E>> readPathEntity(
SparkSession spark, String inputEntityPath, Class<E> entityClazz) {
log.info("Reading Graph table from: {}", inputEntityPath);
@ -201,9 +185,8 @@ public class CreateRelatedEntitiesJob_phase2 {
.filter("dataInfo.invisible == false")
.map((MapFunction<E, E>) e -> pruneOutliers(entityClazz, e), Encoders.bean(entityClazz))
.map(
(MapFunction<E, TypedRow>) value -> getTypedRow(
StringUtils.substringAfterLast(inputEntityPath, "/"), value),
Encoders.bean(TypedRow.class));
(MapFunction<E, Tuple2<String, E>>) e -> new Tuple2<>(e.getId(), e),
Encoders.tuple(Encoders.STRING(), Encoders.kryo(entityClazz)));
}
private static <E extends OafEntity> E pruneOutliers(Class<E> entityClazz, E e) {

View File

@ -3,9 +3,8 @@ package eu.dnetlib.dhp.oa.provision;
import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.*;
import java.util.function.Function;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
@ -20,6 +19,7 @@ import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.clearspring.analytics.util.Lists;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
@ -27,9 +27,11 @@ import com.google.common.collect.Iterators;
import com.google.common.collect.Sets;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.FunctionalInterfaceSupport;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.provision.model.SortableRelation;
import eu.dnetlib.dhp.oa.provision.utils.RelationPartitioner;
import scala.Function1;
import scala.Tuple2;
/**
@ -111,37 +113,10 @@ public class PrepareRelationsJob {
spark -> {
removeOutputDir(spark, outputPath);
prepareRelationsRDD(
spark, inputRelationsPath, outputPath, relationFilter, relPartitions, maxRelations);
spark, inputRelationsPath, outputPath, relationFilter, maxRelations, relPartitions);
});
}
/**
* Dataset based implementation that prepares the graph relations by limiting the number of outgoing links and
* filtering the relation types according to the given criteria.
*
* @param spark the spark session
* @param inputRelationsPath source path for the graph relations
* @param outputPath output path for the processed relations
* @param relationFilter set of relation filters applied to the `relClass` field
* @param maxRelations maximum number of allowed outgoing edges
*/
private static void prepareRelations(
SparkSession spark, String inputRelationsPath, String outputPath, Set<String> relationFilter,
int maxRelations) {
readPathRelation(spark, inputRelationsPath)
.filter("dataInfo.deletedbyinference == false")
.filter((FilterFunction<SortableRelation>) rel -> !relationFilter.contains(rel.getRelClass()))
.groupByKey(
(MapFunction<SortableRelation, String>) value -> value.getSource(), Encoders.STRING())
.flatMapGroups(
(FlatMapGroupsFunction<String, SortableRelation, SortableRelation>) (key, values) -> Iterators
.limit(values, maxRelations),
Encoders.bean(SortableRelation.class))
.write()
.mode(SaveMode.Overwrite)
.parquet(outputPath);
}
/**
* RDD based implementation that prepares the graph relations by limiting the number of outgoing links and filtering
* the relation types according to the given criteria. Moreover, outgoing links kept within the given limit are
@ -152,50 +127,41 @@ public class PrepareRelationsJob {
* @param outputPath output path for the processed relations
* @param relationFilter set of relation filters applied to the `relClass` field
* @param maxRelations maximum number of allowed outgoing edges
* @param relPartitions number of partitions for the output RDD
*/
// TODO work in progress
private static void prepareRelationsRDD(
SparkSession spark, String inputRelationsPath, String outputPath, Set<String> relationFilter, int relPartitions,
int maxRelations) {
JavaRDD<SortableRelation> rels = readPathRelationRDD(spark, inputRelationsPath).repartition(relPartitions);
RelationPartitioner partitioner = new RelationPartitioner(rels.getNumPartitions());
SparkSession spark, String inputRelationsPath, String outputPath, Set<String> relationFilter, int maxRelations,
int relPartitions) {
// only consider those that are not virtually deleted
RDD<SortableRelation> d = rels
RDD<SortableRelation> cappedRels = readPathRelationRDD(spark, inputRelationsPath)
.repartition(relPartitions)
.filter(rel -> !rel.getDataInfo().getDeletedbyinference())
.filter(rel -> !relationFilter.contains(rel.getRelClass()))
.mapToPair(
(PairFunction<SortableRelation, SortableRelation, SortableRelation>) rel -> new Tuple2<>(rel, rel))
.groupByKey(partitioner)
.map(group -> Iterables.limit(group._2(), maxRelations))
.flatMap(group -> group.iterator())
// group by SOURCE and apply limit
.mapToPair(rel -> new Tuple2<>(rel.getSource(), rel))
.groupByKey(new RelationPartitioner(relPartitions))
.flatMap(group -> Iterables.limit(group._2(), maxRelations).iterator())
// group by TARGET and apply limit
.mapToPair(rel -> new Tuple2<>(rel.getTarget(), rel))
.groupByKey(new RelationPartitioner(relPartitions))
.flatMap(group -> Iterables.limit(group._2(), maxRelations).iterator())
.rdd();
spark
.createDataset(d, Encoders.bean(SortableRelation.class))
.createDataset(cappedRels, Encoders.bean(SortableRelation.class))
.write()
.mode(SaveMode.Overwrite)
.parquet(outputPath);
}
/**
* Reads a Dataset of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline delimited json text
* Reads a JavaRDD of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline delimited json text
* file,
*
* @param spark
* @param inputPath
* @return the Dataset<SortableRelation> containing all the relationships
* @return the JavaRDD<SortableRelation> containing all the relationships
*/
private static Dataset<SortableRelation> readPathRelation(
SparkSession spark, final String inputPath) {
return spark
.read()
.textFile(inputPath)
.map(
(MapFunction<String, SortableRelation>) value -> OBJECT_MAPPER.readValue(value, SortableRelation.class),
Encoders.bean(SortableRelation.class));
}
private static JavaRDD<SortableRelation> readPathRelationRDD(
SparkSession spark, final String inputPath) {
JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

View File

@ -0,0 +1,26 @@
package eu.dnetlib.dhp.oa.provision.model;
import java.util.List;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.schema.common.ModelSupport;
public class ProvisionModelSupport {
public static Class[] getModelClasses() {
List<Class<?>> modelClasses = Lists.newArrayList(ModelSupport.getOafModelClasses());
modelClasses
.addAll(
Lists
.newArrayList(
TypedRow.class,
EntityRelEntity.class,
JoinedEntity.class,
RelatedEntity.class,
Tuple2.class,
SortableRelation.class));
return modelClasses.toArray(new Class[] {});
}
}

View File

@ -4,8 +4,6 @@ package eu.dnetlib.dhp.oa.provision.utils;
import org.apache.spark.Partitioner;
import org.apache.spark.util.Utils;
import eu.dnetlib.dhp.oa.provision.model.SortableRelation;
/**
* Used in combination with SortableRelationKey, allows to partition the records by source id, therefore allowing to
* sort relations sharing the same source id by the ordering defined in SortableRelationKey.
@ -25,6 +23,8 @@ public class RelationPartitioner extends Partitioner {
@Override
public int getPartition(Object key) {
return Utils.nonNegativeMod(((SortableRelation) key).getSource().hashCode(), numPartitions());
String partitionKey = (String) key;
return Utils.nonNegativeMod(partitionKey.hashCode(), numPartitions());
}
}

View File

@ -13,8 +13,14 @@
},
{
"paramName": "iep",
"paramLongName": "inputGraphRootPath",
"paramDescription": "root graph path",
"paramLongName": "inputEntityPath",
"paramDescription": "input Entity Path",
"paramRequired": true
},
{
"paramName": "clazz",
"paramLongName": "graphTableClassName",
"paramDescription": "class name associated to the input entity path",
"paramRequired": true
},
{

View File

@ -103,7 +103,7 @@
<switch>
<case to="prepare_relations">${wf:conf('resumeFrom') eq 'prepare_relations'}</case>
<case to="fork_join_related_entities">${wf:conf('resumeFrom') eq 'fork_join_related_entities'}</case>
<case to="join_all_entities">${wf:conf('resumeFrom') eq 'join_all_entities'}</case>
<case to="fork_join_all_entities">${wf:conf('resumeFrom') eq 'fork_join_all_entities'}</case>
<case to="adjancency_lists">${wf:conf('resumeFrom') eq 'adjancency_lists'}</case>
<case to="convert_to_xml">${wf:conf('resumeFrom') eq 'convert_to_xml'}</case>
<case to="to_solr_index">${wf:conf('resumeFrom') eq 'to_solr_index'}</case>
@ -134,7 +134,7 @@
</spark-opts>
<arg>--inputRelationsPath</arg><arg>${inputGraphRootPath}/relation</arg>
<arg>--outputPath</arg><arg>${workingDir}/relation</arg>
<arg>--relPartitions</arg><arg>3000</arg>
<arg>--relPartitions</arg><arg>5000</arg>
</spark>
<ok to="fork_join_related_entities"/>
<error to="Kill"/>
@ -171,7 +171,7 @@
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/publication</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial/publication</arg>
</spark>
<ok to="wait_joins"/>
<error to="Kill"/>
@ -198,7 +198,7 @@
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/dataset</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial/dataset</arg>
</spark>
<ok to="wait_joins"/>
<error to="Kill"/>
@ -225,7 +225,7 @@
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/otherresearchproduct</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial/otherresearchproduct</arg>
</spark>
<ok to="wait_joins"/>
<error to="Kill"/>
@ -252,7 +252,7 @@
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/software</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial/software</arg>
</spark>
<ok to="wait_joins"/>
<error to="Kill"/>
@ -279,7 +279,7 @@
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/datasource</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial/datasource</arg>
</spark>
<ok to="wait_joins"/>
<error to="Kill"/>
@ -306,7 +306,7 @@
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/organization</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial/organization</arg>
</spark>
<ok to="wait_joins"/>
<error to="Kill"/>
@ -333,19 +333,29 @@
<arg>--inputRelationsPath</arg><arg>${workingDir}/relation</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/project</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_partial/project</arg>
</spark>
<ok to="wait_joins"/>
<error to="Kill"/>
</action>
<join name="wait_joins" to="join_all_entities"/>
<join name="wait_joins" to="fork_join_all_entities"/>
<action name="join_all_entities">
<fork name="fork_join_all_entities">
<path start="join_publication_relations"/>
<path start="join_dataset_relations"/>
<path start="join_otherresearchproduct_relations"/>
<path start="join_software_relations"/>
<path start="join_datasource_relations"/>
<path start="join_organization_relations"/>
<path start="join_project_relations"/>
</fork>
<action name="join_publication_relations">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[entities.id = relatedEntity.source]</name>
<name>Join[publication.id = relatedEntity.source]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
@ -356,18 +366,189 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=7680
--conf spark.sql.shuffle.partitions=15360
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputGraphRootPath</arg><arg>${inputGraphRootPath}</arg>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/publication</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Publication</arg>
<arg>--inputRelatedEntitiesPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_entities</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_entities/publication</arg>
<arg>--numPartitions</arg><arg>35000</arg>
</spark>
<ok to="adjancency_lists"/>
<ok to="wait_join_phase2"/>
<error to="Kill"/>
</action>
<action name="join_dataset_relations">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[dataset.id = relatedEntity.source]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=15360
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/dataset</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Dataset</arg>
<arg>--inputRelatedEntitiesPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_entities/dataset</arg>
<arg>--numPartitions</arg><arg>20000</arg>
</spark>
<ok to="wait_join_phase2"/>
<error to="Kill"/>
</action>
<action name="join_otherresearchproduct_relations">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[otherresearchproduct.id = relatedEntity.source]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=15360
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/otherresearchproduct</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.OtherResearchProduct</arg>
<arg>--inputRelatedEntitiesPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_entities/otherresearchproduct</arg>
<arg>--numPartitions</arg><arg>10000</arg>
</spark>
<ok to="wait_join_phase2"/>
<error to="Kill"/>
</action>
<action name="join_software_relations">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[software.id = relatedEntity.source]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=15360
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/software</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Software</arg>
<arg>--inputRelatedEntitiesPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_entities/software</arg>
<arg>--numPartitions</arg><arg>10000</arg>
</spark>
<ok to="wait_join_phase2"/>
<error to="Kill"/>
</action>
<action name="join_datasource_relations">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[datasource.id = relatedEntity.source]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=15360
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/datasource</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Datasource</arg>
<arg>--inputRelatedEntitiesPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_entities/datasource</arg>
<arg>--numPartitions</arg><arg>1000</arg>
</spark>
<ok to="wait_join_phase2"/>
<error to="Kill"/>
</action>
<action name="join_organization_relations">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[organization.id = relatedEntity.source]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=2
--executor-memory=12G
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=15360
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/organization</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Organization</arg>
<arg>--inputRelatedEntitiesPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_entities/organization</arg>
<arg>--numPartitions</arg><arg>20000</arg>
</spark>
<ok to="wait_join_phase2"/>
<error to="Kill"/>
</action>
<action name="join_project_relations">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
<mode>cluster</mode>
<name>Join[project.id = relatedEntity.source]</name>
<class>eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2</class>
<jar>dhp-graph-provision-${projectVersion}.jar</jar>
<spark-opts>
--executor-cores=${sparkExecutorCoresForJoining}
--executor-memory=${sparkExecutorMemoryForJoining}
--driver-memory=${sparkDriverMemoryForJoining}
--conf spark.extraListeners=${spark2ExtraListeners}
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=15360
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputEntityPath</arg><arg>${inputGraphRootPath}/project</arg>
<arg>--graphTableClassName</arg><arg>eu.dnetlib.dhp.schema.oaf.Project</arg>
<arg>--inputRelatedEntitiesPath</arg><arg>${workingDir}/join_partial</arg>
<arg>--outputPath</arg><arg>${workingDir}/join_entities/project</arg>
<arg>--numPartitions</arg><arg>10000</arg>
</spark>
<ok to="wait_join_phase2"/>
<error to="Kill"/>
</action>
<join name="wait_join_phase2" to="adjancency_lists"/>
<action name="adjancency_lists">
<spark xmlns="uri:oozie:spark-action:0.2">
<master>yarn</master>
@ -383,7 +564,7 @@
--conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners}
--conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress}
--conf spark.eventLog.dir=${nameNode}${spark2EventLogDir}
--conf spark.sql.shuffle.partitions=15000
--conf spark.sql.shuffle.partitions=15360
--conf spark.network.timeout=${sparkNetworkTimeout}
</spark-opts>
<arg>--inputPath</arg><arg>${workingDir}/join_entities</arg>