From 8d59fdf34eaa438d0362bd3393b0755570ee7b9d Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 26 Jun 2020 14:32:58 +0200 Subject: [PATCH 1/2] WIP: dataset based PrepareRelationsJob --- .../CreateRelatedEntitiesJob_phase2.java | 17 --- .../dhp/oa/provision/PrepareRelationsJob.java | 128 ++++++++++++++++-- .../dhp/oa/provision/RelationComparator.java | 43 ++++++ .../dhp/oa/provision/RelationList.java | 25 ++++ .../dhp/oa/provision/SortableRelation.java | 80 +++++++++++ .../model/ProvisionModelSupport.java | 7 +- .../provision/model/RelatedEntityWrapper.java | 4 - .../dhp/oa/provision/model/TypedRow.java | 64 --------- .../oa/provision/utils/XmlRecordFactory.java | 4 - 9 files changed, 271 insertions(+), 101 deletions(-) create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/RelationComparator.java create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/RelationList.java create mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SortableRelation.java delete mode 100644 dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/TypedRow.java diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java index 5ef30d6e1..bfcc648a3 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/CreateRelatedEntitiesJob_phase2.java @@ -19,7 +19,6 @@ import org.apache.spark.sql.expressions.Aggregator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; @@ -28,8 +27,6 @@ import eu.dnetlib.dhp.common.HdfsSupport; import eu.dnetlib.dhp.oa.provision.model.JoinedEntity; import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport; import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper; -import eu.dnetlib.dhp.oa.provision.model.TypedRow; -import eu.dnetlib.dhp.schema.common.EntityType; import eu.dnetlib.dhp.schema.common.ModelSupport; import eu.dnetlib.dhp.schema.oaf.*; import scala.Tuple2; @@ -305,20 +302,6 @@ public class CreateRelatedEntitiesJob_phase2 { private static FilterFunction filterEmptyEntityFn() { return (FilterFunction) v -> Objects.nonNull(v.getEntity()); - /* - * return (FilterFunction) v -> Optional .ofNullable(v.getEntity()) .map(e -> - * StringUtils.isNotBlank(e.getId())) .orElse(false); - */ - } - - private static TypedRow getTypedRow(String type, OafEntity entity) - throws JsonProcessingException { - TypedRow t = new TypedRow(); - t.setType(type); - t.setDeleted(entity.getDataInfo().getDeletedbyinference()); - t.setId(entity.getId()); - t.setOaf(OBJECT_MAPPER.writeValueAsString(entity)); - return t; } private static void removeOutputDir(SparkSession spark, String path) { diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java index 19823120c..bf9806787 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java @@ -3,28 +3,33 @@ package eu.dnetlib.dhp.oa.provision; import static eu.dnetlib.dhp.common.SparkSessionSupport.runWithSparkSession; -import java.util.HashSet; -import java.util.Optional; -import java.util.Set; +import java.util.*; +import java.util.function.Supplier; +import java.util.stream.Collectors; import org.apache.commons.io.IOUtils; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.rdd.RDD; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.*; +import org.apache.spark.sql.expressions.Aggregator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Splitter; import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import eu.dnetlib.dhp.application.ArgumentApplicationParser; import eu.dnetlib.dhp.common.HdfsSupport; +import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport; import eu.dnetlib.dhp.oa.provision.model.SortableRelationKey; import eu.dnetlib.dhp.oa.provision.utils.RelationPartitioner; import eu.dnetlib.dhp.schema.oaf.Relation; @@ -102,13 +107,15 @@ public class PrepareRelationsJob { log.info("maxRelations: {}", maxRelations); SparkConf conf = new SparkConf(); + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(ProvisionModelSupport.getModelClasses()); runWithSparkSession( conf, isSparkSessionManaged, spark -> { removeOutputDir(spark, outputPath); - prepareRelationsRDD( + prepareRelationsDataset( spark, inputRelationsPath, outputPath, relationFilter, maxRelations, relPartitions); }); } @@ -125,9 +132,8 @@ public class PrepareRelationsJob { * @param maxRelations maximum number of allowed outgoing edges * @param relPartitions number of partitions for the output RDD */ - private static void prepareRelationsRDD( - SparkSession spark, String inputRelationsPath, String outputPath, Set relationFilter, int maxRelations, - int relPartitions) { + private static void prepareRelationsRDD(SparkSession spark, String inputRelationsPath, String outputPath, + Set relationFilter, int maxRelations, int relPartitions) { // group by SOURCE and apply limit RDD bySource = readPathRelationRDD(spark, inputRelationsPath) @@ -163,6 +169,108 @@ public class PrepareRelationsJob { .parquet(outputPath); } + private static void prepareRelationsDataset( + SparkSession spark, String inputRelationsPath, String outputPath, Set relationFilter, int maxRelations, + int relPartitions) { + + Dataset bySource = pruneRelations( + spark, inputRelationsPath, relationFilter, maxRelations, relPartitions, + (Function) r -> r.getSource()); + Dataset byTarget = pruneRelations( + spark, inputRelationsPath, relationFilter, maxRelations, relPartitions, + (Function) r -> r.getTarget()); + + bySource + .union(byTarget) + .repartition(relPartitions) + .write() + .mode(SaveMode.Overwrite) + .parquet(outputPath); + } + + private static Dataset pruneRelations(SparkSession spark, String inputRelationsPath, + Set relationFilter, int maxRelations, int relPartitions, + Function idFn) { + return readRelations(spark, inputRelationsPath, relationFilter, relPartitions) + .groupByKey( + (MapFunction) r -> idFn.call(r), + Encoders.STRING()) + .agg(new RelationAggregator(maxRelations).toColumn()) + .flatMap( + (FlatMapFunction, Relation>) t -> t + ._2() + .getRelations() + .iterator(), + Encoders.bean(Relation.class)); + } + + private static Dataset readRelations(SparkSession spark, String inputRelationsPath, + Set relationFilter, int relPartitions) { + return spark + .read() + .textFile(inputRelationsPath) + .repartition(relPartitions) + .map( + (MapFunction) s -> OBJECT_MAPPER.readValue(s, Relation.class), + Encoders.kryo(Relation.class)) + .filter((FilterFunction) rel -> rel.getDataInfo().getDeletedbyinference() == false) + .filter((FilterFunction) rel -> relationFilter.contains(rel.getRelClass()) == false); + } + + public static class RelationAggregator + extends Aggregator { + + private int maxRelations; + + public RelationAggregator(int maxRelations) { + this.maxRelations = maxRelations; + } + + @Override + public RelationList zero() { + return new RelationList(); + } + + @Override + public RelationList reduce(RelationList b, Relation a) { + b.getRelations().add(a); + return getSortableRelationList(b); + } + + @Override + public RelationList merge(RelationList b1, RelationList b2) { + b1.getRelations().addAll(b2.getRelations()); + return getSortableRelationList(b1); + } + + @Override + public RelationList finish(RelationList r) { + return getSortableRelationList(r); + } + + private RelationList getSortableRelationList(RelationList b1) { + RelationList sr = new RelationList(); + sr + .setRelations( + b1 + .getRelations() + .stream() + .limit(maxRelations) + .collect(Collectors.toCollection(() -> new PriorityQueue<>(new RelationComparator())))); + return sr; + } + + @Override + public Encoder bufferEncoder() { + return Encoders.kryo(RelationList.class); + } + + @Override + public Encoder outputEncoder() { + return Encoders.kryo(RelationList.class); + } + } + /** * Reads a JavaRDD of eu.dnetlib.dhp.oa.provision.model.SortableRelation objects from a newline delimited json text * file, diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/RelationComparator.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/RelationComparator.java new file mode 100644 index 000000000..f2209c26c --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/RelationComparator.java @@ -0,0 +1,43 @@ + +package eu.dnetlib.dhp.oa.provision; + +import java.util.Comparator; +import java.util.Map; +import java.util.Optional; + +import com.google.common.collect.ComparisonChain; +import com.google.common.collect.Maps; + +import eu.dnetlib.dhp.schema.oaf.Relation; + +public class RelationComparator implements Comparator { + + private static final Map weights = Maps.newHashMap(); + + static { + weights.put("outcome", 0); + weights.put("supplement", 1); + weights.put("review", 2); + weights.put("citation", 3); + weights.put("affiliation", 4); + weights.put("relationship", 5); + weights.put("publicationDataset", 6); + weights.put("similarity", 7); + + weights.put("provision", 8); + weights.put("participation", 9); + weights.put("dedup", 10); + } + + private Integer getWeight(Relation o) { + return Optional.ofNullable(weights.get(o.getSubRelType())).orElse(Integer.MAX_VALUE); + } + + @Override + public int compare(Relation o1, Relation o2) { + return ComparisonChain + .start() + .compare(getWeight(o1), getWeight(o2)) + .result(); + } +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/RelationList.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/RelationList.java new file mode 100644 index 000000000..6e5fd7dba --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/RelationList.java @@ -0,0 +1,25 @@ + +package eu.dnetlib.dhp.oa.provision; + +import java.io.Serializable; +import java.util.PriorityQueue; +import java.util.Queue; + +import eu.dnetlib.dhp.schema.oaf.Relation; + +public class RelationList implements Serializable { + + private Queue relations; + + public RelationList() { + this.relations = new PriorityQueue<>(new RelationComparator()); + } + + public Queue getRelations() { + return relations; + } + + public void setRelations(Queue relations) { + this.relations = relations; + } +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SortableRelation.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SortableRelation.java new file mode 100644 index 000000000..8ce92a6a0 --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/SortableRelation.java @@ -0,0 +1,80 @@ + +package eu.dnetlib.dhp.oa.provision; + +import java.io.Serializable; +import java.util.Map; +import java.util.Optional; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.google.common.collect.ComparisonChain; +import com.google.common.collect.Maps; + +import eu.dnetlib.dhp.schema.oaf.Relation; + +public class SortableRelation extends Relation implements Comparable, Serializable { + + private static final Map weights = Maps.newHashMap(); + + static { + weights.put("outcome", 0); + weights.put("supplement", 1); + weights.put("review", 2); + weights.put("citation", 3); + weights.put("affiliation", 4); + weights.put("relationship", 5); + weights.put("publicationDataset", 6); + weights.put("similarity", 7); + + weights.put("provision", 8); + weights.put("participation", 9); + weights.put("dedup", 10); + } + + private static final long serialVersionUID = 34753984579L; + + private String groupingKey; + + public static SortableRelation create(Relation r, String groupingKey) { + SortableRelation sr = new SortableRelation(); + sr.setGroupingKey(groupingKey); + sr.setSource(r.getSource()); + sr.setTarget(r.getTarget()); + sr.setRelType(r.getRelType()); + sr.setSubRelType(r.getSubRelType()); + sr.setRelClass(r.getRelClass()); + sr.setDataInfo(r.getDataInfo()); + sr.setCollectedfrom(r.getCollectedfrom()); + sr.setLastupdatetimestamp(r.getLastupdatetimestamp()); + sr.setProperties(r.getProperties()); + sr.setValidated(r.getValidated()); + sr.setValidationDate(r.getValidationDate()); + + return sr; + } + + @JsonIgnore + public Relation asRelation() { + return this; + } + + @Override + public int compareTo(SortableRelation o) { + return ComparisonChain + .start() + .compare(getGroupingKey(), o.getGroupingKey()) + .compare(getWeight(this), getWeight(o)) + .result(); + } + + private Integer getWeight(SortableRelation o) { + return Optional.ofNullable(weights.get(o.getSubRelType())).orElse(Integer.MAX_VALUE); + } + + public String getGroupingKey() { + return groupingKey; + } + + public void setGroupingKey(String groupingKey) { + this.groupingKey = groupingKey; + } +} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java index 051fe923d..c09ed86e5 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/ProvisionModelSupport.java @@ -5,6 +5,8 @@ import java.util.List; import com.google.common.collect.Lists; +import eu.dnetlib.dhp.oa.provision.RelationList; +import eu.dnetlib.dhp.oa.provision.SortableRelation; import eu.dnetlib.dhp.schema.common.ModelSupport; public class ProvisionModelSupport { @@ -15,11 +17,12 @@ public class ProvisionModelSupport { .addAll( Lists .newArrayList( - TypedRow.class, RelatedEntityWrapper.class, JoinedEntity.class, RelatedEntity.class, - SortableRelationKey.class)); + SortableRelationKey.class, + SortableRelation.class, + RelationList.class)); return modelClasses.toArray(new Class[] {}); } } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntityWrapper.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntityWrapper.java index cbb143ee2..4a4a4a5be 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntityWrapper.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/RelatedEntityWrapper.java @@ -16,10 +16,6 @@ public class RelatedEntityWrapper implements Serializable { } public RelatedEntityWrapper(Relation relation, RelatedEntity target) { - this(null, relation, target); - } - - public RelatedEntityWrapper(TypedRow entity, Relation relation, RelatedEntity target) { this.relation = relation; this.target = target; } diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/TypedRow.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/TypedRow.java deleted file mode 100644 index cbec372e4..000000000 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/model/TypedRow.java +++ /dev/null @@ -1,64 +0,0 @@ - -package eu.dnetlib.dhp.oa.provision.model; - -import java.io.Serializable; - -import com.google.common.base.Objects; - -public class TypedRow implements Serializable { - - private String id; - - private Boolean deleted; - - private String type; - - private String oaf; - - public String getId() { - return id; - } - - public void setId(String id) { - this.id = id; - } - - public Boolean getDeleted() { - return deleted; - } - - public void setDeleted(Boolean deleted) { - this.deleted = deleted; - } - - public String getType() { - return type; - } - - public void setType(String type) { - this.type = type; - } - - public String getOaf() { - return oaf; - } - - public void setOaf(String oaf) { - this.oaf = oaf; - } - - @Override - public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - TypedRow typedRow2 = (TypedRow) o; - return Objects.equal(id, typedRow2.id); - } - - @Override - public int hashCode() { - return Objects.hashCode(id); - } -} diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java index 5d8d9fa20..db9a68d3d 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/utils/XmlRecordFactory.java @@ -121,10 +121,6 @@ public class XmlRecordFactory implements Serializable { } } - private static OafEntity toOafEntity(TypedRow typedRow) { - return parseOaf(typedRow.getOaf(), typedRow.getType()); - } - private static OafEntity parseOaf(final String json, final String type) { try { switch (EntityType.valueOf(type)) { From 7817338e0510da3c798f8076625f6962ee207b87 Mon Sep 17 00:00:00 2001 From: Claudio Atzori Date: Fri, 26 Jun 2020 17:58:33 +0200 Subject: [PATCH 2/2] added test to verify the relation pre-processing --- .../dhp/oa/provision/PrepareRelationsJob.java | 68 ++++--------- .../dhp/oa/provision/oozie_app/workflow.xml | 1 + .../oa/provision/PrepareRelationsJobTest.java | 93 ++++++++++++++++++ .../eu/dnetlib/dhp/oa/provision/relations.gz | Bin 0 -> 681 bytes .../src/test/resources/log4j.properties | 11 +++ 5 files changed, 122 insertions(+), 51 deletions(-) create mode 100644 dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJobTest.java create mode 100644 dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/relations.gz create mode 100644 dhp-workflows/dhp-graph-provision/src/test/resources/log4j.properties diff --git a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java index bf9806787..601cf6449 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java +++ b/dhp-workflows/dhp-graph-provision/src/main/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJob.java @@ -115,7 +115,7 @@ public class PrepareRelationsJob { isSparkSessionManaged, spark -> { removeOutputDir(spark, outputPath); - prepareRelationsDataset( + prepareRelationsRDD( spark, inputRelationsPath, outputPath, relationFilter, maxRelations, relPartitions); }); } @@ -148,21 +148,8 @@ public class PrepareRelationsJob { .map(Tuple2::_2) .rdd(); - // group by TARGET and apply limit - RDD byTarget = readPathRelationRDD(spark, inputRelationsPath) - .filter(rel -> rel.getDataInfo().getDeletedbyinference() == false) - .filter(rel -> relationFilter.contains(rel.getRelClass()) == false) - .mapToPair(r -> new Tuple2<>(SortableRelationKey.create(r, r.getTarget()), r)) - .repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions)) - .groupBy(Tuple2::_1) - .map(Tuple2::_2) - .map(t -> Iterables.limit(t, maxRelations)) - .flatMap(Iterable::iterator) - .map(Tuple2::_2) - .rdd(); - spark - .createDataset(bySource.union(byTarget), Encoders.bean(Relation.class)) + .createDataset(bySource, Encoders.bean(Relation.class)) .repartition(relPartitions) .write() .mode(SaveMode.Overwrite) @@ -172,41 +159,7 @@ public class PrepareRelationsJob { private static void prepareRelationsDataset( SparkSession spark, String inputRelationsPath, String outputPath, Set relationFilter, int maxRelations, int relPartitions) { - - Dataset bySource = pruneRelations( - spark, inputRelationsPath, relationFilter, maxRelations, relPartitions, - (Function) r -> r.getSource()); - Dataset byTarget = pruneRelations( - spark, inputRelationsPath, relationFilter, maxRelations, relPartitions, - (Function) r -> r.getTarget()); - - bySource - .union(byTarget) - .repartition(relPartitions) - .write() - .mode(SaveMode.Overwrite) - .parquet(outputPath); - } - - private static Dataset pruneRelations(SparkSession spark, String inputRelationsPath, - Set relationFilter, int maxRelations, int relPartitions, - Function idFn) { - return readRelations(spark, inputRelationsPath, relationFilter, relPartitions) - .groupByKey( - (MapFunction) r -> idFn.call(r), - Encoders.STRING()) - .agg(new RelationAggregator(maxRelations).toColumn()) - .flatMap( - (FlatMapFunction, Relation>) t -> t - ._2() - .getRelations() - .iterator(), - Encoders.bean(Relation.class)); - } - - private static Dataset readRelations(SparkSession spark, String inputRelationsPath, - Set relationFilter, int relPartitions) { - return spark + spark .read() .textFile(inputRelationsPath) .repartition(relPartitions) @@ -214,7 +167,20 @@ public class PrepareRelationsJob { (MapFunction) s -> OBJECT_MAPPER.readValue(s, Relation.class), Encoders.kryo(Relation.class)) .filter((FilterFunction) rel -> rel.getDataInfo().getDeletedbyinference() == false) - .filter((FilterFunction) rel -> relationFilter.contains(rel.getRelClass()) == false); + .filter((FilterFunction) rel -> relationFilter.contains(rel.getRelClass()) == false) + .groupByKey( + (MapFunction) Relation::getSource, + Encoders.STRING()) + .agg(new RelationAggregator(maxRelations).toColumn()) + .flatMap( + (FlatMapFunction, Relation>) t -> Iterables + .limit(t._2().getRelations(), maxRelations) + .iterator(), + Encoders.bean(Relation.class)) + .repartition(relPartitions) + .write() + .mode(SaveMode.Overwrite) + .parquet(outputPath); } public static class RelationAggregator diff --git a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml index 0d5121cf1..697a00a09 100644 --- a/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml +++ b/dhp-workflows/dhp-graph-provision/src/main/resources/eu/dnetlib/dhp/oa/provision/oozie_app/workflow.xml @@ -133,6 +133,7 @@ --inputRelationsPath${inputGraphRootPath}/relation --outputPath${workingDir}/relation + --maxRelations${maxRelations} --relPartitions5000 diff --git a/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJobTest.java b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJobTest.java new file mode 100644 index 000000000..c16bbc6fb --- /dev/null +++ b/dhp-workflows/dhp-graph-provision/src/test/java/eu/dnetlib/dhp/oa/provision/PrepareRelationsJobTest.java @@ -0,0 +1,93 @@ + +package eu.dnetlib.dhp.oa.provision; + +import com.fasterxml.jackson.databind.ObjectMapper; +import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport; +import eu.dnetlib.dhp.schema.oaf.Relation; +import org.apache.commons.io.FileUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.function.FilterFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +public class PrepareRelationsJobTest { + + private static final Logger log = LoggerFactory.getLogger(PrepareRelationsJobTest.class); + + public static final String SUBRELTYPE = "subRelType"; + public static final String OUTCOME = "outcome"; + public static final String SUPPLEMENT = "supplement"; + + private static SparkSession spark; + + private static Path workingDir; + + @BeforeAll + public static void setUp() throws IOException { + workingDir = Files.createTempDirectory(PrepareRelationsJobTest.class.getSimpleName()); + log.info("using work dir {}", workingDir); + + SparkConf conf = new SparkConf(); + + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + conf.registerKryoClasses(ProvisionModelSupport.getModelClasses()); + + spark = SparkSession + .builder() + .appName(PrepareRelationsJobTest.class.getSimpleName()) + .master("local[*]") + .config(conf) + .getOrCreate(); + } + + @AfterAll + public static void afterAll() throws IOException { + FileUtils.deleteDirectory(workingDir.toFile()); + spark.stop(); + } + + @Test + public void testRunPrepareRelationsJob(@TempDir Path testPath) throws Exception { + + final int maxRelations = 10; + PrepareRelationsJob + .main( + new String[] { + "-isSparkSessionManaged", Boolean.FALSE.toString(), + "-inputRelationsPath", getClass().getResource("relations.gz").getPath(), + "-outputPath", testPath.toString(), + "-relPartitions", "10", + "-relationFilter", "asd", + "-maxRelations", String.valueOf(maxRelations) + }); + + Dataset out = spark.read() + .parquet(testPath.toString()) + .as(Encoders.bean(Relation.class)) + .cache(); + + Assertions.assertEquals(10, out.count()); + + Dataset freq = out.toDF().cube(SUBRELTYPE).count().filter((FilterFunction) value -> !value.isNullAt(0)); + long outcome = freq.filter(freq.col(SUBRELTYPE).equalTo(OUTCOME)).collectAsList().get(0).getAs("count"); + long supplement = freq.filter(freq.col(SUBRELTYPE).equalTo(SUPPLEMENT)).collectAsList().get(0).getAs("count"); + + Assertions.assertTrue(outcome > supplement); + Assertions.assertEquals(7, outcome); + Assertions.assertEquals(3, supplement); + } + +} diff --git a/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/relations.gz b/dhp-workflows/dhp-graph-provision/src/test/resources/eu/dnetlib/dhp/oa/provision/relations.gz new file mode 100644 index 0000000000000000000000000000000000000000..13bc01c8c37f15ce901e259b2510c7495cd9316d GIT binary patch literal 681 zcmV;a0#^MWiwFpQ^YvZ;19D|-VRUJ4ZgT+b+cA&RFc`-1olkM{jL=<%FdxQ~3_8E7O|s)qCN@sT zXqQ)2oTKgi7}CLnF~(Qzejj8=HqAkvZA@WSd6x3Rp!;QJ^`yf|V&2&DB$^lLVP9_Kh12Bou}HFgURC!#KYrNko9%A5-!!{z zbNQEELb4LC2N}eDUN*hVr7qXsXD`>wI8LD)r)tXObo_Q3Fiy#gd8^BQba8`WRvKDIfV=!R*u^F)am<`x|EC*~q z#sjt=`(fRFd2GRpFV-_4Y6N6KfCDlizyaA1;DC$>a6ncBI3P1(-GF{NI%|sIm+TRR z-$Pf-$fT%(kzr8