From b2f9564f13b90312a8f462a27fb56935f29e944d Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 29 May 2020 10:58:15 +0200 Subject: [PATCH] WIP: fixed PrepareRelationsJob; parallel implementation of CreateRelatedEntitiesJob_phase2, now works by OafType; introduced custom aggregator in AdjacencyListBuilderJob --- .../dhp/schema/common/ModelSupport.java | 16 ++ .../oa/provision/AdjacencyListBuilderJob.java | 29 +-- .../CreateRelatedEntitiesJob_phase1.java | 2 +- .../CreateRelatedEntitiesJob_phase2.java | 97 ++++---- .../dhp/oa/provision/PrepareRelationsJob.java | 78 ++----- .../model/ProvisionModelSupport.java | 26 +++ .../provision/utils/RelationPartitioner.java | 6 +- ...input_params_related_entities_pahase2.json | 10 +- .../dhp/oa/provision/oozie_app/workflow.xml | 215 ++++++++++++++++-- 9 files changed, 329 insertions(+), 150 deletions(-) create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java diff --git a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelSupport.java b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelSupport.java index 9ee7c2deb..7d8be81ac 100644 --- a/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelSupport.java +++ b/dhp-schemas/src/main/java/eu/dnetlib/dhp/schema/common/ModelSupport.java @@ -58,6 +58,18 @@ public class ModelSupport { oafTypes.put("relation", Relation.class); } + public static final Map idPrefixMap = Maps.newHashMap(); + + static { + idPrefixMap.put(Datasource.class, "10"); + idPrefixMap.put(Organization.class, "20"); + idPrefixMap.put(Project.class, "40"); + idPrefixMap.put(Dataset.class, "50"); + idPrefixMap.put(OtherResearchProduct.class, "50"); + idPrefixMap.put(Software.class, "50"); + idPrefixMap.put(Publication.class, "50"); + } + public static final Map entityIdPrefix = Maps.newHashMap(); static { @@ -289,6 +301,10 @@ public class ModelSupport { private ModelSupport() { } + public static String getIdPrefix(Class clazz) { + return idPrefixMap.get(clazz); + } + /** * Checks subclass-superclass relationship. * diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java index 63b90be7c..910138988 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/AdjacencyListBuilderJob.java @@ -30,6 +30,8 @@ import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.Oaf; import scala.Function1; import scala.Function2; +import scala.collection.JavaConverters; +import scala.collection.Seq; /** * Joins the graph nodes by resolving the links of distance = 1 to create an adjacency list of linked objects. The @@ -83,18 +85,7 @@ public class AdjacencyListBuilderJob { SparkConf conf = new SparkConf(); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - List> modelClasses = Arrays.asList(ModelSupport.getOafModelClasses()); - modelClasses - .addAll( - Lists - .newArrayList( - TypedRow.class, - EntityRelEntity.class, - JoinedEntity.class, - RelatedEntity.class, - Tuple2.class, - SortableRelation.class)); - conf.registerKryoClasses(modelClasses.toArray(new Class[] {})); + conf.registerKryoClasses(ProvisionModelSupport.getModelClasses()); runWithSparkSession( conf, @@ -108,11 +99,17 @@ public class AdjacencyListBuilderJob { private static void createAdjacencyListsKryo( SparkSession spark, String inputPath, String outputPath) { - TypedColumn aggregator = new AdjacencyListAggregator().toColumn(); log.info("Reading joined entities from: {}", inputPath); + + final List paths = HdfsSupport + .listFiles(inputPath, spark.sparkContext().hadoopConfiguration()); + + log.info("Found paths: {}", String.join(",", paths)); + + TypedColumn aggregator = new AdjacencyListAggregator().toColumn(); spark .read() - .load(inputPath) + .load(toSeq(paths)) .as(Encoders.kryo(EntityRelEntity.class)) .groupByKey( (MapFunction) value -> value.getEntity().getId(), @@ -232,6 +229,10 @@ public class AdjacencyListBuilderJob { .parquet(outputPath); } + private static Seq toSeq(List list) { + return JavaConverters.asScalaIteratorConverter(list.iterator()).asScala().toSeq(); + } + private static void removeOutputDir(SparkSession spark, String path) { HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration()); } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java index 606fa4cc0..ccb20a136 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase1.java @@ -134,7 +134,7 @@ public class CreateRelatedEntitiesJob_phase1 { Encoders.bean(EntityRelEntity.class)) .write() .mode(SaveMode.Overwrite) - .parquet(outputPath + "/" + EntityType.fromClass(clazz)); + .parquet(outputPath); } private static Dataset readPathEntity( diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java index 1de734ee4..757ab47d3 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java @@ -27,6 +27,7 @@ import com.google.common.collect.Lists; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity; +import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport; import eu.dnetlib.dhp.oa.provision.model.TypedRow; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; @@ -74,7 +75,7 @@ public class CreateRelatedEntitiesJob_phase2 { .toString( PrepareRelationsJob.class .getResourceAsStream( - "/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase2.json")); + "/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase2.json")); final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration); parser.parseArgument(args); @@ -87,8 +88,8 @@ public class CreateRelatedEntitiesJob_phase2 { String inputRelatedEntitiesPath = parser.get("inputRelatedEntitiesPath"); log.info("inputRelatedEntitiesPath: {}", inputRelatedEntitiesPath); - String inputGraphRootPath = parser.get("inputGraphRootPath"); - log.info("inputGraphRootPath: {}", inputGraphRootPath); + String inputEntityPath = parser.get("inputEntityPath"); + log.info("inputEntityPath: {}", inputEntityPath); String outputPath = parser.get("outputPath"); log.info("outputPath: {}", outputPath); @@ -96,44 +97,49 @@ public class CreateRelatedEntitiesJob_phase2 { int numPartitions = Integer.parseInt(parser.get("numPartitions")); log.info("numPartitions: {}", numPartitions); + String graphTableClassName = parser.get("graphTableClassName"); + log.info("graphTableClassName: {}", graphTableClassName); + + Class entityClazz = (Class) Class.forName(graphTableClassName); + SparkConf conf = new SparkConf(); conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); - conf.registerKryoClasses(ModelSupport.getOafModelClasses()); + conf.registerKryoClasses(ProvisionModelSupport.getModelClasses()); runWithSparkSession( conf, isSparkSessionManaged, spark -> { removeOutputDir(spark, outputPath); - joinAllEntities( - spark, inputRelatedEntitiesPath, inputGraphRootPath, outputPath, numPartitions); + joinEntityWithRelatedEntities( + spark, inputRelatedEntitiesPath, inputEntityPath, outputPath, numPartitions, entityClazz); }); } - private static void joinAllEntities( + private static void joinEntityWithRelatedEntities( SparkSession spark, - String inputRelatedEntitiesPath, - String inputGraphRootPath, + String relatedEntitiesPath, + String entityPath, String outputPath, - int numPartitions) { + int numPartitions, + Class entityClazz) { - Dataset> entities = readAllEntities(spark, inputGraphRootPath, numPartitions); - Dataset> relsBySource = readRelatedEntities(spark, inputRelatedEntitiesPath); + Dataset> entity = readPathEntity(spark, entityPath, entityClazz); + Dataset> relatedEntities = readRelatedEntities( + spark, relatedEntitiesPath, entityClazz); - entities - .joinWith(relsBySource, entities.col("_1").equalTo(relsBySource.col("_1")), "left_outer") - .map( - (MapFunction, Tuple2>, EntityRelEntity>) value -> { - EntityRelEntity re = new EntityRelEntity(); - re.setEntity(value._1()._2()); - Optional related = Optional.ofNullable(value._2()).map(Tuple2::_2); - if (related.isPresent()) { - re.setRelation(related.get().getRelation()); - re.setTarget(related.get().getTarget()); - } - return re; - }, - Encoders.bean(EntityRelEntity.class)) + entity + .joinWith(relatedEntities, entity.col("_1").equalTo(relatedEntities.col("_1")), "left_outer") + .map((MapFunction, Tuple2>, EntityRelEntity>) value -> { + EntityRelEntity re = new EntityRelEntity(); + re.setEntity(getTypedRow(entityClazz.getCanonicalName().toLowerCase(), value._1()._2())); + Optional related = Optional.ofNullable(value._2()).map(Tuple2::_2); + if (related.isPresent()) { + re.setRelation(related.get().getRelation()); + re.setTarget(related.get().getTarget()); + } + return re; + }, Encoders.bean(EntityRelEntity.class)) .repartition(numPartitions) .filter( (FilterFunction) value -> value.getEntity() != null @@ -143,33 +149,8 @@ public class CreateRelatedEntitiesJob_phase2 { .parquet(outputPath); } - private static Dataset> readAllEntities( - SparkSession spark, String inputGraphPath, int numPartitions) { - Dataset publication = readPathEntity(spark, inputGraphPath + "/publication", Publication.class); - Dataset dataset = readPathEntity( - spark, inputGraphPath + "/dataset", eu.dnetlib.dhp.schema.oaf.Dataset.class); - Dataset other = readPathEntity( - spark, inputGraphPath + "/otherresearchproduct", OtherResearchProduct.class); - Dataset software = readPathEntity(spark, inputGraphPath + "/software", Software.class); - Dataset datasource = readPathEntity(spark, inputGraphPath + "/datasource", Datasource.class); - Dataset organization = readPathEntity(spark, inputGraphPath + "/organization", Organization.class); - Dataset project = readPathEntity(spark, inputGraphPath + "/project", Project.class); - - return publication - .union(dataset) - .union(other) - .union(software) - .union(datasource) - .union(organization) - .union(project) - .map( - (MapFunction>) value -> new Tuple2<>(value.getId(), value), - Encoders.tuple(Encoders.STRING(), Encoders.kryo(TypedRow.class))) - .repartition(numPartitions); - } - - private static Dataset> readRelatedEntities( - SparkSession spark, String inputRelatedEntitiesPath) { + private static Dataset> readRelatedEntities( + SparkSession spark, String inputRelatedEntitiesPath, Class entityClazz) { log.info("Reading related entities from: {}", inputRelatedEntitiesPath); @@ -178,17 +159,20 @@ public class CreateRelatedEntitiesJob_phase2 { log.info("Found paths: {}", String.join(",", paths)); + final String idPrefix = ModelSupport.getIdPrefix(entityClazz); + return spark .read() .load(toSeq(paths)) .as(Encoders.bean(EntityRelEntity.class)) + .filter((FilterFunction) e -> e.getRelation().getSource().startsWith(idPrefix)) .map( (MapFunction>) value -> new Tuple2<>( value.getRelation().getSource(), value), Encoders.tuple(Encoders.STRING(), Encoders.kryo(EntityRelEntity.class))); } - private static Dataset readPathEntity( + private static Dataset> readPathEntity( SparkSession spark, String inputEntityPath, Class entityClazz) { log.info("Reading Graph table from: {}", inputEntityPath); @@ -201,9 +185,8 @@ public class CreateRelatedEntitiesJob_phase2 { .filter("dataInfo.invisible == false") .map((MapFunction) e -> pruneOutliers(entityClazz, e), Encoders.bean(entityClazz)) .map( - (MapFunction) value -> getTypedRow( - StringUtils.substringAfterLast(inputEntityPath, "/"), value), - Encoders.bean(TypedRow.class)); + (MapFunction>) e -> new Tuple2<>(e.getId(), e), + Encoders.tuple(Encoders.STRING(), Encoders.kryo(entityClazz))); } private static E pruneOutliers(Class entityClazz, E e) { diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java index 72d68a389..6b184071a 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java @@ -3,9 +3,8 @@ package eu.dnetlib.dhp.oa.provision; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; -import java.util.HashSet; -import java.util.Optional; -import java.util.Set; +import java.util.*; +import java.util.function.Function; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; @@ -20,6 +19,7 @@ import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.clearspring.analytics.util.Lists; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Splitter; import com.google.common.collect.Iterables; @@ -27,9 +27,11 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Sets; import eu.dnetlib.dhp.application.ArgumentApplicationParser; +import eu.dnetlib.dhp.common.FunctionalInterfaceSupport; import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.oa.provision.model.SortableRelation; import eu.dnetlib.dhp.oa.provision.utils.RelationPartitioner; +import scala.Function1; import scala.Tuple2; /** @@ -111,37 +113,10 @@ public class PrepareRelationsJob { spark -> { removeOutputDir(spark, outputPath); prepareRelationsRDD( - spark, inputRelationsPath, outputPath, relationFilter, relPartitions, maxRelations); + spark, inputRelationsPath, outputPath, relationFilter, maxRelations, relPartitions); }); } - /** - * Dataset based implementation that prepares the graph relations by limiting the number of outgoing links and - * filtering the relation types according to the given criteria. - * - * @param spark the spark session - * @param inputRelationsPath source path for the graph relations - * @param outputPath output path for the processed relations - * @param relationFilter set of relation filters applied to the `relClass` field - * @param maxRelations maximum number of allowed outgoing edges - */ - private static void prepareRelations( - SparkSession spark, String inputRelationsPath, String outputPath, Set relationFilter, - int maxRelations) { - readPathRelation(spark, inputRelationsPath) - .filter("dataInfo.deletedbyinference == false") - .filter((FilterFunction) rel -> !relationFilter.contains(rel.getRelClass())) - .groupByKey( - (MapFunction) value -> value.getSource(), Encoders.STRING()) - .flatMapGroups( - (FlatMapGroupsFunction) (key, values) -> Iterators - .limit(values, maxRelations), - Encoders.bean(SortableRelation.class)) - .write() - .mode(SaveMode.Overwrite) - .parquet(outputPath); - } - /** * RDD based implementation that prepares the graph relations by limiting the number of outgoing links and filtering * the relation types according to the given criteria. Moreover, outgoing links kept within the given limit are @@ -152,50 +127,41 @@ public class PrepareRelationsJob { * @param outputPath output path for the processed relations * @param relationFilter set of relation filters applied to the `relClass` field * @param maxRelations maximum number of allowed outgoing edges + * @param relPartitions number of partitions for the output RDD */ - // TODO work in progress private static void prepareRelationsRDD( - SparkSession spark, String inputRelationsPath, String outputPath, Set relationFilter, int relPartitions, - int maxRelations) { - JavaRDD rels = readPathRelationRDD(spark, inputRelationsPath).repartition(relPartitions); - RelationPartitioner partitioner = new RelationPartitioner(rels.getNumPartitions()); + SparkSession spark, String inputRelationsPath, String outputPath, Set relationFilter, int maxRelations, + int relPartitions) { - // only consider those that are not virtually deleted - RDD d = rels + RDD cappedRels = readPathRelationRDD(spark, inputRelationsPath) + .repartition(relPartitions) .filter(rel -> !rel.getDataInfo().getDeletedbyinference()) .filter(rel -> !relationFilter.contains(rel.getRelClass())) - .mapToPair( - (PairFunction) rel -> new Tuple2<>(rel, rel)) - .groupByKey(partitioner) - .map(group -> Iterables.limit(group._2(), maxRelations)) - .flatMap(group -> group.iterator()) + // group by SOURCE and apply limit + .mapToPair(rel -> new Tuple2<>(rel.getSource(), rel)) + .groupByKey(new RelationPartitioner(relPartitions)) + .flatMap(group -> Iterables.limit(group._2(), maxRelations).iterator()) + // group by TARGET and apply limit + .mapToPair(rel -> new Tuple2<>(rel.getTarget(), rel)) + .groupByKey(new RelationPartitioner(relPartitions)) + .flatMap(group -> Iterables.limit(group._2(), maxRelations).iterator()) .rdd(); spark - .createDataset(d, Encoders.bean(SortableRelation.class)) + .createDataset(cappedRels, Encoders.bean(SortableRelation.class)) .write() .mode(SaveMode.Overwrite) .parquet(outputPath); } /** - * Reads a Dataset of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline delimited json text + * Reads a JavaRDD of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline delimited json text * file, * * @param spark * @param inputPath - * @return the Dataset containing all the relationships + * @return the JavaRDD containing all the relationships */ - private static Dataset readPathRelation( - SparkSession spark, final String inputPath) { - return spark - .read() - .textFile(inputPath) - .map( - (MapFunction) value -> OBJECT_MAPPER.readValue(value, SortableRelation.class), - Encoders.bean(SortableRelation.class)); - } - private static JavaRDD readPathRelationRDD( SparkSession spark, final String inputPath) { JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java new file mode 100644 index 000000000..3cccce7c4 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java @@ -0,0 +1,26 @@ + +package eu.dnetlib.dhp.oa.provision.model; + +import java.util.List; + +import com.google.common.collect.Lists; + +import eu.dnetlib.dhp.schema.common.ModelSupport; + +public class ProvisionModelSupport { + + public static Class[] getModelClasses() { + List> modelClasses = Lists.newArrayList(ModelSupport.getOafModelClasses()); + modelClasses + .addAll( + Lists + .newArrayList( + TypedRow.class, + EntityRelEntity.class, + JoinedEntity.class, + RelatedEntity.class, + Tuple2.class, + SortableRelation.class)); + return modelClasses.toArray(new Class[] {}); + } +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/RelationPartitioner.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/RelationPartitioner.java index a09a27837..c7862b48a 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/RelationPartitioner.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/RelationPartitioner.java @@ -4,8 +4,6 @@ package eu.dnetlib.dhp.oa.provision.utils; import org.apache.spark.Partitioner; import org.apache.spark.util.Utils; -import eu.dnetlib.dhp.oa.provision.model.SortableRelation; - /** * Used in combination with SortableRelationKey, allows to partition the records by source id, therefore allowing to * sort relations sharing the same source id by the ordering defined in SortableRelationKey. @@ -25,6 +23,8 @@ public class RelationPartitioner extends Partitioner { @Override public int getPartition(Object key) { - return Utils.nonNegativeMod(((SortableRelation) key).getSource().hashCode(), numPartitions()); + String partitionKey = (String) key; + return Utils.nonNegativeMod(partitionKey.hashCode(), numPartitions()); } + } diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase2.json b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase2.json index 2727f153b..2c9f0e4f3 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase2.json +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase2.json @@ -13,8 +13,14 @@ }, { "paramName": "iep", - "paramLongName": "inputGraphRootPath", - "paramDescription": "root graph path", + "paramLongName": "inputEntityPath", + "paramDescription": "input Entity Path", + "paramRequired": true + }, + { + "paramName": "clazz", + "paramLongName": "graphTableClassName", + "paramDescription": "class name associated to the input entity path", "paramRequired": true }, { diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml index 02148ed57..dcd434e9b 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml @@ -103,7 +103,7 @@ ${wf:conf('resumeFrom') eq 'prepare_relations'} ${wf:conf('resumeFrom') eq 'fork_join_related_entities'} - ${wf:conf('resumeFrom') eq 'join_all_entities'} + ${wf:conf('resumeFrom') eq 'fork_join_all_entities'} ${wf:conf('resumeFrom') eq 'adjancency_lists'} ${wf:conf('resumeFrom') eq 'convert_to_xml'} ${wf:conf('resumeFrom') eq 'to_solr_index'} @@ -134,7 +134,7 @@ --inputRelationsPath${inputGraphRootPath}/relation --outputPath${workingDir}/relation - --relPartitions3000 + --relPartitions5000 @@ -171,7 +171,7 @@ --inputRelationsPath${workingDir}/relation --inputEntityPath${inputGraphRootPath}/publication --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication - --outputPath${workingDir}/join_partial + --outputPath${workingDir}/join_partial/publication @@ -198,7 +198,7 @@ --inputRelationsPath${workingDir}/relation --inputEntityPath${inputGraphRootPath}/dataset --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset - --outputPath${workingDir}/join_partial + --outputPath${workingDir}/join_partial/dataset @@ -225,7 +225,7 @@ --inputRelationsPath${workingDir}/relation --inputEntityPath${inputGraphRootPath}/otherresearchproduct --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct - --outputPath${workingDir}/join_partial + --outputPath${workingDir}/join_partial/otherresearchproduct @@ -252,7 +252,7 @@ --inputRelationsPath${workingDir}/relation --inputEntityPath${inputGraphRootPath}/software --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software - --outputPath${workingDir}/join_partial + --outputPath${workingDir}/join_partial/software @@ -279,7 +279,7 @@ --inputRelationsPath${workingDir}/relation --inputEntityPath${inputGraphRootPath}/datasource --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Datasource - --outputPath${workingDir}/join_partial + --outputPath${workingDir}/join_partial/datasource @@ -306,7 +306,7 @@ --inputRelationsPath${workingDir}/relation --inputEntityPath${inputGraphRootPath}/organization --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Organization - --outputPath${workingDir}/join_partial + --outputPath${workingDir}/join_partial/organization @@ -333,19 +333,29 @@ --inputRelationsPath${workingDir}/relation --inputEntityPath${inputGraphRootPath}/project --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Project - --outputPath${workingDir}/join_partial + --outputPath${workingDir}/join_partial/project - + - + + + + + + + + + + + yarn cluster - Join[entities.id = relatedEntity.source] + Join[publication.id = relatedEntity.source] eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2 dhp-graph-provision-${projectVersion}.jar @@ -356,18 +366,189 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=7680 + --conf spark.sql.shuffle.partitions=15360 --conf spark.network.timeout=${sparkNetworkTimeout} - --inputGraphRootPath${inputGraphRootPath} + --inputEntityPath${inputGraphRootPath}/publication + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Publication --inputRelatedEntitiesPath${workingDir}/join_partial - --outputPath${workingDir}/join_entities + --outputPath${workingDir}/join_entities/publication --numPartitions35000 - + + + + yarn + cluster + Join[dataset.id = relatedEntity.source] + eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2 + dhp-graph-provision-${projectVersion}.jar + + --executor-cores=${sparkExecutorCoresForJoining} + --executor-memory=${sparkExecutorMemoryForJoining} + --driver-memory=${sparkDriverMemoryForJoining} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=15360 + --conf spark.network.timeout=${sparkNetworkTimeout} + + --inputEntityPath${inputGraphRootPath}/dataset + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Dataset + --inputRelatedEntitiesPath${workingDir}/join_partial + --outputPath${workingDir}/join_entities/dataset + --numPartitions20000 + + + + + + + + yarn + cluster + Join[otherresearchproduct.id = relatedEntity.source] + eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2 + dhp-graph-provision-${projectVersion}.jar + + --executor-cores=${sparkExecutorCoresForJoining} + --executor-memory=${sparkExecutorMemoryForJoining} + --driver-memory=${sparkDriverMemoryForJoining} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=15360 + --conf spark.network.timeout=${sparkNetworkTimeout} + + --inputEntityPath${inputGraphRootPath}/otherresearchproduct + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.OtherResearchProduct + --inputRelatedEntitiesPath${workingDir}/join_partial + --outputPath${workingDir}/join_entities/otherresearchproduct + --numPartitions10000 + + + + + + + + yarn + cluster + Join[software.id = relatedEntity.source] + eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2 + dhp-graph-provision-${projectVersion}.jar + + --executor-cores=${sparkExecutorCoresForJoining} + --executor-memory=${sparkExecutorMemoryForJoining} + --driver-memory=${sparkDriverMemoryForJoining} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=15360 + --conf spark.network.timeout=${sparkNetworkTimeout} + + --inputEntityPath${inputGraphRootPath}/software + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Software + --inputRelatedEntitiesPath${workingDir}/join_partial + --outputPath${workingDir}/join_entities/software + --numPartitions10000 + + + + + + + + yarn + cluster + Join[datasource.id = relatedEntity.source] + eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2 + dhp-graph-provision-${projectVersion}.jar + + --executor-cores=${sparkExecutorCoresForJoining} + --executor-memory=${sparkExecutorMemoryForJoining} + --driver-memory=${sparkDriverMemoryForJoining} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=15360 + --conf spark.network.timeout=${sparkNetworkTimeout} + + --inputEntityPath${inputGraphRootPath}/datasource + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Datasource + --inputRelatedEntitiesPath${workingDir}/join_partial + --outputPath${workingDir}/join_entities/datasource + --numPartitions1000 + + + + + + + + yarn + cluster + Join[organization.id = relatedEntity.source] + eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2 + dhp-graph-provision-${projectVersion}.jar + + --executor-cores=2 + --executor-memory=12G + --driver-memory=${sparkDriverMemoryForJoining} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=15360 + --conf spark.network.timeout=${sparkNetworkTimeout} + + --inputEntityPath${inputGraphRootPath}/organization + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Organization + --inputRelatedEntitiesPath${workingDir}/join_partial + --outputPath${workingDir}/join_entities/organization + --numPartitions20000 + + + + + + + + yarn + cluster + Join[project.id = relatedEntity.source] + eu.dnetlib.dhp.oa.provision.CreateRelatedEntitiesJob_phase2 + dhp-graph-provision-${projectVersion}.jar + + --executor-cores=${sparkExecutorCoresForJoining} + --executor-memory=${sparkExecutorMemoryForJoining} + --driver-memory=${sparkDriverMemoryForJoining} + --conf spark.extraListeners=${spark2ExtraListeners} + --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} + --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} + --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} + --conf spark.sql.shuffle.partitions=15360 + --conf spark.network.timeout=${sparkNetworkTimeout} + + --inputEntityPath${inputGraphRootPath}/project + --graphTableClassNameeu.dnetlib.dhp.schema.oaf.Project + --inputRelatedEntitiesPath${workingDir}/join_partial + --outputPath${workingDir}/join_entities/project + --numPartitions10000 + + + + + + + yarn @@ -383,7 +564,7 @@ --conf spark.sql.queryExecutionListeners=${spark2SqlQueryExecutionListeners} --conf spark.yarn.historyServer.address=${spark2YarnHistoryServerAddress} --conf spark.eventLog.dir=${nameNode}${spark2EventLogDir} - --conf spark.sql.shuffle.partitions=15000 + --conf spark.sql.shuffle.partitions=15360 --conf spark.network.timeout=${sparkNetworkTimeout} --inputPath${workingDir}/join_entities